[ https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17796372#comment-17796372 ]

Jeyhun Karimov commented on FLINK-33756:
----------------------------------------

Hi [~jhughes], I had a chance to look at the issue. My findings are below.

So, when we enable 
{{OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED}}, the 
following optimized execution plan is produced:


{code}
Sink(table=[default_catalog.default_database.MySink], fields=[name, $f1, $f2, window_start, window_end])
+- GlobalWindowAggregate(groupBy=[name], window=[CUMULATE(win_end=[$window_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, MAX(max$0) AS $f1, $SUM0(sum$1) AS $f2, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[name]])
      +- LocalWindowAggregate(groupBy=[name], window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, MAX($f5_0) AS max$0, $SUM0($f6_0) AS sum$1, slice_end('w$) AS $window_end])
         +- Calc(select=[name, window_start, window_end, $f5, $f6, $f3 AS $f5_0, $f4 AS $f6_0])
            +- GlobalWindowAggregate(groupBy=[name, $f5, $f6], window=[CUMULATE(slice_end=[$slice_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, $f5, $f6, MAX(max$0) AS $f3, COUNT(distinct$0 count$1) AS $f4, start('w$) AS window_start, end('w$) AS window_end])
               +- Exchange(distribution=[hash[name, $f5, $f6]])
                  +- LocalWindowAggregate(groupBy=[name, $f5, $f6], window=[CUMULATE(time_col=[rowtime], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, $f5, $f6, MAX(double) FILTER $g_1 AS max$0, COUNT(distinct$0 int) FILTER $g_2 AS count$1, DISTINCT(int) AS distinct$0, slice_end('w$) AS $slice_end])
                     +- Calc(select=[name, double, int, $f5, $f6, ($e = 1) AS $g_1, ($e = 2) AS $g_2, rowtime])
                        +- Expand(projects=[{name, double, int, $f5, null AS $f6, 1 AS $e, rowtime}, {name, double, int, null AS $f5, $f6, 2 AS $e, rowtime}])
                           +- Calc(select=[name, double, int, MOD(HASH_CODE(double), 1024) AS $f5, MOD(HASH_CODE(int), 1024) AS $f6, Reinterpret(TO_TIMESTAMP(ts)) AS rowtime])
                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[int, double, name, ts], metadata=[], watermark=[-(TO_TIMESTAMP(ts), 1000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[int, double, name, ts])
{code}
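
For context, here is a minimal sketch of how this configuration can be enabled on a {{TableEnvironment}} (the string keys correspond to {{TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED}} and the two-phase agg strategy; the class and setup below are illustrative, not the actual {{WindowAggregateJsonITCase}} code):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DistinctSplitConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Split distinct aggregates into two aggregation levels
        // (i.e. TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED).
        tEnv.getConfig().set("table.optimizer.distinct-agg.split.enabled", "true");
        // Force two-phase (local/global) aggregation, matching AggregatePhaseStrategy.TWO_PHASE.
        tEnv.getConfig().set("table.optimizer.agg-phase-strategy", "TWO_PHASE");
        // The CUMULATE query from the test would be registered and executed here;
        // it is omitted because the exact test query is not reproduced in this comment.
    }
}
{code}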


As we can see, there are two window aggregation stages (both with the {{Local-Global}} optimization). As a reminder, the missing record is "+I[b, 3.0, 1, 2020-10-10T00:00:31, 2020-10-10T00:00:41]".

As we can see from the schema of the second {{LocalWindowAggregate}}, it uses {{window_start}} and {{window_end}} to compute the {{CUMULATE}} windows. At this point (in the second {{LocalWindowAggregate}}), our "missing" record looks like "+I(b,2020-10-10T00:00:31,2020-10-10T00:00:41,0,null,3.0,0)", so we have already lost the original event time of the record.
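
Side note: assuming the timestamps are interpreted as UTC, the {{window_end}} of the missing record converts to exactly the progress value that appears below:

{code:java}
import java.time.Instant;

public class WindowEndMillis {
    public static void main(String[] args) {
        // Assuming UTC timestamps: the window_end of the missing record,
        // 2020-10-10T00:00:41, is the progress value discussed below.
        long windowEnd = Instant.parse("2020-10-10T00:00:41Z").toEpochMilli();
        System.out.println(windowEnd); // prints 1602288041000
    }
}
{code}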


As a result, the flaky behaviour is caused by the call order between 
{{SlicingWindowOperator::processWatermark}} -> {{AbstractWindowAggProcessor::advanceProgress}} 
and {{SlicingWindowOperator::processElement}} (a simplified sketch of the two orderings follows the list):

- If {{processWatermark}} is called before {{processElement}}, then {{currentProgress}} is updated to {{1602288041000}}. In this case, when {{processElement}} is called afterwards, it considers the window already fired and drops the element.

- If {{processElement}} is called before {{processWatermark}}, then the record is processed as expected.
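
To make the ordering dependency concrete, below is a deliberately simplified, self-contained model of the two call orders (this is not the actual {{SlicingWindowOperator}} / {{AbstractWindowAggProcessor}} code, just a sketch of the drop condition as described above):

{code:java}
/** Simplified model of the ordering issue; NOT the real Flink operator code. */
public class CallOrderSketch {

    static class TinyWindowProcessor {
        long currentProgress = Long.MIN_VALUE;

        /** Stands in for processWatermark -> advanceProgress: the watermark moves
         *  the progress forward, and windows ending at or before it are fired. */
        void processWatermark(long watermark) {
            if (watermark > currentProgress) {
                currentProgress = watermark;
            }
        }

        /** Stands in for the late-element check in processElement: an element whose
         *  window has already been fired is dropped instead of being accumulated. */
        boolean processElement(long windowEnd) {
            return windowEnd > currentProgress; // false means "dropped"
        }
    }

    public static void main(String[] args) {
        // Window end of the missing record (2020-10-10T00:00:41 as epoch millis).
        long windowEnd = 1602288041000L;

        // Order 1: watermark first -> progress reaches the window end, element dropped.
        TinyWindowProcessor first = new TinyWindowProcessor();
        first.processWatermark(1602288041000L);
        System.out.println("watermark first -> element kept: " + first.processElement(windowEnd)); // false

        // Order 2: element first -> it is accumulated before the window fires.
        TinyWindowProcessor second = new TinyWindowProcessor();
        System.out.println("element first -> element kept: " + second.processElement(windowEnd)); // true
        second.processWatermark(1602288041000L);
    }
}
{code}

With the watermark-first order the element is reported as dropped, and with the element-first order it is kept, which matches the flaky behaviour described above.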

Is this something expected? WDYT?


> Missing record with CUMULATE/HOP windows using an optimization
> --------------------------------------------------------------
>
>                 Key: FLINK-33756
>                 URL: https://issues.apache.org/jira/browse/FLINK-33756
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Jim Hughes
>            Priority: Major
>
> I have seen an optimization cause a window to fail to emit a record.
> With the optimization `TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED` set to 
> true, the configuration AggregatePhaseStrategy.TWO_PHASE set, and a HOP or 
> CUMULATE window with an offset, a record can be sent that causes one of the 
> multiple active windows to fail to emit a record.
> The linked code 
> (https://github.com/jnh5y/flink/commit/ec90aa501d86f95559f8b22b0610e9fb786f05d4)
>  modifies the `WindowAggregateJsonITCase` to demonstrate the case.  
>  
> The test `testDistinctSplitDisabled` shows the expected behavior.  The test 
> `testDistinctSplitEnabled` tests the above configurations and shows that one 
> record is missing from the output.  


