Spark Job Submission (5)
Published: 2019-05-26


Finally, makeOffers is called so that all the tasks run on the executors on the worker nodes.

spark-master\core\src\main\scala\org\apache\spark\scheduler\cluster\CoarseGrainedSchedulerBackend.scala

    private def makeOffers() {
      // Make sure no executor is killed while some task is launching on it
      // resourceOffers produces the TaskDescriptions to launch
      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
        // Filter out executors under killing
        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
        val workOffers = activeExecutors.map {
          case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
              Some(executorData.executorAddress.hostPort))
        }.toIndexedSeq
        scheduler.resourceOffers(workOffers)
      }
      // Launch the tasks on the worker nodes
      if (!taskDescs.isEmpty) {
        launchTasks(taskDescs)
      }
    }

Let's first look at scheduler.resourceOffers:

spark-master\core\src\main\scala\org\apache\spark\scheduler\TaskSchedulerImpl.scala

  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    // Check whether each offer's host is already tracked
    for (o <- offers) {
      if (!hostToExecutors.contains(o.host)) {
        hostToExecutors(o.host) = new HashSet[String]()
      }
      // Handle a newly joined executor: if it is not tracked yet, set newExecAvail to true
      if (!executorIdToRunningTaskIds.contains(o.executorId)) {
        hostToExecutors(o.host) += o.executorId
        executorAdded(o.executorId, o.host)
        executorIdToHost(o.executorId) = o.host
        executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
        newExecAvail = true
      }
      // Record o.host in the rack-to-hosts table
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // (the blacklist filtering that defines filteredOffers is elided in this excerpt)
    val shuffledOffers = shuffleOffers(filteredOffers)
    // Build a list of tasks to assign to each worker.
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
    // Compute the available CPUs and slots
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val availableSlots = shuffledOffers.map(o => o.cores / CPUS_PER_TASK).sum
    // Get the task sets sorted by the scheduling policy (FIFO or FAIR)
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      // Notify each task set of the newly joined executors so locality levels are recomputed
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    for (taskSet <- sortedTaskSets) {
      // A barrier task set cannot be scheduled when it requires more slots than are available
      if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
        logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " +
          s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " +
          s"number of available slots is $availableSlots.")
      } else {
        var launchedAnyTask = false
        // Record all the executor IDs assigned barrier tasks on.
        val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
        for (currentMaxLocality <- taskSet.myLocalityLevels) {
          var launchedTaskAtCurrentMaxLocality = false
          do {
            // Schedule tasks at the best data-locality level first
            launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
              currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs)
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
          } while (launchedTaskAtCurrentMaxLocality)
        }
        // ... (handling of completely blacklisted task sets and barrier-task bookkeeping elided) ...
      }
    }

    // Record whether any task has been launched
    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }
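The core of resourceOffers is bookkeeping over free cores: each offer contributes cores / CPUS_PER_TASK slots, and availableCpus is decremented as tasks are placed. Below is a minimal, self-contained sketch of that accounting, not Spark's actual classes; OfferSketch, SimpleOffer, and assign are hypothetical names, and CPUS_PER_TASK mirrors the default spark.task.cpus = 1.

    // A minimal sketch (hypothetical names, not Spark's classes) of the core
    // accounting in resourceOffers: each offer contributes cores / CPUS_PER_TASK
    // slots, and free CPUs are decremented as tasks are placed round-robin
    // across the shuffled offers.
    object OfferSketch {
      // mirrors the default spark.task.cpus = 1
      val CPUS_PER_TASK = 1

      case class SimpleOffer(executorId: String, host: String, cores: Int)

      // Assign up to numTasks tasks across the offers, decrementing each
      // offer's free CPUs the way availableCpus is decremented in Spark.
      def assign(offers: Seq[SimpleOffer], numTasks: Int): Map[String, Int] = {
        val freeCpus = offers.map(_.cores).toBuffer
        val assigned = scala.collection.mutable.Map[String, Int]().withDefaultValue(0)
        var remaining = numTasks
        var progress = true
        while (remaining > 0 && progress) {
          progress = false
          for (i <- offers.indices if remaining > 0 && freeCpus(i) >= CPUS_PER_TASK) {
            freeCpus(i) -= CPUS_PER_TASK
            assigned(offers(i).executorId) += 1
            remaining -= 1
            progress = true
          }
        }
        assigned.toMap
      }

      def main(args: Array[String]): Unit = {
        val offers = Seq(SimpleOffer("exec-1", "host-a", 4), SimpleOffer("exec-2", "host-b", 2))
        // 5 tasks over 6 slots: exec-1 ends up with 3 tasks, exec-2 with 2
        println(assign(offers, 5))
      }
    }

Spreading tasks this way (shuffled offers, round-robin placement) is why Spark does not simply pile all tasks onto the first executor in the map.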
Next, let's look at launchTasks:

spark-master\core\src\main\scala\org\apache\spark\scheduler\cluster\CoarseGrainedSchedulerBackend.scala

    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = TaskDescription.encode(task)
        // The serialized task must not exceed the max RPC message size; if it
        // does, the TaskSetManager's abort is called directly to raise the failure
        if (serializedTask.limit() >= maxRpcMessageSize) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
                "spark.rpc.message.maxSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")
          // The executor on the worker finally receives and runs the serialized task
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }

With that send, the serialized task reaches the executor on the worker and execution begins.
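The guard in launchTasks compares the serialized task's size against spark.rpc.message.maxSize before sending it over RPC. Here is a hedged, self-contained sketch of that branch; LaunchGuardSketch, SketchTask, encodeTask, and guard are illustrative names rather than Spark's API, and the 128 MB limit mirrors the documented default of spark.rpc.message.maxSize.

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    object LaunchGuardSketch {
      // mirrors the documented default of spark.rpc.message.maxSize (128 MB)
      val maxRpcMessageSize: Int = 128 * 1024 * 1024

      // SketchTask and encodeTask are illustrative stand-ins for
      // TaskDescription and TaskDescription.encode
      case class SketchTask(taskId: Long, payload: String)

      def encodeTask(t: SketchTask): ByteBuffer =
        ByteBuffer.wrap(t.payload.getBytes(StandardCharsets.UTF_8))

      // Right(buffer) if the task fits in one RPC message, Left(error) if the
      // task set would have to be aborted, mirroring the branch in launchTasks
      def guard(t: SketchTask): Either[String, ByteBuffer] = {
        val buf = encodeTask(t)
        if (buf.limit() >= maxRpcMessageSize)
          Left(s"Serialized task ${t.taskId} was ${buf.limit()} bytes, which exceeds " +
            s"max allowed $maxRpcMessageSize bytes; consider broadcast variables")
        else
          Right(buf)
      }

      def main(args: Array[String]): Unit = {
        println(guard(SketchTask(1L, "small closure")))
      }
    }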

 
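To tie the scheduling loop together, here is a sketch of the locality descent in resourceOffers: for each locality level, keep launching until nothing more launches at that level, then relax to the next level. LocalitySketch is hypothetical; the inline predicate and the single preferred level per task are simplified stand-ins for TaskSetManager.myLocalityLevels and resourceOfferSingleTaskSet.

    object LocalitySketch {
      // locality levels from best to worst, as in TaskLocality
      val levels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")

      def main(args: Array[String]): Unit = {
        // pending tasks, each tagged with the best locality it can accept
        // (a simplification; real task sets track preferred locations per task)
        val pending = scala.collection.mutable.ArrayBuffer(
          "PROCESS_LOCAL", "NODE_LOCAL", "ANY", "NODE_LOCAL")
        var launchedAnyTask = false
        for (currentMaxLocality <- levels) {
          var launchedTaskAtCurrentMaxLocality = false
          do {
            // launch one task whose preference is within the allowed level,
            // standing in for resourceOfferSingleTaskSet
            val idx = pending.indexWhere(p =>
              levels.indexOf(p) <= levels.indexOf(currentMaxLocality))
            launchedTaskAtCurrentMaxLocality = idx >= 0
            if (launchedTaskAtCurrentMaxLocality) {
              val t = pending.remove(idx)
              launchedAnyTask = true
              println(s"launched a $t task while allowing $currentMaxLocality")
            }
          } while (launchedTaskAtCurrentMaxLocality)
        }
        println(s"launchedAnyTask = $launchedAnyTask")
      }
    }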

Reposted from: http://fsnmi.baihongyu.com/
