Hi folks,

My team and I have run into a situation we cannot explain, and we would like to hear your thoughts. We have a pipeline that enriches incoming messages and writes them to BigQuery. The pipeline looks like this:
Apache Beam 2.12.0 / GCP Dataflow
-----
- ReadFromKafka (withCreateTime, with a 10 min maxDelay)
- ApplySomeInitialEnrichment (just adds some fields to the incoming JSON messages)
- Apply a fixed 1-hour window (with default triggering)
- Apply GroupByKey (userId)
- Apply an external sorter (SortValues.create(BufferedExternalSorter.options()))
- Apply ComplexEnrichmentDoFn (to the sorted Iterable<>)
  - Read the initial state from HBase (Bigtable)
  - Loop through all messages, enriching each one with the previous state (incremental enrichment) and computing sessions
  - Write the final state to HBase (Bigtable)
  - Output each enriched element to the next DoFn
- Apply a transformation to prepare the data for BigQuery
- Apply BigQueryIO
------

To give some more context: our BigQuery table has a meta_info column whose value is set at the very beginning of ComplexEnrichmentDoFn, so all records within the same Iterable<> hold the same meta_info. The column contains the serialized PaneInfo, the window info, and our systemTimestamp (the current time in milliseconds).

We have 3 windows:
- A: windowInfo [2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z), systemTimestamp: *1559396670577*, 18 records
- B: windowInfo [2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z), systemTimestamp: *1559396670670*, 46 records
- C: windowInfo [2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z), systemTimestamp: *1559396670533*, 3 records

If you look at the windowInfo of A, B and C above, the *systemTimestamp* field reveals an incorrect processing order: the enrichment ran as C -> A -> B, corrupting all the messages for this user.

For all 3 windows, the runner set the serialized PaneInfo to ON_TIME:
A = B = C = PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}

Any idea why the windows would be triggered out of order?

-- JC
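P.S. To make the ordering explicit, here is a small plain-Java check (no Beam dependencies) that replays the three meta_info values quoted above and sorts them by systemTimestamp. It assumes, as described above, that systemTimestamp is captured at the very start of ComplexEnrichmentDoFn; the class and method names (WindowOrder, FiredWindow, processingOrder, spreadMillis) are illustrative only and are not part of our pipeline.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: replays the reported meta_info values to show the
// order in which ComplexEnrichmentDoFn actually started on windows A, B and C.
class WindowOrder {

    // One fired window as recorded in meta_info.
    static final class FiredWindow {
        final String name;
        final String window;
        final long systemTimestamp; // wall-clock millis taken at the start of the DoFn
        final int records;

        FiredWindow(String name, String window, long systemTimestamp, int records) {
            this.name = name;
            this.window = window;
            this.systemTimestamp = systemTimestamp;
            this.records = records;
        }
    }

    // The three windows exactly as reported, listed in event-time order.
    static List<FiredWindow> observed() {
        return List.of(
            new FiredWindow("A", "[2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z)", 1559396670577L, 18),
            new FiredWindow("B", "[2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z)", 1559396670670L, 46),
            new FiredWindow("C", "[2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z)", 1559396670533L, 3));
    }

    // Window names sorted by systemTimestamp, i.e. by when processing actually started.
    static String processingOrder(List<FiredWindow> windows) {
        List<FiredWindow> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((FiredWindow w) -> w.systemTimestamp));
        return sorted.stream().map(w -> w.name).collect(Collectors.joining(" -> "));
    }

    // Milliseconds between the first and the last window starting processing.
    static long spreadMillis(List<FiredWindow> windows) {
        long min = windows.stream().mapToLong(w -> w.systemTimestamp).min().getAsLong();
        long max = windows.stream().mapToLong(w -> w.systemTimestamp).max().getAsLong();
        return max - min;
    }

    public static void main(String[] args) {
        List<FiredWindow> windows = observed();
        System.out.println("processing order: " + processingOrder(windows));
        System.out.println("spread: " + spreadMillis(windows) + " ms");
    }
}
```

Sorting by systemTimestamp gives C -> A -> B, and all three windows, which cover three consecutive hours of event time, started processing within 137 ms of each other.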