Re: [DataStream API] Watermarks not closing TumblingEventTimeWindow with Reduce function

2022-04-29 Thread r pp
Glad to hear it! Thank you for the feedback!

On Tue, Apr 5, 2022 at 06:26, Ryan van Huuksloot wrote:

> Sorry for the hassle. I ended up working with a colleague and found that
> the Kafka Source had a single partition but the pipeline had a
> parallelism of 4, and no withIdleness was configured. The Watermark was
> therefore set on the source output but never advanced at the window operator.
>
> Appreciate you both taking time - Thanks!
>
> Closing this ticket
>
> Ryan van Huuksloot
> Data Developer | Data Platform Engineering | Streaming Capabilities
>
>
> On Mon, Apr 4, 2022 at 8:18 AM Arvid Heise  wrote:
>
>> This sounds like a bug indeed. Could you please create a ticket with a
>> minimal test case?
>>
>> The workaround is probably to use #aggregate but it should be fixed
>> nonetheless.
>>
>> On Fri, Apr 1, 2022 at 9:58 AM r pp  wrote:
>>
>>> Hi, can you send your full code?
>>>
>>> On Thu, Mar 31, 2022 at 22:58, Ryan van Huuksloot wrote:
>>>
>>>> Hello!
>>>>
>>>> *Problem:*
>>>> I am connecting to a Kafka Source with the Watermark Strategy below.
>>>>
>>>> val watermarkStrategy = WatermarkStrategy
>>>>   .forBoundedOutOfOrderness[StarscreamEventCounter_V1](Duration.of(2, ChronoUnit.HOURS))
>>>>   .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] {
>>>>     // Use the event's own envelope timestamp as the event time.
>>>>     override def extractTimestamp(element: StarscreamEventCounter_V1, recordTimestamp: Long): Long =
>>>>       element.envelopeTimestamp
>>>>   })
>>>>
>>>> The Watermarks are correctly getting assigned.
>>>> However, when a reduce function is used the window never terminates
>>>> because the `ctx.getCurrentWatermark()` returns the default value of
>>>> `-9223372036854775808` in perpetuity.
>>>>
>>>> This is the stream code:
>>>>
>>>> stream
>>>>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).reduce(_ + _)
>>>>
>>>> The reduce uses this overloaded operator:
>>>>
>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>> case class StarscreamEventCounter_V1(
>>>>   envelopeTimestamp: Long,
>>>>   numberOfEvents: Int = 1
>>>> ) {
>>>>   // Merge two counters: keep the earliest timestamp and sum the counts.
>>>>   def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 =
>>>>     StarscreamEventCounter_V1(
>>>>       this.envelopeTimestamp.min(that.envelopeTimestamp),
>>>>       that.numberOfEvents + this.numberOfEvents
>>>>     )
>>>> }
>>>>
>>>>
>>>> *Attempt to Solve:*
>>>> 1. Validate that the Watermark is set on the source
>>>>    a. Set a custom trigger to emit a watermark on each event just in
>>>>       case
>>>> 2. Test with aggregate / process functions
>>>>    a. Both other functions work properly - window closes and emits to
>>>>       a PrintSink
>>>> 3. Change Watermark Generator to a custom generator
>>>>    a. Also change time horizons and let run for 1 day - window never
>>>>       closes due to the watermark being stuck at the min default. The
>>>>       sink never receives the data but the UI says there are records
>>>>       being output!
>>>>
>>>> *Hypothesis:*
>>>> The output id of a reduce function may be causing an upstream issue where
>>>> the watermark is no longer assigned to the Window. I haven't been able to
>>>> pin down what exactly is causing the issue, though. My thought is that it
>>>> might be a bug, given that it works for Aggregate/Process.
>>>> It could be a bug in the IndexedCombinedWatermarkStatus: the partial
>>>> watermarks should not be the min default value when I set the watermark per
>>>> event. This is what I will be looking into until I hear back. I validated
>>>> that the watermark is set correctly in CombinedWatermarkStatus.
>>>>
>>>> *Tools:*
>>>> - Flink 1.14.3
>>>> - Scala
>>>> - DataStream API
>>>>
>>>> Any assistance would be great! Happy to provide more context or
>>>> clarification if something isn't clear!
>>>>
>>>> Thanks!
>>>>
>>>> Ryan van Huuksloot
>>>> Data Developer | Data Platform Engineering | Streaming Capabilities
>>>>
>>>
>>>
>>> --
>>> Best,
>>>   pp
>>>
>>

-- 
Best,
  pp
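
The resolution quoted above (one Kafka partition, parallelism 4, no
withIdleness) can be illustrated with a small plain-Scala model. This is
only a sketch under stated assumptions: the names Channel and combined are
hypothetical and this is not Flink's internal code. It models the general
rule that a downstream operator's event-time clock is the minimum watermark
over its non-idle inputs, so three source subtasks that never see a record
hold the clock at Long.MinValue.

```scala
// Sketch only: a toy model of watermark combination across input channels.
// Channel and combined are hypothetical names, not Flink classes.
object WatermarkCombineSketch {
  // One input channel as seen by the downstream (window) operator.
  // A channel that has never emitted a watermark sits at Long.MinValue.
  final case class Channel(watermark: Long = Long.MinValue, idle: Boolean = false)

  // The operator clock is the minimum over all channels not marked idle.
  // Assumption of this sketch: if every channel is idle, stay at Long.MinValue.
  def combined(channels: Seq[Channel]): Long = {
    val active = channels.filterNot(_.idle)
    if (active.isEmpty) Long.MinValue else active.map(_.watermark).min
  }

  def main(args: Array[String]): Unit = {
    // Parallelism 4, one partition: only the first subtask emits watermarks.
    val noIdleness = Seq(Channel(watermark = 1648767480000L), Channel(), Channel(), Channel())
    println(combined(noIdleness)) // prints -9223372036854775808

    // Same setup, but the three empty subtasks are marked idle and ignored.
    val withIdleness = noIdleness.head +: noIdleness.tail.map(_.copy(idle = true))
    println(combined(withIdleness)) // prints 1648767480000
  }
}
```

In a real job, the usual ways to get the second behaviour are to add
withIdleness(Duration) to the WatermarkStrategy or to set the source
parallelism to match the partition count. Which one fits depends on the
pipeline.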


Re: [DataStream API] Watermarks not closing TumblingEventTimeWindow with Reduce function

2022-04-01 Thread r pp
Hi, can you send your full code?

On Thu, Mar 31, 2022 at 22:58, Ryan van Huuksloot wrote:

> Hello!
>
> *Problem:*
> I am connecting to a Kafka Source with the Watermark Strategy below.
>
> val watermarkStrategy = WatermarkStrategy
>   .forBoundedOutOfOrderness[StarscreamEventCounter_V1](Duration.of(2, ChronoUnit.HOURS))
>   .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] {
>     // Use the event's own envelope timestamp as the event time.
>     override def extractTimestamp(element: StarscreamEventCounter_V1, recordTimestamp: Long): Long =
>       element.envelopeTimestamp
>   })
>
> The Watermarks are correctly getting assigned.
> However, when a reduce function is used the window never terminates
> because the `ctx.getCurrentWatermark()` returns the default value of
> `-9223372036854775808` in perpetuity.
>
> This is the stream code:
>
> stream
>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).reduce(_ + _)
>
> The reduce uses this overloaded operator:
>
> @JsonIgnoreProperties(ignoreUnknown = true)
> case class StarscreamEventCounter_V1(
>   envelopeTimestamp: Long,
>   numberOfEvents: Int = 1
> ) {
>   // Merge two counters: keep the earliest timestamp and sum the counts.
>   def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 =
>     StarscreamEventCounter_V1(
>       this.envelopeTimestamp.min(that.envelopeTimestamp),
>       that.numberOfEvents + this.numberOfEvents
>     )
> }
>
>
> *Attempt to Solve:*
> 1. Validate that the Watermark is set on the source
>    a. Set a custom trigger to emit a watermark on each event just in case
> 2. Test with aggregate / process functions
>    a. Both other functions work properly - window closes and emits to a
>       PrintSink
> 3. Change Watermark Generator to a custom generator
>    a. Also change time horizons and let run for 1 day - window never
>       closes due to the watermark being stuck at the min default. The sink
>       never receives the data but the UI says there are records being
>       output!
>
> *Hypothesis:*
> The output id of a reduce function may be causing an upstream issue where the
> watermark is no longer assigned to the Window. I haven't been able to pin
> down what exactly is causing the issue, though. My thought is that it might
> be a bug, given that it works for Aggregate/Process.
> It could be a bug in the IndexedCombinedWatermarkStatus: the partial
> watermarks should not be the min default value when I set the watermark per
> event. This is what I will be looking into until I hear back. I validated
> that the watermark is set correctly in CombinedWatermarkStatus.
>
> *Tools:*
> - Flink 1.14.3
> - Scala
> - DataStream API
>
> Any assistance would be great! Happy to provide more context or
> clarification if something isn't clear!
>
> Thanks!
>
> Ryan van Huuksloot
> Data Developer | Data Platform Engineering | Streaming Capabilities
>


-- 
Best,
  pp
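
For readers following the question, the interplay between the 2-hour
bounded out-of-orderness and the 1-second tumbling window can be sketched
in plain Scala. This is a simplified model, not Flink code, and all names
in it are hypothetical. It assumes the usual semantics: the generated
watermark trails the largest timestamp seen by the out-of-orderness bound
plus one millisecond, and an event-time window fires once the watermark
reaches the window's last timestamp (its end minus one). A watermark stuck
at Long.MinValue, the -9223372036854775808 reported in the question,
therefore never fires any window.

```scala
// Sketch only: simplified event-time arithmetic with hypothetical names.
object WindowFiringSketch {
  val outOfOrdernessMs: Long = 2L * 60 * 60 * 1000 // Duration.of(2, ChronoUnit.HOURS)
  val windowSizeMs: Long = 1000L                   // Time.seconds(1)

  // Watermark produced for the largest event timestamp seen so far.
  def watermarkFor(maxTimestampSeen: Long): Long =
    maxTimestampSeen - outOfOrdernessMs - 1

  // Start of the tumbling window containing ts (no offset, ts >= 0).
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // Last timestamp that still belongs to the window containing ts.
  def windowMaxTimestamp(ts: Long): Long = windowStart(ts) + windowSizeMs - 1

  // The window holding eventTs fires once the watermark reaches its last timestamp.
  def fires(eventTs: Long, currentWatermark: Long): Boolean =
    currentWatermark >= windowMaxTimestamp(eventTs)

  def main(args: Array[String]): Unit = {
    val eventTs = 1648767480500L // falls in window [1648767480000, 1648767481000)

    // Watermark never advanced at the operator: the window cannot fire.
    println(fires(eventTs, Long.MinValue))

    // A later event exactly 2 hours after the window end pushes the
    // watermark to the window's last timestamp, which is enough to fire.
    val laterEventTs = windowStart(eventTs) + windowSizeMs + outOfOrdernessMs
    println(fires(eventTs, watermarkFor(laterEventTs)))
  }
}
```

Under these assumptions, even a healthy pipeline only closes a window after
seeing an event roughly two hours newer than the window end, so a short
test run can look stuck for reasons unrelated to the reduce function.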