Alban Perillat-Merceroz created BEAM-990:
--------------------------------------------
Summary: KafkaIO does not commit offsets to Kafka
Key: BEAM-990
URL: https://issues.apache.org/jira/browse/BEAM-990
Project: Beam
Issue Type: Bug
Reporter: Alban Perillat-Merceroz

I use KafkaIO as a source, and I would like consumed offsets to be stored in Kafka (in the {{__consumer_offsets}} topic).

I'm configuring the Kafka reader with
{code:java}
.updateConsumerProperties(ImmutableMap.of(
    ConsumerConfig.GROUP_ID_CONFIG, "my-group",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with default value either (5000ms)
))
{code}

But the offsets are not stored in Kafka: nothing appears in {{__consumer_offsets}}, and the next job restarts at the latest offset.

I can't find where in the code the offsets are supposed to be committed. I tried adding a manual commit in the {{consumerPollLoop()}} method, and it works: offsets are committed.
{code:java}
private void consumerPollLoop() {
  // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
  while (!closed.get()) {
    try {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
      if (!records.isEmpty() && !closed.get()) {
        availableRecordsQueue.put(records); // blocks until dequeued.
        // Manual commit
        consumer.commitSync();
      }
    } catch (InterruptedException e) {
      LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
      break;
    } catch (WakeupException e) {
      break;
    }
  }

  LOG.info("{}: Returning from consumer pool loop", this);
}
{code}

Is this a bug in KafkaIO, or am I misconfiguring something?

Disclaimer: I'm currently using KafkaIO in Dataflow, via the backport in the Dataflow SDK (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java), but I'm confident the code is similar for this case.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)