Hi everyone,

I was looking into serializing the ArchivedExecutionGraph for another FLIP and came across Accumulators [1] (not to be confused with the window accumulators of the Table/SQL API). Accumulators were introduced in Flink quite a while ago in Stratosphere PR #340 [2].
I had a brief chat about it with Chesnay, who pointed out that there was an intention in the past to use this for collecting metrics. The Accumulator JavaDoc hints that it was inspired by Hadoop's Counter concept [3], which also sounds more or less equivalent to Flink's metrics.

The Accumulator is currently accessible through the RuntimeContext interface, which provides addAccumulator [4] and getAccumulator [5]. These methods are used in the following classes:
- CollectSinkFunction [6]: Here it's used to collect the final data when closing the function. This feels like a misuse of the feature. Instead, the CollectSink could block the close call until all data was fetched by the client program.
- DataSet.collect() [7]: Uses CollectHelper, which utilizes SerializedListAccumulator to collect the final data, similarly to CollectSinkFunction.
- EmptyFieldsCountAccumulator [8] is an example program that counts empty fields. This could be migrated to MetricGroups.
- ChecksumHashCodeHelper [9] is used in DataSetUtils, where the calling method is already marked as deprecated for 2.0.
- CollectOutputFormat [10] uses SerializedListAccumulator analogously to DataSet.collect(). This class will be removed with the removal of the Scala API in 2.0.

The initial investigation brings me to the conclusion that we can remove the Accumulator feature in favor of Metrics and proper collect implementations. That would also help clean up the (Archived)ExecutionGraph: IMHO, we should have a clear separation between Metrics (which are part of the ExecutionGraph) and processed data (which shouldn't be part of the ExecutionGraph).

I'm curious what others think about this. Did I miss a scenario where Accumulators are actually needed? Or is this already part of some other 2.0 effort [11] which I missed? I would suggest making the removal a nice-to-have item for 2.0.
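To make the CollectSinkFunction idea a bit more concrete (blocking close() until the client has fetched everything, instead of shipping the final data through an Accumulator), here is a rough, framework-free sketch. All names (BlockingCloseSinkSketch, invoke/close/fetchAll, the END marker) are hypothetical and not Flink API; the client program is simulated by the main thread, and a plain in-memory queue stands in for whatever transport the real sink uses between task and client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical sketch (not Flink API): a sink whose close() blocks until the
 * client has fetched every buffered record, so the final results never need
 * to travel through an Accumulator and end up in the ExecutionGraph.
 */
class BlockingCloseSinkSketch {

    /** Marker object signalling end-of-data to the client. */
    private static final Object END = new Object();

    private final BlockingQueue<Object> buffer = new LinkedBlockingQueue<>();
    private final CountDownLatch fetchedAll = new CountDownLatch(1);

    /** Called by the task for every record (loosely analogous to SinkFunction#invoke). */
    public void invoke(String record) {
        buffer.add(record);
    }

    /** Called when the task finishes: publish the end marker, then wait for the client. */
    public void close() throws InterruptedException {
        buffer.add(END);
        fetchedAll.await();
    }

    /** Called by the client program: drains records until END, then releases close(). */
    public List<String> fetchAll() throws InterruptedException {
        List<String> result = new ArrayList<>();
        while (true) {
            Object next = buffer.take();
            if (next == END) {
                fetchedAll.countDown();
                return result;
            }
            result.add((String) next);
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingCloseSinkSketch sink = new BlockingCloseSinkSketch();
        // Simulated task thread: emits two records, then closes the sink.
        Thread task = new Thread(() -> {
            try {
                sink.invoke("a");
                sink.invoke("b");
                sink.close(); // blocks until the client fetched everything
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        task.start();
        List<String> fetched = sink.fetchAll(); // the "client program"
        task.join();
        System.out.println(fetched); // prints [a, b]
    }
}
```

The task thread only returns from close() once fetchAll() has consumed the end marker. A real implementation would still have to deal with things the sketch ignores, such as client disconnects and failover.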
Best,
Matthias

[1] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java#L40
[2] https://github.com/stratosphere/stratosphere/pull/340
[3] https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Counters.html
[4] https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L156
[5] https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L165
[6] https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L304
[7] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L145
[8] https://github.com/apache/flink/blob/aa98c18d2ba975479fcfa4930b0139fa575d303e/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java#L156
[9] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L256
[10] https://github.com/apache/flink/blob/91d81c427aa6312841ca868d54e8ce6ea721cd60/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/sinks/CollectTableSink.scala#L70
[11] https://cwiki.apache.org/confluence/display/FLINK/2.0+Release

--
Matthias Pohl
Open Source Software Engineer, Aiven