[jira] [Commented] (FLINK-34129) MiniBatchGlobalGroupAggFunction will make -D as +I then make +I as -U when state expired
[ https://issues.apache.org/jira/browse/FLINK-34129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816250#comment-17816250 ]

Jeyhun Karimov commented on FLINK-34129:

Hi [~loserwang1024], [~xuyangzhong]

I am not sure whether this is a bug or expected behaviour in local-global aggregation.

Partitioned aggregates (see {{GroupAggFunction::processElement}}) avoid the above-mentioned issue by tracking {{firstRow}} and not sending the first row to the {{retract}} function. This works because the state is partitioned and exactly one operator instance is responsible for each partition.

In the presence of local-global aggregates, however:
- it is difficult to prevent this behaviour in {{LocalGroupAggFunction}}, since there can be multiple {{LocalGroupAggFunction}} instances and there is no ordering among them (which would be needed to track {{firstRow}} and avoid retracting it);
- it is difficult to prevent this behaviour in {{GlobalGroupAggFunction}}, since it only receives pre-aggregated data.

Currently, the only ways to avoid this behaviour are to either
- use {{firstRow}} tracking (similar to {{GroupAggFunction::processElement}}) in {{LocalGroupAggFunction}} AND use parallelism 1, or
- use partitioned aggregates.

> MiniBatchGlobalGroupAggFunction will make -D as +I then make +I as -U when
> state expired
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-34129
>                 URL: https://issues.apache.org/jira/browse/FLINK-34129
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.18.1
>            Reporter: Hongshun Wang
>            Priority: Major
>             Fix For: 1.19.0
>
>
> Take sum for example:
> The state has expired, and then an update operation arrives from the source.
> MiniBatchGlobalGroupAggFunction takes -U[1, 20] and +U[1, 20] as input, but
> emits +I[1, -20] and -D[1, -20]. The sink then deletes the row from the
> external database.
> Let's see why this happens:
> * When the state has expired and -U[1, 20] arrives,
> MiniBatchGlobalGroupAggFunction creates a new sum accumulator and sets
> firstRow to true.
> {code:java}
> if (stateAcc == null) {
>     stateAcc = globalAgg.createAccumulators();
>     firstRow = true;
> }
> {code}
> * The sum accumulator then retracts 20, so the sum becomes -20.
> * Because this is the first row, MiniBatchGlobalGroupAggFunction turns the -U into a +I
> and emits it downstream.
> {code:java}
> if (!recordCounter.recordCountIsZero(acc)) {
>     // if this was not the first row and we have to emit retractions
>     if (!firstRow) {
>         // ignore
>     } else {
>         // update acc to state
>         accState.update(acc);
>
>         // this is the first, output new result
>         // prepare INSERT message for new row
>         resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
>         out.collect(resultRow);
>     }
> }
> {code}
> * When the next +U[1, 20] arrives, the accumulator adds 20 back, so the sum and the
> record count both return to 0 and RetractionRecordCounter#recordCountIsZero returns true.
> Because firstRow is now false, the +U is turned into a -D and emitted downstream.
> {code:java}
> if (!recordCounter.recordCountIsZero(acc)) {
>     // ignore
> } else {
>     // we retracted the last record for this key
>     // if this is not the first row, send out a DELETE message
>     if (!firstRow) {
>         // prepare DELETE message for previous row
>         resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
>         out.collect(resultRow);
>     }
> }
> {code}
>
> So the sink receives +I and -D after a single update operation from the source, and the
> row is deleted.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
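To make the sequence in the description concrete, here is a minimal, hypothetical Java model of the first-row logic quoted above. It is not Flink code: `Acc`, `Kind`, `processBatch` and the string output are illustrative stand-ins, each mini-batch is assumed to carry a single record, the local pre-aggregation step is collapsed into the global one, and the branch elided as `// ignore` in the issue is assumed to emit -U/+U when the result changes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified replay of the emission logic quoted in FLINK-34129 (not Flink code). */
final class MiniBatchGlobalAggReplay {

    /** Simplified change kinds: +I/+U are accumulated, -U/-D are retracted. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Stand-in for the SUM accumulator plus the retraction record counter. */
    static final class Acc {
        long sum;
        long count;
    }

    // Keyed accumulator state; a missing entry models expired state.
    private final Map<String, Acc> accState = new HashMap<>();
    private final List<String> out = new ArrayList<>();

    /** Processes a mini-batch holding a single change for the given key. */
    void processBatch(String key, Kind kind, long value) {
        Acc acc = accState.get(key);
        boolean firstRow = false;
        if (acc == null) {
            // Mirrors "if (stateAcc == null) { createAccumulators(); firstRow = true; }"
            acc = new Acc();
            firstRow = true;
        }
        long prevAggValue = acc.sum;
        if (kind == Kind.INSERT || kind == Kind.UPDATE_AFTER) {
            acc.sum += value;   // accumulate
            acc.count++;
        } else {
            acc.sum -= value;   // retract (a -U on a fresh accumulator drives both below zero)
            acc.count--;
        }
        long newAggValue = acc.sum;

        if (acc.count != 0) {   // !recordCounter.recordCountIsZero(acc)
            accState.put(key, acc);
            if (firstRow) {
                out.add("+I[" + key + ", " + newAggValue + "]");
            } else if (prevAggValue != newAggValue) {
                // Assumed behaviour of the branch elided as "// ignore" in the issue.
                out.add("-U[" + key + ", " + prevAggValue + "]");
                out.add("+U[" + key + ", " + newAggValue + "]");
            }
        } else {                // the last record for this key was retracted
            if (!firstRow) {
                out.add("-D[" + key + ", " + prevAggValue + "]");
            }
            accState.remove(key);
        }
    }

    List<String> output() {
        return out;
    }

    public static void main(String[] args) {
        MiniBatchGlobalAggReplay agg = new MiniBatchGlobalAggReplay();
        // State for key 1 has expired, then the source sends -U[1, 20] and +U[1, 20].
        agg.processBatch("1", Kind.UPDATE_BEFORE, 20);
        agg.processBatch("1", Kind.UPDATE_AFTER, 20);
        System.out.println(agg.output()); // [+I[1, -20], -D[1, -20]]
    }
}
```

The replay ends with the row deleted, matching the +I[1, -20] / -D[1, -20] pair reported in the issue; feeding two inserts instead shows the normal +I followed by a -U/+U pair.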
[ https://issues.apache.org/jira/browse/FLINK-34129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811167#comment-17811167 ]

xuyang commented on FLINK-34129:

If the upstream CDC data is incomplete, for example a UB is received before its UA and no I record precedes that UB, then, taking SUM as an example, the global group aggregation first emits (+I, key, -1) and then emits (-D, key, -1). As a result, the final data is deleted.

This bug can be reproduced by adding the code below to `TableSinkITCase`.
{code:scala}
@TestTemplate
def test(): Unit = {
  val tableConfig = tEnv.getConfig
  tableConfig.set(TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tableConfig.set(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
  tableConfig.set(TABLE_EXEC_MINIBATCH_SIZE, Long.box(1L))

  val userDataId = TestValuesTableFactory.registerData(
    Seq(
      changelogRow("-U", "k1", new JLong(1L)),
      changelogRow("+U", "k1", new JLong(1L))
    ))
  tEnv.executeSql(s"""
                     |CREATE TABLE TT (
                     |  a STRING,
                     |  b BIGINT
                     |) WITH (
                     |  'connector' = 'values',
                     |  'bounded' = 'false',
                     |  'changelog-mode' = 'I,UA,UB,D',
                     |  'data-id' = '$userDataId'
                     |)
                     |""".stripMargin)

  val sql =
    """
      |SELECT a, SUM(b)
      |FROM TT GROUP BY a
    """.stripMargin

  tEnv.executeSql(sql).print
}
{code}
The return values are:
{code}
+----+----+--------+
| op |  a | EXPR$1 |
+----+----+--------+
| +I | k1 |     -1 |
| -D | k1 |     -1 |
+----+----+--------+
2 rows in set
{code}
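Jeyhun Karimov's comment says the partitioned path ({{GroupAggFunction::processElement}}) avoids this by tracking {{firstRow}} so that the first row is not retracted. The hypothetical Java sketch below contrasts the two behaviours on the reproducer's input (-U[k1, 1] then +U[k1, 1]). It is not Flink code: the guard modelled here (ignore a retraction for a key that has no accumulator state) is an assumption about what that tracking amounts to, and the sketch does not model the local/global split, where, as the comment notes, the global operator only sees pre-aggregated data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch (not Flink code): first-row logic with and without a retraction guard. */
final class RetractGuardSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private final boolean dropRetractOnEmptyState;
    // Per key: {sum, recordCount}; a missing entry means no (or expired) state.
    private final Map<String, long[]> state = new HashMap<>();
    private final List<String> out = new ArrayList<>();

    RetractGuardSketch(boolean dropRetractOnEmptyState) {
        this.dropRetractOnEmptyState = dropRetractOnEmptyState;
    }

    private static boolean isRetract(Kind kind) {
        return kind == Kind.UPDATE_BEFORE || kind == Kind.DELETE;
    }

    void process(String key, Kind kind, long value) {
        long[] acc = state.get(key);
        boolean firstRow = false;
        if (acc == null) {
            if (dropRetractOnEmptyState && isRetract(kind)) {
                return; // nothing was accumulated for this key, so there is nothing to retract
            }
            acc = new long[2];
            firstRow = true;
        }
        long prev = acc[0];
        int sign = isRetract(kind) ? -1 : 1;
        acc[0] += sign * value; // running sum
        acc[1] += sign;         // record counter
        if (acc[1] != 0) {
            state.put(key, acc);
            if (firstRow) {
                out.add("+I[" + key + ", " + acc[0] + "]");
            } else if (prev != acc[0]) {
                out.add("-U[" + key + ", " + prev + "]");
                out.add("+U[" + key + ", " + acc[0] + "]");
            }
        } else {
            if (!firstRow) {
                out.add("-D[" + key + ", " + prev + "]");
            }
            state.remove(key);
        }
    }

    List<String> output() {
        return out;
    }

    public static void main(String[] args) {
        for (boolean guard : new boolean[] {false, true}) {
            RetractGuardSketch agg = new RetractGuardSketch(guard);
            // Same input as the TableSinkITCase reproducer, one record per mini-batch.
            agg.process("k1", Kind.UPDATE_BEFORE, 1);
            agg.process("k1", Kind.UPDATE_AFTER, 1);
            System.out.println("guard=" + guard + " -> " + agg.output());
        }
    }
}
```

Without the guard the sketch reproduces the +I/-D pair from the table above; with it, the orphan -U is dropped and the +U becomes a plain +I[k1, 1].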
[ https://issues.apache.org/jira/browse/FLINK-34129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808018#comment-17808018 ]

Hongshun Wang commented on FLINK-34129:

[~fsk119], [~lsy], [~andrewlinc...@gmail.com], CC