Re: Window Process function is not getting trigger

2021-02-28 Thread Kezhu Wang
Hi,

Glad to hear.

Normally, you would not encounter this if there are massive data.
`WatermarkStrategy.withIdleness` could be more appropriate in production.


Best,
Kezhu Wang


On February 24, 2021 at 22:35:11, sagar (sagarban...@gmail.com) wrote:

Thanks Kezhu, It worked!!!

On Wed, Feb 24, 2021 at 2:47 PM Kezhu Wang  wrote:

> Try `env.setParallelism(1)`. Default parallelism for local environment is
> `Runtime.getRuntime.availableProcessors`.
>
> You test data set are so small that when they are scatter cross multiple
> parallel instances, there will be no data with event time assigned to
> trigger downstream computation.
>
> Or you could try `WatermarkStrategy.withIdleness`.
>
>
> Best,
> Kezhu Wang
>
> On February 24, 2021 at 15:43:47, sagar (sagarban...@gmail.com) wrote:
>
> It is fairly simple requirement, if I changed it to PRocessing time it
> works fine , but not working with event time..help appreciated!
>
> On Wed, Feb 24, 2021 at 10:51 AM sagar  wrote:
>
>> HI
>>
>> Corrected with below code, but still getting same issue
>>
>> Instant instant = 
>> p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
>> long timeInMillis = instant.toEpochMilli();
>> System.out.println(timeInMillis);
>> return timeInMillis;
>>
>>
>> On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang  wrote:
>>
>>> I saw one potential issue. Your timestamp assigner returns timestamp in
>>> second resolution while Flink requires millisecond resolution.
>>>
>>>
>>> Best,
>>> Kezhu Wang
>>>
>>> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>>>
>>> I have simple flink stream program, where I am using socket as my
>>> continuous source
>>> I have window size of 2 seconds.
>>>
>>> Somehow my window process function is not triggering and even if I pass
>>> events in any order, flink is not ignoring
>>>
>>> I can see the output only when I kill my socket , please find the code
>>> snippet below
>>>
>>> final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>
>>>
>>> DataStream price = env.socketTextStream("localhost",
>>> 9998).uid("price source").map(new MapFunction() {
>>> @Override
>>> public Price map(String s) throws Exception {
>>> return new Price(s.split(",")[0],
>>> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
>>> BigDecimal(s.split(",")[3]), s.split(",")[4], new
>>> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
>>> }
>>> }
>>> );
>>>
>>> DataStream priceStream = price
>>>
>>>  
>>> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
>>> .withTimestampAssigner((p,timestamp) ->
>>> {
>>> ZoneId zoneId = ZoneId.systemDefault();
>>> long epoch =
>>> p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
>>> System.out.println(epoch);
>>>  return epoch;
>>> }))
>>> .keyBy(new KeySelector() {
>>> @Override
>>> public String getKey(Price price) throws Exception {
>>> return price.getPerformanceId();
>>> }
>>> }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
>>> .process(new ProcessWindowFunction>> TimeWindow>() {
>>>
>>> @Override
>>> public void process(String s, Context context,
>>> Iterable iterable, Collector collector) throws Exception {
>>> System.out.println(context.window().getStart()+
>>> "Current watermark: "+context.window().getEnd());
>>> Price p1 = null ;
>>> for(Price p : iterable)
>>> {
>>> System.out.println(p.toString());
>>> p1= p;
>>> }
>>> collector.collect(p1);
>>> }
>>> });
>>>
>>>
>>> priceStream.writeAsText("c:\\ab.txt");
>>>
>>> also data I am inputting are
>>>
>>> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
>>> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
>>> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
>>> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
>>> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
>>> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
>>> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
>>> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
>>>
>>> --
>>> ---Regards---
>>>
>>>   Sagar Bandal
>>>
>>> This is confidential mail ,All Rights are Reserved.If you are not
>>> intended receipiant please ignore this email.
>>>
>>>
>>
>> --
>> ---Regards---
>>
>>   Sagar Bandal
>>
>> This is confidential mail ,All Rights are Reserved.If you are not
>> intended receipiant 

Re: Window Process function is not getting trigger

2021-02-24 Thread sagar
Thanks Kezhu, It worked!!!

On Wed, Feb 24, 2021 at 2:47 PM Kezhu Wang  wrote:

> Try `env.setParallelism(1)`. Default parallelism for local environment is
> `Runtime.getRuntime.availableProcessors`.
>
> You test data set are so small that when they are scatter cross multiple
> parallel instances, there will be no data with event time assigned to
> trigger downstream computation.
>
> Or you could try `WatermarkStrategy.withIdleness`.
>
>
> Best,
> Kezhu Wang
>
> On February 24, 2021 at 15:43:47, sagar (sagarban...@gmail.com) wrote:
>
> It is fairly simple requirement, if I changed it to PRocessing time it
> works fine , but not working with event time..help appreciated!
>
> On Wed, Feb 24, 2021 at 10:51 AM sagar  wrote:
>
>> HI
>>
>> Corrected with below code, but still getting same issue
>>
>> Instant instant = 
>> p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
>> long timeInMillis = instant.toEpochMilli();
>> System.out.println(timeInMillis);
>> return timeInMillis;
>>
>>
>> On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang  wrote:
>>
>>> I saw one potential issue. Your timestamp assigner returns timestamp in
>>> second resolution while Flink requires millisecond resolution.
>>>
>>>
>>> Best,
>>> Kezhu Wang
>>>
>>> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>>>
>>> I have simple flink stream program, where I am using socket as my
>>> continuous source
>>> I have window size of 2 seconds.
>>>
>>> Somehow my window process function is not triggering and even if I pass
>>> events in any order, flink is not ignoring
>>>
>>> I can see the output only when I kill my socket , please find the code
>>> snippet below
>>>
>>> final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>
>>>
>>> DataStream price = env.socketTextStream("localhost",
>>> 9998).uid("price source").map(new MapFunction() {
>>> @Override
>>> public Price map(String s) throws Exception {
>>> return new Price(s.split(",")[0],
>>> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
>>> BigDecimal(s.split(",")[3]), s.split(",")[4], new
>>> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
>>> }
>>> }
>>> );
>>>
>>> DataStream priceStream = price
>>>
>>>  
>>> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
>>> .withTimestampAssigner((p,timestamp) ->
>>> {
>>> ZoneId zoneId = ZoneId.systemDefault();
>>> long epoch =
>>> p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
>>> System.out.println(epoch);
>>>  return epoch;
>>> }))
>>> .keyBy(new KeySelector() {
>>> @Override
>>> public String getKey(Price price) throws Exception {
>>> return price.getPerformanceId();
>>> }
>>> }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
>>> .process(new ProcessWindowFunction>> TimeWindow>() {
>>>
>>> @Override
>>> public void process(String s, Context context,
>>> Iterable iterable, Collector collector) throws Exception {
>>> System.out.println(context.window().getStart()+
>>> "Current watermark: "+context.window().getEnd());
>>> Price p1 = null ;
>>> for(Price p : iterable)
>>> {
>>> System.out.println(p.toString());
>>> p1= p;
>>> }
>>> collector.collect(p1);
>>> }
>>> });
>>>
>>>
>>> priceStream.writeAsText("c:\\ab.txt");
>>>
>>> also data I am inputting are
>>>
>>> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
>>> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
>>> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
>>> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
>>> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
>>> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
>>> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
>>> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
>>>
>>> --
>>> ---Regards---
>>>
>>>   Sagar Bandal
>>>
>>> This is confidential mail ,All Rights are Reserved.If you are not
>>> intended receipiant please ignore this email.
>>>
>>>
>>
>> --
>> ---Regards---
>>
>>   Sagar Bandal
>>
>> This is confidential mail ,All Rights are Reserved.If you are not
>> intended receipiant please ignore this email.
>>
>
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is confidential mail ,All Rights are Reserved.If you are not intended
> receipiant please ignore this email.
>
>

-- 
---Regards---

  Sagar Bandal

This is confidential 

Re: Window Process function is not getting trigger

2021-02-24 Thread Kezhu Wang
Try `env.setParallelism(1)`. Default parallelism for local environment is
`Runtime.getRuntime.availableProcessors`.

You test data set are so small that when they are scatter cross multiple
parallel instances, there will be no data with event time assigned to
trigger downstream computation.

Or you could try `WatermarkStrategy.withIdleness`.


Best,
Kezhu Wang

On February 24, 2021 at 15:43:47, sagar (sagarban...@gmail.com) wrote:

It is fairly simple requirement, if I changed it to PRocessing time it
works fine , but not working with event time..help appreciated!

On Wed, Feb 24, 2021 at 10:51 AM sagar  wrote:

> HI
>
> Corrected with below code, but still getting same issue
>
> Instant instant = 
> p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
> long timeInMillis = instant.toEpochMilli();
> System.out.println(timeInMillis);
> return timeInMillis;
>
>
> On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang  wrote:
>
>> I saw one potential issue. Your timestamp assigner returns timestamp in
>> second resolution while Flink requires millisecond resolution.
>>
>>
>> Best,
>> Kezhu Wang
>>
>> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>>
>> I have simple flink stream program, where I am using socket as my
>> continuous source
>> I have window size of 2 seconds.
>>
>> Somehow my window process function is not triggering and even if I pass
>> events in any order, flink is not ignoring
>>
>> I can see the output only when I kill my socket , please find the code
>> snippet below
>>
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>
>>
>> DataStream price = env.socketTextStream("localhost",
>> 9998).uid("price source").map(new MapFunction() {
>> @Override
>> public Price map(String s) throws Exception {
>> return new Price(s.split(",")[0],
>> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
>> BigDecimal(s.split(",")[3]), s.split(",")[4], new
>> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
>> }
>> }
>> );
>>
>> DataStream priceStream = price
>>
>>  
>> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
>> .withTimestampAssigner((p,timestamp) ->
>> {
>> ZoneId zoneId = ZoneId.systemDefault();
>> long epoch =
>> p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
>> System.out.println(epoch);
>>  return epoch;
>> }))
>> .keyBy(new KeySelector() {
>> @Override
>> public String getKey(Price price) throws Exception {
>> return price.getPerformanceId();
>> }
>> }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
>> .process(new ProcessWindowFunction> TimeWindow>() {
>>
>> @Override
>> public void process(String s, Context context,
>> Iterable iterable, Collector collector) throws Exception {
>> System.out.println(context.window().getStart()+
>> "Current watermark: "+context.window().getEnd());
>> Price p1 = null ;
>> for(Price p : iterable)
>> {
>> System.out.println(p.toString());
>> p1= p;
>> }
>> collector.collect(p1);
>> }
>> });
>>
>>
>> priceStream.writeAsText("c:\\ab.txt");
>>
>> also data I am inputting are
>>
>> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
>> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
>> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
>> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
>> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
>> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
>> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
>> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
>>
>> --
>> ---Regards---
>>
>>   Sagar Bandal
>>
>> This is confidential mail ,All Rights are Reserved.If you are not
>> intended receipiant please ignore this email.
>>
>>
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is confidential mail ,All Rights are Reserved.If you are not intended
> receipiant please ignore this email.
>


-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended
receipiant please ignore this email.


Re: Window Process function is not getting trigger

2021-02-23 Thread sagar
It is fairly simple requirement, if I changed it to PRocessing time it
works fine , but not working with event time..help appreciated!

On Wed, Feb 24, 2021 at 10:51 AM sagar  wrote:

> HI
>
> Corrected with below code, but still getting same issue
>
> Instant instant = 
> p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
> long timeInMillis = instant.toEpochMilli();
> System.out.println(timeInMillis);
> return timeInMillis;
>
>
> On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang  wrote:
>
>> I saw one potential issue. Your timestamp assigner returns timestamp in
>> second resolution while Flink requires millisecond resolution.
>>
>>
>> Best,
>> Kezhu Wang
>>
>> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>>
>> I have simple flink stream program, where I am using socket as my
>> continuous source
>> I have window size of 2 seconds.
>>
>> Somehow my window process function is not triggering and even if I pass
>> events in any order, flink is not ignoring
>>
>> I can see the output only when I kill my socket , please find the code
>> snippet below
>>
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>
>>
>> DataStream price = env.socketTextStream("localhost",
>> 9998).uid("price source").map(new MapFunction() {
>> @Override
>> public Price map(String s) throws Exception {
>> return new Price(s.split(",")[0],
>> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
>> BigDecimal(s.split(",")[3]), s.split(",")[4], new
>> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
>> }
>> }
>> );
>>
>> DataStream priceStream = price
>>
>>  
>> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
>> .withTimestampAssigner((p,timestamp) ->
>> {
>> ZoneId zoneId = ZoneId.systemDefault();
>> long epoch =
>> p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
>> System.out.println(epoch);
>>  return epoch;
>> }))
>> .keyBy(new KeySelector() {
>> @Override
>> public String getKey(Price price) throws Exception {
>> return price.getPerformanceId();
>> }
>> }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
>> .process(new ProcessWindowFunction> TimeWindow>() {
>>
>> @Override
>> public void process(String s, Context context,
>> Iterable iterable, Collector collector) throws Exception {
>> System.out.println(context.window().getStart()+
>> "Current watermark: "+context.window().getEnd());
>> Price p1 = null ;
>> for(Price p : iterable)
>> {
>> System.out.println(p.toString());
>> p1= p;
>> }
>> collector.collect(p1);
>> }
>> });
>>
>>
>> priceStream.writeAsText("c:\\ab.txt");
>>
>> also data I am inputting are
>>
>> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
>> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
>> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
>> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
>> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
>> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
>> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
>> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
>>
>> --
>> ---Regards---
>>
>>   Sagar Bandal
>>
>> This is confidential mail ,All Rights are Reserved.If you are not
>> intended receipiant please ignore this email.
>>
>>
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is confidential mail ,All Rights are Reserved.If you are not intended
> receipiant please ignore this email.
>


-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended
receipiant please ignore this email.


Re: Window Process function is not getting trigger

2021-02-23 Thread sagar
HI

Corrected with below code, but still getting same issue

Instant instant =
p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
long timeInMillis = instant.toEpochMilli();
System.out.println(timeInMillis);
return timeInMillis;


On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang  wrote:

> I saw one potential issue. Your timestamp assigner returns timestamp in
> second resolution while Flink requires millisecond resolution.
>
>
> Best,
> Kezhu Wang
>
> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>
> I have simple flink stream program, where I am using socket as my
> continuous source
> I have window size of 2 seconds.
>
> Somehow my window process function is not triggering and even if I pass
> events in any order, flink is not ignoring
>
> I can see the output only when I kill my socket , please find the code
> snippet below
>
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>
>
> DataStream price = env.socketTextStream("localhost",
> 9998).uid("price source").map(new MapFunction() {
> @Override
> public Price map(String s) throws Exception {
> return new Price(s.split(",")[0],
> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
> BigDecimal(s.split(",")[3]), s.split(",")[4], new
> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
> }
> }
> );
>
> DataStream priceStream = price
>
>  
> .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
> .withTimestampAssigner((p,timestamp) ->
> {
> ZoneId zoneId = ZoneId.systemDefault();
> long epoch =
> p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
> System.out.println(epoch);
>  return epoch;
> }))
> .keyBy(new KeySelector() {
> @Override
> public String getKey(Price price) throws Exception {
> return price.getPerformanceId();
> }
> }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
> .process(new ProcessWindowFunction TimeWindow>() {
>
> @Override
> public void process(String s, Context context,
> Iterable iterable, Collector collector) throws Exception {
> System.out.println(context.window().getStart()+
> "Current watermark: "+context.window().getEnd());
> Price p1 = null ;
> for(Price p : iterable)
> {
> System.out.println(p.toString());
> p1= p;
> }
> collector.collect(p1);
> }
> });
>
>
> priceStream.writeAsText("c:\\ab.txt");
>
> also data I am inputting are
>
> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is confidential mail ,All Rights are Reserved.If you are not intended
> receipiant please ignore this email.
>
>

-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended
receipiant please ignore this email.


Re: Window Process function is not getting trigger

2021-02-23 Thread Kezhu Wang
I saw one potential issue. Your timestamp assigner returns timestamp in
second resolution while Flink requires millisecond resolution.


Best,
Kezhu Wang

On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:

I have simple flink stream program, where I am using socket as my
continuous source
I have window size of 2 seconds.

Somehow my window process function is not triggering and even if I pass
events in any order, flink is not ignoring

I can see the output only when I kill my socket , please find the code
snippet below

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);


DataStream price = env.socketTextStream("localhost",
9998).uid("price source").map(new MapFunction() {
@Override
public Price map(String s) throws Exception {
return new Price(s.split(",")[0],
LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
BigDecimal(s.split(",")[3]), s.split(",")[4], new
BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
}
}
);

DataStream priceStream = price

 
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
.withTimestampAssigner((p,timestamp) ->
{
ZoneId zoneId = ZoneId.systemDefault();
long epoch = p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
System.out.println(epoch);
 return epoch;
}))
.keyBy(new KeySelector() {
@Override
public String getKey(Price price) throws Exception {
return price.getPerformanceId();
}
}).window(TumblingEventTimeWindows.of(Time.seconds(2)))
.process(new ProcessWindowFunction() {

@Override
public void process(String s, Context context,
Iterable iterable, Collector collector) throws Exception {
System.out.println(context.window().getStart()+
"Current watermark: "+context.window().getEnd());
Price p1 = null ;
for(Price p : iterable)
{
System.out.println(p.toString());
p1= p;
}
collector.collect(p1);
}
});


priceStream.writeAsText("c:\\ab.txt");

also data I am inputting are

p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01

-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended
receipiant please ignore this email.


Window Process function is not getting trigger

2021-02-23 Thread sagar
I have simple flink stream program, where I am using socket as my
continuous source
I have window size of 2 seconds.

Somehow my window process function is not triggering and even if I pass
events in any order, flink is not ignoring

I can see the output only when I kill my socket , please find the code
snippet below

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);


DataStream price = env.socketTextStream("localhost",
9998).uid("price source").map(new MapFunction() {
@Override
public Price map(String s) throws Exception {
return new Price(s.split(",")[0],
LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
BigDecimal(s.split(",")[3]), s.split(",")[4], new
BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
}
}
);

DataStream priceStream = price

 
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
.withTimestampAssigner((p,timestamp) ->
{
ZoneId zoneId = ZoneId.systemDefault();
long epoch = p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
System.out.println(epoch);
 return epoch;
}))
.keyBy(new KeySelector() {
@Override
public String getKey(Price price) throws Exception {
return price.getPerformanceId();
}
}).window(TumblingEventTimeWindows.of(Time.seconds(2)))
.process(new ProcessWindowFunction() {

@Override
public void process(String s, Context context,
Iterable iterable, Collector collector) throws Exception {
System.out.println(context.window().getStart()+
"Current watermark: "+context.window().getEnd());
Price p1 = null ;
for(Price p : iterable)
{
System.out.println(p.toString());
p1= p;
}
collector.collect(p1);
}
});


priceStream.writeAsText("c:\\ab.txt");

also data I am inputting are

p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01

-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended
receipiant please ignore this email.