[ https://issues.apache.org/jira/browse/BEAM-3851?focusedWorklogId=83270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83270 ]

ASF GitHub Bot logged work on BEAM-3851:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Mar/18 17:56
            Start Date: 22/Mar/18 17:56
    Worklog Time Spent: 10m 
      Work Description: XuMingmin closed pull request #4868: [BEAM-3851] Option to preserve element timestamp while publishing to Kafka.
URL: https://github.com/apache/beam/pull/4868
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
index 7345a92f201..9ae69da3834 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
@@ -64,6 +64,8 @@
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -173,7 +175,8 @@ static void ensureEOSSupport() {
   /**
    * Shuffle messages assigning each randomly to a shard.
    */
-  private static class Reshard<K, V> extends DoFn<KV<K, V>, KV<Integer, KV<K, V>>> {
+  private static class Reshard<K, V>
+      extends DoFn<KV<K, V>, KV<Integer, TimestampedValue<KV<K, V>>>> {
 
     private final int numShards;
     private transient int shardId;
@@ -190,12 +193,13 @@ public void setup() {
     @ProcessElement
     public void processElement(ProcessContext ctx) {
       shardId = (shardId + 1) % numShards; // round-robin among shards.
-      ctx.output(KV.of(shardId, ctx.element()));
+      ctx.output(KV.of(shardId, TimestampedValue.of(ctx.element(), ctx.timestamp())));
     }
   }
 
-  private static class Sequencer<K, V>
-    extends DoFn<KV<Integer, Iterable<KV<K, V>>>, KV<Integer, KV<Long, KV<K, V>>>> {
+  private static class Sequencer<K, V> extends DoFn<
+      KV<Integer, Iterable<TimestampedValue<KV<K, V>>>>,
+      KV<Integer, KV<Long, TimestampedValue<KV<K, V>>>>> {
 
     private static final String NEXT_ID = "nextId";
     @StateId(NEXT_ID)
@@ -205,7 +209,7 @@ public void processElement(ProcessContext ctx) {
     public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState, ProcessContext ctx) {
       long nextId = MoreObjects.firstNonNull(nextIdState.read(), 0L);
       int shard = ctx.element().getKey();
-      for (KV<K, V> value : ctx.element().getValue()) {
+      for (TimestampedValue<KV<K, V>> value : ctx.element().getValue()) {
         ctx.output(KV.of(shard, KV.of(nextId, value)));
         nextId++;
       }
@@ -214,7 +218,7 @@ public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState, Proce
   }
 
   private static class ExactlyOnceWriter<K, V>
-    extends DoFn<KV<Integer, Iterable<KV<Long, KV<K, V>>>>, Void> {
+    extends DoFn<KV<Integer, Iterable<KV<Long, TimestampedValue<KV<K, V>>>>>, Void> {
 
     private static final String NEXT_ID = "nextId";
     private static final String MIN_BUFFERED_ID = "minBufferedId";
@@ -230,7 +234,7 @@ public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState, Proce
     @StateId(MIN_BUFFERED_ID)
     private final StateSpec<ValueState<Long>> minBufferedIdSpec = StateSpecs.value();
     @StateId(OUT_OF_ORDER_BUFFER)
-    private final StateSpec<BagState<KV<Long, KV<K, V>>>> outOfOrderBufferSpec;
+    private final StateSpec<BagState<KV<Long, TimestampedValue<KV<K, V>>>>> outOfOrderBufferSpec;
     // A random id assigned to each shard. Helps with detecting when multiple jobs are mistakenly
     // started with same groupId used for storing state on Kafka side, including the case where
     // a job is restarted with same groupId, but the metadata from previous run was not cleared.
@@ -248,7 +252,8 @@ public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState, Proce
 
     ExactlyOnceWriter(Write<K, V> spec, Coder<KV<K, V>> elemCoder) {
       this.spec = spec;
-      this.outOfOrderBufferSpec = StateSpecs.bag(KvCoder.of(BigEndianLongCoder.of(), elemCoder));
+      this.outOfOrderBufferSpec = StateSpecs.bag(
+          KvCoder.of(BigEndianLongCoder.of(), TimestampedValueCoder.of(elemCoder)));
     }
 
     @Setup
@@ -261,7 +266,7 @@ public void setup() {
     public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState,
                               @StateId(MIN_BUFFERED_ID) ValueState<Long> minBufferedIdState,
                                @StateId(OUT_OF_ORDER_BUFFER)
-                                 BagState<KV<Long, KV<K, V>>> oooBufferState,
+                                 BagState<KV<Long, TimestampedValue<KV<K, V>>>> oooBufferState,
                               @StateId(WRITER_ID) ValueState<String> writerIdState,
                                ProcessContext ctx)
       throws IOException {
@@ -297,10 +302,10 @@ public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState,
         // There might be out of order messages buffered in earlier iterations. These
         // will get merged if and when minBufferedId matches nextId.
 
-        Iterator<KV<Long, KV<K, V>>> iter = ctx.element().getValue().iterator();
+        Iterator<KV<Long, TimestampedValue<KV<K, V>>>> iter = ctx.element().getValue().iterator();
 
         while (iter.hasNext()) {
-          KV<Long, KV<K, V>> kv = iter.next();
+          KV<Long, TimestampedValue<KV<K, V>>> kv = iter.next();
           long recordId = kv.getKey();
 
           if (recordId < nextId) {
@@ -339,7 +344,8 @@ public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState,
             // Read all of them in to memory and sort them. Reading into memory
             // might be problematic in extreme cases. Might need to improve it in future.
 
-            List<KV<Long, KV<K, V>>> buffered = Lists.newArrayList(oooBufferState.read());
+            List<KV<Long, TimestampedValue<KV<K, V>>>> buffered =
+                Lists.newArrayList(oooBufferState.read());
             buffered.sort(new KV.OrderByKey<>());
 
             LOG.info("{} : merging {} buffered records (min buffered id is {}).",
@@ -349,8 +355,7 @@ public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState,
             minBufferedIdState.clear();
             minBufferedId = Long.MAX_VALUE;
 
-            iter =
-              Iterators.mergeSorted(
+            iter = Iterators.mergeSorted(
                ImmutableList.of(iter, buffered.iterator()), new KV.OrderByKey<>());
           }
         }
@@ -428,10 +433,17 @@ void beginTxn() {
         ProducerSpEL.beginTransaction(producer);
       }
 
-      void sendRecord(KV<K, V> record, Counter sendCounter) {
+      void sendRecord(TimestampedValue<KV<K, V>> record, Counter sendCounter) {
         try {
+          Long timestampMillis = spec.getPublishTimestampFunction() != null
+            ? spec.getPublishTimestampFunction().getTimestamp(record.getValue(),
+                                                              record.getTimestamp()).getMillis()
+            : null;
+
           producer.send(
-            new ProducerRecord<>(spec.getTopic(), record.getKey(), record.getValue()));
+              new ProducerRecord<>(
+                  spec.getTopic(), null, timestampMillis,
+                  record.getValue().getKey(), record.getValue().getValue()));
           sendCounter.inc();
         } catch (KafkaException e) {
           ProducerSpEL.abortTransaction(producer);
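
Aside: the heart of the change above is that Reshard now wraps each element in a TimestampedValue so the element's timestamp survives the shuffle and state buffering between Reshard, Sequencer, and ExactlyOnceWriter. A minimal standalone sketch of that wrapping (the string key/value and the wall-clock timestamp are hypothetical stand-ins for ctx.element() and ctx.timestamp()):

import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

public class TimestampedValueSketch {
  public static void main(String[] args) {
    // Stand-ins for ctx.element() and ctx.timestamp() in Reshard.processElement.
    KV<String, String> element = KV.of("key", "value");
    Instant elementTs = Instant.now();

    // Wrapping keeps the timestamp attached to the element as it flows through
    // GroupByKey and state, where the original ProcessContext timestamp would
    // otherwise no longer be available.
    TimestampedValue<KV<String, String>> tv = TimestampedValue.of(element, elementTs);

    System.out.println(tv.getValue());      // the wrapped KV
    System.out.println(tv.getTimestamp());  // elementTs
  }
}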
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index eb292299256..eeb9da95814 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -174,10 +174,15 @@
  *       .withKeySerializer(LongSerializer.class)
  *       .withValueSerializer(StringSerializer.class)
  *
- *       // you can further customize KafkaProducer used to write the records by adding more
+ *       // You can further customize KafkaProducer used to write the records by adding more
 *       // settings for ProducerConfig. e.g, to enable compression :
 *       .updateProducerProperties(ImmutableMap.of("compression.type", "gzip"))
 *
+ *       // You can set the publish timestamp for the Kafka records.
+ *       .withInputTimestamp() // element timestamp is used while publishing to Kafka
+ *       // or you can also set a custom timestamp with a function.
+ *       .withPublishTimestampFunction((elem, elemTs) -> ...)
+ *
 *       // Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS().
  *       .withEOS(20, "eos-sink-group-id");
  *    );
@@ -813,6 +818,8 @@ private KafkaIO() {}
     @Nullable abstract Class<? extends Serializer<K>> getKeySerializer();
     @Nullable abstract Class<? extends Serializer<V>> getValueSerializer();
 
+    @Nullable abstract KafkaPublishTimestampFunction<KV<K, V>> getPublishTimestampFunction();
+
     // Configuration for EOS sink
     abstract boolean isEOS();
     @Nullable abstract String getSinkGroupId();
@@ -830,6 +837,8 @@ private KafkaIO() {}
           SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
       abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> serializer);
       abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> serializer);
+      abstract Builder<K, V> setPublishTimestampFunction(
+        KafkaPublishTimestampFunction<KV<K, V>> timestampFunction);
       abstract Builder<K, V> setEOS(boolean eosEnabled);
       abstract Builder<K, V> setSinkGroupId(String sinkGroupId);
       abstract Builder<K, V> setNumShards(int numShards);
@@ -889,6 +898,28 @@ private KafkaIO() {}
       return toBuilder().setProducerFactoryFn(producerFactoryFn).build();
     }
 
+    /**
+     * The timestamp for each record being published is set to the timestamp of the element in the
+     * pipeline. This is equivalent to {@code withPublishTimestampFunction((e, ts) -> ts)}. <br>
+     * NOTE: Kafka's retention policies are based on message timestamps. If the pipeline
+     * is processing messages from the past, they might be deleted immediately by Kafka after
+     * being published if the timestamps are older than the Kafka cluster's {@code log.retention.hours}.
+     */
+    public Write<K, V> withInputTimestamp() {
+      return withPublishTimestampFunction(KafkaPublishTimestampFunction.withElementTimestamp());
+    }
+
+    /**
+     * A function to provide the timestamp for records being published. <br>
+     * NOTE: Kafka's retention policies are based on message timestamps. If the pipeline
+     * is processing messages from the past, they might be deleted immediately by Kafka after
+     * being published if the timestamps are older than the Kafka cluster's {@code log.retention.hours}.
+     */
+    public Write<K, V> withPublishTimestampFunction(
+      KafkaPublishTimestampFunction<KV<K, V>> timestampFunction) {
+      return toBuilder().setPublishTimestampFunction(timestampFunction).build();
+    }
+
     /**
      * Provides exactly-once semantics while writing to Kafka, which enables applications with
      * end-to-end exactly-once guarantees on top of exactly-once semantics <i>within</i> Beam
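
The NOTE in the new javadoc deserves emphasis: if publish timestamps fall outside the broker's retention window, Kafka may reclaim the records right after they are written. A back-of-the-envelope check, assuming Kafka's stock default of log.retention.hours=168 and a hypothetical 10-day-old event time:

import org.joda.time.Duration;
import org.joda.time.Instant;

public class RetentionCheckSketch {
  public static void main(String[] args) {
    Duration retention = Duration.standardHours(168); // Kafka default: 7 days

    // Hypothetical: replaying events that are 10 days old.
    Instant eventTime = Instant.now().minus(Duration.standardDays(10));

    // A record published with this timestamp is already past retention, so
    // the broker's log cleaner may delete it almost immediately.
    boolean pastRetention = eventTime.isBefore(Instant.now().minus(retention));
    System.out.println("eligible for deletion on arrival: " + pastRetention);
  }
}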
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaPublishTimestampFunction.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaPublishTimestampFunction.java
new file mode 100644
index 00000000000..e0ef6397d23
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaPublishTimestampFunction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.joda.time.Instant;
+
+/**
+ * An interface for providing custom timestamp for elements written to Kafka.
+ */
+public interface KafkaPublishTimestampFunction<T> extends Serializable {
+
+  /**
+   * Returns the timestamp for the element being published to Kafka.
+   * See {@link org.apache.kafka.clients.producer.ProducerRecord}.
+   *
+   * @param element The element being published.
+   * @param elementTimestamp Timestamp of the element from the context
+   *                         (i.e. {@link DoFn.ProcessContext#timestamp()}).
+   */
+  Instant getTimestamp(T element, Instant elementTimestamp);
+
+  /**
+   * Returns a {@link KafkaPublishTimestampFunction} that returns the element timestamp from the ProcessContext.
+   */
+  static <T> KafkaPublishTimestampFunction<T> withElementTimestamp() {
+    return (element, elementTimestamp) -> elementTimestamp;
+  }
+}
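
Since KafkaPublishTimestampFunction is a single-method interface, it can be supplied as a lambda. A hedged usage sketch of the two new Write options (the broker address, topic, and one-second shift are hypothetical, not taken from the PR):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Instant;

class PublishTimestampExamples {

  // Publish each record with its element's pipeline timestamp.
  static KafkaIO.Write<Long, String> elementTimestampSink() {
    return KafkaIO.<Long, String>write()
        .withBootstrapServers("broker-1:9092")
        .withTopic("results")
        .withKeySerializer(LongSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withInputTimestamp(); // equivalent to withPublishTimestampFunction((e, ts) -> ts)
  }

  // Derive a custom publish timestamp, e.g. shifted one second forward.
  static KafkaIO.Write<Long, String> customTimestampSink() {
    return KafkaIO.<Long, String>write()
        .withBootstrapServers("broker-1:9092")
        .withTopic("results")
        .withKeySerializer(LongSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withPublishTimestampFunction(
            (KV<Long, String> elem, Instant elemTs) -> elemTs.plus(1000L));
  }
}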
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
index 00b76e503b8..9f2544a96b0 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
@@ -54,8 +54,12 @@ public void processElement(ProcessContext ctx) throws Exception {
     checkForFailures();
 
     KV<K, V> kv = ctx.element();
-    producer.send(
-        new ProducerRecord<>(spec.getTopic(), kv.getKey(), kv.getValue()), new SendCallback());
+    Long timestampMillis = spec.getPublishTimestampFunction() != null
+      ? spec.getPublishTimestampFunction().getTimestamp(kv, ctx.timestamp()).getMillis()
+      : null;
+
+    producer.send(new ProducerRecord<>(
+        spec.getTopic(), null, timestampMillis, kv.getKey(), kv.getValue()), new SendCallback());
 
     elementsWritten.inc();
   }
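
Both writers now use Kafka's five-argument ProducerRecord constructor so the timestamp travels with the record. A standalone sketch of that constructor (the topic, key, value, and epoch-millis below are hypothetical):

import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerRecordSketch {
  public static void main(String[] args) {
    // (topic, partition, timestampMillis, key, value): a null partition keeps
    // the producer's default partitioning; a null timestamp lets Kafka assign
    // one (producer wall clock for CreateTime topics, broker time for
    // LogAppendTime topics), which matches the pre-PR behavior.
    ProducerRecord<Integer, Long> record =
        new ProducerRecord<>("my-topic", null, 1521741375000L, 7, 42L);

    System.out.println(record.timestamp()); // 1521741375000
  }
}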
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 3718c410827..7ae8f1a3ad8 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -854,13 +854,14 @@ public void testSink() throws Exception {
             .withTopic(topic)
             .withKeySerializer(IntegerSerializer.class)
             .withValueSerializer(LongSerializer.class)
+            .withInputTimestamp()
            .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
 
       p.run();
 
       completionThread.shutdown();
 
-      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false);
+      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false, true);
     }
   }
 
@@ -891,7 +892,7 @@ public void testValuesSink() throws Exception {
 
       completionThread.shutdown();
 
-      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, true);
+      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, true, false);
     }
   }
 
@@ -930,13 +931,14 @@ public void testExactlyOnceSink() {
                  .withEOS(1, "test")
                  .withConsumerFactoryFn(new ConsumerFactoryFn(
                   Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST))
+                 .withPublishTimestampFunction((e, ts) -> ts)
                 .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey)));
 
       p.run();
 
       completionThread.shutdown();
 
-      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false);
+      verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false, true);
     }
   }
 
@@ -1198,7 +1200,10 @@ public void testSinkMetrics() throws Exception {
   }
 
   private static void verifyProducerRecords(MockProducer<Integer, Long> mockProducer,
-                                            String topic, int numElements, boolean keyIsAbsent) {
+                                            String topic,
+                                            int numElements,
+                                            boolean keyIsAbsent,
+                                            boolean verifyTimestamp) {
 
     // verify that appropriate messages are written to kafka
     List<ProducerRecord<Integer, Long>> sent = mockProducer.history();
@@ -1215,6 +1220,9 @@ private static void verifyProducerRecords(MockProducer<Integer, Long> mockProduc
         assertEquals(i, record.key().intValue());
       }
       assertEquals(i, record.value().longValue());
+      if (verifyTimestamp) {
+        assertEquals(i, record.timestamp().intValue());
+      }
     }
   }
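
For a sense of what the updated assertions exercise: MockProducer from kafka-clients records every send, so a test can inspect published timestamps directly. A minimal standalone sketch (the topic, keys, values, and timestamps are hypothetical, and this skips the test's ProducerFactoryFn wiring):

import java.util.List;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;

public class MockProducerTimestampSketch {
  public static void main(String[] args) {
    MockProducer<Integer, Long> producer = new MockProducer<>(
        true /* autoComplete */, new IntegerSerializer(), new LongSerializer());

    // Publish records with explicit timestamps, as KafkaWriter now does.
    producer.send(new ProducerRecord<>("topic", null, 1000L, 0, 0L));
    producer.send(new ProducerRecord<>("topic", null, 1001L, 1, 1L));

    // history() returns everything sent; assertions can then check timestamps.
    List<ProducerRecord<Integer, Long>> sent = producer.history();
    for (ProducerRecord<Integer, Long> r : sent) {
      System.out.println(r.key() + " -> " + r.timestamp());
    }
  }
}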
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 83270)
    Time Spent: 2h  (was: 1h 50m)

> Support element timestamps while publishing to Kafka.
> -----------------------------------------------------
>
>                 Key: BEAM-3851
>                 URL: https://issues.apache.org/jira/browse/BEAM-3851
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>    Affects Versions: 2.3.0
>            Reporter: Raghu Angadi
>            Assignee: Raghu Angadi
>            Priority: Major
>             Fix For: 2.5.0
>
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> KafkaIO sink should support using the input element timestamp for the message
> published to Kafka. Otherwise there is no way for the user to influence the
> timestamp of the messages in the Kafka sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
