lukecwik commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r467165240



##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1306,112 +1315,206 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       return 
toBuilder().setValueDeserializerProvider(deserializerProvider).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+     *
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
+     * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
+     * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
+     * provide the key coder explicitly.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializer(
         Class<? extends Deserializer<K>> keyDeserializer) {
       return 
withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withValueDeserializer(
         Class<? extends Deserializer<V>> valueDeserializer) {
       return 
withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
         Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
       return 
withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting value bytes read 
from Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize value objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withValueDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(
         Class<? extends Deserializer<V>> valueDeserializer, Coder<V> 
valueCoder) {
       return 
withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
     }
 
+    /**
+     * A factory to create Kafka {@link Consumer} from consumer configuration. 
This is useful for
+     * supporting another version of Kafka consumer. Default is {@link 
KafkaConsumer}.
+     */
     public ReadSourceDescriptors<K, V> withConsumerFactoryFn(
         SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn) {
       return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
     }
 
+    /**
+     * Update configuration for the backend main consumer. Note that the 
default consumer properties
+     * will not be completely overridden. This method only updates the value 
which has the same key.
+     *
+     * <p>In {@link ReadFromKafkaDoFn}, there're two consumers running in the 
backend actually:<br>
+     * 1. the main consumer, which reads data from kafka;<br>
+     * 2. the secondary offset consumer, which is used to estimate backlog, by 
fetching latest
+     * offset;<br>
+     *
+     * <p>By default, main consumer uses the configuration from {@link
+     * KafkaIOUtils#DEFAULT_CONSUMER_PROPERTIES}.
+     */
     public ReadSourceDescriptors<K, V> withConsumerConfigUpdates(
         Map<String, Object> configUpdates) {
       Map<String, Object> config =
           KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), 
configUpdates);
       return toBuilder().setConsumerConfig(config).build();
     }
 
+    /**
+     * A function to calculate output timestamp for a given {@link 
KafkaRecord}. The default value
+     * is {@link #withProcessingTime()}.
+     */
     public ReadSourceDescriptors<K, V> withExtractOutputTimestampFn(
         SerializableFunction<KafkaRecord<K, V>, Instant> fn) {
       return toBuilder().setExtractOutputTimestampFn(fn).build();
     }
 
+    /**
+     * A function to create a {@link WatermarkEstimator}. The default value is 
{@link
+     * MonotonicallyIncreasing}.
+     */
     public ReadSourceDescriptors<K, V> withCreatWatermarkEstimatorFn(
         SerializableFunction<Instant, WatermarkEstimator<Instant>> fn) {
       return toBuilder().setCreateWatermarkEstimatorFn(fn).build();
     }
 
+    /** Use the log append time as the output timestamp. */
     public ReadSourceDescriptors<K, V> withLogAppendTime() {
       return withExtractOutputTimestampFn(
           ReadSourceDescriptors.ExtractOutputTimestampFns.useLogAppendTime());
     }
 
+    /** Use the processing time as the output timestamp. */
     public ReadSourceDescriptors<K, V> withProcessingTime() {
       return withExtractOutputTimestampFn(
           ReadSourceDescriptors.ExtractOutputTimestampFns.useProcessingTime());
     }
 
+    /** Use the creation time of {@link KafkaRecord} as the output timestamp. 
*/
     public ReadSourceDescriptors<K, V> withCreateTime() {
       return withExtractOutputTimestampFn(
           ReadSourceDescriptors.ExtractOutputTimestampFns.useCreateTime());
     }
 
+    /** Use the {@link WallTime} as the watermark estimator. */
     public ReadSourceDescriptors<K, V> withWallTimeWatermarkEstimator() {
       return withCreatWatermarkEstimatorFn(
           state -> {
             return new WallTime(state);
           });
     }
 
+    /** Use the {@link MonotonicallyIncreasing} as the watermark estimator. */
     public ReadSourceDescriptors<K, V> 
withMonotonicallyIncreasingWatermarkEstimator() {
       return withCreatWatermarkEstimatorFn(
           state -> {
             return new MonotonicallyIncreasing(state);
           });
     }
 
