Hi,

I am curious about this comment in KafkaUnboundedReader:

    if (offset < expected) { // -- (a)
      // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
      // should we check if the offset is way off from consumedOffset (say > 1M)?
      LOG.warn(
          "{}: ignoring already consumed offset {} for {}",
          this, offset, pState.topicPartition);
      continue;
    }

https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L167
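If I read the guard correctly, its effect is "never re-emit an offset we have already produced". Here is my own standalone sketch of that logic (not the actual Beam code; the class name, the record list, and the println are placeholders I made up):

    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    class OffsetGuardSketch {
      /**
       * Emits only records at or beyond the next expected offset and returns the
       * new expected offset. Records below `expected` were already emitted (a
       * poll() can hand back earlier offsets, e.g. when the broker returns a
       * whole compressed batch), so they are skipped instead of duplicated.
       */
      static long skipAlreadyConsumed(
          List<ConsumerRecord<byte[], byte[]>> records, long consumedOffset) {
        long expected = consumedOffset + 1; // next offset we still need
        for (ConsumerRecord<byte[], byte[]> rec : records) {
          if (rec.offset() < expected) {
            continue; // already consumed; skip rather than re-emit
          }
          System.out.println("emit offset " + rec.offset()); // stand-in for the real emit
          expected = rec.offset() + 1;
        }
        return expected;
      }
    }

As far as I can tell, this only filters out already-emitted offsets and should never drop anything new, which is why I am unsure how messages could be lost.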
Does this mean that Beam KafkaIO may skip processing some Kafka messages when the consumer lag exceeds 1M messages? Why can enabling compression in Kafka lead to this bug? Is there any way to prevent losing messages and guarantee at-least-once delivery?

Context: we enable at-least-once delivery semantics in our Beam code like this:

    input
        .getPipeline()
        .apply(
            "ReadFromKafka",
            KafkaIO.readBytes()
                .withBootstrapServers(getSource().getKafkaSourceConfig().getBootstrapServers())
                .withTopics(getTopics())
                .withConsumerConfigUpdates(
                    ImmutableMap.of(
                        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
                        ConsumerConfig.GROUP_ID_CONFIG, groupId))
                .withReadCommitted()
                .commitOffsetsInFinalize())

However, we notice that when we send more than 1 million Kafka messages and the batch processing cannot keep up, Beam appears to process fewer messages than we sent.

Regards,
Dinh
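P.S. In case it matters, this is roughly how we measure the discrepancy: a counting DoFn attached directly after the read. This is a simplified sketch; the class name and the metric namespace are just what we picked, not anything from Beam or KafkaIO:

    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;

    /** Counts every record KafkaIO hands us and forwards the value bytes. */
    public class CountReads extends DoFn<KafkaRecord<byte[], byte[]>, byte[]> {
      private final Counter received = Metrics.counter("kafka-debug", "records-received");

      @ProcessElement
      public void processElement(ProcessContext c) {
        received.inc(); // compared against the number of messages produced
        c.output(c.element().getKV().getValue());
      }
    }

We attach it with `.apply("CountReads", ParDo.of(new CountReads()))` right after the read, and the counter ends up lower than the number of messages our producers sent.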