博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
源码阅读——hadoop yarn之AsyncDispatcher
阅读量:4223 次
发布时间:2019-05-26

本文共 7560 字,大约阅读时间需要 25 分钟。

最近看了读到了关于hadoop中yarn的编程模型的文章 ,想深入了解下它的事件机制是怎么实现的,就看了看其中的AsyncDispatcher类的源码,幸好该类涉及到的其它类不多,读起来也不算吃力

我们先来看其继承关系,AsyncDispatcher继承了AbstractService类,实现了Dispatcher接口,其中Dispatcher接口主要是定义了两个方法register以及getEventHandler,以及一个用于配置事件循环是否在错误后推出的配置,不用太关注这个。

重点看下AbstractService,AbstractService类实现了Service接口,AbstractService中我们主要关注几个方法:

服务生命周期部分:init,start,stop,serviceInit,serviceStart,serviceStop

服务监听器部分:registerServiceListener,unregisterServiceListener,registerGlobalListener,unregisterGlobalListener

其中init,start,stop是继承自service接口的,是暴露给用户用于初始化,启动和停止服务的接口,而serviceInit,serviceStart,serviceStop是空方法,在init,start,stop中被调用,用于子类继承重写,从而自定义自己的初始化,启动以及停止操作。

那么为什么不直接让子类重写init,start,stop方法呢,因为init,start,stop除了调用你们自定义的启动过程,还做了一些其它的事,就是改变service的状态以及通知其它监听了该服务的回掉,其中registerServiceListener,unregisterServiceListener,registerGlobalListener,unregisterGlobalListener就是用于注册服务状态回掉的接口,注意,此处的的回掉不是事件回掉,而是服务状态改变时的回掉,每个服务都有该功能,不要和后文的AbstractService的事件回掉弄混了。

大致了解了AbstractService的功能是提供自定义的服务初始化,启动和停止的接口以及维持服务状态+状态改变后通知接听者,我们接下来就可以开始看AbstractService了,对于AbstractService的大致流程我们大致可以在脑中想一下,不外乎内部有一个事件队列,同时有一个线程不断的从事件队列里取事件,然后根据注册情况进行分发,大致思路就是这样的。

我们重点是看看具体的实现,首先我们看创建取事件的线程的

