Hi Folks,

My team and I have run into a situation we cannot explain and would
like to hear your thoughts. We have a pipeline that enriches the
incoming messages and writes them to BigQuery; the pipeline looks
like this:

Apache Beam 2.12.0 / GCP Dataflow

-----
- ReadFromKafka (withCreateTime, with a 10 min maxDelay)
- ApplySomeInitialEnrichment (just adds some fields to the incoming JSON
messages)
- Apply a fixed 1-hour window (with default triggering)
- Apply GroupByKey (userId)
- Apply External Sorter
(SortValues.create(BufferedExternalSorter.options()))
- Apply ComplexEnrichmentDoFn (to the sorted Iterable<>)
  - Read initial state from Hbase (BigTable)
  - loop through all messages, enriching each one with the previous state
(incremental enrichment) and the session calculation
  - write the final state to Hbase (BigTable)
  - output each enriched element to the next DoFn
- Apply a transformation to prepare the data for BigQuery
- Apply BigQueryIO
------
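For readers less familiar with the window, group and sort steps, here is a plain-Java model of what the ComplexEnrichmentDoFn receives per user. It is not Beam code and uses no Beam classes: Event, WindowModel and groupAndSort are hypothetical names, the window start uses the usual fixed-window arithmetic (timestamp rounded down to a multiple of one hour, assuming a zero offset), and it assumes the external sorter's sort key is the event time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical event type: a user id plus the Kafka CreateTime used as event time.
class Event {
    final String userId;
    final long eventTimeMillis;

    Event(String userId, long eventTimeMillis) {
        this.userId = userId;
        this.eventTimeMillis = eventTimeMillis;
    }
}

// Plain-Java model (no Beam dependency) of the window -> group -> sort steps.
class WindowModel {
    static final long ONE_HOUR_MS = 60L * 60L * 1000L;

    // Start of the fixed 1-hour window containing ts, assuming a zero window offset.
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, ONE_HOUR_MS);
    }

    // Groups events by (userId, window start) and sorts each group by event time.
    // The TreeMap only makes iteration deterministic; it does not model how a
    // runner schedules the groups.
    static Map<String, List<Event>> groupAndSort(List<Event> events) {
        Map<String, List<Event>> groups = new TreeMap<>();
        for (Event e : events) {
            String key = e.userId + "@" + windowStart(e.eventTimeMillis);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(e);
        }
        for (List<Event> group : groups.values()) {
            group.sort(Comparator.comparingLong(ev -> ev.eventTimeMillis));
        }
        return groups;
    }
}
```

Each map entry corresponds to one sorted Iterable<> handed to the enrichment step; the windows for one user are separate entries, which is why the order in which they are processed matters for the incremental state.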


Just to give some more context: we have a meta_info column in our BigQuery
table whose values are set at the very beginning of the
ComplexEnrichmentDoFn, meaning all the records within the same Iterable<>
hold the same information. The meta_info column contains the
serialized PaneInfo, the WindowInfo and our SystemTimestamp =
System.currentTimeMillis().
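A minimal sketch of how such a meta_info value could be built, assuming the wall clock is read once per grouped Iterable and reused for every output record. MetaInfo, formatWindow and capture are hypothetical names, the PaneInfo is passed in as an already-serialized string, and the window is formatted to match the windowInfo layout quoted below; in the real DoFn the window and pane would come from Beam rather than from plain longs.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical holder for one meta_info value shared by all records of one grouped Iterable.
class MetaInfo {
    // Produces the same layout as the windowInfo values quoted in this post.
    private static final DateTimeFormatter ISO_MILLIS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

    final String windowInfo;
    final String paneInfo;
    final long systemTimestamp;

    MetaInfo(String windowInfo, String paneInfo, long systemTimestamp) {
        this.windowInfo = windowInfo;
        this.paneInfo = paneInfo;
        this.systemTimestamp = systemTimestamp;
    }

    // Formats a half-open window as "[start..end)".
    static String formatWindow(long startMillis, long endMillis) {
        return "[" + ISO_MILLIS.format(Instant.ofEpochMilli(startMillis)) + ".."
                + ISO_MILLIS.format(Instant.ofEpochMilli(endMillis)) + ")";
    }

    // Reads the wall clock exactly once, at the start of processing a grouped Iterable,
    // so every record enriched from that Iterable carries the same systemTimestamp.
    static MetaInfo capture(long windowStartMillis, long windowEndMillis, String paneInfo) {
        return new MetaInfo(formatWindow(windowStartMillis, windowEndMillis),
                paneInfo, System.currentTimeMillis());
    }
}
```

Because the timestamp is captured before the loop over the Iterable, comparing systemTimestamp across rows tells you when each window's enrichment started, not when each individual record was written.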

We have 3 windows:
  A - windowInfo: [2019-05-20T01:00:00.000Z..2019-05-20T02:00:00.000Z),
      systemTimestamp: *1559396670577*
  B - windowInfo: [2019-05-20T02:00:00.000Z..2019-05-20T03:00:00.000Z),
      systemTimestamp: *1559396670670*
  C - windowInfo: [2019-05-20T03:00:00.000Z..2019-05-20T04:00:00.000Z),
      systemTimestamp: *1559396670533*


Window A contains: 18 records
Window B contains: 46 records
Window C contains: 3 records

If you look at the A, B and C windowInfo above, the *systemTimestamp*
field reveals an incorrect processing order: the enrichment was executed
as C -> A -> B, corrupting all the messages for this user.
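The ordering claim can be checked mechanically from the numbers above: sorting the windows by systemTimestamp gives C (…533), then A (…577, 44 ms later), then B (…670, 93 ms after A). To show why that order corrupts the output, the sketch below replaces the real enrichment, whose details are not in this post, with a hypothetical per-user running record counter that is read from a store, advanced once per record, and written back; the HashMap stands in for the Bigtable row. WindowRun, OrderCheck and startingStatePerWindow are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// One processed window for a single user, with the values reported in this post.
class WindowRun {
    final String name;
    final long systemTimestamp;
    final int recordCount;

    WindowRun(String name, long systemTimestamp, int recordCount) {
        this.name = name;
        this.systemTimestamp = systemTimestamp;
        this.recordCount = recordCount;
    }
}

class OrderCheck {
    // Windows in the order the enrichment actually ran: ascending systemTimestamp.
    static List<WindowRun> byProcessingTime(List<WindowRun> runs) {
        List<WindowRun> sorted = new ArrayList<>(runs);
        sorted.sort(Comparator.comparingLong(r -> r.systemTimestamp));
        return sorted;
    }

    // Hypothetical stand-in for the incremental enrichment: a per-user running record
    // counter read from and written back to a store (the HashMap plays the Bigtable row).
    // Returns the counter value each window starts from when processed in the given order.
    static Map<String, Long> startingStatePerWindow(String userId, List<WindowRun> runs) {
        Map<String, Long> store = new HashMap<>();
        Map<String, Long> startingState = new HashMap<>();
        for (WindowRun run : runs) {
            long state = store.getOrDefault(userId, 0L); // read initial state
            startingState.put(run.name, state);
            state += run.recordCount;                   // advance once per record
            store.put(userId, state);                   // write final state
        }
        return startingState;
    }
}
```

Processed in event-time order (A, B, C), the windows start from counter values 0, 18 and 64; processed in the observed order (C, A, B), window C starts from 0 instead of 64, and A and B are shifted to 3 and 21, so every record of this user ends up with the wrong state.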

For all 3 windows the serialized PaneInfo was set by the runner to ON_TIME:
A=B=C= "PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0,
onTimeIndex=0}"

Any idea why the windows would be triggered out of order?

-- 

JC
