mosche opened a new issue, #23129:
URL: https://github.com/apache/beam/issues/23129
### What happened?

I ran into [this issue](https://github.com/apache/beam/pull/22620#issuecomment-1208303417) while adding support for ValidatesRunner (VR) tests for the Spark runner in streaming mode (https://github.com/apache/beam/pull/22620). It looks like the runner only supports a single global watermark, which causes problems when a pipeline contains multiple stateful operators.

In the logs below, you can see that timers fire downstream in `PAssert$0/GroupGlobally/GroupByKey` (at `56,830`) after the global watermark advances (at `56,793`) **but before** the corresponding timers fire upstream in `Group.ByFields/ToKvs/GroupByKey` (at `57,005`). As a result, the expected element is considered expired when it finally arrives in `PAssert$0/GroupGlobally/GroupByKey` (at `57,065`):

```
52,112 [worker] INFO TestSparkRunner - About to run test pipeline avroschematest0testavropipelinegroupby-mmack-0909103252-2afdba63
55,816 [0] INFO StateSpecFunctions - Source 0_0 read 1 values, watermarks: [low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z]
55,899 [JobGenerator] INFO GlobalWatermarkHolder - Queued watermarks for source 0: [low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.000Z]
56,297 [1] INFO StateSpecFunctions - Source 0_0 read 0 values, watermarks: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]
56,314 [JobGenerator] INFO GlobalWatermarkHolder - Queued watermarks for source 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.500Z]
56,546 [3] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: input elements: 1, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=-290308-12-21T19:59:05.225Z]
56,567 [3] INFO SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to -290308-12-21T19:59:05.225Z
56,570 [3] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: output elements: 0, state size: 2
56,614 [9] TRACE SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: input elements: 1, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=-290308-12-21T19:59:05.225Z]
56,615 [9] INFO SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to -290308-12-21T19:59:05.225Z
56,615 [9] TRACE SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: output elements: 0, state size: 2
56,770 [15] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: input elements: 0, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=-290308-12-21T19:59:05.225Z]
56,771 [15] INFO SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to -290308-12-21T19:59:05.225Z
56,771 [15] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: output elements: 0, state size: 2
56,780 [8] INFO StateSpecFunctions - Source 0_0 read 0 values, watermarks: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]
56,793 [listener] INFO GlobalWatermarkHolder - Batch 2022-09-09T10:32:55.000Z completed, new watermarks: {0=[low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.000Z]}
56,793 [JobGenerator] INFO GlobalWatermarkHolder - Queued watermarks for source 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.000Z]
56,829 [20] TRACE SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: input elements: 0, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z]
56,829 [20] INFO SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to 294247-01-10T04:00:54.775Z
56,830 [20] DEBUG SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: eligible timer at 294247-01-09T04:00:54.775Z: TimerData{timerId=0:9223371950454775, timerFamilyId=, namespace=Window(org.apache.beam.sdk.transforms.windowing.GlobalWindow@1f346ad2), timestamp=294247-01-09T04:00:54.775Z, outputTimestamp=294247-01-09T04:00:54.775Z, domain=EVENT_TIME, deleted=false}
56,835 [20] TRACE SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: output elements: 1, state size: 0
56,850 [20] ERROR Executor - Exception in task 1.0 in stage 14.0 (TID 20)
org.apache.beam.sdk.util.UserCodeException: java.lang.AssertionError: Group.ByFields/ToRow/ParMultiDo(Anonymous).output: 
Expected: iterable with items [<Row: key:Row: string:mystring,,value:[Row: bool_non_nullable:true,int:43,long:44,float:44.1,double:44.2,string:mystring,bytes:[1, 2, 3, 4],fixed:[B@7cb94f7,date:1979-03-14T00:00:00.000Z,timestampMillis:1979-03-14T00:02:03.004Z,testEnum:enum value: 0,row:Row: BOOL_NON_NULLABLE:true,int:42,,array:[Row: BOOL_NON_NULLABLE:true,int:42,, Row: BOOL_NON_NULLABLE:true,int:42,, ],map:{(k1, Row: BOOL_NON_NULLABLE:true,int:42,), (k2, Row: BOOL_NON_NULLABLE:true,int:42,), },, ],>] in any order
     but: no item matches: <Row: key:Row: string:mystring,,value:[Row: bool_non_nullable:true,int:43,long:44,float:44.1,double:44.2,string:mystring,bytes:[1, 2, 3, 4],fixed:[B@7cb94f7,date:1979-03-14T00:00:00.000Z,timestampMillis:1979-03-14T00:02:03.004Z,testEnum:enum value: 0,row:Row: BOOL_NON_NULLABLE:true,int:42,,array:[Row: BOOL_NON_NULLABLE:true,int:42,, Row: BOOL_NON_NULLABLE:true,int:42,, ],map:{(k1, Row: BOOL_NON_NULLABLE:true,int:42,), (k2, Row: BOOL_NON_NULLABLE:true, int:42,), },, ],> in []
56,879 [task-result] ERROR TaskSetManager - Task 1 in stage 14.0 failed 1 times; aborting job
56,893 [JobScheduler] ERROR JobScheduler - Error running job streaming job 1662719575500 ms.0
57,004 [26] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: input elements: 0, expired elements: 0 [current=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z]
57,004 [26] INFO SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: advancing watermark from -290308-12-21T19:59:05.225Z to 294247-01-10T04:00:54.775Z
57,005 [26] DEBUG SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: eligible timer at 294247-01-09T04:00:54.775Z: TimerData{timerId=0:9223371950454775, timerFamilyId=, namespace=Window(org.apache.beam.sdk.transforms.windowing.GlobalWindow@1f346ad2), timestamp=294247-01-09T04:00:54.775Z, outputTimestamp=294247-01-09T04:00:54.775Z, domain=EVENT_TIME, deleted=false}
57,005 [26] TRACE SparkGroupAlsoByWindowViaWindowSet - Group.ByFields/ToKvs/GroupByKey: output elements: 1, state size: 0
57,005 [listener] INFO GlobalWatermarkHolder - Current watermarks for sourceId 0: [low=-290308-12-21T19:59:05.225Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.000Z]
57,005 [listener] INFO GlobalWatermarkHolder - Batch 2022-09-09T10:32:55.500Z completed, new watermarks {0=[low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.500Z]}
57,065 [31] TRACE SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: input elements: 1, expired elements: 1 [current=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]
57,065 [31] INFO SparkGroupAlsoByWindowViaWindowSet - PAssert$0/GroupGlobally/GroupByKey: advancing watermark from 294247-01-10T04:00:54.775Z to 294247-01-10T04:00:54.775Z
57,174 [listener] INFO GlobalWatermarkHolder - Current watermarks for sourceId 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:55.500Z]
57,176 [listener] INFO GlobalWatermarkHolder - Batch 2022-09-09T10:32:56.000Z completed: new watermarks {0=[low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.000Z]}
57,180 [24] INFO StateSpecFunctions - Source 0_0 read 0 values, watermarks: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z]
57,190 [JobGenerator] INFO GlobalWatermarkHolder - Queued watermarks for source 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.500Z]
57,552 [listener] INFO GlobalWatermarkHolder - Current watermarks for sourceId 0: [low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.000Z]
57,555 [listener] INFO GlobalWatermarkHolder - Batch 2022-09-09T10:32:56.500Z completed, new watermarks {0=[low=294247-01-10T04:00:54.775Z, high=294247-01-10T04:00:54.775Z, time=2022-09-09T10:32:56.500Z]}
```

### Issue Priority

Priority: 2

### Issue Component

Component: runner-spark
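A toy model can make the ordering problem described under "What happened?" more concrete. The sketch below is hypothetical and is not Spark runner code. `Stage` is an assumed stand-in for both `Group.ByFields/ToKvs/GroupByKey` (upstream) and `PAssert$0/GroupGlobally/GroupByKey` (downstream). `WINDOW_END` and `FINAL_WATERMARK` are arbitrary numbers, and allowed lateness is assumed to be zero. The downstream stage is always evaluated before the upstream stage, as in the log.

With only a global watermark, the downstream timer fires on an empty buffer, and the element that arrives afterwards is dropped as expired. Suppose instead that the downstream input watermark is bounded by the upstream output watermark, which stays held back while the upstream stage still buffers elements. In that case, evaluation order no longer matters.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not Spark runner code) of two chained stateful GroupByKey stages. */
final class WatermarkOrderingSketch {
  // Arbitrary stand-in for the global window's end-of-window timestamp.
  static final long WINDOW_END = 100;
  // Arbitrary stand-in for the watermark reported once the source is exhausted.
  static final long FINAL_WATERMARK = 200;

  /** Buffers elements until its input watermark passes WINDOW_END (zero allowed lateness). */
  static final class Stage {
    private final List<String> buffer = new ArrayList<>();
    private long inputWatermark = Long.MIN_VALUE;
    int expired = 0;

    /** Buffers the element, or counts it as expired if the window has already closed. */
    void accept(String element) {
      if (inputWatermark > WINDOW_END) {
        expired++;
      } else {
        buffer.add(element);
      }
    }

    /** Advances the input watermark and, once past WINDOW_END, fires and flushes the buffer. */
    List<String> advanceTo(long watermark) {
      inputWatermark = Math.max(inputWatermark, watermark);
      List<String> fired = new ArrayList<>();
      if (inputWatermark > WINDOW_END) {
        fired.addAll(buffer);
        buffer.clear();
      }
      return fired;
    }

    /** The output watermark is held at or below WINDOW_END while elements are still buffered. */
    long outputWatermark() {
      return buffer.isEmpty() ? inputWatermark : Math.min(inputWatermark, WINDOW_END);
    }
  }

  /** What the downstream (PAssert-like) stage emitted and how many elements it dropped. */
  static final class Result {
    final List<String> downstreamOutput = new ArrayList<>();
    int downstreamExpired;
  }

  /**
   * Pushes one element through upstream -> downstream over two batches. Downstream is evaluated
   * before upstream in each batch. With globalOnly=true both stages read the global watermark;
   * otherwise the downstream input watermark is bounded by the upstream output watermark.
   */
  static Result run(boolean globalOnly) {
    Stage upstream = new Stage();
    Stage downstream = new Stage();
    Result result = new Result();

    upstream.accept("row"); // read while the watermark is still at its minimum
    long global = FINAL_WATERMARK; // the source then reports that it is done

    for (int batch = 0; batch < 2; batch++) {
      long downstreamWatermark =
          globalOnly ? global : Math.min(global, upstream.outputWatermark());
      result.downstreamOutput.addAll(downstream.advanceTo(downstreamWatermark));
      for (String element : upstream.advanceTo(global)) {
        downstream.accept(element);
      }
    }
    result.downstreamExpired = downstream.expired;
    return result;
  }

  public static void main(String[] args) {
    Result global = run(true);
    Result perStage = run(false);
    System.out.println("global watermark: output=" + global.downstreamOutput
        + ", expired=" + global.downstreamExpired);
    System.out.println("per-stage watermark: output=" + perStage.downstreamOutput
        + ", expired=" + perStage.downstreamExpired);
  }
}
```

Running `main` should print `output=[], expired=1` for the global-watermark case and `output=[row], expired=0` for the per-stage case. The per-stage variant only illustrates the general idea of holding a downstream watermark behind upstream state. It is not a proposal for how the Spark runner should implement it.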
