[ 
https://issues.apache.org/jira/browse/FLINK-9869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679216#comment-16679216
 ] 

ASF GitHub Bot commented on FLINK-9869:
---------------------------------------

TisonKun closed pull request #6345: [FLINK-9869][runtime] Send PartitionInfo in 
batch to Improve performance
URL: https://github.com/apache/flink/pull/6345
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/job_manager_configuration.html 
b/docs/_includes/generated/job_manager_configuration.html
index 0458af24c06..83d8abb7d27 100644
--- a/docs/_includes/generated/job_manager_configuration.html
+++ b/docs/_includes/generated/job_manager_configuration.html
@@ -42,6 +42,11 @@
             <td style="word-wrap: break-word;">6123</td>
             <td>The config parameter defining the network port to connect to 
for communication with the job manager. Like jobmanager.rpc.address, this value 
is only interpreted in setups where a single JobManager with static 
name/address and port exists (simple standalone setups, or container setups 
with dynamic service name resolution). This config option is not used in many 
high-availability setups, when a leader-election service (like ZooKeeper) is 
used to elect and discover the JobManager leader from potentially multiple 
standby JobManagers.</td>
         </tr>
+        <tr>
+            <td><h5>jobmanager.update-partition-info.send-interval</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>The interval of send update-partition-info message.</td>
+        </tr>
         <tr>
             <td><h5>jobstore.cache-size</h5></td>
             <td style="word-wrap: break-word;">52428800</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 1666f213d18..43091a256b2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -154,6 +154,11 @@
                .defaultValue(60L * 60L)
                .withDescription("The time in seconds after which a completed 
job expires and is purged from the job store.");
 
+       public static final ConfigOption<Long> 
UPDATE_PARTITION_INFO_SEND_INTERVAL =
+               key("jobmanager.update-partition-info.send-interval")
+               .defaultValue(10L)
+               .withDescription("The interval of send update-partition-info 
message.");
+
        /**
         * The timeout in milliseconds for requesting a slot from Slot Pool.
         */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 801f35a41dc..4a157f9cb60 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -27,18 +27,13 @@
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -69,6 +64,8 @@
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.stream.Collectors;
@@ -178,6 +175,10 @@
 
        // 
--------------------------------------------------------------------------------------------
 
+       private final Object updatePartitionLock = new Object();
+
+       private ScheduledFuture updatePartitionFuture;
+
        /**
         * Creates a new Execution attempt.
         *
@@ -588,24 +589,27 @@ public void deploy() throws JobException {
 
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
 
-                       final CompletableFuture<Acknowledge> submitResultFuture 
= taskManagerGateway.submitTask(deployment, rpcTimeout);
-
-                       submitResultFuture.whenCompleteAsync(
-                               (ack, failure) -> {
-                                       // only respond to the failure case
-                                       if (failure != null) {
-                                               if (failure instanceof 
TimeoutException) {
-                                                       String taskname = 
vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
-
-                                                       markFailed(new 
Exception(
-                                                               "Cannot deploy 
task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
-                                                                       + ") 
not responding after a rpcTimeout of " + rpcTimeout, failure));
-                                               } else {
-                                                       markFailed(failure);
-                                               }
-                                       }
-                               },
-                               executor);
+                       executor.execute(
+                               () -> {
+                                       final CompletableFuture<Acknowledge> 
submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);
+
+                                       submitResultFuture.whenCompleteAsync(
+                                               (ack, failure) -> {
+                                                       // only respond to the 
failure case
+                                                       if (failure != null) {
+                                                               if (failure 
instanceof TimeoutException) {
+                                                                       String 
taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
+
+                                                                       
markFailed(new Exception(
+                                                                               
"Cannot deploy task " + taskname + " - TaskManager (" + 
getAssignedResourceLocation()
+                                                                               
        + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
+                                                               } else {
+                                                                       
markFailed(failure);
+                                                               }
+                                                       }
+                                               },
+                                               executor);
+                               });
                }
                catch (Throwable t) {
                        markFailed(t);
@@ -759,44 +763,11 @@ else if (numConsumers == 0) {
                        // 
----------------------------------------------------------------
                        else {
                                if (consumerState == RUNNING) {
-                                       final LogicalSlot consumerSlot = 
consumer.getAssignedResource();
-
-                                       if (consumerSlot == null) {
-                                               // The consumer has been reset 
concurrently
-                                               continue;
-                                       }
-
-                                       final TaskManagerLocation 
partitionTaskManagerLocation = partition.getProducer()
-                                                       
.getCurrentAssignedResource().getTaskManagerLocation();
-                                       final ResourceID partitionTaskManager = 
partitionTaskManagerLocation.getResourceID();
-
-                                       final ResourceID consumerTaskManager = 
consumerSlot.getTaskManagerLocation().getResourceID();
-
-                                       final ResultPartitionID partitionId = 
new ResultPartitionID(partition.getPartitionId(), attemptId);
-
-                                       final ResultPartitionLocation 
partitionLocation;
-
-                                       if 
(consumerTaskManager.equals(partitionTaskManager)) {
-                                               // Consuming task is deployed 
to the same instance as the partition => local
-                                               partitionLocation = 
ResultPartitionLocation.createLocal();
-                                       }
-                                       else {
-                                               // Different instances => remote
-                                               final ConnectionID connectionId 
= new ConnectionID(
-                                                               
partitionTaskManagerLocation,
-                                                               
partition.getIntermediateResult().getConnectionIndex());
-
-                                               partitionLocation = 
ResultPartitionLocation.createRemote(connectionId);
-                                       }
-
-                                       final InputChannelDeploymentDescriptor 
descriptor = new InputChannelDeploymentDescriptor(
-                                                       partitionId, 
partitionLocation);
-
-                                       consumer.sendUpdatePartitionInfoRpcCall(
-                                               Collections.singleton(
-                                                       new PartitionInfo(
-                                                               
partition.getIntermediateResult().getId(),
-                                                               descriptor)));
+                                       // cache the partition info and trigger 
a timer to group them and send in batch
+                                       final Execution partitionExecution = 
partition.getProducer()
+                                               .getCurrentExecutionAttempt();
+                                       
consumerVertex.cachePartitionInfo(PartialInputChannelDeploymentDescriptor.fromEdge(partition,
 partitionExecution));
+                                       
consumerVertex.getCurrentExecutionAttempt().sendPartitionInfoAsync();
                                }
                                // 
----------------------------------------------------------------
                                // Consumer is scheduled or deploying => cache 
input channel
@@ -1031,6 +1002,10 @@ void 
cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo) {
        }
 
        void sendPartitionInfos() {
+               synchronized (updatePartitionLock) {
+                       updatePartitionFuture = null;
+               }
+
                // check if the ExecutionVertex has already been archived and 
thus cleared the
                // partial partition infos queue
                if (partialInputChannelDeploymentDescriptors != null && 
!partialInputChannelDeploymentDescriptors.isEmpty()) {
@@ -1050,6 +1025,17 @@ void sendPartitionInfos() {
                }
        }
 
+       void sendPartitionInfoAsync() {
+               synchronized (updatePartitionLock) {
+                       if (updatePartitionFuture == null) {
+                               updatePartitionFuture = 
getVertex().getExecutionGraph().getFutureExecutorService().schedule(
+                                       () -> {
+                                               sendPartitionInfos();
+                                       }, 
vertex.getExecutionGraph().getUpdatePartitionInfoSendInterval(), 
TimeUnit.MILLISECONDS);
+                       }
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Internal Actions
        // 
--------------------------------------------------------------------------------------------
@@ -1218,16 +1204,21 @@ private void sendUpdatePartitionInfoRpcCall(
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
                        final TaskManagerLocation taskManagerLocation = 
slot.getTaskManagerLocation();
 
-                       CompletableFuture<Acknowledge> 
updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, 
partitionInfos, rpcTimeout);
-
-                       updatePartitionsResultFuture.whenCompleteAsync(
-                               (ack, failure) -> {
-                                       // fail if there was a failure
-                                       if (failure != null) {
-                                               fail(new 
IllegalStateException("Update task on TaskManager " + taskManagerLocation +
-                                                       " failed due to:", 
failure));
-                                       }
-                               }, executor);
+                       executor.execute(
+                               () -> {
+                                       CompletableFuture<Acknowledge> 
updatePartitionsResultFuture =
+                                               
taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);
+
+                                       
updatePartitionsResultFuture.whenCompleteAsync(
+                                               (ack, failure) -> {
+                                                       // fail if there was a 
failure
+                                                       if (failure != null) {
+                                                               fail(new 
IllegalStateException("Update task on TaskManager " + taskManagerLocation +
+                                                                       " 
failed due to:", failure));
+                                                       }
+                                               }, executor);
+                               }
+                       );
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index acb1e16fe71..9687a640be1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -290,6 +290,8 @@
         * available after archiving. */
        private CheckpointStatsTracker checkpointStatsTracker;
 
+       private long updatePartitionInfoSendInterval;
+
        // ------ Fields that are only relevant for archived execution graphs 
------------
        private String jsonPlan;
 
@@ -746,6 +748,15 @@ public Executor getFutureExecutor() {
                return futureExecutor;
        }
 
+       /**
+        * Returns the ExecutionContext associated with this ExecutionGraph.
+        *
+        * @return ExecutionContext associated with this ExecutionGraph
+        */
+       public ScheduledExecutorService getFutureExecutorService() {
+               return futureExecutor;
+       }
+
        /**
         * Merges all accumulator results from the tasks previously executed in 
the Executions.
         * @return The accumulator map
@@ -804,6 +815,15 @@ public Executor getFutureExecutor() {
                return 
StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
        }
 
+       public long getUpdatePartitionInfoSendInterval() {
+               return updatePartitionInfoSendInterval;
+       }
+
+       public void setUpdatePartitionInfoSendInterval(long 
updatePartitionInfoSendInterval) {
+               this.updatePartitionInfoSendInterval = 
updatePartitionInfoSendInterval;
+       }
+
+
        // 
--------------------------------------------------------------------------------------------
        //  Actions
        // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index f1a861d2ca1..0cd32ed6294 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -24,6 +24,7 @@
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.JobException;
@@ -367,6 +368,9 @@ public static ExecutionGraph buildGraph(
 
                executionGraph.getFailoverStrategy().registerMetrics(metrics);
 
+               executionGraph.setUpdatePartitionInfoSendInterval(
+                       
jobManagerConfig.getLong(JobManagerOptions.UPDATE_PARTITION_INFO_SEND_INTERVAL));
+
                return executionGraph;
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 12b4277941f..649c39d5711 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -159,7 +159,7 @@ public void testBuildDeploymentDescriptor() {
 
                        ExecutionGraph eg = new ExecutionGraph(
                                expectedJobInformation,
-                               TestingUtils.defaultExecutor(),
+                               new DirectScheduledExecutorService(),
                                TestingUtils.defaultExecutor(),
                                AkkaUtils.getDefaultTimeout(),
                                new NoRestartStrategy(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index d91380ed275..3e36d95409e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -317,6 +317,9 @@ public void testFailCallOvertakesDeploymentAnswer() {
                        vertex.deployToSlot(slot);
                        assertEquals(ExecutionState.DEPLOYING, 
vertex.getExecutionState());
 
+                       // execute the deploy rpc call
+                       queue.triggerNextAction();
+
                        Exception testError = new Exception("test error");
                        vertex.fail(testError);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Send PartitionInfo in batch to Improve performance
> --------------------------------------------------
>
>                 Key: FLINK-9869
>                 URL: https://issues.apache.org/jira/browse/FLINK-9869
>             Project: Flink
>          Issue Type: Improvement
>          Components: Local Runtime
>    Affects Versions: 1.5.1
>            Reporter: TisonKun
>            Assignee: TisonKun
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.6
>
>
> ... Currently we send partition info as soon as each one arrives. We could 
> `cachePartitionInfo` and then `sendPartitionInfoAsync`, so that the cached 
> infos are grouped and sent in batches, which will improve performance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to