[ 
https://issues.apache.org/jira/browse/HUDI-8996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Geser Dugarov updated HUDI-8996:
--------------------------------
    Description: 
```SQL
CREATE TABLE hudi_debug (
    id INT,
    part INT,
    desc STRING,
    PRIMARY KEY (id) NOT ENFORCED
)  PARTITIONED BY (`part`) 
WITH (
    'connector' = 'hudi',
    'path' = '.../hudi_debug',
    'compaction.schedule.enabled'='false',
    'compaction.async.enabled'='false',
    'clean.async.enabled'='false',
    'write.tasks'='1',
    'read.tasks'='1',
    'table.type' = 'MERGE_ON_READ',
    'write.operation' = 'upsert',
    'index.global.enabled' = 'true'
);

INSERT INTO hudi_debug VALUES 
    (1,100,'aaa'),
    (2,200,'bbb');
```

Then I upsert into the existing table, changing the partition value for `id = 1`:
```SQL
INSERT INTO hudi_debug VALUES 
    (1,111,'aaa_new'),
    (2,200,'bbb_new');
```

From the code of `BucketAssignFunction::processRecord`, I expect that a delete 
record will be generated for the old partition path, and 
```SQL
SELECT * FROM hudi_debug;
```
will give
```text
          id        part                           desc
           2         200                        bbb_new
           1         111                        aaa_new
```

But I got:
```text
          id        part                           desc
           2         200                        bbb_new
           1         111                        aaa_new
           1         100                            aaa
```
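
One way to confirm that the stale row is a duplicate of the same record key is to count rows per `id`. This is a minimal sketch that uses only the table and columns defined above, and it assumes the query runs in the same session as the statements above:

```SQL
-- List record keys that appear in more than one row (for example, in more than one partition)
SELECT id, COUNT(*) AS cnt
FROM hudi_debug
GROUP BY id
HAVING COUNT(*) > 1;
```

For the actual result above, this returns `id = 1` with `cnt = 2`; for the expected result it returns no rows.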

> No delete records for Flink upsert if partition path changed
> ------------------------------------------------------------
>
>                 Key: HUDI-8996
>                 URL: https://issues.apache.org/jira/browse/HUDI-8996
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Geser Dugarov
>            Assignee: Geser Dugarov
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