+    /** Use the {@link Manual} as the watermark estimator. */
     public ReadSourceDescriptors<K, V> withManualWatermarkEstimator() {
       return withCreatWatermarkEstimatorFn(
           state -> {
             return new Manual(state);
           });
     }
 
-    // If a transactional producer is used and it's desired to only read 
records from committed
-    // transaction, it's recommended to set read_committed. Otherwise, 
read_uncommitted is the
-    // default value.
+    /**
+     * Sets "isolation_level" to "read_committed" in Kafka consumer 
configuration. This is ensures
+     * that the consumer does not read uncommitted messages. Kafka version 
0.11 introduced
+     * transactional writes. Applications requiring end-to-end exactly-once 
semantics should only
+     * read committed messages. See JavaDoc for {@link KafkaConsumer} for more 
description.

Review comment:
       ```suggestion
        * Sets "isolation_level" to "read_committed" in Kafka consumer 
configuration. This ensures
        * that the consumer does not read uncommitted messages. Kafka version 
0.11 introduced
        * transactional writes. Applications requiring end-to-end exactly-once 
semantics should only
        * read committed messages. See JavaDoc for {@link KafkaConsumer} for 
more description.
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1288,9 +1294,12 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
           .withMonotonicallyIncreasingWatermarkEstimator();
     }
 
-    // Note that if the bootstrapServers is set here but also populated with 
the element, the
-    // element
-    // will override the bootstrapServers from the config.
+    /**
+     * Sets the bootstrap servers for the Kafka consumer. If the 
bootstrapServers is set here but
+     * also populated with the {@link KafkaSourceDescriptor}, the {@link
+     * KafkaSourceDescriptor#getBootStrapServers()} will override the 
bootstrapServers from the
+     * config.

Review comment:
       ```suggestion
        * Sets the bootstrap servers to use for the Kafka consumer if 
unspecified via
        * KafkaSourceDescriptor#getBootStrapServers()}.
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1439,9 +1542,18 @@ public void processElement(
       }
     }
 
+    /**
+     * Set the {@link TimestampPolicyFactory}. If the {@link 
TimestampPolicyFactory} is given, the
+     * output timestamp will be computed by the {@link
+     * TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)} 
and the {@link
+     * Manual} is used as the watermark estimator.

Review comment:
       ```suggestion
        * Set the {@link TimestampPolicyFactory}. If the {@link 
TimestampPolicyFactory} is given, the
        * output timestamp will be computed by the {@link
        * TimestampPolicyFactory#createTimestampPolicy(TopicPartition, 
Optional)} and {@link
        * Manual} is used as the watermark estimator.
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1306,112 +1315,206 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       return 
toBuilder().setValueDeserializerProvider(deserializerProvider).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+     *
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
+     * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
+     * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
+     * provide the key coder explicitly.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializer(
         Class<? extends Deserializer<K>> keyDeserializer) {
       return 
withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withValueDeserializer(
         Class<? extends Deserializer<V>> valueDeserializer) {
       return 
withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
         Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
       return 
withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting value bytes read 
from Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize value objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withValueDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(
         Class<? extends Deserializer<V>> valueDeserializer, Coder<V> 
valueCoder) {
       return 
withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
     }
 
+    /**
+     * A factory to create Kafka {@link Consumer} from consumer configuration. 
This is useful for
+     * supporting another version of Kafka consumer. Default is {@link 
KafkaConsumer}.
+     */
     public ReadSourceDescriptors<K, V> withConsumerFactoryFn(
         SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn) {
       return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
     }
 
+    /**
+     * Update configuration for the backend main consumer. Note that the 
default consumer properties
+     * will not be completely overridden. This method only updates the value 
which has the same key.
+     *
+     * <p>In {@link ReadFromKafkaDoFn}, there're two consumers running in the 
backend actually:<br>
+     * 1. the main consumer, which reads data from kafka;<br>
+     * 2. the secondary offset consumer, which is used to estimate backlog, by 
fetching latest
+     * offset;<br>
+     *
+     * <p>By default, main consumer uses the configuration from {@link
+     * KafkaIOUtils#DEFAULT_CONSUMER_PROPERTIES}.
+     */
     public ReadSourceDescriptors<K, V> withConsumerConfigUpdates(
         Map<String, Object> configUpdates) {
       Map<String, Object> config =
           KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), 
