Hi,

I would guess that the watermark generation does not work as expected. I would recommend logging the extracted timestamps and the watermarks to understand how event time progresses and when the watermarks that trigger window computations are emitted.
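A minimal sketch of that logging follows. It uses plain Java with no Flink dependency so the arithmetic can be checked in isolation. The class tracks the highest extracted timestamp and records every timestamp and watermark it produces. The class name `WatermarkTrace`, its methods, and the 10-second bound used in the checks are hypothetical. In an actual job, the equivalent print or logger calls would go into the `extractTimestamp` and `getCurrentWatermark` methods of the `AssignerWithPeriodicWatermarks` implementation. The watermark here follows the usual bounded-out-of-orderness rule: the highest timestamp seen minus the bound, with both values in milliseconds.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a periodic watermark assigner that logs what it does.
class WatermarkTrace {
    private final long maxOutOfOrdernessMs;            // bound in milliseconds, same unit as timestamps
    private long currentMaxTimestamp = Long.MIN_VALUE; // no element seen yet
    final List<String> log = new ArrayList<>();

    WatermarkTrace(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Counterpart of extractTimestamp(): parse the event time, track the maximum, log both.
    long extractTimestamp(String eventTime) {
        SimpleDateFormat fmt = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
        long ts;
        try {
            ts = fmt.parse(eventTime).getTime(); // epoch milliseconds
        } catch (ParseException e) {
            // Fail loudly instead of continuing with a null Date.
            throw new IllegalArgumentException("Unparseable event time: " + eventTime, e);
        }
        currentMaxTimestamp = Math.max(ts, currentMaxTimestamp);
        record("extracted timestamp " + ts + " (max so far " + currentMaxTimestamp + ")");
        return ts;
    }

    // Counterpart of getCurrentWatermark(): highest timestamp minus the bound.
    long currentWatermark() {
        long wm = (currentMaxTimestamp == Long.MIN_VALUE)
                ? Long.MIN_VALUE // nothing seen yet, so event time has not progressed
                : currentMaxTimestamp - maxOutOfOrdernessMs;
        record("watermark " + wm);
        return wm;
    }

    // Keep the line for inspection and print it.
    private void record(String line) {
        log.add(line);
        System.out.println(line);
    }
}
```

Printing both values side by side shows whether the watermark ever passes the end of a window (which is what fires it) and whether later events arrive with timestamps that are already behind the watermark.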
On Tue, Jan 24, 2017 at 6:53 PM, Sujit Sakre <sujit.sa...@northgateps.com> wrote:
> Hi Aljoscha,
>
> Thanks.
>
> Yes, we are using event time.
> Yes, the Flink program is kept running in the IDE (Eclipse) and is not
> closed after the first batch of events or while the second batch is added.
> Yes, we do have a custom timestamp/watermark assigner, implemented as
> BoundedOutOfOrdernessGenerator2.
>
> Are we using the Kafka properties correctly?
> We are using Flink 1.1.1 and the Flink Kafka connector
> flink-connector-kafka-0.9_2.11.
>
> More about the behavior:
> I have noticed that sometimes, even after the first write to the Kafka
> topic and while the Flink program is running, it does not process the
> queue immediately, and we need to restart it. This is quite random.
>
> Following is a rough outline of our code.
>
> public class SlidingWindow2 {
>
>     public static void main(String[] args) throws Exception {
>
>         // set up the execution environment
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         // configure the Kafka consumer
>         Properties kafkaProps = new Properties();
>         kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
>         kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>         kafkaProps.setProperty("group.id", "demo");
>         // always read the Kafka topic from the start
>         kafkaProps.setProperty("auto.offset.reset", "earliest");
>
>         FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer =
>                 new FlinkKafkaConsumer09<>(
>                         "test", // Kafka topic name
>                         new dataSchema(),
>                         kafkaProps);
>         DataStream<Tuple5<String, String, Float, Float, String>> stream1 = env.addSource(consumer);
>         DataStream<Tuple5<String, String, Float, Float, String>> keyedStream =
>                 stream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2());
>
>         keyedStream.keyBy(4)
>                 .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
>                 .apply(new CustomSlidingWindowFunction());
>
>         env.execute("Sliding Event Time Window Processing");
>     }
>
>     public static class CustomSlidingWindowFunction implements
>             WindowFunction<Tuple5<String, String, Float, Float, String>,
>                            Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {
>
>         @Override
>         public void apply(Tuple key, TimeWindow window,
>                           Iterable<Tuple5<String, String, Float, Float, String>> input,
>                           Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {
>             ....
>         }
>     }
>
>     // Implemented a custom periodic watermark assigner as below:
>     public static class BoundedOutOfOrdernessGenerator2 implements
>             AssignerWithPeriodicWatermarks<Tuple5<String, String, Float, Float, String>> {
>
>         private static final long serialVersionUID = 1L;
>
>         private final long maxOutOfOrderness = MAX_EVENT_DELAY; // constant set in seconds
>
>         private long currentMaxTimestamp;
>
>         @Override
>         public long extractTimestamp(Tuple5<String, String, Float, Float, String> element,
>                                      long previousElementTimestamp) {
>             //System.out.println("inside extractTimestamp");
>             Date parseDate = null;
>             SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
>             try {
>                 parseDate = dateFormat.parse(element.f0);
>             } catch (ParseException e) {
>                 e.printStackTrace();
>             }
>             long timestamp = parseDate.getTime();
>             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>             return timestamp;
>         }
>
>         @Override
>         public Watermark getCurrentWatermark() {
>             // return the watermark as twice the current highest timestamp minus the out-of-orderness bound
>             // this is because it is not covering the lateness sufficiently; now it does
>             // in future this may be multiple of 3 or more if necessary to cover the gap in records received
>             return new Watermark(currentMaxTimestamp * 2 - maxOutOfOrderness);
>         }
>     }
> }
>
> Sujit Sakre
>
> On 24 January 2017 at 22:34, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>> a bit more information would be useful. Are you using event time? Is the
>> Flink program kept running after adding the first batch of events and when
>> adding the second batch, or are these two invocations of your Flink program?
>> Do you have a custom timestamp/watermark assigner?
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 24 Jan 2017 at 14:28 Sujit Sakre <sujit.sa...@northgateps.com> wrote:
>>
>>> Hi,
>>>
>>> We are using a sliding window function to process data read from a Kafka
>>> stream. We are using FlinkKafkaConsumer09 to read the data. The window
>>> function and sink are running correctly.
>>>
>>> To test the program, we are generating a stream of data from the command
>>> line. This works when we add a set of records once. When we add records
>>> again, it does not work: Flink produces no result, even though the records
>>> are added to the same Kafka topic from the same command-line instance.
>>>
>>> Could you please suggest what could be wrong?
>>>
>>> Many thanks.
>>>
>>> Sujit Sakre
>>>
>>> This email is sent on behalf of Northgate Public Services (UK) Limited
>>> and its associated companies including Rave Technologies (India) Pvt
>>> Limited (together "Northgate Public Services") and is strictly confidential
>>> and intended solely for the addressee(s).
>>> If you are not the intended recipient of this email you must: (i) not
>>> disclose, copy or distribute its contents to any other person nor use its
>>> contents in any way or you may be acting unlawfully; (ii) contact
>>> Northgate Public Services immediately on +44(0)1908 264500
>>> quoting the name of the sender and the addressee
>>> then delete it from your system.
>>> Northgate Public Services has taken reasonable precautions to ensure
>>> that no viruses are contained in this email, but does not accept any
>>> responsibility once this email has been transmitted. You should scan
>>> attachments (if any) for viruses.
>>>
>>> Northgate Public Services (UK) Limited, registered in England and Wales
>>> under number 00968498 with a registered address of Peoplebuilding 2,
>>> Peoplebuilding Estate, Maylands Avenue, Hemel Hempstead, Hertfordshire,
>>> HP2 4NN. Rave Technologies (India) Pvt Limited, registered in India under
>>> number 117068 with a registered address of 2nd Floor, Ballard House, Adi
>>> Marzban Marg, Ballard Estate, Mumbai, Maharashtra, India, 400001.
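Logging of this kind would likely expose the arithmetic in the quoted `getCurrentWatermark()`. The timestamps come from `SimpleDateFormat.parse(...).getTime()` and are therefore epoch milliseconds, so `currentMaxTimestamp * 2` puts the watermark decades ahead of the data. In addition, `MAX_EVENT_DELAY` is described as being in seconds while the timestamps are in milliseconds. The plain-Java sketch below has no Flink dependency, and its class and method names are hypothetical. It compares the quoted formula with the conventional `currentMaxTimestamp - maxOutOfOrderness` (both values in milliseconds) and checks whether an event would fall only into windows that the watermark has already passed. The lateness check assumes the windowing from the outline (6-minute windows sliding by 2 minutes, aligned to the epoch) and zero allowed lateness. Under those assumptions, a window counts as late once its last timestamp (end minus 1 ms) is at or below the watermark.

```java
// Hypothetical helpers comparing two watermark formulas; all values in milliseconds.
final class WatermarkFormulas {
    private WatermarkFormulas() {}

    // Formula from the quoted BoundedOutOfOrdernessGenerator2.
    static long quotedWatermark(long currentMaxTimestamp, long maxOutOfOrderness) {
        return currentMaxTimestamp * 2 - maxOutOfOrderness;
    }

    // Conventional bounded-out-of-orderness watermark.
    static long boundedWatermark(long currentMaxTimestamp, long maxOutOfOrderness) {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    // True if every sliding window containing `timestamp` is already behind the
    // watermark, i.e. the element would be dropped with zero allowed lateness.
    // The latest window containing the timestamp starts at the last multiple of
    // `slide` at or before it; if that window is late, all earlier ones are too.
    static boolean isLateForAllWindows(long timestamp, long watermark, long size, long slide) {
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        long lastEnd = lastStart + size;   // window end is exclusive
        return lastEnd - 1 <= watermark;   // last timestamp of that window already passed
    }
}
```

Under these assumptions, an event one minute after the current maximum is dropped with the quoted formula but kept with the conventional one. That would be consistent with the second batch producing no output, but the logging suggested above is what would confirm it.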