Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5573#discussion_r172756524 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -389,6 +393,27 @@ public String cancelWithSavepoint(JobID jobId, @Nullable String savepointDirecto }); } + @Override + public Map<String, Object> getAccumulators(final JobID jobID) throws Exception { + final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance(); + final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters(); + accMsgParams.jobPathParameter.resolve(jobID); + accMsgParams.queryParameter.resolve(Collections.singletonList(true)); + + CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest( + accumulatorsHeaders, + accMsgParams + ); + + return responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> { + if (accumulatorsInfo != null && accumulatorsInfo.getSerializedUserAccumulators() != null) { + return accumulatorsInfo.getSerializedUserAccumulators(); --- End diff -- The accumulators should be deserialized via `SerializedValue#deserialize(ClassLoader)`. If the overload `Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader)` (which should also be overridden) was called, use the passed-in `ClassLoader`; otherwise, use `ClassLoader.getSystemClassLoader()`.
---