configUpdates);
       return toBuilder().setConsumerConfig(config).build();
     }
 
+    /**
+     * A function to calculate output timestamp for a given {@link 
KafkaRecord}. The default value
+     * is {@link #withProcessingTime()}.
+     */
     public ReadSourceDescriptors<K, V> withExtractOutputTimestampFn(
         SerializableFunction<KafkaRecord<K, V>, Instant> fn) {
       return toBuilder().setExtractOutputTimestampFn(fn).build();
     }
 
+    /**
+     * A function to create a {@link WatermarkEstimator}. The default value is 
{@link
+     * MonotonicallyIncreasing}.
+     */
     public ReadSourceDescriptors<K, V> withCreatWatermarkEstimatorFn(
         SerializableFunction<Instant, WatermarkEstimator<Instant>> fn) {
       return toBuilder().setCreateWatermarkEstimatorFn(fn).build();
     }
 
+    /** Use the log append time as the output timestamp. */
     public ReadSourceDescriptors<K, V> withLogAppendTime() {
       return withExtractOutputTimestampFn(
           ReadSourceDescriptors.ExtractOutputTimestampFns.useLogAppendTime());
     }
 
+    /** Use the processing time as the output timestamp. */
     public ReadSourceDescriptors<K, V> withProcessingTime() {
       return withExtractOutputTimestampFn(
           ReadSourceDescriptors.ExtractOutputTimestampFns.useProcessingTime());
     }
 
+    /** Use the creation time of {@link KafkaRecord} as the output timestamp. 
*/
     public ReadSourceDescriptors<K, V> withCreateTime() {
       return withExtractOutputTimestampFn(
           ReadSourceDescriptors.ExtractOutputTimestampFns.useCreateTime());
     }
 
+    /** Use the {@link WallTime} as the watermark estimator. */
     public ReadSourceDescriptors<K, V> withWallTimeWatermarkEstimator() {
       return withCreatWatermarkEstimatorFn(
           state -> {
             return new WallTime(state);
           });
     }
 
+    /** Use the {@link MonotonicallyIncreasing} as the watermark estimator. */
     public ReadSourceDescriptors<K, V> 
withMonotonicallyIncreasingWatermarkEstimator() {
       return withCreatWatermarkEstimatorFn(
           state -> {
             return new MonotonicallyIncreasing(state);
           });
     }
 
+    /** Use the {@link Manual} as the watermark estimator. */
     public ReadSourceDescriptors<K, V> withManualWatermarkEstimator() {
       return withCreatWatermarkEstimatorFn(
           state -> {
             return new Manual(state);
           });
     }
 
-    // If a transactional producer is used and it's desired to only read 
records from committed
-    // transaction, it's recommended to set read_committed. Otherwise, 
read_uncommitted is the
-    // default value.
+    /**
+     * Sets "isolation_level" to "read_committed" in Kafka consumer 
configuration. This is ensures
+     * that the consumer does not read uncommitted messages. Kafka version 
0.11 introduced
+     * transactional writes. Applications requiring end-to-end exactly-once 
semantics should only
+     * read committed messages. See JavaDoc for {@link KafkaConsumer} for more 
description.
+     */
     public ReadSourceDescriptors<K, V> withReadCommitted() {
       return withConsumerConfigUpdates(ImmutableMap.of("isolation.level", 
"read_committed"));
     }
 
+    /**
+     * Enable committing record offset. If {@link #withReadCommitted()} or 
{@link
+     * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set together with {@link 
#commitOffsets()},
+     * {@link #commitOffsets()} will be ignored.
+     */
     public ReadSourceDescriptors<K, V> commitOffsets() {
       return toBuilder().setCommitOffsetEnabled(true).build();
     }
 
