[GitHub] [beam] aromanenko-dev commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-23 Thread GitBox


aromanenko-dev commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r444380726



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##
@@ -0,0 +1,861 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaSourceDescription.Schemas;
+import 
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link 
KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is 
based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code 
SplittableDoFn}, please refer to
+ * the https://beam.apache.org/blog/splittable-do-fn/;>blog post 
and https://s.apache.org/beam-fn-api;>design doc. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadViaSDF} doesn't require source 
descriptions(e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. 

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-12 Thread GitBox


aromanenko-dev commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r439565391



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link 
KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is 
based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code 
SplittableDoFn}, please refer to
+ * the beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and 
design
+ * doc:https://s.apache.org/beam-fn-api. The major difference from {@link 
KafkaIO.Read} is, {@link
+ * ReadFromKafkaViaSDF} doesn't require source descriptions(e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as 
{@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as 
{@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same 
as {@link
+ *   

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-12 Thread GitBox


aromanenko-dev commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r439565391



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link 
KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is 
based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code 
SplittableDoFn}, please refer to
+ * the beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and 
design
+ * doc:https://s.apache.org/beam-fn-api. The major difference from {@link 
KafkaIO.Read} is, {@link
+ * ReadFromKafkaViaSDF} doesn't require source descriptions(e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as 
{@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as 
{@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same 
as {@link
+ *   

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-12 Thread GitBox


aromanenko-dev commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r439547427



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link 
KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is 
based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code 
SplittableDoFn}, please refer to
+ * the beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and 
design
+ * doc:https://s.apache.org/beam-fn-api. The major difference from {@link 
KafkaIO.Read} is, {@link
+ * ReadFromKafkaViaSDF} doesn't require source descriptions(e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as 
{@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as 
{@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same 
as {@link
+ *   

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-12 Thread GitBox


aromanenko-dev commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r439509913



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link 
KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is 
based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code 
SplittableDoFn}, please refer to
+ * the beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and 
design
+ * doc:https://s.apache.org/beam-fn-api. The major difference from {@link 
KafkaIO.Read} is, {@link
+ * ReadFromKafkaViaSDF} doesn't require source descriptions(e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as 
{@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as 
{@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same 
as {@link
+ *   

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-09 Thread GitBox


aromanenko-dev commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r435359001



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -906,19 +955,110 @@ public void setValueDeserializer(String 
valueDeserializer) {
   Coder keyCoder = getKeyCoder(coderRegistry);
   Coder valueCoder = getValueCoder(coderRegistry);
 
-  // Handles unbounded source to bounded conversion if maxNumRecords or 
maxReadTime is set.
-  Unbounded> unbounded =
-  org.apache.beam.sdk.io.Read.from(
-  
toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+  if (!isUseSDFTransform()
+  || 
!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), 
"beam_fn_api")

Review comment:
   In this case, please, add a comment about that.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] aromanenko-dev commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-04 Thread GitBox


aromanenko-dev commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r434729096



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link 
KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is 
based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code 
SplittableDoFn}, please refer to
+ * the beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and 
design
+ * doc:https://s.apache.org/beam-fn-api. The major difference from {@link 
KafkaIO.Read} is, {@link
+ * ReadFromKafkaViaSDF} doesn't require source descriptions(e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as 
{@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as 
{@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same 
as {@link
+ *   

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-04 Thread GitBox


aromanenko-dev commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r435354750



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link 
KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is 
based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code 
SplittableDoFn}, please refer to
+ * the beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and 
design
+ * doc:https://s.apache.org/beam-fn-api. The major difference from {@link 
KafkaIO.Read} is, {@link
+ * ReadFromKafkaViaSDF} doesn't require source descriptions(e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as 
{@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as 
{@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same 
as {@link
+ *   

[GitHub] [beam] aromanenko-dev commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-03 Thread GitBox


aromanenko-dev commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r434717019



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -817,6 +847,24 @@ public void setValueDeserializer(String valueDeserializer) 
{
   return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
 }
 
+/**
+ * The {@link Read} transform will be expanded with {@link 
ReadFromKafkaViaSDF} transform. While
+ * working with {@link #useSDFTransformInRead()} and {@link
+ * #withTimestampPolicyFactory(TimestampPolicyFactory)} together, only 
{@link
+ * TimestampPolicyFactory#withCreateTime(Duration)}, {@link
+ * TimestampPolicyFactory#withLogAppendTime()} and {@link
+ * TimestampPolicyFactory#withProcessingTime()} will be populated 
correctly. For other custom
+ * {@link TimestampPolicy}, the transform will use {@link
+ * TimestampPolicyFactory#withProcessingTime()} by default. It's 
recommended to use {@link
+ * ReadFromKafkaViaSDF} directly in that case.
+ *
+ * Note that the expansion only happens when tbe pipeline has 
"beam_fn_api" experiment and
+ * meanwhile "beam_fn_api_use_deprecated_read" is not set.
+ */
+public Read useSDFTransformInRead() {

Review comment:
   Maybe call it just useSDF()? Because it's already known that it's a 
PTransform used in Read

##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+/**
+ * Common utility functions and default configurations for {@link 
KafkaIO.Read} and {@link
+ * ReadFromKafkaViaSDF}.
+ */
+final class KafkaIOUtils {
+  // A set of config defaults.

Review comment:
   I expect that all these constants and methods below were moved here 
without any changes and just for the sake of code refactoring. If not, please, 
add some comments on these.

##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -906,19 +955,110 @@ public void setValueDeserializer(String 
valueDeserializer) {
   Coder keyCoder = getKeyCoder(coderRegistry);
   Coder valueCoder = getValueCoder(coderRegistry);
 
-  // Handles unbounded source to bounded conversion if maxNumRecords or 
maxReadTime is set.
-  Unbounded> unbounded =
-  org.apache.beam.sdk.io.Read.from(
-  
toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+  if (!isUseSDFTransform()
+  || 
!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), 
"beam_fn_api")

Review comment:
   It looks that we depend on some specific pipeline business logic here. 
I'd prefer to avoid this if possible.

##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -906,19 +955,110 @@ public void setValueDeserializer(String 
valueDeserializer) {
   Coder keyCoder = getKeyCoder(coderRegistry);
   Coder valueCoder = getValueCoder(coderRegistry);
 
-  // Handles unbounded source to bounded conversion if maxNumRecords or 
maxReadTime is set.
-  Unbounded> unbounded =
-  org.apache.beam.sdk.io.Read.from(
-  
toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+  if (!isUseSDFTransform()
+  || 
!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), 
"beam_fn_api")
+  ||