[ https://issues.apache.org/jira/browse/BEAM-9977?focusedWorklogId=450218&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-450218 ]
ASF GitHub Bot logged work on BEAM-9977: ---------------------------------------- Author: ASF GitHub Bot Created on: 24/Jun/20 05:05 Start Date: 24/Jun/20 05:05 Worklog Time Spent: 10m Work Description: boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r444646093 ########## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ########## @@ -0,0 +1,861 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg; +import org.apache.beam.sdk.io.kafka.KafkaSourceDescription.Schemas; +import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.AppInfoParser; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and + * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code + * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to + * the <a href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a + * href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link + * KafkaIO.Read} is, {@link ReadViaSDF} doesn't require source descriptions(e.g., {@link + * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link + * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Instead, the + * pipeline can populate these source descriptions during runtime. For example, the pipeline can + * query Kafka topics from BigQuery table and read these topics via {@link ReadViaSDF}. + * + * <h3>Common Kafka Consumer Configurations</h3> + * + * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}: + * + * <ul> + * <li>{@link ReadViaSDF#getConsumerConfig()} is the same as {@link + * KafkaIO.Read#getConsumerConfig()}. + * <li>{@link ReadViaSDF#getConsumerFactoryFn()} is the same as {@link + * KafkaIO.Read#getConsumerFactoryFn()}. + * <li>{@link ReadViaSDF#getOffsetConsumerConfig()} is the same as {@link + * KafkaIO.Read#getOffsetConsumerConfig()}. + * <li>{@link ReadViaSDF#getKeyCoder()} is the same as {@link KafkaIO.Read#getKeyCoder()}. + * <li>{@link ReadViaSDF#getValueCoder()} is the same as {@link KafkaIO.Read#getValueCoder()}. + * <li>{@link ReadViaSDF#getKeyDeserializerProvider()} is the same as {@link + * KafkaIO.Read#getKeyDeserializerProvider()}. + * <li>{@link ReadViaSDF#getValueDeserializerProvider()} is the same as {@link + * KafkaIO.Read#getValueDeserializerProvider()}. + * <li>{@link ReadViaSDF#isCommitOffsetEnabled()} means the same as {@link + * KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}. + * </ul> + * + * <p>For example, to create a basic {@link ReadViaSDF} transform: + * + * <pre>{@code + * pipeline + * .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1))) + * .apply(KafkaIO.readAll() + * .withBootstrapServers("broker_1:9092,broker_2:9092") + * .withKeyDeserializer(LongDeserializer.class). 
+ * .withValueDeserializer(StringDeserializer.class)); + * + * Note that the {@code bootstrapServers} can also be populated from {@link KafkaSourceDescription}: + * pipeline + * .apply(Create.of( + * KafkaSourceDescription.of( + * new TopicPartition("topic", 1), + * null, + * null, + * ImmutableList.of("broker_1:9092", "broker_2:9092")) + * .apply(KafkaIO.readAll() + * .withKeyDeserializer(LongDeserializer.class). + * .withValueDeserializer(StringDeserializer.class)); + * + * }</pre> + * + * <h3>Configurations of {@link ReadViaSDF}</h3> + * + * <p>Except configurations of Kafka Consumer, there are some other configurations which are related + * to processing records. + * + * <p>{@link ReadViaSDF#commitOffsets()} enables committing offset after processing the record. Note + * that if {@code isolation.level} is set to "read_committed" or {@link + * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, the {@link + * ReadViaSDF#commitOffsets()} will be ignored. + * + * <p>{@link ReadViaSDF#withExtractOutputTimestampFn(SerializableFunction)} asks for a function + * which takes a {@link KafkaRecord} as input and outputs outputTimestamp. This function is used to + * produce output timestamp per {@link KafkaRecord}. There are three built-in types: {@link + * ReadViaSDF#withProcessingTime()}, {@link ReadViaSDF#withCreateTime()} and {@link + * ReadViaSDF#withLogAppendTime()}. + * + * <p>For example, to create a {@link ReadViaSDF} with these configurations: + * + * <pre>{@code + * pipeline + * .apply(Create.of( + * KafkaSourceDescription.of( + * new TopicPartition("topic", 1), + * null, + * null, + * ImmutableList.of("broker_1:9092", "broker_2:9092")) + * .apply(KafkaIO.readAll() + * .withKeyDeserializer(LongDeserializer.class). + * .withValueDeserializer(StringDeserializer.class) + * .withProcessingTime() + * .commitOffsets()); + * + * }</pre> + * + * <h3>Read from {@link KafkaSourceDescription}</h3> + * + * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link + * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} which represents record + * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ended with + * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is created. + * + * <h4>Initialize Restriction</h4> + * + * {@link ReadFromKafkaDoFn#initialRestriction(KafkaSourceDescription)} creates an initial range for + * a input element {@link KafkaSourceDescription}. The end of range will be initialized as {@code + * Long.MAX_VALUE}. For the start of the range: + * + * <ul> + * <li>If {@code startReadOffset} in {@link KafkaSourceDescription} is set, use this offset as + * start. + * <li>If {@code startReadTime} in {@link KafkaSourceDescription} is set, seek the start offset + * based on this time. + * <li>Otherwise, the last committed offset + 1 will be returned by {@link + * Consumer#position(TopicPartition)} as the start. + * </ul> + * + * <h4>Initial Split</h4> + * + * <p>There is no initial split for now. + * + * <h4>Checkpoint and Resume Processing</h4> + * + * <p>There are 2 types of checkpoint here: self-checkpoint which invokes by the DoFn and + * system-checkpoint which is issued by the runner via {@link + * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. 
Every time the + * consumer gets empty response from {@link Consumer#poll(long)}, {@link ReadFromKafkaDoFn} will + * checkpoint at current {@link KafkaSourceDescription} and move to process the next element. These + * deferred elements will be resumed by the runner as soon as possible. + * + * <h4>Progress and Size</h4> + * + * <p>The progress is provided by {@link GrowableOffsetRangeTracker} or per {@link + * KafkaSourceDescription}. For an infinite {@link OffsetRange}, a Kafka {@link Consumer} is used in + * the {@link GrowableOffsetRangeTracker} as the {@link + * GrowableOffsetRangeTracker.RangeEndEstimator} to poll the latest offset. Please refer to {@link + * ReadFromKafkaDoFn.KafkaLatestOffsetEstimator} for details. + * + * <p>The size is computed by {@link ReadFromKafkaDoFn#getSize(KafkaSourceDescription, + * OffsetRange).} A {@link KafkaIOUtils.MovingAvg} is used to track the average size of kafka + * records. + * + * <h4>Track Watermark</h4> + * + * The {@link WatermarkEstimator} is created by {@link #getCreateWatermarkEstimatorFn()}. The + * estimated watermark is computed by this {@link WatermarkEstimator} based on output timestamps + * computed by {@link #getExtractOutputTimestampFn()} (SerializableFunction)}. The default + * configuration is using {@link #withProcessingTime()} as {@code extractTimestampFn} and {@link + * #withMonotonicallyIncreasingWatermarkEstimator()} as {@link WatermarkEstimator}. + */ +@Experimental(Kind.PORTABILITY) +@AutoValue +abstract class ReadViaSDF<K, V, WatermarkEstimatorT extends WatermarkEstimator<Instant>> + extends PTransform<PCollection<KafkaSourceDescription>, PCollection<KafkaRecord<K, V>>> { + + private static final Logger LOG = LoggerFactory.getLogger(ReadViaSDF.class); + + abstract Map<String, Object> getConsumerConfig(); + + @Nullable + abstract Map<String, Object> getOffsetConsumerConfig(); + + @Nullable + abstract DeserializerProvider getKeyDeserializerProvider(); + + @Nullable + abstract DeserializerProvider getValueDeserializerProvider(); + + @Nullable + abstract Coder<K> getKeyCoder(); + + @Nullable + abstract Coder<V> getValueCoder(); + + abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> + getConsumerFactoryFn(); + + @Nullable + abstract SerializableFunction<KafkaRecord<K, V>, Instant> getExtractOutputTimestampFn(); + + @Nullable + abstract SerializableFunction<Instant, WatermarkEstimatorT> getCreateWatermarkEstimatorFn(); + + abstract boolean isCommitOffsetEnabled(); + + @Nullable + abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory(); + + abstract Builder<K, V, WatermarkEstimatorT> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<K, V, WatermarkEstimatorT extends WatermarkEstimator<Instant>> { + abstract Builder<K, V, WatermarkEstimatorT> setConsumerConfig(Map<String, Object> config); + + abstract Builder<K, V, WatermarkEstimatorT> setOffsetConsumerConfig( + Map<String, Object> offsetConsumerConfig); + + abstract Builder<K, V, WatermarkEstimatorT> setConsumerFactoryFn( + SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn); + + abstract Builder<K, V, WatermarkEstimatorT> setKeyDeserializerProvider( + DeserializerProvider deserializerProvider); + + abstract Builder<K, V, WatermarkEstimatorT> setValueDeserializerProvider( + DeserializerProvider deserializerProvider); + + abstract Builder<K, V, WatermarkEstimatorT> setKeyCoder(Coder<K> keyCoder); + + abstract Builder<K, V, WatermarkEstimatorT> setValueCoder(Coder<V> valueCoder); + + abstract 
Builder<K, V, WatermarkEstimatorT> setExtractOutputTimestampFn( + SerializableFunction<KafkaRecord<K, V>, Instant> fn); + + abstract Builder<K, V, WatermarkEstimatorT> setCreateWatermarkEstimatorFn( + SerializableFunction<Instant, WatermarkEstimatorT> fn); + + abstract Builder<K, V, WatermarkEstimatorT> setCommitOffsetEnabled(boolean commitOffsetEnabled); + + abstract Builder<K, V, WatermarkEstimatorT> setTimestampPolicyFactory( + TimestampPolicyFactory<K, V> policy); + + abstract ReadViaSDF<K, V, WatermarkEstimatorT> build(); + } + + public static <K, V, WatermarkEstimatorT extends WatermarkEstimator<Instant>> + ReadViaSDF<K, V, WatermarkEstimatorT> read() { + return new AutoValue_ReadViaSDF.Builder<K, V, WatermarkEstimatorT>() + .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN) + .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES) + .setCommitOffsetEnabled(false) + .build() + .withProcessingTime() + .withMonotonicallyIncreasingWatermarkEstimator(); + } + + // Note that if the bootstrapServers is set here but also populated with the element, the element + // will override the bootstrapServers from the config. + public ReadViaSDF<K, V, WatermarkEstimatorT> withBootstrapServers(String bootstrapServers) { + return withConsumerConfigUpdates( + ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withKeyDeserializerProvider( + DeserializerProvider<K> deserializerProvider) { + return toBuilder().setKeyDeserializerProvider(deserializerProvider).build(); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withValueDeserializerProvider( + DeserializerProvider<V> deserializerProvider) { + return toBuilder().setValueDeserializerProvider(deserializerProvider).build(); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withKeyDeserializer( + Class<? extends Deserializer<K>> keyDeserializer) { + return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer)); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withValueDeserializer( + Class<? extends Deserializer<V>> valueDeserializer) { + return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer)); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withKeyDeserializerAndCoder( + Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) { + return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build(); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withValueDeserializerAndCoder( + Class<? 
extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) { + return withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build(); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withConsumerFactoryFn( + SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) { + return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build(); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withConsumerConfigUpdates( + Map<String, Object> configUpdates) { + Map<String, Object> config = + KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), configUpdates); + return toBuilder().setConsumerConfig(config).build(); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withExtractOutputTimestampFn( + SerializableFunction<KafkaRecord<K, V>, Instant> fn) { + return toBuilder().setExtractOutputTimestampFn(fn).build(); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withCreatWatermarkEstimatorFn( + SerializableFunction<Instant, WatermarkEstimatorT> fn) { + return toBuilder().setCreateWatermarkEstimatorFn(fn).build(); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withLogAppendTime() { + return withExtractOutputTimestampFn(ExtractOutputTimestampFns.useLogAppendTime()); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withProcessingTime() { + return withExtractOutputTimestampFn(ExtractOutputTimestampFns.useProcessingTime()); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withCreateTime() { + return withExtractOutputTimestampFn(ExtractOutputTimestampFns.useCreateTime()); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withWallTimeWatermarkEstimator() { + return withCreatWatermarkEstimatorFn( + state -> { + return (WatermarkEstimatorT) new WallTime(state); + }); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withMonotonicallyIncreasingWatermarkEstimator() { + return withCreatWatermarkEstimatorFn( + state -> { + return (WatermarkEstimatorT) new MonotonicallyIncreasing(state); + }); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withManualWatermarkEstimator() { + return withCreatWatermarkEstimatorFn( + state -> { + return (WatermarkEstimatorT) new Manual(state); + }); + } + + // If a transactional producer is used and it's desired to only read records from committed + // transaction, it's recommended to set read_committed. Otherwise, read_uncommitted is the default + // value. 
+ public ReadViaSDF<K, V, WatermarkEstimatorT> withReadCommitted() { + return withConsumerConfigUpdates(ImmutableMap.of("isolation.level", "read_committed")); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> commitOffsets() { + return toBuilder().setCommitOffsetEnabled(true).build(); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withOffsetConsumerConfigOverrides( + Map<String, Object> offsetConsumerConfig) { + return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build(); + } + + public ReadViaSDF<K, V, WatermarkEstimatorT> withConsumerConfigOverrides( + Map<String, Object> consumerConfig) { + return toBuilder().setConsumerConfig(consumerConfig).build(); + } + + ReadViaSDFExternally forExternalBuild() { + return new ReadViaSDFExternally(this); + } + + private static class ReadViaSDFExternally< + K, V, WatermarkEstimatorT extends WatermarkEstimator<Instant>> + extends PTransform<PCollection<Row>, PCollection<KV<K, V>>> { + + private final ReadViaSDF<K, V, WatermarkEstimatorT> readViaSDF; + + ReadViaSDFExternally(ReadViaSDF read) { + readViaSDF = read; + } + + @Override + public PCollection<KV<K, V>> expand(PCollection<Row> input) { + return input + .apply( + ParDo.of( + new DoFn<Row, KafkaSourceDescription>() { + @ProcessElement + public void processElement( + @Element Row row, OutputReceiver<KafkaSourceDescription> outputReceiver) { + TopicPartition topicPartition = + new TopicPartition( + row.getString(Schemas.TOPIC), row.getInt32(Schemas.PARTITION)); + Instant startReadTime = + row.getInt64(Schemas.START_READ_TIME) != null + ? Instant.ofEpochMilli(row.getInt64(Schemas.START_READ_TIME)) + : null; + outputReceiver.output( + KafkaSourceDescription.of( + topicPartition, + row.getInt64(Schemas.START_READ_OFFSET), + startReadTime, + new ArrayList<>(row.getArray(Schemas.BOOTSTRAP_SERVERS)))); + } + })) + .apply(readViaSDF) + .apply( + ParDo.of( + new DoFn<KafkaRecord<K, V>, KV<K, V>>() { + @ProcessElement + public void processElement( + @Element KafkaRecord element, OutputReceiver<KV<K, V>> outputReceiver) { + outputReceiver.output(element.getKV()); + } + })) + .setCoder(KvCoder.<K, V>of(readViaSDF.getKeyCoder(), readViaSDF.getValueCoder())); + } + } + + ReadViaSDF<K, V, WatermarkEstimatorT> withTimestampPolicyFactory( + TimestampPolicyFactory<K, V> timestampPolicyFactory) { + return toBuilder().setTimestampPolicyFactory(timestampPolicyFactory).build(); + } + + @Override + public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescription> input) { + checkArgument( + ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api"), + "The ReadFromKafkaViaSDF can only used when beam_fn_api is enabled."); + + checkArgument(getKeyDeserializerProvider() != null, "withKeyDeserializer() is required"); + checkArgument(getValueDeserializerProvider() != null, "withValueDeserializer() is required"); + + ConsumerSpEL consumerSpEL = new ConsumerSpEL(); + if (!consumerSpEL.hasOffsetsForTimes()) { + LOG.warn( + "Kafka client version {} is too old. Versions before 0.10.1.0 are deprecated and " + + "may not be supported in next release of Apache Beam. " + + "Please upgrade your Kafka client version.", + AppInfoParser.getVersion()); + } + + if (isCommitOffsetEnabled()) { + if (configuredKafkaCommit()) { + LOG.info( + "Either read_committed or auto_commit is set together with commitOffsetEnabled but you " + + "only need one of them. 
The commitOffsetEnabled is going to be ignored"); + } + } + + if (getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) { + LOG.warn( + "The bootstrapServers is not set. Then it must be populated through KafkaSourceDescription during runtime. Otherwise, the pipeline will fail."); + } + + CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry(); + Coder<K> keyCoder = getKeyCoder(coderRegistry); + Coder<V> valueCoder = getValueCoder(coderRegistry); + Coder<KafkaRecord<K, V>> outputCoder = KafkaRecordCoder.of(keyCoder, valueCoder); + PCollection<KafkaRecord<K, V>> output = + input + .apply(ParDo.of(new ReadFromKafkaDoFn<K, V, WatermarkEstimatorT>(this))) + .setCoder(outputCoder); + // TODO(BEAM-10123): Add CommitOffsetTransform to expansion. + if (isCommitOffsetEnabled() && !configuredKafkaCommit()) { + throw new IllegalStateException("Offset committed is not supported yet"); + } + return output; + } + + private Coder<K> getKeyCoder(CoderRegistry coderRegistry) { + return (getKeyCoder() != null) + ? getKeyCoder() + : getKeyDeserializerProvider().getCoder(coderRegistry); + } + + private Coder<V> getValueCoder(CoderRegistry coderRegistry) { + return (getValueCoder() != null) + ? getValueCoder() + : getValueDeserializerProvider().getCoder(coderRegistry); + } + + private boolean configuredKafkaCommit() { + return getConsumerConfig().get("isolation.level") == "read_committed" + || Boolean.TRUE.equals(getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); + } + + static class ExtractOutputTimestampFns<K, V> { + public static <K, V> SerializableFunction<KafkaRecord<K, V>, Instant> useProcessingTime() { + return record -> Instant.now(); + } + + public static <K, V> SerializableFunction<KafkaRecord<K, V>, Instant> useCreateTime() { + return record -> { + checkArgument( + record.getTimestampType() == KafkaTimestampType.CREATE_TIME, + "Kafka record's timestamp is not 'CREATE_TIME' " + + "(topic: %s, partition %s, offset %s, timestamp type '%s')", + record.getTopic(), + record.getPartition(), + record.getOffset(), + record.getTimestampType()); + return new Instant(record.getTimestamp()); + }; + } + + public static <K, V> SerializableFunction<KafkaRecord<K, V>, Instant> useLogAppendTime() { + return record -> { + checkArgument( + record.getTimestampType() == KafkaTimestampType.LOG_APPEND_TIME, + "Kafka record's timestamp is not 'LOG_APPEND_TIME' " + + "(topic: %s, partition %s, offset %s, timestamp type '%s')", + record.getTopic(), + record.getPartition(), + record.getOffset(), + record.getTimestampType()); + return new Instant(record.getTimestamp()); + }; + } + } + + /** + * A SplittableDoFn which reads from {@link KafkaSourceDescription} and outputs {@link + * KafkaRecord}. By default, a {@link MonotonicallyIncreasing} watermark estimator is used to + * track watermark. 
+ */ + @VisibleForTesting + @UnboundedPerElement + static class ReadFromKafkaDoFn<K, V, WatermarkEstimatorT extends WatermarkEstimator<Instant>> + extends DoFn<KafkaSourceDescription, KafkaRecord<K, V>> { + + ReadFromKafkaDoFn(ReadViaSDF transform) { + this.consumerConfig = transform.getConsumerConfig(); + this.offsetConsumerConfig = transform.getOffsetConsumerConfig(); + this.keyDeserializerProvider = transform.getKeyDeserializerProvider(); + this.valueDeserializerProvider = transform.getValueDeserializerProvider(); + this.consumerFactoryFn = transform.getConsumerFactoryFn(); + this.extractOutputTimestampFn = transform.getExtractOutputTimestampFn(); + this.createWatermarkEstimatorFn = transform.getCreateWatermarkEstimatorFn(); + this.timestampPolicyFactory = transform.getTimestampPolicyFactory(); + } + + private final Map<String, Object> offsetConsumerConfig; + + private final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> + consumerFactoryFn; + private final SerializableFunction<KafkaRecord<K, V>, Instant> extractOutputTimestampFn; + private final SerializableFunction<Instant, WatermarkEstimatorT> createWatermarkEstimatorFn; + private final TimestampPolicyFactory<K, V> timestampPolicyFactory; + + // Variables that are initialized when bundle is started and closed when FinishBundle is called. + private transient ConsumerSpEL consumerSpEL = null; + private transient Deserializer<K> keyDeserializerInstance = null; + private transient Deserializer<V> valueDeserializerInstance = null; + + private transient HashMap<TopicPartition, KafkaIOUtils.MovingAvg> avgRecordSize; + private transient HashMap<TopicPartition, KafkaIOUtils.MovingAvg> avgOffsetGap; + + private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000); + + @VisibleForTesting final DeserializerProvider keyDeserializerProvider; + @VisibleForTesting final DeserializerProvider valueDeserializerProvider; + @VisibleForTesting final Map<String, Object> consumerConfig; + + /** + * A {@link GrowableOffsetRangeTracker.RangeEndEstimator} which uses a Kafka {@link Consumer} to + * fetch backlog. 
+     */
+    private static class KafkaLatestOffsetEstimator
+        implements GrowableOffsetRangeTracker.RangeEndEstimator {
+
+      private final Consumer<byte[], byte[]> offsetConsumer;
+      private final TopicPartition topicPartition;
+      private final ConsumerSpEL consumerSpEL;
+      private final Supplier<Long> memorizedBacklog;
+
+      KafkaLatestOffsetEstimator(
+          Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
+        this.offsetConsumer = offsetConsumer;
+        this.topicPartition = topicPartition;
+        this.consumerSpEL = new ConsumerSpEL();
+        this.consumerSpEL.evaluateAssign(
+            this.offsetConsumer, ImmutableList.of(this.topicPartition));
+        memorizedBacklog =
+            Suppliers.memoizeWithExpiration(
+                () -> {
+                  consumerSpEL.evaluateSeek2End(offsetConsumer, topicPartition);
+                  return offsetConsumer.position(topicPartition);
+                },
+                5,
+                TimeUnit.SECONDS);
+      }
+
+      @Override
+      protected void finalize() {
+        try {
+          Closeables.close(offsetConsumer, true);
+        } catch (Exception anyException) {
+          LOG.warn("Failed to close offset consumer for {}", topicPartition);
+        }
+      }
+
+      @Override
+      public long estimate() {
+        return memorizedBacklog.get();
+      }
+    }
+
+    @GetInitialRestriction
+    public OffsetRange initialRestriction(@Element KafkaSourceDescription kafkaSourceDescription) {
+      Map<String, Object> updatedConsumerConfig =
+          overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescription);
+      try (Consumer<byte[], byte[]> offsetConsumer =
+          consumerFactoryFn.apply(
+              KafkaIOUtils.getOffsetConsumerConfig(
+                  "initialOffset", offsetConsumerConfig, updatedConsumerConfig))) {
+        consumerSpEL.evaluateAssign(
+            offsetConsumer, ImmutableList.of(kafkaSourceDescription.getTopicPartition()));
+        long startOffset;
+        if (kafkaSourceDescription.getStartReadOffset() != null) {
+          startOffset = kafkaSourceDescription.getStartReadOffset();
+        } else if (kafkaSourceDescription.getStartReadTime() != null) {
+          startOffset =
+              consumerSpEL.offsetForTime(
+                  offsetConsumer,
+                  kafkaSourceDescription.getTopicPartition(),
+                  kafkaSourceDescription.getStartReadTime());
+        } else {
+          startOffset = offsetConsumer.position(kafkaSourceDescription.getTopicPartition());
+        }
+        return new OffsetRange(startOffset, Long.MAX_VALUE);
+      }
+    }
+
+    @GetInitialWatermarkEstimatorState
+    public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+      return currentElementTimestamp;
+    }
+
+    @NewWatermarkEstimator
+    public WatermarkEstimatorT newWatermarkEstimator(
+        @WatermarkEstimatorState Instant watermarkEstimatorState) {
+      return createWatermarkEstimatorFn.apply(watermarkEstimatorState);
+    }
+
+    @GetSize
+    public double getSize(
+        @Element KafkaSourceDescription kafkaSourceDescription,
+        @Restriction OffsetRange offsetRange)
+        throws Exception {
+      double numOfRecords =
+          ((HasProgress) restrictionTracker(kafkaSourceDescription, offsetRange))
+              .getProgress()
+              .getWorkRemaining();
+
+      // Before processing elements, we don't have a good estimated size of records and offset gap.
+      if (avgOffsetGap.containsKey(kafkaSourceDescription.getTopicPartition())) {
+        numOfRecords =
+            numOfRecords / (1 + avgOffsetGap.get(kafkaSourceDescription.getTopicPartition()).get());
+      }
+      return (!avgRecordSize.containsKey(kafkaSourceDescription.getTopicPartition())
+          ? 1

Review comment:
       I don't think it's easy to come up with a default size here. avgSize and avgGap are missing only during the initial sizing, which is not really important for streaming.
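[Editor's note] For readers following the getSize() discussion above, the plain-Java sketch below illustrates the sizing pattern under review: the remaining offset range is converted to an estimated record count using a running average of offset gaps (offsets can have gaps on compacted or transactional topics) and then scaled by a running average of record sizes, with a neutral fallback before any record has been observed. The class and helper names (RestrictionSizeEstimateSketch, MovingAverage, estimateRemainingBytes) are illustrative assumptions for this sketch only, not the ReadViaSDF/KafkaIOUtils API.

import java.util.HashMap;
import java.util.Map;

class RestrictionSizeEstimateSketch {

  // Simple capped-count moving average, standing in for a helper like KafkaIOUtils.MovingAvg.
  static class MovingAverage {
    private double avg = 0;
    private long count = 0;

    void update(double value) {
      count = Math.min(count + 1, 1000); // cap the weight so old history decays
      avg += (value - avg) / count;
    }

    double get() {
      return avg;
    }
  }

  private final Map<String, MovingAverage> avgRecordSize = new HashMap<>();
  private final Map<String, MovingAverage> avgOffsetGap = new HashMap<>();

  // Called for every record actually read from a topic-partition.
  void onRecord(String topicPartition, long offsetGap, long recordBytes) {
    avgOffsetGap.computeIfAbsent(topicPartition, tp -> new MovingAverage()).update(offsetGap);
    avgRecordSize.computeIfAbsent(topicPartition, tp -> new MovingAverage()).update(recordBytes);
  }

  // Remaining offsets -> estimated record count -> estimated bytes. Before any record has
  // been read there is no meaningful average, so a neutral fallback is returned.
  double estimateRemainingBytes(String topicPartition, long currentOffset, long endOffset) {
    double remainingOffsets = Math.max(0L, endOffset - currentOffset);
    MovingAverage gap = avgOffsetGap.get(topicPartition);
    double estimatedRecords = gap == null ? remainingOffsets : remainingOffsets / (1 + gap.get());
    MovingAverage size = avgRecordSize.get(topicPartition);
    return size == null ? estimatedRecords : estimatedRecords * size.get();
  }
}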
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 450218)
    Time Spent: 16h 50m  (was: 16h 40m)

> Build Kafka Read on top of Java SplittableDoFn
> ----------------------------------------------
>
>                 Key: BEAM-9977
>                 URL: https://issues.apache.org/jira/browse/BEAM-9977
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-kafka
>            Reporter: Boyuan Zhang
>            Assignee: Boyuan Zhang
>            Priority: P2
>          Time Spent: 16h 50m
>  Remaining Estimate: 0h
>


--
This message was sent by Atlassian Jira
(v8.3.4#803005)