Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3278#discussion_r100695724 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java --- @@ -88,195 +87,296 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception { @Test public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception { KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class); + RuntimeContext mockRuntimeContext = mock(RuntimeContext.class); when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0); when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1); + + // out-of-order list of 4 partitions + List<PartitionInfo> mockPartitionsList = new ArrayList<>(4); + mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null)); + mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null)); + mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null)); + mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null)); + + KafkaProducer mockProducer = mock(KafkaProducer.class); + when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList); + when(mockProducer.metrics()).thenReturn(null); DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer( - FakeStandardProducerConfig.get(), mockPartitioner); + FakeStandardProducerConfig.get(), mockPartitioner, mockProducer); producer.setRuntimeContext(mockRuntimeContext); producer.open(new Configuration()); - // the internal mock KafkaProducer will return an out-of-order list of 4 partitions, - // which should be sorted before provided to the custom partitioner's open() method + // the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method int[] 
correctPartitionList = {0, 1, 2, 3}; verify(mockPartitioner).open(0, 1, correctPartitionList); } /** - * Test ensuring that the producer is not dropping buffered records.; - * we set a timeout because the test will not finish if the logic is broken + * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown */ - @Test(timeout=5000) - public void testAtLeastOnceProducer() throws Throwable { - runAtLeastOnceTest(true); + @Test + public void testAsyncErrorRethrownOnInvoke() throws Throwable { + KafkaProducer mockProducer = mock(KafkaProducer.class); + final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( + FakeStandardProducerConfig.get(), null, mockProducer); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink(producer)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("msg-1")); + + // let the message request return an async exception + producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception")); + + try { + testHarness.processElement(new StreamRecord<>("msg-2")); + } catch (Exception e) { + // the next invoke should rethrow the async exception + Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception")); + return; + } + + Assert.fail(); } /** - * Ensures that the at least once producing test fails if the flushing is disabled + * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown */ - @Test(expected = AssertionError.class, timeout=5000) - public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable { - runAtLeastOnceTest(false); - } - - private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable { - final AtomicBoolean snapshottingFinished = new AtomicBoolean(false); + @Test + public void testAsyncErrorRethrownOnCheckpoint() throws 
Throwable { + KafkaProducer mockProducer = mock(KafkaProducer.class); final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( - FakeStandardProducerConfig.get(), null, snapshottingFinished); - producer.setFlushOnCheckpoint(flushOnCheckpoint); + FakeStandardProducerConfig.get(), null, mockProducer); OneInputStreamOperatorTestHarness<String, Object> testHarness = - new OneInputStreamOperatorTestHarness<>(new StreamSink(producer)); + new OneInputStreamOperatorTestHarness<>(new StreamSink(producer)); testHarness.open(); - for (int i = 0; i < 100; i++) { - testHarness.processElement(new StreamRecord<>("msg-" + i)); + testHarness.processElement(new StreamRecord<>("msg-1")); + + // let the message request return an async exception + producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception")); + + try { + testHarness.snapshot(123L, 123L); + } catch (Exception e) { + // the next invoke should rethrow the async exception + Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception")); + return; } - // start a thread confirming all pending records - final Tuple1<Throwable> runnableError = new Tuple1<>(null); - final Thread threadA = Thread.currentThread(); + Assert.fail(); + } - Runnable confirmer = new Runnable() { + /** + * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint, + * it should be rethrown; we set a timeout because the test will not finish if the logic is broken. + * + * Note that this test does not test the snapshot method is blocked correctly when there are pending recorrds. + * The test for that is covered in testAtLeastOnceProducer. 
+ */ + @Test(timeout=5000) + public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable { + KafkaProducer mockProducer = mock(KafkaProducer.class); + final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( + FakeStandardProducerConfig.get(), null, mockProducer); + producer.setFlushOnCheckpoint(true); + + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink(producer)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("msg-1")); + testHarness.processElement(new StreamRecord<>("msg-2")); + testHarness.processElement(new StreamRecord<>("msg-3")); + + verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class)); + + // only let the first callback succeed for now + producer.getPendingCallbacks().get(0).onCompletion(null, null); + + final Tuple1<Throwable> asyncError = new Tuple1<>(null); + Thread snapshotThread = new Thread(new Runnable() { @Override public void run() { try { - MockProducer mp = producer.getProducerInstance(); - List<Callback> pending = mp.getPending(); - - // we need to find out if the snapshot() method blocks forever - // this is not possible. If snapshot() is running, it will - // start removing elements from the pending list. 
- synchronized (threadA) { - threadA.wait(500L); - } - // we now check that no records have been confirmed yet - Assert.assertEquals(100, pending.size()); - Assert.assertFalse("Snapshot method returned before all records were confirmed", - snapshottingFinished.get()); - - // now confirm all checkpoints - for (Callback c: pending) { - c.onCompletion(null, null); - } - pending.clear(); - } catch(Throwable t) { - runnableError.f0 = t; + // this should block at first, since there are still two pending records that needs to be flushed + testHarness.snapshot(123L, 123L); + } catch (Exception e) { + asyncError.f0 = e; } } - }; - Thread threadB = new Thread(confirmer); - threadB.start(); + }); + snapshotThread.start(); - // this should block: - testHarness.snapshot(0, 0); + // let the 2nd message fail with an async exception + producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message")); + producer.getPendingCallbacks().get(2).onCompletion(null, null); - synchronized (threadA) { - threadA.notifyAll(); // just in case, to let the test fail faster - } - Assert.assertEquals(0, producer.getProducerInstance().getPending().size()); - Deadline deadline = FiniteDuration.apply(5, "s").fromNow(); - while (deadline.hasTimeLeft() && threadB.isAlive()) { - threadB.join(500); + snapshotThread.join(); + + // the snapshot should have failed with the async exception + Assert.assertTrue(asyncError.f0 != null && asyncError.f0.getCause().getMessage().contains("artificial async failure for 2nd message")); + } + + /** + * Test ensuring that the producer is not dropping buffered records; + * we set a timeout because the test will not finish if the logic is broken + */ + @Test(timeout=10000) + public void testAtLeastOnceProducer() throws Throwable { + + KafkaProducer mockProducer = mock(KafkaProducer.class); + final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( + FakeStandardProducerConfig.get(), null, 
mockProducer); + producer.setFlushOnCheckpoint(true); + + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink(producer)); + + testHarness.open(); + + testHarness.processElement(new StreamRecord<>("msg-1")); + testHarness.processElement(new StreamRecord<>("msg-2")); + testHarness.processElement(new StreamRecord<>("msg-3")); + + verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class)); + Assert.assertEquals(3, producer.getPendingSize()); + + // start a thread to perform checkpointing + final Tuple1<Throwable> runnableError = new Tuple1<>(null); + final OneShotLatch snapshotReturnedLatch = new OneShotLatch(); + + Thread snapshotThread = new Thread(new Runnable() { + @Override + public void run() { + try { + // this should block until all records are flushed + testHarness.snapshot(123L, 123L); + } catch (Throwable e) { + runnableError.f0 = e; + } finally { + snapshotReturnedLatch.trigger(); + } + } + }); + snapshotThread.start(); + + // being extra safe that the snapshot is correctly blocked + try { + snapshotReturnedLatch.await(3, TimeUnit.SECONDS); + } catch (TimeoutException expected) { + // } - Assert.assertFalse("Thread A is expected to be finished at this point. 
If not, the test is prone to fail", threadB.isAlive()); + Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered()); + + producer.getPendingCallbacks().get(0).onCompletion(null, null); + Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered()); + Assert.assertEquals(2, producer.getPendingSize()); + + producer.getPendingCallbacks().get(1).onCompletion(null, null); + Assert.assertTrue("Snapshot returned before all records were flushed", !snapshotReturnedLatch.isTriggered()); + Assert.assertEquals(1, producer.getPendingSize()); + + producer.getPendingCallbacks().get(2).onCompletion(null, null); + Assert.assertEquals(0, producer.getPendingSize()); + + snapshotReturnedLatch.await(); + snapshotThread.join(); + if (runnableError.f0 != null) { throw runnableError.f0; } testHarness.close(); } + /** + * This test is meant to assure that testAtLeastOnceProducer is valid by testing that if flushing is disabled, + * the snapshot method does indeed finishes without waiting for pending records; + * we set a timeout because the test will not finish if the logic is broken + */ + @Test(timeout=5000) + public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable { - // ------------------------------------------------------------------------ + KafkaProducer mockProducer = mock(KafkaProducer.class); + final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( + FakeStandardProducerConfig.get(), null, mockProducer); + producer.setFlushOnCheckpoint(false); - private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> { - private static final long serialVersionUID = 1L; + final OneInputStreamOperatorTestHarness<String, Object> testHarness = + new OneInputStreamOperatorTestHarness<>(new StreamSink(producer)); - private transient MockProducer prod; - private AtomicBoolean snapshottingFinished; + testHarness.open(); - 
@SuppressWarnings("unchecked") - public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner, AtomicBoolean snapshottingFinished) { - super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner); - this.snapshottingFinished = snapshottingFinished; - } + testHarness.processElement(new StreamRecord<>("msg")); - // constructor variant for test irrelated to snapshotting - @SuppressWarnings("unchecked") - public DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) { - super("dummy-topic", (KeyedSerializationSchema< T >) mock(KeyedSerializationSchema.class), producerConfig, partitioner); - this.snapshottingFinished = new AtomicBoolean(true); - } + // make sure that all callbacks have not been completed + verify(mockProducer, times(1)).send(any(ProducerRecord.class), any(Callback.class)); - @Override - protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) { - this.prod = new MockProducer(); - return this.prod; - } + // should return even if there are pending records + testHarness.snapshot(123L, 123L); - @Override - public void snapshotState(FunctionSnapshotContext ctx) throws Exception { - // call the actual snapshot state - super.snapshotState(ctx); - // notify test that snapshotting has been done - snapshottingFinished.set(true); - } + testHarness.close(); + } - @Override - protected void flush() { - this.prod.flush(); - } + // ------------------------------------------------------------------------ - public MockProducer getProducerInstance() { - return this.prod; - } - } + private static class DummyFlinkKafkaProducer<T> extends FlinkKafkaProducerBase<T> { + private static final long serialVersionUID = 1L; + + private final static String DUMMY_TOPIC = "dummy-topic"; - private static class MockProducer<K, V> extends KafkaProducer<K, V> { - List<Callback> pendingCallbacks = new ArrayList<>(); + private final KafkaProducer mockProducer; --- 
End diff -- Raw usage of `KafkaProducer`: the `mockProducer` field is declared without type parameters.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---