Hadoop0.23.0
RM-NM-AM源码级分析
缩写词
NM:NodeManager
RM:ResourceManager
AM:
ApplicationManager
表格1 ResourceManager的服务
父类或接口
|
实现类
|
功能
|
Dispatcher
|
AsyncDispatcher
|
转交各种事件到不同的处理窗口,需要注册事件与处理类的绑定。(线程级)
|
AbstractLiveLiness
|
ContainerAllocationExpirer
|
监控Containter是否到期
|
AbstractLiveLiness
|
AMLivelinessMonitor
|
监控每一个App的工作状态
|
AbstractService/ResourceTracker
|
ResourceTrackerService
|
负责NM的注册以及与NM保持心跳
|
AbstractService/ClientRMProtocol
|
ClientRMService
|
连接到RM的客户端,client与RM之间的交互。
|
AbstractService/
NodeStatusUpdater
|
NodeStatusUpdaterImpl
|
监控节点状态更新的情
|
AbstractLivelinessMonitor
|
NMLivelinessMonitor
|
监控每一个NM的存活状态
|
AbstractService/AMRMProtocol
|
ApplicationMasterService
|
与AM交互的服务
|
/EventHandler<RMAppManagerEvent>
|
RMAppManager
|
RM统管App
list,
|
AbstractService/
EventHandler<AMLauncherEvent>
|
ApplicationMasterLauncher
|
管理AM的启动和退出。(线程级)
|
AbstractService/RMAdminProtocol
|
AdminService
|
实现RM的管理的协议的服务
|
表格2 NodeManager的服务
父类或接口
|
实现类
|
功能
|
ContainerExecutor
|
DefaultContainerExecutor(conf可配置)
|
Container执行服务
|
AbstractService
|
DeletionService
|
删除执行容器服务
|
AbstractService
|
NodeHealthCheckerService
|
检查节点的健康状况,提供可访问API
|
AbstractService/
NodeStatusUpdater
|
NodeStatusUpdaterImpl
|
提供向RM注册和心跳服务
|
AbstractService/
NodeResourceMonitor
|
NodeResourceMonitorImpl
|
Hadoop0.23.0没有具体的实现
|
CompositeService/
ServiceStateChangeListener, ContainerManager,
EventHandler<ContainerManagerEvent>
|
ContainerManagerImpl
|
提供节点内容器的管理服务
|
|
|
|
与RM交互的分析:
1、Client提交作业的过程。
(1)Client提交作业给JobSubmitter,JobSubmitter进行一系列的作业初始化工作之后,通过Java接口和继承机制,JobSubmitter中成员变量submitClient映射成YARNRunner的一个实例,YARNRunner是新版本的JobClient。
(2)YARNRunner执行submitJob。YARNRunner中的ResourceMgrDelegate通过RPC实现ClientRMProtocol接口的客户端实例代理,实现Client端与RM进行交互的目的。
ClientRMProtocol的接口如下表所示:
表3:接口ClientRMProtocol的函数
接口
|
含义
|
getNewApplication(GetNewApplicationRequest
request)
|
获取一个ApplicationID和集群当前状态的信息
|
submitApplication(
SubmitApplicationRequest request)
|
向RM提交一个作业
|
forceKillApplication(
KillApplicationRequest request)
|
向RM请求停止一个作业的执行
|
getApplicationReport(
GetApplicationReportRequest request)
|
从RM获取一个作业的状态报告
|
getClusterMetrics(
GetClusterMetricsRequest request)
|
从RM获得集群的资源使用情况信息
|
getAllApplications(
GetAllApplicationsRequest request)
|
从RM获得所有作业的状态信息
|
getClusterNodes(
GetClusterNodesRequest request)
|
从RM获得所有工作节点的状态信息
|
getQueueInfo(
GetQueueInfoRequest request)
|
从RM获取作业队列的信息
|
getQueueUserAcls(
GetQueueUserAclsInfoRequest request)
|
从RM获取当前用户的ACL
|
接口使用方式:
YarnConfiguration conf = new
YarnConfiguration();
InetSocketAddress rmAddress
=
NetUtils.createSocketAddr(this.conf.get(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS),
YarnConfiguration.DEFAULT_RM_PORT,
YarnConfiguration.RM_ADDRESS);
ClientRMProtocol applicationsManager
=
(ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
rmAddress, this.conf);
接下来就可以使用applicationManager调用表格3中的接口,接口函数中输入和返回参数请参阅源码。
RM中负责与Client进行交互的服务是ClientRMService。
2、NM与RM的交互
(1)NM向RM注册。NM继承CompositeService,在init()中将表2中的服务添加到系统中,NM调用start(),通过super.start()会启动所有服务。NodeStatusUpdater作为NM的一种服务被启动起来,在NodeStatusUpdaterImpl的start函数内,调用函数registerWithRM()实现向RM注册。
NM向RM注册服务的过程如下:
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress =
NetUtils.createSocketAddr(this.rmAddress,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS);
ResourceTracker resourceTracker = (ResourceTracker)
rpc.getProxy(ResourceTracker.class, rmAddress, conf);
resourceTracker.registerNodeManager(request);//request需要根据RM的端口号等相关信息设置
(2)NM与RM之间心跳。在NM通过启动NodeStatusUpdater服务,借助NodeStatusUpdaterImpl的registerWithRM()注册到对应的RM之后,调用startStatusUpdater(),启动一个线程,周期性地与RM之间保持心跳。
NM与RM之间的心跳链接如下:
NodeStatus nodeStatus = getNodeStatus();
nodeStatus.setResponseId(lastHeartBeatID);
NodeHeartbeatRequest request = recordFactory
.newRecordInstance(NodeHeartbeatRequest.class);
request.setNodeStatus(nodeStatus);
HeartbeatResponse response =
resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();//NM向RM报告状态信息,RM向NM做反馈
org.apache.hadoop.yarn.server.api.ResourceTracker是NM与RM之间的注册与心跳服务接口。
表4 ResourceTracker的接口函数
接口
|
含义
|
registerNodeManager(RegisterNodeManagerRequest
request)
|
NM注册到RM
|
nodeHeartbeat(NodeHeartbeatRequest request)
|
NM发送心跳到RM
|
RM中负责NM中心跳和注册功能的服务类为ResourceTrackerService
http://s16/middle/4a1f59bftb4bf4f84665f&690
图1
NM与RM之间的交互
(3)NM与RM之间交互与资源分配关系
在NM与RM之间进行心跳互联最为核心的工作是完成NM的Container的分配和释放。
²
NM启动过程,通过ResourceTracker的协议注册到RM上,注册过程会创建一个RMNode的对象,记录NM上执行Applications和Containers的状况。NM注册到RM意味着,RM有更多可以调度的资源。因此,注册过程最后发送NodeAddedSchedulerEvent事件发送给RM上绑定的调度器,调度器将RMNode加入到可利用资源列表。
²
NM与RM之间心跳过程。NM完成注册到RM的过程之后,会启动进入到一个线程循环中。
3、
RM与AM之间的交互
(1)Job提交到RM的过程。Client将作业提交到RM,RM的ClientRMService负责处理该请求。ClientRMService将作业封装成RMAppManagerSubmitEvent,直接调用RMAppManager的事件处理程序。
RMAppManager的handle函数,处理RMAppManagerSubmitEvent事件。使用RMAppImpl初始化RMApp,提交<app-id,RMApp>到RMContext,提交<app-id,
App-ACL>到ApplicationACLsManager。最后,将applicationId, RMAppEventType.START封装成RMAppEvent,由RM中的dispatcher分配给RMAppImpl的handle函数处理。
根据RMAppImpl设置的状态机,RMAppEventType.START事件驱动RMAppState.NEW状态到RMAppState.SUBMITTED,使用RMAppAttemptImpl的实例化RMAppAttempt,将RMAppAttemptEventType.START事件传递给RMAppAttempt。
根据RMAppAttemptImpl状态机的设置,RMAppAttemptEventType.START事件驱动RMAppAttemptState.NEW转移到RMAppAttemptState.SUBMITTED,同时将AppAttempt注册到RM的ApplicationMasterService,并将该AppAttempt提交到ResourceScheduler,Hadoop-Yarn提供的默认调度器是FifoResourceScheduler。然后,通过RM的SchedulerEventDispatcher分发AppAddedSchedulerEvent事件。
SchedulerEventDispatcher是线程模式的事件分配器,它将接收到的事件交给RM设置的ResourceScheduler来处理。FifoResourceScheduler使用handle(SchedulerEvent event)处理AppAddedSchedulerEvent。由AppAttemptID初始化一个SchedulerApp,并向RMAppAttemptImpl发送RMAppAttemptEventType.APP_ACCEPTED类型的事件。
根据RMAppAttemptImpl状态机的设置,RMAppAttemptEventType.APP_ACCEPTED事件驱动RMAppAttemptState.SUBMITTED转移到RMAppAttemptState.SCHEDULED,同时将该AppAttempt被接收的事件向RMApp发送RMAppEventType.APP_ACCEPTED类型的事件,使得RMAppState由SUBMITTED转移到ACCEPTED。最后,为了启动AM,向资源调度器请求一个Container。
到此,由JobClient提交的App的过程在RM的操作全部结束,这个过程中主要是创建了RMAppAttempt实例,并且修改了RMApp的状态,系统识别到当前有一个App被添加到RM。
(2)NM与RM的心跳,部署ApplicationMaster到NM的Container。
NM周期性地向RM传送Heartbeat,RM的ReourceTrackerService负责心跳事件,在RM端使用RMNode来标识NM节点的状态。NM心跳带来的NM节点状态的情况,会触发RMNodeStatusEvent,该类型的事件被绑定RMNodeImpl来处理。
心跳过程产生了RMNodeEventType.STATUS_UPDATE事件,触发
RMNodeState.RUNNING
->EnumSet.of(RMNodeState.RUNNING,
RMNodeState.UNHEALTHY) 的状态转移,并且会调用钩子new statusUpdateWhenHealthyTransition()。在该函数内会查阅NM状态信息中Container的状态,并把Container状态的更新以及Container的增删情况同步到RMNodeImpl中的justLaunchedContainers,并且将NM中处于RUNNING状态的Container封装到newlyLaunchedContainers,将NM中处于COMPLETE状态的Container封装到completedContainers,然后封装成NodeUpdateSchedulerEvent事件交由给FifoResourceScheduler来处理。
FifoResourceScheduler接收到NodeUpdateSchedulerEvent,执行nodeUpdate函数。nodeUpdate函数操作App到可用的Container上,或者回收已经处于FINISHED状态的Container状态资源。
假设我们第一次提交App,跟踪AM启动流程如下:
1)调用assignContainers(node);分配Container到心跳互联的节点。
2)assignContainers(node),遍历待提交的Application队列,计算启动AM的Container需要的资源和需求,与该node的资源和特征进行匹配。匹配成功意味着,当前节点满足Application调度的需求,进入第3)步。
3)assignContainersOnNode(node, application,
priority)将Container分配给app。由于App没有数据本地性的需求,因此,进入assignOffSwitchContainers(node, application,
priority),进入4)
4)调用assignContainer(node, application,
priority,
request.getNumContainers(), request,
NodeType.OFF_SWITCH);
创建一个新的Container,并将该Container通知app对应SchedulerApp,在SchedulerApp中创建RMContainer的实例,并添加到SchedulerApp的newlyAllocatedContainers,并触发RMContainerEventType.START事件。
5)RMContainerEventType.START事件的hook函数会触发一个RMAppAttemptContainerAllocatedEvent的事件,使得RMAppAttemptState从SCHEDULED到ALLOCATED,并调用回调函数。
6)RMAppAttemptContainerAllocatedEvent的回调函数,会通过FifoResourceScheduler获得一个AM Container,并触发RMContainerState状态从ALLOCATED到ACQUIRED,同时,将该Container设置为AppAttempt.masterContainer,最后触发AMLauncherEventType.LAUNCH事件。
7)AMLauncher调用launch(),获得NM上ContainerManager的代理,设置ContainerContext,然后通过代理调用NM端的ContainerManager的startContainer,这样在该NM上就启动了一个ApplicationMaster。
8)将ApplicationMaster注册到RM中的AMLivelinessMonitor,此时,每次RMAppAttemptEventType.STATUS_UPDATE事件,会通知AMLivelinessMonitor,通过时间戳探测AM的健康状况。
ps:快写吐了,凌乱发表。是不是看到这里,也快看吐了,对不住,影响大家心情了http://www/uc/myshow/blog/misc/gif/E___6706EN00SIGG.gif。建议大家还是看源码吧。为了不影响转载的心情,大家还是传递一下这个博客吧.Hadoop0.23.0初探5---RM、NM与AM源码级分析
如果愿意一起阅读Hadoop0.23的源码,并愿意与他人分享,联系方式如下。
http://s2/middle/4a1f59bftb37d788a9371&690
加载中,请稍候......