本文共 7560 字,大约阅读时间需要 25 分钟。
最近看了读到了关于hadoop中yarn的编程模型的文章 ,想深入了解下它的事件机制是怎么实现的,就看了看其中的AsyncDispatcher类的源码,幸好该类涉及到的其它类不多,读起来也不算吃力
我们先来看其继承关系,AsyncDispatcher继承了AbstractService类,实现了Dispatcher接口,其中Dispatcher接口主要是定义了两个方法register以及getEventHandler,以及一个用于配置事件循环是否在错误后推出的配置,不用太关注这个。
重点看下AbstractService,AbstractService类实现了Service接口,AbstractService中我们主要关注几个方法:
服务生命周期部分:init,start,stop,serviceInit,serviceStart,serviceStop
服务监听器部分:registerServiceListener,unregisterServiceListener,registerGlobalListener,unregisterGlobalListener
其中init,start,stop是继承自service接口的,是暴露给用户用于初始化,启动和停止服务的接口,而serviceInit,serviceStart,serviceStop是空方法,在init,start,stop中被调用,用于子类继承重写,从而自定义自己的初始化,启动以及停止操作。
那么为什么不直接让子类重写init,start,stop方法呢,因为init,start,stop除了调用你们自定义的启动过程,还做了一些其它的事,就是改变service的状态以及通知其它监听了该服务的回掉,其中registerServiceListener,unregisterServiceListener,registerGlobalListener,unregisterGlobalListener就是用于注册服务状态回掉的接口,注意,此处的的回掉不是事件回掉,而是服务状态改变时的回掉,每个服务都有该功能,不要和后文的AbstractService的事件回掉弄混了。
大致了解了AbstractService的功能是提供自定义的服务初始化,启动和停止的接口以及维持服务状态+状态改变后通知接听者,我们接下来就可以开始看AbstractService了,对于AbstractService的大致流程我们大致可以在脑中想一下,不外乎内部有一个事件队列,同时有一个线程不断的从事件队列里取事件,然后根据注册情况进行分发,大致思路就是这样的。
我们重点是看看具体的实现,首先我们看创建取事件的线程的
Runnable createThread() { return new Runnable() { @Override public void run() {// 首先定义两个循环终止条件,stopped变量以及线程被Interrupt,其中stopped是volatile,保证可以及时的看到改变 while (!stopped && !Thread.currentThread().isInterrupted()) {// 任务队列是否为空 drained = eventQueue.isEmpty(); // blockNewEvents is only set when dispatcher is draining to stop, // adding this check is to avoid the overhead of acquiring the lock // and calling notify every time in the normal run of the loop. if (blockNewEvents) { synchronized (waitForDrained) { if (drained) {// 任务队列为空才提醒,waitForDrained就是一个Object的实例,用于做线程间通信而已 waitForDrained.notify(); } } } Event event; try {// 用阻塞的模式从事件队列的取事件 event = eventQueue.take(); } catch(InterruptedException ie) {// 如果take被中断,且不是service被停止导致的,那么log打印一句警告 if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", ie); } return; } if (event != null) {// 分发事件 dispatch(event); } } } }; }
大致就是做了take事件,然后dispatch事件的操作,其它操作则是为了保证线程优雅推出,以及推出前完成当前事件队列里事件的分发
接下来就是dispatch方法了
protected void dispatch(Event event) { //all events go thru this loop if (LOG.isDebugEnabled()) { LOG.debug("Dispatching the event " + event.getClass().getName() + "." + event.toString()); }// 获取事件类型 Class type = event.getType().getDeclaringClass(); try{// 获取相应的handler,EventHandler是个接口 EventHandler handler = eventDispatchers.get(type); if(handler != null) {// 调用handler的handle方法, handler.handle(event); } else { throw new Exception("No handler for registered for " + type); } } catch (Throwable t) { //TODO Maybe log the state of the queue LOG.fatal("Error in dispatcher thread", t); // If serviceStop is called, we should exit this thread gracefully. if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false && stopped == false) { Thread shutDownThread = new Thread(createShutDownThread()); shutDownThread.setName("AsyncDispatcher ShutDown handler"); shutDownThread.start(); } } }
这个没啥好说的,注意一点就是,dispatch方法是直接调用了handler的handle方法的,所以如果handle里会出现要处理长时间的任务,务必记得新开一个线程执行,不然会影响eventloop的
接下来看一下用于注册事件的接口
public void register(Class eventType, EventHandler handler) { /* check to see if we have a listener registered */ EventHandlerregisteredHandler = (EventHandler ) eventDispatchers.get(eventType); LOG.info("Registering " + eventType + " for " + handler.getClass()); if (registeredHandler == null) { eventDispatchers.put(eventType, handler); } else if (!(registeredHandler instanceof MultiListenerHandler)){ /* for multiple listeners of an event add the multiple listener handler */ MultiListenerHandler multiHandler = new MultiListenerHandler(); multiHandler.addHandler(registeredHandler); multiHandler.addHandler(handler); eventDispatchers.put(eventType, multiHandler); } else { /* already a multilistener, just add to it */ MultiListenerHandler multiHandler = (MultiListenerHandler) registeredHandler; multiHandler.addHandler(handler); } }
这个纯粹看英文注释就知道注册事件时碰到的三种情况(事件未注册过,事件已注册过但事件的handler不是符合handler,事件已注册过且事件的handler是复合handler)下进行的操作了,提一句MultiListenerHandler是实现了EventHandler接口的类,代表符合handler,里面维持了一个handler的list
接下来看看怎么触发事件,正常来说就是将事件推入事件队列即可,但是hadoop这里的实现让我有点不太明白为什么要这么做,希望有人可以指导一下
首先我们看一下项目里别人是怎么触发事件的
dispatcher.getEventHandler().handle(event);
然后我们看一下getEventHandler方法
@Overridepublic EventHandler getEventHandler() { return handlerInstance;}
handlerInstance则是
private final EventHandler handlerInstance = new GenericEventHandler();
而GenericEventHandler则是实现了EventHandler接口的一个内部类
class GenericEventHandler implements EventHandler{ public void handle(Event event) { if (blockNewEvents) { return; } drained = false; /* all this method does is enqueue all the events onto the queue */ int qSize = eventQueue.size(); if (qSize != 0 && qSize % 1000 == 0 && lastEventQueueSizeLogged != qSize) { lastEventQueueSizeLogged = qSize; LOG.info("Size of event-queue is " + qSize); } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000) { LOG.warn("Very low remaining capacity in the event-queue: " + remCapacity); } try { eventQueue.put(event); } catch (InterruptedException e) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", e); } // Need to reset drained flag to true if event queue is empty, // otherwise dispatcher will hang on stop. drained = eventQueue.isEmpty(); throw new YarnRuntimeException(e); } }; }
没错,我们终于在GenericEventHandler的handle方法里看到了将事件推入事件队列的操作了,不太理解为什么要绕这么一个大弯,望有人解答。handle里还做了检查事件队列剩余大小的工作。
最后我们看一下AsyncDispatcher的停止服务的方法
@Override protected void serviceStop() throws Exception {// 用一个标准位表示是否要处理完事件队列里的事件,该标志位是有接口可以更改的 if (drainEventsOnStop) {// 将blockNewEvents设为true,禁止再向事件队列里添加事件了 blockNewEvents = true; LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); long endTime = System.currentTimeMillis() + getConfig() .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT); synchronized (waitForDrained) { while (!drained && eventHandlingThread != null && eventHandlingThread.isAlive() && System.currentTimeMillis() < endTime) {// 等待事件循环线程将事件队列里的任务处理完 waitForDrained.wait(1000); LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + eventHandlingThread.getState()); } } }// 设置服务停止标志位为true stopped = true; if (eventHandlingThread != null) {// 中断事件循环线程 eventHandlingThread.interrupt(); try {// 等待线程结束 eventHandlingThread.join(); } catch (InterruptedException ie) { LOG.warn("Interrupted Exception while stopping", ie); } } // stop all the components super.serviceStop(); }
转载地址:http://oigmi.baihongyu.com/