首页 > 技术,让世界更美好。 > 【转载】【Spark】Spark运行原理

【转载】【Spark】Spark运行原理

2017年3月2日

转载自http://www.jianshu.com/p/cedbebfeea8c

 

前两篇我们讲了spark的基础知识,包括spark的体系结构、执行框架、spark的基本数据类型以及spark中stage的划分等等。本篇只要介绍spark运行的原理。包括spark的内部执行机制,spark的基本数据类型RDD的执行流程,以及spark在不同集群下的运行架构。

 

1. Spark内部执行机制

 

1.1 内部执行流程

如下图1为分布式集群上spark应用程序的一般执行框架。主要由sparkcontext(spark上下文)、cluster manager(资源管理器)和executor(单个节点的执行进程)。其中cluster manager负责整个集群的统一资源管理。executor是应用执行的主要进程,内部含有多个task线程以及内存空间。

图1 spark分布式部署图

详细流程图如下图2:

图2 详细流程图
  • (1) 应用程序在使用spark-submit提交后,根据提交时的参数设置(deploy mode)在相应位置初始化sparkcontext,即spark的运行环境,并创建DAG Scheduler和Task Scheduer,Driver根据应用程序执行代码,将整个程序根据action算子划分成多个job,每个job内部构建DAG图,DAG Scheduler将DAG图划分为多个stage,同时每个stage内部划分为多个task,DAG Scheduler将taskset传给Task Scheduer,Task Scheduer负责集群上task的调度。至于stage和task的关系以及是如何划分的我们后面再详细讲。
  • (2) Driver根据sparkcontext中的资源需求向resource manager申请资源,包括executor数及内存资源。
  • (3) 资源管理器收到请求后在满足条件的work node节点上创建executor进程。
  • (4) Executor创建完成后会向driver反向注册,以便driver可以分配task给他执行。
  • (5) 当程序执行完后,driver向resource manager注销所申请的资源。

1.2 job、stage、task的关系

Job、stage和task是spark任务执行流程中的三个基本单位。其中job是最大的单位,Job是spark应用的action算子催生的;stage是由job拆分,在单个job内是根据shuffle算子来拆分stage的,单个stage内部可根据操作数据的分区数划分成多个task。如下图3所示

图3 job、stage和task的关系图

2. RDD 的执行流程

 

上一节我们介绍了spark应用程序的大概执行流程,由于spark应用程序中的数据块基本都是RDD,本节我们来看下应用程序中RDD的执行流程。

 

2.1 RDD 从创建到执行

 

RDD从创建到执行的流程如下图4所示

图4 RDD执行流程
  • (1) 首先针对一段应用代码,driver会以action算子为边界生成响应的DAG图
  • (2) DAG Scheduler从DAG图的末端开始,以图中的shuffle算子为边界来划分stage,stage划分完成后,将每个stage划分为多个task,DAG Scheduler将taskSet传给Task Scheduler来调用
  • (3) Task Scheduler根据一定的调度算法,将接收到的task池中的task分给work node节点中的executor执行
    这里我们看到RDD的执行流程中,DAG Scheduler和Task Scheduler起到非常关键的作用个,因此下面我们来看下DAG Scheduer和Task Scheduler的工作流程。

2.2 DAG Scheduler工作流程

DAG Scheduler是一个高级的scheduler 层,他实现了基于stage的调度,他为每一个job划分stage,并将单个stage分成多个task,然后他会将stage作为taskSet提交给底层的Task Scheduler,由Task Scheduler执行。
DAG的工作原理如下图5:

图5 DAG Scheduler工作流程

针对左边的一段代码,DAG Scheduler根据collect(action算子)将其划分到一个job中,在此job内部,划分stage,如上右图所示。DAG Scheduler在DAG图中从末端开始查找shuffle算子,上图中将reduceByKey为stage的分界,shuffle算子只有一个,因此分成两个stage。前一个stage中,RDD在map完成以后执行shuffle write将结果写到内存或磁盘上,后一个stage首先执行shuffle read读取数据在执行reduceByKey,即shuffle操作。

2.3 TASK Scheduler工作流程

