[jira] [Created] (BEAM-990) KafkaIO does not commit offsets to Kafka

2016-11-16 Thread Alban Perillat-Merceroz (JIRA)
Alban Perillat-Merceroz created BEAM-990:


 Summary: KafkaIO does not commit offsets to Kafka
 Key: BEAM-990
 URL: https://issues.apache.org/jira/browse/BEAM-990
 Project: Beam
  Issue Type: Bug
Reporter: Alban Perillat-Merceroz


I use KafkaIO as a source, and I would like consumed offsets to be stored in 
Kafka (in the {{__consumer_offsets}} topic).

I'm configuring the Kafka reader with:
{code:java}
.updateConsumerProperties(ImmutableMap.<String, Object>of(
  ConsumerConfig.GROUP_ID_CONFIG, "my-group",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
  ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with the default value (5000 ms) either
))
{code}

But the offsets are not stored in Kafka: nothing appears in {{__consumer_offsets}}, and the next job restarts at the latest offset.

I can't find where in the code the offsets are supposed to be committed.

I tried adding a manual commit to the {{consumerPollLoop()}} method, and with it the offsets are committed:

{code:java}
private void consumerPollLoop() {
  // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
  while (!closed.get()) {
    try {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
      if (!records.isEmpty() && !closed.get()) {
        availableRecordsQueue.put(records); // blocks until dequeued.
        // Manual commit: commitSync() with no arguments commits the offsets
        // returned by the last poll() for all assigned partitions.
        consumer.commitSync();
      }
    } catch (InterruptedException e) {
      LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
      break;
    } catch (WakeupException e) {
      break;
    }
  }

  LOG.info("{}: Returning from consumer pool loop", this);
}
{code}
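To make the consequence of this manual commit concrete, here is a toy model in plain Java with no Kafka dependency. The class {{PollLoopCommitModel}} and its members are hypothetical stand-ins: a partition is a range of offsets, {{poll()}} advances the position, and committing right after {{poll()}} records that position while the batch may still be waiting in the queue. If the job fails before that batch is emitted, a restart from the committed offset skips it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical toy model (no Kafka): committing right after poll(), before records are emitted. */
public class PollLoopCommitModel {
    private final int partitionSize;  // offsets 0..partitionSize-1 exist in the partition
    private long position;            // next offset poll() will return
    private long committed;           // committed "next offset to read"
    private final Queue<List<Long>> availableRecordsQueue = new ArrayDeque<>();

    PollLoopCommitModel(int partitionSize, long startOffset) {
        this.partitionSize = partitionSize;
        this.position = startOffset;
        this.committed = startOffset;
    }

    /** Returns up to maxRecords offsets and advances the position, like a consumer poll. */
    List<Long> poll(int maxRecords) {
        List<Long> batch = new ArrayList<>();
        while (batch.size() < maxRecords && position < partitionSize) {
            batch.add(position++);
        }
        return batch;
    }

    /** Mirrors the patched loop body: enqueue the batch, then commit the polled position. */
    void pollEnqueueAndCommit(int maxRecords) {
        List<Long> batch = poll(maxRecords);
        if (!batch.isEmpty()) {
            availableRecordsQueue.add(batch);
            committed = position; // like commitSync(): commits what poll() returned
        }
    }

    long committed() { return committed; }

    int queuedBatches() { return availableRecordsQueue.size(); }

    public static void main(String[] args) {
        PollLoopCommitModel model = new PollLoopCommitModel(10, 0);
        model.pollEnqueueAndCommit(5);
        // Offsets 0..4 are still queued (not emitted), yet offset 5 is already committed.
        System.out.println("committed=" + model.committed() + ", queued batches=" + model.queuedBatches());
        // prints "committed=5, queued batches=1"
    }
}
```

Committing when the runner finalizes a checkpoint, rather than after each {{poll()}}, avoids this window because only offsets that have already been passed downstream get committed.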

Is this a bug in KafkaIO or am I misconfiguring something?

Disclaimer: I'm currently using KafkaIO on Dataflow, through the backport in the Dataflow SDK
(https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java),
but I'm confident the code is similar for this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-990) KafkaIO does not commit offsets to Kafka

2016-12-01 Thread Alban Perillat-Merceroz (JIRA)

[ https://issues.apache.org/jira/browse/BEAM-990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alban Perillat-Merceroz updated BEAM-990:
-
Description: 

Edit: I found the method where KafkaIO is supposed to commit offsets at the end of a batch: {{finalizeCheckpoint()}} in {{KafkaCheckpointMark}}. I'm currently testing the change and will open a pull request soon:

{code:java}
// KafkaCheckpointMark.java

/**
 * Optional consumer that will be used to commit offsets into Kafka when
 * finalizeCheckpoint() is called.
 * If the checkpoint mark is serialized with Java serialization, this field must be
 * transient: KafkaConsumer is not Serializable.
 */
@Nullable
private final Consumer<byte[], byte[]> consumer;

public KafkaCheckpointMark(List<PartitionMark> partitions,
                           @Nullable Consumer<byte[], byte[]> consumer) {
  this.partitions = partitions;
  this.consumer = consumer;
}

/**
 * Commit synchronously into Kafka offsets that have been passed downstream.
 */
@Override
public void finalizeCheckpoint() throws IOException {
  if (consumer == null) {
    LOG.warn("finalizeCheckpoint(): no consumer provided, will not commit anything.");
    return;
  }
  if (partitions.isEmpty()) {
    LOG.info("finalizeCheckpoint(): nothing to commit to Kafka.");
    return;
  }

  final Map<TopicPartition, OffsetAndMetadata> offsets = newHashMap();
  String committedOffsets = "";
  for (PartitionMark partition : partitions) {
    TopicPartition topicPartition = partition.getTopicPartition();
    // Kafka expects the committed offset to be the NEXT offset to read. If
    // partition.offset is the last consumed offset, commit partition.offset + 1.
    offsets.put(topicPartition, new OffsetAndMetadata(partition.offset));
    committedOffsets += topicPartition.topic() + "-" + topicPartition.partition()
        + ":" + partition.offset + ",";
  }

  final String printableOffsets = committedOffsets.substring(0, committedOffsets.length() - 1);
  try {
    // Caution: KafkaConsumer is not thread-safe. If this runs on a different thread
    // than consumerPollLoop(), commitSync() can throw ConcurrentModificationException.
    consumer.commitSync(offsets);
    LOG.info("finalizeCheckpoint(): committed Kafka offsets {}", printableOffsets);
  } catch (Exception e) {
    LOG.error("finalizeCheckpoint(): {} when trying to commit Kafka offsets [{}]",
        e.getClass().getSimpleName(), printableOffsets, e);
  }
}
{code}
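Because a KafkaConsumer may only be used from one thread at a time, one way to keep the {{finalizeCheckpoint()}} approach safe is for the finalizing thread to only publish the offsets, and for the thread that owns the consumer (the poll loop) to pick them up and call {{commitSync()}} between polls. The plain-Java sketch below shows that handoff. {{CommitHandoff}}, {{takePendingCommit()}}, and the "topic-partition" string keys are hypothetical; it also assumes the checkpoint mark holds the last consumed offset per partition (hence the +1) and that each checkpoint lists every partition.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch: the finalizing thread only publishes offsets; the thread
 * that owns the (not thread-safe) consumer takes them and commits them.
 */
public class CommitHandoff {
    // Offsets to commit, keyed by "topic-partition"; null when nothing is pending.
    private final AtomicReference<Map<String, Long>> pendingCommit = new AtomicReference<>();

    /** Called from finalizeCheckpoint(); lastConsumed maps partition -> last emitted offset. */
    public String finalizeCheckpoint(Map<String, Long> lastConsumed) {
        Map<String, Long> toCommit = new LinkedHashMap<>();
        // Kafka convention: the committed offset is the NEXT offset to read.
        lastConsumed.forEach((partition, offset) -> toCommit.put(partition, offset + 1));
        // A newer checkpoint replaces an older one that was not committed yet
        // (assumes every checkpoint lists all partitions).
        pendingCommit.set(toCommit);
        // Printable form, e.g. for logging, without manual trailing-comma trimming.
        return toCommit.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }

    /** Called from the poll loop between polls; returns null if there is nothing to commit. */
    public Map<String, Long> takePendingCommit() {
        return pendingCommit.getAndSet(null);
    }

    public static void main(String[] args) {
        CommitHandoff handoff = new CommitHandoff();
        Map<String, Long> lastConsumed = new LinkedHashMap<>();
        lastConsumed.put("events-0", 41L);
        lastConsumed.put("events-1", 7L);
        System.out.println(handoff.finalizeCheckpoint(lastConsumed)); // events-0:42,events-1:8
        // In the poll loop, the owning thread would convert this map and pass it to commitSync().
        System.out.println(handoff.takePendingCommit()); // {events-0=42, events-1=8}
        System.out.println(handoff.takePendingCommit()); // null
    }
}
```

The trade-off is that the commit happens on the next iteration of the poll loop rather than inside {{finalizeCheckpoint()}} itself, so the committed offsets can lag slightly behind the finalized checkpoint.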
