[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r467317077

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

@@ -1306,112 +1315,206 @@ public void populateDisplayData(DisplayData.Builder builder) {
       return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
     }

+    /**
+     * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
+     *
+     * <p>In addition, Beam also needs a {@link Coder} to serialize and deserialize key objects at
+     * runtime. KafkaIO tries to infer a coder for the key based on the {@link Deserializer} class,
+     * however in case that fails, you can use {@link #withKeyDeserializerAndCoder(Class, Coder)} to
+     * provide the key coder explicitly.
+     */
+    public ReadSourceDescriptors<K, V> withKeyDeserializer(
+        Class<? extends Deserializer<K>> keyDeserializer) {
+      return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
+    }
+
+    /**
+     * Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a

Review comment:
Done, thanks!
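A minimal sketch of the explicit-coder path that the javadoc above points to, assuming the readAll()/ReadSourceDescriptors API sketched in this PR (the transform name shifted across revisions of this thread); the topic, partition, and broker strings are placeholders taken from the examples quoted later in this thread, and VarLongCoder stands in for whatever coder inference would otherwise fail to produce:

    // Provide the key coder explicitly instead of relying on inference from the Deserializer class.
    pipeline
        .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1))))
        .apply(
            KafkaIO.<Long, String>readAll()
                .withBootstrapServers("broker_1:9092,broker_2:9092")
                .withKeyDeserializerAndCoder(LongDeserializer.class, VarLongCoder.of())
                .withValueDeserializer(StringDeserializer.class));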
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r466683752

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link KafkaSourceDescriptor} and outputs {@link KafkaRecord}.

Review comment:
I would prefer that we leave the documentation here and link it from KafkaIO.ReadSourceDescriptors.
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r466680059

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

@@ -198,6 +209,102 @@
  *    ...
  * }
  *
+ * <h3>Read from Kafka as a {@link DoFn}</h3>
+ *
+ * <p>{@link ReadSourceDescriptors} is the {@link PTransform} that takes a PCollection of {@link

Review comment:
The order is explained in the javadoc of ReadFromKafkaDoFn, in the [Initial Restriction] section.
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r466673488

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

@@ -1051,33 +1198,352 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }

-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
   /**
-   * Returns a new config map which is merge of current config and updates. Verifies the updates do
-   * not includes ignored properties.
+   * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more information on usage and
+   * configuration.
    */
-  private static Map<String, Object> updateKafkaProperties(
-      Map<String, Object> currentConfig,
-      Map<String, String> ignoredProperties,
-      Map<String, Object> updates) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadSourceDescriptors<K, V>
+      extends PTransform<PCollection<KafkaSourceDescriptor>, PCollection<KafkaRecord<K, V>>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadSourceDescriptors.class);
+
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Nullable
+    abstract Map<String, Object> getOffsetConsumerConfig();
+
+    @Nullable
+    abstract DeserializerProvider<K> getKeyDeserializerProvider();
+
+    @Nullable
+    abstract DeserializerProvider<V> getValueDeserializerProvider();
+
+    @Nullable
+    abstract Coder<K> getKeyCoder();
+
+    @Nullable
+    abstract Coder<V> getValueCoder();
+
+    abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+        getConsumerFactoryFn();
+
+    @Nullable
+    abstract SerializableFunction<KafkaRecord<K, V>, Instant> getExtractOutputTimestampFn();
+
+    @Nullable
+    abstract SerializableFunction<Instant, WatermarkEstimator<Instant>>
+        getCreateWatermarkEstimatorFn();
+
+    abstract boolean isCommitOffsetEnabled();
+
+    @Nullable
+    abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+
+    abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract ReadSourceDescriptors.Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setOffsetConsumerConfig(
+          Map<String, Object> offsetConsumerConfig);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setConsumerFactoryFn(
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setKeyDeserializerProvider(
+          DeserializerProvider<K> deserializerProvider);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setValueDeserializerProvider(
+          DeserializerProvider<V> deserializerProvider);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setKeyCoder(Coder<K> keyCoder);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setValueCoder(Coder<V> valueCoder);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setExtractOutputTimestampFn(
+          SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setCreateWatermarkEstimatorFn(
+          SerializableFunction<Instant, WatermarkEstimator<Instant>> fn);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setCommitOffsetEnabled(
+          boolean commitOffsetEnabled);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setTimestampPolicyFactory(
+          TimestampPolicyFactory<K, V> policy);
+
+      abstract ReadSourceDescriptors<K, V> build();
+    }
+
+    public static <K, V> ReadSourceDescriptors<K, V> read() {
+      return new AutoValue_KafkaIO_ReadSourceDescriptors.Builder<K, V>()
+          .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
+          .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+          .setCommitOffsetEnabled(false)
+          .build()
+          .withProcessingTime()
+          .withMonotonicallyIncreasingWatermarkEstimator();
+    }
+
+    // Note that if the bootstrapServers is set here but also populated with the element, the
+    // element will override the bootstrapServers from the config.
+    public ReadSourceDescriptors<K, V> withBootstrapServers(String bootstrapServers) {
+      return withConsumerConfigUpdates(
+          ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+    }
+
+    public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(
+        DeserializerProvider<K> deserializerProvider) {
+      return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
+        DeserializerProvider<V> deserializerProvider) {
+      return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withKeyDeserializer(
+        Class<? extends Deserializer<K>> keyDeserializer) {
+      return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
+    }
+
+    public ReadSourceDescriptors<K, V> withValueDeserializer(
+        Class<? extends Deserializer<V>> valueDeserializer) {
+      return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
+    }
+
+    public
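The "element overrides the config" note in the quoted code above is the interesting behavioral detail here. A minimal sketch, assuming the four-argument KafkaSourceDescription.of(topicPartition, startReadOffset, startReadTime, bootstrapServers) factory quoted later in this thread, with placeholder topic and broker names:

    // Per-element bootstrap servers; these take precedence over withBootstrapServers(...)
    // when both are supplied.
    pipeline
        .apply(
            Create.of(
                KafkaSourceDescription.of(
                    new TopicPartition("topic", 1),
                    null, // startReadOffset left unset
                    null, // startReadTime left unset
                    ImmutableList.of("broker_1:9092", "broker_2:9092"))))
        .apply(
            KafkaIO.<Long, String>readAll()
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class));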
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r46972

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

@@ -1051,33 +1198,352 @@ public void populateDisplayData(DisplayData.Builder builder) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadSourceDescriptors<K, V>
+      extends PTransform<PCollection<KafkaSourceDescriptor>, PCollection<KafkaRecord<K, V>>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadSourceDescriptors.class);
+
+    abstract Map<String, Object> getConsumerConfig();

Review comment:
Most of the documentation is at the KafkaIO level.
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r464667015

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

@@ -906,19 +926,89 @@ public void setValueDeserializer(String valueDeserializer) {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);

-      // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
-      Unbounded<KafkaRecord<K, V>> unbounded =
-          org.apache.beam.sdk.io.Read.from(
-              toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+      // The Read will be expanded into SDF transform when "beam_fn_api" is enabled and
+      // "beam_fn_api_use_deprecated_read" is not enabled.
+      if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+          || ExperimentalOptions.hasExperiment(
+              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) {
+        // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
+        Unbounded<KafkaRecord<K, V>> unbounded =
+            org.apache.beam.sdk.io.Read.from(
+                toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+
+        PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+
+        if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
+          transform =
+              unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
+        }

-      PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+        return input.getPipeline().apply(transform);
+      } else {
+        ReadViaSDF readTransform =
+            ReadViaSDF.read()
+                .withConsumerConfigOverrides(getConsumerConfig())
+                .withOffsetConsumerConfigOverrides(getOffsetConsumerConfig())
+                .withConsumerFactoryFn(getConsumerFactoryFn())
+                .withKeyDeserializerProvider(getKeyDeserializerProvider())
+                .withValueDeserializerProvider(getValueDeserializerProvider())
+                .withManualWatermarkEstimator()
+                .withTimestampPolicyFactory(getTimestampPolicyFactory());
+        if (isCommitOffsetsInFinalizeEnabled()) {
+          readTransform = readTransform.commitOffsets();
+        }

-      if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
-        transform =
-            unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
+        return input
+            .getPipeline()
+            .apply(Impulse.create())
+            .apply(ParDo.of(new GenerateKafkaSourceDescription(this)))
+            .setCoder(SerializableCoder.of(KafkaSourceDescription.class))
+            .apply(readTransform)
+            .setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));

Review comment:
The coder needs to be set in the x-lang case. It seems like there is something not correct when x-lang expands the transform.
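For context, a sketch of what "setting the coder" means on the consuming side; the variable names here are hypothetical (descriptions standing for a PCollection of source descriptions, readTransform for the SDF read above), and Long/String stand in for the actual key/value types:

    // Pin the output coder explicitly so that cross-language expansion does not
    // need to infer it from the transform.
    PCollection<KafkaRecord<Long, String>> records = descriptions.apply(readTransform);
    records.setCoder(KafkaRecordCoder.of(VarLongCoder.of(), StringUtf8Coder.of()));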
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445242837

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

@@ -198,6 +213,154 @@
  *    ...
  * }
  *
+ * <h3>Read from Kafka as a {@link DoFn}</h3>
+ *
+ * <p>{@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
+ * KafkaIO.Read} is that {@link ReadAll} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during pipeline construction time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For example, the pipeline can
+ * query Kafka topics from a BigQuery table and read these topics via {@link ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1))))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class)
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the bootstrapServers can also be populated from the KafkaSourceDescription:
+ * pipeline
+ *  .apply(Create.of(
+ *     KafkaSourceDescription.of(
+ *       new TopicPartition("topic", 1),
+ *       null,
+ *       null,
+ *       ImmutableList.of("broker_1:9092", "broker_2:9092"))))
+ *  .apply(KafkaIO.readAll()
+ *          .withKeyDeserializer(LongDeserializer.class)
+ *          .withValueDeserializer(StringDeserializer.class));
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Except for the configurations of the Kafka consumer, there are some other configurations
+ * which are related to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing offsets after processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, then {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks for a function which
+ * takes a {@link KafkaRecord} as input and outputs the output timestamp. This function is used to
+ * produce the output timestamp per {@link KafkaRecord}. There are three built-in types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadAll} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(
+ *     KafkaSourceDescription.of(
+ *       new TopicPartition("topic", 1),
+ *       null,
+ *       null,
+ *       ImmutableList.of("broker_1:9092", "broker_2:9092"))))
+ *  .apply(KafkaIO.readAll()
+ *          .withKeyDeserializer(LongDeserializer.class)
+ *          .withValueDeserializer(StringDeserializer.class)
+ *          .withProcessingTime()
+ *          .commitOffsets());
+ * }</pre>
+ *
+ * <h3>Read from {@link KafkaSourceDescription}</h3>
+ *
+ * <p>{@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} which represents record
+ * offsets. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is
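The built-in withProcessingTime(), withCreateTime(), and withLogAppendTime() options described in the quoted javadoc are all instances of the same extract-output-timestamp hook. A minimal sketch of a custom function that could be handed to withExtractOutputTimestampFn(...), assuming Long/String key/value types; KafkaRecord#getTimestamp() returns epoch milliseconds:

    // Use the timestamp carried by the Kafka record itself as the element's output timestamp.
    SerializableFunction<KafkaRecord<Long, String>, Instant> extractOutputTimestampFn =
        record -> new Instant(record.getTimestamp());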
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445236767

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java

@@ -0,0 +1,92 @@
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+
+/**
+ * An AutoValue object which represents a Kafka source description. Note that this object should be
+ * encoded/decoded with equivalent {@link Schema} as a {@link Row} when crossing the wire.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class KafkaSourceDescription implements Serializable {

Review comment:
It seems like `Descriptor` makes more sense.
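Since the javadoc above says the object "should be encoded/decoded with equivalent Schema as a Row when crossing the wire", here is a sketch of that contract using Beam's SchemaRegistry; the topic and partition are placeholders, and the try/catch is required because schema lookup may fail:

    try {
      // Convert a description to a Row via the schema inferred from the AutoValue class.
      SerializableFunction<KafkaSourceDescription, Row> toRow =
          pipeline.getSchemaRegistry().getToRowFunction(KafkaSourceDescription.class);
      Row row = toRow.apply(KafkaSourceDescription.of(new TopicPartition("topic", 1)));
    } catch (NoSuchSchemaException e) {
      throw new RuntimeException("No schema registered for KafkaSourceDescription", e);
    }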
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445235681

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

@@ -1051,33 +1261,341 @@ public void populateDisplayData(DisplayData.Builder builder) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadAll<K, V>
+      extends PTransform<PCollection<KafkaSourceDescription>, PCollection<KafkaRecord<K, V>>> {
+
+    public static <K, V> ReadAll<K, V> read() {
+      return new AutoValue_KafkaIO_ReadAll.Builder<K, V>()
+          .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
+          .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+          .setCommitOffsetEnabled(false)
+          .build()
+          .withProcessingTime()
+          .withMonotonicallyIncreasingWatermarkEstimator();
+    }
+
+    // Note that if the bootstrapServers is set here but also populated with the element, the
+    // element will override the bootstrapServers from the config.
+    public ReadAll<K, V> withBootstrapServers(String bootstrapServers) {
+      return withConsumerConfigUpdates(
+          ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+    }
+
+    public ReadAll<K, V> withKeyDeserializerProvider(
+        DeserializerProvider<K> deserializerProvider) {
+      return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadAll<K, V> withValueDeserializerProvider(
+        DeserializerProvider<V> deserializerProvider) {
+      return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadAll<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
+      return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
+    }
+
+    public ReadAll<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
+      return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
+    }
+
+    public ReadAll<K, V> withKeyDeserializerAndCoder(
+        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
+      return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
+    }
+
+    public ReadAll<K, V> withValueDeserializerAndCoder(
+        Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
+      return
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445226123

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java

@@ -0,0 +1,92 @@
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class KafkaSourceDescription implements Serializable {
+  @SchemaFieldName("topic")
+  abstract String getTopic();
+
+  @SchemaFieldName("partition")
+  abstract Integer getPartition();
+
+  @SchemaFieldName("start_read_offset")
+  @Nullable
+  abstract Long getStartReadOffset();
+
+  @SchemaFieldName("start_read_time")
+  @Nullable
+  abstract Instant getStartReadTime();
+
+  @SchemaFieldName("bootstrapServers")
+  @Nullable
+  abstract List<String> getBootStrapServers();
+
+  private TopicPartition topicPartition = null;
+
+  public TopicPartition getTopicPartition() {

Review comment:
Thanks!
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445225947

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

@@ -906,19 +1082,91 @@ public void setValueDeserializer(String valueDeserializer) {
+      ReadAll<K, V> readTransform =
+          ReadAll.<K, V>read()
+              .withConsumerConfigOverrides(getConsumerConfig())
+              .withOffsetConsumerConfigOverrides(getOffsetConsumerConfig())
+              .withConsumerFactoryFn(getConsumerFactoryFn())
+              .withKeyDeserializerProvider(getKeyDeserializerProvider())
+              .withValueDeserializerProvider(getValueDeserializerProvider())
+              .withManualWatermarkEstimator()
+              .withTimestampPolicyFactory(getTimestampPolicyFactory());
+      if (isCommitOffsetsInFinalizeEnabled()) {
+        readTransform = readTransform.commitOffsets();
+      }
+      PCollection<KafkaSourceDescription> output =
+          input
+              .getPipeline()
+              .apply(Impulse.create())
+              .apply(ParDo.of(new GenerateKafkaSourceDescription(this)));
+      try {
+        output.setCoder(KafkaSourceDescription.getCoder(input.getPipeline().getSchemaRegistry()));

Review comment:
It works with `setSchema` but I want to make it explicit, because it's possible that a user writes a DoFn which produces `KafkaSourceDescription`.
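A sketch of the explicit alternative being defended in the comment above, using the SchemaRegistry#getSchemaCoder lookup; the KafkaSourceDescription.getCoder helper in the quoted diff presumably wraps something like this:

    try {
      // Explicitly pin the schema-based coder rather than relying on setSchema inference.
      output.setCoder(
          pipeline.getSchemaRegistry().getSchemaCoder(KafkaSourceDescription.class));
    } catch (NoSuchSchemaException e) {
      throw new RuntimeException(e);
    }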
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445016902

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

@@ -1051,33 +1261,341 @@ public void populateDisplayData(DisplayData.Builder builder) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadAll<K, V>

Review comment:
Thanks for starting the discussion. If taking x-lang usage into consideration, `Read` is not a good choice as input for these DoFns.
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r444654119

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java

@@ -0,0 +1,861 @@
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaSourceDescription.Schemas;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the <a href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
+ * KafkaIO.Read} is, {@link ReadViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time.
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r444648850

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r444648197

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r444646093

File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r444564026 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ## @@ -0,0 +1,861 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg; +import org.apache.beam.sdk.io.kafka.KafkaSourceDescription.Schemas; +import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers; +import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.AppInfoParser; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and + * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code + * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to + * the blog post (https://beam.apache.org/blog/splittable-do-fn/) and the design doc + * (https://s.apache.org/beam-fn-api). The major difference from {@link + * KafkaIO.Read} is that {@link ReadViaSDF} doesn't require source descriptions (e.g., {@link + * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link + * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time.
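To make the runtime-population idea concrete, here is a minimal usage sketch against the readAll() entry point discussed in this review. It is an illustration under assumptions, not the final API: KafkaSourceDescription.of(...) is a hypothetical factory, and the setter and type-parameter shapes are inferred from the snippets quoted in these comments.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ReadViaSDFSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p
        // Source descriptions could be computed at runtime (e.g. queried from
        // BigQuery); a hard-coded element keeps the sketch self-contained.
        // KafkaSourceDescription.of(...) is hypothetical, standing in for
        // however the PR actually constructs one.
        .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("my_topic", 0))))
        .apply(
            KafkaIO.<byte[], byte[], MonotonicallyIncreasing>readAll()
                .withBootstrapServers("broker_1:9092,broker_2:9092")
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class));
    p.run().waitUntilFinish();
  }
}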
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r34591 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -295,21 +301,32 @@ /** * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka configuration * should be set with {@link Read#withBootstrapServers(String)} and {@link Read#withTopics(List)}. - * Other optional settings include key and value {@link Deserializer}s, custom timestamp and + * Other optional settings include key and value {@link Deserializer}s, custom timestamp and + * watermark functions. */ public static <K, V> Read<K, V> read() { return new AutoValue_KafkaIO_Read.Builder<K, V>() .setTopics(new ArrayList<>()) .setTopicPartitions(new ArrayList<>()) -.setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN) -.setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES) +.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN) +.setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES) .setMaxNumRecords(Long.MAX_VALUE) .setCommitOffsetsInFinalizeEnabled(false) .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime()) .build(); } + /** + * Creates an uninitialized {@link ReadViaSDF} {@link PTransform}. Unlike {@link Read}, + * setting up {@code topics} and {@code bootstrapServers} is not required at construction + * time, but {@code bootstrapServers} can still be configured via {@link + * ReadViaSDF#withBootstrapServers(String)}. Please refer to {@link ReadViaSDF} for more details. + */ + public static <K, V, WatermarkEstimatorT extends WatermarkEstimator<Instant>> + ReadViaSDF<K, V, WatermarkEstimatorT> readAll() { +return ReadViaSDF.read(); Review comment: The `WatermarkEstimatorT` is used when defining `createWatermarkEstimatorFn` and when `@NewWatermarkEstimator` is called. I understand that we can always use `WatermarkEstimator` as the type, but I thought it would be better to make the type explicit.
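To make the generics point concrete, here is a sketch (not the PR's actual code) of how a WatermarkEstimatorT type parameter threads from a user-supplied createWatermarkEstimatorFn into the @NewWatermarkEstimator method: declaring the function with the concrete estimator type keeps the two declarations in agreement. The restriction/tracker pieces of the real SplittableDoFn are omitted.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
import org.joda.time.Instant;

// Illustrative skeleton only.
class ReadSdfSketch<K, V> extends DoFn<KafkaSourceDescription, KafkaRecord<K, V>> {
  // Typed with the concrete estimator rather than the WatermarkEstimator
  // interface, mirroring what an explicit WatermarkEstimatorT buys you.
  private final SerializableFunction<Instant, MonotonicallyIncreasing>
      createWatermarkEstimatorFn = MonotonicallyIncreasing::new;

  @NewWatermarkEstimator
  public MonotonicallyIncreasing newWatermarkEstimator(
      @WatermarkEstimatorState Instant watermarkEstimatorState) {
    // The framework restores the estimator state per (element, restriction)
    // and hands it back here.
    return createWatermarkEstimatorFn.apply(watermarkEstimatorState);
  }
}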
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r443937048 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ## @@ -0,0 +1,861 @@ [quoted diff context identical to the discussion_r444564026 comment above: ASF license header, imports, and the {@link ReadViaSDF} class javadoc]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r443052453 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ## @@ -0,0 +1,742 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.AppInfoParser; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} that takes a PCollection of {@link Row} in {@link + * KafkaSourceDescriptionSchemas}, which represents a Kafka source description, as input and outputs a + * PCollection of {@link KafkaRecord}. The core implementation is based on {@code SplittableDoFn}. + * For more details about the concept of {@code SplittableDoFn}, please refer to the Beam blog post + * (https://beam.apache.org/blog/splittable-do-fn/) and design doc (https://s.apache.org/beam-fn-api). + * The major difference from {@link KafkaIO.Read} is that {@link ReadViaSDF} doesn't require source + * descriptions (e.g., {@link KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, + * {@link KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, + * the pipeline can populate these source descriptions at runtime. For example, the pipeline can + * query Kafka topics from a BigQuery table and read these topics via {@link ReadViaSDF}. + * + * Common Kafka Consumer Configurations + * + * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}: + * + * + * {@link ReadViaSDF#getConsumerConfig()} is the same as {@link + * KafkaIO.Read#getConsumerConfig()}. + * {@link ReadViaSDF#getConsumerFactoryFn()} is the same as {@link + * KafkaIO.Read#getConsumerFactoryFn()}. + * {@link ReadViaSDF#getOffsetConsumerConfig()} is the same as {@link + * KafkaIO.Read#getOffsetConsumerConfig()}. + * {@link ReadViaSDF#getKeyCoder()} is the same as {@link KafkaIO.Read#getKeyCoder()}. + * {@link ReadViaSDF#getValueCoder()} is the same as {@link KafkaIO.Read#getValueCoder()}. + * {@link
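For orientation, a small sketch of the shape of that shared consumer configuration: a plain properties map plus a factory function that turns it into a consumer. Property values are placeholders; the deserializer entries mirror what Beam's byte-array defaults look like, and the factory is essentially what a default consumer factory does.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerConfigSketch {
  public static void main(String[] args) {
    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker_1:9092");
    // Read raw bytes; Beam coders/deserializers take over from there.
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    // A consumer factory function in the style of getConsumerFactoryFn():
    // hand the properties to a new KafkaConsumer.
    SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> factoryFn =
        config -> new KafkaConsumer<>(config);
  }
}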
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r443040694 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ## @@ -0,0 +1,742 @@ [quoted diff context identical to the discussion_r443052453 comment above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r442980355 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ## @@ -0,0 +1,742 @@ [quoted diff context identical to the discussion_r443052453 comment above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r442951203 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -681,11 +696,13 @@ public void setValueDeserializer(String valueDeserializer) { } /** - * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.LogAppendTimePolicy}. The - * policy assigns Kafka's log append time (server side ingestion time) to each record. The - * watermark for each Kafka partition is the timestamp of the last record read. If a partition - * is idle, the watermark advances to a couple of seconds behind wall time. Every record consumed - * from Kafka is expected to have its timestamp type set to 'LOG_APPEND_TIME'. + * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.LogAppendTimePolicy}, which is + * used when beam_fn_api is disabled, and sets {@code extractOutputTimestampFn} to {@link + * ReadViaSDF.ExtractOutputTimestampFns#withLogAppendTime()}, which is used when beam_fn_api is + * enabled. The policy assigns Kafka's log append time (server side ingestion time) to each + * record. The watermark for each Kafka partition is the timestamp of the last record read. If a + * partition is idle, the watermark advances to a couple of seconds behind wall time. Every record + * consumed from Kafka is expected to have its timestamp type set to 'LOG_APPEND_TIME'. Review comment: The clause "which is used when beam_fn_api is disabled" means the `TimestampPolicy` is used when beam_fn_api is disabled. I'll say so explicitly.
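A sketch of what the fn-api-side counterpart could look like: a serializable function that reads the broker-assigned timestamp off each record. It assumes KafkaRecord#getTimestamp() carries log-append time (epoch millis) when the record's timestamp type is LOG_APPEND_TIME, and that ExtractOutputTimestampFns#withLogAppendTime() wraps something of roughly this shape; this is not the PR's actual implementation.

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.joda.time.Instant;

public class LogAppendTimeSketch {
  // Illustrative only: use the broker-assigned timestamp as the element's
  // output timestamp.
  static final SerializableFunction<KafkaRecord<byte[], byte[]>, Instant>
      USE_LOG_APPEND_TIME = record -> new Instant(record.getTimestamp());
}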
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r442579140 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ## @@ -0,0 +1,742 @@ [quoted diff context identical to the discussion_r443052453 comment above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r442572993 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ## @@ -0,0 +1,742 @@ [quoted diff context identical to the discussion_r443052453 comment above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r442575623 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ## @@ -0,0 +1,742 @@ [quoted diff context identical to the discussion_r443052453 comment above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r442574397 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ## @@ -0,0 +1,742 @@ [quoted diff context identical to the discussion_r443052453 comment above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r442570364 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ## @@ -0,0 +1,742 @@ [quoted diff context identical to the discussion_r443052453 comment above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r442566478 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -795,6 +828,12 @@ public void setValueDeserializer(String valueDeserializer) { return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn)); } +/** A function to compute the output timestamp from a {@link KafkaRecord}. */ +public Read<K, V> withExtractOutputTimestampFn( Review comment: > How is this different from withTimestampFn2? `withTimestampFn2` has been deprecated in `KafkaIO.Read`. The major concern with reusing `withTimestampFn2` is that it would mean different things under SDF and UnboundedSource, which causes confusion. > Setting the top level properties allow us to say that this property is supported when used as an SDF. This is what I want to do by having `withExtractOutputTimestampFn`. > Also, what prevents us from supporting TimestampPolicy? We should be able to call it and give it the three pieces of information it requests (message backlog / backlog check time / current kafka record). The difficulty is that the message backlog / backlog check time is not maintained per (element, restriction). With the SDF framework, the backlog is retrieved by calling `RestrictionTracker.getProgress()`; we cannot call it per element in order to extract a timestamp.
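To illustrate that last point: under the SDF framework, backlog flows to the runner through RestrictionTracker.getProgress() (the HasProgress interface) rather than being handed to user code per record the way TimestampPolicy expects. A progress report is just a pair of doubles; the numbers below are placeholders.

import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;

public class ProgressSketch {
  public static void main(String[] args) {
    // What a getProgress() implementation returns: work completed vs. work
    // remaining, e.g. offsets already consumed vs. estimated backlog.
    Progress progress = Progress.from(1000.0, 250.0);
    System.out.println(progress.getWorkRemaining());
  }
}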
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r439566281 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java ## @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.AppInfoParser; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and + * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code + * SplittableDoFn}. 
For more details about the concept of {@code SplittableDoFn}, please refer to + * the Beam blog post (https://beam.apache.org/blog/splittable-do-fn/) and the design + * doc (https://s.apache.org/beam-fn-api). The major difference from {@link KafkaIO.Read} is that {@link + * ReadFromKafkaViaSDF} doesn't require source descriptions (e.g., {@link + * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link + * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the + * pipeline can populate these source descriptions at runtime. For example, the pipeline can + * query Kafka topics from a BigQuery table and read these topics via {@link ReadFromKafkaViaSDF}. + * + * Common Kafka Consumer Configurations + * + * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}: + * + * + * {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link + * KafkaIO.Read#getConsumerConfig()}. + * {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link + * KafkaIO.Read#getConsumerFactoryFn()}. + * {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link + * KafkaIO.Read#getOffsetConsumerConfig()}. + * {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link + * KafkaIO.Read#getKeyCoder()}. + * {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link + * KafkaIO.Read#getValueCoder()}. + * {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link + *
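As a side note on the restriction machinery these classes import: a sketch of the growable offset range an SDF Kafka reader tracks per partition. The constant end estimate stands in for whatever the implementation derives from the consumer's end offsets; this is illustrative, not the PR's code.

import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;

public class OffsetRangeSketch {
  public static void main(String[] args) {
    // Start at offset 0; the range end is re-estimated on demand because
    // Kafka keeps appending. A constant stands in for a real end-offset lookup.
    GrowableOffsetRangeTracker tracker = new GrowableOffsetRangeTracker(0L, () -> 10_000L);
    boolean claimed = tracker.tryClaim(0L); // an offset must be claimed before its record is emitted
    System.out.println(claimed);
  }
}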
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r439562951 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java ## @@ -0,0 +1,697 @@ [quotes the same ReadFromKafkaViaSDF.java license header and class-level Javadoc as the first message above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r439531254 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java ## @@ -0,0 +1,697 @@ [quotes the same ReadFromKafkaViaSDF.java license header and class-level Javadoc as the first message above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r43894 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java ## @@ -0,0 +1,697 @@ [quotes the same ReadFromKafkaViaSDF.java license header and class-level Javadoc as the first message above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r436922850 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java ## @@ -0,0 +1,697 @@ [quotes the same ReadFromKafkaViaSDF.java license header and class-level Javadoc as the first message above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r436187245 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java ## @@ -0,0 +1,697 @@ [quotes the same ReadFromKafkaViaSDF.java license header and class-level Javadoc as the first message above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r436179356 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -1055,29 +1144,6 @@ public void populateDisplayData(DisplayData.Builder builder) { private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class); - /** Review comment: This common part has been moved to KafkaIOUtils.java.
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r436177315 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -949,45 +1077,6 @@ public void setValueDeserializer(String valueDeserializer) { final SerializableFunction, OutT> fn) { return record -> fn.apply(record.getKV()); } - /// Review comment: This common part has been moved to KafkaIOUtils.java.
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r436123330 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java ## @@ -0,0 +1,697 @@ [quotes the same ReadFromKafkaViaSDF.java license header and class-level Javadoc as the first message above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r434890835 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -817,6 +847,24 @@ public void setValueDeserializer(String valueDeserializer) { return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build(); } +/** + * The {@link Read} transform will be expanded into the {@link ReadFromKafkaViaSDF} transform. When + * {@link #useSDFTransformInRead()} is used together with {@link + * #withTimestampPolicyFactory(TimestampPolicyFactory)}, only {@link + * TimestampPolicyFactory#withCreateTime(Duration)}, {@link + * TimestampPolicyFactory#withLogAppendTime()} and {@link + * TimestampPolicyFactory#withProcessingTime()} are honored correctly. For any other custom + * {@link TimestampPolicy}, the transform falls back to {@link + * TimestampPolicyFactory#withProcessingTime()}. It's recommended to use {@link + * ReadFromKafkaViaSDF} directly in that case. + * + * Note that the expansion only happens when the pipeline has the "beam_fn_api" experiment and + * "beam_fn_api_use_deprecated_read" is not set. + */ +public Read useSDFTransformInRead() { Review comment: Discussed with Luke offline. We think it would be better to make `KafkaIO.Read` expand with the SDF transform by default when `beam_fn_api` is enabled (before introducing this SDF transform, we expanded `Read` with SDFUnboundedWrapper when `beam_fn_api` was set).
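For reference, pairing the expansion with one of the three supported timestamp policies looks like this with the existing KafkaIO.Read API (withBootstrapServers, withTopic, withTimestampPolicyFactory and TimestampPolicyFactory.withCreateTime are current public methods; the broker and topic names are placeholders):

import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.Duration;

KafkaIO.Read<byte[], byte[]> read =
    KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers("broker_1:9092")
        .withTopic("my_topic")
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        // One of the three factories the SDF expansion honors, per the Javadoc
        // above; any custom TimestampPolicy would fall back to processing time.
        .withTimestampPolicyFactory(
            TimestampPolicyFactory.withCreateTime(Duration.standardMinutes(1)));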
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r434766297 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java ## @@ -0,0 +1,697 @@ [quotes the same ReadFromKafkaViaSDF.java license header and class-level Javadoc as the first message above]
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r434746269 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + +/** + * Common utility functions and default configurations for {@link KafkaIO.Read} and {@link + * ReadFromKafkaViaSDF}. + */ +final class KafkaIOUtils { + // A set of config defaults. Review comment: Yes, the util is just moved from KafkaIO.java for code-reuse purposes only.
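The quoted hunk cuts off at the defaults block. As a rough illustration, the kind of shared configuration such a utility centralizes might look like the following; the ConsumerConfig keys are real Kafka constants, but the values shown are assumptions based on the defaults KafkaIO.Read has historically applied and may not match the PR's KafkaIOUtils exactly:

import java.util.Map;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Illustrative only; not the PR's actual defaults block.
static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
    ImmutableMap.of(
        // Start from the latest offset when no initial offset is given.
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
        // Beam tracks consumed offsets itself; committing back to Kafka is opt-in.
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
        // A larger receive buffer helps high-throughput reads.
        ConsumerConfig.RECEIVE_BUFFER_CONFIG, 512 * 1024);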
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r434745710 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -906,19 +955,110 @@ public void setValueDeserializer(String valueDeserializer) { Coder keyCoder = getKeyCoder(coderRegistry); Coder valueCoder = getValueCoder(coderRegistry); - // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set. - Unbounded> unbounded = - org.apache.beam.sdk.io.Read.from( - toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource()); + if (!isUseSDFTransform() + || !ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api") Review comment: SDF is only supported over `beam_fn_api`. We shouldn't expand `KafkaIO.Read` with SDF when `beam_fn_api` is not enabled, or when `beam_fn_api_use_deprecated_read` is enabled.
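The gating described here can be read as a small predicate. ExperimentalOptions.hasExperiment(PipelineOptions, String) is the real SDK helper used in the hunk; the wrapper method below is just for illustration:

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;

// Use the SDF expansion only when "beam_fn_api" is set and the
// "beam_fn_api_use_deprecated_read" opt-out is not.
static boolean shouldUseSdfRead(PipelineOptions options) {
  return ExperimentalOptions.hasExperiment(options, "beam_fn_api")
      && !ExperimentalOptions.hasExperiment(options, "beam_fn_api_use_deprecated_read");
}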
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r434739659 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java ## @@ -0,0 +1,697 @@ [quotes the same ReadFromKafkaViaSDF.java license header and class-level Javadoc as the first message above]