Hi,
I would guess that the watermark generation does not work as expected.
I would recommend logging the extracted timestamps and the watermarks to
understand how time is progressing and when watermarks are generated to
trigger a window computation.
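
To give an idea of what I mean, here is a minimal sketch of a logging variant
of your assigner. The class name LoggingWatermarkAssigner and the value of
MAX_EVENT_DELAY are just placeholders; the date format and the Tuple5 layout
are taken from your snippet below. Note that it emits the usual
bounded-out-of-orderness watermark (current max timestamp minus the bound),
not the doubled value from your generator, so you can compare how the two
advance.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch only: placeholder class name and out-of-orderness bound.
public class LoggingWatermarkAssigner
        implements AssignerWithPeriodicWatermarks<Tuple5<String, String, Float, Float, String>> {

    private static final long serialVersionUID = 1L;

    // assumed bound in milliseconds; substitute your MAX_EVENT_DELAY
    private static final long MAX_EVENT_DELAY = 10000L;

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(Tuple5<String, String, Float, Float, String> element,
            long previousElementTimestamp) {
        SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
        long timestamp = previousElementTimestamp;
        try {
            Date parsed = dateFormat.parse(element.f0);
            timestamp = parsed.getTime();
        } catch (ParseException e) {
            // worth logging too: an unparseable timestamp here will stall the watermark
            System.out.println("could not parse timestamp: " + element.f0);
        }
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        // log every extracted timestamp next to the running maximum
        System.out.println("timestamp=" + timestamp
                + " currentMaxTimestamp=" + currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // standard bounded-out-of-orderness watermark: max timestamp minus the bound
        Watermark watermark = new Watermark(currentMaxTimestamp - MAX_EVENT_DELAY);
        // log every watermark that is produced on the periodic interval
        System.out.println("watermark=" + watermark.getTimestamp());
        return watermark;
    }
}

You can plug it in via stream1.assignTimestampsAndWatermarks(new
LoggingWatermarkAssigner()) and compare the printed watermarks against the end
timestamps of your 6 minute windows; a window only fires once the watermark
passes its end.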

On Tue, Jan 24, 2017 at 6:53 PM, Sujit Sakre <sujit.sa...@northgateps.com>
wrote:

> Hi Aljoscha,
>
> Thanks.
>
> Yes, we are using Event Time.
> Yes, the Flink program is kept running in the IDE (Eclipse) and not
> closed after the first batch of events and while adding the second batch.
> Yes, we do have a custom timestamp/watermark assigner, implemented as
> BoundedOutOfOrdernessGenerator2.
>
> Are we using the properties for Kafka correctly?
> We are using Flink 1.1.1 and Flink Kafka connector:
> flink-connector-kafka-0.9_2.11
>
> More about the behavior: I have noticed that sometimes, even after the
> first write to the Kafka queue and while the Flink program is running, it
> does not process the queue immediately and we need to restart. This is
> quite random.
>
> Following is the rough outline of our code.
>
> public class SlidingWindow2 {
>
>     public static void main(String[] args) throws Exception {
>
>         // set up the execution environment
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         // configure the Kafka consumer
>         Properties kafkaProps = new Properties();
>         kafkaProps.setProperty("zookeeper.connect", "localhost:2181");
>         kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
>         kafkaProps.setProperty("group.id", "demo");
>         // always read the Kafka topic from the start
>         kafkaProps.setProperty("auto.offset.reset", "earliest");
>
>         FlinkKafkaConsumer09<Tuple5<String, String, Float, Float, String>> consumer =
>                 new FlinkKafkaConsumer09<>(
>                         "test",            // Kafka topic name
>                         new dataSchema(),
>                         kafkaProps);
>
>         DataStream<Tuple5<String, String, Float, Float, String>> stream1 = env.addSource(consumer);
>
>         DataStream<Tuple5<String, String, Float, Float, String>> keyedStream =
>                 stream1.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator2());
>
>         keyedStream.keyBy(4)
>                 .window(SlidingEventTimeWindows.of(Time.minutes(6), Time.minutes(2)))
>                 .apply(new CustomSlidingWindowFunction());
>
>         env.execute("Sliding Event Time Window Processing");
>     }
> }
>
>
> public static class CustomSlidingWindowFunction implements
>         WindowFunction<Tuple5<String, String, Float, Float, String>,
>                 Tuple5<String, String, Float, Float, String>, Tuple, TimeWindow> {
>
>     @Override
>     public void apply(Tuple key, TimeWindow window,
>             Iterable<Tuple5<String, String, Float, Float, String>> input,
>             Collector<Tuple5<String, String, Float, Float, String>> out) throws Exception {
>
>         ....
>     }
> }
>
>
> // Implemented the custom periodic watermark assigner as below
> public static class BoundedOutOfOrdernessGenerator2 implements
>         AssignerWithPeriodicWatermarks<Tuple5<String, String, Float, Float, String>> {
>
>     private static final long serialVersionUID = 1L;
>
>     private final long maxOutOfOrderness = MAX_EVENT_DELAY; // constant set in seconds
>     private long currentMaxTimestamp;
>
>     @Override
>     public long extractTimestamp(Tuple5<String, String, Float, Float, String> element,
>             long previousElementTimestamp) {
>         // System.out.println("inside extractTimestamp");
>         Date parseDate = null;
>         SimpleDateFormat dateFormat = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
>         try {
>             parseDate = dateFormat.parse(element.f0);
>         } catch (ParseException e) {
>             e.printStackTrace();
>         }
>         long timestamp = parseDate.getTime();
>         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>         return timestamp;
>     }
>
>     @Override
>     public Watermark getCurrentWatermark() {
>         // return the watermark as twice the current highest timestamp minus the
>         // out-of-orderness bound; this is because it was not covering the lateness
>         // sufficiently, now it does. In future this may become a multiple of 3 or
>         // more if necessary to cover the gap in records received.
>         return new Watermark(currentMaxTimestamp * 2 - maxOutOfOrderness);
>     }
> }
>
>
>
>
>
> *Sujit Sakre*
>
>
>
> On 24 January 2017 at 22:34, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>> a bit more information would be useful. Are you using event-time? Is the
>> Flink program kept running after adding the first batch of events and when
>> adding the second batch, or is it two invocations of your Flink program? Do
>> you have a custom timestamp/watermark assigner?
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 24 Jan 2017 at 14:28 Sujit Sakre <sujit.sa...@northgateps.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We are using a sliding window function to process data read from a Kafka
>>> stream. We are using FlinkKafkaConsumer09 to read the data. The window
>>> function and sink are running correctly.
>>>
>>> To test the program, we are generating a stream of data from the command
>>> line. This works when we add a set of records once. When we add records
>>> again, it does not work: Flink produces no result, even though the records
>>> are added to the same Kafka topic from the same command line instance.
>>>
>>> Please could you suggest what could be wrong.
>>>
>>> Many thanks.
>>>
>>>
>>> *Sujit Sakre*
>>>
>>
>
