[ https://issues.apache.org/jira/browse/BEAM-990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alban Perillat-Merceroz updated BEAM-990:
-----------------------------------------
    Description: 
I use KafkaIO as a source, and I would like consumed offsets to be stored in 
Kafka (in the {{__consumer_offsets}} topic).

I'm configuring the Kafka reader with:
{code:java}
.updateConsumerProperties(ImmutableMap.of(
    ConsumerConfig.GROUP_ID_CONFIG, "my-group",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE,
    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with the default value (5000 ms) either
))
{code}
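
For context, the full read transform is wired up roughly like this (a minimal sketch against the contrib KafkaIO API; the broker address and topic name are placeholders for my actual setup):
{code:java}
PCollection<KV<byte[], byte[]>> records = pipeline.apply(
    KafkaIO.read()
        .withBootstrapServers("broker-1:9092")    // placeholder
        .withTopics(ImmutableList.of("my-topic")) // placeholder
        .updateConsumerProperties(ImmutableMap.<String, Object>of(
            ConsumerConfig.GROUP_ID_CONFIG, "my-group",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE))
        .withoutMetadata());
{code}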

But the offsets are not stored in Kafka (nothing in {{__consumer_offsets}}, 
next job will restart at latest offset).
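
One way to check this is to ask Kafka directly for the group's committed offset with a plain consumer (a minimal sketch; broker address, topic and partition are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    // committed() returns null when the group has never committed for this partition
    OffsetAndMetadata committed = consumer.committed(new TopicPartition("my-topic", 0)); // placeholder
    System.out.println("committed offset: " + committed);
}
{code}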

I can't find in the code where the offsets are supposed to be committed.

I tried to add a manual commit in the {{consumerPollLoop()}} method, and it works: offsets are committed. (This is only a probe, not a real fix: committing right after the records are enqueued commits ahead of what the pipeline has actually processed.)

{code:java}
private void consumerPollLoop() {
    // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
    while (!closed.get()) {
        try {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
            if (!records.isEmpty() && !closed.get()) {
                availableRecordsQueue.put(records); // blocks until dequeued
                // Manual commit
                consumer.commitSync();
            }
        } catch (InterruptedException e) {
            LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
            break;
        } catch (WakeupException e) {
            break;
        }
    }

    LOG.info("{}: Returning from consumer pool loop", this);
}
{code}

Is this a bug in KafkaIO or am I misconfiguring something?

Disclaimer: I'm currently using KafkaIO in Dataflow, via the backport in the Dataflow SDK (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java), but I'm confident the code is similar for this case.

Edit: I found the method where KafkaIO is supposed to commit offsets at the end of a batch: {{KafkaCheckpointMark.finalizeCheckpoint()}}. I'm currently testing a fix and will open a pull request soon:

{code:java}
// KafkaCheckpointMark.java

    /**
     * Optional consumer that will be used to commit offsets into Kafka when
     * finalizeCheckpoint() is called.
     */
    @Nullable
    private final Consumer<?, ?> consumer;

    public KafkaCheckpointMark(List<PartitionMark> partitions, @Nullable Consumer<?, ?> consumer) {
        this.partitions = partitions;
        this.consumer = consumer;
    }

    /**
     * Commit synchronously into Kafka the offsets that have been passed downstream.
     */
    @Override
    public void finalizeCheckpoint() throws IOException {
        if (consumer == null) {
            LOG.warn("finalizeCheckpoint(): no consumer provided, will not commit anything.");
            return;
        }
        if (partitions.isEmpty()) {
            LOG.info("finalizeCheckpoint(): nothing to commit to Kafka.");
            return;
        }

        final Map<TopicPartition, OffsetAndMetadata> offsets = newHashMap(); // Guava Maps.newHashMap()
        StringBuilder committedOffsets = new StringBuilder();
        for (PartitionMark partition : partitions) {
            TopicPartition topicPartition = partition.getTopicPartition();
            offsets.put(topicPartition, new OffsetAndMetadata(partition.offset));
            committedOffsets.append(topicPartition.topic()).append('-')
                .append(topicPartition.partition()).append(':')
                .append(partition.offset).append(',');
        }

        // Drop the trailing comma; partitions is non-empty at this point.
        final String printableOffsets = committedOffsets.substring(0, committedOffsets.length() - 1);
        try {
            consumer.commitSync(offsets);
            LOG.info("finalizeCheckpoint(): committed Kafka offsets {}", printableOffsets);
        } catch (Exception e) {
            LOG.error("finalizeCheckpoint(): {} when trying to commit Kafka offsets [{}]",
                    e.getClass().getSimpleName(), printableOffsets, e); // pass e so the stack trace is logged
        }
    }
{code}
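
The matching change on the reader side would look roughly like this (a hedged sketch: the per-partition state and its field names here are illustrative, not the exact ones in KafkaIO):
{code:java}
// UnboundedKafkaReader.getCheckpointMark() -- sketch of the matching change:
// hand the reader's consumer to the checkpoint mark so that
// finalizeCheckpoint() can commit through it.
@Override
public KafkaCheckpointMark getCheckpointMark() {
    List<PartitionMark> marks = new ArrayList<>();
    for (PartitionState p : partitionStates) { // illustrative per-partition state
        marks.add(new PartitionMark(p.topicPartition, p.consumedOffset)); // offset passed downstream
    }
    return new KafkaCheckpointMark(marks, consumer); // was: new KafkaCheckpointMark(marks)
}
{code}

One caveat I still need to check: {{KafkaConsumer}} is not thread-safe, and {{finalizeCheckpoint()}} may be called from a different thread than the one running {{consumerPollLoop()}}, so the commit may have to be handed off to the consumer thread rather than calling {{commitSync()}} directly.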

> KafkaIO does not commit offsets to Kafka
> ----------------------------------------
>
>                 Key: BEAM-990
>                 URL: https://issues.apache.org/jira/browse/BEAM-990
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Alban Perillat-Merceroz
>              Labels: KafkaIO
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
