This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.1 by this push:
     new b7ae760  [FLINK-26457] Introduce PartialUpdateMergeFunction
b7ae760 is described below

commit b7ae760200c500f2277e266e17b2cb271dcec805
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Fri Apr 22 09:55:15 2022 +0800

    [FLINK-26457] Introduce PartialUpdateMergeFunction
    
    This closes #98
---
 docs/content/docs/development/create-table.md      | 35 ++++++++
 .../flink/table/store/connector/TableStore.java    | 32 +++++++-
 .../store/connector/ContinuousFileStoreITCase.java | 38 ++-------
 .../store/connector/FileStoreTableITCase.java      | 61 ++++++++++++++
 .../table/store/connector/PartialUpdateITCase.java | 94 ++++++++++++++++++++++
 .../flink/table/store/file/FileStoreOptions.java   | 43 ++++++++++
 .../compact/DeduplicateMergeFunction.java          |  2 +
 ...nction.java => PartialUpdateMergeFunction.java} | 31 +++++--
 .../mergetree/compact/ValueCountMergeFunction.java |  2 +
 9 files changed, 298 insertions(+), 40 deletions(-)

diff --git a/docs/content/docs/development/create-table.md b/docs/content/docs/development/create-table.md
index 8b94f4f..95f5e87 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -233,3 +233,38 @@ The two methods do not behave in the same way when querying.
 Use approach one if you have a large number of filtered queries
 with only `user_id`, and use approach two if you have a large
 number of filtered queries with only `catalog_id`.
+
+## Partial Update
+
+You can configure the partial-update merge engine through table options:
+
+```sql
+CREATE TABLE MyTable (
+  product_id BIGINT,
+  price DOUBLE,
+  number BIGINT,
+  detail STRING,
+  PRIMARY KEY (product_id) NOT ENFORCED
+) WITH (
+  'merge-engine' = 'partial-update'
+);
+```
+
+{{< hint info >}}
+__Note:__ Partial update is only supported for tables with a primary key.
+{{< /hint >}}
+
+{{< hint info >}}
+__Note:__ Partial update is not supported for streaming (continuous) reading.
+{{< /hint >}}
+
+Under the same primary key, the value fields are updated one by one to the
+latest non-null data; an incoming null value never overwrites an existing value.
+
+For example, the inputs: 
+- <1, 23.0, 10,   NULL>
+- <1, NULL, 20,   'This is a book'>
+- <1, 25.2, NULL, NULL>
+
+Output: 
+- <1, 25.2, 20, 'This is a book'>
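
To make the documented merge rule concrete, here is a minimal, hypothetical Java driver for the PartialUpdateMergeFunction this commit introduces (the class and its reset/add/getValue lifecycle appear in the diff further below; the driver class name, the four-column value layout, and the printed output are illustrative assumptions):

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;

public class PartialUpdateExample {
    public static void main(String[] args) {
        // Value layout of MyTable: product_id BIGINT, price DOUBLE, number BIGINT, detail STRING.
        LogicalType[] types = {
            new BigIntType(),
            new DoubleType(),
            new BigIntType(),
            new VarCharType(VarCharType.MAX_LENGTH)
        };
        RowData.FieldGetter[] getters = new RowData.FieldGetter[types.length];
        for (int i = 0; i < types.length; i++) {
            getters[i] = RowData.createFieldGetter(types[i], i);
        }

        PartialUpdateMergeFunction merge = new PartialUpdateMergeFunction(getters);
        merge.reset(); // start a fresh accumulator for one primary key
        merge.add(GenericRowData.of(1L, 23.0d, 10L, null));
        merge.add(GenericRowData.of(1L, null, 20L, StringData.fromString("This is a book")));
        merge.add(GenericRowData.of(1L, 25.2d, null, null));

        // Non-null fields win field by field: prints +I(1,25.2,20,This is a book).
        System.out.println(merge.getValue());
    }
}
```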
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 37315c4..c17a309 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogLock;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.Projection;
@@ -44,8 +45,10 @@ import org.apache.flink.table.store.connector.source.LogHybridSourceFactory;
 import org.apache.flink.table.store.connector.source.StaticFileStoreSplitEnumerator;
 import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions.MergeEngine;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
@@ -68,6 +71,8 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
 import static org.apache.flink.table.store.file.FileStoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
+import static org.apache.flink.table.store.file.FileStoreOptions.MERGE_ENGINE;
+import static org.apache.flink.table.store.file.FileStoreOptions.MergeEngine.PARTIAL_UPDATE;
 import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
 import static org.apache.flink.table.store.log.LogOptions.SCAN;
 
@@ -164,6 +169,10 @@ public class TableStore {
         return new SinkBuilder();
     }
 
+    private MergeEngine mergeEngine() {
+        return options.get(MERGE_ENGINE);
+    }
+
     private FileStore buildFileStore() {
         RowType partitionType = TypeUtils.project(type, partitions);
         RowType keyType;
@@ -190,7 +199,23 @@
                                                             f.getDescription().orElse(null)))
                                     .collect(Collectors.toList()));
             valueType = type;
-            mergeFunction = new DeduplicateMergeFunction();
+
+            switch (mergeEngine()) {
+                case DEDUPLICATE:
+                    mergeFunction = new DeduplicateMergeFunction();
+                    break;
+                case PARTIAL_UPDATE:
+                    List<LogicalType> fieldTypes = type.getChildren();
+                    RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
+                    for (int i = 0; i < fieldTypes.size(); i++) {
+                        fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+                    }
+                    mergeFunction = new PartialUpdateMergeFunction(fieldGetters);
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported merge engine: " + mergeEngine());
+            }
         }
         return new FileStoreImpl(
                 tableIdentifier, options, user, partitionType, keyType, valueType, mergeFunction);
@@ -300,6 +325,11 @@ public class TableStore {
 
         private Source<RowData, ?, ?> buildSource() {
             if (isContinuous) {
+                if (primaryKeys.length > 0 && mergeEngine() == PARTIAL_UPDATE) {
+                    throw new ValidationException(
+                            "Partial update continuous reading is not supported.");
+                }
+
                 LogStartupMode startupMode = logOptions().get(SCAN);
                 if (logSourceProvider == null) {
                     return buildFileSource(true, startupMode == LogStartupMode.LATEST);
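
As a usage sketch (not part of this patch), the new guard in buildSource() makes a streaming query over a partial-update table fail fast. The table T and its prior registration are assumptions here; the expected message is the one thrown above:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StreamingReadGuardSketch {
    public static void main(String[] args) {
        // Assumes T was registered as a table store table with 'merge-engine'='partial-update'.
        TableEnvironment sEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        try {
            sEnv.from("T").execute().print();
        } catch (Exception e) {
            // Expected: ValidationException("Partial update continuous reading is not supported.")
            System.out.println(e.getMessage());
        }
    }
}
```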
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
index f873e8e..0be5e18 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
@@ -18,50 +18,26 @@
 
 package org.apache.flink.table.store.connector;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
-import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
-import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
-import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** SQL ITCase for continuous file store. */
-public class ContinuousFileStoreITCase extends AbstractTestBase {
-
-    private TableEnvironment bEnv;
-    private TableEnvironment sEnv;
-
-    @Before
-    public void before() throws IOException {
-        bEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
-        sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
-        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
-        String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
-        prepareEnv(bEnv, path);
-        prepareEnv(sEnv, path);
-    }
+public class ContinuousFileStoreITCase extends FileStoreTableITCase {
 
-    private void prepareEnv(TableEnvironment env, String path) {
-        Configuration config = env.getConfig().getConfiguration();
-        config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
-        config.setString(TABLE_STORE_PREFIX + PATH.key(), path);
-        env.executeSql("CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)");
-        env.executeSql(
+    @Override
+    protected List<String> ddl() {
+        return Arrays.asList(
+                "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)",
                 "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, 
PRIMARY KEY (a) NOT ENFORCED)");
     }
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
new file mode 100644
index 0000000..2fd5092
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Before;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+
+/** ITCase for file store table api. */
+public abstract class FileStoreTableITCase extends AbstractTestBase {
+
+    protected TableEnvironment bEnv;
+    protected TableEnvironment sEnv;
+
+    @Before
+    public void before() throws IOException {
+        bEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
+        String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+        prepareEnv(bEnv, path);
+        prepareEnv(sEnv, path);
+    }
+
+    private void prepareEnv(TableEnvironment env, String path) {
+        Configuration config = env.getConfig().getConfiguration();
+        config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+        config.setString(TABLE_STORE_PREFIX + PATH.key(), path);
+        ddl().forEach(env::executeSql);
+    }
+
+    protected abstract List<String> ddl();
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java
new file mode 100644
index 0000000..7d4094a
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** ITCase for partial update. */
+public class PartialUpdateITCase extends FileStoreTableITCase {
+
+    @Override
+    protected List<String> ddl() {
+        return Collections.singletonList(
+                "CREATE TABLE IF NOT EXISTS T ("
+                        + "j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY (j,k) NOT ENFORCED)"
+                        + " WITH ('merge-engine'='partial-update');");
+    }
+
+    @Test
+    public void testMergeInMemory() throws ExecutionException, InterruptedException {
+        bEnv.executeSql(
+                        "INSERT INTO T VALUES "
+                                + "(1, 2, 3, CAST(NULL AS INT), '5'), "
+                                + "(1, 2, CAST(NULL AS INT), 6, CAST(NULL AS STRING))")
+                .await();
+        List<Row> result = iteratorToList(bEnv.from("T").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3, 6, "5"));
+    }
+
+    @Test
+    public void testMergeRead() throws ExecutionException, InterruptedException {
+        bEnv.executeSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), CAST(NULL AS STRING))")
+                .await();
+        bEnv.executeSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))").await();
+        bEnv.executeSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')").await();
+
+        List<Row> result = iteratorToList(bEnv.from("T").execute().collect());
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"));
+    }
+
+    @Test
+    public void testMergeCompaction() throws ExecutionException, InterruptedException {
+        // Wait compaction
+        bEnv.executeSql("ALTER TABLE T SET ('commit.force-compact'='true')");
+
+        // key 1 2
+        bEnv.executeSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), CAST(NULL AS STRING))")
+                .await();
+        bEnv.executeSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))").await();
+        bEnv.executeSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')").await();
+
+        // key 1 3
+        bEnv.executeSql("INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 1, '1')").await();
+        bEnv.executeSql("INSERT INTO T VALUES (1, 3, 2, 3, CAST(NULL AS STRING))").await();
+        bEnv.executeSql("INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 4, CAST(NULL AS STRING))")
+                .await();
+
+        List<Row> result = iteratorToList(bEnv.from("T").execute().collect());
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"), Row.of(1, 3, 2, 4, "1"));
+    }
+
+    @Test
+    public void testStreamingRead() {
+        assertThatThrownBy(
+                () -> sEnv.from("T").execute().print(),
+                "Partial update continuous reading is not supported");
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 0b91c9b..eb8c908 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -21,7 +21,10 @@ package org.apache.flink.table.store.file;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DescribedEnum;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.store.file.format.FileFormat;
@@ -35,6 +38,8 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
 
 /** Options for {@link FileStore}. */
 public class FileStoreOptions implements Serializable {
@@ -111,6 +116,19 @@ public class FileStoreOptions implements Serializable {
                     .defaultValue(Duration.ofSeconds(1))
                     .withDescription("The discovery interval of continuous reading.");
 
+    public static final ConfigOption<MergeEngine> MERGE_ENGINE =
+            ConfigOptions.key("merge-engine")
+                    .enumType(MergeEngine.class)
+                    .defaultValue(MergeEngine.DEDUPLICATE)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Specifies the merge engine for tables with primary keys.")
+                                    .linebreak()
+                                    .list(
+                                            formatEnumOption(MergeEngine.DEDUPLICATE),
+                                            formatEnumOption(MergeEngine.PARTIAL_UPDATE))
+                                    .build());
+
     private final Configuration options;
 
     public static Set<ConfigOption<?>> allOptions() {
@@ -207,4 +225,29 @@ public class FileStoreOptions implements Serializable {
     public int manifestMergeMinCount() {
         return options.get(MANIFEST_MERGE_MIN_COUNT);
     }
+
+    /** Specifies the merge engine for tables with primary keys. */
+    public enum MergeEngine implements DescribedEnum {
+        DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
+
+        PARTIAL_UPDATE("partial-update", "Partial update non-null fields.");
+
+        private final String value;
+        private final String description;
+
+        MergeEngine(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
 }
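
For reference, a small sketch of reading the new option. Flink's Configuration resolves enum options by matching the configured string against each constant's toString() case-insensitively (stated here as an assumption), so the lowercase 'partial-update' maps to MergeEngine.PARTIAL_UPDATE:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.FileStoreOptions.MergeEngine;

public class MergeEngineOptionSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("merge-engine", "partial-update"); // default is 'deduplicate'
        MergeEngine engine = conf.get(FileStoreOptions.MERGE_ENGINE);
        System.out.println(engine); // prints: partial-update
    }
}
```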
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
index b9befdf..fb9af1e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
@@ -28,6 +28,8 @@ import javax.annotation.Nullable;
  */
 public class DeduplicateMergeFunction implements MergeFunction {
 
+    private static final long serialVersionUID = 1L;
+
     private RowData latestValue;
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java
similarity index 60%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java
index b9befdf..7ba7717 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java
@@ -18,36 +18,51 @@
 
 package org.apache.flink.table.store.file.mergetree.compact;
 
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 
 import javax.annotation.Nullable;
 
 /**
- * A {@link MergeFunction} where key is primary key (unique) and value is the full record, only keep
- * the latest one.
+ * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update
+ * non-null fields on merge.
  */
-public class DeduplicateMergeFunction implements MergeFunction {
+public class PartialUpdateMergeFunction implements MergeFunction {
 
-    private RowData latestValue;
+    private static final long serialVersionUID = 1L;
+
+    private final RowData.FieldGetter[] getters;
+
+    private transient GenericRowData row;
+
+    public PartialUpdateMergeFunction(RowData.FieldGetter[] getters) {
+        this.getters = getters;
+    }
 
     @Override
     public void reset() {
-        latestValue = null;
+        this.row = new GenericRowData(getters.length);
     }
 
     @Override
     public void add(RowData value) {
-        latestValue = value;
+        for (int i = 0; i < getters.length; i++) {
+            Object field = getters[i].getFieldOrNull(value);
+            if (field != null) {
+                row.setField(i, field);
+            }
+        }
     }
 
     @Override
     @Nullable
     public RowData getValue() {
-        return latestValue;
+        return row;
     }
 
     @Override
     public MergeFunction copy() {
-        return new DeduplicateMergeFunction();
+        // RowData.FieldGetter is thread safe
+        return new PartialUpdateMergeFunction(getters);
     }
 }
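
One lifecycle note, sketched as a hypothetical continuation of the driver above (reusing its 'getters' array): reset() allocates a fresh GenericRowData per key, so a row handed out by getValue() is not mutated when merging moves on to the next key, and copy() can safely share the stateless FieldGetter array:

```java
// Hypothetical continuation of the earlier PartialUpdateExample.
PartialUpdateMergeFunction fn = new PartialUpdateMergeFunction(getters);
MergeFunction other = fn.copy(); // shares getters; allocates its own row on reset()

fn.reset();
fn.add(GenericRowData.of(1L, 23.0d, 10L, null));
RowData firstKey = fn.getValue();

fn.reset(); // fresh accumulator: firstKey above is left untouched
fn.add(GenericRowData.of(2L, null, 7L, null));
```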
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
index b62157a..aceddd5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
@@ -31,6 +31,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  */
 public class ValueCountMergeFunction implements MergeFunction {
 
+    private static final long serialVersionUID = 1L;
+
     private long total;
 
     @Override
