This is an automated email from the ASF dual-hosted git repository.

mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 501dc4c  [BEAM-3851] Option to preserve element timestamp while publishing to Kafka. (#4868)
501dc4c is described below

commit 501dc4cb17bb943aaa095feab959a9fed1aac20c
Author: Raghu Angadi <rang...@apache.org>
AuthorDate: Thu Mar 22 10:56:50 2018 -0700

    [BEAM-3851] Option to preserve element timestamp while publishing to Kafka. (#4868)
    
    * Option to preserve element timestamp while publishing to Kafka.
    
    * Let users provide custom timestamp function.
    
    * update javadoc
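
    A minimal usage sketch of the two new options (illustrative only; the broker address,
    topic name, and serializer choices below are assumptions, not part of this commit):

        KafkaIO.<Long, String>write()
            .withBootstrapServers("broker_1:9092")            // assumed broker address
            .withTopic("results")                             // assumed topic name
            .withKeySerializer(LongSerializer.class)
            .withValueSerializer(StringSerializer.class)
            .withInputTimestamp();                            // publish with the element timestamp
            // or, equivalently, supply a custom function:
            // .withPublishTimestampFunction((elem, elemTs) -> elemTs)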
---
 .../beam/sdk/io/kafka/KafkaExactlyOnceSink.java    | 44 +++++++++++++--------
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 33 +++++++++++++++-
 .../io/kafka/KafkaPublishTimestampFunction.java    | 45 ++++++++++++++++++++++
 .../org/apache/beam/sdk/io/kafka/KafkaWriter.java  |  8 +++-
 .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java  | 16 ++++++--
 5 files changed, 123 insertions(+), 23 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
index 7345a92..9ae69da 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
@@ -64,6 +64,8 @@ import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -173,7 +175,8 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl
   /**
    * Shuffle messages assigning each randomly to a shard.
    */
-  private static class Reshard<K, V> extends DoFn<KV<K, V>, KV<Integer, KV<K, V>>> {
+  private static class Reshard<K, V>
+      extends DoFn<KV<K, V>, KV<Integer, TimestampedValue<KV<K, V>>>> {
 
     private final int numShards;
     private transient int shardId;
@@ -190,12 +193,13 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl
     @ProcessElement
     public void processElement(ProcessContext ctx) {
       shardId = (shardId + 1) % numShards; // round-robin among shards.
-      ctx.output(KV.of(shardId, ctx.element()));
+      ctx.output(KV.of(shardId, TimestampedValue.of(ctx.element(), ctx.timestamp())));
     }
   }
 
-  private static class Sequencer<K, V>
-    extends DoFn<KV<Integer, Iterable<KV<K, V>>>, KV<Integer, KV<Long, KV<K, V>>>> {
+  private static class Sequencer<K, V> extends DoFn<
+      KV<Integer, Iterable<TimestampedValue<KV<K, V>>>>,
+      KV<Integer, KV<Long, TimestampedValue<KV<K, V>>>>> {
 
     private static final String NEXT_ID = "nextId";
     @StateId(NEXT_ID)
@@ -205,7 +209,7 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl
     public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState, ProcessContext ctx) {
       long nextId = MoreObjects.firstNonNull(nextIdState.read(), 0L);
       int shard = ctx.element().getKey();
-      for (KV<K, V> value : ctx.element().getValue()) {
+      for (TimestampedValue<KV<K, V>> value : ctx.element().getValue()) {
         ctx.output(KV.of(shard, KV.of(nextId, value)));
         nextId++;
       }
@@ -214,7 +218,7 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl
   }
 
   private static class ExactlyOnceWriter<K, V>
-    extends DoFn<KV<Integer, Iterable<KV<Long, KV<K, V>>>>, Void> {
+    extends DoFn<KV<Integer, Iterable<KV<Long, TimestampedValue<KV<K, V>>>>>, Void> {
 
     private static final String NEXT_ID = "nextId";
     private static final String MIN_BUFFERED_ID = "minBufferedId";
@@ -230,7 +234,7 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl
     @StateId(MIN_BUFFERED_ID)
     private final StateSpec<ValueState<Long>> minBufferedIdSpec = StateSpecs.value();
     @StateId(OUT_OF_ORDER_BUFFER)
-    private final StateSpec<BagState<KV<Long, KV<K, V>>>> outOfOrderBufferSpec;
+    private final StateSpec<BagState<KV<Long, TimestampedValue<KV<K, V>>>>> outOfOrderBufferSpec;
     // A random id assigned to each shard. Helps with detecting when multiple jobs are mistakenly
     // started with same groupId used for storing state on Kafka side, including the case where
     // a job is restarted with same groupId, but the metadata from previous run was not cleared.
@@ -248,7 +252,8 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl
 
     ExactlyOnceWriter(Write<K, V> spec, Coder<KV<K, V>> elemCoder) {
       this.spec = spec;
-      this.outOfOrderBufferSpec = StateSpecs.bag(KvCoder.of(BigEndianLongCoder.of(), elemCoder));
+      this.outOfOrderBufferSpec = StateSpecs.bag(
+          KvCoder.of(BigEndianLongCoder.of(), TimestampedValueCoder.of(elemCoder)));
     }
 
     @Setup
@@ -261,7 +266,7 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl
     public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState,
                                @StateId(MIN_BUFFERED_ID) ValueState<Long> minBufferedIdState,
                                @StateId(OUT_OF_ORDER_BUFFER)
-                                 BagState<KV<Long, KV<K, V>>> oooBufferState,
+                                 BagState<KV<Long, TimestampedValue<KV<K, V>>>> oooBufferState,
                               @StateId(WRITER_ID) ValueState<String> writerIdState,
                                ProcessContext ctx)
       throws IOException {
@@ -297,10 +302,10 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl
         // There might be out of order messages buffered in earlier iterations. These
         // will get merged if and when minBufferedId matches nextId.
 
-        Iterator<KV<Long, KV<K, V>>> iter = ctx.element().getValue().iterator();
+        Iterator<KV<Long, TimestampedValue<KV<K, V>>>> iter = ctx.element().getValue().iterator();
 
         while (iter.hasNext()) {
-          KV<Long, KV<K, V>> kv = iter.next();
+          KV<Long, TimestampedValue<KV<K, V>>> kv = iter.next();
           long recordId = kv.getKey();
 
           if (recordId < nextId) {
@@ -339,7 +344,8 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl
             // Read all of them in to memory and sort them. Reading into memory
             // might be problematic in extreme cases. Might need to improve it in future.
 
-            List<KV<Long, KV<K, V>>> buffered = Lists.newArrayList(oooBufferState.read());
+            List<KV<Long, TimestampedValue<KV<K, V>>>> buffered =
+                Lists.newArrayList(oooBufferState.read());
             buffered.sort(new KV.OrderByKey<>());
 
             LOG.info("{} : merging {} buffered records (min buffered id is 
{}).",
@@ -349,8 +355,7 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl
             minBufferedIdState.clear();
             minBufferedId = Long.MAX_VALUE;
 
-            iter =
-              Iterators.mergeSorted(
+            iter = Iterators.mergeSorted(
                 ImmutableList.of(iter, buffered.iterator()), new KV.OrderByKey<>());
           }
         }
@@ -428,10 +433,17 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl
         ProducerSpEL.beginTransaction(producer);
       }
 
-      void sendRecord(KV<K, V> record, Counter sendCounter) {
+      void sendRecord(TimestampedValue<KV<K, V>> record, Counter sendCounter) {
         try {
+          Long timestampMillis = spec.getPublishTimestampFunction() != null
+            ? spec.getPublishTimestampFunction().getTimestamp(record.getValue(),
+                                                              record.getTimestamp()).getMillis()
+            : null;
+
           producer.send(
-            new ProducerRecord<>(spec.getTopic(), record.getKey(), record.getValue()));
+              new ProducerRecord<>(
+                  spec.getTopic(), null, timestampMillis,
+                  record.getValue().getKey(), record.getValue().getValue()));
           sendCounter.inc();
         } catch (KafkaException e) {
           ProducerSpEL.abortTransaction(producer);
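
The exactly-once sink above now threads each element's timestamp through its Reshard, Sequencer
and ExactlyOnceWriter stages by wrapping elements in TimestampedValue. A standalone sketch of
that wrapper (illustrative values, not code from this commit):

    // Pair an element with its pipeline timestamp so a downstream stage can recover both.
    TimestampedValue<KV<String, String>> tv =
        TimestampedValue.of(KV.of("key", "value"), Instant.now());
    KV<String, String> element = tv.getValue();     // the original element
    Instant timestamp = tv.getTimestamp();          // the preserved element timestamp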
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index eb29229..eeb9da9 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -174,10 +174,15 @@ import org.slf4j.LoggerFactory;
  *       .withKeySerializer(LongSerializer.class)
  *       .withValueSerializer(StringSerializer.class)
  *
- *       // you can further customize KafkaProducer used to write the records by adding more
+ *       // You can further customize KafkaProducer used to write the records by adding more
  *       // settings for ProducerConfig. e.g, to enable compression :
  *       .updateProducerProperties(ImmutableMap.of("compression.type", "gzip"))
  *
+ *       // You can set the publish timestamp for the Kafka records.
+ *       .withInputTimestamp() // element timestamp is used while publishing to Kafka
+ *       // or you can also set a custom timestamp with a function.
+ *       .withPublishTimestampFunction((elem, elemTs) -> ...)
+ *
  *       // Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
  *       .withEOS(20, "eos-sink-group-id");
  *    );
@@ -813,6 +818,8 @@ public class KafkaIO {
     @Nullable abstract Class<? extends Serializer<K>> getKeySerializer();
     @Nullable abstract Class<? extends Serializer<V>> getValueSerializer();
 
+    @Nullable abstract KafkaPublishTimestampFunction<KV<K, V>> getPublishTimestampFunction();
+
     // Configuration for EOS sink
     abstract boolean isEOS();
     @Nullable abstract String getSinkGroupId();
@@ -830,6 +837,8 @@ public class KafkaIO {
           SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
       abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> serializer);
       abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> serializer);
+      abstract Builder<K, V> setPublishTimestampFunction(
+        KafkaPublishTimestampFunction<KV<K, V>> timestampFunction);
       abstract Builder<K, V> setEOS(boolean eosEnabled);
       abstract Builder<K, V> setSinkGroupId(String sinkGroupId);
       abstract Builder<K, V> setNumShards(int numShards);
@@ -890,6 +899,28 @@ public class KafkaIO {
     }
 
     /**
+     * The timestamp for each record being published is set to timestamp of the element in the
+     * pipeline. This is equivalent to {@code withPublishTimestampFunction((e, ts) -> ts)}. <br>
+     * NOTE: Kafka's retention policies are based on message timestamps. If the pipeline
+     * is processing messages from the past, they might be deleted immediately by Kafka after
+     * being published if the timestamps are older than Kafka cluster's {@code log.retention.hours}.
+     */
+    public Write<K, V> withInputTimestamp() {
+      return withPublishTimestampFunction(KafkaPublishTimestampFunction.withElementTimestamp());
+    }
+
+    /**
+     * A function to provide timestamp for records being published. <br>
+     * NOTE: Kafka's retention policies are based on message timestamps. If the pipeline
+     * is processing messages from the past, they might be deleted immediately by Kafka after
+     * being published if the timestamps are older than Kafka cluster's {@code log.retention.hours}.
+     */
+    public Write<K, V> withPublishTimestampFunction(
+      KafkaPublishTimestampFunction<KV<K, V>> timestampFunction) {
+      return toBuilder().setPublishTimestampFunction(timestampFunction).build();
+    }
+
+    /**
      * Provides exactly-once semantics while writing to Kafka, which enables applications with
      * end-to-end exactly-once guarantees on top of exactly-once semantics <i>within</i> Beam
      * pipelines. It ensures that records written to sink are committed on Kafka exactly once,
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaPublishTimestampFunction.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaPublishTimestampFunction.java
new file mode 100644
index 0000000..e0ef639
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaPublishTimestampFunction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/**
+ * An interface for providing custom timestamp for elements written to Kafka.
+ */
+public interface KafkaPublishTimestampFunction<T> extends Serializable {
+
+  /**
+   * Returns timestamp for element being published to Kafka.
+   * See @{@link org.apache.kafka.clients.producer.ProducerRecord}.
+   *
+   * @param element The element being published.
+   * @param elementTimestamp Timestamp of the element from the context
+   *                         (i.e. @{@link DoFn.ProcessContext#timestamp()}
+   */
+  Instant getTimestamp(T element, Instant elementTimestamp);
+
+  /**
+   * Returns a {@link KafkaPublishTimestampFunction} that returns the element timestamp from the ProcessContext.
+   */
+  static <T> KafkaPublishTimestampFunction<T> withElementTimestamp() {
+    return (element, elementTimestamp) -> elementTimestamp;
+  }
+}
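
A quick illustration of the new interface's contract (values are arbitrary):

    // withElementTimestamp() passes the element's pipeline timestamp through unchanged.
    KafkaPublishTimestampFunction<KV<Long, String>> fn =
        KafkaPublishTimestampFunction.withElementTimestamp();
    Instant ts = fn.getTimestamp(KV.of(1L, "a"), new Instant(42L));   // ts == new Instant(42L)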
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
index 00b76e5..9f2544a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
@@ -54,8 +54,12 @@ class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> {
     checkForFailures();
 
     KV<K, V> kv = ctx.element();
-    producer.send(
-        new ProducerRecord<>(spec.getTopic(), kv.getKey(), kv.getValue()), new SendCallback());
+    Long timestampMillis = spec.getPublishTimestampFunction() != null
+      ? spec.getPublishTimestampFunction().getTimestamp(kv, ctx.timestamp()).getMillis()
+      : null;
+
+    producer.send(new ProducerRecord<>(
+        spec.getTopic(), null, timestampMillis, kv.getKey(), kv.getValue()), new SendCallback());
 
     elementsWritten.inc();
   }
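
KafkaWriter now builds records with the five-argument ProducerRecord constructor, passing null
for the partition and a nullable timestamp. A standalone sketch of that constructor (topic name
and values are illustrative):

    // (topic, partition, timestamp, key, value): a null partition defers to the partitioner;
    // a null timestamp preserves the old behavior of letting the producer/broker assign it.
    ProducerRecord<Integer, Long> record =
        new ProducerRecord<>("results", null, 1521737810000L, 1, 100L);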
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 3718c41..7ae8f1a 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -854,13 +854,14 @@ public class KafkaIOTest {
             .withTopic(topic)
             .withKeySerializer(IntegerSerializer.class)
             .withValueSerializer(LongSerializer.class)
+            .withInputTimestamp()
             .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
 
       p.run();
 
       completionThread.shutdown();
 
-      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false);
+      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false, true);
     }
   }
 
@@ -891,7 +892,7 @@ public class KafkaIOTest {
 
       completionThread.shutdown();
 
-      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, true);
+      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, true, false);
     }
   }
 
@@ -930,13 +931,14 @@ public class KafkaIOTest {
                  .withEOS(1, "test")
                  .withConsumerFactoryFn(new ConsumerFactoryFn(
                    Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST))
+                 .withPublishTimestampFunction((e, ts) -> ts)
                  .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
 
       p.run();
 
       completionThread.shutdown();
 
-      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false);
+      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false, true);
     }
   }
 
@@ -1198,7 +1200,10 @@ public class KafkaIOTest {
   }
 
   private static void verifyProducerRecords(MockProducer<Integer, Long> mockProducer,
-                                            String topic, int numElements, boolean keyIsAbsent) {
+                                            String topic,
+                                            int numElements,
+                                            boolean keyIsAbsent,
+                                            boolean verifyTimestamp) {
 
     // verify that appropriate messages are written to kafka
     List<ProducerRecord<Integer, Long>> sent = mockProducer.history();
@@ -1215,6 +1220,9 @@ public class KafkaIOTest {
         assertEquals(i, record.key().intValue());
       }
       assertEquals(i, record.value().longValue());
+      if (verifyTimestamp) {
+        assertEquals(i, record.timestamp().intValue());
+      }
     }
   }
 

-- 
To stop receiving notification emails like this one, please contact ming...@apache.org.
