[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15857810#comment-15857810 ]
ASF GitHub Bot commented on FLINK-5701:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100035474

    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -293,6 +293,61 @@ public void run() {
                 testHarness.close();
             }
     
    +        /**
    +         * This test is meant to ensure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
    +         * the snapshot method does indeed finish without waiting for pending records;
    +         * we set a timeout because the test will not finish if the logic is broken
    +         */
    +        @Test(timeout=5000)
    +        public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
    +            final OneShotLatch inputLatch = new OneShotLatch();
    +
    +            final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
    +                FakeStandardProducerConfig.get(), null, inputLatch, 100, new AtomicBoolean(false));
    +            producer.setFlushOnCheckpoint(false);
    +
    +            final OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +            testHarness.open();
    +
    +            List<Callback> pending = producer.getProducerInstance().getPending();
    +
    +            for (int i = 0; i < 100; i++) {
    +                testHarness.processElement(new StreamRecord<>("msg-" + i));
    +            }
    --- End diff --

    1 is enough, will change this.
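The premise of the quoted test can be illustrated without the Flink test harness. The Java sketch below is a hypothetical, single-threaded model: the class `SketchProducer` and its methods `invoke`, `completeAll`, `snapshotState` and `pendingRecords` are invented for illustration and are not the API of Flink's `FlinkKafkaProducerBase` or of the test's `DummyFlinkKafkaProducer`. Each `invoke()` increments a pending counter and registers a callback that decrements it, and `snapshotState()` only waits for those callbacks when flushing is enabled. As the reply notes, a single pending record is enough to show that a snapshot with flushing disabled returns without draining it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the producer under test; NOT Flink's FlinkKafkaProducerBase.
// Single-threaded: callbacks only run when completeAll() is called.
class SketchProducer {
    private final boolean flushOnCheckpoint;
    private final List<Runnable> pendingCallbacks = new ArrayList<>();
    private int pendingRecords = 0;

    SketchProducer(boolean flushOnCheckpoint) {
        this.flushOnCheckpoint = flushOnCheckpoint;
    }

    // invoke(): the send is asynchronous, so the record stays pending until its callback runs.
    void invoke(String value) {
        pendingRecords++;
        pendingCallbacks.add(() -> pendingRecords--);
    }

    // Simulates the broker acknowledging every in-flight record.
    void completeAll() {
        for (Runnable callback : pendingCallbacks) {
            callback.run();
        }
        pendingCallbacks.clear();
    }

    // snapshotState(): waits for outstanding acknowledgements only when flushing is enabled.
    void snapshotState() {
        if (flushOnCheckpoint) {
            completeAll(); // stands in for flush() blocking until pendingRecords == 0
        }
    }

    int pendingRecords() {
        return pendingRecords;
    }
}
```

With `new SketchProducer(false)`, one `invoke()` followed by `snapshotState()` leaves `pendingRecords()` at 1; with `new SketchProducer(true)` the snapshot drains it to 0. In the real test the callbacks are never completed, so a snapshot that wrongly waited would hang forever, which is why the quoted test sets `@Test(timeout=5000)`.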
> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Reported in ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each
> invoke() and decremented on each callback; it is used to check whether the
> producer needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should only consider the checkpoint successful iff,
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}}
> (currently, we're only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the
> {{snapshotState}} method both before flushing and after flushing, once
> {{pendingRecords}} reaches 0.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
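A minimal sketch of the proposed quick fix, assuming a simplified single-threaded producer. The class `CheckpointingProducerSketch`, its simulated `flush()`, and the convention of passing each record's eventual send outcome into `invoke()` are hypothetical and are not Flink's actual `FlinkKafkaProducerBase` code; only the names `pendingRecords`, `asyncException` and `snapshotState` come from the issue text. The callback records the first failure in `asyncException`; `snapshotState()` rethrows it before flushing, flushes, verifies `pendingRecords == 0`, and rethrows again so that failures reported during the flush also fail the checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of the checkpoint logic described in FLINK-5701.
// Field and method names echo the issue text; this is NOT Flink's FlinkKafkaProducerBase.
class CheckpointingProducerSketch {
    // Simulated in-flight sends: each entry is the outcome the "broker" will report (null = success).
    private final List<Exception> inFlight = new ArrayList<>();
    private long pendingRecords = 0;
    // First failure reported by a send callback (a real producer needs thread-safe access here).
    private Exception asyncException;

    // invoke(): rethrow earlier failures, then hand the record off asynchronously.
    // The value itself is not stored; only its simulated outcome matters for this sketch.
    void invoke(String value, Exception simulatedOutcome) throws Exception {
        checkErroneous();
        pendingRecords++;
        inFlight.add(simulatedOutcome);
    }

    // Send callback: remember the first failure, then mark the record as acknowledged.
    private void onCompletion(Exception exception) {
        if (exception != null && asyncException == null) {
            asyncException = exception;
        }
        pendingRecords--;
    }

    // Simulated flush(): completes every in-flight send, firing its callback.
    private void flush() {
        for (Exception outcome : inFlight) {
            onCompletion(outcome);
        }
        inFlight.clear();
    }

    // snapshotState(): succeed only if pendingRecords == 0 AND asyncException == null.
    void snapshotState() throws Exception {
        checkErroneous(); // before flushing: failures reported since the last check
        flush();
        if (pendingRecords != 0) {
            throw new IllegalStateException("Pending records after flush: " + pendingRecords);
        }
        checkErroneous(); // after flushing: failures reported by callbacks fired during flush()
    }

    // Rethrow (and clear) a recorded async failure.
    private void checkErroneous() throws Exception {
        Exception e = asyncException;
        if (e != null) {
            asyncException = null; // avoid throwing the same failure twice
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }
}
```

A record whose send fails still decrements the counter to 0 during the flush, so a check on `pendingRecords` alone (the current behavior described in the issue) would report a successful checkpoint even though the record was lost; the second `checkErroneous()` call is what catches that case. In a real producer the callbacks arrive on the Kafka client's I/O thread, so both fields would need proper synchronization there.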