本文共 5062 字,大约阅读时间需要 16 分钟。
最终调用makeOffers来让所有的task执行在worker中的executorsspark-master\spark-master\core\src\main\scala\org\apache\spark\scheduler\cluster\CoarseGrainedSchedulerBackend.scala private def makeOffers() { // Make sure no executor is killed while some task is launching on it#通过resourceOffers得到taskDescs val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized { // Filter out executors under killing val activeExecutors = executorDataMap.filterKeys(executorIsAlive) val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores, Some(executorData.executorAddress.hostPort)) }.toIndexedSeq scheduler.resourceOffers(workOffers) }#在worker节点上启动任务的运行 if (!taskDescs.isEmpty) { launchTasks(taskDescs) } }我们先看看scheduler.resourceOffersspark-master\core\src\main\scala\org\apache\spark\scheduler\TaskSchedulerImpl.scala def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added var newExecAvail = false#查看host是否已经被包含 for (o <- offers) { if (!hostToExecutors.contains(o.host)) { hostToExecutors(o.host) = new HashSet[String]() }#处理新的executor的加入,如果没有包含该executor的话,则设置newExecAvail为true if (!executorIdToRunningTaskIds.contains(o.executorId)) { hostToExecutors(o.host) += o.executorId executorAdded(o.executorId, o.host) executorIdToHost(o.executorId) = o.host executorIdToRunningTaskIds(o.executorId) = HashSet[Long]() newExecAvail = true }#将o.host添加到机架(rack)对应的host表中 for (rack <- getRackForHost(o.host)) { hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host } } val shuffledOffers = shuffleOffers(filteredOffers) // Build a list of tasks to assign to each worker.#得到task的map val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))#得到可以使用的cpus和slots val availableCpus = shuffledOffers.map(o => o.cores).toArray val availableSlots = shuffledOffers.map(o => o.cores / 
CPUS_PER_TASK).sum# val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks))#通知taskSet有新的executor加入 if (newExecAvail) { taskSet.executorAdded() } } for (taskSet <- sortedTaskSets) {#若taskSet设置了barrier且所需的task数量超过了可用slot的数量,则本轮不能调度 if (taskSet.isBarrier && availableSlots < taskSet.numTasks) { logInfo(s"Skip current round of resource offers for barrier stage ${taskSet.stageId} " + s"because the barrier taskSet requires ${taskSet.numTasks} slots, while the total " + s"number of available slots is $availableSlots.") } else { var launchedAnyTask = false // Record all the executor IDs assigned barrier tasks on. val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]() for (currentMaxLocality <- taskSet.myLocalityLevels) { var launchedTaskAtCurrentMaxLocality = false do {#按照就近原则进行task调度 launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks, addressesWithDescs) launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) }#表明是否提交过task if (tasks.size > 0) { hasLaunchedTask = true } return tasks }其次再看看spark-master\core\src\main\scala\org\apache\spark\scheduler\cluster\CoarseGrainedSchedulerBackend.scalaprivate def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { val serializedTask = TaskDescription.encode(task)#不能超过序列化的最大size,这种情形将直接调用TaskSetManager的abort来触发异常 if (serializedTask.limit() >= maxRpcMessageSize) { scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + "spark.rpc.message.maxSize or using broadcast variables for large values." 
msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize) taskSetMgr.abort(msg) } catch { case e: Exception => logError("Exception in error callback", e) } } } else { val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " + s"${executorData.executorHost}.")#在worker上的executor最终来执行序列化的task executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) } } }
转载地址:http://fsnmi.baihongyu.com/