Repository: beam
Updated Branches:
  refs/heads/master 741242732 -> 5c2da7dc2
Append to #2135: 1) fix handling of the NO_TIMESTAMP type in Kafka 0.10; 2) rename the field to 'timestamp'.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f10509e7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f10509e7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f10509e7

Branch: refs/heads/master
Commit: f10509e745ff234110bc50d16aba1cb6813036b6
Parents: 7412427
Author: mingmxu <ming...@ebay.com>
Authored: Fri Mar 17 13:18:17 2017 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Mar 24 10:14:33 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/kafka/ConsumerSpEL.java  | 43 +++++++++++++++++---
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 21 +++++++++-
 .../apache/beam/sdk/io/kafka/KafkaRecord.java   | 15 +++++--
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     |  5 +++
 4 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f10509e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
index b92b6fa..8fe17c1 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -17,10 +17,14 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.Collection;
-
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.expression.Expression;
 import org.springframework.expression.ExpressionParser;
 import org.springframework.expression.spel.SpelParserConfiguration;
@@ -33,16 +37,28 @@ import org.springframework.expression.spel.support.StandardEvaluationContext;
  * to eliminate the method definition differences.
  */
 class ConsumerSpEL {
-  SpelParserConfiguration config = new SpelParserConfiguration(true, true);
-  ExpressionParser parser = new SpelExpressionParser(config);
+  private static final Logger LOG = LoggerFactory.getLogger(ConsumerSpEL.class);
+
+  private SpelParserConfiguration config = new SpelParserConfiguration(true, true);
+  private ExpressionParser parser = new SpelExpressionParser(config);
 
-  Expression seek2endExpression =
+  private Expression seek2endExpression =
       parser.parseExpression("#consumer.seekToEnd(#tp)");
 
-  Expression assignExpression =
+  private Expression assignExpression =
       parser.parseExpression("#consumer.assign(#tp)");
 
-  public ConsumerSpEL() {}
+  private Method timestampMethod;
+  private boolean hasRecordTimestamp = false;
+
+  public ConsumerSpEL() {
+    try {
+      timestampMethod = ConsumerRecord.class.getMethod("timestamp", (Class<?>[]) null);
+      hasRecordTimestamp = timestampMethod.getReturnType().equals(Long.TYPE);
+    } catch (NoSuchMethodException | SecurityException e) {
+      LOG.debug("Timestamp for Kafka message is not available.");
+    }
+  }
 
   public void evaluateSeek2End(Consumer consumer, TopicPartition topicPartitions) {
     StandardEvaluationContext mapContext = new StandardEvaluationContext();
@@ -57,4 +73,19 @@ class ConsumerSpEL {
     mapContext.setVariable("tp", topicPartitions);
     assignExpression.getValue(mapContext);
   }
+
+  public long getRecordTimestamp(ConsumerRecord<byte[], byte[]> rawRecord) {
+    long timestamp;
+    try {
+      // For Kafka 0.9, set to System.currentTimeMillis().
+      // For Kafka 0.10, when NO_TIMESTAMP, also set to System.currentTimeMillis().
+      if (!hasRecordTimestamp || (timestamp = (long) timestampMethod.invoke(rawRecord)) <= 0L) {
+        timestamp = System.currentTimeMillis();
+      }
+    } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+      // Not expected. Method timestamp() is already checked.
+      throw new RuntimeException(e);
+    }
+    return timestamp;
+  }
 }
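
The reflection probe in the constructor above is what lets a single KafkaIO build run
against both Kafka 0.9 clients (whose ConsumerRecord has no timestamp() method) and 0.10
clients. The fallback rule in getRecordTimestamp() is small enough to restate standalone;
the class below is a minimal illustrative sketch, not part of the patch:

    public class TimestampFallback {
      // Mirrors the rule in getRecordTimestamp() above: a missing or invalid
      // Kafka timestamp collapses to processing time.
      static long resolveTimestamp(boolean hasRecordTimestamp, long kafkaTimestamp) {
        // Kafka 0.10 encodes NO_TIMESTAMP as -1L, so "<= 0" covers both a 0.9
        // client (no timestamp() at all) and a 0.10 record without a timestamp.
        if (!hasRecordTimestamp || kafkaTimestamp <= 0L) {
          return System.currentTimeMillis();
        }
        return kafkaTimestamp;
      }

      public static void main(String[] args) {
        System.out.println(resolveTimestamp(true, 1489783097000L)); // Kafka-provided
        System.out.println(resolveTimestamp(true, -1L));            // NO_TIMESTAMP -> now
        System.out.println(resolveTimestamp(false, 0L));            // 0.9 client -> now
      }
    }
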
http://git-wip-us.apache.org/repos/asf/beam/blob/f10509e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 890fb2b..310392c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -203,6 +203,14 @@ import org.slf4j.LoggerFactory;
  * {@link ProducerConfig} for sink. E.g. if you would like to enable offset
  * <em>auto commit</em> (for external monitoring or other purposes), you can set
  * <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc.
+ *
+ * <h3>Event Timestamp and Watermark</h3>
+ * By default, the record timestamp and watermark are based on processing time in the KafkaIO
+ * reader. This can be overridden by providing a {@code WatermarkFn} with
+ * {@link Read#withWatermarkFn(SerializableFunction)}, and a {@code TimestampFn} with
+ * {@link Read#withTimestampFn(SerializableFunction)}.<br>
+ * Note that {@link KafkaRecord#getTimestamp()} reflects the timestamp provided by Kafka, if any;
+ * otherwise it is set to processing time.
  */
 public class KafkaIO {
 
   /**
@@ -428,6 +436,7 @@ public class KafkaIO {
       checkNotNull(getValueCoder(), "Value coder must be set");
     }
 
+    @Override
     public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
       Unbounded<KafkaRecord<K, V>> unbounded =
@@ -458,6 +467,7 @@ public class KafkaIO {
     private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>
         unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
       return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() {
+        @Override
         public OutT apply(KafkaRecord<KeyT, ValueT> record) {
           return fn.apply(record.getKV());
         }
@@ -499,6 +509,7 @@ public class KafkaIO {
     private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
         KAFKA_CONSUMER_FACTORY_FN =
         new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
+          @Override
           public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
             return new KafkaConsumer<>(config);
           }
@@ -627,6 +638,7 @@ public class KafkaIO {
       }
 
       Collections.sort(partitions, new Comparator<TopicPartition>() {
+        @Override
         public int compare(TopicPartition tp1, TopicPartition tp2) {
           return ComparisonChain
               .start()
@@ -750,6 +762,7 @@ public class KafkaIO {
     /** watermark before any records have been read. */
     private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
 
+    @Override
     public String toString() {
       return name;
     }
@@ -800,13 +813,14 @@ public class KafkaIO {
     public UnboundedKafkaReader(
         UnboundedKafkaSource<K, V> source,
         @Nullable KafkaCheckpointMark checkpointMark) {
-
+      this.consumerSpEL = new ConsumerSpEL();
       this.source = source;
       this.name = "Reader-" + source.id;
 
       List<TopicPartition> partitions = source.spec.getTopicPartitions();
       partitionStates = ImmutableList.copyOf(Lists.transform(partitions,
           new Function<TopicPartition, PartitionState>() {
+            @Override
             public PartitionState apply(TopicPartition tp) {
               return new PartitionState(tp, UNINITIALIZED_OFFSET);
             }
@@ -886,7 +900,6 @@ public class KafkaIO {
 
     @Override
     public boolean start() throws IOException {
-      this.consumerSpEL = new ConsumerSpEL();
       Read<K, V> spec = source.spec;
       consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
       consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
@@ -909,6 +922,7 @@ public class KafkaIO {
       // Note that consumer is not thread safe, should not be accessed outside consumerPollLoop().
       consumerPollThread.submit(
           new Runnable() {
+            @Override
             public void run() {
               consumerPollLoop();
             }
           });
@@ -929,6 +943,7 @@ public class KafkaIO {
 
       offsetFetcherThread.scheduleAtFixedRate(
           new Runnable() {
+            @Override
             public void run() {
               updateLatestOffsets();
             }
           },
@@ -986,6 +1001,7 @@ public class KafkaIO {
               rawRecord.topic(),
               rawRecord.partition(),
               rawRecord.offset(),
+              consumerSpEL.getRecordTimestamp(rawRecord),
               decode(rawRecord.key(), source.spec.getKeyCoder()),
               decode(rawRecord.value(), source.spec.getValueCoder()));
 
@@ -1059,6 +1075,7 @@ public class KafkaIO {
       return new KafkaCheckpointMark(ImmutableList.copyOf(// avoid lazy (consumedOffset can change)
           Lists.transform(partitionStates,
               new Function<PartitionState, PartitionMark>() {
+                @Override
                 public PartitionMark apply(PartitionState p) {
                   return new PartitionMark(p.topicPartition.topic(),
                                            p.topicPartition.partition(),
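
The new javadoc section above is the user-facing contract of this change: element timestamps
stay processing time unless overridden, while KafkaRecord#getTimestamp() carries Kafka's own
timestamp when one exists. A sketch of overriding the default via withTimestampFn(), assuming
values that carry event time as epoch millis (servers, topic, coders, and payload format are
illustrative assumptions of this sketch, not of KafkaIO):

    PCollection<KafkaRecord<Long, String>> records = pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker_1:9092,broker_2:9092")
            .withTopics(ImmutableList.of("events"))
            .withKeyCoder(BigEndianLongCoder.of())
            .withValueCoder(StringUtf8Coder.of())
            .withTimestampFn(
                new SerializableFunction<KV<Long, String>, Instant>() {
                  @Override
                  public Instant apply(KV<Long, String> kv) {
                    // Assumption of this sketch: the value is the event time in epoch millis.
                    return new Instant(Long.parseLong(kv.getValue()));
                  }
                }));
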
http://git-wip-us.apache.org/repos/asf/beam/blob/f10509e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
index fa202e1..e0e400e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.kafka;
 
 import java.io.Serializable;
 import java.util.Arrays;
+
 import org.apache.beam.sdk.values.KV;
 
 /**
@@ -31,25 +32,28 @@ public class KafkaRecord<K, V> implements Serializable {
   private final int partition;
   private final long offset;
   private final KV<K, V> kv;
+  private final long timestamp;
 
   public KafkaRecord(
       String topic,
       int partition,
       long offset,
+      long timestamp,
       K key,
       V value) {
-    this(topic, partition, offset, KV.of(key, value));
+    this(topic, partition, offset, timestamp, KV.of(key, value));
   }
 
   public KafkaRecord(
       String topic,
       int partition,
       long offset,
+      long timestamp,
       KV<K, V> kv) {
-
     this.topic = topic;
     this.partition = partition;
     this.offset = offset;
+    this.timestamp = timestamp;
     this.kv = kv;
   }
 
@@ -69,9 +73,13 @@ public class KafkaRecord<K, V> implements Serializable {
     return kv;
   }
 
+  public long getTimestamp() {
+    return timestamp;
+  }
+
   @Override
   public int hashCode() {
-    return Arrays.deepHashCode(new Object[]{topic, partition, offset, kv});
+    return Arrays.deepHashCode(new Object[]{topic, partition, offset, timestamp, kv});
   }
 
   @Override
@@ -82,6 +90,7 @@ public class KafkaRecord<K, V> implements Serializable {
       return topic.equals(other.topic)
           && partition == other.partition
           && offset == other.offset
+          && timestamp == other.timestamp
           && kv.equals(other.kv);
     } else {
       return false;
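
With the field threaded through both constructors and into equals()/hashCode(), downstream
transforms can read it directly off each record. A small sketch of a DoFn doing so (the DoFn
name and output format are illustrative):

    import org.apache.beam.sdk.transforms.DoFn;

    class FormatKafkaTimestampFn extends DoFn<KafkaRecord<Long, String>, String> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        KafkaRecord<Long, String> rec = c.element();
        // topic/partition@offset plus the Kafka timestamp (or processing time
        // when Kafka provided none).
        c.output(rec.getTopic() + "/" + rec.getPartition() + "@" + rec.getOffset()
            + " ts=" + rec.getTimestamp());
      }
    }
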
http://git-wip-us.apache.org/repos/asf/beam/blob/f10509e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
index ea78f09..2043a4c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -19,10 +19,12 @@ package org.apache.beam.sdk.io.kafka;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -66,6 +68,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
     stringCoder.encode(value.getTopic(), outStream, nested);
     intCoder.encode(value.getPartition(), outStream, nested);
     longCoder.encode(value.getOffset(), outStream, nested);
+    longCoder.encode(value.getTimestamp(), outStream, nested);
     kvCoder.encode(value.getKV(), outStream, context);
   }
 
@@ -77,6 +80,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
         stringCoder.decode(inStream, nested),
         intCoder.decode(inStream, nested),
         longCoder.decode(inStream, nested),
+        longCoder.decode(inStream, nested),
         kvCoder.decode(inStream, context));
   }
 
@@ -106,6 +110,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
         value.getTopic(),
         value.getPartition(),
         value.getOffset(),
+        value.getTimestamp(),
         (KV<Object, Object>) kvCoder.structuralValue(value.getKV()));
   }
 }
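
Because the new long is encoded between the offset and the KV payload, the coder's wire
format changes with this commit: bytes written by the previous version are not decodable by
this one. A round-trip sketch, assuming the of(keyCoder, valueCoder) factory and run inside
a method that may throw IOException (coders and values are illustrative):

    KafkaRecordCoder<Long, String> coder =
        KafkaRecordCoder.of(BigEndianLongCoder.of(), StringUtf8Coder.of());
    KafkaRecord<Long, String> record =
        new KafkaRecord<>("events", 0, 42L, 1489783097000L, 1L, "payload");

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(record, out, Coder.Context.OUTER);
    KafkaRecord<Long, String> decoded = coder.decode(
        new ByteArrayInputStream(out.toByteArray()), Coder.Context.OUTER);

    // The timestamp now participates in equals(), so this holds only if it survived the trip.
    assert record.equals(decoded);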