Hi Jayesh,

Glad that it finally worked!
From a first look, I cannot spot anything wrong with the code itself.
The only thing I have to note is that the logs and the printouts you put in your code end up in different locations, and normally the printouts are not shown in the console.

Thanks,
Kostas

> On May 4, 2017, at 6:45 PM, Jayesh Patel <jpa...@keywcorp.com> wrote:
>
> I figured out what’s wrong – there was a silly mistake on my side. There is
> nothing wrong with the code here, but please do let me know if you see
> anything wrong with my approach.
>
> Thank you.
>
> From: Jayesh Patel
> Sent: Thursday, May 04, 2017 10:00 AM
> To: 'user@flink.apache.org' <user@flink.apache.org>
> Subject: assignTimestampsAndWatermarks not working as expected
>
> Can anybody see what’s wrong with the following code? I am using Flink 1.2
> and have tried running it in Eclipse (local mode) as well as on a 3 node
> cluster and it’s not behaving as expected.
>
> The idea is to have a custom source collect messages from a JMS topic (I have
> a fake source for now that generates some out of order messages with event
> time that is not delayed more than 5 seconds). The source doesn’t call
> collectWithTimestamp() or emitWatermark().
> The messages (events) include the event time. In order to allow for late or
> out of order messages I use assignTimestampsAndWatermarks with
> BoundedOutOfOrdernessTimestampExtractor and the extractTimestamp() method
> retrieves the event time from the event.
>
> When I run this job, I don’t get the printout from the extractTimestamp()
> method, nor do I get the logTuples.print() or stampedLogs.print() output.
> When running on the local environment (Eclipse) I do see the printouts from
> the fake source (MockSource – not shown here). But I don’t even get those
> when run from my 3 node cluster with parallelism of 3.
>
> public static void main(String[] args) throws Exception {
>     final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     // just for debugging, didn’t affect the behavior
>     env.getConfig().setAutoWatermarkInterval(2000);
>
>     DataStream<Message> logs = env.addSource(new MockSource());
>     DataStream<Tuple2<String, CEFEvent>> logTuples = logs.map(new ParseEvent());
>     logTuples.print();
>
>     DataStream<Tuple2<String, CEFEvent>> stampedLogs =
>         logTuples.assignTimestampsAndWatermarks(
>             new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, CEFEvent>>(Time.seconds(5)) {
>                 private static final long serialVersionUID = 1L;
>
>                 @Override
>                 public long extractTimestamp(Tuple2<String, CEFEvent> element) {
>                     // This is how to extract timestamp from the event
>                     long eventTime =
>                         element.f1.getEventStartTime().toInstant().toEpochMilli();
>                     System.out.println("returning event time " + eventTime);
>                     return eventTime;
>                 }
>             });
>     stampedLogs.print();
>     env.execute("simulation");
> }
>
> Thank you,
> Jayesh
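
For readers following the thread, here is a minimal stand-alone sketch of the bounded out-of-orderness idea that the quoted code relies on: the watermark trails the largest event time seen so far by a fixed bound (5 seconds in the question) and never moves backwards. It uses no Flink classes. The class name BoundedOutOfOrdernessSketch and the methods observe() and currentWatermark() are hypothetical names chosen for illustration, not Flink's API, and the sketch only approximates what the real extractor does.

```java
// Hypothetical illustration only; this is NOT Flink's
// BoundedOutOfOrdernessTimestampExtractor, just the same basic idea.
class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp;
    private long lastWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start offset by the bound so the subtraction below cannot underflow
        // before any event has been observed.
        this.maxSeenTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Records an event's timestamp and returns it unchanged.
    long observe(long eventTimeMs) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimeMs);
        return eventTimeMs;
    }

    // Watermark = largest event time seen minus the allowed lateness,
    // kept monotonically non-decreasing.
    long currentWatermark() {
        long candidate = maxSeenTimestamp - maxOutOfOrdernessMs;
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(5_000L);
        // Made-up event times in milliseconds; the third one arrives out of order.
        long[] eventTimes = {10_000L, 12_000L, 9_000L};
        for (long t : eventTimes) {
            sketch.observe(t);
            System.out.println("event=" + t + " watermark=" + sketch.currentWatermark());
        }
    }
}
```

In this sketch the out-of-order event at 9000 ms does not pull the watermark back, and since it is still ahead of the watermark it would not count as late under a 5 second bound.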