This is an automated email from the ASF dual-hosted git repository. mingmxu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 501dc4c [BEAM-3851] Option to preserve element timestamp while publishing to Kafka. (#4868) 501dc4c is described below commit 501dc4cb17bb943aaa095feab959a9fed1aac20c Author: Raghu Angadi <rang...@apache.org> AuthorDate: Thu Mar 22 10:56:50 2018 -0700 [BEAM-3851] Option to preserve element timestamp while publishing to Kafka. (#4868) * Option to preserve element timestamp while publishing to Kafka. * Let users provide custom timestamp function. * update javadoc --- .../beam/sdk/io/kafka/KafkaExactlyOnceSink.java | 44 +++++++++++++-------- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 33 +++++++++++++++- .../io/kafka/KafkaPublishTimestampFunction.java | 45 ++++++++++++++++++++++ .../org/apache/beam/sdk/io/kafka/KafkaWriter.java | 8 +++- .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 16 ++++++-- 5 files changed, 123 insertions(+), 23 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java index 7345a92..9ae69da 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java @@ -64,6 +64,8 @@ import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -173,7 +175,8 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl /** * Shuffle messages 
assigning each randomly to a shard. */ - private static class Reshard<K, V> extends DoFn<KV<K, V>, KV<Integer, KV<K, V>>> { + private static class Reshard<K, V> + extends DoFn<KV<K, V>, KV<Integer, TimestampedValue<KV<K, V>>>> { private final int numShards; private transient int shardId; @@ -190,12 +193,13 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl @ProcessElement public void processElement(ProcessContext ctx) { shardId = (shardId + 1) % numShards; // round-robin among shards. - ctx.output(KV.of(shardId, ctx.element())); + ctx.output(KV.of(shardId, TimestampedValue.of(ctx.element(), ctx.timestamp()))); } } - private static class Sequencer<K, V> - extends DoFn<KV<Integer, Iterable<KV<K, V>>>, KV<Integer, KV<Long, KV<K, V>>>> { + private static class Sequencer<K, V> extends DoFn< + KV<Integer, Iterable<TimestampedValue<KV<K, V>>>>, + KV<Integer, KV<Long, TimestampedValue<KV<K, V>>>>> { private static final String NEXT_ID = "nextId"; @StateId(NEXT_ID) @@ -205,7 +209,7 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState, ProcessContext ctx) { long nextId = MoreObjects.firstNonNull(nextIdState.read(), 0L); int shard = ctx.element().getKey(); - for (KV<K, V> value : ctx.element().getValue()) { + for (TimestampedValue<KV<K, V>> value : ctx.element().getValue()) { ctx.output(KV.of(shard, KV.of(nextId, value))); nextId++; } @@ -214,7 +218,7 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl } private static class ExactlyOnceWriter<K, V> - extends DoFn<KV<Integer, Iterable<KV<Long, KV<K, V>>>>, Void> { + extends DoFn<KV<Integer, Iterable<KV<Long, TimestampedValue<KV<K, V>>>>>, Void> { private static final String NEXT_ID = "nextId"; private static final String MIN_BUFFERED_ID = "minBufferedId"; @@ -230,7 +234,7 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl 
@StateId(MIN_BUFFERED_ID) private final StateSpec<ValueState<Long>> minBufferedIdSpec = StateSpecs.value(); @StateId(OUT_OF_ORDER_BUFFER) - private final StateSpec<BagState<KV<Long, KV<K, V>>>> outOfOrderBufferSpec; + private final StateSpec<BagState<KV<Long, TimestampedValue<KV<K, V>>>>> outOfOrderBufferSpec; // A random id assigned to each shard. Helps with detecting when multiple jobs are mistakenly // started with same groupId used for storing state on Kafka side, including the case where // a job is restarted with same groupId, but the metadata from previous run was not cleared. @@ -248,7 +252,8 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl ExactlyOnceWriter(Write<K, V> spec, Coder<KV<K, V>> elemCoder) { this.spec = spec; - this.outOfOrderBufferSpec = StateSpecs.bag(KvCoder.of(BigEndianLongCoder.of(), elemCoder)); + this.outOfOrderBufferSpec = StateSpecs.bag( + KvCoder.of(BigEndianLongCoder.of(), TimestampedValueCoder.of(elemCoder))); } @Setup @@ -261,7 +266,7 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState, @StateId(MIN_BUFFERED_ID) ValueState<Long> minBufferedIdState, @StateId(OUT_OF_ORDER_BUFFER) - BagState<KV<Long, KV<K, V>>> oooBufferState, + BagState<KV<Long, TimestampedValue<KV<K, V>>>> oooBufferState, @StateId(WRITER_ID) ValueState<String> writerIdState, ProcessContext ctx) throws IOException { @@ -297,10 +302,10 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl // There might be out of order messages buffered in earlier iterations. These // will get merged if and when minBufferedId matches nextId. 
- Iterator<KV<Long, KV<K, V>>> iter = ctx.element().getValue().iterator(); + Iterator<KV<Long, TimestampedValue<KV<K, V>>>> iter = ctx.element().getValue().iterator(); while (iter.hasNext()) { - KV<Long, KV<K, V>> kv = iter.next(); + KV<Long, TimestampedValue<KV<K, V>>> kv = iter.next(); long recordId = kv.getKey(); if (recordId < nextId) { @@ -339,7 +344,8 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl // Read all of them in to memory and sort them. Reading into memory // might be problematic in extreme cases. Might need to improve it in future. - List<KV<Long, KV<K, V>>> buffered = Lists.newArrayList(oooBufferState.read()); + List<KV<Long, TimestampedValue<KV<K, V>>>> buffered = + Lists.newArrayList(oooBufferState.read()); buffered.sort(new KV.OrderByKey<>()); LOG.info("{} : merging {} buffered records (min buffered id is {}).", @@ -349,8 +355,7 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl minBufferedIdState.clear(); minBufferedId = Long.MAX_VALUE; - iter = - Iterators.mergeSorted( + iter = Iterators.mergeSorted( ImmutableList.of(iter, buffered.iterator()), new KV.OrderByKey<>()); } } @@ -428,10 +433,17 @@ class KafkaExactlyOnceSink<K, V> extends PTransform<PCollection<KV<K, V>>, PColl ProducerSpEL.beginTransaction(producer); } - void sendRecord(KV<K, V> record, Counter sendCounter) { + void sendRecord(TimestampedValue<KV<K, V>> record, Counter sendCounter) { try { + Long timestampMillis = spec.getPublishTimestampFunction() != null + ? 
spec.getPublishTimestampFunction().getTimestamp(record.getValue(), + record.getTimestamp()).getMillis() + : null; + producer.send( - new ProducerRecord<>(spec.getTopic(), record.getKey(), record.getValue())); + new ProducerRecord<>( + spec.getTopic(), null, timestampMillis, + record.getValue().getKey(), record.getValue().getValue())); sendCounter.inc(); } catch (KafkaException e) { ProducerSpEL.abortTransaction(producer); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index eb29229..eeb9da9 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -174,10 +174,15 @@ import org.slf4j.LoggerFactory; * .withKeySerializer(LongSerializer.class) * .withValueSerializer(StringSerializer.class) * - * // you can further customize KafkaProducer used to write the records by adding more + * // You can further customize KafkaProducer used to write the records by adding more * // settings for ProducerConfig. e.g, to enable compression : * .updateProducerProperties(ImmutableMap.of("compression.type", "gzip")) * + * // You can set the publish timestamp for the Kafka records. + * .withInputTimestamp() // element timestamp is used while publishing to Kafka + * // or you can also set a custom timestamp with a function. + * .withPublishTimestampFunction((elem, elemTs) -> ...) + * * // Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS(). * .withEOS(20, "eos-sink-group-id"); * ); @@ -813,6 +818,8 @@ public class KafkaIO { @Nullable abstract Class<? extends Serializer<K>> getKeySerializer(); @Nullable abstract Class<?
extends Serializer<V>> getValueSerializer(); + @Nullable abstract KafkaPublishTimestampFunction<KV<K, V>> getPublishTimestampFunction(); + // Configuration for EOS sink abstract boolean isEOS(); @Nullable abstract String getSinkGroupId(); @@ -830,6 +837,8 @@ public class KafkaIO { SerializableFunction<Map<String, Object>, Producer<K, V>> fn); abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> serializer); abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> serializer); + abstract Builder<K, V> setPublishTimestampFunction( + KafkaPublishTimestampFunction<KV<K, V>> timestampFunction); abstract Builder<K, V> setEOS(boolean eosEnabled); abstract Builder<K, V> setSinkGroupId(String sinkGroupId); abstract Builder<K, V> setNumShards(int numShards); @@ -890,6 +899,28 @@ public class KafkaIO { } /** + * The timestamp for each record being published is set to timestamp of the element in the + * pipeline. This is equivalent to {@code withPublishTimestampFunction((e, ts) -> ts)}. <br> + * NOTE: Kafka's retention policies are based on message timestamps. If the pipeline + * is processing messages from the past, they might be deleted immediately by Kafka after + * being published if the timestamps are older than Kafka cluster's {@code log.retention.hours}. + */ + public Write<K, V> withInputTimestamp() { + return withPublishTimestampFunction(KafkaPublishTimestampFunction.withElementTimestamp()); + } + + /** + * A function to provide timestamp for records being published. <br> + * NOTE: Kafka's retention policies are based on message timestamps. If the pipeline + * is processing messages from the past, they might be deleted immediately by Kafka after + * being published if the timestamps are older than Kafka cluster's {@code log.retention.hours}. 
+ */ + public Write<K, V> withPublishTimestampFunction( + KafkaPublishTimestampFunction<KV<K, V>> timestampFunction) { + return toBuilder().setPublishTimestampFunction(timestampFunction).build(); + } + + /** * Provides exactly-once semantics while writing to Kafka, which enables applications with * end-to-end exactly-once guarantees on top of exactly-once semantics <i>within</i> Beam * pipelines. It ensures that records written to sink are committed on Kafka exactly once, diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaPublishTimestampFunction.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaPublishTimestampFunction.java new file mode 100644 index 0000000..e0ef639 --- /dev/null +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaPublishTimestampFunction.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import java.io.Serializable; +import org.apache.beam.sdk.transforms.DoFn; +import org.joda.time.Instant; + +/** + * An interface for providing custom timestamp for elements written to Kafka. 
+ */ +public interface KafkaPublishTimestampFunction<T> extends Serializable { + + /** + * Returns timestamp for element being published to Kafka. + * See {@link org.apache.kafka.clients.producer.ProducerRecord}. + * + * @param element The element being published. + * @param elementTimestamp Timestamp of the element from the context + * (i.e. {@link DoFn.ProcessContext#timestamp()}). + */ + Instant getTimestamp(T element, Instant elementTimestamp); + + /** + * A {@link KafkaPublishTimestampFunction} that returns the element timestamp from ProcessContext. + */ + static <T> KafkaPublishTimestampFunction<T> withElementTimestamp() { + return (element, elementTimestamp) -> elementTimestamp; + } +} diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java index 00b76e5..9f2544a 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java @@ -54,8 +54,12 @@ class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> { checkForFailures(); KV<K, V> kv = ctx.element(); - producer.send( - new ProducerRecord<>(spec.getTopic(), kv.getKey(), kv.getValue()), new SendCallback()); + Long timestampMillis = spec.getPublishTimestampFunction() != null + ?
spec.getPublishTimestampFunction().getTimestamp(kv, ctx.timestamp()).getMillis() + : null; + + producer.send(new ProducerRecord<>( + spec.getTopic(), null, timestampMillis, kv.getKey(), kv.getValue()), new SendCallback()); elementsWritten.inc(); } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 3718c41..7ae8f1a 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -854,13 +854,14 @@ public class KafkaIOTest { .withTopic(topic) .withKeySerializer(IntegerSerializer.class) .withValueSerializer(LongSerializer.class) + .withInputTimestamp() .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))); p.run(); completionThread.shutdown(); - verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false); + verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false, true); } } @@ -891,7 +892,7 @@ public class KafkaIOTest { completionThread.shutdown(); - verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, true); + verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, true, false); } } @@ -930,13 +931,14 @@ public class KafkaIOTest { .withEOS(1, "test") .withConsumerFactoryFn(new ConsumerFactoryFn( Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST)) + .withPublishTimestampFunction((e, ts) -> ts) .withProducerFactoryFn(new ProducerFactoryFn(producerWrapper.producerKey))); p.run(); completionThread.shutdown(); - verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false); + verifyProducerRecords(producerWrapper.mockProducer, topic, numElements, false, true); } } @@ -1198,7 +1200,10 @@ public class KafkaIOTest { } private static void verifyProducerRecords(MockProducer<Integer, Long> mockProducer, - String 
topic, int numElements, boolean keyIsAbsent) { + String topic, + int numElements, + boolean keyIsAbsent, + boolean verifyTimestamp) { // verify that appropriate messages are written to kafka List<ProducerRecord<Integer, Long>> sent = mockProducer.history(); @@ -1215,6 +1220,9 @@ public class KafkaIOTest { assertEquals(i, record.key().intValue()); } assertEquals(i, record.value().longValue()); + if (verifyTimestamp) { + assertEquals(i, record.timestamp().intValue()); + } } } -- To stop receiving notification emails like this one, please contact ming...@apache.org.