Task Scheduler是sparkContext中除了DAG Scheduler的另一个非常重要的调度器,task Scheduler负责将DAGS cheduer产生的task调度到executor中执行。如下图6所示,Task Scheduler 负责将TaskSetPool中的task调度到executor中执行,一般的调度模式是FIFO(先进先出),也可以按照FAIR(公平调度)的调度模式,具体根据配置而定。其中FIFO:顾名思义是先进先出队列的调度模式,而FAIR则是根据权重来判断,权重可以根据资源的占用率来分,如可设占用较少资源的task的权重较高。这样就可以在资源较少时,调用后来的权重较高的task先执行了。至于每个executor中同时执行的task数则是由分配给每个executor中cpu的核数决定的。

图6 TaskScheduler的工作流程

3. spark三种分布式部署模式

 

Spark支持的主要的三种分布式部署方式分别是standalone、spark on mesos和 spark on YARN。standalone模式,即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。它是Spark实现的资源调度框架,其主要的节点有Client节点、Master节点和Worker节点。而yarn是统一的资源管理机制,在上面可以运行多套计算框架,如map reduce、storm等根据driver在集群中的位置不同,分为yarn client和yarn cluster。而mesos是一个更强大的分布式资源管理框架,它允许多种不同的框架部署在其上,包括yarn。

3.1 standalone框架

Standalone模式下,集群启动时包括Master与Worker,其中Master负责接收客户端提交的作业,管理Worker。根据作业提交的方式不同,分为driver on client 和drvier on worker。如下图7所示,上图为driver on work模式,下图为driver on client模式。两种模式的主要不同点在于driver所在的位置。

图7 standalone模式

Driver on work:

  • (1) 首先client提交作业到master
  • (2) Master让一个worker启动driver进程,driver进程创建SchedulerBackend进程
  • (3) Master让其余worker启动executor进程,executor进程创建ExecutorBackend 进程
  • (4) ExecutorBackend进程启动后,会向driver的SchedulerBackend进程注册
  • (5) Executor注册后,driver会将task发送给executor执行
  • (6) 直到所有的stage都完成,程序退出

Driver on client:

  • (1) 客户端启动后,直接运行程序,启动driver进程,driver进程向master注册
  • (2) Master寻找可用的work,并让worker启动executor进程
  • (3) Executor启动后向driver的SchedulerBackend进程注册
  • (4) Driver收到注册的executor后,将task分配给注册的executor执行
  • (5) 直到所有stage都完成,程序退出

3.2 yarn集群模式

Apache yarn是apache Hadoop开源项目的一部分。设计之初是为了解决mapreduce计算框架资源管理的问题。到haodoop 2.0使用yarn将mapreduce的分布式计算和资源管理区分开来。它的引入使得Hadoop分布式计算系统进入了平台化时代,即各种计算框架可以运行在一个集群中,由资源管理系统YRAN进行统一的管理和调度,从而共享整个集群资源、提高资源利用率。
YARN总体上也Master/slave架构——ResourceManager/NodeManager。前者(RM)负责对各个NodeManager(NM)上的资源进行统一管理和调度。而container是资源分配和调度的基本单位,其中封装了机器资源,如内存、CPU、磁盘和网络等,每个任务会被分配一个Container,该任务只能在该Container中执行,并使用该Container封装的资源。NodeManager的作用则是负责接收并启动应用的container、而向RM回报本节点上的应用Container运行状态和资源使用情况。ApplicationMaster与具体的Application相关,主要负责同ResourceManager协商以获取合适的Container,并跟踪这些Container的状态和监控其进度。如下图8所示为yarn集群的一般模型。

图8 yarn集群一般部署

Spark在yarn集群上的部署方式分为两种,yarn client(driver运行在客户端)和yarn cluster(driver运行在master上),driver on master如下图9所示。

