Hello,

Streams doesn’t currently define an interface for determining the ID or 
event time of a document.  

According to the activity streams spec, the ‘id’ and ‘published’ fields of 
Activity and ActivityObject should contain this information, but it’s quite 
common for documents at the front of a pipeline to not yet be in activity 
streams format.

By Apache Streams convention, getId() and getTimestamp() on StreamsDatum often 
(but not always) contain this information.  
As a result, when building a stream, determining the ID or event time of a 
document means either doing it by hand (duplicating code embedded in the 
converter) or attempting a complete conversion.  

Contrast this with the approach for conversion itself, where 
org.apache.streams.data.ActivityConverter and 
org.apache.streams.data.ActivityObjectConverter implementations, spread 
throughout the provider modules, bind a conversion to a specific document type, 
and thus the appropriate converter for an incoming document can be determined 
and applied by org.apache.streams.converter.ActivityConverterProcessor and 
org.apache.streams.converter.ActivityObjectConverterProcessor respectively.  

Also a factor: frameworks such as Flink and Beam use Key and EventTime as core 
processing concepts. The sooner those values are known, the more flexibility 
the pipeline designer has, and the more parallelism can occur.  So it would 
benefit Streams users if we made it possible to determine those values at the 
moment data enters the pipeline, prior to performing any type conversion or 
other intensive processing. 

I think we’d be better off defining core interfaces for extracting unique 
identifiers and event times that each provider module can implement, distinct 
from the provider converters (but used by them). 

Here’s what I have in mind:  

public interface IdentifierExtractor<T> extends Serializable {
  // The document type this extractor is bound to.
  Class<T> documentClass();
  String extractIdentifier(T document);
}

public interface EventTimeExtractor<T> extends Serializable {
  // The document type this extractor is bound to.
  Class<T> documentClass();
  java.time.Instant extractEventTime(T document);
}

And implementation for tweet:

public class TweetIdentifierExtractor implements 
    IdentifierExtractor<org.apache.streams.twitter.pojo.Tweet> {
  public Class<org.apache.streams.twitter.pojo.Tweet> documentClass() {
    return org.apache.streams.twitter.pojo.Tweet.class;
  }
  public String extractIdentifier(org.apache.streams.twitter.pojo.Tweet tweet) {
    return "id:twitter:post:" + tweet.getIdStr();
  }
}

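public class TweetEventTimeExtractor implements 
    EventTimeExtractor<org.apache.streams.twitter.pojo.Tweet> {
  public static final String TWITTER_FORMAT = "EEE MMM dd HH:mm:ss Z yyyy";
  // Joda-Time formatter (org.joda.time.format.DateTimeFormat / DateTimeFormatter).
  public static final DateTimeFormatter TWITTER_FORMATTER = 
      DateTimeFormat.forPattern(TWITTER_FORMAT);
  public Class<org.apache.streams.twitter.pojo.Tweet> documentClass() {
    return org.apache.streams.twitter.pojo.Tweet.class;
  }
  public java.time.Instant extractEventTime(org.apache.streams.twitter.pojo.Tweet tweet) {
    // Assumes getCreatedAt() returns the raw Twitter date string.
    // The parsed Joda DateTime is converted to a java.time.Instant.
    return java.time.Instant.ofEpochMilli(
        TWITTER_FORMATTER.parseDateTime(tweet.getCreatedAt()).getMillis());
  }
}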
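
To make the intended usage a bit more concrete, here is a rough, self-contained 
sketch of how a processor at the front of a pipeline might pick the right 
extractor by document class, much as the converter processors pick a converter. 
Everything beyond the two interfaces is an assumption for illustration only: 
SampleDoc, its two extractors, and ExtractorRegistry are hypothetical names and 
do not exist in Streams today.

```java
import java.io.Serializable;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// The two proposed interfaces, with the type parameter written out.
interface IdentifierExtractor<T> extends Serializable {
    Class<T> documentClass();
    String extractIdentifier(T document);
}

interface EventTimeExtractor<T> extends Serializable {
    Class<T> documentClass();
    Instant extractEventTime(T document);
}

// Hypothetical stand-in for a provider document type (not a real Streams class).
final class SampleDoc {
    final String id;
    final long createdAtMillis;

    SampleDoc(String id, long createdAtMillis) {
        this.id = id;
        this.createdAtMillis = createdAtMillis;
    }
}

// Hypothetical extractors bound to SampleDoc.
final class SampleDocIdentifierExtractor implements IdentifierExtractor<SampleDoc> {
    public Class<SampleDoc> documentClass() { return SampleDoc.class; }
    public String extractIdentifier(SampleDoc doc) { return "id:sample:post:" + doc.id; }
}

final class SampleDocEventTimeExtractor implements EventTimeExtractor<SampleDoc> {
    public Class<SampleDoc> documentClass() { return SampleDoc.class; }
    public Instant extractEventTime(SampleDoc doc) { return Instant.ofEpochMilli(doc.createdAtMillis); }
}

// Hypothetical registry: looks up extractors by the document's runtime class,
// so ID and event time are available before any conversion takes place.
final class ExtractorRegistry {
    private final Map<Class<?>, IdentifierExtractor<?>> idExtractors = new HashMap<>();
    private final Map<Class<?>, EventTimeExtractor<?>> timeExtractors = new HashMap<>();

    <T> void register(IdentifierExtractor<T> ids, EventTimeExtractor<T> times) {
        idExtractors.put(ids.documentClass(), ids);
        timeExtractors.put(times.documentClass(), times);
    }

    // Returns empty when no extractor is registered for the document's class.
    @SuppressWarnings("unchecked")
    <T> Optional<String> identifierOf(T document) {
        IdentifierExtractor<T> extractor =
            (IdentifierExtractor<T>) idExtractors.get(document.getClass());
        return extractor == null
            ? Optional.empty()
            : Optional.of(extractor.extractIdentifier(document));
    }

    @SuppressWarnings("unchecked")
    <T> Optional<Instant> eventTimeOf(T document) {
        EventTimeExtractor<T> extractor =
            (EventTimeExtractor<T>) timeExtractors.get(document.getClass());
        return extractor == null
            ? Optional.empty()
            : Optional.of(extractor.extractEventTime(document));
    }
}
```

The lookup here is by exact class only. Whether subclasses should also match, 
and how extractors get discovered across provider modules, are open questions 
this sketch does not try to answer.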

Does anyone disagree that this is a sensible thing to do with our existing 
code, and a reasonable standard to expect from any new providers that get 
built?  Any other ideas or facts we should consider?

Steve