+    /**
+     * Set additional configuration for the backend offset consumer. It may be 
required for a
+     * secured Kafka cluster, especially when you see similar WARN log message 
'exception while
+     * fetching latest offset for partition {}. will be retried'.
+     *
+     * <p>In {@link ReadFromKafkaDoFn}, there're two consumers running in the 
backend actually:<br>
+     * 1. the main consumer, which reads data from kafka;<br>
+     * 2. the secondary offset consumer, which is used to estimate backlog, by 
fetching latest
+     * offset;<br>
+     *
+     * <p>By default, offset consumer inherits the configuration from main 
consumer, with an
+     * auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may not 
work in a secured Kafka
+     * which requires more configurations.

Review comment:
       ```suggestion
        * Set additional configuration for the offset consumer. It may be 
required for a
        * secured Kafka cluster, especially when you see similar WARN log 
message {@code exception while
        * fetching latest offset for partition {}. will be retried}.
        *
        * <p>In {@link ReadFromKafkaDoFn}, there are two consumers running in 
the backend:
        * <ol>
        *   <li>the main consumer which reads data from kafka.
        *   <li>the secondary offset consumer which is used to estimate the 
backlog by fetching the latest offset.
        * </ol>
        *
        * <p>By default, offset consumer inherits the configuration from main 
consumer, with an
        * auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may not 
work in a secured Kafka
        * which requires additional configuration.
        *
        * <p>See {@link #withConsumerConfigUpdates} for configuring the main 
consumer.
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1306,112 +1315,206 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       return 
toBuilder().setValueDeserializerProvider(deserializerProvider).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+     *
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
+     * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
+     * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
+     * provide the key coder explicitly.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializer(
         Class<? extends Deserializer<K>> keyDeserializer) {
       return 
withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withValueDeserializer(
         Class<? extends Deserializer<V>> valueDeserializer) {
       return 
withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.

Review comment:
       ```suggestion
        * <p>Use this method to override the coder inference performed within 
{@link
        * #withKeyDeserializer(Class)}.
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1306,112 +1315,206 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       return 
toBuilder().setValueDeserializerProvider(deserializerProvider).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+     *
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
+     * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
+     * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
+     * provide the key coder explicitly.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializer(
         Class<? extends Deserializer<K>> keyDeserializer) {
       return 
withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withValueDeserializer(
         Class<? extends Deserializer<V>> valueDeserializer) {
       return 
withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
         Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
       return 
withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting value bytes read 
from Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize value objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withValueDeserializer(Class)}.

Review comment:
       ```suggestion
        * <p>Use this method to override the coder inference performed within 
{@link
        * #withValueDeserializer(Class)}.
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1306,112 +1315,206 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       return 
toBuilder().setValueDeserializerProvider(deserializerProvider).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+     *
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
+     * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
+     * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
+     * provide the key coder explicitly.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializer(
         Class<? extends Deserializer<K>> keyDeserializer) {
       return 
withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withValueDeserializer(
         Class<? extends Deserializer<V>> valueDeserializer) {
       return 
withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
         Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
       return 
withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting value bytes read 
from Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize value objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withValueDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(
         Class<? extends Deserializer<V>> valueDeserializer, Coder<V> 
valueCoder) {
       return 
withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
     }
 
+    /**
+     * A factory to create Kafka {@link Consumer} from consumer configuration. 
This is useful for
+     * supporting another version of Kafka consumer. Default is {@link 
KafkaConsumer}.
+     */
     public ReadSourceDescriptors<K, V> withConsumerFactoryFn(
         SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn) {
       return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
     }
 
