[jira] [Commented] (FLINK-33756) Missing record with CUMULATE/HOP windows using an optimization

2023-12-14 Thread Jim Hughes (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796949#comment-17796949
 ] 

Jim Hughes commented on FLINK-33756:


Hi [~jeyhunkarimov], I am not actively working on it; I'll assign it to you for 
now.

I think there are two (likely related) things going on: first, some watermark 
is being miscomputed; second, the hashing in the exchange step allows 
elements and watermarks to arrive in either order.

From my initial look into this, I also ran across 
`TimeWindow.getWindowStartWithOffset`. I noticed that this method is being 
called with an offset of 0L in `TimeWindowUtil.getNextTriggerWatermark`. I 
cannot be 100% sure that's the problem, but that's the next place I'd check 
if I were to continue looking!
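To see why an offset of 0L could matter, here is a standalone sketch of the window-start-with-offset arithmetic. It mirrors the formula of Flink's `TimeWindow.getWindowStartWithOffset` as we understand it, but does not call Flink; the record time 00:00:37 is hypothetical, while the 5 s step and 1 s offset come from the CUMULATE window in this issue. It only shows that dropping the offset moves the slice boundary; it does not prove that `getNextTriggerWatermark` is wrong.

```java
public class WindowStartWithOffset {

    /** Start of the window of length windowSize that contains timestamp, aligned to offset. */
    static long windowStartWithOffset(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // Java's % is negative for negative operands; shift into [0, windowSize).
        if (remainder < 0) {
            remainder += windowSize;
        }
        return timestamp - remainder;
    }

    public static void main(String[] args) {
        long base = 1602288000000L; // 2020-10-10T00:00:00Z in epoch millis
        long ts = base + 37_000L;   // hypothetical record at 00:00:37
        long step = 5_000L;         // CUMULATE step = 5 s
        long offset = 1_000L;       // window offset = 1 s

        long withOffset = windowStartWithOffset(ts, offset, step);
        long withoutOffset = windowStartWithOffset(ts, 0L, step);

        System.out.println("slice start, 1 s offset: +" + (withOffset - base) / 1000 + " s");  // +36 s
        System.out.println("slice start, 0 offset:   +" + (withoutOffset - base) / 1000 + " s"); // +35 s
    }
}
```

With the offset, the slice is [00:00:36, 00:00:41), whose end matches the missing record's window end; with offset 0 the boundary computed for the same record falls at 00:00:40 instead.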

 

> Missing record with CUMULATE/HOP windows using an optimization
> --
>
> Key: FLINK-33756
> URL: https://issues.apache.org/jira/browse/FLINK-33756
> Project: Flink
> Issue Type: Bug
> Reporter: Jim Hughes
> Priority: Major
>
> I have seen an optimization cause a window to fail to emit a record.
> With the optimization `TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED` set to 
> true, the configuration AggregatePhaseStrategy.TWO_PHASE set, and a HOP or 
> CUMULATE window with an offset, a record can be sent that causes one of the 
> multiple active windows to fail to emit a record.
> The linked code 
> (https://github.com/jnh5y/flink/commit/ec90aa501d86f95559f8b22b0610e9fb786f05d4)
> modifies the `WindowAggregateJsonITCase` to demonstrate the case.
>
> The test `testDistinctSplitDisabled` shows the expected behavior. The test 
> `testDistinctSplitEnabled` tests the above configuration and shows that one 
> record is missing from the output.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33756) Missing record with CUMULATE/HOP windows using an optimization

2023-12-14 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796939#comment-17796939
 ] 

Jeyhun Karimov commented on FLINK-33756:


[~jhughes] Yes, it is just printing/logging in various places of the codebase.
If you are not working on this issue (or if it is not something urgent), you 
can assign it to me; I will try to come up with a deterministic solution to 
avoid the flakiness.



[jira] [Commented] (FLINK-33756) Missing record with CUMULATE/HOP windows using an optimization

2023-12-13 Thread Jim Hughes (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796533#comment-17796533
 ] 

Jim Hughes commented on FLINK-33756:


Hi [~jeyhunkarimov], nice analysis!  I did see that there were two pairs of 
Local-Global window aggregates when I very briefly looked initially; I totally 
agree that has to be part of the issue.  

Out of curiosity, how did you see the value coming out of the various windows?  
Was it println debugging or something else?

I like your explanation about the order of `processWatermark` and 
`processElement`; that explains the apparent flakiness.  

It looks like the different orderings are coming from the exchange/hashing 
that happens between the windows. Perhaps thinking about how timestamps 
interact with the exchange operator will help us sort this out (along with 
your note that we are "losing" the original timestamp in some sense).
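A toy illustration of the exchange/hashing point, under stated assumptions: `Objects.hash` stands in for the key hashing that Flink's hash exchange really uses, and the parallelism of 4 and the bucket values 12 and 845 are hypothetical. It shows that the two Expand outputs of one input row are routed by `hash[name, $f5, $f6]` independently, while the later `hash[name]` exchange sends every partial result for `b` to one subtask, over whichever channel its upstream subtask uses.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class ExchangeRoutingSketch {

    // Toy hash partitioner: picks a downstream subtask for a key.
    // Objects.hash is a stand-in; Flink's hash exchange uses its own key hashing.
    static int targetSubtask(List<?> key, int parallelism) {
        return Math.floorMod(Objects.hash(key.toArray()), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4; // hypothetical parallelism

        // The two Expand outputs of one input row for name "b" (bucket values are hypothetical).
        List<Object> maxRow = Arrays.asList("b", 12, null);       // [name, $f5, null]
        List<Object> distinctRow = Arrays.asList("b", null, 845); // [name, null, $f6]

        // First exchange, hash[name, $f5, $f6]: the halves may land on different subtasks.
        System.out.println("maxRow      -> subtask " + targetSubtask(maxRow, parallelism));
        System.out.println("distinctRow -> subtask " + targetSubtask(distinctRow, parallelism));

        // Second exchange, hash[name]: all partial results for "b" converge on one subtask,
        // but they arrive over different input channels depending on their upstream subtask.
        System.out.println("name b      -> subtask " + targetSubtask(List.of("b"), parallelism));
    }
}
```

The concrete subtask numbers depend on the hash function, so they are not meaningful here; only the routing structure is.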



[jira] [Commented] (FLINK-33756) Missing record with CUMULATE/HOP windows using an optimization

2023-12-13 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796372#comment-17796372
 ] 

Jeyhun Karimov commented on FLINK-33756:


Hi [~jhughes], I had a chance to look at the issue. I share my findings below.

So, when we enable 
{{OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED}}, the 
following optimized execution plan is produced:


{code}
Sink(table=[default_catalog.default_database.MySink], fields=[name, $f1, $f2, window_start, window_end])
+- GlobalWindowAggregate(groupBy=[name], window=[CUMULATE(win_end=[$window_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, MAX(max$0) AS $f1, $SUM0(sum$1) AS $f2, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[name]])
      +- LocalWindowAggregate(groupBy=[name], window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, MAX($f5_0) AS max$0, $SUM0($f6_0) AS sum$1, slice_end('w$) AS $window_end])
         +- Calc(select=[name, window_start, window_end, $f5, $f6, $f3 AS $f5_0, $f4 AS $f6_0])
            +- GlobalWindowAggregate(groupBy=[name, $f5, $f6], window=[CUMULATE(slice_end=[$slice_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, $f5, $f6, MAX(max$0) AS $f3, COUNT(distinct$0 count$1) AS $f4, start('w$) AS window_start, end('w$) AS window_end])
               +- Exchange(distribution=[hash[name, $f5, $f6]])
                  +- LocalWindowAggregate(groupBy=[name, $f5, $f6], window=[CUMULATE(time_col=[rowtime], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, $f5, $f6, MAX(double) FILTER $g_1 AS max$0, COUNT(distinct$0 int) FILTER $g_2 AS count$1, DISTINCT(int) AS distinct$0, slice_end('w$) AS $slice_end])
                     +- Calc(select=[name, double, int, $f5, $f6, ($e = 1) AS $g_1, ($e = 2) AS $g_2, rowtime])
                        +- Expand(projects=[{name, double, int, $f5, null AS $f6, 1 AS $e, rowtime}, {name, double, int, null AS $f5, $f6, 2 AS $e, rowtime}])
                           +- Calc(select=[name, double, int, MOD(HASH_CODE(double), 1024) AS $f5, MOD(HASH_CODE(int), 1024) AS $f6, Reinterpret(TO_TIMESTAMP(ts)) AS rowtime])
                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[int, double, name, ts], metadata=[], watermark=[-(TO_TIMESTAMP(ts), 1000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[int, double, name, ts])

{code}
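The bottom of this plan is the distinct-aggregate split: each input row is bucketed and expanded into two rows, one feeding {{MAX(double)}} ($e = 1) and one feeding {{COUNT(DISTINCT int)}} ($e = 2). A minimal sketch of that Expand step, assuming Java's {{hashCode}} as a stand-in for the planner's {{HASH_CODE}} and a non-negative {{floorMod}} for {{MOD}} (the planner's exact semantics may differ); the record and class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class DistinctSplitExpandSketch {

    /** One Expand output row: name, $f5, $f6, $e, rowtime (other columns omitted). */
    record ExpandedRow(String name, Integer f5, Integer f6, int e, long rowtime) {}

    static final int BUCKETS = 1024;

    // Stand-in for MOD(HASH_CODE(x), 1024); uses Java hashCode, not Flink's HASH_CODE.
    static int bucket(Object value) {
        return Math.floorMod(value.hashCode(), BUCKETS);
    }

    // Mirrors Expand(projects=[{..., $f5, null AS $f6, 1 AS $e, ...}, {..., null AS $f5, $f6, 2 AS $e, ...}]).
    static List<ExpandedRow> expand(String name, double dbl, int i, long rowtime) {
        List<ExpandedRow> out = new ArrayList<>();
        out.add(new ExpandedRow(name, bucket(dbl), null, 1, rowtime)); // feeds MAX(double) FILTER $g_1
        out.add(new ExpandedRow(name, null, bucket(i), 2, rowtime));   // feeds COUNT(DISTINCT int) FILTER $g_2
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical input row; values are for illustration only.
        for (ExpandedRow r : expand("b", 3.0, 1, 1602288037000L)) {
            System.out.println(r);
        }
    }
}
```

Both expanded rows keep the original {{rowtime}}; it is the first window aggregation that replaces it with window bounds.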


As we can see, there are two window operators (both with the {{Local-Global 
optimization}}). As a reminder, the missing record is "+I[b, 3.0, 1, 
2020-10-10T00:00:31, 2020-10-10T00:00:41]".

As we can see from the schema of the second {{LocalWindowAggregate}}, it uses 
{{window_start}} and {{window_end}} to calculate the {{CUMULATE}} windows. At 
this point (the input of the second {{LocalWindowAggregate}}), our "missing" 
record looks like "+I(b,2020-10-10T00:00:31,2020-10-10T00:00:41,0,null,3.0,0)". 
So, by this point we have already lost the original event time of the record.
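To illustrate why the original event time cannot be recovered downstream, here is a sketch of CUMULATE window assignment with the plan's parameters (max_size 15 s, step 5 s, offset 1 s). It assumes the documented CUMULATE semantics (a fixed start aligned to max_size plus offset, with the end growing by step) and does not call Flink; the event times 00:00:32 and 00:00:39 are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class CumulateAssignSketch {

    static long windowStart(long ts, long offset, long size) {
        long r = (ts - offset) % size;
        return r < 0 ? ts - (r + size) : ts - r;
    }

    /** Returns the [start, end) pairs of all CUMULATE windows that contain ts. */
    static List<long[]> assign(long ts, long maxSize, long step, long offset) {
        long start = windowStart(ts, offset, maxSize);
        List<long[]> windows = new ArrayList<>();
        for (long end = start + step; end <= start + maxSize; end += step) {
            if (end > ts) { // only windows that have not yet closed at ts
                windows.add(new long[] {start, end});
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        long base = 1602288000000L; // 2020-10-10T00:00:00Z
        for (long sec : new long[] {32, 39}) { // hypothetical original event times
            StringBuilder sb = new StringBuilder("t=+" + sec + " s ->");
            for (long[] w : assign(base + sec * 1000, 15_000, 5_000, 1_000)) {
                sb.append(" [+").append((w[0] - base) / 1000)
                  .append(", +").append((w[1] - base) / 1000).append(")");
            }
            System.out.println(sb);
        }
    }
}
```

Running it prints `t=+32 s -> [+31, +36) [+31, +41) [+31, +46)` and `t=+39 s -> [+31, +41) [+31, +46)`: both event times contribute to the same [00:00:31, 00:00:41) row, so once only {{window_start}}/{{window_end}} are carried forward, the two are indistinguishable.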


As a result, the flaky behaviour comes from the calling order between 
{{SlicingWindowOperator::processWatermark}} -> {{AbstractWindowAggProcessor::advanceProgress}} 
and {{SlicingWindowOperator::processElement}}:

- If {{processWatermark}} is called before {{processElement}}, then 
{{currentProgress}} is updated to {{1602288041000}} (2020-10-10T00:00:41 in 
epoch milliseconds, i.e. the end of the missing record's window). In this case, 
when {{processElement}} is called afterwards, it considers the window already 
fired and drops the element.

- If {{processElement}} is called before {{processWatermark}}, then the record 
is processed as expected.
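The two orderings can be reproduced with a toy operator. This is a sketch, not Flink's {{SlicingWindowOperator}}: the class {{ToyWindowOperator}} is hypothetical, its late check is modeled on {{TimeWindowUtil.isWindowFired}} as we understand it (a slice counts as fired once progress reaches its end minus 1 ms), and it assumes the element's slice end equals its {{window_end}}.

```java
import java.util.ArrayList;
import java.util.List;

public class ProgressRaceSketch {

    /** Hypothetical stand-in for a slicing window operator's late-element check. */
    static final class ToyWindowOperator {
        long currentProgress = Long.MIN_VALUE;
        final List<Long> accepted = new ArrayList<>();
        final List<Long> dropped = new ArrayList<>();

        // Modeled on the "window already fired" test: fired once progress >= sliceEnd - 1.
        static boolean isWindowFired(long sliceEnd, long progress) {
            return progress >= sliceEnd - 1;
        }

        void processWatermark(long watermark) {
            currentProgress = Math.max(currentProgress, watermark);
        }

        void processElement(long sliceEnd) {
            if (isWindowFired(sliceEnd, currentProgress)) {
                dropped.add(sliceEnd);  // treated as late and discarded
            } else {
                accepted.add(sliceEnd); // buffered until the watermark fires the slice
            }
        }
    }

    public static void main(String[] args) {
        long windowEnd = 1602288041000L; // 2020-10-10T00:00:41Z, end of the missing record's window
        long watermark = 1602288041000L; // currentProgress value from the analysis above

        ToyWindowOperator wmFirst = new ToyWindowOperator();
        wmFirst.processWatermark(watermark);
        wmFirst.processElement(windowEnd);

        ToyWindowOperator elemFirst = new ToyWindowOperator();
        elemFirst.processElement(windowEnd);
        elemFirst.processWatermark(watermark);

        System.out.println("watermark first: dropped=" + wmFirst.dropped.size());   // dropped=1
        System.out.println("element first:   dropped=" + elemFirst.dropped.size()); // dropped=0
    }
}
```

Because the element's timing information is just its window end, nothing in the element itself distinguishes "late" from "on time"; the outcome depends only on which call runs first.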

Is this something expected? WDYT?

