Hello,
Streams doesn’t currently define an interface for implementing methods that
determine the ID or event time of a document.
According to the Activity Streams spec, the ‘id’ and ‘published’ fields of
Activity and ActivityObject should contain this information, but it’s quite
common for documents at the front of a pipeline not to be in Activity Streams
format yet.
By Apache Streams convention, getId() and getTimestamp() on StreamsDatum often
(but not always) contain this information.
As a result, when building a stream, the only ways to determine the ID or event
time of a document are to do it manually, duplicating code already embedded in
the converter, or to attempt a complete conversion.
Contrast this with the approach for conversion itself, where
org.apache.streams.data.ActivityConverter and
org.apache.streams.data.ActivityObjectConverter implementations, spread
throughout the provider modules, bind a conversion to a specific document type,
and thus the appropriate converter for an incoming document can be determined
and applied by org.apache.streams.converter.ActivityConverterProcessor and
org.apache.streams.converter.ActivityObjectConverterProcessor respectively.
Another factor: frameworks such as Flink and Beam treat key and event time as
core processing concepts. The sooner those values are known, the more
flexibility the pipeline designer has and the more parallelism is possible. So
it would benefit Streams users if we made it possible to determine those values
at the moment data enters the pipeline, before any type conversion or other
intensive processing.
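To make the parallelism point concrete: once every document carries a key and an event time, grouping it into per-key event-time windows needs nothing else. Flink (keyBy plus TumblingEventTimeWindows) and Beam (FixedWindows plus GroupByKey) do this at scale; the framework-free sketch below only illustrates that no conversion has to happen first. KeyedEvent and TumblingWindows are hypothetical names, not Streams classes.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical: a document reduced to the two values the proposal wants at ingestion.
final class KeyedEvent {
    final String key;
    final Instant eventTime;

    KeyedEvent(String key, Instant eventTime) {
        this.key = key;
        this.eventTime = eventTime;
    }
}

// Hypothetical helper: fixed-size (tumbling) event-time windows, keyed by identifier.
final class TumblingWindows {
    // Start of the window of the given size that contains eventTime (aligned to the epoch).
    static Instant windowStart(Instant eventTime, Duration size) {
        long sizeMs = size.toMillis();
        return Instant.ofEpochMilli(Math.floorDiv(eventTime.toEpochMilli(), sizeMs) * sizeMs);
    }

    // Groups events by window start, then by key. Only key and event time are
    // consulted, so this can run before any converter has touched the documents.
    static Map<Instant, Map<String, List<KeyedEvent>>> assign(List<KeyedEvent> events, Duration size) {
        Map<Instant, Map<String, List<KeyedEvent>>> windows = new TreeMap<>();
        for (KeyedEvent e : events) {
            windows.computeIfAbsent(windowStart(e.eventTime, size), w -> new TreeMap<>())
                   .computeIfAbsent(e.key, k -> new ArrayList<>())
                   .add(e);
        }
        return windows;
    }
}
```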
I think we’d be better off defining core interfaces for extracting unique
identifiers and event times that each provider module can implement, distinct
from the provider converters (but used by them).
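On the dispatch side, a processor could pick the right extractor for an incoming document by its class, in the same spirit as the converter processors pick a converter by document type. A minimal sketch, assuming the interface shape proposed in this mail; IdentifierExtractorRegistry is a hypothetical name, and lookup here is by exact runtime class only (no subclass or interface matching).

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Local copy of the proposed interface so this sketch compiles on its own.
interface IdentifierExtractor<T> extends Serializable {
    Class<T> documentClass();
    String extractIdentifier(T document);
}

// Hypothetical registry: selects an extractor by the document's exact runtime class.
final class IdentifierExtractorRegistry {
    private final Map<Class<?>, IdentifierExtractor<?>> byClass = new HashMap<>();

    <T> void register(IdentifierExtractor<T> extractor) {
        byClass.put(extractor.documentClass(), extractor);
    }

    // Empty when no extractor is registered for this document type.
    Optional<String> extract(Object document) {
        IdentifierExtractor<?> extractor = byClass.get(document.getClass());
        if (extractor == null) {
            return Optional.empty();
        }
        return Optional.of(apply(extractor, document));
    }

    // Captures the wildcard so the cast is checked against documentClass().
    private static <T> String apply(IdentifierExtractor<T> extractor, Object document) {
        return extractor.extractIdentifier(extractor.documentClass().cast(document));
    }
}
```

A provider would register its extractors (e.g. TweetIdentifierExtractor) once at startup and call extract() on each raw document as it arrives; an EventTimeExtractor registry would follow the same pattern.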
Here’s what I have in mind:
import java.io.Serializable;
import java.time.Instant;

public interface IdentifierExtractor<T> extends Serializable {
    Class<T> documentClass();
    String extractIdentifier(T document);
}

public interface EventTimeExtractor<T> extends Serializable {
    Class<T> documentClass();
    Instant extractEventTime(T document);
}
And the implementations for Tweet:

import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import org.apache.streams.twitter.pojo.Tweet;

public class TweetIdentifierExtractor implements IdentifierExtractor<Tweet> {
    public Class<Tweet> documentClass() {
        return Tweet.class;
    }
    public String extractIdentifier(Tweet tweet) {
        return "id:twitter:post:" + tweet.getIdStr();
    }
}

public class TweetEventTimeExtractor implements EventTimeExtractor<Tweet> {
    public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
    // created_at uses English day/month names, so don't rely on the default locale
    public static final DateTimeFormatter TWITTER_FORMATTER =
        DateTimeFormatter.ofPattern(TWITTER_FORMAT, Locale.ENGLISH);
    public Class<Tweet> documentClass() {
        return Tweet.class;
    }
    public Instant extractEventTime(Tweet tweet) {
        // assumes getCreatedAt() returns the raw created_at string
        return ZonedDateTime.parse(tweet.getCreatedAt(), TWITTER_FORMATTER).toInstant();
    }
}
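Because the day and month names in Twitter's created_at are English, the formatter should be pinned to an English locale rather than the JVM default. A standalone sanity check of the TWITTER_FORMAT pattern using java.time; TwitterDateCheck is a hypothetical name, and the sample strings assume the usual created_at shape (e.g. "Wed Oct 10 20:19:24 +0000 2018").

```java
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helper for checking the TWITTER_FORMAT pattern in isolation.
final class TwitterDateCheck {
    static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";

    // English locale: "Wed"/"Oct" would not parse under, e.g., a German default locale.
    static final DateTimeFormatter TWITTER_FORMATTER =
        DateTimeFormatter.ofPattern(TWITTER_FORMAT, Locale.ENGLISH);

    // Parses a created_at string and normalizes it to an Instant (UTC).
    static Instant parse(String createdAt) {
        return ZonedDateTime.parse(createdAt, TWITTER_FORMATTER).toInstant();
    }
}
```

A string that doesn't match the pattern throws DateTimeParseException, so the extractor contract should probably say whether that propagates or yields null.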
Does anyone disagree that this is a sensible thing to do with our existing
code, and a reasonable standard to expect from any new providers that get
built? Any other ideas or facts we should consider?
Steve