This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d7af498 [FLINK-29621] Append-only with eventual log.consistency can not work
5d7af498 is described below

commit 5d7af498785b5a6be2a2bad43b6195da79e6d819
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Thu Oct 13 19:21:25 2022 +0800

    [FLINK-29621] Append-only with eventual log.consistency can not work
    
    This closes #318
---
 .../table/store/connector/LogSystemITCase.java     | 71 ++++++++++++++++++++++
 .../table/store/kafka/KafkaLogSinkProvider.java    |  4 --
 .../table/store/kafka/KafkaLogSourceProvider.java  |  4 --
 .../flink/table/store/kafka/KafkaLogITCase.java    | 16 +----
 4 files changed, 74 insertions(+), 21 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LogSystemITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LogSystemITCase.java
new file mode 100644
index 00000000..02b77fac
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/LogSystemITCase.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.table.store.kafka.KafkaTableTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for table with log system. */
+public class LogSystemITCase extends KafkaTableTestBase {
+
+    @Before
+    public void before() throws IOException {
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG TABLE_STORE WITH ("
+                                + "'type'='table-store', 'warehouse'='%s')",
+                        TEMPORARY_FOLDER.newFolder().toURI()));
+        tEnv.useCatalog("TABLE_STORE");
+    }
+
+    @Test
+    public void testAppendOnlyWithEventual() throws Exception {
+        createTopicIfNotExists("T", 1);
+        // disable checkpointing to test eventual
+        env.getCheckpointConfig().disableCheckpointing();
+        env.setParallelism(1);
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE T (i INT, j INT) WITH ("
+                                + "'log.system'='kafka', "
+                                + "'write-mode'='append-only', "
+                                + "'log.consistency'='eventual', "
+                                + "'kafka.bootstrap.servers'='%s', "
+                                + "'kafka.topic'='T')",
+                        getBootstrapServers()));
+        tEnv.executeSql("CREATE TEMPORARY TABLE gen (i INT, j INT) WITH ('connector'='datagen')");
+        TableResult write = tEnv.executeSql("INSERT INTO T SELECT * FROM gen");
+        BlockingIterator<Row, Row> read =
+                BlockingIterator.of(tEnv.executeSql("SELECT * FROM T").collect());
+        List<Row> collect = read.collect(10);
+        assertThat(collect).hasSize(10);
+        write.getJobClient().get().cancel();
+        read.close();
+    }
+}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
index 7698bcfe..67194214 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
@@ -74,10 +74,6 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
                 semantic = Semantic.EXACTLY_ONCE;
                 break;
             case EVENTUAL:
-                if (primaryKeySerializer == null) {
-                    throw new IllegalArgumentException(
-                            "Can not use EVENTUAL consistency mode for non-pk table.");
-                }
                 semantic = Semantic.AT_LEAST_ONCE;
                 break;
             default:
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
index 59c0d4ab..c1bcc617 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
@@ -96,10 +96,6 @@ public class KafkaLogSourceProvider implements LogSourceProvider {
                 properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_committed");
                 break;
             case EVENTUAL:
-                if (primaryKeyDeserializer == null) {
-                    throw new IllegalArgumentException(
-                            "Can not use EVENTUAL consistency mode for non-pk table.");
-                }
                 properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_uncommitted");
                 break;
         }
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
index e6f39c90..b098712e 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
@@ -114,22 +114,12 @@ public class KafkaLogITCase extends KafkaTableTestBase {
                                         LogConsistency.EVENTUAL,
                                         false));
         assertThat(exception.getMessage())
-                .isEqualTo("Can not use EVENTUAL consistency mode for non-pk table.");
+                .isEqualTo("Can not use upsert changelog mode for non-pk table.");
     }
 
     @Test
-    public void testAllEventualNonKeyed() {
-        IllegalArgumentException exception =
-                Assertions.assertThrows(
-                        IllegalArgumentException.class,
-                        () ->
-                                innerTest(
-                                        "AllEventualNonKeyed",
-                                        LogChangelogMode.ALL,
-                                        LogConsistency.EVENTUAL,
-                                        false));
-        assertThat(exception.getMessage())
-                .isEqualTo("Can not use EVENTUAL consistency mode for non-pk table.");
+    public void testAllEventualNonKeyed() throws Exception {
+        innerTest("AllEventualNonKeyed", LogChangelogMode.ALL, LogConsistency.EVENTUAL, false);
     }
 
     private void innerTest(

Reply via email to