[ https://issues.apache.org/jira/browse/BEAM-990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Daniel Halperin updated BEAM-990:
---------------------------------
    Component/s: sdk-java-extensions

> KafkaIO does not commit offsets to Kafka
> ----------------------------------------
>
>                 Key: BEAM-990
>                 URL: https://issues.apache.org/jira/browse/BEAM-990
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Alban Perillat-Merceroz
>              Labels: KafkaIO
>
> I use KafkaIO as a source, and I would like consumed offsets to be stored in
> Kafka (in the {{__consumer_offsets}} topic).
> I'm configuring the Kafka reader with
> {code:java}
> .updateConsumerProperties(ImmutableMap.of(
>     ConsumerConfig.GROUP_ID_CONFIG, "my-group",
>     ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
>     ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with the default value (5000 ms) either
> ))
> {code}
> But the offsets are not stored in Kafka (nothing appears in {{__consumer_offsets}},
> and the next job restarts at the latest offset).
> I can't find where in the code the offsets are supposed to be committed.
> I tried adding a manual commit in the {{consumerPollLoop()}} method, and it
> works; offsets are committed:
> {code:java}
> private void consumerPollLoop() {
>   // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
>   while (!closed.get()) {
>     try {
>       ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
>       if (!records.isEmpty() && !closed.get()) {
>         availableRecordsQueue.put(records); // blocks until dequeued.
>         // Manual commit: commits the offsets returned by the last poll()
>         consumer.commitSync();
>       }
>     } catch (InterruptedException e) {
>       LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
>       break;
>     } catch (WakeupException e) {
>       break;
>     }
>   }
>   LOG.info("{}: Returning from consumer pool loop", this);
> }
> {code}
> Is this a bug in KafkaIO, or am I misconfiguring something?
>
> Disclaimer: I'm currently using KafkaIO in Dataflow, via the backport in the
> Dataflow SDK
> (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java),
> but I'm confident the code is similar for this case.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
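For reference: a bare `consumer.commitSync()`, as used in the manual fix quoted above, commits the offsets returned by the last `poll()`. A reader that wanted to commit only what it had actually handed off would instead pass explicit offsets. Kafka expects each committed offset to be the next offset to consume, that is, the last processed offset + 1. The dependency-free Java sketch below models only that per-partition bookkeeping. `PolledRecord`, `offsetsToCommit`, and the `"topic-partition"` string keys are hypothetical names chosen for illustration; they are not part of KafkaIO or kafka-clients.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, dependency-free sketch: it models only the offset bookkeeping
// that an explicit commit after enqueuing a batch would need. None of these
// names are KafkaIO or kafka-clients APIs.
public class OffsetCommitSketch {

    /** Minimal stand-in for a consumed record: its topic, partition and offset. */
    static final class PolledRecord {
        final String topic;
        final int partition;
        final long offset;

        PolledRecord(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /**
     * Returns, per "topic-partition" key, the offset to commit for a batch that
     * has been handed off. The committed offset is the next offset to consume,
     * so it is the highest offset seen in that partition plus one.
     */
    static Map<String, Long> offsetsToCommit(List<PolledRecord> batch) {
        Map<String, Long> commits = new TreeMap<>(); // sorted keys for stable output
        for (PolledRecord r : batch) {
            String key = r.topic + "-" + r.partition;
            long next = r.offset + 1;
            // Keep the largest "next offset" seen for each partition.
            commits.merge(key, next, Long::max);
        }
        return commits;
    }

    public static void main(String[] args) {
        List<PolledRecord> batch = new ArrayList<>();
        batch.add(new PolledRecord("events", 0, 41));
        batch.add(new PolledRecord("events", 0, 42));
        batch.add(new PolledRecord("events", 1, 7));
        System.out.println(offsetsToCommit(batch)); // prints {events-0=43, events-1=8}
    }
}
```

In real consumer code, each per-partition value would be wrapped as a `TopicPartition` / `OffsetAndMetadata` entry and passed to `KafkaConsumer.commitSync(Map)`. That wiring is left out here so the sketch runs without the Kafka client jar.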