Hi,

Glad to hear.

Normally, you would not encounter this if there are massive data.
`WatermarkStrategy.withIdleness` could be more appropriate in production.


Best,
Kezhu Wang


On February 24, 2021 at 22:35:11, sagar (sagarban...@gmail.com) wrote:

Thanks Kezhu, It worked!!!

On Wed, Feb 24, 2021 at 2:47 PM Kezhu Wang <kez...@gmail.com> wrote:

> Try `env.setParallelism(1)`. Default parallelism for local environment is
> `Runtime.getRuntime.availableProcessors`.
>
> You test data set are so small that when they are scatter cross multiple
> parallel instances, there will be no data with event time assigned to
> trigger downstream computation.
>
> Or you could try `WatermarkStrategy.withIdleness`.
>
>
> Best,
> Kezhu Wang
>
> On February 24, 2021 at 15:43:47, sagar (sagarban...@gmail.com) wrote:
>
> It is fairly simple requirement, if I changed it to PRocessing time it
> works fine , but not working with event time..help appreciated!
>
> On Wed, Feb 24, 2021 at 10:51 AM sagar <sagarban...@gmail.com> wrote:
>
>> HI
>>
>> Corrected with below code, but still getting same issue
>>
>> Instant instant = 
>> p.getAsOfDateTime().atZone(ZoneId.systemDefault()).toInstant();
>> long timeInMillis = instant.toEpochMilli();
>> System.out.println(timeInMillis);
>> return timeInMillis;
>>
>>
>> On Wed, Feb 24, 2021 at 10:34 AM Kezhu Wang <kez...@gmail.com> wrote:
>>
>>> I saw one potential issue. Your timestamp assigner returns timestamp in
>>> second resolution while Flink requires millisecond resolution.
>>>
>>>
>>> Best,
>>> Kezhu Wang
>>>
>>> On February 24, 2021 at 11:49:59, sagar (sagarban...@gmail.com) wrote:
>>>
>>> I have simple flink stream program, where I am using socket as my
>>> continuous source
>>> I have window size of 2 seconds.
>>>
>>> Somehow my window process function is not triggering and even if I pass
>>> events in any order, flink is not ignoring
>>>
>>> I can see the output only when I kill my socket , please find the code
>>> snippet below
>>>
>>> final StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>
>>>
>>>         DataStream<Price> price = env.socketTextStream("localhost",
>>> 9998).uid("price source").map(new MapFunction<String, Price>() {
>>>             @Override
>>>             public Price map(String s) throws Exception {
>>>                 return new Price(s.split(",")[0],
>>> LocalDate.parse(s.split(",")[1]), new BigDecimal(s.split(",")[2]),new
>>> BigDecimal(s.split(",")[3]), s.split(",")[4], new
>>> BigDecimal(s.split(",")[5]), LocalDateTime.parse(s.split(",")[6]) );
>>>             }
>>>         }
>>>         );
>>>
>>>         DataStream<Price> priceStream = price
>>>
>>>  
>>> .assignTimestampsAndWatermarks(WatermarkStrategy.<Price>forMonotonousTimestamps()
>>>         .withTimestampAssigner((p,timestamp) ->
>>>         {
>>>             ZoneId zoneId = ZoneId.systemDefault();
>>>             long epoch =
>>> p.getAsOfDateTime().atZone(zoneId).toEpochSecond();
>>>             System.out.println(epoch);
>>>              return epoch;
>>>         }))
>>>         .keyBy(new KeySelector<Price, String>() {
>>>                     @Override
>>>                     public String getKey(Price price) throws Exception {
>>>                         return price.getPerformanceId();
>>>                     }
>>>                 }).window(TumblingEventTimeWindows.of(Time.seconds(2)))
>>>                 .process(new ProcessWindowFunction<Price, Price, String,
>>> TimeWindow>() {
>>>
>>>                     @Override
>>>                     public void process(String s, Context context,
>>> Iterable<Price> iterable, Collector<Price> collector) throws Exception {
>>>                         System.out.println(context.window().getStart()+
>>> "Current watermark: "+context.window().getEnd());
>>>                         Price p1 = null ;
>>>                         for(Price p : iterable)
>>>                         {
>>>                             System.out.println(p.toString());
>>>                             p1= p;
>>>                         }
>>>                         collector.collect(p1);
>>>                     }
>>>                 });
>>>
>>>
>>>         priceStream.writeAsText("c:\\ab.txt");
>>>
>>> also data I am inputting are
>>>
>>> p1,2019-12-31,1,34,USD,4,2019-12-31T00:00:00
>>> p1,2019-12-31,2,34,USD,4,2019-12-31T00:00:01
>>> p1,2019-12-31,3,34,USD,4,2019-12-31T00:00:02
>>> p1,2019-12-31,4,34,USD,4,2019-12-31T00:00:03
>>> p1,2019-12-31,5,34,USD,4,2019-12-31T00:00:04
>>> p1,2019-12-31,10,34,USD,4,2019-12-31T00:00:01
>>> p1,2021-12-31,15,34,USD,4,2021-12-31T00:00:01
>>> p1,2018-12-31,10,34,USD,4,2018-12-31T00:00:01
>>>
>>> --
>>> ---Regards---
>>>
>>>   Sagar Bandal
>>>
>>> This is confidential mail ,All Rights are Reserved.If you are not
>>> intended receipiant please ignore this email.
>>>
>>>
>>
>> --
>> ---Regards---
>>
>>   Sagar Bandal
>>
>> This is confidential mail ,All Rights are Reserved.If you are not
>> intended receipiant please ignore this email.
>>
>
>
> --
> ---Regards---
>
>   Sagar Bandal
>
> This is confidential mail ,All Rights are Reserved.If you are not intended
> receipiant please ignore this email.
>
>

-- 
---Regards---

  Sagar Bandal

This is confidential mail ,All Rights are Reserved.If you are not intended
receipiant please ignore this email.

Reply via email to