This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a16b64a Add EventTime interface to Record (#2446) a16b64a is described below commit a16b64a0615b145f695987464cf281f68175985b Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Sat Aug 25 22:33:08 2018 -0700 Add EventTime interface to Record (#2446) * Saved copy of work * revert conf changes * Prepare for the pr * Addressed reviewer comments --- .../org/apache/pulsar/functions/api/Record.java | 9 ++++++++ .../apache/pulsar/functions/sink/PulsarSink.java | 7 ++++++ .../pulsar/functions/source/PulsarRecord.java | 9 ++++++++ .../org/apache/pulsar/io/twitter/TweetData.java | 1 + .../apache/pulsar/io/twitter/TwitterFireHose.java | 27 ++++++++++++++++++++-- .../pulsar/io/twitter/TwitterFireHoseConfig.java | 3 +++ 6 files changed, 54 insertions(+), 2 deletions(-) diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java index 2704909..59cc104 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java @@ -47,6 +47,15 @@ public interface Record<T> { T getValue(); /** + * Retrieves the event time of the record from the source. + * + * @return millis since epoch + */ + default Optional<Long> getEventTime() { + return Optional.empty(); + } + + /** * Retrieves the partition information if any of the record. * * @return The partition id where the diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index c1da686..c3df393 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Base64; import java.util.Map; +import java.util.Optional; import lombok.AccessLevel; import lombok.Getter; @@ -239,6 +240,12 @@ public class PulsarSink<T> implements Sink<T> { msg.property("__pfn_input_topic__", pulsarRecord.getTopicName().get()) .property("__pfn_input_msg_id__", new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray()))); + } else { + // It is coming from some source + Optional<Long> eventTime = sinkRecord.getSourceRecord().getEventTime(); + if (eventTime.isPresent()) { + msg.eventTime(eventTime.get()); + } } pulsarSinkProcessor.sendOutputMessage(msg, record); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java index 46c9213..359f48e 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java @@ -75,6 +75,15 @@ public class PulsarRecord<T> implements RecordWithEncryptionContext<T> { } @Override + public Optional<Long> getEventTime() { + if (message.getEventTime() != 0) { + return Optional.of(message.getEventTime()); + } else { + return Optional.empty(); + } + } + + @Override public Optional<EncryptionContext> getEncryptionCtx() { return message.getEncryptionCtx(); } diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java index 3e5503d..36d4dc8 100644 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java @@ -44,6 +44,7 @@ public class TweetData { private String timestampMs; private Delete delete; + @Data public static class User { private Long id; diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java index 2631db1..fc61945 100644 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java @@ -34,9 +34,12 @@ import com.twitter.hbc.httpclient.auth.OAuth1; import java.io.IOException; import java.io.InputStream; import java.io.Serializable; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Map; import java.util.Optional; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.PushSource; import org.apache.pulsar.io.core.SourceContext; @@ -46,6 +49,7 @@ import org.slf4j.LoggerFactory; /** * Simple Push based Twitter FireHose Source */ +@Slf4j public class TwitterFireHose extends PushSource<TweetData> { private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class); @@ -126,7 +130,7 @@ public class TwitterFireHose extends PushSource<TweetData> { // We don't really care if the record succeeds or not. // However might be in the future to count failures // TODO:- Figure out the metrics story for connectors - consume(new TwitterRecord(tweet)); + consume(new TwitterRecord(tweet, config.getGuestimateTweetTime())); } catch (Exception e) { LOG.error("Exception thrown: {}", e); } @@ -166,9 +170,12 @@ public class TwitterFireHose extends PushSource<TweetData> { static private class TwitterRecord implements Record<TweetData> { private final TweetData tweet; + private static SimpleDateFormat dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy"); + private final boolean guestimateTweetTime; - public TwitterRecord(TweetData tweet) { + public TwitterRecord(TweetData tweet, boolean guestimateTweetTime) { this.tweet = tweet; + this.guestimateTweetTime = guestimateTweetTime; } @Override @@ -178,6 +185,22 @@ public class TwitterFireHose extends PushSource<TweetData> { } @Override + public Optional<Long> getEventTime() { + try { + if (tweet.getCreatedAt() != null) { + Date d = dateFormat.parse(tweet.getCreatedAt()); + return Optional.of(d.toInstant().toEpochMilli()); + } else if (guestimateTweetTime) { + return Optional.of(System.currentTimeMillis()); + } else { + return Optional.empty(); + } + } catch (Exception e) { + return Optional.empty(); + } + } + + @Override public TweetData getValue() { return tweet; } diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java index 83f1baf..88acb33 100644 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java @@ -44,6 +44,9 @@ public class TwitterFireHoseConfig implements Serializable { private String consumerSecret; private String token; private String tokenSecret; + // Most firehose events have null createdAt time. If this parameter is set to true + // then we estimate the createdTime of each firehose event to be current time. + private Boolean guestimateTweetTime = false; // ------ Optional property keys