图9 yarn cluster部署图
  • (1) Spark Yarn Client向YARN中提交应用程序,包括Application Master程序、启动Application Master的命令、需要在Executor中运行的程序等;
  • (2) Resource manager收到请求后,在其中一个node manager中为应用程序分配一个container,要求它在container中启动应用程序的Application Master,Application master初始化sparkContext以及创建DAG Scheduler和Task Scheduler。
  • (3) Application master根据sparkContext中的配置,向resource manager申请container,同时,Application master向Resource manager注册,这样用户可通过Resource manager查看应用程序的运行状态
  • (4) Resource manager 在集群中寻找符合条件的node manager,在node manager启动container,要求container启动executor,
  • (5) Executor启动后向Application master注册,并接收Application master分配的task
  • (6) 应用程序运行完成后,Application Master向Resource Manager申请注销并关闭自己。
    Driver on client如下图10所示:

    图10 yarn client部署图
  • (1) Spark Yarn Client向YARN的Resource Manager申请启动Application Master。同时在SparkContent初始化中将创建DAG Scheduler和TASK Scheduler等
  • (2) ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派
  • (3) Client中的SparkContext初始化完毕后,与Application Master建立通讯,向Resource Manager注册,根据任务信息向Resource Manager申请资源(Container)
  • (4) 当application master申请到资源后,便与node manager通信,要求它启动container
  • (5) Container启动后向driver中的sparkContext注册,并申请task
  • (6) 应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己。
    从下图11:Yarn-client和Yarn cluster模式对比可以看出,在Yarn-client(Driver on client)中,Application Master仅仅从Yarn中申请资源给Executor,之后client会跟container通信进行作业的调度。如果client离集群距离较远,建议不要采用此方式,不过此方式有利于交互式的作业。

    图11 Yarn-client和Yarn cluster模式对比

    \

3.3 mesos集群模式

Mesos是apache下的开源分布式资源管理框架。起源于加州大学伯克利分校,后被twitter推广使用。Mesos上可以部署多种分布式框架,Mesos的架构图如下图12所示,其中Framework是指外部的计算框架,如Hadoop,Mesos等,这些计算框架可通过注册的方式接入mesos,以便mesos进行统一管理和资源分配。

图12 mesos一般部署图

在 Mesos 上运行的 framework 由两部分组成:一个是 scheduler ,通过注册到master 来获取集群资源。另一个是在 slave 节点上运行的executor进程,它可以执行 framework 的 task 。 Master 决定为每个framework 提供多少资源,framework 的 scheduler来选择其中提供的资源。当 framework同意了提供的资源,它通过master将 task发送到提供资源的slaves 上运行。Mesos的资源分配图如下图13。

图13 mesos资源分配图
  • (1) Slave1 向 Master 报告,有4个CPU和4 GB内存可用
  • (2) Master 发送一个 Resource Offer 给 Framework1 来描述 Slave1 有多少可用资源
  • (3) FrameWork1 中的 FW Scheduler会答复 Master,我有两个 Task 需要运行在 Slave1,一个 Task 需要<2个CPU,1 GB内存=””>,另外一个Task需要<1个CPU,2 GB内存=””>
  • (4) 最后,Master 发送这些 Tasks 给 Slave1。然后,Slave1还有1个CPU和1 GB内存没有使用,所以分配模块可以把这些资源提供给 Framework2
    Spark可作为其中一个分布式框架部署在mesos上,部署图与mesos的一般框架部署图类似,如下图14,这里不再重述。

    图14 spark on mesos部署图

3.4 spark 三种部署模式的区别

在这三种部署模式中,standalone作为spark自带的分布式部署模式,是最简单也是最基本的spark应用程序部署模式,这里就不再赘述。这里就讲一下yarn和mesos的区别:

  • (1) 就两种框架本身而言,mesos上可部署yarn框架。
  • (2) mesos双层调度机制,能支持多种调度模式,而Yarn通过Resource Mananger管理集群资源,只能使用一种调度模式。Mesos 的双层调度机制为:mesos可接入如yarn一般的分布式部署框架,但Mesos要求可接入的框架必须有一个调度器模块,该调度器负责框架内部的任务调度。当一个framework想要接入mesos时,需要修改自己的调度器,以便向mesos注册,并获取mesos分配给自己的资源, 这样再由自己的调度器将这些资源分配给框架中的任务,也就是说,整个mesos系统采用了双层调度框架:第一层,由mesos将资源分配给框架;第二层,框架自己的调度器将资源分配给自己内部的任务。
  • (3) mesos可实现粗、细粒度资源调度,可动态分配资源,而Yarn只能实现静态资源分配。其中粗粒度和细粒度调度定义如下:
    粗粒度模式(Coarse-grained Mode):程序运行之前就要把所需要的各种资源(每个executor占用多少资源,内部可运行多少个executor)申请好,运行过程中不能改变。
    细粒度模式(Fine-grained Mode):为了防止资源浪费,对资源进行按需分配。与粗粒度模式一样,应用程序启动时,先会启动executor,但每个executor占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos会为每个executor动态分配资源,每分配一些,便可以运行一个新任务,单个Task运行完之后可以马上释放对应的资源。每个Task会汇报状态给Mesos slave和Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于MapReduce调度模式,每个Task完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。
本文的评论功能被关闭了.