2026/01/22 06:32:13
Today we look at ExecutionGraph, the last of Flink's execution graphs, and at how it is generated.

Basic concepts

Before reading the source code, let's go over some basic concepts in ExecutionGraph.

ExecutionJobVertex: a node in the ExecutionGraph. It corresponds to a JobVertex in the JobGraph.

ExecutionVertex: each ExecutionJobVertex contains a group of ExecutionVertex instances. Their number equals the parallelism of the node.

IntermediateResult: the output of a node. It corresponds to an IntermediateDataSet in the JobGraph.

IntermediateResultPartition: the output of a single ExecutionVertex.

EdgeManager: stores all the connections in the ExecutionGraph, that is, which ExecutionVertex consumes which IntermediateResultPartition and the reverse, at the granularity of parallel instances.

Execution: one actual run attempt. Each time a task is executed, Flink wraps the ExecutionVertex in an Execution and identifies it uniquely with an ExecutionAttemptID.

ExecutionGraph generation process

With these concepts in place, let's walk through how the ExecutionGraph is actually generated. The entry point is the DefaultExecutionGraphBuilder.build method.

It first collects some basic information, including jobInformation, jobStatusChangedListeners and so on. Next it creates a DefaultExecutionGraph and generates the execution plan.

    // create a new execution graph, if none exists so far
    final DefaultExecutionGraph executionGraph =
            new DefaultExecutionGraph(
                    jobInformation,
                    futureExecutor,
                    ioExecutor,
                    rpcTimeout,
                    executionHistorySizeLimit,
                    classLoader,
                    blobWriter,
                    partitionGroupReleaseStrategyFactory,
                    shuffleMaster,
                    partitionTracker,
                    executionDeploymentListener,
                    executionStateUpdateListener,
                    initializationTimestamp,
                    vertexAttemptNumberStore,
                    vertexParallelismStore,
                    isDynamicGraph,
                    executionJobVertexFactory,
                    jobGraph.getJobStatusHooks(),
                    markPartitionFinishedStrategy,
                    taskDeploymentDescriptorFactory,
                    jobStatusChangedListeners,
                    executionPlanSchedulingContext);

    try {
        executionGraph.setPlan(JsonPlanGenerator.generatePlan(jobGraph));
    } catch (Throwable t) {
        log.warn("Cannot create plan for job", t);
        // give the graph an empty plan
        executionGraph.setPlan(new JobPlanInfo.Plan("", "", "", new ArrayList<>()));
    }

Then come the two core methods, getVerticesSortedTopologicallyFromSources and attachJobGraph.

    // topologically sort the job vertices and attach the graph to the existing one
    List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
    executionGraph.attachJobGraph(sortedTopology, jobManagerJobMetricGroup);

The first method sorts the JobVertex instances, and the second builds the topology of the ExecutionGraph from the sorted list.

getVerticesSortedTopologicallyFromSources

    public List<JobVertex> getVerticesSortedTopologicallyFromSources()
            throws InvalidProgramException {
        // early out on empty lists
        if (this.taskVertices.isEmpty()) {
            return Collections.emptyList();
        }

        List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
        Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());

        // start by finding the vertices with no input edges
        // and the ones with disconnected inputs (that refer to some standalone data set)
        {
            Iterator<JobVertex> iter = remaining.iterator();
            while (iter.hasNext()) {
                JobVertex vertex = iter.next();

                if (vertex.isInputVertex()) {
                    sorted.add(vertex);
                    iter.remove();
                }
            }
        }

        int startNodePos = 0;

        // traverse from the nodes that were added until we found all elements
        while (!remaining.isEmpty()) {

            // first check if we have more candidates to start traversing from. if not, then the
            // graph is cyclic, which is not permitted
            if (startNodePos >= sorted.size()) {
                throw new InvalidProgramException("The job graph is cyclic.");
            }

            JobVertex current = sorted.get(startNodePos++);
            addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
        }

        return sorted;
    }

This code sorts all the nodes. It first picks out every source node, then walks the sorted list from the front and appends the remaining nodes as they become eligible. If the walk runs out of candidates while nodes still remain, the graph contains a cycle and the method throws. The result is the final topological order.

attachJobGraph

    @Override
    public void attachJobGraph(
            List<JobVertex> verticesToAttach, JobManagerJobMetricGroup jobManagerJobMetricGroup)
            throws JobException {

        assertRunningInJobMasterMainThread();

        LOG.debug(
                "Attaching {} topologically sorted vertices to existing job graph with {} vertices and {} intermediate results.",
                verticesToAttach.size(),
                tasks.size(),
                intermediateResults.size());

        attachJobVertices(verticesToAttach, jobManagerJobMetricGroup);
        if (!isDynamic) {
            initializeJobVertices(verticesToAttach);
        }

        // the topology assigning should happen before notifying new vertices to failoverStrategy
        executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);

        partitionGroupReleaseStrategy =
                partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
    }

The attachJobGraph method has two main steps. The first calls attachJobVertices to create the ExecutionJobVertex instances (and, for a non-dynamic graph, initializeJobVertices to initialize them). The second calls fromExecutionGraph to create some other core objects.

attachJobVertices

The attachJobVertices method iterates over all JobVertex instances and builds an ExecutionJobVertex from each one.

    /** Attach job vertices without initializing them. */
    private void attachJobVertices(
            List<JobVertex> topologicallySorted, JobManagerJobMetricGroup jobManagerJobMetricGroup)
            throws JobException {
        for (JobVertex jobVertex : topologicallySorted) {

            if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                this.isStoppable = false;
            }

            VertexParallelismInformation parallelismInfo =
                    parallelismStore.getParallelismInfo(jobVertex.getID());

            // create the execution job vertex and attach it to the graph
            ExecutionJobVertex ejv =
                    executionJobVertexFactory.createExecutionJobVertex(
                            this,
                            jobVertex,
                            parallelismInfo,
                            coordinatorStore,
                            jobManagerJobMetricGroup);

            ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
            if (previousTask != null) {
                throw new JobException(
                        String.format(
                                "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                                jobVertex.getID(), ejv, previousTask));
            }

            this.verticesInCreationOrder.add(ejv);
            this.numJobVerticesTotal++;
        }
    }

initializeJobVertices

DefaultExecutionGraph.initializeJobVertices iterates over the JobVertex list that was just sorted, looks up the ExecutionJobVertex for each one, and calls ExecutionGraph.initializeJobVertex. Let's go straight to the logic of ExecutionGraph.initializeJobVertex.

    default void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp)
            throws JobException {
        initializeJobVertex(
                ejv,
                createTimestamp,
                VertexInputInfoComputationUtils.computeVertexInputInfos(
                        ejv, getAllIntermediateResults()::get));
    }

This first calls VertexInputInfoComputationUtils.computeVertexInputInfos to build a Map<IntermediateDataSetID, JobVertexInputInfo> named jobVertexInputInfos. It describes the range of upstream IntermediateResultPartition instances that each ExecutionVertex consumes. There are two modes: POINTWISE (point to point) and ALL_TO_ALL (every consumer reads every producer).

In POINTWISE mode the partitions are distributed as evenly as possible. For example, with an upstream parallelism of 4 and a downstream parallelism of 2, the first two IntermediateResultPartition instances are consumed by the first ExecutionVertex and the last two by the second ExecutionVertex. With an upstream parallelism of 2 and a downstream parallelism of 3, the first IntermediateResultPartition is consumed by the first two ExecutionVertex instances and the second IntermediateResultPartition by the third ExecutionVertex.

    public static JobVertexInputInfo computeVertexInputInfoForPointwise(
            int sourceCount,
            int targetCount,
            Function<Integer, Integer> numOfSubpartitionsRetriever,
            boolean isDynamicGraph) {

        final List<ExecutionVertexInputInfo> executionVertexInputInfos = new ArrayList<>();

        if (sourceCount >= targetCount) {
            for (int index = 0; index < targetCount; index++) {

                int start = index * sourceCount / targetCount;
                int end = (index + 1) * sourceCount / targetCount;

                IndexRange partitionRange = new IndexRange(start, end - 1);
                IndexRange subpartitionRange =
                        computeConsumedSubpartitionRange(
                                index,
                                1,
                                () -> numOfSubpartitionsRetriever.apply(start),
                                isDynamicGraph,
                                false,
                                false);
                executionVertexInputInfos.add(
                        new ExecutionVertexInputInfo(index, partitionRange, subpartitionRange));
            }
        } else {
            for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {

                int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
                int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
                int numConsumers = end - start;

                IndexRange partitionRange = new IndexRange(partitionNum, partitionNum);
                // Variable used in lambda expression should be final or effectively final
                final int finalPartitionNum = partitionNum;
                for (int i = start; i < end; i++) {
                    IndexRange subpartitionRange =
                            computeConsumedSubpartitionRange(
                                    i,
                                    numConsumers,
                                    () -> numOfSubpartitionsRetriever.apply(finalPartitionNum),
                                    isDynamicGraph,
                                    false,
                                    false);
                    executionVertexInputInfos.add(
                            new ExecutionVertexInputInfo(i, partitionRange, subpartitionRange));
                }
            }
        }
        return new JobVertexInputInfo(executionVertexInputInfos);
    }

In ALL_TO_ALL mode every downstream subtask consumes the data of all upstream partitions.

    public static JobVertexInputInfo computeVertexInputInfoForAllToAll(
            int sourceCount,
            int targetCount,
            Function<Integer, Integer> numOfSubpartitionsRetriever,
            boolean isDynamicGraph,
            boolean isBroadcast,
            boolean isSingleSubpartitionContainsAllData) {
        final List<ExecutionVertexInputInfo> executionVertexInputInfos = new ArrayList<>();
        IndexRange partitionRange = new IndexRange(0, sourceCount - 1);
        for (int i = 0; i < targetCount; i++) {
            IndexRange subpartitionRange =
                    computeConsumedSubpartitionRange(
                            i,
                            targetCount,
                            () -> numOfSubpartitionsRetriever.apply(0),
                            isDynamicGraph,
                            isBroadcast,
                            isSingleSubpartitionContainsAllData);
            executionVertexInputInfos.add(
                    new ExecutionVertexInputInfo(i, partitionRange, subpartitionRange));
        }
        return new JobVertexInputInfo(executionVertexInputInfos);
    }

Once jobVertexInputInfos has been built, we return to the DefaultExecutionGraph.initializeJobVertex method.

    @Override
    public void initializeJobVertex(
            ExecutionJobVertex ejv,
            long createTimestamp,
            Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos)
            throws JobException {

        checkNotNull(ejv);
        checkNotNull(jobVertexInputInfos);

        jobVertexInputInfos.forEach(
                (resultId, info) ->
                        this.vertexInputInfoStore.put(ejv.getJobVertexId(), resultId, info));

        ejv.initialize(
                executionHistorySizeLimit,
                rpcTimeout,
                createTimestamp,
                this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()),
                executionPlanSchedulingContext);

        ejv.connectToPredecessors(this.intermediateResults);

        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet =
                    this.intermediateResults.putIfAbsent(res.getId(), res);
            if (previousDataSet != null) {
                throw new JobException(
                        String.format(
                                "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                                res.getId(), res, previousDataSet));
            }
        }

        registerExecutionVerticesAndResultPartitionsFor(ejv);

        // enrich network memory.
        SlotSharingGroup slotSharingGroup = ejv.getSlotSharingGroup();
        if (areJobVerticesAllInitialized(slotSharingGroup)) {
            SsgNetworkMemoryCalculationUtils.enrichNetworkMemory(
                    slotSharingGroup, this::getJobVertex, shuffleMaster);
        }
    }

First, the ExecutionJobVertex.initialize method. Its main job is to create the IntermediateResult and ExecutionVertex instances.

    protected void initialize(
            int executionHistorySizeLimit,
            Duration timeout,
            long createTimestamp,
            SubtaskAttemptNumberStore initialAttemptCounts,
            ExecutionPlanSchedulingContext executionPlanSchedulingContext)
            throws JobException {

        checkState(parallelismInfo.getParallelism() > 0);
        checkState(!isInitialized());

        this.taskVertices = new ExecutionVertex[parallelismInfo.getParallelism()];

        this.inputs = new ArrayList<>(jobVertex.getInputs().size());

        // create the intermediate results
        this.producedDataSets =
                new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];

        for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
            final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);

            this.producedDataSets[i] =
                    new IntermediateResult(
                            result,
                            this,
                            this.parallelismInfo.getParallelism(),
                            result.getResultType(),
                            executionPlanSchedulingContext);
        }

        // create all task vertices
        for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
            ExecutionVertex vertex =
                    createExecutionVertex(
                            this,
                            i,
                            producedDataSets,
                            timeout,
                            createTimestamp,
                            executionHistorySizeLimit,
                            initialAttemptCounts.getAttemptCount(i));

            this.taskVertices[i] = vertex;
        }

        // sanity check for the double referencing between intermediate result partitions and
        // execution vertices
        for (IntermediateResult ir : this.producedDataSets) {
            if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) {
                throw new RuntimeException(
                        "The intermediate results partitions were not correctly assigned.");
            }
        }

        // set up the input splits, if the vertex has any
        try {
            @SuppressWarnings("unchecked")
            InputSplitSource<InputSplit> splitSource =
                    (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();

            if (splitSource != null) {
                Thread currentThread = Thread.currentThread();
                ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
                currentThread.setContextClassLoader(graph.getUserClassLoader());
                try {
                    inputSplits =
                            splitSource.createInputSplits(this.parallelismInfo.getParallelism());

                    if (inputSplits != null) {
                        splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
                    }
                } finally {
                    currentThread.setContextClassLoader(oldContextClassLoader);
                }
            } else {
                inputSplits = null;
            }
        } catch (Throwable t) {
            throw new JobException(
                    "Creating the input splits caused an error: " + t.getMessage(), t);
        }
    }

Creating an ExecutionVertex also creates its IntermediateResultPartition instances and an Execution. Creating the Execution sets attemptNumber, which defaults to 0. If the ExecutionVertex is rescheduled, attemptNumber is incremented by 1.

The ExecutionJobVertex.connectToPredecessors method builds the association between ExecutionVertex and IntermediateResultPartition. This association is again handled separately for the pointwise and all-to-all modes. Pointwise mode has to compute the range of IntermediateResultPartition indices that belongs to each ExecutionVertex. Both modes end up calling the connectInternal method.

    /** Connect all execution vertices to all partitions. */
    private static void connectInternal(
            List<ExecutionVertex> taskVertices,
            List<IntermediateResultPartition> partitions,
            ResultPartitionType resultPartitionType,
            EdgeManager edgeManager) {
        checkState(!taskVertices.isEmpty());
        checkState(!partitions.isEmpty());

        ConsumedPartitionGroup consumedPartitionGroup =
                createAndRegisterConsumedPartitionGroupToEdgeManager(
                        taskVertices.size(), partitions, resultPartitionType, edgeManager);
        for (ExecutionVertex ev : taskVertices) {
            ev.addConsumedPartitionGroup(consumedPartitionGroup);
        }

        List<ExecutionVertexID> consumerVertices =
                taskVertices.stream().map(ExecutionVertex::getID).collect(Collectors.toList());
        ConsumerVertexGroup consumerVertexGroup =
                ConsumerVertexGroup.fromMultipleVertices(consumerVertices, resultPartitionType);
        for (IntermediateResultPartition partition : partitions) {
            partition.addConsumers(consumerVertexGroup);
        }

        consumedPartitionGroup.setConsumerVertexGroup(consumerVertexGroup);
        consumerVertexGroup.setConsumedPartitionGroup(consumedPartitionGroup);
    }

In this method, ev.addConsumedPartitionGroup(consumedPartitionGroup); stores the association from ExecutionVertex to IntermediateResultPartition in EdgeManager.vertexConsumedPartitions, while partition.addConsumers(consumerVertexGroup); stores the association from IntermediateResultPartition to ExecutionVertex in EdgeManager.partitionConsumers.

Summary

In this article we saw how Flink converts a JobGraph into an ExecutionGraph. Several of the core concepts involved have very similar names, so it is worth making sure each one is understood thoroughly before studying how they are generated and how they map to one another. The ExecutionGraph diagram from the earlier article can also help.
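To make the POINTWISE distribution rule discussed above easier to check by hand, here is a minimal sketch that keeps only the partition-range arithmetic from computeVertexInputInfoForPointwise. The class PointwiseRangeSketch and the method consumedPartitionRanges are hypothetical names invented for this illustration and are not part of Flink. The subpartition ranges and the dynamic-graph handling are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration class, not part of Flink. */
final class PointwiseRangeSketch {

    /**
     * Returns, for each downstream subtask index, the inclusive range
     * {first, last} of upstream partition indices it consumes.
     */
    static List<int[]> consumedPartitionRanges(int sourceCount, int targetCount) {
        List<int[]> ranges = new ArrayList<>();
        if (sourceCount >= targetCount) {
            // Fewer consumers than partitions: each consumer reads a contiguous block.
            for (int index = 0; index < targetCount; index++) {
                int start = index * sourceCount / targetCount;
                int end = (index + 1) * sourceCount / targetCount;
                ranges.add(new int[] {start, end - 1});
            }
        } else {
            // More consumers than partitions: each partition is shared by a
            // contiguous block of consumers, visited in ascending consumer order.
            for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {
                int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;
                int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;
                for (int i = start; i < end; i++) {
                    ranges.add(new int[] {partitionNum, partitionNum});
                }
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        int[][] configs = {{4, 2}, {2, 3}};
        for (int[] cfg : configs) {
            List<int[]> ranges = consumedPartitionRanges(cfg[0], cfg[1]);
            for (int i = 0; i < ranges.size(); i++) {
                System.out.printf(
                        "upstream=%d downstream=%d: consumer %d reads partitions [%d, %d]%n",
                        cfg[0], cfg[1], i, ranges.get(i)[0], ranges.get(i)[1]);
            }
        }
    }
}
```

With an upstream parallelism of 4 and a downstream parallelism of 2, the two consumers read partitions [0, 1] and [2, 3]. With 2 upstream and 3 downstream, consumers 0 and 1 both read partition 0 and consumer 2 reads partition 1.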
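The sorting step in getVerticesSortedTopologicallyFromSources relies on the helper addNodesThatHaveNoNewPredecessors, which the article does not show. As a rough stand-in, the sketch below uses a generic in-degree (Kahn-style) topological sort over plain strings. It is not Flink's implementation. It only mirrors the same outline: seed the list with source nodes, walk the list from the front, and report a cycle when no candidates are left. The class TopoSortSketch and the method sortFromSources are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration class, not part of Flink. */
final class TopoSortSketch {

    /** Sorts vertices so that every vertex appears after all of its predecessors. */
    static List<String> sortFromSources(Map<String, List<String>> successors) {
        // Count the not-yet-sorted predecessors of every vertex.
        Map<String, Integer> pending = new LinkedHashMap<>();
        for (String vertex : successors.keySet()) {
            pending.putIfAbsent(vertex, 0);
        }
        for (List<String> targets : successors.values()) {
            for (String target : targets) {
                pending.merge(target, 1, Integer::sum);
            }
        }

        // Start with the vertices that have no inputs (the sources).
        List<String> sorted = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : pending.entrySet()) {
            if (entry.getValue() == 0) {
                sorted.add(entry.getKey());
            }
        }

        int startNodePos = 0;
        while (sorted.size() < pending.size()) {
            // No more candidates while vertices remain means the graph has a cycle.
            if (startNodePos >= sorted.size()) {
                throw new IllegalStateException("The job graph is cyclic.");
            }
            String current = sorted.get(startNodePos++);
            for (String next : successors.getOrDefault(current, Collections.emptyList())) {
                // A vertex becomes eligible once its last predecessor is sorted.
                if (pending.merge(next, -1, Integer::sum) == 0) {
                    sorted.add(next);
                }
            }
        }
        return sorted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("source", Collections.singletonList("map"));
        graph.put("map", Collections.singletonList("sink"));
        graph.put("sink", Collections.emptyList());
        System.out.println(sortFromSources(graph));
    }
}
```

Using a LinkedHashMap keeps the output order deterministic for vertices that become eligible at the same time, similar in spirit to the LinkedHashSet used for the remaining vertices in the Flink method.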