This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new 3b149792 [FLINK-29189] Pass KeyValue to MergeFunction 3b149792 is described below commit 3b149792a9b106a748fe60d787c14daf55635d43 Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Tue Sep 6 10:23:56 2022 +0800 [FLINK-29189] Pass KeyValue to MergeFunction This closes #286 --- .../store/file/mergetree/SortBufferMemTable.java | 4 +-- .../compact/DeduplicateMergeFunction.java | 5 ++-- .../file/mergetree/compact/MergeFunction.java | 4 +-- .../mergetree/compact/MergeFunctionHelper.java | 34 ++++++++++------------ .../compact/PartialUpdateMergeFunction.java | 11 +++++-- .../file/mergetree/compact/SortMergeReader.java | 2 +- .../mergetree/compact/ValueCountMergeFunction.java | 9 ++++-- .../compact/MergeFunctionHelperTestBase.java | 16 +++++++++- 8 files changed, 55 insertions(+), 30 deletions(-) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java index 87186888..baa512c0 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java @@ -172,7 +172,7 @@ public class SortBufferMemTable implements MemTable { return; } mergeFunctionHelper.reset(); - mergeFunctionHelper.add(previous.getReusedKv().value()); + mergeFunctionHelper.add(previous.getReusedKv()); while (readOnce()) { if (keyComparator.compare( @@ -180,7 +180,7 @@ public class SortBufferMemTable implements MemTable { != 0) { break; } - mergeFunctionHelper.add(current.getReusedKv().value()); + mergeFunctionHelper.add(current.getReusedKv()); swapSerializers(); } result = mergeFunctionHelper.getValue(); diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java index fb9af1ef..12f3e050 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java @@ -19,6 +19,7 @@ package org.apache.flink.table.store.file.mergetree.compact; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.KeyValue; import javax.annotation.Nullable; @@ -38,8 +39,8 @@ public class DeduplicateMergeFunction implements MergeFunction { } @Override - public void add(RowData value) { - latestValue = value; + public void add(KeyValue kv) { + latestValue = kv.value(); } @Override diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java index cc1c7136..78d44716 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunction.java @@ -31,8 +31,8 @@ public interface MergeFunction extends Serializable { /** Reset the merge function to its default state. */ void reset(); - /** Add the given {@link RowData} to the merge function. */ - void add(RowData value); + /** Add the given {@link KeyValue} to the merge function. */ + void add(KeyValue kv); /** Get current merged value. Return null if this merged result should be skipped. */ @Nullable diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java index f3ae6192..0631351d 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelper.java @@ -19,48 +19,46 @@ package org.apache.flink.table.store.file.mergetree.compact; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.KeyValue; /** Helper functions for the interaction with {@link MergeFunction}. */ public class MergeFunctionHelper { private final MergeFunction mergeFunction; - private RowData rowData; + private KeyValue initialKV; private boolean isInitialized; public MergeFunctionHelper(MergeFunction mergeFunction) { this.mergeFunction = mergeFunction; } - /** - * Resets the {@link MergeFunction} helper to its default state: 1. Clears the one record which - * the helper maintains. 2. Resets the {@link MergeFunction} to its default state. 3. Clears the - * initialized state of the {@link MergeFunction}. - */ + /** Resets the {@link MergeFunction} helper to its default state. */ public void reset() { - rowData = null; + initialKV = null; mergeFunction.reset(); isInitialized = false; } - /** Adds the given {@link RowData} to the {@link MergeFunction} helper. */ - public void add(RowData value) { - if (rowData == null) { - rowData = value; + /** Adds the given {@link KeyValue} to the {@link MergeFunction} helper. */ + public void add(KeyValue kv) { + if (initialKV == null) { + initialKV = kv; } else { if (!isInitialized) { - mergeFunction.add(rowData); + merge(initialKV); isInitialized = true; } - mergeFunction.add(value); + merge(kv); } } - /** - * Get current value of the {@link MergeFunction} helper. Return null if the value should be - * skipped. - */ + protected void merge(KeyValue kv) { + mergeFunction.add(kv); + } + + /** Get current value of the {@link MergeFunction} helper. */ public RowData getValue() { - return isInitialized ? mergeFunction.getValue() : rowData; + return isInitialized ? mergeFunction.getValue() : initialKV.value(); } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java index 7ba77177..f93455cd 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java @@ -20,9 +20,13 @@ package org.apache.flink.table.store.file.mergetree.compact; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.types.RowKind; import javax.annotation.Nullable; +import static org.apache.flink.util.Preconditions.checkArgument; + /** * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update * non-null fields on merge. @@ -45,9 +49,12 @@ public class PartialUpdateMergeFunction implements MergeFunction { } @Override - public void add(RowData value) { + public void add(KeyValue kv) { + checkArgument( + kv.valueKind() == RowKind.INSERT || kv.valueKind() == RowKind.UPDATE_AFTER, + "Partial update can not accept delete records. Partial delete is not supported!"); for (int i = 0; i < getters.length; i++) { - Object field = getters[i].getFieldOrNull(value); + Object field = getters[i].getFieldOrNull(kv.value()); if (field != null) { row.setField(i, field); } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java index 64112d70..022974ad 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/SortMergeReader.java @@ -177,7 +177,7 @@ public class SortMergeReader implements RecordReader<KeyValue> { break; } minHeap.poll(); - mergeFunctionHelper.add(element.kv.value()); + mergeFunctionHelper.add(element.kv); polled.add(element); } return true; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java index aceddd54..b678a5e7 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java @@ -20,6 +20,8 @@ package org.apache.flink.table.store.file.mergetree.compact; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.types.RowKind; import javax.annotation.Nullable; @@ -41,8 +43,11 @@ public class ValueCountMergeFunction implements MergeFunction { } @Override - public void add(RowData value) { - total += count(value); + public void add(KeyValue kv) { + checkArgument( + kv.valueKind() == RowKind.INSERT, + "In value count mode, only insert records come. This is a bug. Please file an issue."); + total += count(kv.value()); } @Override diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java index debc67f3..13886968 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionHelperTestBase.java @@ -20,8 +20,11 @@ package org.apache.flink.table.store.file.mergetree.compact; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.types.RowKind; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -32,6 +35,7 @@ import java.util.List; import java.util.stream.Stream; import static org.apache.flink.table.store.file.data.DataFileTestUtils.row; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertEquals; /** Tests for {@link MergeFunctionHelper}. */ @@ -51,7 +55,7 @@ public abstract class MergeFunctionHelperTestBase { @MethodSource("provideMergedRowData") @ParameterizedTest public void testMergeFunctionHelper(List<RowData> rows) { - rows.forEach(r -> mergeFunctionHelper.add(r)); + rows.forEach(r -> mergeFunctionHelper.add(new KeyValue().replace(null, RowKind.INSERT, r))); assertEquals(getExpected(rows), mergeFunctionHelper.getValue()); } @@ -93,5 +97,15 @@ public abstract class MergeFunctionHelperTestBase { return total == 0 ? null : GenericRowData.of(total); } } + + @Test + public void testIllegalInput() { + mergeFunctionHelper.add(new KeyValue().replace(null, RowKind.INSERT, row(1))); + assertThatThrownBy( + () -> + mergeFunctionHelper.add( + new KeyValue().replace(null, RowKind.DELETE, row(1)))) + .hasMessageContaining("Value count only accept insert only records"); + } } }