Hi everyone,
I was looking into serializing the ArchivedExecutionGraph for another FLIP
and came across Accumulators [1] (don't mix that one up with the window
accumulators of the Table/SQL API). Accumulators were introduced in Flink
quite a while ago in Stratosphere PR #340 [2].

I had a brief chat about it with Chesnay, who pointed out that there was
once an intention to use Accumulators for collecting metrics. The
Accumulator JavaDoc hints that it was inspired by Hadoop's Counter concept
[3], which also sounds more or less equivalent to Flink's metrics.

The Accumulator is currently accessible through the RuntimeContext
interface, which provides addAccumulator [4] and getAccumulator [5]. Usages
of these methods appear in the following classes:
- CollectSinkFunction [6]: Here it's used to collect the final data when
closing the function. This feels like a misuse of the feature. Instead, the
CollectSink could block the close call until all data was fetched from the
client program.
- DataSet.collect() [7]: Uses CollectHelper, which utilizes
SerializedListAccumulator to collect the final data, similarly to
CollectSinkFunction.
- EmptyFieldsCountAccumulator [8] is an example program that counts empty
fields. This could be migrated to MetricGroups.
- ChecksumHashCodeHelper [9] is used in DataSetUtils, where the calling
method is already marked as deprecated for 2.0.
- CollectOutputFormat [10] uses SerializedListAccumulator analogously to
DataSet.collect(). This class will be removed with the removal of the Scala
API in 2.0.
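
To make the two kinds of usage above concrete, here is a minimal sketch of
the accumulate-and-merge pattern. It does not use Flink at all:
SimpleAccumulator, LongCounterSketch, ListAccumulatorSketch and
countEmptyFields are hypothetical, simplified stand-ins that I made up for
illustration, not the actual Flink classes. The counter variant is the
metric-like use (as in the empty-fields example); the list variant is the
data-carrying use (as in the collect implementations).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the accumulator pattern; NOT Flink's API.
final class AccumulatorSketch {

    /** Stand-in interface: add values locally, merge partial results later. */
    interface SimpleAccumulator<V, R> {
        void add(V value);

        R getLocalValue();

        void merge(SimpleAccumulator<V, R> other);
    }

    /** Metric-like usage: the result is a single number. */
    static final class LongCounterSketch implements SimpleAccumulator<Long, Long> {
        private long count;

        @Override
        public void add(Long value) {
            count += value;
        }

        @Override
        public Long getLocalValue() {
            return count;
        }

        @Override
        public void merge(SimpleAccumulator<Long, Long> other) {
            count += other.getLocalValue();
        }
    }

    /** Data-carrying usage: the result grows with the processed records. */
    static final class ListAccumulatorSketch<T> implements SimpleAccumulator<T, List<T>> {
        private final List<T> values = new ArrayList<>();

        @Override
        public void add(T value) {
            values.add(value);
        }

        @Override
        public List<T> getLocalValue() {
            return values;
        }

        @Override
        public void merge(SimpleAccumulator<T, List<T>> other) {
            values.addAll(other.getLocalValue());
        }
    }

    /** Simulates one parallel subtask counting empty comma-separated fields. */
    static LongCounterSketch countEmptyFields(List<String> lines) {
        LongCounterSketch counter = new LongCounterSketch();
        for (String line : lines) {
            // limit -1 keeps trailing empty fields
            for (String field : line.split(",", -1)) {
                if (field.trim().isEmpty()) {
                    counter.add(1L);
                }
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        List<String> subtask0 = List.of("a,,c", "d,e,f");
        List<String> subtask1 = List.of(",,x");

        // Each subtask accumulates locally; the partial results are merged afterwards.
        LongCounterSketch total = countEmptyFields(subtask0);
        total.merge(countEmptyFields(subtask1));
        System.out.println("empty fields: " + total.getLocalValue());

        // The list variant ships the records themselves instead of a number.
        ListAccumulatorSketch<String> collected = new ListAccumulatorSketch<>();
        subtask0.forEach(collected::add);
        ListAccumulatorSketch<String> other = new ListAccumulatorSketch<>();
        subtask1.forEach(other::add);
        collected.merge(other);
        System.out.println("collected records: " + collected.getLocalValue().size());
    }
}
```

The counter case stays constant in size and maps naturally onto a metric,
while the list case grows with the processed data, which is the part that
I think belongs in a proper collect implementation instead.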

This initial investigation brings me to the conclusion that we can remove
the Accumulator feature in favor of Metrics and proper collect
implementations. That would also help clean up the
(Archived)ExecutionGraph: IMHO, we should have a clear separation between
Metrics (which are part of the ExecutionGraph) and processed data (which
shouldn't be part of the ExecutionGraph).

I'm curious what others think about this. Did I miss a scenario where
Accumulators are actually needed? Or is this already part of some other 2.0
effort [11] that I missed? I would suggest treating the removal as a
nice-to-have item for 2.0.

Best,
Matthias



[1]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Accumulator.java
[2] https://github.com/stratosphere/stratosphere/pull/340
[3]
https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Counters.html
[4]
https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L156
[5]
https://github.com/apache/flink/blob/63ee60859cac64f2bc6cfe2c5015ceb1199cea9c/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java#L165

[6]
https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L304
[7]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L145
[8]
https://github.com/apache/flink/blob/aa98c18d2ba975479fcfa4930b0139fa575d303e/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/EmptyFieldsCountAccumulator.java#L156
[9]
https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-java/src/main/java/org/apache/flink/api/java/Utils.java#L256
[10]
https://github.com/apache/flink/blob/91d81c427aa6312841ca868d54e8ce6ea721cd60/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/sinks/CollectTableSink.scala#L70

[11] https://cwiki.apache.org/confluence/display/FLINK/2.0+Release

-- 

*Matthias Pohl*
Opensource Software Engineer, *Aiven*