This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new 5d7af498 [FLINK-29621] Append-only with eventual log.consistency can not work 5d7af498 is described below commit 5d7af498785b5a6be2a2bad43b6195da79e6d819 Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Thu Oct 13 19:21:25 2022 +0800 [FLINK-29621] Append-only with eventual log.consistency can not work This closes #318 --- .../table/store/connector/LogSystemITCase.java | 71 ++++++++++++++++++++++ .../table/store/kafka/KafkaLogSinkProvider.java | 4 -- .../table/store/kafka/KafkaLogSourceProvider.java | 4 -- .../flink/table/store/kafka/KafkaLogITCase.java | 16 +---- 4 files changed, 74 insertions(+), 21 deletions(-) diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LogSystemITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LogSystemITCase.java new file mode 100644 index 00000000..02b77fac --- /dev/null +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LogSystemITCase.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.connector; + +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.store.file.utils.BlockingIterator; +import org.apache.flink.table.store.kafka.KafkaTableTestBase; +import org.apache.flink.types.Row; + +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** ITCase for table with log system. */ +public class LogSystemITCase extends KafkaTableTestBase { + + @Before + public void before() throws IOException { + tEnv.executeSql( + String.format( + "CREATE CATALOG TABLE_STORE WITH (" + + "'type'='table-store', 'warehouse'='%s')", + TEMPORARY_FOLDER.newFolder().toURI())); + tEnv.useCatalog("TABLE_STORE"); + } + + @Test + public void testAppendOnlyWithEventual() throws Exception { + createTopicIfNotExists("T", 1); + // disable checkpointing to test eventual + env.getCheckpointConfig().disableCheckpointing(); + env.setParallelism(1); + tEnv.executeSql( + String.format( + "CREATE TABLE T (i INT, j INT) WITH (" + + "'log.system'='kafka', " + + "'write-mode'='append-only', " + + "'log.consistency'='eventual', " + + "'kafka.bootstrap.servers'='%s', " + + "'kafka.topic'='T')", + getBootstrapServers())); + tEnv.executeSql("CREATE TEMPORARY TABLE gen (i INT, j INT) WITH ('connector'='datagen')"); + TableResult write = tEnv.executeSql("INSERT INTO T SELECT * FROM gen"); + BlockingIterator<Row, Row> read = + BlockingIterator.of(tEnv.executeSql("SELECT * FROM T").collect()); + List<Row> collect = read.collect(10); + assertThat(collect).hasSize(10); + write.getJobClient().get().cancel(); + read.close(); + } +} diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java index 7698bcfe..67194214 100644 --- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java +++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java @@ -74,10 +74,6 @@ public class KafkaLogSinkProvider implements LogSinkProvider { semantic = Semantic.EXACTLY_ONCE; break; case EVENTUAL: - if (primaryKeySerializer == null) { - throw new IllegalArgumentException( - "Can not use EVENTUAL consistency mode for non-pk table."); - } semantic = Semantic.AT_LEAST_ONCE; break; default: diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java index 59c0d4ab..c1bcc617 100644 --- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java +++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java @@ -96,10 +96,6 @@ public class KafkaLogSourceProvider implements LogSourceProvider { properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_committed"); break; case EVENTUAL: - if (primaryKeyDeserializer == null) { - throw new IllegalArgumentException( - "Can not use EVENTUAL consistency mode for non-pk table."); - } properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_uncommitted"); break; } diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java index e6f39c90..b098712e 100644 --- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java +++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java @@ -114,22 +114,12 @@ public class KafkaLogITCase extends KafkaTableTestBase { LogConsistency.EVENTUAL, false)); assertThat(exception.getMessage()) - .isEqualTo("Can not use EVENTUAL consistency mode for non-pk table."); + .isEqualTo("Can not use upsert changelog mode for non-pk table."); } @Test - public void testAllEventualNonKeyed() { - IllegalArgumentException exception = - Assertions.assertThrows( - IllegalArgumentException.class, - () -> - innerTest( - "AllEventualNonKeyed", - LogChangelogMode.ALL, - LogConsistency.EVENTUAL, - false)); - assertThat(exception.getMessage()) - .isEqualTo("Can not use EVENTUAL consistency mode for non-pk table."); + public void testAllEventualNonKeyed() throws Exception { + innerTest("AllEventualNonKeyed", LogChangelogMode.ALL, LogConsistency.EVENTUAL, false); } private void innerTest(