Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r125411227

--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---
@@ -172,6 +195,144 @@ public void cancel() {
 		}
 	}
 
+	/**
+	 * Tests the at-least-once semantic for simple writes into Kafka through the regular sink API.
+	 */
+	@Test
+	public void testOneToOneAtLeastOnceRegularSink() throws Exception {
+		testOneToOneAtLeastOnce(true);
+	}
+
+	/**
+	 * Tests the at-least-once semantic for simple writes into Kafka through a custom sink operator.
+	 */
+	@Test
+	public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
+		testOneToOneAtLeastOnce(false);
+	}
+
+	/**
+	 * This test configures the KafkaProducer so that it will not automatically flush the data,
+	 * and then fails the broker to check whether FlinkKafkaProducer flushes pending records
+	 * manually on snapshotState().
+	 */
+	protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
+		final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
+		final int partition = 0;
+		final int numElements = 1000;
+		final int failAfterElements = 333;
+
+		createTestTopic(topic, 1, 1);
+
+		TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
+		KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper<>(schema);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing(500);
+		env.setParallelism(1);
+		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+
+		Properties properties = new Properties();
+		properties.putAll(standardProps);
+		properties.putAll(secureProps);
+		// decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try to send pending (not flushed) data on close()
+		properties.setProperty("timeout.ms", "10000");
+		properties.setProperty("max.block.ms", "10000");
+		// increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
+		properties.setProperty("batch.size", "10240000");
+		properties.setProperty("linger.ms", "10000");
+
+		int leaderId = kafkaServer.getLeaderToShutDown(topic);
+		BrokerRestartingMapper.resetState();
+
+		// process exactly failAfterElements elements, then shut down the Kafka broker and fail the application
+		DataStream<Integer> inputStream = env
+			.fromCollection(getIntegersSequence(numElements))
+			.map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
+
+		StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+			@Override
+			public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+				return partition;
+			}
+		});
+
+		if (regularSink) {
+			inputStream.addSink(kafkaSink.getUserFunction());
+		}
+		else {
+			kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
+				@Override
+				public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+					return partition;
+				}
+			});
+		}
+
+		FailingIdentityMapper.failedBefore = false;
+		try {
+			env.execute("One-to-one at least once test");
+			fail("Job should fail!");
+		}
+		catch (Exception ex) {
--- End diff --

I think we need to catch a more specific exception here. Catching the generic `Exception` means actual failures thrown by Flink would also satisfy this assertion and be masked.
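
For example (a minimal sketch; `BrokerKillException` is a hypothetical marker type here, and any dedicated exception thrown by `BrokerRestartingMapper` on the induced failure would do):

    // org.apache.flink.runtime.client.JobExecutionException wraps job failures from env.execute()
    try {
        env.execute("One-to-one at least once test");
        fail("Job should fail!");
    }
    catch (JobExecutionException ex) {
        // Walk the cause chain and accept only the expected, artificially induced failure;
        // rethrow anything else so genuine Flink errors surface instead of being swallowed.
        Throwable cause = ex;
        boolean expectedFailure = false;
        while (cause != null) {
            if (cause instanceof BrokerKillException) { // hypothetical marker exception
                expectedFailure = true;
                break;
            }
            cause = cause.getCause();
        }
        if (!expectedFailure) {
            throw ex;
        }
    }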