This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b149792 [FLINK-29189] Pass KeyValue to MergeFunction
3b149792 is described below

commit 3b149792a9b106a748fe60d787c14daf55635d43
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Tue Sep 6 10:23:56 2022 +0800

    [FLINK-29189] Pass KeyValue to MergeFunction
    
    This closes #286
---
 .../store/file/mergetree/SortBufferMemTable.java   |  4 +--
 .../compact/DeduplicateMergeFunction.java          |  5 ++--
 .../file/mergetree/compact/MergeFunction.java      |  4 +--
 .../mergetree/compact/MergeFunctionHelper.java     | 34 ++++++++++------------
 .../compact/PartialUpdateMergeFunction.java        | 11 +++++--
 .../file/mergetree/compact/SortMergeReader.java    |  2 +-
 .../mergetree/compact/ValueCountMergeFunction.java |  9 ++++--
 .../compact/MergeFunctionHelperTestBase.java       | 16 +++++++++-
 8 files changed, 55 insertions(+), 30 deletions(-)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
index 87186888..baa512c0 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
@@ -172,7 +172,7 @@ public class SortBufferMemTable implements MemTable {
                     return;
                 }
                 mergeFunctionHelper.reset();
-                mergeFunctionHelper.add(previous.getReusedKv().value());
+                mergeFunctionHelper.add(previous.getReusedKv());
 
                 while (readOnce()) {
                     if (keyComparator.compare(
@@ -180,7 +180,7 @@ public class SortBufferMemTable implements MemTable {
                             != 0) {
                         break;
                     }
-                    mergeFunctionHelper.add(current.getReusedKv().value());
+                    mergeFunctionHelper.add(current.getReusedKv());
                     swapSerializers();
                 }
                 result = mergeFunctionHelper.getValue();
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
index fb9af1ef..12f3e050 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.file.mergetree.compact;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
 
 import javax.annotation.Nullable;
 
@@ -38,8 +39,8 @@ public class DeduplicateMergeFunction implements 
MergeFunction {
     }
 
     @Override
-    public void add(RowData value) {
-        latestValue = value;
+    public void add(KeyValue kv) {
+        latestValue = kv.value();
     }
 
     @Override
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java
index cc1c7136..78d44716 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java
@@ -31,8 +31,8 @@ public interface MergeFunction extends Serializable {
     /** Reset the merge function to its default state. */
     void reset();
 
-    /** Add the given {@link RowData} to the merge function. */
-    void add(RowData value);
+    /** Add the given {@link KeyValue} to the merge function. */
+    void add(KeyValue kv);
 
     /** Get current merged value. Return null if this merged result should be 
skipped. */
     @Nullable
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java
index f3ae6192..0631351d 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java
@@ -19,48 +19,46 @@
 package org.apache.flink.table.store.file.mergetree.compact;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
 
 /** Helper functions for the interaction with {@link MergeFunction}. */
 public class MergeFunctionHelper {
 
     private final MergeFunction mergeFunction;
 
-    private RowData rowData;
+    private KeyValue initialKV;
     private boolean isInitialized;
 
     public MergeFunctionHelper(MergeFunction mergeFunction) {
         this.mergeFunction = mergeFunction;
     }
 
-    /**
-     * Resets the {@link MergeFunction} helper to its default state: 1. Clears 
the one record which
-     * the helper maintains. 2. Resets the {@link MergeFunction} to its 
default state. 3. Clears the
-     * initialized state of the {@link MergeFunction}.
-     */
+    /** Resets the {@link MergeFunction} helper to its default state. */
     public void reset() {
-        rowData = null;
+        initialKV = null;
         mergeFunction.reset();
         isInitialized = false;
     }
 
-    /** Adds the given {@link RowData} to the {@link MergeFunction} helper. */
-    public void add(RowData value) {
-        if (rowData == null) {
-            rowData = value;
+    /** Adds the given {@link KeyValue} to the {@link MergeFunction} helper. */
+    public void add(KeyValue kv) {
+        if (initialKV == null) {
+            initialKV = kv;
         } else {
             if (!isInitialized) {
-                mergeFunction.add(rowData);
+                merge(initialKV);
                 isInitialized = true;
             }
-            mergeFunction.add(value);
+            merge(kv);
         }
     }
 
-    /**
-     * Get current value of the {@link MergeFunction} helper. Return null if 
the value should be
-     * skipped.
-     */
+    protected void merge(KeyValue kv) {
+        mergeFunction.add(kv);
+    }
+
+    /** Get current value of the {@link MergeFunction} helper. */
     public RowData getValue() {
-        return isInitialized ? mergeFunction.getValue() : rowData;
+        return isInitialized ? mergeFunction.getValue() : initialKV.value();
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java
index 7ba77177..f93455cd 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java
@@ -20,9 +20,13 @@ package org.apache.flink.table.store.file.mergetree.compact;
 
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.types.RowKind;
 
 import javax.annotation.Nullable;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A {@link MergeFunction} where key is primary key (unique) and value is the 
partial record, update
  * non-null fields on merge.
@@ -45,9 +49,12 @@ public class PartialUpdateMergeFunction implements 
MergeFunction {
     }
 
     @Override
-    public void add(RowData value) {
+    public void add(KeyValue kv) {
+        checkArgument(
+                kv.valueKind() == RowKind.INSERT || kv.valueKind() == 
RowKind.UPDATE_AFTER,
+                "Partial update can not accept delete records. Partial delete 
is not supported!");
         for (int i = 0; i < getters.length; i++) {
-            Object field = getters[i].getFieldOrNull(value);
+            Object field = getters[i].getFieldOrNull(kv.value());
             if (field != null) {
                 row.setField(i, field);
             }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
index 64112d70..022974ad 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java
@@ -177,7 +177,7 @@ public class SortMergeReader implements 
RecordReader<KeyValue> {
                     break;
                 }
                 minHeap.poll();
-                mergeFunctionHelper.add(element.kv.value());
+                mergeFunctionHelper.add(element.kv);
                 polled.add(element);
             }
             return true;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
index aceddd54..b678a5e7 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.mergetree.compact;
 
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.types.RowKind;
 
 import javax.annotation.Nullable;
 
@@ -41,8 +43,11 @@ public class ValueCountMergeFunction implements 
MergeFunction {
     }
 
     @Override
-    public void add(RowData value) {
-        total += count(value);
+    public void add(KeyValue kv) {
+        checkArgument(
+                kv.valueKind() == RowKind.INSERT,
+                "In value count mode, only insert records come. This is a bug. 
Please file an issue.");
+        total += count(kv.value());
     }
 
     @Override
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java
index debc67f3..13886968 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java
@@ -20,8 +20,11 @@ package org.apache.flink.table.store.file.mergetree.compact;
 
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.types.RowKind;
 
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -32,6 +35,7 @@ import java.util.List;
 import java.util.stream.Stream;
 
 import static org.apache.flink.table.store.file.data.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Tests for {@link MergeFunctionHelper}. */
@@ -51,7 +55,7 @@ public abstract class MergeFunctionHelperTestBase {
     @MethodSource("provideMergedRowData")
     @ParameterizedTest
     public void testMergeFunctionHelper(List<RowData> rows) {
-        rows.forEach(r -> mergeFunctionHelper.add(r));
+        rows.forEach(r -> mergeFunctionHelper.add(new KeyValue().replace(null, 
RowKind.INSERT, r)));
         assertEquals(getExpected(rows), mergeFunctionHelper.getValue());
     }
 
@@ -93,5 +97,15 @@ public abstract class MergeFunctionHelperTestBase {
                 return total == 0 ? null : GenericRowData.of(total);
             }
         }
+
+        @Test
+        public void testIllegalInput() {
+            mergeFunctionHelper.add(new KeyValue().replace(null, 
RowKind.INSERT, row(1)));
+            assertThatThrownBy(
+                            () ->
+                                    mergeFunctionHelper.add(
+                                            new KeyValue().replace(null, 
RowKind.DELETE, row(1))))
+                    .hasMessageContaining("Value count only accept insert only 
records");
+        }
     }
 }

Reply via email to