[jira] [Commented] (FLINK-34129) MiniBatchGlobalGroupAggFunction will make -D as +I then make +I as -U when state expired

2024-02-09 Thread Jeyhun Karimov (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17816250#comment-17816250
 ] 

Jeyhun Karimov commented on FLINK-34129:


Hi [~loserwang1024], [~xuyangzhong], I am not sure whether this is a bug or expected 
behaviour in local-global aggregation. 

Partitioned aggregates (see {{GroupAggFunction::processElement}}) solve the 
above-mentioned issue by tracking {{firstRow}}: if the first row seen for a key 
is a retraction, it is dropped instead of being passed to the {{retract}} 
function. This works because the state is partitioned and exactly one operator 
instance is responsible for each partition, so that instance knows whether it 
has already seen a row for the key. 
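A minimal, hypothetical Java model of that guard for SUM, for illustration only (not Flink's actual code; {{PartitionedSumSketch}}, {{Kind}}, {{Acc}} and the string output format are assumptions). A retraction that arrives when the key has no accumulator, e.g. after state expiry, is dropped, and the next accumulating row is emitted as the first row.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of firstRow handling in a one-phase SUM aggregate. */
public class PartitionedSumSketch {

    /** Row kinds as printed by Flink: +I, -U, +U, -D. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Per-key accumulator: running sum plus a record count. */
    static final class Acc {
        long sum;
        long count;
    }

    // Keyed state; a missing entry models a key whose state has expired.
    private final Map<String, Acc> state = new HashMap<>();
    final List<String> out = new ArrayList<>();

    void processElement(Kind kind, String key, long value) {
        boolean retract = kind == Kind.UPDATE_BEFORE || kind == Kind.DELETE;
        Acc acc = state.get(key);
        boolean firstRow = false;
        if (acc == null) {
            // No accumulator for this key: drop the retraction instead of
            // creating an accumulator with a negative count.
            if (retract) {
                return;
            }
            acc = new Acc();
            firstRow = true;
        }
        long prevSum = acc.sum;
        if (retract) {
            acc.sum -= value;
            acc.count--;
        } else {
            acc.sum += value;
            acc.count++;
        }
        if (acc.count == 0) {
            // Last record retracted (only reachable when firstRow is false here).
            state.remove(key);
            out.add("-D[" + key + ", " + prevSum + "]");
        } else {
            state.put(key, acc);
            if (firstRow) {
                out.add("+I[" + key + ", " + acc.sum + "]");
            } else {
                out.add("-U[" + key + ", " + prevSum + "]");
                out.add("+U[" + key + ", " + acc.sum + "]");
            }
        }
    }

    public static void main(String[] args) {
        PartitionedSumSketch agg = new PartitionedSumSketch();
        // State for key "1" has expired, then the source updates the row.
        agg.processElement(Kind.UPDATE_BEFORE, "1", 20);
        agg.processElement(Kind.UPDATE_AFTER, "1", 20);
        System.out.println(agg.out); // [+I[1, 20]]
    }
}
{code}

In this model the stale -U[1, 20] is dropped and the +U[1, 20] becomes +I[1, 20], instead of +I[1, -20] followed by -D[1, -20].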

In the presence of local-global aggregates, however:
- it is difficult to prevent this behaviour in {{LocalGroupAggFunction}} 
instances, since there can be multiple {{LocalGroupAggFunction}} instances and 
there is no ordering among them (which would be needed to track {{firstRow}} 
and avoid retracting it)
- it is difficult to prevent this behaviour in {{GlobalGroupAggFunction}} 
instances, since they already receive pre-aggregated data. 
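A hypothetical Java sketch of the second point: local pre-aggregation folds each bundle into one (sum, count) accumulator per key, so the individual row kinds are gone by the time the global aggregate sees the data. {{LocalPreAggSketch}}, {{Change}} and {{Acc}} are illustrative names, not Flink classes; the sketch assumes Java 16+ for records.

{code:java}
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of local pre-aggregation for SUM within one mini-batch. */
public class LocalPreAggSketch {

    /** A changelog row: retract == true models -U / -D, false models +I / +U. */
    record Change(String key, long value, boolean retract) {}

    /** Pre-aggregated accumulator shipped to the global aggregate. */
    record Acc(long sum, long count) {}

    /** Folds a bundle into one accumulator per key, discarding the row kinds. */
    static Map<String, Acc> preAggregate(List<Change> bundle) {
        Map<String, Acc> accs = new LinkedHashMap<>();
        for (Change c : bundle) {
            long sign = c.retract() ? -1 : 1;
            accs.merge(
                    c.key(),
                    new Acc(sign * c.value(), sign),
                    (a, b) -> new Acc(a.sum() + b.sum(), a.count() + b.count()));
        }
        return accs;
    }

    public static void main(String[] args) {
        // A stale -U[1, 20] (its state expired downstream) and a fresh +I[1, 5]
        // for the same group key land in one bundle.
        Map<String, Acc> shipped = preAggregate(List.of(
                new Change("1", 20, true),
                new Change("1", 5, false)));
        // The global side only sees the merged accumulator.
        System.out.println(shipped); // {1=Acc[sum=-15, count=0]}
    }
}
{code}

From sum -15 and count 0 alone, the global side cannot tell that, had the stale retraction been dropped, the correct result would have been +I[1, 5].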

Currently, the only way to avoid this behavior is to either

- Use the {{firstRow}} tracking (similar to 
{{GroupAggFunction::processElement}}) in {{LocalGroupAggFunction}} AND use 
parallelism 1
- Use the partitioned aggregates

> MiniBatchGlobalGroupAggFunction will make -D as +I then make +I as -U when 
> state expired 
> -
>
> Key: FLINK-34129
> URL: https://issues.apache.org/jira/browse/FLINK-34129
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.18.1
>Reporter: Hongshun Wang
>Priority: Major
> Fix For: 1.19.0
>
>
> Take sum for example:
> When the state has expired and an update operation then happens at the source, 
> MiniBatchGlobalGroupAggFunction takes -U[1, 20] and +U[1, 20] as input, but 
> emits +I[1, -20] and -D[1, -20]. The sink then deletes the data from the 
> external database.
> Let's see why this happens:
>  * when the state has expired and -U[1, 20] arrives, 
> MiniBatchGlobalGroupAggFunction creates a new sum accumulator and sets 
> firstRow to true.
> {code:java}
> if (stateAcc == null) { 
>     stateAcc = globalAgg.createAccumulators(); 
>     firstRow = true; 
> }   {code}
>  * the sum accumulator then retracts 20, leaving a sum of -20
>  * because this is the first row, MiniBatchGlobalGroupAggFunction turns the -U 
> into a +I and emits it downstream.
> {code:java}
> if (!recordCounter.recordCountIsZero(acc)) {
>     // if this was not the first row and we have to emit retractions
>     if (!firstRow) {
>         // ... (omitted)
>     } else {
>         // update acc to state
>         accState.update(acc);
>
>         // this is the first, output new result
>         // prepare INSERT message for new row
>         resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
>         out.collect(resultRow);
>     }
> }  {code}
>  * when the next +U[1, 20] arrives, the sum goes back to 0 and the record count 
> returns to zero, so RetractionRecordCounter#recordCountIsZero returns true. 
> Because firstRow is now false, the +U is turned into -D[1, -20] (carrying the 
> previous value) and emitted downstream.
> {code:java}
> if (!recordCounter.recordCountIsZero(acc)) {
>     // ... (omitted)
> } else {
>     // we retracted the last record for this key
>     // if this is not first row sent out a DELETE message
>     if (!firstRow) {
>         // prepare DELETE message for previous row
>         resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
>         out.collect(resultRow);
>     }
> } {code}
>
> So the sink receives +I and -D after a single source update operation, and the 
> data is deleted.
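The walkthrough above can be reproduced with a simplified, hypothetical Java model of the quoted branch logic for SUM (not Flink's actual code). {{GlobalSumSketch}} and {{mergeBundle}} are illustrative names; each call stands for one mini-batch whose local pre-aggregate for the key is (sum, count), and a missing map entry stands for expired state.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the quoted MiniBatchGlobalGroupAggFunction branches for SUM. */
public class GlobalSumSketch {

    static final class Acc {
        long sum;
        long count;
    }

    private final Map<String, Acc> accState = new HashMap<>();
    final List<String> out = new ArrayList<>();

    /** Merges one mini-batch's local pre-aggregate (sum, count) for a key. */
    void mergeBundle(String key, long localSum, long localCount) {
        Acc acc = accState.get(key);
        boolean firstRow = false;
        if (acc == null) {
            // State expired (or never existed): start from a fresh accumulator.
            acc = new Acc();
            firstRow = true;
        }
        long prevSum = acc.sum;
        acc.sum += localSum;
        acc.count += localCount;

        if (acc.count != 0) {
            accState.put(key, acc);
            if (firstRow) {
                // Emitted as INSERT even though the accumulator only holds a retraction.
                out.add("+I[" + key + ", " + acc.sum + "]");
            } else {
                out.add("-U[" + key + ", " + prevSum + "]");
                out.add("+U[" + key + ", " + acc.sum + "]");
            }
        } else {
            // Count back to zero: DELETE the previously emitted value.
            if (!firstRow) {
                out.add("-D[" + key + ", " + prevSum + "]");
            }
            accState.remove(key);
        }
    }

    public static void main(String[] args) {
        GlobalSumSketch agg = new GlobalSumSketch();
        // State for key "1" has expired; -U and +U arrive in separate mini-batches.
        agg.mergeBundle("1", -20, -1); // local pre-aggregate of -U[1, 20]
        agg.mergeBundle("1", 20, 1);   // local pre-aggregate of +U[1, 20]
        System.out.println(agg.out);   // [+I[1, -20], -D[1, -20]]
    }
}
{code}

Feeding the two pre-aggregates in separate mini-batches yields +I[1, -20] followed by -D[1, -20], matching the output described in the issue.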



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



2024-01-26 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811167#comment-17811167
 ] 

xuyang commented on FLINK-34129:


If the upstream CDC data is incomplete, for example when a UB (followed by its 
UA) arrives for a key without any preceding I, then, taking SUM as an example, 
the global group aggregation first emits (+I, key, -1) and then (-D, key, -1). 
As a result, the final data ends up deleted.

This bug can be reproduced by adding the following test to `TableSinkITCase`:
{code:java}
@TestTemplate
def test(): Unit = {
  val tableConfig = tEnv.getConfig
  // Enable mini-batch; a batch size of 1 flushes every record in its own batch
  tableConfig.set(TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tableConfig.set(TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1))
  tableConfig.set(TABLE_EXEC_MINIBATCH_SIZE, Long.box(1L))

  // Incomplete CDC data: -U/+U for k1 without a preceding +I
  val userDataId = TestValuesTableFactory.registerData(
    Seq(
      changelogRow("-U", "k1", new JLong(1L)),
      changelogRow("+U", "k1", new JLong(1L))
    ))
  tEnv.executeSql(s"""
                     |CREATE TABLE TT (
                     |  a STRING,
                     |  b BIGINT
                     |) WITH (
                     |  'connector' = 'values',
                     |  'bounded' = 'false',
                     |  'changelog-mode' = 'I,UA,UB,D',
                     |  'data-id' = '$userDataId'
                     |)
                     |""".stripMargin)

  val sql =
    """
      |SELECT a, SUM(b)
      |FROM TT GROUP BY a
    """.stripMargin
  tEnv.executeSql(sql).print()
} {code}
The return values are:
{code:java}
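+----+--------------------------------+----------------------+
| op |                              a |               EXPR$1 |
+----+--------------------------------+----------------------+
| +I |                             k1 |                   -1 |
| -D |                             k1 |                   -1 |
+----+--------------------------------+----------------------+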
2 rows in set {code}
 


2024-01-17 Thread Hongshun Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17808018#comment-17808018
 ] 

Hongshun Wang commented on FLINK-34129:
---

[~fsk119], [~lsy], [~andrewlinc...@gmail.com], CC