+    /**
+     * Update configuration for the backend main consumer. Note that the 
default consumer properties
+     * will not be completely overridden. This method only updates the value 
which has the same key.
+     *
+     * <p>In {@link ReadFromKafkaDoFn}, there're two consumers running in the 
backend actually:<br>
+     * 1. the main consumer, which reads data from kafka;<br>
+     * 2. the secondary offset consumer, which is used to estimate backlog, by 
fetching latest
+     * offset;<br>
+     *
+     * <p>By default, main consumer uses the configuration from {@link
+     * KafkaIOUtils#DEFAULT_CONSUMER_PROPERTIES}.

Review comment:
       ```suggestion
        * Updates configuration for the main consumer. This method merges 
updates from the provided map with      
        * with any prior updates using {@link
        * KafkaIOUtils#DEFAULT_CONSUMER_PROPERTIES} as the starting 
configuration.
        *
        * <p>In {@link ReadFromKafkaDoFn}, there're two consumers running in 
the backend:
        * <ol>
        *   <li>the main consumer which reads data from kafka.
        *   <li>the secondary offset consumer which is used to estimate the 
backlog by fetching the latest offset.
        * </ol>
        *
        * <p>See {@link #withConsumerConfigOverrides} for overriding the 
configuration instead of updating it.
        *
        * <p>See {@link #withOffsetConsumerConfigOverrides} for configuring the 
secondary offset consumer.
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1306,112 +1315,206 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       return 
toBuilder().setValueDeserializerProvider(deserializerProvider).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+     *
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
+     * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
+     * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
+     * provide the key coder explicitly.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializer(
         Class<? extends Deserializer<K>> keyDeserializer) {
       return 
withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a

Review comment:
       This comment seems incorrect, it looks like it should be a clone of the 
key one above but stating that we are deserializing the value.

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1306,112 +1315,206 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       return 
toBuilder().setValueDeserializerProvider(deserializerProvider).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+     *
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
+     * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
+     * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
+     * provide the key coder explicitly.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializer(
         Class<? extends Deserializer<K>> keyDeserializer) {
       return 
withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withValueDeserializer(
         Class<? extends Deserializer<V>> valueDeserializer) {
       return 
withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
         Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
       return 
withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting value bytes read 
from Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize value objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withValueDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(
         Class<? extends Deserializer<V>> valueDeserializer, Coder<V> 
valueCoder) {
       return 
withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
     }
 
+    /**
+     * A factory to create Kafka {@link Consumer} from consumer configuration. 
This is useful for
+     * supporting another version of Kafka consumer. Default is {@link 
KafkaConsumer}.
+     */
     public ReadSourceDescriptors<K, V> withConsumerFactoryFn(
         SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn) {
       return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
     }
 
+    /**
+     * Update configuration for the backend main consumer. Note that the 
default consumer properties
+     * will not be completely overridden. This method only updates the value 
which has the same key.
+     *
+     * <p>In {@link ReadFromKafkaDoFn}, there're two consumers running in the 
backend actually:<br>
+     * 1. the main consumer, which reads data from kafka;<br>
+     * 2. the secondary offset consumer, which is used to estimate backlog, by 
fetching latest
+     * offset;<br>
+     *
+     * <p>By default, main consumer uses the configuration from {@link
+     * KafkaIOUtils#DEFAULT_CONSUMER_PROPERTIES}.
+     */
     public ReadSourceDescriptors<K, V> withConsumerConfigUpdates(
         Map<String, Object> configUpdates) {
       Map<String, Object> config =
           KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), 
configUpdates);
       return toBuilder().setConsumerConfig(config).build();
     }
 
+    /**
+     * A function to calculate output timestamp for a given {@link 
KafkaRecord}. The default value
+     * is {@link #withProcessingTime()}.
+     */
     public ReadSourceDescriptors<K, V> withExtractOutputTimestampFn(
         SerializableFunction<KafkaRecord<K, V>, Instant> fn) {
       return toBuilder().setExtractOutputTimestampFn(fn).build();
     }
 
+    /**
+     * A function to create a {@link WatermarkEstimator}. The default value is 
{@link
+     * MonotonicallyIncreasing}.
+     */
     public ReadSourceDescriptors<K, V> withCreatWatermarkEstimatorFn(
         SerializableFunction<Instant, WatermarkEstimator<Instant>> fn) {
       return toBuilder().setCreateWatermarkEstimatorFn(fn).build();
     }
 
+    /** Use the log append time as the output timestamp. */
     public ReadSourceDescriptors<K, V> withLogAppendTime() {
       return withExtractOutputTimestampFn(
           ReadSourceDescriptors.ExtractOutputTimestampFns.useLogAppendTime());
     }
 
+    /** Use the processing time as the output timestamp. */
     public ReadSourceDescriptors<K, V> withProcessingTime() {
       return withExtractOutputTimestampFn(
           ReadSourceDescriptors.ExtractOutputTimestampFns.useProcessingTime());
     }
 
+    /** Use the creation time of {@link KafkaRecord} as the output timestamp. 
*/
     public ReadSourceDescriptors<K, V> withCreateTime() {
       return withExtractOutputTimestampFn(
           ReadSourceDescriptors.ExtractOutputTimestampFns.useCreateTime());
     }
 
