[ https://issues.apache.org/jira/browse/BEAM-9977?focusedWorklogId=440839&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-440839 ]

ASF GitHub Bot logged work on BEAM-9977:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Jun/20 15:45
            Start Date: 03/Jun/20 15:45
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r434666700



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaViaSDF.java
##########
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.Element;
+import org.apache.beam.sdk.transforms.DoFn.GetRestrictionCoder;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to
+ * the Beam blog post https://beam.apache.org/blog/splittable-do-fn/ and the design
+ * doc https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that {@link
+ * ReadFromKafkaViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the
+ * pipeline can populate these source descriptions at runtime. For example, the pipeline can
+ * query Kafka topics from a BigQuery table and read these topics via {@link ReadFromKafkaViaSDF}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadFromKafkaViaSDF#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getKeyCoder()} is the same as {@link
+ *       KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getValueCoder()} is the same as {@link
+ *       KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadFromKafkaViaSDF#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadFromKafkaViaSDF#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadFromKafkaViaSDF} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("my_topic", 1))))
+ *  .apply(ReadFromKafkaViaSDF.create()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class)
+ *          .withValueDeserializer(StringDeserializer.class));
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadFromKafkaViaSDF}</h3>
+ *
+ * <p>Apart from the Kafka consumer configurations, there are some other configurations related
+ * to processing records.
+ *
+ * <p>{@link ReadFromKafkaViaSDF#commitOffsets()} enables committing the offset after processing the
+ * record. Note that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, then {@link
+ * ReadFromKafkaViaSDF#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadFromKafkaViaSDF#withExtractOutputTimestampFn(SerializableFunction)} asks for a
+ * function which takes a {@link KafkaRecord} as input and returns the output timestamp. This
+ * function is used to produce the output timestamp for each {@link KafkaRecord}. There are three
+ * built-in options: {@link ReadFromKafkaViaSDF#withProcessingTime()}, {@link
+ * ReadFromKafkaViaSDF#withCreateTime()} and {@link ReadFromKafkaViaSDF#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadFromKafkaViaSDF} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("my_topic", 1))))
+ * .apply(ReadFromKafkaViaSDF.create()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class)
+ *          .withValueDeserializer(StringDeserializer.class)
+ *          .withProcessingTime()
+ *          .commitOffsets());
+ *
+ * }</pre>
+ *
+ * <h3>Read from {@link KafkaSourceDescription}</h3>
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} which represents the record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ending with
+ * {@code Long.MAX_VALUE}. For a finite range, an {@link OffsetRangeTracker} is created.
+ *
+ * <h4>Initialize Restriction</h4>
+ *
+ * {@link ReadFromKafkaDoFn#initialRestriction(KafkaSourceDescription)} creates an initial range for
+ * an input element {@link KafkaSourceDescription}. The end of the range will be initialized to {@code
+ * Long.MAX_VALUE}. For the start of the range:
+ *
+ * <ul>
+ *   <li>If {@link KafkaSourceDescription#getStartOffset()} is set, use this offset as the start.
+ *   <li>If {@link KafkaSourceDescription#getStartReadTime()} is set, seek the start offset based on
+ *       this time.
+ *   <li>Otherwise, the last committed offset + 1 will be returned by {@link
+ *       Consumer#position(TopicPartition)} as the start.
+ * </ul>
+ *
+ * <h4>Initial Split</h4>
+ *
+ * <p>There is no initial split for now.
+ *
+ * <h4>Checkpoint and Resume Processing</h4>
+ *
+ * <p>There are two types of checkpoint here: self-checkpoint, which is invoked by the DoFn, and
+ * system-checkpoint, which is issued by the runner via {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. Every time the
+ * consumer gets an empty response from {@link Consumer#poll(long)}, {@link ReadFromKafkaDoFn} will
+ * checkpoint at the current {@link KafkaSourceDescription} and move on to process the next element.
+ * These deferred elements will be resumed by the runner as soon as possible.
+ *
+ * <h4>Progress and Size</h4>
+ *
+ * <p>The progress is provided by {@link GrowableOffsetRangeTracker} or {@link OffsetRangeTracker}
+ * per {@link KafkaSourceDescription}. For an infinite {@link OffsetRange}, a Kafka {@link Consumer}
+ * is used in the {@link GrowableOffsetRangeTracker} as the {@link
+ * GrowableOffsetRangeTracker.RangeEndEstimator} to poll the latest offset. Please refer to {@link
+ * ReadFromKafkaDoFn.KafkaLatestOffsetEstimator} for details.
+ *
+ * <p>The size is computed by {@link ReadFromKafkaDoFn#getSize(KafkaSourceDescription,
+ * OffsetRange)}. A {@link KafkaIOUtils.MovingAvg} is used to track the average size of Kafka
+ * records.
+ *
+ * <h4>Track Watermark</h4>
+ *
+ * The estimated watermark is computed by {@link MonotonicallyIncreasing} based on output timestamps
+ * per {@link KafkaSourceDescription}.
+ */
+@AutoValue
+public abstract class ReadFromKafkaViaSDF<K, V>

Review comment:
       This seems strangely close to something we lived through in the SDF version of 
HBaseIO. In the first version we created an artificial object called `HBaseQuery` that 
contained the minimum information we needed to be able to query the data store in an 
SDF way, but then other requirements came in and we kept adding extra parameters until 
we ended up with something that was almost the exact 'complete' specification of the 
Read class. So we decided to switch to using a `PCollection<Read>` as input to avoid 
duplicating code, and we ended up with 
https://github.com/apache/beam/blob/f6ef9032f521180f1cc26959d9d6ab86dd37a13c/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
   
   Here you can take a `PCollection<Read>` as input and get rid of 
`KafkaSourceDescription`, which will give a more consistent experience to end users. 
Notice that this `ReadAll`-like pattern is also now used in 
[SolrIO](https://github.com/apache/beam/blob/f6ef9032f521180f1cc26959d9d6ab86dd37a13c/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java#L501)
 and there is an ongoing PR to introduce it for 
[CassandraIO](https://github.com/apache/beam/pull/10546), so maybe it is a good idea to 
follow it for consistency.
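   Just to make the suggestion concrete, here is a rough, non-authoritative sketch of 
what the `PCollection<Read>` input could look like for Kafka. The `KafkaIO.readAll()` 
transform name is hypothetical (it mirrors the `ReadAll` pattern from HBaseIO/SolrIO and 
is not an existing Kafka API), and coder handling for the `Read` elements is glossed over:

```java
// Hypothetical ReadAll-style usage: each fully specified KafkaIO.Read becomes an
// element of the input PCollection, and a (hypothetical) KafkaIO.readAll() transform
// expands every element with the SDF-based reader. Coder handling for Read is elided.
PCollection<KafkaIO.Read<Long, String>> reads =
    pipeline.apply(
        Create.of(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker_1:9092,broker_2:9092")
                .withTopicPartitions(ImmutableList.of(new TopicPartition("my_topic", 1)))
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)));

PCollection<KafkaRecord<Long, String>> records =
    reads.apply(KafkaIO.readAll()); // hypothetical transform name
```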
   
   Notice that in the SolrIO case the change looks even closer to this one, because we 
ended up introducing `ReplicaInfo` (the spiritual equivalent of `TopicPartition`) into 
the normal Read, and we guarantee during expansion that this field gets filled if the 
users don't do it; but if they do, we assume they know what they are doing and we go 
with it.
   
   Another advantage of having the full specification is that you will be able to read 
not only from multiple topics but also from different clusters, thanks to the power of 
having the full `Read` specification.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 440839)
    Time Spent: 7h 10m  (was: 7h)

> Build Kafka Read on top of Java SplittableDoFn
> ----------------------------------------------
>
>                 Key: BEAM-9977
>                 URL: https://issues.apache.org/jira/browse/BEAM-9977
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-kafka
>            Reporter: Boyuan Zhang
>            Assignee: Boyuan Zhang
>            Priority: P2
>          Time Spent: 7h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
