[FLINK-9212][REST] Port SubtasksAllAccumulatorsHandler to new REST endpoint

This closes #5893.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/210abeeb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/210abeeb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/210abeeb

Branch: refs/heads/release-1.5
Commit: 210abeebe8dd915f38b51a68884e9f42321af8cf
Parents: 300dc4c
Author: zhouhai02 <zhouha...@meituan.com>
Authored: Sun Apr 22 18:59:11 2018 +0800
Committer: zentol <ches...@apache.org>
Committed: Wed Apr 25 09:33:21 2018 +0200

----------------------------------------------------------------------
 .../job/SubtasksAllAccumulatorsHandler.java     |  82 +++++++++++
 .../SubtasksAllAccumulatorsHandlers.java        |  75 ++++++++++
 .../job/SubtasksAllAccumulatorsInfo.java        | 144 +++++++++++++++++++
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  12 ++
 .../job/SubtasksAllAccumulatorsInfoTest.java    |  58 ++++++++
 5 files changed, 371 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
new file mode 100644
index 0000000..51efba2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksAllAccumulatorsHandler.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtasksAllAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler which returns the user accumulators of all subtasks of a job vertex.
+ *
+ * <p>For every task vertex of the requested job vertex the response contains the subtask
+ * index and attempt number of its current execution attempt, the host name of its currently
+ * assigned location (or {@code "(unassigned)"} if there is none) and the stringified user
+ * accumulators of the current execution attempt.
+ */
+public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexHandler<SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> {
+
+       public SubtasksAllAccumulatorsHandler(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+                       Time timeout,
+                       Map<String, String> responseHeaders,
+                       MessageHeaders<EmptyRequestBody, SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> messageHeaders,
+                       ExecutionGraphCache executionGraphCache,
+                       Executor executor) {
+               super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+       }
+
+       /**
+        * Builds the response from the given job vertex.
+        *
+        * @param request the incoming request; not inspected by this handler
+        * @param jobVertex the job vertex whose task vertices are reported
+        * @return the accumulators of all subtasks together with the vertex id and parallelism
+        */
+       @Override
+       protected SubtasksAllAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionJobVertex jobVertex) throws RestHandlerException {
+               final JobVertexID vertexId = jobVertex.getJobVertexId();
+               final int vertexParallelism = jobVertex.getParallelism();
+               final List<SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo> perSubtaskInfos = new ArrayList<>();
+
+               for (AccessExecutionVertex subtask : jobVertex.getTaskVertices()) {
+                       final TaskManagerLocation assignedLocation = subtask.getCurrentAssignedResourceLocation();
+                       final String host;
+                       if (assignedLocation != null) {
+                               host = assignedLocation.getHostname();
+                       } else {
+                               // the subtask has no assigned location
+                               host = "(unassigned)";
+                       }
+
+                       final List<UserAccumulator> accumulators =
+                               toUserAccumulators(subtask.getCurrentExecutionAttempt().getUserAccumulatorsStringified());
+
+                       final SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo subtaskInfo =
+                               new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
+                                       subtask.getCurrentExecutionAttempt().getParallelSubtaskIndex(),
+                                       subtask.getCurrentExecutionAttempt().getAttemptNumber(),
+                                       host,
+                                       accumulators);
+                       perSubtaskInfos.add(subtaskInfo);
+               }
+
+               return new SubtasksAllAccumulatorsInfo(vertexId, vertexParallelism, perSubtaskInfos);
+       }
+
+       /**
+        * Converts stringified accumulator results into their REST message representation,
+        * keeping the order of the given array.
+        */
+       private static List<UserAccumulator> toUserAccumulators(StringifiedAccumulatorResult[] stringified) {
+               final List<UserAccumulator> converted = new ArrayList<>(stringified.length);
+               for (StringifiedAccumulatorResult result : stringified) {
+                       converted.add(new UserAccumulator(result.getName(), result.getType(), result.getValue()));
+               }
+               return converted;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java
new file mode 100644
index 0000000..e178c93
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksAllAccumulatorsHandlers.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.job.SubtasksAllAccumulatorsInfo;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link SubtasksAllAccumulatorsHandler}.
+ *
+ * <p>Describes a {@code GET} request with an {@link EmptyRequestBody} against {@link #URL}
+ * which is answered with status {@code OK} and a {@link SubtasksAllAccumulatorsInfo} body.
+ * The class is a singleton; obtain it via {@link #getInstance()}.
+ */
+public class SubtasksAllAccumulatorsHandlers implements MessageHeaders<EmptyRequestBody, SubtasksAllAccumulatorsInfo, JobVertexMessageParameters> {
+
+       /** Target URL of the endpoint, parameterized by the job id and job vertex id path parameters. */
+       public static final String URL =
+               "/jobs" + "/:" + JobIDPathParameter.KEY
+                       + "/vertices" + "/:" + JobVertexIdPathParameter.KEY
+                       + "/subtasks/accumulators";
+
+       /** The only instance of this class. */
+       private static final SubtasksAllAccumulatorsHandlers INSTANCE = new SubtasksAllAccumulatorsHandlers();
+
+       /** Not instantiable from outside; use {@link #getInstance()}. */
+       private SubtasksAllAccumulatorsHandlers() {}
+
+       /**
+        * Returns the singleton instance of these headers.
+        */
+       public static SubtasksAllAccumulatorsHandlers getInstance() {
+               return INSTANCE;
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       @Override
+       public JobVertexMessageParameters getUnresolvedMessageParameters() {
+               // a fresh parameters object is handed out on every call
+               return new JobVertexMessageParameters();
+       }
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public Class<SubtasksAllAccumulatorsInfo> getResponseClass() {
+               return SubtasksAllAccumulatorsInfo.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java
new file mode 100644
index 0000000..ee2535f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfo.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import 
org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.Collection;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link SubtasksAllAccumulatorsHandler}.
+ *
+ * <p>Holds the id and parallelism of a job vertex together with a collection of
+ * {@link SubtaskAccumulatorsInfo} entries. Instances are immutable in the sense that all
+ * fields are final; the passed collections are stored as given (no defensive copy).
+ */
+public class SubtasksAllAccumulatorsInfo implements ResponseBody {
+
+       public static final String FIELD_NAME_JOB_VERTEX_ID = "id";
+       public static final String FIELD_NAME_PARALLELISM = "parallelism";
+       public static final String FIELD_NAME_SUBTASKS = "subtasks";
+
+       /** Id of the job vertex, (de)serialized with the dedicated JobVertexID (de)serializer. */
+       @JsonProperty(FIELD_NAME_JOB_VERTEX_ID)
+       @JsonSerialize(using = JobVertexIDSerializer.class)
+       private final JobVertexID jobVertexId;
+
+       /** Parallelism of the job vertex. */
+       @JsonProperty(FIELD_NAME_PARALLELISM)
+       private final int parallelism;
+
+       /** Accumulator information of the individual subtasks. */
+       @JsonProperty(FIELD_NAME_SUBTASKS)
+       private final Collection<SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos;
+
+       /**
+        * Creates the response body.
+        *
+        * @param jobVertexId id of the job vertex, must not be null
+        * @param parallelism parallelism of the job vertex
+        * @param subtaskAccumulatorsInfos per-subtask accumulator information, must not be null
+        * @throws NullPointerException if {@code jobVertexId} or {@code subtaskAccumulatorsInfos} is null
+        */
+       @JsonCreator
+       public SubtasksAllAccumulatorsInfo(
+                       @JsonDeserialize(using = JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_VERTEX_ID) JobVertexID jobVertexId,
+                       @JsonProperty(FIELD_NAME_PARALLELISM) int parallelism,
+                       @JsonProperty(FIELD_NAME_SUBTASKS) Collection<SubtaskAccumulatorsInfo> subtaskAccumulatorsInfos) {
+               this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
+               this.parallelism = parallelism;
+               this.subtaskAccumulatorsInfos = Preconditions.checkNotNull(subtaskAccumulatorsInfos);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (o == this) {
+                       return true;
+               }
+               if (o == null || o.getClass() != getClass()) {
+                       return false;
+               }
+               final SubtasksAllAccumulatorsInfo other = (SubtasksAllAccumulatorsInfo) o;
+               if (parallelism != other.parallelism) {
+                       return false;
+               }
+               return Objects.equals(jobVertexId, other.jobVertexId)
+                       && Objects.equals(subtaskAccumulatorsInfos, other.subtaskAccumulatorsInfos);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(jobVertexId, parallelism, subtaskAccumulatorsInfos);
+       }
+
+       // ---------------------------------------------------
+       // Static inner classes
+       // ---------------------------------------------------
+
+       /**
+        * Detailed information about the accumulators of a single subtask: its index, the
+        * attempt number, the host and the user accumulators.
+        */
+       public static class SubtaskAccumulatorsInfo {
+               public static final String FIELD_NAME_SUBTASK_INDEX = "subtask";
+               public static final String FIELD_NAME_ATTEMPT_NUM = "attempt";
+               public static final String FIELD_NAME_HOST = "host";
+               public static final String FIELD_NAME_USER_ACCUMULATORS = "user-accumulators";
+
+               @JsonProperty(FIELD_NAME_SUBTASK_INDEX)
+               private final int subtaskIndex;
+
+               @JsonProperty(FIELD_NAME_ATTEMPT_NUM)
+               private final int attemptNum;
+
+               @JsonProperty(FIELD_NAME_HOST)
+               private final String host;
+
+               @JsonProperty(FIELD_NAME_USER_ACCUMULATORS)
+               private final Collection<UserAccumulator> userAccumulators;
+
+               /**
+                * Creates the information for one subtask.
+                *
+                * @param subtaskIndex index of the subtask
+                * @param attemptNum attempt number of the subtask
+                * @param host host of the subtask, must not be null
+                * @param userAccumulators user accumulators of the subtask, must not be null
+                * @throws NullPointerException if {@code host} or {@code userAccumulators} is null
+                */
+               @JsonCreator
+               public SubtaskAccumulatorsInfo(
+                               @JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex,
+                               @JsonProperty(FIELD_NAME_ATTEMPT_NUM) int attemptNum,
+                               @JsonProperty(FIELD_NAME_HOST) String host,
+                               @JsonProperty(FIELD_NAME_USER_ACCUMULATORS) Collection<UserAccumulator> userAccumulators) {
+                       this.subtaskIndex = subtaskIndex;
+                       this.attemptNum = attemptNum;
+                       this.host = Preconditions.checkNotNull(host);
+                       this.userAccumulators = Preconditions.checkNotNull(userAccumulators);
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (o == this) {
+                               return true;
+                       }
+                       if (o == null || o.getClass() != getClass()) {
+                               return false;
+                       }
+                       final SubtaskAccumulatorsInfo other = (SubtaskAccumulatorsInfo) o;
+                       if (subtaskIndex != other.subtaskIndex || attemptNum != other.attemptNum) {
+                               return false;
+                       }
+                       return Objects.equals(host, other.host)
+                               && Objects.equals(userAccumulators, other.userAccumulators);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(subtaskIndex, attemptNum, host, userAccumulators);
+               }
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 0ea7550..1a67d92 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
 import 
org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.SubtasksAllAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
@@ -94,6 +95,7 @@ import 
org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.SubtasksAllAccumulatorsHandlers;
 import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders;
 import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
 import org.apache.flink.runtime.rest.messages.YarnCancelJobTerminationHeaders;
@@ -318,6 +320,15 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                        executionGraphCache,
                        executor);
 
+               SubtasksAllAccumulatorsHandler subtasksAllAccumulatorsHandler = 
new SubtasksAllAccumulatorsHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders,
+                       SubtasksAllAccumulatorsHandlers.getInstance(),
+                       executionGraphCache,
+                       executor);
+
                TaskManagersHandler taskManagersHandler = new 
TaskManagersHandler(
                        restAddressFuture,
                        leaderRetriever,
@@ -575,6 +586,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                
handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), 
taskCheckpointStatisticDetailsHandler));
                handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), 
jobExceptionsHandler));
                
handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), 
jobVertexAccumulatorsHandler));
+               handlers.add(Tuple2.of(SubtasksAllAccumulatorsHandlers.getInstance(), subtasksAllAccumulatorsHandler));
                handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), 
jobDetailsHandler));
                handlers.add(Tuple2.of(JobAccumulatorsHeaders.getInstance(), 
jobAccumulatorsHandler));
                handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), 
taskManagersHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/210abeeb/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java
new file mode 100644
index 0000000..2a71239
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtasksAllAccumulatorsInfoTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests (un)marshalling of the {@link SubtasksAllAccumulatorsInfo}.
+ *
+ * <p>Supplies the response class and a sample instance to the
+ * {@link RestResponseMarshallingTestBase}.
+ */
+public class SubtasksAllAccumulatorsInfoTest extends RestResponseMarshallingTestBase<SubtasksAllAccumulatorsInfo> {
+
+       @Override
+       protected Class<SubtasksAllAccumulatorsInfo> getTestResponseClass() {
+               return SubtasksAllAccumulatorsInfo.class;
+       }
+
+       /**
+        * Builds a sample response with three subtask entries which all share the same two
+        * user accumulators.
+        */
+       @Override
+       protected SubtasksAllAccumulatorsInfo getTestResponseInstance() throws Exception {
+               final List<UserAccumulator> sharedAccumulators = new ArrayList<>(2);
+               sharedAccumulators.add(new UserAccumulator("test name1", "test type1", "test value1"));
+               sharedAccumulators.add(new UserAccumulator("test name2", "test type2", "test value2"));
+
+               final List<SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo> subtaskEntries = new ArrayList<>(3);
+               for (int index = 0; index < 3; index++) {
+                       // subtask index and attempt number are both set to the loop index
+                       final SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo entry =
+                               new SubtasksAllAccumulatorsInfo.SubtaskAccumulatorsInfo(
+                                       index,
+                                       index,
+                                       "host-" + index,
+                                       sharedAccumulators);
+                       subtaskEntries.add(entry);
+               }
+
+               return new SubtasksAllAccumulatorsInfo(new JobVertexID(), 4, subtaskEntries);
+       }
+}

Reply via email to