Runnable createThread() {    return new Runnable() {      @Override      public void run() {//        首先定义两个循环终止条件,stopped变量以及线程被Interrupt,其中stopped是volatile,保证可以及时的看到改变        while (!stopped && !Thread.currentThread().isInterrupted()) {//          任务队列是否为空          drained = eventQueue.isEmpty();          // blockNewEvents is only set when dispatcher is draining to stop,          // adding this check is to avoid the overhead of acquiring the lock          // and calling notify every time in the normal run of the loop.          if (blockNewEvents) {            synchronized (waitForDrained) {              if (drained) {//                任务队列为空才提醒,waitForDrained就是一个Object的实例,用于做线程间通信而已                waitForDrained.notify();              }            }          }          Event event;          try {//            用阻塞的模式从事件队列的取事件            event = eventQueue.take();          } catch(InterruptedException ie) {//            如果take被中断,且不是service被停止导致的,那么log打印一句警告            if (!stopped) {              LOG.warn("AsyncDispatcher thread interrupted", ie);            }            return;          }          if (event != null) {//            分发事件            dispatch(event);          }        }      }    };  }

大致就是做了take事件,然后dispatch事件的操作,其它操作则是为了保证线程优雅推出,以及推出前完成当前事件队列里事件的分发

接下来就是dispatch方法了

protected void dispatch(Event event) {    //all events go thru this loop    if (LOG.isDebugEnabled()) {      LOG.debug("Dispatching the event " + event.getClass().getName() + "."          + event.toString());    }//    获取事件类型    Class
type = event.getType().getDeclaringClass(); try{// 获取相应的handler,EventHandler是个接口 EventHandler handler = eventDispatchers.get(type); if(handler != null) {// 调用handler的handle方法, handler.handle(event); } else { throw new Exception("No handler for registered for " + type); } } catch (Throwable t) { //TODO Maybe log the state of the queue LOG.fatal("Error in dispatcher thread", t); // If serviceStop is called, we should exit this thread gracefully. if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false && stopped == false) { Thread shutDownThread = new Thread(createShutDownThread()); shutDownThread.setName("AsyncDispatcher ShutDown handler"); shutDownThread.start(); } } }

这个没啥好说的,注意一点就是,dispatch方法是直接调用了handler的handle方法的,所以如果handle里会出现要处理长时间的任务,务必记得新开一个线程执行,不然会影响eventloop的

接下来看一下用于注册事件的接口

public void register(Class
eventType, EventHandler handler) {
/* check to see if we have a listener registered */ EventHandler
registeredHandler = (EventHandler
) eventDispatchers.get(eventType); LOG.info("Registering " + eventType + " for " + handler.getClass()); if (registeredHandler == null) { eventDispatchers.put(eventType, handler); } else if (!(registeredHandler instanceof MultiListenerHandler)){ /* for multiple listeners of an event add the multiple listener handler */ MultiListenerHandler multiHandler = new MultiListenerHandler(); multiHandler.addHandler(registeredHandler); multiHandler.addHandler(handler); eventDispatchers.put(eventType, multiHandler); } else { /* already a multilistener, just add to it */ MultiListenerHandler multiHandler = (MultiListenerHandler) registeredHandler; multiHandler.addHandler(handler); } }

这个纯粹看英文注释就知道注册事件时碰到的三种情况(事件未注册过,事件已注册过但事件的handler不是符合handler,事件已注册过且事件的handler是复合handler)下进行的操作了,提一句MultiListenerHandler是实现了EventHandler接口的类,代表符合handler,里面维持了一个handler的list

接下来看看怎么触发事件,正常来说就是将事件推入事件队列即可,但是hadoop这里的实现让我有点不太明白为什么要这么做,希望有人可以指导一下

首先我们看一下项目里别人是怎么触发事件的

dispatcher.getEventHandler().handle(event);

然后我们看一下getEventHandler方法

@Overridepublic EventHandler getEventHandler() {  return handlerInstance;}

handlerInstance则是

private final EventHandler handlerInstance = new GenericEventHandler();

而GenericEventHandler则是实现了EventHandler接口的一个内部类

class GenericEventHandler implements EventHandler
{
public void handle(Event event) { if (blockNewEvents) { return; } drained = false; /* all this method does is enqueue all the events onto the queue */ int qSize = eventQueue.size(); if (qSize != 0 && qSize % 1000 == 0 && lastEventQueueSizeLogged != qSize) { lastEventQueueSizeLogged = qSize; LOG.info("Size of event-queue is " + qSize); } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000) { LOG.warn("Very low remaining capacity in the event-queue: " + remCapacity); } try { eventQueue.put(event); } catch (InterruptedException e) { if (!stopped) { LOG.warn("AsyncDispatcher thread interrupted", e); } // Need to reset drained flag to true if event queue is empty, // otherwise dispatcher will hang on stop. drained = eventQueue.isEmpty(); throw new YarnRuntimeException(e); } }; }

没错,我们终于在GenericEventHandler的handle方法里看到了将事件推入事件队列的操作了,不太理解为什么要绕这么一个大弯,望有人解答。handle里还做了检查事件队列剩余大小的工作。

最后我们看一下AsyncDispatcher的停止服务的方法

@Override  protected void serviceStop() throws Exception {//    用一个标准位表示是否要处理完事件队列里的事件,该标志位是有接口可以更改的    if (drainEventsOnStop) {//      将blockNewEvents设为true,禁止再向事件队列里添加事件了      blockNewEvents = true;      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");      long endTime = System.currentTimeMillis() + getConfig()          .getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,              YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);      synchronized (waitForDrained) {        while (!drained && eventHandlingThread != null            && eventHandlingThread.isAlive()            && System.currentTimeMillis() < endTime) {//          等待事件循环线程将事件队列里的任务处理完          waitForDrained.wait(1000);          LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +              eventHandlingThread.getState());        }      }    }//    设置服务停止标志位为true    stopped = true;    if (eventHandlingThread != null) {//      中断事件循环线程      eventHandlingThread.interrupt();      try {//        等待线程结束        eventHandlingThread.join();      } catch (InterruptedException ie) {        LOG.warn("Interrupted Exception while stopping", ie);      }    }    // stop all the components    super.serviceStop();  }

转载地址:http://oigmi.baihongyu.com/

你可能感兴趣的文章
SSL与TLS的区别以及介绍
查看>>
HTTPS、TLS、SSL、HTTP区别和关系
查看>>
Kafka 入门三问
查看>>
c/c++ 内存泄漏检测,开源工具valgrind使用整理
查看>>
h264 sps pps详解
查看>>
AAC的ADTS头信息介绍
查看>>
Coroutine,你究竟干了什么?
查看>>
代码宏的一点小知识
查看>>
Sweet Snippet系列 之 随机选择
查看>>
名人•牛人•我们这些普通人
查看>>
小话游戏脚本(一)
查看>>
使用VS2010在项目中编写C++头文现出"PCH 警告:标头停止点不能位于宏或#if块中"
查看>>
统计源期刊
查看>>
多线程解码并保存为yuv
查看>>
使用信号量控制线程执行顺序,进而控制不同视频流的解码顺序
查看>>
解码单个视频及保存yuv数据到文件中
查看>>
为什么基类中的析构函数要声明为虚析构函数?
查看>>
对象切割 - 常量引用传递
查看>>
北邮同学面经
查看>>
Effective C++条款16:成对使用new和delete时要采取相同形式
查看>>