+    /** Use the {@link WallTime} as the watermark estimator. */
     public ReadSourceDescriptors<K, V> withWallTimeWatermarkEstimator() {
       return withCreatWatermarkEstimatorFn(
           state -> {
             return new WallTime(state);
           });
     }
 
+    /** Use the {@link MonotonicallyIncreasing} as the watermark estimator. */
     public ReadSourceDescriptors<K, V> 
withMonotonicallyIncreasingWatermarkEstimator() {
       return withCreatWatermarkEstimatorFn(
           state -> {
             return new MonotonicallyIncreasing(state);
           });
     }
 
+    /** Use the {@link Manual} as the watermark estimator. */
     public ReadSourceDescriptors<K, V> withManualWatermarkEstimator() {
       return withCreatWatermarkEstimatorFn(
           state -> {
             return new Manual(state);
           });
     }
 
-    // If a transactional producer is used and it's desired to only read 
records from committed
-    // transaction, it's recommended to set read_committed. Otherwise, 
read_uncommitted is the
-    // default value.
+    /**
+     * Sets "isolation_level" to "read_committed" in Kafka consumer 
configuration. This is ensures
+     * that the consumer does not read uncommitted messages. Kafka version 
0.11 introduced
+     * transactional writes. Applications requiring end-to-end exactly-once 
semantics should only
+     * read committed messages. See JavaDoc for {@link KafkaConsumer} for more 
description.
+     */
     public ReadSourceDescriptors<K, V> withReadCommitted() {
       return withConsumerConfigUpdates(ImmutableMap.of("isolation.level", 
"read_committed"));
     }
 
+    /**
+     * Enable committing record offset. If {@link #withReadCommitted()} or 
{@link
+     * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set together with {@link 
#commitOffsets()},
+     * {@link #commitOffsets()} will be ignored.
+     */
     public ReadSourceDescriptors<K, V> commitOffsets() {
       return toBuilder().setCommitOffsetEnabled(true).build();
     }
 
+    /**
+     * Set additional configuration for the backend offset consumer. It may be 
required for a
+     * secured Kafka cluster, especially when you see similar WARN log message 
'exception while
+     * fetching latest offset for partition {}. will be retried'.
+     *
+     * <p>In {@link ReadFromKafkaDoFn}, there're two consumers running in the 
backend actually:<br>
+     * 1. the main consumer, which reads data from kafka;<br>
+     * 2. the secondary offset consumer, which is used to estimate backlog, by 
fetching latest
+     * offset;<br>
+     *
+     * <p>By default, offset consumer inherits the configuration from main 
consumer, with an
+     * auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may not 
work in a secured Kafka
+     * which requires more configurations.
+     */
     public ReadSourceDescriptors<K, V> withOffsetConsumerConfigOverrides(
         Map<String, Object> offsetConsumerConfig) {
       return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
     }
 
+    /**
+     * Update configuration for the backend main consumer. Note that the 
default consumer properties
+     * will not be completely overridden. This method only updates the value 
which has the same key.
+     *
+     * <p>In {@link ReadFromKafkaDoFn}, there're two consumers running in the 
backend actually:<br>
+     * 1. the main consumer, which reads data from kafka;<br>
+     * 2. the secondary offset consumer, which is used to estimate backlog, by 
fetching latest
+     * offset;<br>
+     *
+     * <p>By default, main consumer uses the configuration from {@link
+     * KafkaIOUtils#DEFAULT_CONSUMER_PROPERTIES}.
+     */
     public ReadSourceDescriptors<K, V> withConsumerConfigOverrides(
         Map<String, Object> consumerConfig) {
       return toBuilder().setConsumerConfig(consumerConfig).build();
     }
 
-    // TODO(BEAM-10320): Create external build transform for 
ReadSourceDescriptors().
     ReadAllFromRow forExternalBuild() {
       return new ReadAllFromRow(this);
     }
 
