[jira] [Created] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete
Yonatan Most created FLINK-7347:
---

Summary: "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete
Key: FLINK-7347
URL: https://issues.apache.org/jira/browse/FLINK-7347
Project: Flink
Issue Type: Improvement
Components: DataStream API
Affects Versions: 1.3.1
Reporter: Yonatan Most

Observe this line in {{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}:
{code}
idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
{code}
The implementation of {{removeAll}} is such that if the set is not larger than the collection to remove, then the set is iterated and every item is checked for containment in the collection. The type of {{checkpoint.f1}} here is {{ArrayList}}, so each {{contains}} call is a linear scan of the list, and it is performed for every item in {{idsProcessedButNotAcknowledged}}. The total cost is therefore quadratic in the number of IDs.

In our pipeline we had about 10 million events processed, and the checkpoint was stuck on the {{removeAll}} call for hours.

A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} instead of an {{ArrayList}}. The fact that it's a list is not really used anywhere.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
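
To illustrate the behaviour described above outside of Flink, here is a minimal sketch that uses only plain {{java.util}} collections. The class name {{RemoveAllSketch}} and the helper {{removeAllViaHashSet}} are hypothetical and are not part of the Flink code base. The sketch shows a call-site workaround (copying the list into a {{HashSet}} before {{removeAll}}), which is a slightly different technique from the fix proposed in the issue (changing the field type itself).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical demo class; it only mimics the shape of the call in the issue.
class RemoveAllSketch {

    // Removes all ids from pending. If ids is not already a Set, it is
    // copied into a HashSet first so that contains() is a hash lookup
    // instead of a linear scan.
    static void removeAllViaHashSet(Set<?> pending, Collection<?> ids) {
        if (ids instanceof Set) {
            pending.removeAll(ids);
        } else {
            pending.removeAll(new HashSet<Object>(ids));
        }
    }

    public static void main(String[] args) {
        int n = 20_000; // kept small so the slow path still finishes quickly

        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ids.add(i);
        }

        // Slow path: the set is not larger than the list, so removeAll
        // iterates the set and calls ArrayList.contains for every element.
        Set<Integer> slow = new HashSet<>(ids);
        long t0 = System.nanoTime();
        slow.removeAll(ids);
        long t1 = System.nanoTime();

        // Fast path: same removal, but against a HashSet copy of the ids.
        Set<Integer> fast = new HashSet<>(ids);
        removeAllViaHashSet(fast, ids);
        long t2 = System.nanoTime();

        System.out.println("ArrayList argument: " + (t1 - t0) / 1_000_000 + " ms");
        System.out.println("HashSet argument:   " + (t2 - t1) / 1_000_000 + " ms");
        System.out.println("Same result: " + slow.equals(fast));
    }
}
```

The copy costs one extra pass over the IDs, which is small next to the quadratic scan it avoids. Changing the field type, as suggested in the issue, would avoid even that copy.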
[jira] [Created] (FLINK-6717) NullPointerException from MessageAcknowledgingSourceBase
Yonatan Most created FLINK-6717:
---

Summary: NullPointerException from MessageAcknowledgingSourceBase
Key: FLINK-6717
URL: https://issues.apache.org/jira/browse/FLINK-6717
Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 1.2.1
Environment: Ubuntu 12.04.4
java version "1.8.0_111"
Reporter: Yonatan Most
Priority: Trivial

It seems that if {{close}} is called before {{initializeState}}, then {{idsForCurrentCheckpoint}} is not initialized.

{code}
java.lang.NullPointerException: null
	at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:170) ~[flink-dist_2.10-1.2.0.jar:1.2.0]
	at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:94) ~[flink-dist_2.10-1.2.0.jar:1.2.0]
	at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:179) ~[blob_e8e6ccdf6cefe7e6370db4b1b2753baf9e977a24:na]
	at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.10-1.2.0.jar:1.2.0]
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127) ~[flink-dist_2.10-1.2.0.jar:1.2.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442) [flink-dist_2.10-1.2.0.jar:1.2.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343) [flink-dist_2.10-1.2.0.jar:1.2.0]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) [flink-dist_2.10-1.2.0.jar:1.2.0]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
{code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
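
One possible way to guard against the ordering described above is a null check in {{close}}. The following is only a sketch: {{LifecycleSketch}} is a hypothetical stand-in class, not the Flink source, and it assumes that {{close}} does nothing more than clear the collection. Only the names {{idsForCurrentCheckpoint}}, {{initializeState}} and {{close}} are taken from the report.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a source whose state is created lazily.
class LifecycleSketch {

    // Stays null until initializeState() has run.
    private List<String> idsForCurrentCheckpoint;

    void initializeState() {
        idsForCurrentCheckpoint = new ArrayList<>();
    }

    void addId(String id) {
        idsForCurrentCheckpoint.add(id);
    }

    // Number of buffered ids, or 0 if state was never initialized.
    int bufferedIds() {
        return idsForCurrentCheckpoint == null ? 0 : idsForCurrentCheckpoint.size();
    }

    // Safe to call even if initializeState() never ran, for example when
    // the task is disposed early during startup.
    void close() {
        if (idsForCurrentCheckpoint != null) {
            idsForCurrentCheckpoint.clear();
        }
    }

    public static void main(String[] args) {
        LifecycleSketch neverInitialized = new LifecycleSketch();
        neverInitialized.close(); // no NullPointerException with the guard

        LifecycleSketch initialized = new LifecycleSketch();
        initialized.initializeState();
        initialized.addId("msg-1");
        initialized.close();
        System.out.println("Buffered ids after close: " + initialized.bufferedIds());
    }
}
```

An alternative with the same effect would be to initialize the field eagerly so that it is never null; which option fits better depends on how the state is restored.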