Re: KafkaUnboundedReader

2020-07-29 Thread Maximilian Michels

Hi Dinh,

The check only de-duplicates records when the consumer returns the same 
offset multiple times. It ensures that the offset is always increasing.
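
Stripped of the reader internals, the guard boils down to something like 
the following minimal sketch (not the actual KafkaUnboundedReader code; 
the class and field names here are made up for illustration):

// Hypothetical, simplified illustration of the de-duplication guard
// described above (not the actual KafkaUnboundedReader code).
public final class MonotonicOffsetGuard {
  private long expected; // next offset we have not yet emitted

  public MonotonicOffsetGuard(long startOffset) {
    this.expected = startOffset;
  }

  // Returns true if the record at this offset should be emitted,
  // false if it was already consumed and must be skipped.
  public boolean accept(long offset) {
    if (offset < expected) {
      return false; // duplicate delivery of an earlier offset: drop it
    }
    expected = offset + 1; // offsets only ever move forward
    return true;
  }
}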


If this has been fixed in Kafka, which the comment assumes, the 
condition will never be true.


Which Kafka version are you using?

-Max

On 29.07.20 09:16, wang Wu wrote:

Hi,
I am curious about this comment:

if (offset < expected) { // -- (a)
  // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
  // should we check if the offset is way off from consumedOffset (say > 1M)?
  LOG.warn(
      "{}: ignoring already consumed offset {} for {}",
      this,
      offset,
      pState.topicPartition);
  continue;
}


https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L167

Does it mean that Beam KafkaIO may skip processing some Kafka messages 
if the consumer lag exceeds 1 million messages?

Why might Kafka compression result in this bug?
Is there any way to prevent message loss and ensure at-least-once delivery?

Context: We enable at-least-once delivery semantics in our Beam pipeline 
with this code:


input
    .getPipeline()
    .apply(
        "ReadFromKafka",
        KafkaIO.readBytes()
            .withBootstrapServers(getSource().getKafkaSourceConfig().getBootstrapServers())
            .withTopics(getTopics())
            .withConsumerConfigUpdates(
                ImmutableMap.of(
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
                    ConsumerConfig.GROUP_ID_CONFIG, groupId))
            .withReadCommitted()
            .commitOffsetsInFinalize())

However, we notice that if we send more than 1 million Kafka messages and 
the batch processing cannot keep up, Beam seems to process fewer messages 
than we sent.
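
(As an aside, one way to count what the read step actually emits is a 
Beam metrics counter in a pass-through DoFn downstream of the KafkaIO 
read; a minimal sketch, with a made-up class name, assuming the Beam 
Java SDK and KafkaIO dependencies:)

// Hypothetical sketch: count records emitted by the KafkaIO read so the
// number can be compared against what was produced to the topic.
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

public class CountKafkaRecordsFn
    extends DoFn<KafkaRecord<byte[], byte[]>, KafkaRecord<byte[], byte[]>> {

  private final Counter recordsRead =
      Metrics.counter(CountKafkaRecordsFn.class, "recordsRead");

  @ProcessElement
  public void processElement(
      @Element KafkaRecord<byte[], byte[]> record,
      OutputReceiver<KafkaRecord<byte[], byte[]>> out) {
    recordsRead.inc();   // one count per record emitted by the read
    out.output(record);  // pass the record through unchanged
  }
}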


Regards
Dinh

