[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15862844#comment-15862844 ]

ASF GitHub Bot commented on FLINK-5701:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3278#discussion_r100695754
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
    @@ -88,195 +87,296 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
        @Test
        public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
                KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
    +
                RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
                when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
                when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
    +           
    +           // out-of-order list of 4 partitions
    +           List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
    +           mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
    +           mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
    +           mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
    +           mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
    +           
    +           KafkaProducer mockProducer = mock(KafkaProducer.class);
    +           when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
    +           when(mockProducer.metrics()).thenReturn(null);
     
                DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
    -                   FakeStandardProducerConfig.get(), mockPartitioner);
    +                   FakeStandardProducerConfig.get(), mockPartitioner, mockProducer);
                producer.setRuntimeContext(mockRuntimeContext);
     
                producer.open(new Configuration());
     
    -           // the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
    -           // which should be sorted before provided to the custom partitioner's open() method
    +           // the out-of-order partitions list should be sorted before being provided to the custom partitioner's open() method
                int[] correctPartitionList = {0, 1, 2, 3};
                verify(mockPartitioner).open(0, 1, correctPartitionList);
        }
     
        /**
    -    * Test ensuring that the producer is not dropping buffered records.;
    -    * we set a timeout because the test will not finish if the logic is broken
    +    * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
         */
    -   @Test(timeout=5000)
    -   public void testAtLeastOnceProducer() throws Throwable {
    -           runAtLeastOnceTest(true);
    +   @Test
    +   public void testAsyncErrorRethrownOnInvoke() throws Throwable {
    +           KafkaProducer mockProducer = mock(KafkaProducer.class);
    +           final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
    +                   FakeStandardProducerConfig.get(), null, mockProducer);
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           // let the message request return an async exception
    +           producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.processElement(new StreamRecord<>("msg-2"));
    +           } catch (Exception e) {
    +                   // the next invoke should rethrow the async exception
    +                   Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +                   return;
    +           }
    +
    +           Assert.fail();
        }
     
        /**
    -    * Ensures that the at least once producing test fails if the flushing is disabled
    +    * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
         */
    -   @Test(expected = AssertionError.class, timeout=5000)
    -   public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
    -           runAtLeastOnceTest(false);
    -   }
    -
    -   private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
    -           final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
    +   @Test
    +   public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
    +           KafkaProducer mockProducer = mock(KafkaProducer.class);
                final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
    -                   FakeStandardProducerConfig.get(), null, 
snapshottingFinished);
    -           producer.setFlushOnCheckpoint(flushOnCheckpoint);
    +                   FakeStandardProducerConfig.get(), null, mockProducer);
     
                OneInputStreamOperatorTestHarness<String, Object> testHarness =
    -                           new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
    +                   new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
     
                testHarness.open();
     
    -           for (int i = 0; i < 100; i++) {
    -                   testHarness.processElement(new StreamRecord<>("msg-" + i));
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +
    +           // let the message request return an async exception
    +           producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
    +
    +           try {
    +                   testHarness.snapshot(123L, 123L);
    +           } catch (Exception e) {
    +                   // the snapshot should rethrow the async exception
    +                   Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
    +                   return;
                }
     
    -           // start a thread confirming all pending records
    -           final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    -           final Thread threadA = Thread.currentThread();
    +           Assert.fail();
    +   }
     
    -           Runnable confirmer = new Runnable() {
    +   /**
    +    * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
    +    * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
    +    *
    +    * Note that this test does not verify that the snapshot method blocks correctly when there are pending records.
    +    * The test for that is covered in testAtLeastOnceProducer.
    +    */
    +   @Test(timeout=5000)
    +   public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
    +           KafkaProducer mockProducer = mock(KafkaProducer.class);
    +           final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
    +                   FakeStandardProducerConfig.get(), null, mockProducer);
    +           producer.setFlushOnCheckpoint(true);
    +
    +           final OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +           testHarness.processElement(new StreamRecord<>("msg-2"));
    +           testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +           verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +
    +           // only let the first callback succeed for now
    +           producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +
    +           final Tuple1<Throwable> asyncError = new Tuple1<>(null);
    +           Thread snapshotThread = new Thread(new Runnable() {
                        @Override
                        public void run() {
                                try {
    -                                   MockProducer mp = producer.getProducerInstance();
    -                                   List<Callback> pending = mp.getPending();
    -
    -                                   // we need to find out if the snapshot() method blocks forever
    -                                   // this is not possible. If snapshot() is running, it will
    -                                   // start removing elements from the pending list.
    -                                   synchronized (threadA) {
    -                                           threadA.wait(500L);
    -                                   }
    -                                   // we now check that no records have been confirmed yet
    -                                   Assert.assertEquals(100, pending.size());
    -                                   Assert.assertFalse("Snapshot method returned before all records were confirmed",
    -                                           snapshottingFinished.get());
    -
    -                                   // now confirm all checkpoints
    -                                   for (Callback c: pending) {
    -                                           c.onCompletion(null, null);
    -                                   }
    -                                   pending.clear();
    -                           } catch(Throwable t) {
    -                                   runnableError.f0 = t;
    +                                   // this should block at first, since there are still two pending records that need to be flushed
    +                                   testHarness.snapshot(123L, 123L);
    +                           } catch (Exception e) {
    +                                   asyncError.f0 = e;
                                }
                        }
    -           };
    -           Thread threadB = new Thread(confirmer);
    -           threadB.start();
    +           });
    +           snapshotThread.start();
     
    -           // this should block:
    -           testHarness.snapshot(0, 0);
    +           // let the 2nd message fail with an async exception
    +           producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
    +           producer.getPendingCallbacks().get(2).onCompletion(null, null);
     
    -           synchronized (threadA) {
    -                   threadA.notifyAll(); // just in case, to let the test fail faster
    -           }
    -           Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
    -           Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
    -           while (deadline.hasTimeLeft() && threadB.isAlive()) {
    -                   threadB.join(500);
    +           snapshotThread.join();
    +
    +           // the snapshot should have failed with the async exception
    +           Assert.assertTrue(asyncError.f0 != null && 
asyncError.f0.getCause().getMessage().contains("artificial async failure for 
2nd message"));
    +   }
    +
    +   /**
    +    * Test ensuring that the producer is not dropping buffered records;
    +    * we set a timeout because the test will not finish if the logic is broken
    +    */
    +   @Test(timeout=10000)
    +   public void testAtLeastOnceProducer() throws Throwable {
    +
    +           KafkaProducer mockProducer = mock(KafkaProducer.class);
    +           final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
    +                   FakeStandardProducerConfig.get(), null, mockProducer);
    +           producer.setFlushOnCheckpoint(true);
    +
    +           final OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink(producer));
    +
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("msg-1"));
    +           testHarness.processElement(new StreamRecord<>("msg-2"));
    +           testHarness.processElement(new StreamRecord<>("msg-3"));
    +
    +           verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
    +           Assert.assertEquals(3, producer.getPendingSize());
    +
    +           // start a thread to perform checkpointing
    +           final Tuple1<Throwable> runnableError = new Tuple1<>(null);
    +           final OneShotLatch snapshotReturnedLatch = new OneShotLatch();
    +
    +           Thread snapshotThread = new Thread(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           try {
    +                                   // this should block until all records are flushed
    +                                   testHarness.snapshot(123L, 123L);
    +                           } catch (Throwable e) {
    +                                   runnableError.f0 = e;
    +                           } finally {
    +                                   snapshotReturnedLatch.trigger();
    +                           }
    +                   }
    +           });
    +           snapshotThread.start();
    +
    +           // to be extra safe, check that the snapshot call is indeed still blocked
    +           try {
    +                   snapshotReturnedLatch.await(3, TimeUnit.SECONDS);
    +           } catch (TimeoutException expected) {
    +                   //
                }
    -           Assert.assertFalse("Thread A is expected to be finished at this 
point. If not, the test is prone to fail", threadB.isAlive());
    +           Assert.assertTrue("Snapshot returned before all records were 
flushed", !snapshotReturnedLatch.isTriggered());
    +
    +           producer.getPendingCallbacks().get(0).onCompletion(null, null);
    +           Assert.assertTrue("Snapshot returned before all records were 
flushed", !snapshotReturnedLatch.isTriggered());
    +           Assert.assertEquals(2, producer.getPendingSize());
    +
    +           producer.getPendingCallbacks().get(1).onCompletion(null, null);
    +           Assert.assertTrue("Snapshot returned before all records were 
flushed", !snapshotReturnedLatch.isTriggered());
    +           Assert.assertEquals(1, producer.getPendingSize());
    +
    +           producer.getPendingCallbacks().get(2).onCompletion(null, null);
    +           Assert.assertEquals(0, producer.getPendingSize());
    +
    +           snapshotReturnedLatch.await();
    +           snapshotThread.join();
    +
                if (runnableError.f0 != null) {
                        throw runnableError.f0;
                }
     
                testHarness.close();
        }
     
    +   /**
    +    * This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled,
    +    * the snapshot method does indeed finish without waiting for pending records;
    +    * we set a timeout because the test will not finish if the logic is broken
    +    */
    +   @Test(timeout=5000)
    +   public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
     
    -   // ------------------------------------------------------------------------
    +           KafkaProducer mockProducer = mock(KafkaProducer.class);
    +           final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
    +                   FakeStandardProducerConfig.get(), null, mockProducer);
    +           producer.setFlushOnCheckpoint(false);
     
    -   private static class DummyFlinkKafkaProducer<T> extends 
FlinkKafkaProducerBase<T> {
    -           private static final long serialVersionUID = 1L;
    +           final OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink(producer));
     
    -           private transient MockProducer prod;
    -           private AtomicBoolean snapshottingFinished;
    +           testHarness.open();
     
    -           @SuppressWarnings("unchecked")
    -           public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) {
    -                   super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
    -                   this.snapshottingFinished = snapshottingFinished;
    -           }
    +           testHarness.processElement(new StreamRecord<>("msg"));
     
    -           // constructor variant for tests unrelated to snapshotting
    -           @SuppressWarnings("unchecked")
    -           public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) {
    -                   super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner);
    -                   this.snapshottingFinished = new AtomicBoolean(true);
    -           }
    +           // make sure that all callbacks have not been completed
    -           verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class));
     
    -           @Override
    -           protected <K, V> KafkaProducer<K, V> 
getKafkaProducer(Properties props) {
    -                   this.prod = new MockProducer();
    -                   return this.prod;
    -           }
    +           // should return even if there are pending records
    +           testHarness.snapshot(123L, 123L);
     
    -           @Override
    -           public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
    -                   // call the actual snapshot state
    -                   super.snapshotState(ctx);
    -                   // notify test that snapshotting has been done
    -                   snapshottingFinished.set(true);
    -           }
    +           testHarness.close();
    +   }
     
    -           @Override
    -           protected void flush() {
    -                   this.prod.flush();
    -           }
    +   // ------------------------------------------------------------------------
     
    -           public MockProducer getProducerInstance() {
    -                   return this.prod;
    -           }
    -   }
    +   private static class DummyFlinkKafkaProducer<T> extends 
FlinkKafkaProducerBase<T> {
    +           private static final long serialVersionUID = 1L;
    +           
    +           private final static String DUMMY_TOPIC = "dummy-topic";
     
    -   private static class MockProducer<K, V> extends KafkaProducer<K, V> {
    -           List<Callback> pendingCallbacks = new ArrayList<>();
    +           private final KafkaProducer mockProducer;
    +           private final List<Callback> pendingCallbacks = new 
ArrayList<>();
    --- End diff --
    
    `Callback` is not serializable.
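    
    For background: `FlinkKafkaProducerBase` is a serializable function, while Kafka's `Callback` instances are not `Serializable`, so keeping them in a non-transient field would break serialization of the sink. A minimal sketch of one way to handle this in the test double (an illustration only, not the change that was merged) is to make the field transient and create the list lazily:
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.kafka.clients.producer.Callback;
    
    // the field and getter below would live inside DummyFlinkKafkaProducer<T>
    private transient List<Callback> pendingCallbacks;
    
    List<Callback> getPendingCallbacks() {
    	// re-created on first access after deserialization, so the
    	// non-serializable Callback instances are never serialized
    	if (pendingCallbacks == null) {
    		pendingCallbacks = new ArrayList<>();
    	}
    	return pendingCallbacks;
    }
    ```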


> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
>                 Key: FLINK-5701
>                 URL: https://issues.apache.org/jira/browse/FLINK-5701
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Critical
>
> Reported in ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each 
> invoke() and decremented on each callback, used to check if the producer 
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should consider the checkpoint succeeded only if, 
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}} 
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the 
> {{snapshotState}} method both before flushing and after flushing, once 
> {{pendingRecords}} becomes 0.
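
A minimal sketch of the quick fix described above (the names {{snapshotState}}, {{checkErroneous()}}, {{flushOnCheckpoint}}, {{pendingRecords}} and {{flush()}} follow {{FlinkKafkaProducerBase}}; the {{pendingRecordsLock}} field name is an assumption, and the snippet only illustrates where the two checks go, not the patch as merged):

```java
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
	// an async send may already have failed before this checkpoint;
	// rethrow it now instead of acknowledging the checkpoint
	checkErroneous();

	if (flushOnCheckpoint) {
		// blocks until the producer has confirmed all in-flight records
		flush();

		// assumption: pendingRecords is guarded by pendingRecordsLock
		synchronized (pendingRecordsLock) {
			if (pendingRecords != 0) {
				throw new IllegalStateException(
					"pending records left after flush: " + pendingRecords);
			}
		}

		// a record confirmed during flush() may have completed with an
		// exception; check and rethrow that one as well
		checkErroneous();
	}
}
```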



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