-    // This transform is used in cross-language case. The input Row should be 
encoded with an
-    // equivalent schema as KafkaSourceDescriptor.
+    /**
+     * A transform that is used in cross-language case. The input Row should 
be encoded with an
+     * equivalent schema as {@link KafkaSourceDescriptor}.

Review comment:
       ```suggestion
        * A transform that is used in cross-language case. The input {@link 
Row} should be encoded with an
        * equivalent schema as {@link KafkaSourceDescriptor}.
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1306,112 +1315,206 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       return 
toBuilder().setValueDeserializerProvider(deserializerProvider).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} to interpret key bytes read from 
Kafka.
+     *
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and 
deserialize key objects at
+     * runtime. KafkaIO tries to infer a coder for the key based on the {@link 
Deserializer} class,
+     * however in case that fails, you can use {@link 
#withKeyDeserializerAndCoder(Class, Coder)} to
+     * provide the key coder explicitly.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializer(
         Class<? extends Deserializer<K>> keyDeserializer) {
       return 
withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withValueDeserializer(
         Class<? extends Deserializer<V>> valueDeserializer) {
       return 
withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from 
Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize key objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withKeyDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
         Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
       return 
withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
     }
 
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting value bytes read 
from Kafka along with a
+     * {@link Coder} for helping the Beam runner materialize value objects at 
runtime if necessary.
+     *
+     * <p>Use this method only if your pipeline doesn't work with plain {@link
+     * #withValueDeserializer(Class)}.
+     */
     public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(
         Class<? extends Deserializer<V>> valueDeserializer, Coder<V> 
valueCoder) {
       return 
withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
     }
 
+    /**
+     * A factory to create Kafka {@link Consumer} from consumer configuration. 
This is useful for
+     * supporting another version of Kafka consumer. Default is {@link 
KafkaConsumer}.
+     */
     public ReadSourceDescriptors<K, V> withConsumerFactoryFn(
         SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn) {
       return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
     }
 
+    /**
+     * Update configuration for the backend main consumer. Note that the 
default consumer properties
+     * will not be completely overridden. This method only updates the value 
which has the same key.
+     *
+     * <p>In {@link ReadFromKafkaDoFn}, there're two consumers running in the 
backend actually:<br>
+     * 1. the main consumer, which reads data from kafka;<br>
+     * 2. the secondary offset consumer, which is used to estimate backlog, by 
fetching latest
+     * offset;<br>
+     *
+     * <p>By default, main consumer uses the configuration from {@link
+     * KafkaIOUtils#DEFAULT_CONSUMER_PROPERTIES}.
+     */
     public ReadSourceDescriptors<K, V> withConsumerConfigUpdates(
         Map<String, Object> configUpdates) {
       Map<String, Object> config =
           KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), 
configUpdates);
       return toBuilder().setConsumerConfig(config).build();
     }
 
+    /**
+     * A function to calculate output timestamp for a given {@link 
KafkaRecord}. The default value
+     * is {@link #withProcessingTime()}.
+     */
     public ReadSourceDescriptors<K, V> withExtractOutputTimestampFn(
         SerializableFunction<KafkaRecord<K, V>, Instant> fn) {
       return toBuilder().setExtractOutputTimestampFn(fn).build();
     }
 
+    /**
+     * A function to create a {@link WatermarkEstimator}. The default value is 
{@link
+     * MonotonicallyIncreasing}.
+     */
     public ReadSourceDescriptors<K, V> withCreatWatermarkEstimatorFn(
         SerializableFunction<Instant, WatermarkEstimator<Instant>> fn) {
       return toBuilder().setCreateWatermarkEstimatorFn(fn).build();
     }
 
+    /** Use the log append time as the output timestamp. */
     public ReadSourceDescriptors<K, V> withLogAppendTime() {
       return withExtractOutputTimestampFn(
           ReadSourceDescriptors.ExtractOutputTimestampFns.useLogAppendTime());
     }
 
+    /** Use the processing time as the output timestamp. */
     public ReadSourceDescriptors<K, V> withProcessingTime() {
       return withExtractOutputTimestampFn(
           ReadSourceDescriptors.ExtractOutputTimestampFns.useProcessingTime());
     }
 
+    /** Use the creation time of {@link KafkaRecord} as the output timestamp. 
*/
     public ReadSourceDescriptors<K, V> withCreateTime() {
       return withExtractOutputTimestampFn(
           ReadSourceDescriptors.ExtractOutputTimestampFns.useCreateTime());
     }
 
