Hi Jayesh,

Glad that it finally worked! 

From a first look, I cannot spot anything wrong with the code itself.
The only thing I would note is that the logs and the printouts you put
in your code end up in different locations, and normally they are not
printed to the console.

Thanks,
Kostas

> On May 4, 2017, at 6:45 PM, Jayesh Patel <jpa...@keywcorp.com> wrote:
> 
> I figured out what’s wrong – there was a silly mistake on my side. There is 
> nothing wrong with the code here, but please do let me know if you see 
> anything wrong with my approach.
>  
> Thank you.
>  
> From: Jayesh Patel 
> Sent: Thursday, May 04, 2017 10:00 AM
> To: 'user@flink.apache.org' <user@flink.apache.org>
> Subject: assignTimestampsAndWatermarks not working as expected
>  
> Can anybody see what’s wrong with the following code?  I am using Flink 1.2 
> and have tried running it in Eclipse (local mode) as well as on a 3 node 
> cluster and it’s not behaving as expected.
>  
> The idea is to have a custom source collect messages from a JMS topic (I have 
> a fake source for now that generates some out-of-order messages with event 
> times that are delayed by no more than 5 seconds). The source doesn’t call 
> collectWithTimestamp() or emitWatermark().
> The messages (events) include the event time.  In order to allow for late or 
> out of order messages I use assignTimestampsAndWatermarks with 
> BoundedOutOfOrdernessTimestampExtractor and the extractTimestamp() method 
> retrieves the event time from the event.
>  
> When I run this job, I don’t get the printout from the extractTimestamp() 
> method, nor do I get the logTuples.print() or stampedLogs.print() output. 
> When running in the local environment (Eclipse) I do see the printouts from 
> the fake source (MockSource, not shown here), but I don’t even get those 
> when running on my 3 node cluster with a parallelism of 3.
>  
> public static void main(String[] args) throws Exception {
>     final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     // just for debugging, didn’t affect the behavior
>     env.getConfig().setAutoWatermarkInterval(2000);
>
>     DataStream<Message> logs = env.addSource(new MockSource());
>     DataStream<Tuple2<String, CEFEvent>> logTuples = logs.map(new ParseEvent());
>     logTuples.print();
>
>     DataStream<Tuple2<String, CEFEvent>> stampedLogs =
>         logTuples.assignTimestampsAndWatermarks(
>             new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, CEFEvent>>(Time.seconds(5)) {
>                 private static final long serialVersionUID = 1L;
>
>                 @Override
>                 public long extractTimestamp(Tuple2<String, CEFEvent> element) {
>                     // This is how to extract timestamp from the event
>                     long eventTime = element.f1.getEventStartTime().toInstant().toEpochMilli();
>                     System.out.println("returning event time " + eventTime);
>                     return eventTime;
>                 }
>             });
>     stampedLogs.print();
>     env.execute("simulation");
> }
>  
> Thank you,
> Jayesh
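
For reference, the effect of the 5-second bound in the quoted code can be
modeled without Flink. The following is only a minimal sketch of the idea
(watermark = highest event time seen so far minus the allowed
out-of-orderness); the class BoundedWatermarkSketch and its method names are
hypothetical and are not part of Flink's API, and it computes the watermark
on demand rather than on the auto-watermark interval.

```java
// Simplified, Flink-free model of a bounded-out-of-orderness watermark.
// All names here are hypothetical and chosen for illustration only.
final class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;

    BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("bound must be non-negative");
        }
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the first watermark is Long.MIN_VALUE
        // and the subtraction below cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    // Record an event's timestamp; only a new maximum moves the watermark.
    long observe(long eventTimeMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimeMs);
        return eventTimeMs;
    }

    // The watermark trails the highest timestamp seen by the fixed bound.
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch tracker = new BoundedWatermarkSketch(5_000L);
        long[] eventTimes = {10_000L, 12_000L, 9_000L, 15_000L};
        for (long t : eventTimes) {
            tracker.observe(t);
            System.out.println("event time " + t
                + " -> watermark " + tracker.currentWatermark());
        }
    }
}
```

In this model the out-of-order event at 9000 ms does not move the watermark
backwards, because only a new maximum event time advances it.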
