Hi,
I am curious about this comment:

if (offset < expected) { // -- (a)
  // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
  // should we check if the offset is way off from consumedOffset (say > 1M)?
  LOG.warn(
      "{}: ignoring already consumed offset {} for {}",
      this,
      offset,
      pState.topicPartition);
  continue;
}

https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L167
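
For context, here is my rough mental model of that check, as a simplified, self-contained sketch (all names and values are mine, not the actual KafkaUnboundedReader fields): the reader tracks the next offset it expects, and any record arriving with a lower offset is treated as already consumed and skipped.

// Simplified sketch of the skip logic as I understand it; illustrative only.
import java.util.List;

public class OffsetSkipSketch {
  public static void main(String[] args) {
    long expected = 100L; // next offset the reader expects to consume
    // Offsets as they might arrive if a seek lands inside a compressed batch:
    // the whole batch comes back, including already consumed records.
    List<Long> arriving = List.of(98L, 99L, 100L, 101L);
    for (long offset : arriving) {
      if (offset < expected) {
        // corresponds to branch (a): record was consumed before, skip it
        System.out.printf("ignoring already consumed offset %d%n", offset);
        continue;
      }
      System.out.printf("processing offset %d%n", offset);
      expected = offset + 1;
    }
  }
}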

Does it mean that Beam KafkaIO may skip processing some Kafka messages if the lag in consuming Kafka messages exceeds 1M?
Why can Kafka compression result in this bug?
Is there any way to prevent losing messages and ensure at-least-once delivery?

Context: we enable at-least-once delivery semantics in our Beam pipeline with the following code:

input
    .getPipeline()
    .apply(
        "ReadFromKafka",
        KafkaIO.readBytes()
            .withBootstrapServers(getSource().getKafkaSourceConfig().getBootstrapServers())
            .withTopics(getTopics())
            .withConsumerConfigUpdates(
                ImmutableMap.of(
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
                    ConsumerConfig.GROUP_ID_CONFIG, groupId))
            .withReadCommitted()
            .commitOffsetsInFinalize())

However, we noticed that when we send more than 1 million Kafka messages and the batch processing cannot keep up, Beam appears to process fewer messages than we sent.
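
In case it helps, this is roughly how the received count could be verified with a Beam metrics counter, to compare against the number of messages produced (illustrative sketch; the DoFn and counter name are mine, not our exact code):

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn that counts every record read from Kafka and passes it
// through unchanged, so the counter can be compared with the produced count.
class CountRecordsFn extends DoFn<KafkaRecord<byte[], byte[]>, KafkaRecord<byte[], byte[]>> {
  private final Counter consumed =
      Metrics.counter(CountRecordsFn.class, "kafkaRecordsConsumed");

  @ProcessElement
  public void processElement(ProcessContext c) {
    consumed.inc();
    c.output(c.element());
  }
}

// Applied right after the read, e.g.:
//   .apply("CountRecords", ParDo.of(new CountRecordsFn()))

Comparing that counter with the producer-side count should show whether records are actually dropped or just not yet processed.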

Regards
Dinh