+    /** Use the {@link WallTime} as the watermark estimator. */
     public ReadSourceDescriptors<K, V> withWallTimeWatermarkEstimator() {
       return withCreatWatermarkEstimatorFn(
           state -> {
             return new WallTime(state);
           });
     }
 
+    /** Use the {@link MonotonicallyIncreasing} as the watermark estimator. */
     public ReadSourceDescriptors<K, V> 
withMonotonicallyIncreasingWatermarkEstimator() {
       return withCreatWatermarkEstimatorFn(
           state -> {
             return new MonotonicallyIncreasing(state);
           });
     }
 
+    /** Use the {@link Manual} as the watermark estimator. */
     public ReadSourceDescriptors<K, V> withManualWatermarkEstimator() {
       return withCreatWatermarkEstimatorFn(
           state -> {
             return new Manual(state);
           });
     }
 
-    // If a transactional producer is used and it's desired to only read 
records from committed
-    // transaction, it's recommended to set read_committed. Otherwise, 
read_uncommitted is the
-    // default value.
+    /**
+     * Sets "isolation_level" to "read_committed" in Kafka consumer 
configuration. This is ensures
+     * that the consumer does not read uncommitted messages. Kafka version 
0.11 introduced
+     * transactional writes. Applications requiring end-to-end exactly-once 
semantics should only
+     * read committed messages. See JavaDoc for {@link KafkaConsumer} for more 
description.
+     */
     public ReadSourceDescriptors<K, V> withReadCommitted() {
       return withConsumerConfigUpdates(ImmutableMap.of("isolation.level", 
"read_committed"));
     }
 
+    /**
+     * Enable committing record offset. If {@link #withReadCommitted()} or 
{@link
+     * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set together with {@link 
#commitOffsets()},
+     * {@link #commitOffsets()} will be ignored.
+     */
     public ReadSourceDescriptors<K, V> commitOffsets() {
       return toBuilder().setCommitOffsetEnabled(true).build();
     }
 
+    /**
+     * Set additional configuration for the backend offset consumer. It may be 
required for a
+     * secured Kafka cluster, especially when you see similar WARN log message 
'exception while
+     * fetching latest offset for partition {}. will be retried'.
+     *
+     * <p>In {@link ReadFromKafkaDoFn}, there're two consumers running in the 
backend actually:<br>
+     * 1. the main consumer, which reads data from kafka;<br>
+     * 2. the secondary offset consumer, which is used to estimate backlog, by 
fetching latest
+     * offset;<br>
+     *
+     * <p>By default, offset consumer inherits the configuration from main 
consumer, with an
+     * auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may not 
work in a secured Kafka
+     * which requires more configurations.
+     */
     public ReadSourceDescriptors<K, V> withOffsetConsumerConfigOverrides(
         Map<String, Object> offsetConsumerConfig) {
       return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
     }
 
+    /**
+     * Update configuration for the backend main consumer. Note that the 
default consumer properties
+     * will not be completely overridden. This method only updates the value 
which has the same key.
+     *
+     * <p>In {@link ReadFromKafkaDoFn}, there're two consumers running in the 
backend actually:<br>
+     * 1. the main consumer, which reads data from kafka;<br>
+     * 2. the secondary offset consumer, which is used to estimate backlog, by 
fetching latest
+     * offset;<br>
+     *
+     * <p>By default, main consumer uses the configuration from {@link
+     * KafkaIOUtils#DEFAULT_CONSUMER_PROPERTIES}.

Review comment:
       ```suggestion
        * Replaces the configuration for the main consumer.
        *
        * <p>In {@link ReadFromKafkaDoFn}, there are two consumers running in 
the backend:
        * <ol>
        *   <li>the main consumer which reads data from kafka.
        *   <li>the secondary offset consumer which is used to estimate the 
backlog by fetching the latest offset.
        * </ol>
        *
        * <p>By default, main consumer uses the configuration from {@link
        * KafkaIOUtils#DEFAULT_CONSUMER_PROPERTIES}.
        *
        * See {@link #withConsumerConfigUpdates} for updating the configuration 
instead of overriding it.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to