[ 
https://issues.apache.org/jira/browse/FLINK-6996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16073315#comment-16073315
 ] 

ASF GitHub Bot commented on FLINK-6996:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4206#discussion_r125416757
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---
    @@ -172,6 +195,144 @@ public void cancel() {
                }
        }
     
    +   /**
    +    * Tests the at-least-once semantic for the simple writes into Kafka.
    +    */
    +   @Test
    +   public void testOneToOneAtLeastOnceRegularSink() throws Exception {
    +           testOneToOneAtLeastOnce(true);
    +   }
    +
    +   /**
    +    * Tests the at-least-once semantic for the simple writes into Kafka.
    +    */
    +   @Test
    +   public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
    +           testOneToOneAtLeastOnce(false);
    +   }
    +
    +   /**
    +    * This test configures KafkaProducer so that it will not automatically flush the data and
    +    * fails the broker to check whether FlinkKafkaProducer flushed records manually on snapshotState.
    +    */
    +   protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
    +           final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
    +           final int partition = 0;
    +           final int numElements = 1000;
    +           final int failAfterElements = 333;
    +
    +           createTestTopic(topic, 1, 1);
    +
    +           TypeInformationSerializationSchema<Integer> schema = new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
    +           KeyedSerializationSchema<Integer> keyedSerializationSchema = new KeyedSerializationSchemaWrapper(schema);
    +
    +           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +           env.enableCheckpointing(500);
    +           env.setParallelism(1);
    +           env.setRestartStrategy(RestartStrategies.noRestart());
    +           env.getConfig().disableSysoutLogging();
    +
    +           Properties properties = new Properties();
    +           properties.putAll(standardProps);
    +           properties.putAll(secureProps);
    +           // decrease timeout and block time from 60s down to 10s - this is how long KafkaProducer will try to send pending (not flushed) data on close()
    +           properties.setProperty("timeout.ms", "10000");
    +           properties.setProperty("max.block.ms", "10000");
    +           // increase batch.size and linger.ms - this tells KafkaProducer to batch produced events instead of flushing them immediately
    +           properties.setProperty("batch.size", "10240000");
    +           properties.setProperty("linger.ms", "10000");
    +
    +           int leaderId = kafkaServer.getLeaderToShutDown(topic);
    +           BrokerRestartingMapper.resetState();
    +
    +           // process exactly failAfterElements number of elements and then shut down the Kafka broker and fail the application
    +           DataStream<Integer> inputStream = env
    +                   .fromCollection(getIntegersSequence(numElements))
    +                   .map(new BrokerRestartingMapper<Integer>(leaderId, failAfterElements));
    +
    +           StreamSink<Integer> kafkaSink = kafkaServer.getProducerSink(topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +                   @Override
    +                   public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +                           return partition;
    +                   }
    +           });
    +
    +           if (regularSink) {
    +                   inputStream.addSink(kafkaSink.getUserFunction());
    +           }
    +           else {
    +                   kafkaServer.produceIntoKafka(inputStream, topic, keyedSerializationSchema, properties, new FlinkKafkaPartitioner<Integer>() {
    +                           @Override
    +                           public int partition(Integer record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
    +                                   return partition;
    +                           }
    +                   });
    +           }
    +
    +           FailingIdentityMapper.failedBefore = false;
    +           try {
    +                   env.execute("One-to-one at least once test");
    +                   fail("Job should fail!");
    +           }
    +           catch (Exception ex) {
    --- End diff --
    
    FYI, the exception returned by `getCause` is of type `java.lang.Exception`, 
so there is no point in making an assertion on its type.
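
To illustrate the point about the cause type, here is a minimal, self-contained sketch in plain Java. It does not use Flink or the test base from the diff. The class `CauseAssertionDemo`, its methods, and the exception messages are hypothetical and exist only for this illustration. The sketch shows that a check of the form "the cause is a `java.lang.Exception`" also passes for an unrelated failure, so it does not tell the two apart.

```java
class CauseAssertionDemo {

    /** Hypothetical job failure whose cause is a plain java.lang.Exception. */
    static Exception jobFailure() {
        return new RuntimeException("Job execution failed", new Exception("Broker was shut down"));
    }

    /** Hypothetical unrelated failure with a more specific cause type. */
    static Exception unrelatedFailure() {
        return new RuntimeException("Job execution failed", new IllegalStateException("Unrelated problem"));
    }

    /** The kind of type check the review comment describes as pointless. */
    static boolean causeIsException(Exception ex) {
        return ex.getCause() instanceof Exception;
    }

    public static void main(String[] args) {
        // Both checks pass, so the assertion cannot distinguish the two failures.
        System.out.println(causeIsException(jobFailure()));
        System.out.println(causeIsException(unrelatedFailure()));
        // The expected cause carries no more specific type than Exception itself.
        System.out.println(jobFailure().getCause().getClass() == Exception.class);
    }
}
```

If a test needs to recognize a particular failure in such a situation, it has to rely on something other than the cause's class, for example state recorded by the test itself.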


> FlinkKafkaProducer010 doesn't guarantee at-least-once semantic
> --------------------------------------------------------------
>
>                 Key: FLINK-6996
>                 URL: https://issues.apache.org/jira/browse/FLINK-6996
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>
> FlinkKafkaProducer010 doesn't implement the CheckpointedFunction interface. This 
> means that when it's used as a "regular sink function" (option a from [the java 
> doc|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.html])
>  it will not flush the data on "snapshotState" as it is supposed to.
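
The effect described in the issue can be sketched without Flink or Kafka. The following is a simplified model, not the connector's actual code: `BufferingClient`, `FlushOnSnapshotSink`, and `FlushOnSnapshotDemo` are hypothetical names, and `snapshotState()` here is an ordinary method standing in for the checkpoint hook. The model assumes that records covered by a completed checkpoint are not replayed after a failure, so any record still sitting in the client's buffer at that moment is lost.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical client that batches records in memory, like a producer with a large linger.ms. */
class BufferingClient {
    private final List<Integer> pending = new ArrayList<>();
    private final List<Integer> durable = new ArrayList<>();

    void send(int record) { pending.add(record); }                // buffered only, not yet written
    void flush() { durable.addAll(pending); pending.clear(); }    // buffered records become durable
    void crash() { pending.clear(); }                             // buffered records are lost
    int durableCount() { return durable.size(); }
}

/** Hypothetical sink; snapshotState() plays the role of the checkpoint hook. */
class FlushOnSnapshotSink {
    private final BufferingClient client;
    private final boolean flushOnSnapshot;

    FlushOnSnapshotSink(BufferingClient client, boolean flushOnSnapshot) {
        this.client = client;
        this.flushOnSnapshot = flushOnSnapshot;
    }

    void invoke(int record) { client.send(record); }

    void snapshotState() {
        if (flushOnSnapshot) {
            client.flush(); // everything sent before the checkpoint is durable afterwards
        }
    }
}

class FlushOnSnapshotDemo {
    /** Returns how many checkpointed records never became durable. */
    static int lostAfterCrash(boolean flushOnSnapshot) {
        BufferingClient client = new BufferingClient();
        FlushOnSnapshotSink sink = new FlushOnSnapshotSink(client, flushOnSnapshot);
        int checkpointed = 333;
        for (int i = 0; i < checkpointed; i++) {
            sink.invoke(i);
        }
        sink.snapshotState(); // checkpoint completes: these records will not be replayed
        client.crash();       // broker or producer failure right after the checkpoint
        return checkpointed - client.durableCount();
    }

    public static void main(String[] args) {
        System.out.println("lost without flush on snapshot: " + lostAfterCrash(false));
        System.out.println("lost with flush on snapshot: " + lostAfterCrash(true));
    }
}
```

In this model the sink without the flush loses every buffered record, while the sink that flushes in `snapshotState()` loses none, which is the at-least-once behavior the test in the diff checks for.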



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
