`
scholers
  • 浏览: 614964 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

MINA2的多线程模型问题探讨

 
阅读更多
  最近和一友人沟通,他在一个项目中使用MINA2,他反馈在启动了服务端之后,发现IO阻塞和堆内存一直上升;





JCONSOLE面板中的解释:
阻塞总数

Blocked count is the total number of times that the thread blocked to enter or reenter a monitor. I.e. the number of times a thread has been in the java.lang.Thread.State.BLOCKED state.
当线程试图获取一个内部的对象锁(不是java.util.concurrent库中的锁),而锁被其它线程占有,则该线程进入阻塞状态。

等待总数
Waited count is the total number of times that the thread waited for notification. i.e. the number of times that a thread has been in the ava.lang.Thread.State.WAITING or java.lang.Thread.State.TIMED_WAITING state.
当线程等待另外一个线程通知调度器的一个条件的时候,它自己进入等待状态。在调用Object.wait()或Thread.join()方法,或者等待java.util.concurrent库中的Lock或Condition时,会出现等待状况。

后这两天我自己本地拿一个简单案例测试了下,
采用JAVA原生的newCachedThreadPool,以及MINA2自带的 OrderedThreadPoolExecutor 线程池,在性能上没有太大的区别;
本身他原来的代码中就是使用了一种线程池,将IO线程和工作线程(业务线程)区分开的,所以这个处理的堆内存升高和IO堵塞是正常现象,我本地代码实测,暂用的堆内存并不多,而且是快速回收了,并不影响后续业务;
下图中大量的thread就是由于使用了ExecutorFilter之后,MINA2服务器端产生的大量线程。


  在服务端启动的代码中有一段如下:
filterChainBuilder.addLast("threadPool", new ExecutorFilter());


仔细研究了下JCONSOLE中的阻塞总数,结合本地运行的经验,下图的结果:



  根据图里面的信息,发现在NioProcessor.java中
   @Override
    protected int select(long timeout) throws Exception {
        return selector.select(timeout);
    }

然后继续跟踪,在AbstractPollingIoProcessor.java类中
 private class Processor implements Runnable {
        public void run() {
            assert (processorRef.get() == this);

            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            for (;;) {
                try {
                    // This select has a timeout so that we can manage
                    // idle session when we get out of the select every
                    // second. (note : this is a hack to avoid creating
                    // a dedicated thread).
                    long t0 = System.currentTimeMillis();
                    int selected = select(SELECT_TIMEOUT);
                    long t1 = System.currentTimeMillis();
                    long delta = (t1 - t0);

                    if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
                        // Last chance : the select() may have been
                        // interrupted because we have had an closed channel.
                        if (isBrokenConnection()) {
                            LOG.warn("Broken connection");

                            // we can reselect immediately
                            // set back the flag to false
                            wakeupCalled.getAndSet(false);

                            continue;
                        } else {
                            LOG.warn("Create a new selector. Selected is 0, delta = "
                                            + (t1 - t0));
                            // Ok, we are hit by the nasty epoll
                            // spinning.
                            // Basically, there is a race condition
                            // which causes a closing file descriptor not to be
                            // considered as available as a selected channel, but
                            // it stopped the select. The next time we will
                            // call select(), it will exit immediately for the same
                            // reason, and do so forever, consuming 100%
                            // CPU.
                            // We have to destroy the selector, and
                            // register all the socket on a new one.
                            registerNewSelector();
                        }

                        // Set back the flag to false
                        wakeupCalled.getAndSet(false);

                        // and continue the loop
                        continue;
                    }

                    // Manage newly created session first
                    nSessions += handleNewSessions();

                    updateTrafficMask();

                    // Now, if we have had some incoming or outgoing events,
                    // deal with them
                    if (selected > 0) {
                        //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
                        process();
                    }

                    // Write the pending requests
                    long currentTime = System.currentTimeMillis();
                    flush(currentTime);

                    // And manage removed sessions
                    nSessions -= removeSessions();

                    // Last, not least, send Idle events to the idle sessions
                    notifyIdleSessions(currentTime);

                    // Get a chance to exit the infinite loop if there are no
                    // more sessions on this Processor
                    if (nSessions == 0) {
                        processorRef.set(null);
                        
                        if (newSessions.isEmpty() && isSelectorEmpty()) {
                            // newSessions.add() precedes startupProcessor
                            assert (processorRef.get() != this);
                            break;
                        }
                        
                        assert (processorRef.get() != this);
                        
                        if (!processorRef.compareAndSet(null, this)) {
                            // startupProcessor won race, so must exit processor
                            assert (processorRef.get() != this);
                            break;
                        }
                        
                        assert (processorRef.get() == this);
                    }

                    // Disconnect all sessions immediately if disposal has been
                    // requested so that we exit this loop eventually.
                    if (isDisposing()) {
                        for (Iterator<S> i = allSessions(); i.hasNext();) {
                            scheduleRemove(i.next());
                        }

                        wakeup();
                    }
                } catch (ClosedSelectorException cse) {
                    // If the selector has been closed, we can exit the loop
                    break;
                } catch (Throwable t) {
                    ExceptionMonitor.getInstance().exceptionCaught(t);

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                    }
                }
            }

            try {
                synchronized (disposalLock) {
                    if (disposing) {
                        doDispose();
                    }
                }
            } catch (Throwable t) {
                ExceptionMonitor.getInstance().exceptionCaught(t);
            } finally {
                disposalFuture.setValue(true);
            }
        }
    }

上述行发生了阻塞行为,这个类是干嘛的呢?这个类是MINA2中去轮训SELECTOR中是否有空闲的,查看一下selector的源码:
    /**
   [color=red][b]  * Selects a set of keys whose corresponding channels are ready for I/O
     * operations.
     *
     * <p> This method performs a blocking <a href="#selop">selection
     * operation</a>.  It returns only after at least one channel is selected,
     * this selector's {@link #wakeup wakeup} method is invoked, the current
     * thread is interrupted, or the given timeout period expires, whichever
     * comes first.
     *
     * <p> This method does not offer real-time guarantees: It schedules the
     * timeout as if by invoking the {@link Object#wait(long)} method. </p>[/b][/color]
     *
     * @param  timeout  If positive, block for up to <tt>timeout</tt>
     *                  milliseconds, more or less, while waiting for a
     *                  channel to become ready; if zero, block indefinitely;
     *                  must not be negative
     *
     * @return  The number of keys, possibly zero,
     *          whose ready-operation sets were updated
     *
     * @throws  IOException
     *          If an I/O error occurs
     *
     * @throws  ClosedSelectorException
     *          If this selector is closed
     *
     * @throws  IllegalArgumentException
     *          If the value of the timeout argument is negative
     */
    public abstract int select(long timeout)
	throws IOException;

仔细的看看注释:
Selects a set of keys whose corresponding channels are ready for I/O[color=red][/color]
原来是去轮训IO通道是否空闲,也就是说上述阻塞是发生在这里,也就是说IO通道被轮训发生的阻塞,所以说本文开头的阻塞数量是这样产生的:
当大量的并发消息过来,MINA2服务端需要轮训IO通道是否OK,OK之后就马上交给工作线程(如果有)去处理。所以从上面看来,这个是正常现象。在同时并发大量消息的时候,MINA2的IO通道还是会有堵塞。

另外:
  我们知道:MINA2采用的是责任链模式,在处理过程中插入不同的filter,来进行不同的处理。
当服务器端启动的时候,会启动默认的I0线程,也就是主线程,MINA2建议是CPU数量+1,如果代码中不增加上述一行,那么如果加入上述的ExecutorFilter之后,工作线程将会和IO线程分开,简单的例子,如果你要发一些消息给服务端,要服务端提交一个事务,如果不采用ExecutorFilter,那么IO线程就会一直等待这个事务结束,才会接着处理下一个任务,否则一直等待,如果采用了ExecutorFilter,那么IO线程会将任务交给ExecutorFilter的线程池,由线程池去处理事务,而IO线程就释放了,这样IO线程非常高效的能够同时处理很多的并发任务。
用new ExecutorFilter(),缺省使用OrderedThreadPoolExecutor线程池。需要注意的是这个线程池,对应同一个session而言,是顺序执行的.也就是说mina2里面的message处理都是顺序的,不会说出现在消息没有接受完毕之前关闭。
常见的也有这样的处理:修改成:
filterChainBuilder.addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool

()); 



  • 大小: 231.5 KB
  • 大小: 122.9 KB
  • 大小: 174.2 KB
  • 大小: 46 KB
1
0
分享到:
评论
5 楼 scholers 2014-07-03  
logicbaby 写道
qrong 写道
经测试,new ExecutorFilter(Executors.newCachedThreadPool())会导致在codec层收到的数据错乱,好大个坑。。。。。改回默认的orderThreadPool后无此问题。

很显然的问题,用CachedThreadPool不保证同一个session的消息有序被处理,改用UnorderedThreadPoolExecutor或者将codec filter在ExecutorFilter之前添加。


这位兄弟研究的很透彻!
4 楼 logicbaby 2014-06-04  
qrong 写道
经测试,new ExecutorFilter(Executors.newCachedThreadPool())会导致在codec层收到的数据错乱,好大个坑。。。。。改回默认的orderThreadPool后无此问题。

很显然的问题,用CachedThreadPool不保证同一个session的消息有序被处理,改用UnorderedThreadPoolExecutor或者将codec filter在ExecutorFilter之前添加。
3 楼 logicbaby 2014-06-04  
new ExecutorFilter(Executors.newCachedThreadPool())
用CachedThreadPool会创建过多的线程,如果有5000个活跃连接,这个东西就有可能创建5000个线程!无需顺序保证应该用new ExecutorFilter(new UnorderedThreadPoolExecutor());
2 楼 patrick_unicorn 2013-04-07  
qrong 写道
经测试,new ExecutorFilter(Executors.newCachedThreadPool())会导致在codec层收到的数据错乱,好大个坑。。。。。改回默认的orderThreadPool后无此问题。


这个问题应该是由于你将ExcecutorFilter放在了ProtocolCodecFilter之前造成的。
1 楼 qrong 2012-07-13  
经测试,new ExecutorFilter(Executors.newCachedThreadPool())会导致在codec层收到的数据错乱,好大个坑。。。。。改回默认的orderThreadPool后无此问题。

相关推荐

Global site tag (gtag.js) - Google Analytics