########################################################################
Thanks everyone for the answers. I understand it better now, and I have written the question up in the following documents:

Flink 1.7.2 Source/Window data exchange source code analysis:
https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/flink-source-window-data-exchange.md

Flink 1.7.2 parallelism source code analysis:
https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/Flink-Parallelism-Calculation.md



########################################################################


Flink 1.7.2 Parallelism Source Code Analysis

 
Source code

Repository: https://github.com/opensourceteams/fink-maven-scala-2

Related: Flink 1.7.2 Source/Window data exchange source code analysis:
https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/flink-source-window-data-exchange.md

Overview

How a Flink Window performs parallel computation
How a Flink source partitions records by key hash and emits each record to the downstream Window subtask for its partition (a sketch of the routing follows below)
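
As background for the hash routing just mentioned: in Flink 1.7.x, keyBy assigns each key to one of maxParallelism key groups, and each key group to one operator subtask (see KeyGroupRangeAssignment in flink-runtime). The sketch below is a minimal, self-contained imitation of that logic, not the real implementation: Flink uses MathUtils.murmurHash(keyHash) % maxParallelism, while this sketch substitutes a simple hash spread, so the concrete indices it prints are illustrative only.

// Minimal sketch of key-group routing, modeled on Flink 1.7.x
// KeyGroupRangeAssignment. NOT the real implementation: Flink uses
// MathUtils.murmurHash; a simple spread is substituted here.
public class KeyGroupSketch {

    // map a key hash to one of maxParallelism key groups
    static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return ((keyHash * 0x9E3779B9) >>> 16) % maxParallelism;
    }

    // map a key group to the index of the parallel operator instance
    static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default for jobs with small parallelism
        int parallelism = 3;      // matches setParallelism(3) in the WordCount below
        for (String word : "1 2 3 4 5".split(" ")) {
            int keyGroup = computeKeyGroupForKeyHash(word.hashCode(), maxParallelism);
            int subtask = computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            System.out.println(word + " -> key group " + keyGroup + " -> Window subtask " + subtask);
        }
    }
}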
 
WordCount program

package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.parallelism

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Feed input data with: nc -lk 1234
  */
object SocketWindowWordCountLocal {

  def main(args: Array[String]): Unit = {

    val port = 1234
    // get the execution environment
    // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val configuration: Configuration = getConfiguration(true)

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1, configuration)

    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')

    import org.apache.flink.streaming.api.scala._
    val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w, 1))
      .keyBy("word")
      /**
        * A fresh window every 5 seconds, i.e. counting restarts per window.
        * The benefit: we never aggregate over the full history, only over
        * the increment within the window, which keeps the data volume small.
        */
      .timeWindow(Time.seconds(5))
      //.countWindow(3)
      //.countWindow(3,1)
      //.countWindowAll(3)
      .sum("count")

    textResult
      .setParallelism(3)
      .print()

    if (args == null || args.size == 0) {
      println("================================== execution plan ==================================")
      println("Paste the JSON here (works better in Firefox): https://flink.apache.org/visualizer")
      // execution plan
      //println(env.getExecutionPlan)
      //println("================================== end of execution plan JSON ==================================\n")
      // StreamGraph
      println(env.getStreamGraph.getStreamingPlanAsJSON)

      //JsonPlanGenerator.generatePlan(jobGraph)

      env.execute("default job")
    } else {
      env.execute(args(0))
    }

    println("finished")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long) {
    //override def toString: String = Thread.currentThread().getName + word + " : " + count
  }

  def getConfiguration(isDebug: Boolean = false): Configuration = {

    val configuration: Configuration = new Configuration()

    if (isDebug) {
      // very long timeouts so breakpoint debugging does not kill the job
      val timeout = "100000 s"
      val timeoutHeartbeatPause = "1000000 s"
      configuration.setString("akka.ask.timeout", timeout)
      configuration.setString("akka.lookup.timeout", timeout)
      configuration.setString("akka.tcp.timeout", timeout)
      configuration.setString("akka.transport.heartbeat.interval", timeout)
      configuration.setString("akka.transport.heartbeat.pause", timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause", timeout)
      configuration.setInteger("heartbeat.interval", 10000000)
      configuration.setInteger("heartbeat.timeout", 50000000)
    }

    configuration
  }

}


 
Input data

1 2 3 4 5 6 7 8 9 10
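
For orientation: each word above occurs exactly once, so if the whole line is typed within one 5-second window, every count is 1 and the printed result should look roughly like the following (ordering is nondeterministic, and whether an "n>" subtask prefix appears depends on the sink parallelism):

WordWithCount(1,1)
WordWithCount(2,1)
...
WordWithCount(10,1)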
 
Source code analysis

 
ExecutionGraph.scheduleEager

ExecutionGraph scheduling

executionsToDeploy contains all executions (Source, Window, Sink). The value passed to setParallelism() determines how many Window subtasks there are; this example sets 3, so executionsToDeploy holds the following entries:

(Source: Socket Stream -> Flat Map -> Map (1/1))

(Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction) (3/3))

(Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction) (2/3))

(Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction) (1/3))

(Sink: Print to Std. Out (1/1))

The executionsToDeploy object in detail:

executionsToDeploy = {Arrays$ArrayList@5323} size = 5
0 = {Execution@5324} "Attempt #0 (Source: Socket Stream -> Flat Map -> Map (1/1)) @ org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@22dc33b2 - [SCHEDULED]"
1 = {Execution@5506} "Attempt #0 (Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction) (3/3)) @ org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@8f216e4 - [SCHEDULED]"
2 = {Execution@5507} "Attempt #0 (Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction) (2/3)) @ org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@50ccca83 - [SCHEDULED]"
3 = {Execution@5508} "Attempt #0 (Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, SumAggregator, PassThroughWindowFunction) (1/3)) @ org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@243b4f41 - [SCHEDULED]"
4 = {Execution@5509} "Attempt #0 (Sink: Print to Std. Out (1/1)) @ org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@67b9a9d7 - [SCHEDULED]"

Source code

scheduleEager() allocates slots for all vertices and then calls Execution.deploy() on each execution:
/**
 * @param slotProvider  The resource provider from which the slots are allocated
 * @param timeout       The maximum time that the deployment may take, before a
 *                      TimeoutException is thrown.
 * @return Future which is completed once the {@link ExecutionGraph} has been scheduled.
 * The future can also be completed exceptionally if an error happened.
 */
private CompletableFuture<Void> scheduleEager(SlotProvider slotProvider, final Time timeout) {
    checkState(state == JobStatus.RUNNING, "job is not running currently");

    // Important: reserve all the space we need up front.
    // that way we do not have any operation that can fail between allocating the slots
    // and adding them to the list. If we had a failure in between there, that would
    // cause the slots to get lost
    final boolean queued = allowQueuedScheduling;

    // collecting all the slots may resize and fail in that operation without slots getting lost
    final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

    final Set<AllocationID> allPreviousAllocationIds =
        Collections.unmodifiableSet(computeAllPriorAllocationIdsIfRequiredByScheduling());

    // allocate the slots (obtain all their futures)
    for (ExecutionJobVertex ejv : getVerticesTopologically()) {
        // these calls are not blocking, they only return futures
        Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
            slotProvider,
            queued,
            LocationPreferenceConstraint.ALL,
            allPreviousAllocationIds,
            timeout);

        allAllocationFutures.addAll(allocationFutures);
    }

    // this future is complete once all slot futures are complete.
    // the future fails once one slot future fails.
    final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

    final CompletableFuture<Void> currentSchedulingFuture = allAllocationsFuture
        .thenAccept(
            (Collection<Execution> executionsToDeploy) -> {
                for (Execution execution : executionsToDeploy) {
                    try {
                        execution.deploy();
                    } catch (Throwable t) {
                        throw new CompletionException(
                            new FlinkException(
                                String.format("Could not deploy execution %s.", execution),
                                t));
                    }
                }
            })
        // Generate a more specific failure message for the eager scheduling
        .exceptionally(
            (Throwable throwable) -> {
                final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                final Throwable resultThrowable;

                if (strippedThrowable instanceof TimeoutException) {
                    int numTotal = allAllocationsFuture.getNumFuturesTotal();
                    int numComplete = allAllocationsFuture.getNumFuturesCompleted();
                    String message = "Could not allocate all requires slots within timeout of " +
                        timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;

                    resultThrowable = new NoResourceAvailableException(message);
                } else {
                    resultThrowable = strippedThrowable;
                }

                throw new CompletionException(resultThrowable);
            });

    return currentSchedulingFuture;
}

 
ExecutionState

Since Source, Window, and Sink all run as Execution objects, it helps to first look at the states an Execution can be in and how they flow; this makes the rest of the process easier to follow.
The state flow of an Execution is: CREATED -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED, among others; the ExecutionState source below documents the details.
package org.apache.flink.runtime.execution;

/**
 * An enumeration of all states that a task can be in during its execution.
 * Tasks usually start in the state {@code CREATED} and switch states according to
 * this diagram:
 * <pre>{@code
 *
 *     CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
 *        |            |            |          |
 *        |            |            |   +------+
 *        |            |            V   V
 *        |            |         CANCELLING -----+----> CANCELED
 *        |            |                         |
 *        |            +-------------------------+
 *        |
 *        |                                   ... -> FAILED
 *        V
 *    RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED
 *
 * }</pre>
 *
 * <p>It is possible to enter the {@code RECONCILING} state from {@code CREATED}
 * state if job manager fail over, and the {@code RECONCILING} state can switch into
 * any existing task state.
 *
 * <p>It is possible to enter the {@code FAILED} state from any other state.
 *
 * <p>The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are
 * considered terminal states.
 */
public enum ExecutionState {

    CREATED,

    SCHEDULED,

    DEPLOYING,

    RUNNING,

    /**
     * This state marks "successfully completed". It can only be reached when a
     * program reaches the "end of its input". The "end of input" can be reached
     * when consuming a bounded input (fix set of files, bounded query, etc) or
     * when stopping a program (not cancelling!) which make the input look like
     * it reached its end at a specific point.
     */
    FINISHED,

    CANCELING,

    CANCELED,

    FAILED,

    RECONCILING;

    public boolean isTerminal() {
        return this == FINISHED || this == CANCELED || this == FAILED;
    }
}
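
The walkthrough below refers to transitionState() repeatedly without showing it. Conceptually it is a compare-and-set on the state field, so exactly one caller wins a concurrent transition. A minimal sketch, assuming the ExecutionState enum above (the real Execution.transitionState additionally uses an AtomicReferenceFieldUpdater, records the failure cause, and logs the transition):

import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch only; not Flink's implementation.
class StateHolder {
    private final AtomicReference<ExecutionState> state =
        new AtomicReference<>(ExecutionState.CREATED);

    // returns false if another thread changed the state first
    boolean transitionState(ExecutionState expected, ExecutionState target) {
        return state.compareAndSet(expected, target);
    }
}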

 
Execution.deploy()

Deploying the Execution:

Move the Execution's state from SCHEDULED to DEPLOYING, i.e. from "scheduled" to "deploying":

transitionState(previous, DEPLOYING)

An INFO log records which Execution is being deployed to which machine:

LOG.info(String.format("Deploying %s (attempt #%d) to %s", ...))

13:11:55,910 INFO  [flink-akka.actor.default-dispatcher-3] org.apache.flink.runtime.executiongraph.Execution.deploy(Execution.java:599)  - Deploying Source: Socket Stream -> Flat Map -> Map (1/1) (attempt #0) to localhost

Build the TaskDeploymentDescriptor. It references the Execution's attempt id and the slot, which determines the slot the Execution will run in:

final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
    attemptId,
    slot,
    taskRestore,
    attemptNumber);

Obtain the TaskManager gateway from the slot:

final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

Submit the task via TaskManager.submitTask, passing the TaskDeploymentDescriptor:

final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);

From here on, the TaskManager takes over.

Source code

/**
 * Deploys the execution to the previously assigned resource.
 *
 * @throws JobException if the execution cannot be deployed to the assigned resource
 */
public void deploy() throws JobException {
    final LogicalSlot slot  = assignedResource;

    checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

    // Check if the TaskManager died in the meantime
    // This only speeds up the response to TaskManagers failing concurrently to deployments.
    // The more general check is the rpcTimeout of the deployment call
    if (!slot.isAlive()) {
        throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
    }

    // make sure exactly one deployment call happens from the correct state
    // note: the transition from CREATED to DEPLOYING is for testing purposes only
    ExecutionState previous = this.state;
    if (previous == SCHEDULED || previous == CREATED) {
        if (!transitionState(previous, DEPLOYING)) {
            // race condition, someone else beat us to the deploying call.
            // this should actually not happen and indicates a race somewhere else
            throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
        }
    }
    else {
        // vertex may have been cancelled, or it was already scheduled
        throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
    }

    if (this != slot.getPayload()) {
        throw new IllegalStateException(
            String.format("The execution %s has not been assigned to the assigned slot.", this));
    }

    try {

        // race double check, did we fail/cancel and do we need to release the slot?
        if (this.state != DEPLOYING) {
            slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
            return;
        }

        if (LOG.isInfoEnabled()) {
            LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
                    attemptNumber, getAssignedResourceLocation().getHostname()));
        }

        final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
            attemptId,
            slot,
            taskRestore,
            attemptNumber);

        // null taskRestore to let it be GC'ed
        taskRestore = null;

        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);

        submitResultFuture.whenCompleteAsync(
            (ack, failure) -> {
                // only respond to the failure case
                if (failure != null) {
                    if (failure instanceof TimeoutException) {
                        String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                        markFailed(new Exception(
                            "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                    } else {
                        markFailed(failure);
                    }
                }
            },
            executor);
    }
    catch (Throwable t) {
        markFailed(t);
        ExceptionUtils.rethrow(t);
    }
}
 
TaskExecutor.submitTask

Inside the TaskManager, it is the TaskExecutor that accepts the task submission.

Load offloaded data back from the BLOB store into objects:

// re-integrate offloaded data:
try {
    tdd.loadBigData(blobCacheService.getPermanentBlobService());
} catch (IOException | ClassNotFoundException e) {
    throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
}

Deserialize JobInformation and TaskInformation from their pre-serialized form (via the class loader); they are used to construct the Task:

// deserialize the pre-serialized information
final JobInformation jobInformation;
final TaskInformation taskInformation;
try {
    jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
    taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
    throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
}

Set up the Source's input split provider, which splits the continuously producing Source across the parallel downstream tasks (RpcInputSplitProvider is one such assignment strategy):

InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
    jobManagerConnection.getJobManagerGateway(),
    taskInformation.getJobVertexId(),
    tdd.getExecutionAttemptId(),
    taskManagerConfiguration.getTimeout());

Build the task state manager, TaskStateManager:

final TaskStateManager taskStateManager = new TaskStateManagerImpl(
    jobId,
    tdd.getExecutionAttemptId(),
    localStateStore,
    taskRestore,
    checkpointResponder);

Build the Task:

Task task = new Task(
    jobInformation,
    taskInformation,
    tdd.getExecutionAttemptId(),
    tdd.getAllocationId(),
    tdd.getSubtaskIndex(),
    tdd.getAttemptNumber(),
    tdd.getProducedPartitions(),
    tdd.getInputGates(),
    tdd.getTargetSlotNumber(),
    taskExecutorServices.getMemoryManager(),
    taskExecutorServices.getIOManager(),
    taskExecutorServices.getNetworkEnvironment(),
    taskExecutorServices.getBroadcastVariableManager(),
    taskStateManager,
    taskManagerActions,
    inputSplitProvider,
    checkpointResponder,
    blobCacheService,
    libraryCache,
    fileCache,
    taskManagerConfiguration,
    taskMetricGroup,
    resultPartitionConsumableNotifier,
    partitionStateChecker,
    getRpcService().getExecutor());

Add the task to its task slot:

try {
    taskAdded = taskSlotTable.addTask(task);
} catch (SlotNotFoundException | SlotNotActiveException e) {
    throw new TaskSubmissionException("Could not submit task.", e);
}

Start the task's thread; this triggers Task.run():

if (taskAdded) {
    task.startTaskThread();

    return CompletableFuture.completedFuture(Acknowledge.get());
} else {
    final String message = "TaskManager already contains a task for id " +
        task.getExecutionId() + '.';

    log.debug(message);
    throw new TaskSubmissionException(message);
}

Source code

@Override
public CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd,
        JobMasterId jobMasterId,
        Time timeout) {

    try {
        final JobID jobId = tdd.getJobId();
        final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);

        if (jobManagerConnection == null) {
            final String message = "Could not submit task because there is no JobManager " +
                "associated for the job " + jobId + '.';

            log.debug(message);
            throw new TaskSubmissionException(message);
        }

        if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
            final String message = "Rejecting the task submission because the job manager leader id " +
                jobMasterId + " does not match the expected job manager leader id " +
                jobManagerConnection.getJobMasterId() + '.';

            log.debug(message);
            throw new TaskSubmissionException(message);
        }

        if (!taskSlotTable.tryMarkSlotActive(jobId, tdd.getAllocationId())) {
            final String message = "No task slot allocated for job ID " + jobId +
                " and allocation ID " + tdd.getAllocationId() + '.';
            log.debug(message);
            throw new TaskSubmissionException(message);
        }

        // re-integrate offloaded data:
        try {
            tdd.loadBigData(blobCacheService.getPermanentBlobService());
        } catch (IOException | ClassNotFoundException e) {
            throw new TaskSubmissionException("Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
        }

        // deserialize the pre-serialized information
        final JobInformation jobInformation;
        final TaskInformation taskInformation;
        try {
            jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
            taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
        } catch (IOException | ClassNotFoundException e) {
            throw new TaskSubmissionException("Could not deserialize the job or task information.", e);
        }

        if (!jobId.equals(jobInformation.getJobId())) {
            throw new TaskSubmissionException(
                "Inconsistent job ID information inside TaskDeploymentDescriptor (" +
                    tdd.getJobId() + " vs. " + jobInformation.getJobId() + ")");
        }

        TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(
            jobInformation.getJobId(),
            jobInformation.getJobName(),
            taskInformation.getJobVertexId(),
            tdd.getExecutionAttemptId(),
            taskInformation.getTaskName(),
            tdd.getSubtaskIndex(),
            tdd.getAttemptNumber());

        InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
            jobManagerConnection.getJobManagerGateway(),
            taskInformation.getJobVertexId(),
            tdd.getExecutionAttemptId(),
            taskManagerConfiguration.getTimeout());

        TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
        CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();

        LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
        ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
        PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();

        final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
            jobId,
            tdd.getAllocationId(),
            taskInformation.getJobVertexId(),
            tdd.getSubtaskIndex());

        final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();

        final TaskStateManager taskStateManager = new TaskStateManagerImpl(
            jobId,
            tdd.getExecutionAttemptId(),
            localStateStore,
            taskRestore,
            checkpointResponder);

        Task task = new Task(
            jobInformation,
            taskInformation,
            tdd.getExecutionAttemptId(),
            tdd.getAllocationId(),
            tdd.getSubtaskIndex(),
            tdd.getAttemptNumber(),
            tdd.getProducedPartitions(),
            tdd.getInputGates(),
            tdd.getTargetSlotNumber(),
            taskExecutorServices.getMemoryManager(),
            taskExecutorServices.getIOManager(),
            taskExecutorServices.getNetworkEnvironment(),
            taskExecutorServices.getBroadcastVariableManager(),
            taskStateManager,
            taskManagerActions,
            inputSplitProvider,
            checkpointResponder,
            blobCacheService,
            libraryCache,
            fileCache,
            taskManagerConfiguration,
            taskMetricGroup,
            resultPartitionConsumableNotifier,
            partitionStateChecker,
            getRpcService().getExecutor());

        log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());

        boolean taskAdded;

        try {
            taskAdded = taskSlotTable.addTask(task);
        } catch (SlotNotFoundException | SlotNotActiveException e) {
            throw new TaskSubmissionException("Could not submit task.", e);
        }

        if (taskAdded) {
            task.startTaskThread();

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager already contains a task for id " +
                task.getExecutionId() + '.';

            log.debug(message);
            throw new TaskSubmissionException(message);
        }
    } catch (TaskSubmissionException e) {
        return FutureUtils.completedExceptionally(e);
    }
}

 
Task.run()

First, the concept of a Task: a Task is one parallel subtask executing on a TaskManager. A Task wraps a Flink operator (which may be a user function) and runs it, providing all services it needs, such as consuming input data, producing results (intermediate result partitions), and communicating with the JobManager. Flink operators (implemented as subclasses of AbstractInvokable) have only data readers, writers, and certain event callbacks. The Task connects those to the network stack and actor messages, tracks the execution state, and handles exceptions. Tasks do not know about their relationship to other tasks, or whether they are the first attempt or a repeated attempt; all of that is only known to the JobManager. A task only knows its own runnable code, its configuration, and the IDs of the intermediate results it consumes and produces (if any). Each task is run by one dedicated thread.

run() is the core work method that bootstraps the task and executes its code.

Note that this is the Task's execution state, distinct from the Execution state covered earlier; keep the two apart. The task state is first moved from CREATED to DEPLOYING:

// ----------------------------
//  Initial State transition
// ----------------------------
while (true) {
    ExecutionState current = this.executionState;
    if (current == ExecutionState.CREATED) {
        if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
            // success, we can start our work
            break;
        }
    }
    // ... (remaining branches are in the full source below)
}

Activate the FileSystem stream-leak safety net for the task's thread:

// activate safety net for task thread
LOG.info("Creating FileSystem stream leak safety net for task {}", this);
FileSystemSafetyNet.initializeSafetyNetForThread();
Load the user program's JAR files for the current Task:

// first of all, get a user-code classloader
// this may involve downloading the job's JAR files and/or classes
LOG.info("Loading JAR files for task {}.", this);

userCodeClassLoader = createUserCodeClassloader();
final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
Register the current task with the network stack:

// ----------------------------------------------------------------
// register the task with the network stack
// this operation may fail if the system does not have enough
// memory to run the necessary data exchanges
// the registration must also strictly be undone
// ----------------------------------------------------------------

LOG.info("Registering task at network: {}.", this);

network.registerTask(this);

Build the runtime environment for the current task:

Environment env = new RuntimeEnvironment(
                        jobId,
                        vertexId,
                        executionId,
                        executionConfig,
                        taskInfo,
                        jobConfiguration,
                        taskConfiguration,
                        userCodeClassLoader,
                        memoryManager,
                        ioManager,
                        broadcastVariableManager,
                        taskStateManager,
                        accumulatorRegistry,
                        kvStateRegistry,
                        inputSplitProvider,
                        distributedCacheEntries,
                        producedPartitions,
                        inputGates,
                        network.getTaskEventDispatcher(),
                        checkpointResponder,
                        taskManagerConfig,
                        metrics,
                        this);
Load and instantiate the task's invokable code (the user code):

// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
Update the current task's state from DEPLOYING to RUNNING:

// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
    throw new CancelTaskException();
}

Finally, run the invokable; for a streaming job this calls StreamTask.invoke():

// run the invokable
invokable.invoke();

Source code

/**
 * The core work method that bootstraps the task and executes its code.
 */
@Override
public void run() {

    // ----------------------------
    //  Initial State transition
    // ----------------------------
    while (true) {
        ExecutionState current = this.executionState;
        if (current == ExecutionState.CREATED) {
            if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                // success, we can start our work
                break;
            }
        }
        else if (current == ExecutionState.FAILED) {
            // we were immediately failed. tell the TaskManager that we reached our final state
            notifyFinalState();
            if (metrics != null) {
                metrics.close();
            }
            return;
        }
        else if (current == ExecutionState.CANCELING) {
            if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                // we were immediately canceled. tell the TaskManager that we reached our final state
                notifyFinalState();
                if (metrics != null) {
                    metrics.close();
                }
                return;
            }
        }
        else {
            if (metrics != null) {
                metrics.close();
            }
            throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
        }
    }

    // all resource acquisitions and registrations from here on
    // need to be undone in the end
    Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
    AbstractInvokable invokable = null;

    try {
        // ----------------------------
        //  Task Bootstrap - We periodically
        //  check for canceling as a shortcut
        // ----------------------------

        // activate safety net for task thread
        LOG.info("Creating FileSystem stream leak safety net for task {}", this);
        FileSystemSafetyNet.initializeSafetyNetForThread();

        blobService.getPermanentBlobService().registerJob(jobId);

        // first of all, get a user-code classloader
        // this may involve downloading the job's JAR files and/or classes
        LOG.info("Loading JAR files for task {}.", this);

        userCodeClassLoader = createUserCodeClassloader();
        final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);

        if (executionConfig.getTaskCancellationInterval() >= 0) {
            // override task cancellation interval from Flink config if set in ExecutionConfig
            taskCancellationInterval = executionConfig.getTaskCancellationInterval();
        }

        if (executionConfig.getTaskCancellationTimeout() >= 0) {
            // override task cancellation timeout from Flink config if set in ExecutionConfig
            taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
        }

        if (isCanceledOrFailed()) {
            throw new CancelTaskException();
        }

        // ----------------------------------------------------------------
        // register the task with the network stack
        // this operation may fail if the system does not have enough
        // memory to run the necessary data exchanges
        // the registration must also strictly be undone
        // ----------------------------------------------------------------

        LOG.info("Registering task at network: {}.", this);

        network.registerTask(this);

        // add metrics for buffers
        this.metrics.getIOMetricGroup().initializeBufferMetrics(this);

        // register detailed network metrics, if configured
        if (taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
            // similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
            MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
            MetricGroup outputGroup = networkGroup.addGroup("Output");
            MetricGroup inputGroup = networkGroup.addGroup("Input");

            // output metrics
            for (int i = 0; i < producedPartitions.length; i++) {
                ResultPartitionMetrics.registerQueueLengthMetrics(
                    outputGroup.addGroup(i), producedPartitions[i]);
            }

            for (int i = 0; i < inputGates.length; i++) {
                InputGateMetrics.registerQueueLengthMetrics(
                    inputGroup.addGroup(i), inputGates[i]);
            }
        }

        // next, kick off the background copying of files for the distributed cache
        try {
            for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                    DistributedCache.readFileInfoFromConfig(jobConfiguration)) {
                LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
                Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId, executionId);
                distributedCacheEntries.put(entry.getKey(), cp);
            }
        }
        catch (Exception e) {
            throw new Exception(
                String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId), e);
        }

        if (isCanceledOrFailed()) {
            throw new CancelTaskException();
        }

        // ----------------------------------------------------------------
        //  call the user code initialization methods
        // ----------------------------------------------------------------

        TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());

        Environment env = new RuntimeEnvironment(
            jobId,
            vertexId,
            executionId,
            executionConfig,
            taskInfo,
            jobConfiguration,
            taskConfiguration,
            userCodeClassLoader,
            memoryManager,
            ioManager,
            broadcastVariableManager,
            taskStateManager,
            accumulatorRegistry,
            kvStateRegistry,
            inputSplitProvider,
            distributedCacheEntries,
            producedPartitions,
            inputGates,
            network.getTaskEventDispatcher(),
            checkpointResponder,
            taskManagerConfig,
            metrics,
            this);

        // now load and instantiate the task's invokable code
        invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

        // ----------------------------------------------------------------
        //  actual task core work
        // ----------------------------------------------------------------

        // we must make strictly sure that the invokable is accessible to the cancel() call
        // by the time we switched to running.
        this.invokable = invokable;

        // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            throw new CancelTaskException();
        }

        // notify everyone that we switched to running
        taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

        // make sure the user code classloader is accessible thread-locally
        executingThread.setContextClassLoader(userCodeClassLoader);

        // run the invokable
        invokable.invoke();

        // make sure, we enter the catch block if the task leaves the invoke() method due
        // to the fact that it has been canceled
        if (isCanceledOrFailed()) {
            throw new CancelTaskException();
        }

        // ----------------------------------------------------------------
        //  finalization of a successful execution
        // ----------------------------------------------------------------

        // finish the produced partitions. if this fails, we consider the execution failed.
        for (ResultPartition partition : producedPartitions) {
            if (partition != null) {
                partition.finish();
            }
        }

        // try to mark the task as finished
        // if that fails, the task was canceled/failed in the meantime
        if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
            throw new CancelTaskException();
        }
    }
    catch (Throwable t) {

        // unwrap wrapped exceptions to make stack traces more compact
        if (t instanceof WrappingRuntimeException) {
            t = ((WrappingRuntimeException) t).unwrap();
        }

        // ----------------------------------------------------------------
        // the execution failed. either the invokable code properly failed, or
        // an exception was thrown as a side effect of cancelling
        // ----------------------------------------------------------------

        try {
            // check if the exception is unrecoverable
            if (ExceptionUtils.isJvmFatalError(t) ||
                    (t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {

                // terminate the JVM immediately
                // don't attempt a clean shutdown, because we cannot expect the clean shutdown to complete
                try {
                    LOG.error("Encountered fatal error {} - terminating the JVM", t.getClass().getName(), t);
                } finally {
                    Runtime.getRuntime().halt(-1);
                }
            }

            // transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
            // loop for multiple retries during concurrent state changes via calls to cancel() or
            // to failExternally()
            while (true) {
                ExecutionState current = this.executionState;

                if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                    if (t instanceof CancelTaskException) {
                        if (transitionState(current, ExecutionState.CANCELED)) {
                            cancelInvokable(invokable);
                            break;
                        }
                    }
                    else {
                        if (transitionState(current, ExecutionState.FAILED, t)) {
                            // proper failure of the task. record the exception as the root cause
                            failureCause = t;
                            cancelInvokable(invokable);

                            break;
                        }
                    }
                }
                else if (current == ExecutionState.CANCELING) {
                    if (transitionState(current, ExecutionState.CANCELED)) {
                        break;
                    }
                }
                else if (current == ExecutionState.FAILED) {
                    // in state failed already, no transition necessary any more
                    break;
                }
                // unexpected state, go to failed
                else if (transitionState(current, ExecutionState.FAILED, t)) {
                    LOG.error("Unexpected state in task {} ({}) during an exception: {}.", taskNameWithSubtask, executionId, current);
                    break;
                }
                // else fall through the loop and
            }
        }
        catch (Throwable tt) {
            String message = String.format("FATAL - exception in exception handler of task %s (%s).", taskNameWithSubtask, executionId);
            LOG.error(message, tt);
            notifyFatalError(message, tt);
        }
    }
    finally {
        try {
            LOG.info("Freeing task resources for {} ({}).", taskNameWithSubtask, executionId);

            // clear the reference to the invokable. this helps guard against holding references
            // to the invokable and its structures in cases where this Task object is still referenced
            this.invokable = null;

            // stop the async dispatcher.
            // copy dispatcher reference to stack, against concurrent release
            ExecutorService dispatcher = this.asyncCallDispatcher;
            if (dispatcher != null && !dispatcher.isShutdown()) {
                dispatcher.shutdownNow();
            }

            // free the network resources
            network.unregisterTask(this);

            // free memory resources
            if (invokable != null) {
                memoryManager.releaseAll(invokable);
            }

            // remove all of the tasks library resources
            libraryCache.unregisterTask(jobId, executionId);
            fileCache.releaseJob(jobId, executionId);
            blobService.getPermanentBlobService().releaseJob(jobId);

            // close and de-activate safety net for task thread
            LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
            FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();

            notifyFinalState();
        }
        catch (Throwable t) {
            // an error in the resource cleanup is fatal
            String message = String.format("FATAL - exception in resource cleanup of task %s (%s).", taskNameWithSubtask, executionId);
            LOG.error(message, t);
            notifyFatalError(message, t);
        }

        // un-register the metrics at the end so that the task may already be
        // counted as finished when this happens
        // errors here will only be logged
        try {
            metrics.close();
        }
        catch (Throwable t) {
            LOG.error("Error during metrics de-registration of task {} ({}).", taskNameWithSubtask, executionId, t);
        }
    }
}
 
StreamTask.invoke()

Create the state backend, stateBackend; here it is a MemoryStateBackend:

stateBackend = createStateBackend();

If no timer service has been set yet, create a SystemProcessingTimeService, which returns the current processing time when invoked:

// if the clock is not already set, then assign a default TimeServiceProvider
if (timerService == null) {
    ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
        "Time Trigger for " + getName(), getUserCodeClassLoader());

    timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
}
Build the operator chain of the current stream task. The chain differs per stream task; in a source task it depends on the user-defined function chain, so operatorChain differs as well. Here the WordCount job serves as the example:

operatorChain = new OperatorChain<>(this, streamRecordWriters);
The Source task's operator chain, operatorChain.allOperators:

headOperator = operatorChain.getHeadOperator() is the StreamSource.

allOperators = {StreamOperator[3]@5784}
0 = {StreamMap@5793}
1 = {StreamFlatMap@5794}
2 = {StreamSource@5789}

(Note the dump lists the chained operators output-first: the head StreamSource comes last.)

Task-specific initialization:

// task specific initialization
init();
Triggers scheduled in open() must not fire before all operators are opened, i.e. the operators need to be opened first:

// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
synchronized (lock) {

    // both the following operations are protected by the lock
    // so that we avoid race conditions in the case that initializeState()
    // registers a timer, that fires before the open() is called.

    initializeState();
    openAllOperators();
}
Call the concrete task's run() method to do the actual work; which run() is invoked depends on the task type:

Source tasks call SourceStreamTask.run()
Window tasks call OneInputStreamTask.run()

		// let the task do its work
		isRunning = true;
		run();
Source code

	public final void invoke() throws Exception {

		boolean disposed = false;
		try {
			// -------- Initialize ---------
			LOG.debug("Initializing {}.", getName());

			asyncOperationsThreadPool = Executors.newCachedThreadPool();

			CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();

			synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
				getExecutionConfig().isFailTaskOnCheckpointError(),
				getEnvironment());

			asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);

			stateBackend = createStateBackend();
			checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

			// if the clock is not already set, then assign a default TimeServiceProvider
			if (timerService == null) {
				ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
					"Time Trigger for " + getName(), getUserCodeClassLoader());

				timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
			}

			operatorChain = new OperatorChain<>(this, streamRecordWriters);
			headOperator = operatorChain.getHeadOperator();

			// task specific initialization
			init();

			// save the work of reloading state, etc, if the task is already canceled
			if (canceled) {
				throw new CancelTaskException();
			}

			// -------- Invoke --------
			LOG.debug("Invoking {}", getName());

			// we need to make sure that any triggers scheduled in open() cannot be
			// executed before all operators are opened
			synchronized (lock) {

				// both the following operations are protected by the lock
				// so that we avoid race conditions in the case that initializeState()
				// registers a timer, that fires before the open() is called.

				initializeState();
				openAllOperators();
			}

			// final check to exit early before starting to run
			if (canceled) {
				throw new CancelTaskException();
			}

			// let the task do its work
			isRunning = true;
			run();

			// if this left the run() method cleanly despite the fact that this was canceled,
			// make sure the "clean shutdown" is not attempted
			if (canceled) {
				throw new CancelTaskException();
			}

			LOG.debug("Finished task {}", getName());

			// make sure no further checkpoint and notification actions happen.
			// we make sure that no other thread is currently in the locked scope before
			// we close the operators by trying to acquire the checkpoint scope lock
			// we also need to make sure that no triggers fire concurrently with the close logic
			// at the same time, this makes sure that during any "regular" exit where still
			synchronized (lock) {
				// this is part of the main logic, so if this fails, the task is considered failed
				closeAllOperators();

				// make sure no new timers can come
				timerService.quiesce();

				// only set the StreamTask to not running after all operators have been closed!
				// See FLINK-7430
				isRunning = false;
			}

			// make sure all timers finish
			timerService.awaitPendingAfterQuiesce();

			LOG.debug("Closed operators for task {}", getName());

			// make sure all buffered data is flushed
			operatorChain.flushOutputs();

			// make an attempt to dispose the operators such that failures in the dispose call
			// still let the computation fail
			tryDisposeAllOperators();
			disposed = true;
		}
		finally {
			// clean up everything we initialized
			isRunning = false;

			// Now that we are outside the user code, we do not want to be interrupted further
			// upon cancellation. The shutdown logic below needs to make sure it does not issue calls
			// that block and stall shutdown.
			// Additionally, the cancellation watch dog will issue a hard-cancel (kill the TaskManager
			// process) as a backup in case some shutdown procedure blocks outside our control.
			setShouldInterruptOnCancel(false);

			// clear any previously issued interrupt for a more graceful shutdown
			Thread.interrupted();

			// stop all timers and threads
			tryShutdownTimerService();

			// stop all asynchronous checkpoint threads
			try {
				cancelables.close();
				shutdownAsyncThreads();
			}
			catch (Throwable t) {
				// catch and log the exception to not replace the original exception
				LOG.error("Could not shut down async checkpoint threads", t);
			}

			// we must! perform this cleanup
			try {
				cleanup();
			}
			catch (Throwable t) {
				// catch and log the exception to not replace the original exception
				LOG.error("Error during cleanup of stream task", t);
			}

			// if the operators were not disposed before, do a hard dispose
			if (!disposed) {
				disposeAllOperators();
			}

			// release the output resources. this method should never fail.
			if (operatorChain != null) {
				// beware: without synchronization, #performCheckpoint() may run in
				//         parallel and this call is not thread-safe
				synchronized (lock) {
					operatorChain.releaseOutputs();
				}
			}
		}
	}
 
<https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/Flink-Parallelism-Calculation.md#sourcestreamtaskrun>SourceStreamTask.run()

headOperator invokes the operators of operatorChain in sequence (StreamSource, StreamFlatMap, StreamMap). This is chained invocation: the operators belonging to one task are executed one after another, without materializing intermediate results between operators (see the sketch after the code below).
The StreamSource operator calls SocketTextStreamFunction.run() to do the processing.
Source code
	protected void run() throws Exception {
		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
	}
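A minimal sketch of this chained invocation (hypothetical types, not Flink's actual ChainingOutput): each operator pushes its result straight into the next operator's processElement(), so a single source record drives the whole source -> flatMap -> map chain in one call stack:

import java.util.function.Function;

public class ChainSketch {

    interface Operator<T> { void processElement(T value); }

    // wrap a function so that its output is pushed directly into the next operator
    static <I, O> Operator<I> chain(Function<I, O> fn, Operator<O> next) {
        return value -> next.processElement(fn.apply(value));
    }

    public static void main(String[] args) {
        Operator<String> sink = v -> System.out.println("emit: " + v);
        // map(w -> WordWithCount(w, 1)) simplified to a string "w,1"
        Operator<String> map = chain(w -> w + ",1", sink);
        // flatMap(split): call the next operator once per token
        Operator<String> flatMap = line -> {
            for (String w : line.split("\\s")) {
                map.processElement(w);
            }
        };
        flatMap.processElement("a b a");  // one source record drives the whole chain
    }
}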

 
<https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/Flink-Parallelism-Calculation.md#sockettextstreamfunctionrun>SocketTextStreamFunction.run()

Opens the source's socket connection and reads the stream, buffering up to 8 KB at a time and then parsing the buffer line by line.
Each parsed line is handed to ctx.collect(record) for subsequent processing.
The call here resolves to NonTimestampContext.collect(record).
	public void run(SourceContext<String> ctx) throws Exception {
		final StringBuilder buffer = new StringBuilder();
		long attempt = 0;

		while (isRunning) {

			try (Socket socket = new Socket()) {
				currentSocket = socket;

				LOG.info("Connecting to server socket " + hostname + ':' + port);
				socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
				try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {

					char[] cbuf = new char[8192];
					int bytesRead;
					while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
						buffer.append(cbuf, 0, bytesRead);
						int delimPos;
						while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
							String record = buffer.substring(0, delimPos);
							// truncate trailing carriage return
							if (delimiter.equals("\n") && record.endsWith("\r")) {
								record = record.substring(0, record.length() - 1);
							}
							ctx.collect(record);
							buffer.delete(0, delimPos + delimiter.length());
						}
					}
				}
			}

			// if we dropped out of this loop due to an EOF, sleep and retry
			if (isRunning) {
				attempt++;
				if (maxNumRetries == -1 || attempt < maxNumRetries) {
					LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
					Thread.sleep(delayBetweenRetries);
				}
				else {
					// this should probably be here, but some examples expect simple exists of the stream source
					// throw new EOFException("Reached end of stream and reconnects are not enabled.");
					break;
				}
			}
		}

		// collect trailing data
		if (buffer.length() > 0) {
			ctx.collect(buffer.toString());
		}
	}
 
<https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/Flink-Parallelism-Calculation.md#recordwriteremit>RecordWriter.emit

numChannels is the parallelism, i.e. the value set by DataStream.setParallelism(2).
channelSelector.selectChannels(record, numChannels) is the partitioning step: it assigns the current record to a partition (partitions exist so the downstream can compute in parallel; here, records are routed to the different Window instances).
The concrete partitioning algorithm is KeyGroupStreamPartitioner.selectChannels.
Source code
	public void emit(T record) throws IOException, InterruptedException {
		emit(record, channelSelector.selectChannels(record, numChannels));
	}
 
<https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/Flink-Parallelism-Calculation.md#keygroupstreampartitionerselectchannels>KeyGroupStreamPartitioner.selectChannels

The partition assignment is implemented by KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels).

Parameters:
numberOfOutputChannels: the total number of partitions, i.e. the parallelism
maxParallelism: the maximum parallelism, 128 by default
key: the key value extracted from the record being processed

KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);

Source code

	@Override
	public int[] selectChannels(
		SerializationDelegate<StreamRecord<T>> record,
		int numberOfOutputChannels) {

		K key;
		try {
			key = keySelector.getKey(record.getInstance().getValue());
		} catch (Exception e) {
			throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
		}
		returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);
		return returnArray;
	}
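The two steps inside assignKeyToParallelOperator can be sketched as follows (simplified: plain hashCode() stands in for Flink's MurmurHash, so the computed key groups differ from Flink's real ones, but the mechanics are the same). The key is first hashed into one of maxParallelism key groups, and the key group is then scaled down to an operator index, i.e. an output channel:

public class KeyGroupSketch {

    // step 1: key -> key group (0 .. maxParallelism-1);
    // Flink applies MurmurHash to key.hashCode() here, this sketch does not
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    // step 2: key group -> operator index (the output channel / partition)
    static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;  // Flink's default
        int parallelism = 2;       // DataStream.setParallelism(2)

        for (String key : new String[]{"hello", "world", "flink"}) {
            int keyGroup = assignToKeyGroup(key, maxParallelism);
            int channel = computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            System.out.println("key=" + key + " keyGroup=" + keyGroup + " -> channel " + channel);
        }
    }
}

All records with the same key land in the same key group and therefore on the same channel, which is why one Window subtask sees all records for a given key.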

 
<https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/Flink-Parallelism-Calculation.md#oneinputstreamtaskrun>OneInputStreamTask.run()

Invoked from StreamTask.invoke() via run(); for a Window task this resolves to OneInputStreamTask.run().
It loops, calling StreamInputProcessor.processInput().
Source code
	protected void run() throws Exception {
		// cache processor reference on the stack, to make the code more JIT friendly
		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

		while (running && inputProcessor.processInput()) {
			// all the work happens in the "processInput" method
		}
	}
 
<https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/Flink-Parallelism-Calculation.md#streaminputprocessorprocessinput>StreamInputProcessor.processInput()

Calls BarrierTracker.getNextNonBlocked() to obtain the next element, a (key, value) pair, i.e. the output of the source's flatMap and map functions. No aggregation has happened yet, and the element has not yet been assigned to a window. As the source sends data, each record is handed one by one to streamOperator.processElement(record), i.e. WindowOperator.processElement.
	public boolean processInput() throws Exception {
		if (isFinished) {
			return false;
		}
		if (numRecordsIn == null) {
			try {
				numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
			} catch (Exception e) {
				LOG.warn("An exception occurred during the metrics setup.", e);
				numRecordsIn = new SimpleCounter();
			}
		}

		while (true) {
			if (currentRecordDeserializer != null) {
				DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

				if (result.isBufferConsumed()) {
					currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
					currentRecordDeserializer = null;
				}

				if (result.isFullRecord()) {
					StreamElement recordOrMark = deserializationDelegate.getInstance();

					if (recordOrMark.isWatermark()) {
						// handle watermark
						statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
						continue;
					} else if (recordOrMark.isStreamStatus()) {
						// handle stream status
						statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
						continue;
					} else if (recordOrMark.isLatencyMarker()) {
						// handle latency marker
						synchronized (lock) {
							streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
						}
						continue;
					} else {
						// now we can do the actual processing
						StreamRecord<IN> record = recordOrMark.asRecord();
						synchronized (lock) {
							numRecordsIn.inc();
							streamOperator.setKeyContextElement1(record);
							streamOperator.processElement(record);
						}
						return true;
					}
				}
			}

			final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
			if (bufferOrEvent != null) {
				if (bufferOrEvent.isBuffer()) {
					currentChannel = bufferOrEvent.getChannelIndex();
					currentRecordDeserializer = recordDeserializers[currentChannel];
					currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
				}
				else {
					// Event received
					final AbstractEvent event = bufferOrEvent.getEvent();
					if (event.getClass() != EndOfPartitionEvent.class) {
						throw new IOException("Unexpected event: " + event);
					}
				}
			}
			else {
				isFinished = true;
				if (!barrierHandler.isEmpty()) {
					throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
				}
				return false;
			}
		}
	}
 
<https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/Flink-Parallelism-Calculation.md#windowoperatorprocesselementstreamrecord-element>WindowOperator.processElement(StreamRecord element)

WindowOperator.processElement assigns each element, e.g. WordWithCount(1,1), to its window(s), i.e. it determines which window each element belongs to, since elements with the same key must be aggregated within the same window.

final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);
The element is then added to the window state; the add() call aggregates (reduces) elements with the same key, so summing the counts of one key within one window happens in this method.

windowState.add(element.getValue());
triggerContext.onElement(element) registers a trigger for the element, i.e. the point in time (the window's end) at which the element's window fires. It also adds the element's key to InternalTimerServiceImpl.processingTimeTimersQueue; every record causes an insert, but duplicates are removed, so the queue behaves like a Set over keys. The data later sent to the sink comes from draining this processingTimeTimersQueue: each poll removes the head element and moves the last element to the front.

TriggerResult triggerResult = triggerContext.onElement(element);

	public void processElement(StreamRecord<IN> element) throws Exception {
		final Collection<W> elementWindows = windowAssigner.assignWindows(
			element.getValue(), element.getTimestamp(), windowAssignerContext);

		//if element is handled by none of assigned elementWindows
		boolean isSkippedElement = true;

		final K key = this.<K>getKeyedStateBackend().getCurrentKey();

		if (windowAssigner instanceof MergingWindowAssigner) {
			MergingWindowSet<W> mergingWindows = getMergingWindowSet();

			for (W window: elementWindows) {

				// adding the new window might result in a merge, in that case the actualWindow
				// is the merged window and we work with that. If we don't merge then
				// actualWindow == window
				W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
					@Override
					public void merge(W mergeResult,
							Collection<W> mergedWindows, W stateWindowResult,
							Collection<W> mergedStateWindows) throws Exception {

						if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
							throw new UnsupportedOperationException("The end timestamp of an " +
									"event-time window cannot become earlier than the current watermark " +
									"by merging. Current watermark: " + internalTimerService.currentWatermark() +
									" window: " + mergeResult);
						} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
							throw new UnsupportedOperationException("The end timestamp of a " +
									"processing-time window cannot become earlier than the current processing time " +
									"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
									" window: " + mergeResult);
						}

						triggerContext.key = key;
						triggerContext.window = mergeResult;

						triggerContext.onMerge(mergedWindows);

						for (W m: mergedWindows) {
							triggerContext.window = m;
							triggerContext.clear();
							deleteCleanupTimer(m);
						}

						// merge the merged state windows into the newly resulting state window
						windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
					}
				});

				// drop if the window is already late
				if (isWindowLate(actualWindow)) {
					mergingWindows.retireWindow(actualWindow);
					continue;
				}
				isSkippedElement = false;

				W stateWindow = mergingWindows.getStateWindow(actualWindow);
				if (stateWindow == null) {
					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
				}

				windowState.setCurrentNamespace(stateWindow);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = actualWindow;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(actualWindow, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(actualWindow);
			}

			// need to make sure to update the merging state in state
			mergingWindows.persist();
		} else {
			for (W window: elementWindows) {

				// drop if the window is already late
				if (isWindowLate(window)) {
					continue;
				}
				isSkippedElement = false;

				windowState.setCurrentNamespace(window);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = window;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(window, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(window);
			}
		}

		// side output input event if
		// element not handled by any window
		// late arriving tag has been set
		// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
		if (isSkippedElement && isElementLate(element)) {
			if (lateDataOutputTag != null){
				sideOutput(element);
			} else {
				this.numLateRecordsDropped.inc();
			}
		}
	}
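As a concrete illustration of windowAssigner.assignWindows above: for a tumbling window, each element maps to exactly one window whose start is the timestamp rounded down to a multiple of the window size. A sketch (simplified from TimeWindow.getWindowStartWithOffset, assuming a zero offset):

public class WindowAssignSketch {

    // start of the tumbling window that a record with this timestamp falls into
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    public static void main(String[] args) {
        long size = 5000L;    // timeWindow(Time.seconds(5)) in the WordCount job
        long ts = 1234567L;   // an element's (processing-time) timestamp
        long start = windowStart(ts, size);
        // the element belongs to the window [start, start + size)
        System.out.println("TimeWindow{start=" + start + ", end=" + (start + size) + "}");
    }
}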


 
<https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/Flink-Parallelism-Calculation.md#internaltimerserviceimplonprocessingtime>InternalTimerServiceImpl.onProcessingTime

processingTimeTimersQueue (a HeapPriorityQueueSet) stores all keys, deduplicated and ordered for processing.

processingTimeTimersQueue.peek() reads the head entry for processing.

processingTimeTimersQueue.poll() removes the head entry and moves the last entry to the front (the usual binary-heap removal; with equal timestamps the sift-down does not reorder anything). So the head is processed first, then the former last element, and so on until the queue is drained. For example:

input keys: 1 2 1 3 2 5 4
stored (deduplicated) as: 1 2 3 5 4
processing order: 1 4 5 3 2

keyContext.setCurrentKey(timer.getKey()); // set the key to be processed now

triggerTarget.onProcessingTime(timer); // calls WindowOperator.onProcessingTime(timer)

queue = {HeapPriorityQueueElement[129]@8184}
 1 = {TimerHeapInternalTimer@12441} "Timer{timestamp=1551505439999, key=(1), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 2 = {TimerHeapInternalTimer@12442} "Timer{timestamp=1551505439999, key=(2), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 3 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(3), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 5 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(3), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 4 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(3), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"

WindowOperator.onProcessingTime(timer) is then invoked to process the current key:

	public void onProcessingTime(long time) throws Exception {
		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
		// inside the callback.
		nextTimer = null;

		InternalTimer<K, N> timer;

		while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
			processingTimeTimersQueue.poll();
			keyContext.setCurrentKey(timer.getKey());
			triggerTarget.onProcessingTime(timer);
		}

		if (timer != null && nextTimer == null) {
			nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
		}
	}
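The processing order described above (1 4 5 3 2) can be reproduced with a minimal sketch of the poll behaviour (not Flink's actual HeapPriorityQueueSet; this assumes all timers carry the same timestamp, so heap comparisons tie and sift-down never reorders):

import java.util.ArrayList;
import java.util.List;

public class TimerQueueSketch {

    public static void main(String[] args) {
        // keys 1 2 1 3 2 5 4 are deduplicated on insert, as the timer service does per key
        List<Integer> queue = new ArrayList<>();
        for (int key : new int[]{1, 2, 1, 3, 2, 5, 4}) {
            if (!queue.contains(key)) {
                queue.add(key);
            }
        }
        System.out.println("stored: " + queue);  // [1, 2, 3, 5, 4]

        // poll(): remove the head, then move the last element to the front
        // (no sift-down happens, since all timestamps compare equal)
        while (!queue.isEmpty()) {
            int head = queue.remove(0);
            if (!queue.isEmpty()) {
                queue.add(0, queue.remove(queue.size() - 1));
            }
            System.out.println("processing key " + head);  // 1, 4, 5, 3, 2
        }
    }
}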
 
<https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/Flink-Parallelism-Calculation.md#windowoperatoronprocessingtime>WindowOperator.onProcessingTime

triggerResult.isFire(): the window of the current key may fire, i.e. its end time has passed.
windowState.get(): fetches the (key, value) for the current key; by now it is the value aggregated over all records with that key.
emitWindowContents(triggerContext.window, contents): sends the result on to the sink.
	public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
		triggerContext.key = timer.getKey();
		triggerContext.window = timer.getNamespace();

		MergingWindowSet<W> mergingWindows;

		if (windowAssigner instanceof MergingWindowAssigner) {
			mergingWindows = getMergingWindowSet();
			W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
			if (stateWindow == null) {
				// Timer firing for non-existent window, this can only happen if a
				// trigger did not clean up timers. We have already cleared the merging
				// window and therefore the Trigger state, however, so nothing to do.
				return;
			} else {
				windowState.setCurrentNamespace(stateWindow);
			}
		} else {
			windowState.setCurrentNamespace(triggerContext.window);
			mergingWindows = null;
		}

		TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());

		if (triggerResult.isFire()) {
			ACC contents = windowState.get();
			if (contents != null) {
				emitWindowContents(triggerContext.window, contents);
			}
		}

		if (triggerResult.isPurge()) {
			windowState.clear();
		}

		if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
			clearAllState(triggerContext.window, windowState, mergingWindows);
		}

		if (mergingWindows != null) {
			// need to make sure to update the merging state in state
			mergingWindows.persist();
		}
	}
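Where triggerResult.isFire() comes from can be sketched as follows (a simplified mirror of ProcessingTimeTrigger, hypothetical class): onElement() only registers a timer for the window end and continues, and the FIRE decision is made when that timer calls back:

public class ProcessingTimeTriggerSketch {

    enum TriggerResult { CONTINUE, FIRE }

    // called per element: schedule the callback for the window end, do not fire yet
    static TriggerResult onElement(long windowMaxTimestamp) {
        // the real trigger calls ctx.registerProcessingTimeTimer(windowMaxTimestamp) here
        return TriggerResult.CONTINUE;
    }

    // called when processing time passes the window end
    static TriggerResult onProcessingTime(long time, long windowMaxTimestamp) {
        return time >= windowMaxTimestamp ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public static void main(String[] args) {
        long windowEnd = 1551505439999L;  // from the timer dump above
        System.out.println(onElement(windowEnd));                       // CONTINUE
        System.out.println(onProcessingTime(windowEnd + 1, windowEnd)); // FIRE
    }
}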
 
<https://github.com/opensourceteams/fink-maven-scala-2/blob/master/md/miniCluster/Flink-Parallelism-Calculation.md#singleinputgate>SingleInputGate

Intermediate data handling (data exchange):

package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.taskmanager.TaskActions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
 * An input gate consumes one or more partitions of a single produced intermediate result.
 *
 * <p>Each intermediate result is partitioned over its producing parallel subtasks; each of these
 * partitions is furthermore partitioned into one or more subpartitions.
 *
 * <p>As an example, consider a map-reduce program, where the map operator produces data and the
 * reduce operator consumes the produced data.
 *
 * <pre>{@code
 * +-----+              +---------------------+              +--------+
 * | Map | = produce => | Intermediate Result | <= consume = | Reduce |
 * +-----+              +---------------------+              +--------+
 * }</pre>
 *
 * <p>When deploying such a program in parallel, the intermediate result will be partitioned over its
 * producing parallel subtasks; each of these partitions is furthermore partitioned into one or more
 * subpartitions.
 *
 * <pre>{@code
 *                            Intermediate result
 *               +-----------------------------------------+
 *               |                      +----------------+ |              +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 1 | | <=======+=== | Input Gate | Reduce 1 |
 * | Map 1 | ==> | | Partition 1 | =|   +----------------+ |         |    +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 2 | | <==+    |
 *               |                      +----------------+ |    |    | Subpartition request
 *               |                                         |    |    |
 *               |                      +----------------+ |    |    |
 * +-------+     | +-------------+  +=> | Subpartition 1 | | <==+====+
 * | Map 2 | ==> | | Partition 2 | =|   +----------------+ |    |         +-----------------------+
 * +-------+     | +-------------+  +=> | Subpartition 2 | | <==+======== | Input Gate | Reduce 2 |
 *               |                      +----------------+ |              +-----------------------+
 *               +-----------------------------------------+
 * }</pre>
 *
 * <p>In the above example, two map subtasks produce the intermediate result in parallel, resulting
 * in two partitions (Partition 1 and 2). Each of these partitions is further partitioned into two
 * subpartitions -- one for each parallel reduce subtask.
 */
public class SingleInputGate implements InputGate {
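A small sketch of the wiring shown in the javadoc above (a hypothetical demo, not Flink API): with M map subtasks and R reduce subtasks, the input gate of reduce subtask r opens one channel per map partition and always requests subpartition r:

public class InputGateWiringSketch {

    public static void main(String[] args) {
        int mapParallelism = 2;     // Map 1, Map 2 -> Partition 1, Partition 2
        int reduceParallelism = 2;  // Reduce 1, Reduce 2

        for (int r = 0; r < reduceParallelism; r++) {
            System.out.println("Input gate of Reduce " + (r + 1) + ":");
            for (int m = 0; m < mapParallelism; m++) {
                // each channel consumes the r-th subpartition of one map partition
                System.out.println("  channel " + m + " <- Partition " + (m + 1)
                        + ", Subpartition " + (r + 1));
            }
        }
    }
}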






########################################################################







> On Mar 4, 2019, at 2:26 PM, 343122...@qq.com wrote:
> 
> The following is my personal understanding and may not be 100% accurate.
> Partitioning follows keyBy, i.e. the .keyBy("word") in your code:
> the hash of the key's value, modulo the parallelism, gives the remainder that determines which partition a record goes to.
> Your code does not specify a time characteristic, so the default is processing time,
> so your windows are split by processing time.
> 
> 
> 
> 
> 343122...@qq.com
> 
> From: 刘 文
> Sent: 2019-03-04 11:53
> To: user-zh@flink.apache.org
> Subject: Re: [Question] In Flink parallel computation, how does each Window receive the data of its own partition, i.e. how does a Window determine which partition it belongs to?
> ------------------------------------------------------
> Sorry, I still have not understood; may I ask for help once more?
> 
> For example:
> ). When the parallelism is set to 2 via setParallelism(2), two window threads are created
> ). Streaming WordCount, local, Flink 1.7.2
> ). How do these two window threads read the data of their own partitions, and how is a window's partition determined?
> ). Input data
>  1 2 3 4 5 6 7 8 9 10
> ). source -> operator ->
>    ------------------
>    channel 0 [partition 0]
> 
> 
> key:1    partition:0
> key:2    partition:0
> key:3    partition:0
> key:4    partition:0
> key:6    partition:0
> key:10   partition:0
> ------------------
> channel 1 [partition 1]
> key:5    partition:1
> key:7    partition:1
> key:8    partition:1
> key:9    partition:1
> ). window 0 (1/2)
>    How is this window's partition determined?
>    How does it read the data in its partition?
> 
> ). window 1 (2/2)
>    How is this window's partition determined?
>    How does it read the data in its partition?
> 
> 
> ------------------------------------------------------
> 
> 
> 
> 
> 
>> On Mar 3, 2019, at 9:26 PM, 刘 文 <thinktothi...@yahoo.com.INVALID> wrote:
>> 
>> WordCount.scala
>> package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.parallelism
>> 
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.windowing.time.Time
>> 
>> /**
>>   * nc -lk 1234  to feed input data
>>   */
>> object SocketWindowWordCountLocal {
>> 
>> 
>>   def main(args: Array[String]): Unit = {
>> 
>>     val port = 1234
>>     // get the execution environment
>>     // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>> 
>>     val configuration : Configuration = getConfiguration(true)
>> 
>>     val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
>> 
>> 
>>     // get input data by connecting to the socket
>>     val dataStream = env.socketTextStream("localhost", port, '\n')
>> 
>> 
>>     import org.apache.flink.streaming.api.scala._
>>     val textResult = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
>>       .keyBy("word")
>>       /**
>>         * Refresh every 20 seconds, effectively restarting the count.
>>         * Benefit: there is no need to keep statistics over all data ever seen,
>>         * only over the incremental data within the given interval, which reduces the data volume.
>>         */
>>       .timeWindow(Time.seconds(5))
>>       //.countWindow(3)
>>       //.countWindow(3,1)
>>       //.countWindowAll(3)
>> 
>>       .sum("count" )
>> 
>>     textResult
>>       .setParallelism(100)
>>       .print()
>> 
>> 
>>     if(args == null || args.size ==0){
>> 
>>       println("================================== Execution plan below ==================================")
>>       println("Visualizer (works better in Firefox): https://flink.apache.org/visualizer")
>>       // execution plan
>>       println(env.getExecutionPlan)
>>       println("================================== Execution plan JSON above ==================================\n")
>>       //StreamGraph
>>       //println(env.getStreamGraph.getStreamingPlanAsJSON)
>> 
>>       //JsonPlanGenerator.generatePlan(jobGraph)
>> 
>>       env.execute("默认作业")
>> 
>>     }else{
>>       env.execute(args(0))
>>     }
>> 
>>     println("结束")
>> 
>>   }
>> 
>> 
>>   // Data type for words with count
>>   case class WordWithCount(word: String, count: Long)
>> 
>> 
>>   def getConfiguration(isDebug:Boolean = false):Configuration = {
>> 
>>     val configuration : Configuration = new Configuration()
>> 
>>     if(isDebug){
>>       val timeout = "100000 s"
>>       val timeoutHeartbeatPause = "1000000 s"
>>       configuration.setString("akka.ask.timeout",timeout)
>>       configuration.setString("akka.lookup.timeout",timeout)
>>       configuration.setString("akka.tcp.timeout",timeout)
>>       configuration.setString("akka.transport.heartbeat.interval",timeout)
>>       configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
>>       configuration.setString("akka.watch.heartbeat.pause",timeout)
>>       configuration.setInteger("heartbeat.interval",10000000)
>>       configuration.setInteger("heartbeat.timeout",50000000)
>>     }
>> 
>>     configuration
>>   }
>> 
>> 
>> }
>> 
>> 
>> 
>>> On Mar 3, 2019, at 9:05 PM, 刘 文 <thinktothi...@yahoo.com.INVALID> wrote:
>>> 
>>> 
>> [Question] In Flink parallel computation, how does each Window receive the data of its own partition, i.e. how does a Window determine which partition it belongs to?
>>> 
>>> ). Environment: Flink 1.7.2, WordCount, local, stream processing
>>> ). In the source, RecordWriter.emit() assigns each element to a partition by key, so every element's partition is already determined; the number of partitions is set by DataStream.setParallelism(2)
>>> 
>>> public void emit(T record) throws IOException, InterruptedException {
>>>   emit(record, channelSelector.selectChannels(record, numChannels));
>>> }
>>> 
>>> copyFromSerializerToTargetChannel(int targetChannel) writes data to the different channels, i.e. sends each record to the window of its partition (records are sent one at a time)
>>> ). As many Window instances are started as the parallelism set by DataStream.setParallelism(2)
>>> 
>> 
>> 
> 
> 
