[jira] [Created] (FLINK-7347) "removeAll" is extremely inefficient in MessageAcknowledgingSourceBase.notifyCheckpointComplete

2017-08-02 Thread Yonatan Most (JIRA)
Yonatan Most created FLINK-7347:
---

 Summary: "removeAll" is extremely inefficient in 
MessageAcknowledgingSourceBase.notifyCheckpointComplete
 Key: FLINK-7347
 URL: https://issues.apache.org/jira/browse/FLINK-7347
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.3.1
Reporter: Yonatan Most


Observe this line in 
{{MessageAcknowledgingSourceBase.notifyCheckpointComplete}}:
{code}
idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
{code}

The implementation of {{removeAll}} is such that if the set is not larger than 
the collection to remove, then the set is iterated and every item is checked 
for containment in the collection. The type of {{checkpoint.f1}} here is 
{{ArrayList}}, so each {{contains}} call is a linear scan, and it is performed 
for every item in {{idsProcessedButNotAcknowledged}}. The total cost is 
therefore quadratic in the number of ids.
In our pipeline we had about 10 million events processed, and the checkpoint 
was stuck on the {{removeAll}} call for hours.
A simple solution is to make {{idsForCurrentCheckpoint}} a {{HashSet}} instead 
of an {{ArrayList}}. The fact that it's a list is not really used anywhere.
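
A minimal sketch of the effect, not taken from the Flink code base: the class 
{{RemoveAllSketch}}, the method {{acknowledge}} and the id count are 
hypothetical, and the pending set is assumed to be a {{HashSet}}. It runs the 
same {{removeAll}} call once with an {{ArrayList}} argument and once with a 
{{HashSet}} argument and prints the elapsed time of each.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-alone demo; names are illustrative, not Flink's.
class RemoveAllSketch {

    /** Removes the acknowledged ids from the pending set and returns how many ids remain. */
    static int acknowledge(Set<Integer> pending, Collection<Integer> acked) {
        pending.removeAll(acked);
        return pending.size();
    }

    public static void main(String[] args) {
        int n = 20_000; // kept small so the slow path still finishes quickly

        List<Integer> ackedList = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ackedList.add(i);
        }

        // Set is not larger than the list, so removeAll iterates the set
        // and calls ArrayList.contains (a linear scan) for each element.
        Set<Integer> pending = new HashSet<>(ackedList);
        long start = System.nanoTime();
        int leftAfterList = acknowledge(pending, ackedList);
        long listNanos = System.nanoTime() - start;

        // Same removal, but contains() is now a hash lookup.
        pending = new HashSet<>(ackedList);
        Set<Integer> ackedSet = new HashSet<>(ackedList);
        start = System.nanoTime();
        int leftAfterSet = acknowledge(pending, ackedSet);
        long setNanos = System.nanoTime() - start;

        System.out.println("ArrayList argument: " + listNanos / 1_000_000 + " ms, left " + leftAfterList);
        System.out.println("HashSet argument:   " + setNanos / 1_000_000 + " ms, left " + leftAfterSet);
    }
}
```

Both calls leave the pending set in the same state; only the cost of the 
containment checks differs, which is why switching the collection type 
should be enough.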



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-6717) NullPointerException from MessageAcknowledgingSourceBase

2017-05-25 Thread Yonatan Most (JIRA)
Yonatan Most created FLINK-6717:
---

 Summary: NullPointerException from MessageAcknowledgingSourceBase
 Key: FLINK-6717
 URL: https://issues.apache.org/jira/browse/FLINK-6717
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.2.1
 Environment: Ubuntu 12.04.4 
java version "1.8.0_111"
Reporter: Yonatan Most
Priority: Trivial


It seems that if {{close}} is called before {{initializeState}}, then 
{{idsForCurrentCheckpoint}} is not yet initialized, and {{close}} throws a 
{{NullPointerException}}:
{code}
java.lang.NullPointerException: null
    at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.close(MessageAcknowledgingSourceBase.java:170) ~[flink-dist_2.10-1.2.0.jar:1.2.0]
    at org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase.close(MultipleIdsMessageAcknowledgingSourceBase.java:94) ~[flink-dist_2.10-1.2.0.jar:1.2.0]
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.close(RMQSource.java:179) ~[blob_e8e6ccdf6cefe7e6370db4b1b2753baf9e977a24:na]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-dist_2.10-1.2.0.jar:1.2.0]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:127) ~[flink-dist_2.10-1.2.0.jar:1.2.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442) [flink-dist_2.10-1.2.0.jar:1.2.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343) [flink-dist_2.10-1.2.0.jar:1.2.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) [flink-dist_2.10-1.2.0.jar:1.2.0]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]
{code}
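
One possible way to make {{close}} tolerate this ordering is a null guard. 
The sketch below is hypothetical and not the actual Flink code: the class 
{{GuardedCloseSketch}} and its methods are made up for illustration, and it 
assumes {{close}} fails only because it touches a collection that 
{{initializeState}} has not created yet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a source whose state is created lazily.
class GuardedCloseSketch {

    // Stays null until initializeState() runs, mirroring the reported situation.
    private List<String> idsForCurrentCheckpoint;

    void initializeState() {
        idsForCurrentCheckpoint = new ArrayList<>();
    }

    void add(String id) {
        idsForCurrentCheckpoint.add(id);
    }

    /** Clears the state if it exists; returns false when there was nothing to clear. */
    boolean close() {
        if (idsForCurrentCheckpoint == null) {
            // close() was called before initializeState(): nothing to release.
            return false;
        }
        idsForCurrentCheckpoint.clear();
        return true;
    }

    public static void main(String[] args) {
        GuardedCloseSketch source = new GuardedCloseSketch();
        System.out.println("close before init: " + source.close());

        source.initializeState();
        source.add("id-1");
        System.out.println("close after init:  " + source.close());
    }
}
```

An alternative under the same assumption would be to create the collection 
eagerly at field declaration, so that {{close}} never sees a null reference.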


