Robert Metzger created FLINK-2008:
-------------------------------------

             Summary: PersistentKafkaSource is sometimes emitting tuples multiple times
                 Key: FLINK-2008
                 URL: https://issues.apache.org/jira/browse/FLINK-2008
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector, Streaming
    Affects Versions: 0.9
            Reporter: Robert Metzger
            Assignee: Robert Metzger


The PersistentKafkaSource is expected to emit records exactly once.

Two test cases in KafkaITCase fail sporadically because records are emitted
multiple times.
Affected tests:
{{testPersistentSourceWithOffsetUpdates()}} fails after the offsets have been
changed manually in ZooKeeper:
{code}
java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 
array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2]
{code}
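The message above comes from a per-element count check: every element is expected to have been received exactly 3 times, and element 0 was received 4 times. The following is a minimal, hypothetical sketch of such a check, not the actual KafkaITCase code (the class name {{CountValidator}} and method {{firstMismatch}} are assumptions). It reports the first element whose count deviates, using the same message format.

```java
import java.util.Arrays;

// Hypothetical sketch, NOT the real KafkaITCase validation code:
// counts[i] holds how often element i was received by the sink.
public class CountValidator {

    /** Returns null if every count equals `expected`, else a description of the first mismatch. */
    public static String firstMismatch(int[] counts, int expected) {
        for (int i = 0; i < counts.length; i++) {
            if (counts[i] != expected) {
                return "Expected v to be " + expected + ", but was " + counts[i]
                        + " on element " + i + " array=" + Arrays.toString(counts);
            }
        }
        return null; // exactly-once (per expected count) holds for all elements
    }

    public static void main(String[] args) {
        int[] counts = new int[50];
        Arrays.fill(counts, 3);
        counts[0] = 4;  // element 0 received one extra time
        counts[49] = 2; // element 49 received one time too few
        System.out.println(firstMismatch(counts, 3));
    }
}
```

Only the first mismatch is reported, which is why a message like the one above names element 0 even though the last element also deviates.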

{{brokerFailureTest()}} also fails:
{code}
05/13/2015 08:13:16     Custom source -> Stream Sink(1/1) switched to FAILED 
java.lang.AssertionError: Received tuple with value 21 twice
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.assertTrue(Assert.java:41)
        at org.junit.Assert.assertFalse(Assert.java:64)
        at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
        at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
        at org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
        at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
        at org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
        at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
        at org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
        at org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
        at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.lang.Thread.run(Thread.java:745)
{code}
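The assertion in {{brokerFailureTest()}} is raised by the test sink ({{KafkaITCase$15.invoke}}) when it sees a value for the second time. A minimal, hypothetical sketch of such a duplicate check follows; it is not the actual sink code, and the names {{DuplicateDetector}}, {{receive}} and {{distinctCount}} are assumptions. It assumes values are non-negative integers, so a {{java.util.BitSet}} can record which ones were already seen.

```java
import java.util.BitSet;

// Hypothetical sketch, NOT the real KafkaITCase$15 sink: remembers every
// value it has received and fails on the first repeat.
public class DuplicateDetector {

    private final BitSet seen = new BitSet();

    /** Records a non-negative value; throws AssertionError if it was already recorded. */
    public void receive(int value) {
        if (seen.get(value)) {
            throw new AssertionError("Received tuple with value " + value + " twice");
        }
        seen.set(value);
    }

    /** Number of distinct values received so far. */
    public int distinctCount() {
        return seen.cardinality();
    }

    public static void main(String[] args) {
        DuplicateDetector detector = new DuplicateDetector();
        for (int v : new int[] {19, 20, 21}) {
            detector.receive(v);
        }
        try {
            detector.receive(21); // simulate a record being emitted a second time
        } catch (AssertionError e) {
            System.out.println(e.getMessage()); // prints "Received tuple with value 21 twice"
        }
    }
}
```

A check like this only detects duplicates; it says nothing about which offset the source restarted from, so it flags the symptom rather than the cause.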



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
