Hello,

Streams doesn’t currently define an interface for determining the ID or event time of a document.
According to the Activity Streams spec, the ‘id’ and ‘published’ fields of Activity and ActivityObject should contain this information, but documents at the front of a pipeline are often not yet in Activity Streams format. By Apache Streams convention, getId() and getTimestamp() on StreamsDatum often (but not always) contain this information. As a result, when building a stream, the only ways to determine the ID or event time of a document are to do it manually, duplicating code already embedded in the converter, or to attempt a complete conversion.

Contrast this with the approach to conversion itself: org.apache.streams.data.ActivityConverter and org.apache.streams.data.ActivityObjectConverter implementations, spread throughout the provider modules, each bind a conversion to a specific document type. That is what allows org.apache.streams.converter.ActivityConverterProcessor and org.apache.streams.converter.ActivityObjectConverterProcessor, respectively, to determine and apply the appropriate converter for an incoming document.

Another factor: frameworks such as Flink and Beam treat key and event time as core processing concepts. The sooner those values are known, the more flexibility the pipeline designer has and the more parallelism is possible. So it would benefit Streams users if those values could be determined the moment data enters the pipeline, before any type conversion or other expensive processing.

I think we’d be better off defining core interfaces for extracting unique identifiers and event times that each provider module can implement, distinct from the provider converters (but used by them).
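As a rough illustration of the dispatch half of this idea, here is a hypothetical sketch (not existing Streams code) of selecting extraction logic by the runtime class of an incoming document, similar in spirit to how the converter processors pick a converter, so the ID and event time are known before any conversion. ExtractorRegistry, RawPost and the "id:example:post:" prefix are invented for illustration, and it uses plain java.util.function.Function rather than any Streams interface so it stands on its own (Java 8+).

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch, not part of Apache Streams: selects identifier and
// event-time functions by the runtime class of an incoming document, so both
// values are known before any conversion to Activity format.
public class ExtractorRegistry {

    // The pair of extraction functions registered for one document class.
    private static final class Entry<T> {
        final Class<T> documentClass;
        final Function<T, String> identifier;
        final Function<T, Instant> eventTime;

        Entry(Class<T> documentClass, Function<T, String> identifier, Function<T, Instant> eventTime) {
            this.documentClass = documentClass;
            this.identifier = identifier;
            this.eventTime = eventTime;
        }
    }

    // Insertion order is kept, so the first matching registration wins.
    private final Map<Class<?>, Entry<?>> entries = new LinkedHashMap<>();

    public <T> void register(Class<T> documentClass,
                             Function<T, String> identifier,
                             Function<T, Instant> eventTime) {
        entries.put(documentClass, new Entry<>(documentClass, identifier, eventTime));
    }

    // Returns the first entry whose class accepts the document (subclasses included).
    private Optional<Entry<?>> find(Object document) {
        return entries.values().stream()
                .filter(e -> e.documentClass.isInstance(document))
                .findFirst();
    }

    public Optional<String> extractIdentifier(Object document) {
        return find(document).map(e -> applyIdentifier(e, document));
    }

    public Optional<Instant> extractEventTime(Object document) {
        return find(document).map(e -> applyEventTime(e, document));
    }

    // Generic helpers capture the entry's type so the cast is checked at runtime.
    private static <T> String applyIdentifier(Entry<T> e, Object document) {
        return e.identifier.apply(e.documentClass.cast(document));
    }

    private static <T> Instant applyEventTime(Entry<T> e, Object document) {
        return e.eventTime.apply(e.documentClass.cast(document));
    }

    // Hypothetical raw document type, standing in for a provider POJO.
    static final class RawPost {
        final String idStr;
        final long createdAtMillis;

        RawPost(String idStr, long createdAtMillis) {
            this.idStr = idStr;
            this.createdAtMillis = createdAtMillis;
        }
    }

    public static void main(String[] args) {
        ExtractorRegistry registry = new ExtractorRegistry();
        registry.register(RawPost.class,
                post -> "id:example:post:" + post.idStr,
                post -> Instant.ofEpochMilli(post.createdAtMillis));

        Object incoming = new RawPost("42", 0L);
        System.out.println(registry.extractIdentifier(incoming).orElse("unknown")); // id:example:post:42
        System.out.println(registry.extractEventTime(incoming).orElse(null));       // 1970-01-01T00:00:00Z
        System.out.println(registry.extractIdentifier("no extractor").isPresent()); // false
    }
}
```

Matching with isInstance rather than exact class equality lets a subclass reuse its parent's extractor; that is a choice made for this sketch, not necessarily how the converter processors behave.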
Here’s what I have in mind:

    import java.io.Serializable;
    import java.time.Instant;

    public interface IdentifierExtractor<T> extends Serializable {
        Class<T> documentClass();
        String extractIdentifier(T document);
    }

    public interface EventTimeExtractor<T> extends Serializable {
        Class<T> documentClass();
        Instant extractEventTime(T document);
    }

And implementations for tweets:

    import java.time.Instant;
    import java.util.Locale;
    import org.apache.streams.twitter.pojo.Tweet;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    public class TweetIdentifierExtractor implements IdentifierExtractor<Tweet> {
        public Class<Tweet> documentClass() {
            return Tweet.class;
        }
        public String extractIdentifier(Tweet tweet) {
            return "id:twitter:post:" + tweet.getIdStr();
        }
    }

    public class TweetEventTimeExtractor implements EventTimeExtractor<Tweet> {
        // Twitter's created_at format, e.g. "Wed Oct 10 20:19:24 +0000 2018"
        public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
        public static final DateTimeFormatter TWITTER_FORMATTER =
            DateTimeFormat.forPattern(TWITTER_FORMAT).withLocale(Locale.ENGLISH);
        public Class<Tweet> documentClass() {
            return Tweet.class;
        }
        public Instant extractEventTime(Tweet tweet) {
            // Parse with Joda, then convert to the java.time.Instant the interface returns
            return Instant.ofEpochMilli(TWITTER_FORMATTER.parseDateTime(tweet.getCreatedAt()).getMillis());
        }
    }

Does anyone disagree that this is a sensible thing to do with our existing code, and a reasonable standard to expect of any new providers that get built? Are there other ideas or facts we should consider?

Steve