Benjamin BENOIST created BEAM-3754:
--------------------------------------

             Summary: Can't have commitOffsetsInFinalizeEnabled set to false with KafkaIO.readBytes()
                 Key: BEAM-3754
                 URL: https://issues.apache.org/jira/browse/BEAM-3754
             Project: Beam
          Issue Type: Bug
          Components: io-java-kafka
    Affects Versions: 2.3.0
         Environment: Dataflow pipeline using Kafka as a Sink
            Reporter: Benjamin BENOIST
            Assignee: Raghu Angadi


Beam 2.3 introduces committing finalized offsets back to Kafka (_commitOffsetsInFinalize()_), in order to reduce gaps or duplicate processing of records when a pipeline is restarted.

_read()_ sets this parameter (_commitOffsetsInFinalizeEnabled_) to false [by default|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L307], but _readBytes()_ [doesn't|https://github.com/apache/beam/blob/release-2.3.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L282] set it at all, so building the transform throws an exception:
{noformat}
Exception in thread "main" java.lang.IllegalStateException: Missing required properties: commitOffsetsInFinalizeEnabled
         at org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read$Builder.build(AutoValue_KafkaIO_Read.java:344)
         at org.apache.beam.sdk.io.kafka.KafkaIO.readBytes(KafkaIO.java:291){noformat}
The parameter can be set to true with _commitOffsetsInFinalize()_, but there is no way to set it to false.

Defining _readBytes()_ in terms of _read()_ would prevent this kind of error in the future, since _readBytes()_ would then inherit every default that _read()_ sets:
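{code:java}
public static Read<byte[], byte[]> readBytes() {
  // Start from read() so that every default it sets, including
  // commitOffsetsInFinalizeEnabled = false, is inherited here.
  // read() returns a built Read, so use its with*() methods rather than builder setters.
  return KafkaIO.<byte[], byte[]>read()
      .withKeyDeserializer(ByteArrayDeserializer.class)
      .withValueDeserializer(ByteArrayDeserializer.class);
}{code}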
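To illustrate why the missing default only fails at build time, and why delegating to _read()_ removes that failure mode, here is a self-contained sketch. It is a hypothetical, heavily simplified model, not Beam code: _KafkaIOModel_, _readBytesBuggy()_ and the String stand-ins for deserializer classes are illustrative names, and _build()_ only imitates the style of the required-property check AutoValue generates.

```java
// Hypothetical, simplified model of KafkaIO.Read and its AutoValue-style builder.
// Not Beam code: names mirror the issue, and deserializers are plain Strings.
final class KafkaIOModel {

  static final class Read<K, V> {
    final String keyDeserializer;
    final String valueDeserializer;
    final boolean commitOffsetsInFinalizeEnabled;

    private Read(String keyDeserializer, String valueDeserializer,
        boolean commitOffsetsInFinalizeEnabled) {
      this.keyDeserializer = keyDeserializer;
      this.valueDeserializer = valueDeserializer;
      this.commitOffsetsInFinalizeEnabled = commitOffsetsInFinalizeEnabled;
    }

    Builder<K, V> toBuilder() {
      return new Builder<K, V>()
          .setKeyDeserializer(keyDeserializer)
          .setValueDeserializer(valueDeserializer)
          .setCommitOffsetsInFinalizeEnabled(commitOffsetsInFinalizeEnabled);
    }

    Read<K, V> withKeyDeserializer(String deserializer) {
      return toBuilder().setKeyDeserializer(deserializer).build();
    }

    Read<K, V> withValueDeserializer(String deserializer) {
      return toBuilder().setValueDeserializer(deserializer).build();
    }

    // As described in the issue, the only switch turns the flag on.
    Read<K, V> commitOffsetsInFinalize() {
      return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
    }

    static final class Builder<K, V> {
      private String keyDeserializer;
      private String valueDeserializer;
      // Boxed so that "never set" (null) can be told apart from false.
      private Boolean commitOffsetsInFinalizeEnabled;

      Builder<K, V> setKeyDeserializer(String d) { keyDeserializer = d; return this; }

      Builder<K, V> setValueDeserializer(String d) { valueDeserializer = d; return this; }

      Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean enabled) {
        commitOffsetsInFinalizeEnabled = enabled;
        return this;
      }

      Read<K, V> build() {
        // Imitates the generated check: a required property left unset is an error.
        String missing = "";
        if (commitOffsetsInFinalizeEnabled == null) {
          missing += " commitOffsetsInFinalizeEnabled";
        }
        if (!missing.isEmpty()) {
          throw new IllegalStateException("Missing required properties:" + missing);
        }
        return new Read<>(keyDeserializer, valueDeserializer, commitOffsetsInFinalizeEnabled);
      }
    }
  }

  // Default-setting factory: explicitly sets the flag to false.
  static <K, V> Read<K, V> read() {
    return new Read.Builder<K, V>().setCommitOffsetsInFinalizeEnabled(false).build();
  }

  // Mirrors the 2.3.0 shape: a separate builder chain that never sets the flag,
  // so build() throws before the caller can configure anything.
  static Read<byte[], byte[]> readBytesBuggy() {
    return new Read.Builder<byte[], byte[]>()
        .setKeyDeserializer("ByteArrayDeserializer")
        .setValueDeserializer("ByteArrayDeserializer")
        .build();
  }

  // Proposed shape: start from read() and inherit its defaults.
  static Read<byte[], byte[]> readBytes() {
    return KafkaIOModel.<byte[], byte[]>read()
        .withKeyDeserializer("ByteArrayDeserializer")
        .withValueDeserializer("ByteArrayDeserializer");
  }
}
```

In this model, _readBytesBuggy()_ throws an IllegalStateException with the message "Missing required properties: commitOffsetsInFinalizeEnabled", while _readBytes()_ returns a Read whose flag is false and can still be switched on with _commitOffsetsInFinalize()_. The point of the delegation is that any future required property needs a default in only one factory.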



