[ https://issues.apache.org/jira/browse/BEAM-990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alban Perillat-Merceroz updated BEAM-990:
-----------------------------------------

Description:

I use KafkaIO as a source, and I would like consumed offsets to be stored in Kafka (in the {{__consumer_offsets}} topic).

I'm configuring the Kafka reader with:

{code:java}
.updateConsumerProperties(ImmutableMap.of(
    ConsumerConfig.GROUP_ID_CONFIG, "my-group",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with the default value either (5000 ms)
))
{code}

But the offsets are not stored in Kafka (nothing in {{__consumer_offsets}}; the next job restarts at the latest offset), and I can't find where in the code the offsets are supposed to be committed.

I tried adding a manual commit in the {{consumerPollLoop()}} method, and it works; offsets are committed:

{code:java}
private void consumerPollLoop() {
  // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue.
  while (!closed.get()) {
    try {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
      if (!records.isEmpty() && !closed.get()) {
        availableRecordsQueue.put(records); // blocks until dequeued
        // Manual commit
        consumer.commitSync();
      }
    } catch (InterruptedException e) {
      LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
      break;
    } catch (WakeupException e) {
      break;
    }
  }
  LOG.info("{}: Returning from consumer poll loop", this);
}
{code}

Is this a bug in KafkaIO, or am I misconfiguring something?

Disclaimer: I'm currently using KafkaIO in Dataflow, via the backport in the Dataflow SDK (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java), but I'm confident the code is similar for this case.

Edit: I found the correct method where KafkaIO is supposed to commit at the end of a batch. I'm currently testing it and will be able to open a pull request soon:

{code:java}
// KafkaCheckpointMark.java

/**
 * Optional consumer that will be used to commit offsets into Kafka when
 * finalizeCheckpoint() is called.
 */
@Nullable
private final Consumer consumer;

public KafkaCheckpointMark(List<PartitionMark> partitions, @Nullable Consumer consumer) {
  this.partitions = partitions;
  this.consumer = consumer;
}

/**
 * Commit synchronously into Kafka the offsets that have been passed downstream.
 */
@Override
public void finalizeCheckpoint() throws IOException {
  if (consumer == null) {
    LOG.warn("finalizeCheckpoint(): no consumer provided, will not commit anything.");
    return;
  }
  if (partitions.isEmpty()) {
    LOG.info("finalizeCheckpoint(): nothing to commit to Kafka.");
    return;
  }
  final Map<TopicPartition, OffsetAndMetadata> offsets = newHashMap();
  StringBuilder committedOffsets = new StringBuilder();
  for (PartitionMark partition : partitions) {
    TopicPartition topicPartition = partition.getTopicPartition();
    offsets.put(topicPartition, new OffsetAndMetadata(partition.offset));
    if (committedOffsets.length() > 0) {
      committedOffsets.append(',');
    }
    committedOffsets.append(topicPartition.topic()).append('-')
        .append(topicPartition.partition()).append(':').append(partition.offset);
  }
  final String printableOffsets = committedOffsets.toString();
  try {
    consumer.commitSync(offsets);
    LOG.info("finalizeCheckpoint(): committed Kafka offsets {}", printableOffsets);
  } catch (Exception e) {
    LOG.error("finalizeCheckpoint(): {} when trying to commit Kafka offsets [{}]",
        e.getClass().getSimpleName(), printableOffsets, e);
  }
}
{code}
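For reference, the bookkeeping inside {{finalizeCheckpoint()}} above can be exercised without a broker. The sketch below is a minimal, self-contained approximation: {{TopicPartition}} and {{PartitionMark}} are hypothetical stand-ins for the real Kafka/Beam classes, and the commit itself is elided; it only shows building the offsets map and the printable-offsets string from a list of partition marks.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for org.apache.kafka.common.TopicPartition and
// KafkaCheckpointMark.PartitionMark, so this runs without kafka-clients or Beam.
record TopicPartition(String topic, int partition) {}
record PartitionMark(TopicPartition topicPartition, long offset) {}

public class CheckpointSketch {
    // Mirrors the offsets-map construction in finalizeCheckpoint(): one entry
    // per partition mark, keyed by topic-partition.
    static Map<TopicPartition, Long> toOffsets(List<PartitionMark> partitions) {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (PartitionMark p : partitions) {
            offsets.put(p.topicPartition(), p.offset());
        }
        return offsets;
    }

    // Builds the "topic-partition:offset,..." log string with a StringBuilder
    // instead of repeated string concatenation.
    static String printable(List<PartitionMark> partitions) {
        StringBuilder sb = new StringBuilder();
        for (PartitionMark p : partitions) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(p.topicPartition().topic()).append('-')
              .append(p.topicPartition().partition()).append(':')
              .append(p.offset());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<PartitionMark> marks = List.of(
            new PartitionMark(new TopicPartition("events", 0), 42L),
            new PartitionMark(new TopicPartition("events", 1), 7L));
        System.out.println(printable(marks)); // events-0:42,events-1:7
    }
}
```

In the real patch this map would then be passed to {{consumer.commitSync(offsets)}}; committing here, at checkpoint finalization, means offsets are only acknowledged once the runner has durably committed the checkpoint.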
> KafkaIO does not commit offsets to Kafka
> ----------------------------------------
>
>                 Key: BEAM-990
>                 URL: https://issues.apache.org/jira/browse/BEAM-990
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Alban Perillat-Merceroz
>              Labels: KafkaIO
>
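As an aside on the {{consumerPollLoop()}} workaround described in the report: calling {{commitSync()}} right after {{poll()}} acknowledges offsets as soon as records are handed to the queue, before anything downstream has processed them. A broker-free sketch of that commit-after-poll pattern ({{StubConsumer}} is a hypothetical stand-in for the real {{KafkaConsumer}}, with {{poll()}} returning a batch size):

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Hypothetical stub consumer: poll() advances the position by the batch size,
// commitSync() records the current position as "committed to Kafka".
class StubConsumer {
    private final Queue<Integer> batches = new ArrayDeque<>(List.of(3, 2, 0));
    long position = 0;
    long committed = -1; // -1 means nothing committed yet

    int poll() {
        int n = batches.isEmpty() ? 0 : batches.poll();
        position += n;
        return n;
    }

    void commitSync() {
        committed = position;
    }
}

public class PollLoopSketch {
    public static void main(String[] args) {
        StubConsumer consumer = new StubConsumer();
        int n;
        // Mirrors the workaround: poll, enqueue (elided here), commit synchronously.
        while ((n = consumer.poll()) > 0) {
            consumer.commitSync(); // commits the post-poll position immediately
        }
        System.out.println(consumer.committed); // 5
    }
}
```

This commits every record that was ever polled, even if the pipeline later fails before processing it, which is presumably why the reporter's follow-up moves the commit into {{finalizeCheckpoint()}} instead.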
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)