This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.1 in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.1 by this push: new b7ae760 [FLINK-26457] Introduce PartialUpdateMergeFunction b7ae760 is described below commit b7ae760200c500f2277e266e17b2cb271dcec805 Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Fri Apr 22 09:55:15 2022 +0800 [FLINK-26457] Introduce PartialUpdateMergeFunction This closes #98 --- docs/content/docs/development/create-table.md | 35 ++++++++ .../flink/table/store/connector/TableStore.java | 32 +++++++- .../store/connector/ContinuousFileStoreITCase.java | 38 ++------- .../store/connector/FileStoreTableITCase.java | 61 ++++++++++++++ .../table/store/connector/PartialUpdateITCase.java | 94 ++++++++++++++++++++++ .../flink/table/store/file/FileStoreOptions.java | 43 ++++++++++ .../compact/DeduplicateMergeFunction.java | 2 + ...nction.java => PartialUpdateMergeFunction.java} | 31 +++++-- .../mergetree/compact/ValueCountMergeFunction.java | 2 + 9 files changed, 298 insertions(+), 40 deletions(-) diff --git a/docs/content/docs/development/create-table.md b/docs/content/docs/development/create-table.md index 8b94f4f..95f5e87 100644 --- a/docs/content/docs/development/create-table.md +++ b/docs/content/docs/development/create-table.md @@ -233,3 +233,38 @@ The two methods do not behave in the same way when querying. Use approach one if you have a large number of filtered queries with only `user_id`, and use approach two if you have a large number of filtered queries with only `catalog_id`. + +## Partial Update + +You can configure partial update from options: + +```sql +CREATE TABLE MyTable ( + product_id BIGINT, + price DOUBLE, + number BIGINT, + detail STRING, + PRIMARY KEY (product_id) NOT ENFORCED +) WITH ( + 'merge-engine' = 'partial-update' +); +``` + +{{< hint info >}} +__Note:__ Partial update is only supported for table with primary key. +{{< /hint >}} + +{{< hint info >}} +__Note:__ Partial update is not supported for streaming consuming. 
+{{< /hint >}} + +The value fields are updated to the latest data one by one +under the same primary key, but null values in a new record do not overwrite existing values. + +For example, the inputs: +- <1, 23.0, 10, NULL> +- <1, NULL, 20, 'This is a book'> +- <1, 25.2, NULL, NULL> + +Output: +- <1, 25.2, 20, 'This is a book'> diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java index 37315c4..c17a309 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java @@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogLock; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.Projection; @@ -44,8 +45,10 @@ import org.apache.flink.table.store.connector.source.LogHybridSourceFactory; import org.apache.flink.table.store.connector.source.StaticFileStoreSplitEnumerator; import org.apache.flink.table.store.file.FileStore; import org.apache.flink.table.store.file.FileStoreImpl; +import org.apache.flink.table.store.file.FileStoreOptions.MergeEngine; import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction; import org.apache.flink.table.store.file.mergetree.compact.MergeFunction; +import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction; import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction; import 
org.apache.flink.table.store.file.predicate.Predicate; import org.apache.flink.table.store.log.LogOptions.LogStartupMode; @@ -68,6 +71,8 @@ import java.util.stream.Collectors; import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET; import static org.apache.flink.table.store.file.FileStoreOptions.CONTINUOUS_DISCOVERY_INTERVAL; +import static org.apache.flink.table.store.file.FileStoreOptions.MERGE_ENGINE; +import static org.apache.flink.table.store.file.FileStoreOptions.MergeEngine.PARTIAL_UPDATE; import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX; import static org.apache.flink.table.store.log.LogOptions.SCAN; @@ -164,6 +169,10 @@ public class TableStore { return new SinkBuilder(); } + private MergeEngine mergeEngine() { + return options.get(MERGE_ENGINE); + } + private FileStore buildFileStore() { RowType partitionType = TypeUtils.project(type, partitions); RowType keyType; @@ -190,7 +199,23 @@ public class TableStore { f.getDescription().orElse(null))) .collect(Collectors.toList())); valueType = type; - mergeFunction = new DeduplicateMergeFunction(); + + switch (mergeEngine()) { + case DEDUPLICATE: + mergeFunction = new DeduplicateMergeFunction(); + break; + case PARTIAL_UPDATE: + List<LogicalType> fieldTypes = type.getChildren(); + RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.size()]; + for (int i = 0; i < fieldTypes.size(); i++) { + fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i); + } + mergeFunction = new PartialUpdateMergeFunction(fieldGetters); + break; + default: + throw new UnsupportedOperationException( + "Unsupported merge engine: " + mergeEngine()); + } } return new FileStoreImpl( tableIdentifier, options, user, partitionType, keyType, valueType, mergeFunction); @@ -300,6 +325,11 @@ public class TableStore { private Source<RowData, ?, ?> buildSource() { if (isContinuous) { + if (primaryKeys.length > 0 && mergeEngine() == PARTIAL_UPDATE) { + throw new ValidationException( + 
"Partial update continuous reading is not supported."); + } + LogStartupMode startupMode = logOptions().get(SCAN); if (logSourceProvider == null) { return buildFileSource(true, startupMode == LogStartupMode.LATEST); diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java index f873e8e..0be5e18 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java @@ -18,50 +18,26 @@ package org.apache.flink.table.store.connector; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.store.file.utils.BlockingIterator; -import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.types.Row; -import org.junit.Before; import org.junit.Test; -import java.io.IOException; -import java.time.Duration; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; -import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL; -import static org.apache.flink.table.store.file.FileStoreOptions.PATH; -import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** SQL ITCase for continuous file store. 
*/ -public class ContinuousFileStoreITCase extends AbstractTestBase { - - private TableEnvironment bEnv; - private TableEnvironment sEnv; - - @Before - public void before() throws IOException { - bEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); - sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build()); - sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100)); - String path = TEMPORARY_FOLDER.newFolder().toURI().toString(); - prepareEnv(bEnv, path); - prepareEnv(sEnv, path); - } +public class ContinuousFileStoreITCase extends FileStoreTableITCase { - private void prepareEnv(TableEnvironment env, String path) { - Configuration config = env.getConfig().getConfiguration(); - config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); - config.setString(TABLE_STORE_PREFIX + PATH.key(), path); - env.executeSql("CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)"); - env.executeSql( + @Override + protected List<String> ddl() { + return Arrays.asList( + "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)", "CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)"); } diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java new file mode 100644 index 0000000..2fd5092 --- /dev/null +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Before; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; + +import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL; +import static org.apache.flink.table.store.file.FileStoreOptions.PATH; +import static org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX; + +/** ITCase for file store table api. 
*/ +public abstract class FileStoreTableITCase extends AbstractTestBase { + + protected TableEnvironment bEnv; + protected TableEnvironment sEnv; + + @Before + public void before() throws IOException { + bEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build()); + sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build()); + sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100)); + String path = TEMPORARY_FOLDER.newFolder().toURI().toString(); + prepareEnv(bEnv, path); + prepareEnv(sEnv, path); + } + + private void prepareEnv(TableEnvironment env, String path) { + Configuration config = env.getConfig().getConfiguration(); + config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + config.setString(TABLE_STORE_PREFIX + PATH.key(), path); + ddl().forEach(env::executeSql); + } + + protected abstract List<String> ddl(); +} diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java new file mode 100644 index 0000000..7d4094a --- /dev/null +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector; + +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.util.CollectionUtil.iteratorToList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** ITCase for partial update. */ +public class PartialUpdateITCase extends FileStoreTableITCase { + + @Override + protected List<String> ddl() { + return Collections.singletonList( + "CREATE TABLE IF NOT EXISTS T (" + + "j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY (j,k) NOT ENFORCED)" + + " WITH ('merge-engine'='partial-update');"); + } + + @Test + public void testMergeInMemory() throws ExecutionException, InterruptedException { + bEnv.executeSql( + "INSERT INTO T VALUES " + + "(1, 2, 3, CAST(NULL AS INT), '5'), " + + "(1, 2, CAST(NULL AS INT), 6, CAST(NULL AS STRING))") + .await(); + List<Row> result = iteratorToList(bEnv.from("T").execute().collect()); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3, 6, "5")); + } + + @Test + public void testMergeRead() throws ExecutionException, InterruptedException { + bEnv.executeSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), CAST(NULL AS STRING))") + .await(); + bEnv.executeSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))").await(); + bEnv.executeSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')").await(); + + List<Row> result = 
iteratorToList(bEnv.from("T").execute().collect()); + assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6")); + } + + @Test + public void testMergeCompaction() throws ExecutionException, InterruptedException { + // Wait compaction + bEnv.executeSql("ALTER TABLE T SET ('commit.force-compact'='true')"); + + // key 1 2 + bEnv.executeSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT), CAST(NULL AS STRING))") + .await(); + bEnv.executeSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS STRING))").await(); + bEnv.executeSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT), '6')").await(); + + // key 1 3 + bEnv.executeSql("INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 1, '1')").await(); + bEnv.executeSql("INSERT INTO T VALUES (1, 3, 2, 3, CAST(NULL AS STRING))").await(); + bEnv.executeSql("INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 4, CAST(NULL AS STRING))") + .await(); + + List<Row> result = iteratorToList(bEnv.from("T").execute().collect()); + assertThat(result) + .containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"), Row.of(1, 3, 2, 4, "1")); + } + + @Test + public void testStreamingRead() { + assertThatThrownBy( + () -> sEnv.from("T").execute().print(), + "Partial update continuous reading is not supported"); + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java index 0b91c9b..eb8c908 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java @@ -21,7 +21,10 @@ package org.apache.flink.table.store.file; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DescribedEnum; import 
org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.description.Description; +import org.apache.flink.configuration.description.InlineElement; import org.apache.flink.core.fs.Path; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.store.file.format.FileFormat; @@ -35,6 +38,8 @@ import java.util.Map; import java.util.Set; import static org.apache.flink.configuration.ConfigOptions.key; +import static org.apache.flink.configuration.description.TextElement.text; +import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption; /** Options for {@link FileStore}. */ public class FileStoreOptions implements Serializable { @@ -111,6 +116,19 @@ public class FileStoreOptions implements Serializable { .defaultValue(Duration.ofSeconds(1)) .withDescription("The discovery interval of continuous reading."); + public static final ConfigOption<MergeEngine> MERGE_ENGINE = + ConfigOptions.key("merge-engine") + .enumType(MergeEngine.class) + .defaultValue(MergeEngine.DEDUPLICATE) + .withDescription( + Description.builder() + .text("Specifies the merge engine for table with primary key.") + .linebreak() + .list( + formatEnumOption(MergeEngine.DEDUPLICATE), + formatEnumOption(MergeEngine.PARTIAL_UPDATE)) + .build()); + private final Configuration options; public static Set<ConfigOption<?>> allOptions() { @@ -207,4 +225,29 @@ public class FileStoreOptions implements Serializable { public int manifestMergeMinCount() { return options.get(MANIFEST_MERGE_MIN_COUNT); } + + /** Specifies the merge engine for table with primary key. 
*/ + public enum MergeEngine implements DescribedEnum { + DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), + + PARTIAL_UPDATE("partial-update", "Partial update non-null fields."); + + private final String value; + private final String description; + + MergeEngine(String value, String description) { + this.value = value; + this.description = description; + } + + @Override + public String toString() { + return value; + } + + @Override + public InlineElement getDescription() { + return text(description); + } + } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java index b9befdf..fb9af1e 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java @@ -28,6 +28,8 @@ import javax.annotation.Nullable; */ public class DeduplicateMergeFunction implements MergeFunction { + private static final long serialVersionUID = 1L; + private RowData latestValue; @Override diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java similarity index 60% copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java index b9befdf..7ba7717 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java +++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java @@ -18,36 +18,51 @@ package org.apache.flink.table.store.file.mergetree.compact; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import javax.annotation.Nullable; /** - * A {@link MergeFunction} where key is primary key (unique) and value is the full record, only keep - * the latest one. + * A {@link MergeFunction} where key is primary key (unique) and value is the partial record, update + * non-null fields on merge. */ -public class DeduplicateMergeFunction implements MergeFunction { +public class PartialUpdateMergeFunction implements MergeFunction { - private RowData latestValue; + private static final long serialVersionUID = 1L; + + private final RowData.FieldGetter[] getters; + + private transient GenericRowData row; + + public PartialUpdateMergeFunction(RowData.FieldGetter[] getters) { + this.getters = getters; + } @Override public void reset() { - latestValue = null; + this.row = new GenericRowData(getters.length); } @Override public void add(RowData value) { - latestValue = value; + for (int i = 0; i < getters.length; i++) { + Object field = getters[i].getFieldOrNull(value); + if (field != null) { + row.setField(i, field); + } + } } @Override @Nullable public RowData getValue() { - return latestValue; + return row; } @Override public MergeFunction copy() { - return new DeduplicateMergeFunction(); + // RowData.FieldGetter is thread safe + return new PartialUpdateMergeFunction(getters); } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java index b62157a..aceddd5 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java +++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java @@ -31,6 +31,8 @@ import static org.apache.flink.util.Preconditions.checkArgument; */ public class ValueCountMergeFunction implements MergeFunction { + private static final long serialVersionUID = 1L; + private long total; @Override