[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-08-07 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r467317077



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -1306,112 +1315,206 @@ public void populateDisplayData(DisplayData.Builder builder) {
   return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
 }
 
+/**
+ * Sets a Kafka {@link Deserializer} to interpret key bytes read from Kafka.
+ *
+ * In addition, Beam also needs a {@link Coder} to serialize and deserialize key objects at
+ * runtime. KafkaIO tries to infer a coder for the key based on the {@link Deserializer} class;
+ * however, in case that fails, you can use {@link #withKeyDeserializerAndCoder(Class, Coder)} to
+ * provide the key coder explicitly.
+ */
 public ReadSourceDescriptors<K, V> withKeyDeserializer(
     Class<? extends Deserializer<K>> keyDeserializer) {
   return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
 }
 
+/**
+ * Sets a Kafka {@link Deserializer} for interpreting key bytes read from Kafka along with a
+ * {@link Coder} for helping the Beam runner materialize key objects at runtime if necessary.

Review comment:
   Done, thanks!
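A minimal sketch of the explicit-coder fallback described in the javadoc above, using the same-named methods on KafkaIO.Read (the ReadSourceDescriptors variant follows the same pattern); broker, topic, and coder choices here are illustrative:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ExplicitKeyCoderExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker_1:9092")
            .withTopic("topic")
            // If coder inference from the Deserializer class fails, supply the
            // key coder explicitly instead of relying on inference:
            .withKeyDeserializerAndCoder(LongDeserializer.class, VarLongCoder.of())
            .withValueDeserializer(StringDeserializer.class));
    pipeline.run().waitUntilFinish();
  }
}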





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-08-06 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r466683752



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO.ReadSourceDescriptors;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link KafkaSourceDescriptor} and outputs {@link KafkaRecord}.

Review comment:
   I would prefer that we leave the documentation here and link it from KafkaIO.ReadSourceDescriptors.









[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-08-06 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r466680059



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -198,6 +209,102 @@
  *...
  * }
  *
+ * Read from Kafka as a {@link DoFn}
+ *
+ * {@link ReadSourceDescriptors} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link KafkaRecord}.

Review comment:
   The order is explained in the javadoc of ReadFromKafkaDoFn, in the [Initial Restriction] section.
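For readers of the thread, a simplified sketch of that [Initial Restriction] idea: the restriction is an OffsetRange from the descriptor's start offset to Long.MAX_VALUE. The accessor names and the default tracker are assumptions for illustration; the real DoFn resolves offsets against a Kafka consumer and polls records.

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

@DoFn.UnboundedPerElement
class OffsetSketchDoFn extends DoFn<KafkaSourceDescription, Long> {
  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element KafkaSourceDescription description) {
    // Start from the requested offset if present; a real implementation would
    // otherwise resolve the partition's earliest offset from the consumer.
    long start = description.getStartReadOffset() == null ? 0L : description.getStartReadOffset();
    return new OffsetRange(start, Long.MAX_VALUE); // unbounded end, grows over time
  }

  @ProcessElement
  public ProcessContinuation processElement(
      RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Long> receiver) {
    long offset = tracker.currentRestriction().getFrom();
    // Stand-in loop: the real DoFn polls Kafka and emits a KafkaRecord per claimed offset.
    while (tracker.tryClaim(offset)) {
      receiver.output(offset++);
    }
    return ProcessContinuation.resume();
  }
}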









[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-08-06 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r466673488



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -1051,33 +1198,352 @@ public void populateDisplayData(DisplayData.Builder builder) {
 }
   }
 
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
   /**
-   * Returns a new config map which is merge of current config and updates. Verifies the updates do
-   * not includes ignored properties.
+   * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more information on usage and
+   * configuration.
    */
-  private static Map<String, Object> updateKafkaProperties(
-      Map<String, Object> currentConfig,
-      Map<String, String> ignoredProperties,
-      Map<String, Object> updates) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadSourceDescriptors<K, V>
+      extends PTransform<PCollection<KafkaSourceDescriptor>, PCollection<KafkaRecord<K, V>>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadSourceDescriptors.class);
+
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Nullable
+    abstract Map<String, Object> getOffsetConsumerConfig();
+
+    @Nullable
+    abstract DeserializerProvider<K> getKeyDeserializerProvider();
+
+    @Nullable
+    abstract DeserializerProvider<V> getValueDeserializerProvider();
+
+    @Nullable
+    abstract Coder<K> getKeyCoder();
+
+    @Nullable
+    abstract Coder<V> getValueCoder();
+
+    abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+        getConsumerFactoryFn();
+
+    @Nullable
+    abstract SerializableFunction<KafkaRecord<K, V>, Instant> getExtractOutputTimestampFn();
+
+    @Nullable
+    abstract SerializableFunction<Instant, WatermarkEstimator<Instant>>
+        getCreateWatermarkEstimatorFn();
+
+    abstract boolean isCommitOffsetEnabled();
+
+    @Nullable
+    abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+
+    abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract ReadSourceDescriptors.Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setOffsetConsumerConfig(
+          Map<String, Object> offsetConsumerConfig);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setConsumerFactoryFn(
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setKeyDeserializerProvider(
+          DeserializerProvider<K> deserializerProvider);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setValueDeserializerProvider(
+          DeserializerProvider<V> deserializerProvider);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setKeyCoder(Coder<K> keyCoder);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setValueCoder(Coder<V> valueCoder);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setExtractOutputTimestampFn(
+          SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setCreateWatermarkEstimatorFn(
+          SerializableFunction<Instant, WatermarkEstimator<Instant>> fn);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setCommitOffsetEnabled(
+          boolean commitOffsetEnabled);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setTimestampPolicyFactory(
+          TimestampPolicyFactory<K, V> policy);
+
+      abstract ReadSourceDescriptors<K, V> build();
+    }
+
+    public static <K, V> ReadSourceDescriptors<K, V> read() {
+      return new AutoValue_KafkaIO_ReadSourceDescriptors.Builder<K, V>()
+          .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
+          .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+          .setCommitOffsetEnabled(false)
+          .build()
+          .withProcessingTime()
+          .withMonotonicallyIncreasingWatermarkEstimator();
+    }
+
+    // Note that if the bootstrapServers is set here but also populated with the element, the
+    // element will override the bootstrapServers from the config.
+    public ReadSourceDescriptors<K, V> withBootstrapServers(String bootstrapServers) {
+      return withConsumerConfigUpdates(
+          ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+    }
+
+    public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(
+        DeserializerProvider<K> deserializerProvider) {
+      return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
+        DeserializerProvider<V> deserializerProvider) {
+      return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withKeyDeserializer(
+        Class<? extends Deserializer<K>> keyDeserializer) {
+      return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
+    }
+
+    public ReadSourceDescriptors<K, V> withValueDeserializer(
+        Class<? extends Deserializer<V>> valueDeserializer) {
+      return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
+    }
+
+public 

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-08-06 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r46972



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -1051,33 +1198,352 @@ public void populateDisplayData(DisplayData.Builder builder) {
 }
   }
 
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
   /**
-   * Returns a new config map which is merge of current config and updates. Verifies the updates do
-   * not includes ignored properties.
+   * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more information on usage and
+   * configuration.
    */
-  private static Map<String, Object> updateKafkaProperties(
-      Map<String, Object> currentConfig,
-      Map<String, String> ignoredProperties,
-      Map<String, Object> updates) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadSourceDescriptors<K, V>
+      extends PTransform<PCollection<KafkaSourceDescriptor>, PCollection<KafkaRecord<K, V>>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadSourceDescriptors.class);
+
+    abstract Map<String, Object> getConsumerConfig();

Review comment:
   Most of the documentation is at the KafkaIO level.









[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-08-03 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r464667015



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -906,19 +926,89 @@ public void setValueDeserializer(String valueDeserializer) {
   Coder<K> keyCoder = getKeyCoder(coderRegistry);
   Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-  // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
-  Unbounded<KafkaRecord<K, V>> unbounded =
-      org.apache.beam.sdk.io.Read.from(
-          toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+  // The Read will be expanded into SDF transform when "beam_fn_api" is enabled and
+  // "beam_fn_api_use_deprecated_read" is not enabled.
+  if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+      || ExperimentalOptions.hasExperiment(
+          input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) {
+    // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
+    Unbounded<KafkaRecord<K, V>> unbounded =
+        org.apache.beam.sdk.io.Read.from(
+            toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+
+    PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+
+    if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
+      transform =
+          unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
+    }
 
-  PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+    return input.getPipeline().apply(transform);
+  } else {
+    ReadViaSDF<K, V> readTransform =
+        ReadViaSDF.<K, V>read()
+            .withConsumerConfigOverrides(getConsumerConfig())
+            .withOffsetConsumerConfigOverrides(getOffsetConsumerConfig())
+            .withConsumerFactoryFn(getConsumerFactoryFn())
+            .withKeyDeserializerProvider(getKeyDeserializerProvider())
+            .withValueDeserializerProvider(getValueDeserializerProvider())
+            .withManualWatermarkEstimator()
+            .withTimestampPolicyFactory(getTimestampPolicyFactory());
+    if (isCommitOffsetsInFinalizeEnabled()) {
+      readTransform = readTransform.commitOffsets();
+    }
 
-  if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
-    transform =
-        unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
+    return input
+        .getPipeline()
+        .apply(Impulse.create())
+        .apply(ParDo.of(new GenerateKafkaSourceDescription(this)))
+        .setCoder(SerializableCoder.of(KafkaSourceDescription.class))
+        .apply(readTransform)
+        .setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));

Review comment:
   The coder needs to be set in the x-lang case. It seems like something is not correct when x-lang expands the transform.
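A small sketch of the coder pinning discussed here, assuming byte-array keys and values; KafkaRecordCoder.of composes the key and value coders into a coder for the whole record, so cross-language expansion does not have to infer it:

import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.values.PCollection;

class CoderPinning {
  // Pin the composite output coder explicitly, as the diff above does.
  static PCollection<KafkaRecord<byte[], byte[]>> pin(
      PCollection<KafkaRecord<byte[], byte[]>> records) {
    return records.setCoder(KafkaRecordCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()));
  }
}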









[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445242837



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -198,6 +213,154 @@
  *...
  * }
  *
+ * Read from Kafka as a {@link DoFn}
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about the concept of
+ * {@code SplittableDoFn}, please refer to the
+ * <a href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and
+ * <a href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
+ * KafkaIO.Read} is that {@link ReadAll} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline
+ * can populate these source descriptions at runtime. For example, the pipeline can query Kafka
+ * topics from a BigQuery table and read these topics via {@link ReadAll}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ *   {@link ReadAll#getConsumerConfig()} is the same as {@link KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadAll#getConsumerFactoryFn()} is the same as {@link KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadAll#getOffsetConsumerConfig()} is the same as {@link KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadAll#getKeyCoder()} is the same as {@link KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadAll#getValueCoder()} is the same as {@link KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadAll#getKeyDeserializerProvider()} is the same as {@link KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   {@link ReadAll#getValueDeserializerProvider()} is the same as {@link KafkaIO.Read#getValueDeserializerProvider()}.
+ *   {@link ReadAll#isCommitOffsetEnabled()} means the same as {@link KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ *
+ * For example, to create a basic {@link ReadAll} transform:
+ *
+ * {@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1))))
+ *  .apply(KafkaIO.readAll()
+ *      .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *      .withKeyDeserializer(LongDeserializer.class)
+ *      .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))))
+ * .apply(KafkaIO.readAll()
+ *     .withKeyDeserializer(LongDeserializer.class)
+ *     .withValueDeserializer(StringDeserializer.class));
+ *
+ * }
+ *
+ * Configurations of {@link ReadAll}
+ *
+ * Except for the Kafka Consumer configurations, there are some other configurations which are
+ * related to processing records.
+ *
+ * {@link ReadAll#commitOffsets()} enables committing offsets after processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, then {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * {@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks for a function which
+ * takes a {@link KafkaRecord} as input and outputs the output timestamp. This function is used
+ * to produce the output timestamp per {@link KafkaRecord}. There are three built-in types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * For example, to create a {@link ReadAll} with these configurations:
+ *
+ * {@code
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))))
+ * .apply(KafkaIO.readAll()
+ *      .withKeyDeserializer(LongDeserializer.class)
+ *      .withValueDeserializer(StringDeserializer.class)
+ *      .withProcessingTime()
+ *      .commitOffsets());
+ *
+ * }
+ *
+ * Read from {@link KafkaSourceDescription}
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} which represents record
+ * offsets. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ending at
+ * {@code Long.MAX_VALUE}. For a finite range, an {@link OffsetRangeTracker} is created.
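A sketch of that tracker choice, assuming only that a real end estimator would query the consumer's end offsets (a constant stands in here to keep the sketch self-contained):

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

class TrackerChoiceSketch {
  static RestrictionTracker<OffsetRange, Long> newTracker(OffsetRange restriction) {
    if (restriction.getTo() == Long.MAX_VALUE) {
      // Growable tracker for an effectively unbounded range; the estimator
      // would normally call Consumer#endOffsets for the topic partition.
      return new GrowableOffsetRangeTracker(restriction.getFrom(), () -> 0L);
    }
    // Finite range (e.g., after a split): a plain OffsetRangeTracker suffices.
    return new OffsetRangeTracker(restriction);
  }
}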

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445236767



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java
##
@@ -0,0 +1,92 @@
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+
+/**
+ * An AutoValue object which represents a Kafka source description. Note that this object should be
+ * encoded/decoded with equivalent {@link Schema} as a {@link Row} when crossing the wire.
+ */
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class KafkaSourceDescription implements Serializable {

Review comment:
   It seems like `Descriptor` makes more sense.
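A sketch of the "crossing the wire" note from the javadoc above: with the schema registered via @DefaultSchema(AutoValueSchema.class), the registry can round-trip the object through a Row. The helper name is illustrative:

import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.values.Row;

class RowRoundTrip {
  static KafkaSourceDescription roundTrip(KafkaSourceDescription description)
      throws NoSuchSchemaException {
    SchemaRegistry registry = SchemaRegistry.createDefault();
    // Encoding as a Row and decoding back is what "equivalent Schema on both
    // sides of the wire" guarantees for cross-language transport.
    Row asRow = registry.getToRowFunction(KafkaSourceDescription.class).apply(description);
    return registry.getFromRowFunction(KafkaSourceDescription.class).apply(asRow);
  }
}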









[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445235681



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -1051,33 +1261,341 @@ public void populateDisplayData(DisplayData.Builder builder) {
 }
   }
 
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
   /**
-   * Returns a new config map which is merge of current config and updates. Verifies the updates do
-   * not includes ignored properties.
+   * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more information on usage and
+   * configuration.
    */
-  private static Map<String, Object> updateKafkaProperties(
-      Map<String, Object> currentConfig,
-      Map<String, String> ignoredProperties,
-      Map<String, Object> updates) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadAll<K, V>
+      extends PTransform<PCollection<KafkaSourceDescription>, PCollection<KafkaRecord<K, V>>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadAll.class);
+
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Nullable
+    abstract Map<String, Object> getOffsetConsumerConfig();
+
+    @Nullable
+    abstract DeserializerProvider<K> getKeyDeserializerProvider();
+
+    @Nullable
+    abstract DeserializerProvider<V> getValueDeserializerProvider();
+
+    @Nullable
+    abstract Coder<K> getKeyCoder();
+
+    @Nullable
+    abstract Coder<V> getValueCoder();
+
+    abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+        getConsumerFactoryFn();
+
+    @Nullable
+    abstract SerializableFunction<KafkaRecord<K, V>, Instant> getExtractOutputTimestampFn();
+
+    @Nullable
+    abstract SerializableFunction<Instant, WatermarkEstimator<Instant>>
+        getCreateWatermarkEstimatorFn();
+
+    abstract boolean isCommitOffsetEnabled();
+
+    @Nullable
+    abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+
+    abstract ReadAll.Builder<K, V> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract ReadAll.Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+      abstract ReadAll.Builder<K, V> setOffsetConsumerConfig(
+          Map<String, Object> offsetConsumerConfig);
+
+      abstract ReadAll.Builder<K, V> setConsumerFactoryFn(
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
+
+      abstract ReadAll.Builder<K, V> setKeyDeserializerProvider(
+          DeserializerProvider<K> deserializerProvider);
+
+      abstract ReadAll.Builder<K, V> setValueDeserializerProvider(
+          DeserializerProvider<V> deserializerProvider);
+
+      abstract ReadAll.Builder<K, V> setKeyCoder(Coder<K> keyCoder);
+
+      abstract ReadAll.Builder<K, V> setValueCoder(Coder<V> valueCoder);
+
+      abstract ReadAll.Builder<K, V> setExtractOutputTimestampFn(
+          SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+
+      abstract ReadAll.Builder<K, V> setCreateWatermarkEstimatorFn(
+          SerializableFunction<Instant, WatermarkEstimator<Instant>> fn);
+
+      abstract ReadAll.Builder<K, V> setCommitOffsetEnabled(boolean commitOffsetEnabled);
+
+      abstract ReadAll.Builder<K, V> setTimestampPolicyFactory(TimestampPolicyFactory<K, V> policy);
+
+      abstract ReadAll<K, V> build();
+    }
 
-    for (String key : updates.keySet()) {
+    public static <K, V> ReadAll<K, V> read() {
+      return new AutoValue_KafkaIO_ReadAll.Builder<K, V>()
+          .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
+          .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+          .setCommitOffsetEnabled(false)
+          .build()
+          .withProcessingTime()
+          .withMonotonicallyIncreasingWatermarkEstimator();
+    }
+
+    // Note that if the bootstrapServers is set here but also populated with the element, the
+    // element will override the bootstrapServers from the config.
+    public ReadAll<K, V> withBootstrapServers(String bootstrapServers) {
+      return withConsumerConfigUpdates(
+          ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+    }
+
+    public ReadAll<K, V> withKeyDeserializerProvider(DeserializerProvider<K> deserializerProvider) {
+      return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadAll<K, V> withValueDeserializerProvider(
+        DeserializerProvider<V> deserializerProvider) {
+      return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadAll<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
+      return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
+    }
+
+    public ReadAll<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
+      return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
+    }
+
+    public ReadAll<K, V> withKeyDeserializerAndCoder(
+        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
+      return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
+    }
+
+    public ReadAll<K, V> withValueDeserializerAndCoder(
+        Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
+      return withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
+    }

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445226123



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java
##
@@ -0,0 +1,92 @@
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+
+/**
+ * An AutoValue object which represents a Kafka source description. Note that this object should be
+ * encoded/decoded with equivalent {@link Schema} as a {@link Row} when crossing the wire.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class KafkaSourceDescription implements Serializable {
+  @SchemaFieldName("topic")
+  abstract String getTopic();
+
+  @SchemaFieldName("partition")
+  abstract Integer getPartition();
+
+  @SchemaFieldName("start_read_offset")
+  @Nullable
+  abstract Long getStartReadOffset();
+
+  @SchemaFieldName("start_read_time")
+  @Nullable
+  abstract Instant getStartReadTime();
+
+  @SchemaFieldName("bootstrapServers")
+  @Nullable
+  abstract List<String> getBootStrapServers();
+
+  private TopicPartition topicPartition = null;
+
+  public TopicPartition getTopicPartition() {

Review comment:
   Thanks! 
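The hunk above is cut off at getTopicPartition(); a plausible sketch of the memoized body under discussion (an assumption, not the PR's verbatim code):

  public TopicPartition getTopicPartition() {
    // Lazily build and cache the TopicPartition from the schema fields; the
    // cached field itself is derived state, not part of the schema.
    if (topicPartition == null) {
      topicPartition = new TopicPartition(getTopic(), getPartition());
    }
    return topicPartition;
  }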









[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445225947



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -906,19 +1082,91 @@ public void setValueDeserializer(String valueDeserializer) {
   Coder<K> keyCoder = getKeyCoder(coderRegistry);
   Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-  // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
-  Unbounded<KafkaRecord<K, V>> unbounded =
-      org.apache.beam.sdk.io.Read.from(
-          toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+  // The Read will be expanded into SDF transform when "beam_fn_api" is enabled and
+  // "beam_fn_api_use_deprecated_read" is not enabled.
+  if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+      || ExperimentalOptions.hasExperiment(
+          input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) {
+    // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
+    Unbounded<KafkaRecord<K, V>> unbounded =
+        org.apache.beam.sdk.io.Read.from(
+            toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+
+    PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+
+    if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) {
+      transform =
+          unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
+    }
 
-  PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+    return input.getPipeline().apply(transform);
+  }
+  ReadAll<K, V> readTransform =
+      ReadAll.<K, V>read()
+          .withConsumerConfigOverrides(getConsumerConfig())
+          .withOffsetConsumerConfigOverrides(getOffsetConsumerConfig())
+          .withConsumerFactoryFn(getConsumerFactoryFn())
+          .withKeyDeserializerProvider(getKeyDeserializerProvider())
+          .withValueDeserializerProvider(getValueDeserializerProvider())
+          .withManualWatermarkEstimator()
+          .withTimestampPolicyFactory(getTimestampPolicyFactory());
+  if (isCommitOffsetsInFinalizeEnabled()) {
+    readTransform = readTransform.commitOffsets();
+  }
+  PCollection<KafkaSourceDescription> output =
+      input
+          .getPipeline()
+          .apply(Impulse.create())
+          .apply(ParDo.of(new GenerateKafkaSourceDescription(this)));
+  try {
+    output.setCoder(KafkaSourceDescription.getCoder(input.getPipeline().getSchemaRegistry()));
Review comment:
   It works with `setSchema`, but I want to make it explicit because it's possible that a user writes a DoFn which produces `KafkaSourceDescription`.
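A sketch of the explicit-coder choice described here, assuming resolution through the pipeline's SchemaRegistry; the helper is illustrative:

import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.values.PCollection;

class CoderChoice {
  // Pinning the schema coder explicitly means that even a user-written DoFn
  // producing KafkaSourceDescription gets the Row-compatible encoding,
  // instead of depending on coder inference from @DefaultSchema alone.
  static PCollection<KafkaSourceDescription> pinCoder(
      PCollection<KafkaSourceDescription> descriptions, SchemaRegistry registry)
      throws NoSuchSchemaException {
    return descriptions.setCoder(registry.getSchemaCoder(KafkaSourceDescription.class));
  }
}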









[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-24 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445016902



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -1051,33 +1261,341 @@ public void populateDisplayData(DisplayData.Builder builder) {
 }
   }
 
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
   /**
-   * Returns a new config map which is merge of current config and updates. Verifies the updates do
-   * not includes ignored properties.
+   * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more information on usage and
+   * configuration.
    */
-  private static Map<String, Object> updateKafkaProperties(
-      Map<String, Object> currentConfig,
-      Map<String, String> ignoredProperties,
-      Map<String, Object> updates) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadAll<K, V>

Review comment:
   Thanks for starting the discussion. If we take x-lang usage into consideration, `Read` is not a good choice of input for these DoFns.
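Context for x-lang readers: a source description is plain, schema'd data that any SDK can build as a Row, unlike Read, which embeds Java-only members such as deserializers and factory functions. A sketch of building one, with the of(...) signature as used in the javadoc examples quoted earlier in this thread:

import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.kafka.common.TopicPartition;

class DescriptorExample {
  static KafkaSourceDescription descriptionFor(String topic, int partition) {
    return KafkaSourceDescription.of(
        new TopicPartition(topic, partition),
        null, // start read offset: unset, read from the consumer's position
        null, // start read time: unset
        ImmutableList.of("broker_1:9092", "broker_2:9092"));
  }
}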









[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-23 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r444654119



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##
@@ -0,0 +1,861 @@
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaSourceDescription.Schemas;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the <a href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and
+ * <a href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
+ * KafkaIO.Read} is that {@link ReadViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time.

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-23 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r444648850



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-23 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r444648197



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-23 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r444646093



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-23 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r444564026



##
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##
@@ -0,0 +1,861 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import org.apache.beam.sdk.io.kafka.KafkaSourceDescription.Schemas;
+import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the <a href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and the <a
+ * href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
+ * KafkaIO.Read} is that {@link ReadViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during pipeline construction time. 
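
Because the excerpt leans on SplittableDoFn, a bare-bones sketch of the DoFn shape involved may help. It uses only standard Beam SDF annotations; the element and output types are simplified to String, and the actual Kafka polling is elided:

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

    // Element type simplified to a topic name; the PR uses KafkaSourceDescription here.
    @DoFn.UnboundedPerElement
    class ReadSketchDoFn extends DoFn<String, String> {

      @GetInitialRestriction
      public OffsetRange initialRestriction(@Element String topic) {
        // Treat the offset range as unbounded; a GrowableOffsetRangeTracker can
        // grow the end of the range as the backlog grows.
        return new OffsetRange(0L, Long.MAX_VALUE);
      }

      @ProcessElement
      public ProcessContinuation processElement(
          @Element String topic,
          RestrictionTracker<OffsetRange, Long> tracker,
          OutputReceiver<String> receiver) {
        long offset = tracker.currentRestriction().getFrom();
        while (tracker.tryClaim(offset)) {
          // Poll the Kafka consumer at `offset` and emit the record here.
          receiver.output(topic + "@" + offset);
          offset++;
        }
        // The claim was refused: a checkpoint or split happened, so resume later.
        return ProcessContinuation.resume();
      }
    }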

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-23 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r34591



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -295,21 +301,32 @@
   /**
   * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka configuration
   * should be set with {@link Read#withBootstrapServers(String)} and {@link Read#withTopics(List)}.
-   * Other optional settings include key and value {@link Deserializer}s, custom timestamp and
+   * Other optional settings include key and value {@link Deserializer}s, custom timestamp,
   * watermark functions.
   */
   public static <K, V> Read<K, V> read() {
 return new AutoValue_KafkaIO_Read.Builder<K, V>()
 .setTopics(new ArrayList<>())
 .setTopicPartitions(new ArrayList<>())
-.setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
-.setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
+.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
+.setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
 .setMaxNumRecords(Long.MAX_VALUE)
 .setCommitOffsetsInFinalizeEnabled(false)
 .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
 .build();
   }
 
+  /**
+   * Creates an uninitialized {@link ReadViaSDF} {@link PTransform}. Different from {@link Read},
+   * setting up {@code topics} and {@code bootstrapServers} is not required during construction
+   * time. The {@code bootstrapServers} can still be configured with {@link
+   * ReadViaSDF#withBootstrapServers(String)}. Please refer to {@link ReadViaSDF} for more details.
+   */
+  public static <K, V, WatermarkEstimatorT extends WatermarkEstimator<Instant>>
+      ReadViaSDF<K, V, WatermarkEstimatorT> readAll() {
+    return ReadViaSDF.read();

Review comment:
   The `WatermarkEstimatorT` type parameter is used when defining
`createWatermarkEstimatorFn` and when `@NewWatermarkEstimator` is called. I
understand that we could always use `WatermarkEstimator` as the type, but I
thought it would be better to make the type explicit.
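
For context, these are the fragments inside the reader DoFn where the explicit type shows up; the annotations are standard Beam SDF API, with MonotonicallyIncreasing standing in for a concrete WatermarkEstimatorT:

    import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
    import org.joda.time.Instant;

    // Declaring the concrete estimator type (rather than the
    // WatermarkEstimator<Instant> interface) keeps WatermarkEstimatorT explicit.
    @GetInitialWatermarkEstimatorState
    public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
      return currentElementTimestamp;
    }

    @NewWatermarkEstimator
    public MonotonicallyIncreasing newWatermarkEstimator(
        @WatermarkEstimatorState Instant watermarkEstimatorState) {
      return new MonotonicallyIncreasing(watermarkEstimatorState);
    }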









[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-22 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r443937048



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-19 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r443052453



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##
@@ -0,0 +1,742 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link Row} in {@link
+ * KafkaSourceDescriptionSchemas}, which represents a Kafka source description, as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the Beam blog post: https://beam.apache.org/blog/splittable-do-fn/ and the design doc:
+ * https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that {@link
+ * ReadViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during pipeline construction time. Instead, the
+ * pipeline can populate these source descriptions at runtime. For example, the pipeline can
+ * query Kafka topics from a BigQuery table and read these topics via {@link ReadViaSDF}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadViaSDF#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadViaSDF#getKeyCoder()} is the same as {@link KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadViaSDF#getValueCoder()} is the same as {@link KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link 
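
To make the shared configuration concrete, the consumer config referenced in the list above is a plain properties map along these lines; the broker addresses and group id are placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    // The kind of map getConsumerConfig() carries; it is handed to the
    // consumerFactoryFn when the Kafka consumer is created.
    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker_1:9092,broker_2:9092");
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my_beam_app_group");
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");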

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-19 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r443040694



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-19 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r442980355



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-19 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r442951203



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -681,11 +696,13 @@ public void setValueDeserializer(String valueDeserializer) {
 }
 
 /**
- * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.LogAppendTimePolicy}. The
- * policy assigns Kafka's log append time (server side ingestion time) to each record. The
- * watermark for each Kafka partition is the timestamp of the last record read. If a partition
- * is idle, the watermark advances to a couple of seconds behind wall time. Every record consumed
- * from Kafka is expected to have its timestamp type set to 'LOG_APPEND_TIME'.
+ * Sets {@link TimestampPolicy} to {@link TimestampPolicyFactory.LogAppendTimePolicy}, which is
+ * used when beam_fn_api is disabled, and sets {@code extractOutputTimestampFn} to {@link
+ * ReadViaSDF.ExtractOutputTimestampFns#withLogAppendTime()}, which is used when beam_fn_api is
+ * enabled. The policy assigns Kafka's log append time (server side ingestion time) to each
+ * record. The watermark for each Kafka partition is the timestamp of the last record read. If a
+ * partition is idle, the watermark advances to a couple of seconds behind wall time. Every record
+ * consumed from Kafka is expected to have its timestamp type set to 'LOG_APPEND_TIME'.

Review comment:
   The phrase `which is used when beam_fn_api is disabled` means that the
`TimestampPolicy` is used when beam_fn_api is disabled. I'll say it explicitly.









[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-18 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r442579140



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-18 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r442572993



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-18 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r442575623



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-18 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r442574397



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-18 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r442570364



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-18 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r442566478



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -795,6 +828,12 @@ public void setValueDeserializer(String valueDeserializer) {
   return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
 }
 
+/** A function to compute the output timestamp from a {@link KafkaRecord}. */
+public Read<K, V> withExtractOutputTimestampFn(

Review comment:
   > How is this different from withTimestampFn2?
   
   `withTimestampFn2` has been deprecated in `KafkaIO.Read`. The major concern
with reusing `withTimestampFn2` is that it would mean different things under
SDF and UnboundedSource, which causes confusion.
   
   > Setting the top level properties allow us to say that this property is
supported when used as an SDF.
   
   This is what I want to do by having `withExtractOutputTimestampFn`.
   
   > Also, what prevents us from supporting TimestampPolicy? We should be able
to call it and give it the three pieces of information it requests (message
backlog / backlog check time / current kafka record).
   
   The difficulty is that the message backlog / backlog check time is not
memorized per (element, restriction). With the SDF framework, the backlog is
retrieved by calling `RestrictionTracker.getProgress()`, and we cannot call it
per element in order to extract the timestamp. 
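
A minimal sketch of such an extract function, assuming the record's embedded Kafka timestamp (epoch millis) is the desired output timestamp:

    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.joda.time.Instant;

    // KafkaRecord.getTimestamp() returns epoch milliseconds, so it maps
    // directly onto a joda Instant.
    SerializableFunction<KafkaRecord<Long, String>, Instant> extractOutputTimestampFn =
        record -> new Instant(record.getTimestamp());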









[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-12 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r439566281



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the Beam blog post https://beam.apache.org/blog/splittable-do-fn/ and the design
+ * doc https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that
+ * {@link ReadFromKafkaViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline
+ * can populate these source descriptions at runtime. For example, the pipeline can query Kafka
+ * topics from a BigQuery table and read those topics via {@link ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *   
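
A hypothetical sketch of the runtime-population pattern this Javadoc describes; the
`KafkaSourceDescription.of(...)` factory and the `ReadFromKafkaViaSDF.<K, V>read()` builder
shown here are illustrative assumptions, not the PR's exact API:

    // Source descriptions can be computed at runtime (e.g., queried from BigQuery)
    // instead of being fixed at pipeline construction time.
    PCollection<KafkaSourceDescription> descriptions =
        p.apply(Create.of(
            KafkaSourceDescription.of(new TopicPartition("topic_a", 0)),
            KafkaSourceDescription.of(new TopicPartition("topic_b", 1))));

    // Each description is then read by the SplittableDoFn-based Kafka reader.
    PCollection<KafkaRecord<byte[], byte[]>> records =
        descriptions.apply(
            ReadFromKafkaViaSDF.<byte[], byte[]>read()
                .withConsumerConfigUpdates(
                    ImmutableMap.of("bootstrap.servers", "broker:9092")));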

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-12 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r439562951



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the Beam blog post https://beam.apache.org/blog/splittable-do-fn/ and the design
+ * doc https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that
+ * {@link ReadFromKafkaViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline
+ * can populate these source descriptions at runtime. For example, the pipeline can query Kafka
+ * topics from a BigQuery table and read those topics via {@link ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *   

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-12 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r439531254



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the Beam blog post https://beam.apache.org/blog/splittable-do-fn/ and the design
+ * doc https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that
+ * {@link ReadFromKafkaViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline
+ * can populate these source descriptions at runtime. For example, the pipeline can query Kafka
+ * topics from a BigQuery table and read those topics via {@link ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *   

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-11 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r43894



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the Beam blog post https://beam.apache.org/blog/splittable-do-fn/ and the design
+ * doc https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that
+ * {@link ReadFromKafkaViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline
+ * can populate these source descriptions at runtime. For example, the pipeline can query Kafka
+ * topics from a BigQuery table and read those topics via {@link ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *   

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-09 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r436922850



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the Beam blog post https://beam.apache.org/blog/splittable-do-fn/ and the design
+ * doc https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that
+ * {@link ReadFromKafkaViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline
+ * can populate these source descriptions at runtime. For example, the pipeline can query Kafka
+ * topics from a BigQuery table and read those topics via {@link ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *   

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-05 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r436187245



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the Beam blog post https://beam.apache.org/blog/splittable-do-fn/ and the design
+ * doc https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that
+ * {@link ReadFromKafkaViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline
+ * can populate these source descriptions at runtime. For example, the pipeline can query Kafka
+ * topics from a BigQuery table and read those topics via {@link ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *   

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-05 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r436179356



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -1055,29 +1144,6 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
 
-  /**

Review comment:
   This common part has been moved to KafkaIOUtils.java.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-05 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r436177315



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -949,45 +1077,6 @@ public void setValueDeserializer(String 
valueDeserializer) {
 final SerializableFunction, OutT> fn) {
   return record -> fn.apply(record.getKV());
 }
-
///

Review comment:
   This common part has been moved to KafkaIOUtils.java.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-05 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r436123330



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the Beam blog post https://beam.apache.org/blog/splittable-do-fn/ and the design
+ * doc https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that
+ * {@link ReadFromKafkaViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline
+ * can populate these source descriptions at runtime. For example, the pipeline can query Kafka
+ * topics from a BigQuery table and read those topics via {@link ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *   

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-03 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r434890835



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -817,6 +847,24 @@ public void setValueDeserializer(String valueDeserializer) 
{
   return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
 }
 
+/**
+ * The {@link Read} transform will be expanded into the {@link ReadFromKafkaViaSDF} transform.
+ * When {@link #useSDFTransformInRead()} and {@link
+ * #withTimestampPolicyFactory(TimestampPolicyFactory)} are used together, only {@link
+ * TimestampPolicyFactory#withCreateTime(Duration)}, {@link
+ * TimestampPolicyFactory#withLogAppendTime()} and {@link
+ * TimestampPolicyFactory#withProcessingTime()} will be populated correctly. For any other
+ * custom {@link TimestampPolicy}, the transform will fall back to {@link
+ * TimestampPolicyFactory#withProcessingTime()}. It's recommended to use {@link
+ * ReadFromKafkaViaSDF} directly in that case.
+ *
+ * Note that the expansion only happens when the pipeline has the "beam_fn_api" experiment
+ * and "beam_fn_api_use_deprecated_read" is not set.
+ */
+public Read useSDFTransformInRead() {

Review comment:
   Discussed with Luke offline. We think it would be better to make `KafkaIO.Read`
expand with the SDF transform by default when `beam_fn_api` is enabled (before
introducing this SDF transform, we expanded `Read` with the SDFUnboundedWrapper when
`beam_fn_api` was set).
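
   As a sketch of the expansion behavior described above (the TimestampPolicyFactory
   factory methods are existing KafkaIO API; broker and topic are placeholders):

       // Run with --experiments=beam_fn_api (and without
       // beam_fn_api_use_deprecated_read), so KafkaIO.Read expands into the
       // SDF-based transform by default.
       PipelineOptions options =
           PipelineOptionsFactory.fromArgs("--experiments=beam_fn_api").create();
       Pipeline p = Pipeline.create(options);

       // Per the Javadoc above, only the withCreateTime/withLogAppendTime/
       // withProcessingTime policies carry over correctly to the SDF expansion.
       p.apply(
           KafkaIO.readBytes()
               .withBootstrapServers("broker:9092")
               .withTopic("my_topic")
               .withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime()));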





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-03 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r434766297



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the Beam blog post https://beam.apache.org/blog/splittable-do-fn/ and the design
+ * doc https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that
+ * {@link ReadFromKafkaViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline
+ * can populate these source descriptions at runtime. For example, the pipeline can query Kafka
+ * topics from a BigQuery table and read those topics via {@link ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *   

[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-03 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r434746269



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+/**
+ * Common utility functions and default configurations for {@link 
KafkaIO.Read} and {@link
+ * ReadFromKafkaViaSDF}.
+ */
+final class KafkaIOUtils {
+  // A set of config defaults.

Review comment:
   Yes, the util is just moved from KafkaIO.java, for code-reuse purposes only.
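
   For context, the shared defaults take roughly this shape (an illustrative sketch
   built from the imports quoted above; the exact entries and values in
   KafkaIOUtils.java may differ):

       // Default consumer properties shared by KafkaIO.Read and the SDF-based read.
       static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
           ImmutableMap.of(
               ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
               ByteArrayDeserializer.class.getName(),
               ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
               ByteArrayDeserializer.class.getName(),
               // Beam manages offsets itself, so Kafka's auto-commit stays off.
               ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
               ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");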





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-03 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r434745710



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##
@@ -906,19 +955,110 @@ public void setValueDeserializer(String 
valueDeserializer) {
   Coder keyCoder = getKeyCoder(coderRegistry);
   Coder valueCoder = getValueCoder(coderRegistry);
 
-  // Handles unbounded source to bounded conversion if maxNumRecords or 
maxReadTime is set.
-  Unbounded> unbounded =
-  org.apache.beam.sdk.io.Read.from(
-  
toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource());
+  if (!isUseSDFTransform()
+  || 
!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), 
"beam_fn_api")

Review comment:
   SDF is only supported over `beam_fn_api`. We shouldn't expand `KafkaIO.Read` with SDF
when `beam_fn_api` is not enabled, or when `beam_fn_api_use_deprecated_read` is enabled.
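
   A minimal sketch of that gating check, using the `ExperimentalOptions.hasExperiment`
   helper visible in the diff above (the local variable name is illustrative):

       boolean useSdfExpansion =
           ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
               && !ExperimentalOptions.hasExperiment(
                   input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read");
       // When useSdfExpansion is false, KafkaIO.Read falls back to the
       // UnboundedSource-based expansion.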





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF

2020-06-03 Thread GitBox


boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r434739659



##
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the Beam blog post https://beam.apache.org/blog/splittable-do-fn/ and the design
+ * doc https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that
+ * {@link ReadFromKafkaViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline
+ * can populate these source descriptions at runtime. For example, the pipeline can query Kafka
+ * topics from a BigQuery table and read those topics via {@link ReadFromKafkaViaSDF}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * 
+ *   {@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *   KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *   KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *   KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *   KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *