[ https://issues.apache.org/jira/browse/FLINK-3880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274298#comment-15274298 ]
Maximilian Michels commented on FLINK-3880:
-------------------------------------------

You're right, the synchronized map is a bottleneck. Actually, it doesn't even need to be synchronized: in a regular Flink job it can only be accessed by one task at a time. It could only be modified concurrently if the user spawned additional threads, and in that case the user would have to take care of the synchronization themselves (and would otherwise get a ConcurrentModificationException). So we can simply make it a normal map.

> Use ConcurrentHashMap for Accumulators
> ---------------------------------------
>
>                 Key: FLINK-3880
>                 URL: https://issues.apache.org/jira/browse/FLINK-3880
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.1.0
>            Reporter: Ken Krugler
>            Priority: Minor
>
> I was looking at improving DataSet performance - this is for a job created using the Cascading-Flink planner for Cascading 3.1.
> While doing a quick "poor man's profiler" session with one of the TaskManager processes, I noticed that many (most?) of the threads that were actually running were in this state:
> {code:java}
> "DataSource (/working1/terms) (8/20)" daemon prio=10 tid=0x00007f55673e0800 nid=0x666a runnable [0x00007f556abcf000]
>    java.lang.Thread.State: RUNNABLE
>     at java.util.Collections$SynchronizedMap.get(Collections.java:2037)
>     - locked <0x00000006e73fe718> (a java.util.Collections$SynchronizedMap)
>     at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.getAccumulator(AbstractRuntimeUDFContext.java:162)
>     at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.getLongCounter(AbstractRuntimeUDFContext.java:113)
>     at com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess.getOrInitCounter(FlinkFlowProcess.java:245)
>     at com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess.increment(FlinkFlowProcess.java:128)
>     at com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess.increment(FlinkFlowProcess.java:122)
>     at cascading.tap.hadoop.util.MeasuredRecordReader.next(MeasuredRecordReader.java:65)
>     at cascading.scheme.hadoop.SequenceFile.source(SequenceFile.java:97)
>     at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:166)
>     at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:139)
>     at com.dataartisans.flink.cascading.runtime.source.TapSourceStage.readNextRecord(TapSourceStage.java:70)
>     at com.dataartisans.flink.cascading.runtime.source.TapInputFormat.reachedEnd(TapInputFormat.java:175)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> It looks like Cascading is asking Flink to increment a counter with each Tuple read, and that in turn is often blocked on getting access to the Accumulator object in a map. It looks like this is a SynchronizedMap, but using a ConcurrentHashMap (for example) would reduce this contention.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
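For illustration only, here is a minimal, self-contained sketch of the trade-off being discussed. The class and member names (AccumulatorMapSketch, the LongCounter stand-in, accumulators) are hypothetical and do not reproduce Flink's actual AbstractRuntimeUDFContext; the sketch only shows the shape of the change: the per-record counter lookup stops going through Collections.synchronizedMap and uses a plain HashMap, as suggested in the comment (a ConcurrentHashMap, as proposed in the issue title, would likewise remove the single monitor, at somewhat higher cost than an unsynchronized map).

{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only -- not the actual Flink code. It mimics the shape of
// the accumulator lookup seen in the stack trace to show the proposed change:
// the map backing getLongCounter()/getAccumulator() drops the
// Collections.synchronizedMap(...) wrapper, since in a regular Flink job only
// the owning task thread touches it.
public class AccumulatorMapSketch {

    /** Stand-in for Flink's LongCounter accumulator. */
    static final class LongCounter {
        private long value;
        void add(long delta) { value += delta; }
        long get() { return value; }
    }

    // Before (the behaviour in the report): every get() takes the map's monitor.
    // private final Map<String, LongCounter> accumulators =
    //         Collections.synchronizedMap(new HashMap<>());

    // After (what the comment suggests): a plain map, no lock on the hot path.
    // Users who share the runtime context across their own threads must
    // synchronize themselves, or risk a ConcurrentModificationException.
    private final Map<String, LongCounter> accumulators = new HashMap<>();

    /** Mirrors the getLongCounter(name)-style lookup from the stack trace. */
    LongCounter getLongCounter(String name) {
        return accumulators.computeIfAbsent(name, k -> new LongCounter());
    }

    public static void main(String[] args) {
        AccumulatorMapSketch ctx = new AccumulatorMapSketch();
        // Per-record counter increment, as Cascading's MeasuredRecordReader does.
        for (int i = 0; i < 1_000_000; i++) {
            ctx.getLongCounter("tuples-read").add(1);
        }
        System.out.println(ctx.getLongCounter("tuples-read").get());
    }
}
{code}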