This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch pull/19923/head
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0767370306c0966dc6fe217cda642219b472315e
Author: Alexander Preuß <11444089+alp...@users.noreply.github.com>
AuthorDate: Tue Jun 7 10:24:45 2022 +0200

    WIP
---
 .../flink-sql-client-test/pom.xml                  |  12 ++
 .../src/test/java/SqlClientITCase.java             | 129 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  34 ++++++
 .../testframe/container/FlinkContainers.java       |   5 +
 4 files changed, 180 insertions(+)

diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml 
b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
index 8ed8004ce65..06d20839527 100644
--- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml
+++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml
@@ -38,6 +38,12 @@ under the License.
                        <scope>provided</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-end-to-end-tests-common</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
                <!-- The following dependencies are for connector/format 
sql-jars that
                        we copy using the maven-dependency-plugin. When 
extending the test
                        to cover more connectors/formats, add a dependency here 
and an entry
@@ -159,6 +165,12 @@ under the License.
                                                                        
<version>${project.version}</version>
                                                                        
<type>jar</type>
                                                                </artifactItem>
+                                                               <artifactItem>
+                                                                       
<groupId>org.apache.flink</groupId>
+                                                                       
<artifactId>flink-test-utils</artifactId>
+                                                                       
<version>${project.version}</version>
+                                                                       
<type>jar</type>
+                                                               </artifactItem>
                                                        </artifactItems>
                                                </configuration>
                                        </execution>
diff --git 
a/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java
 
b/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java
new file mode 100644
index 00000000000..5e7e19683b3
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.connector.upserttest.sink.UpsertTestFileUtil;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.SQLJobSubmission;
+import org.apache.flink.tests.util.flink.container.FlinkContainers;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** E2E Test for SqlClient. */
+public class SqlClientITCase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SqlClientITCase.class);
+
+    private static final Path sqlToolBoxJar = 
TestUtils.getResource(".*SqlToolbox.jar");
+
+    private final Path sqlConnectorUpsertTestJar =
+            TestUtils.getResource(".*flink-test-utils.*.jar");
+
+    public static final Network NETWORK = Network.newNetwork();
+
+    public final FlinkContainers flink =
+            FlinkContainers.builder()
+                    .setNumTaskManagers(1)
+                    .setNetwork(NETWORK)
+                    .setLogger(LOG) /*.dependsOn(KAFKA)*/
+                    .build();
+
+    @TempDir private File tempDir;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        flink.start();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        flink.stop();
+    }
+
+    @Test
+    public void testUpsert() throws Exception {
+        String outputFilepath = "/flink/records.out";
+
+        List<String> sqlLines =
+                Arrays.asList(
+                        "SET 'execution.runtime-mode' = 'batch';",
+                        "",
+                        "CREATE TABLE UpsertSinkTable (",
+                        "    user_id INT,",
+                        "    user_name STRING,",
+                        "    user_count BIGINT,",
+                        "    PRIMARY KEY (user_id) NOT ENFORCED",
+                        "  ) WITH (",
+                        "    'connector' = 'upsert-files',",
+                        "    'key.format' = 'json',",
+                        "    'value.format' = 'json',",
+                        "    'output-filepath' = '" + outputFilepath + "'",
+                        "  );",
+                        "",
+                        "INSERT INTO UpsertSinkTable(",
+                        "  SELECT user_id, user_name, COUNT(*) AS user_count",
+                        "  FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), 
(42, 'Kim'), (42, 'Kim'), (1, 'Bob'))",
+                        "    AS UserCountTable(user_id, user_name)",
+                        "  GROUP BY user_id, user_name);");
+        executeSql(sqlLines);
+
+        /*
+        | user_id | user_name | user_count |
+        | ------- | --------- | ---------- |
+        |    1    |    Bob    |     2      |
+        |   22    |    Tom    |     1      |
+        |   42    |    Kim    |     3      |
+        */
+
+        Thread.sleep(5000); // prevent NotFoundException: Status 404
+
+        verifyNumberOfResultRecords(outputFilepath, 3);
+    }
+
+    private void verifyNumberOfResultRecords(String resultFilePath, int 
expectedNumberOfRecords)
+            throws IOException {
+        File tempOutputFile = new File(tempDir, "records.out");
+        String tempOutputFilepath = tempOutputFile.toString();
+        GenericContainer<?> taskManager = flink.getTaskManagers().get(0);
+        taskManager.copyFileFromContainer(resultFilePath, tempOutputFilepath);
+
+        int numberOfResultRecords = 
UpsertTestFileUtil.getNumberOfRecords(tempOutputFile);
+        assertThat(numberOfResultRecords).isEqualTo(expectedNumberOfRecords);
+    }
+
+    private void executeSql(List<String> sqlLines) throws Exception {
+        flink.submitSQLJob(
+                new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+                        .addJars(sqlConnectorUpsertTestJar, sqlToolBoxJar)
+                        .build());
+    }
+}
diff --git 
a/flink-end-to-end-tests/flink-sql-client-test/src/test/resources/log4j2-test.properties
 
b/flink-end-to-end-tests/flink-sql-client-test/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000000..365eff4286a
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-sql-client-test/src/test/resources/log4j2-test.properties
@@ -0,0 +1,34 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
+# It is recommended to uncomment these lines when enabling the logger. The 
below package used
+# by testcontainers is quite verbose
+#logger.yarn.name = org.testcontainers.shaded.com.github.dockerjava.core
+#logger.yarn.level = WARN
+#logger.yarn.appenderRef.console.ref = TestLogger
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainers.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainers.java
index b44266bbe8b..67564e1c6a9 100644
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainers.java
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainers.java
@@ -255,6 +255,11 @@ public class FlinkContainers implements BeforeAllCallback, 
AfterAllCallback {
         return this.jobManager;
     }
 
+    /** Gets TaskManager containers. */
+    public List<GenericContainer<?>> getTaskManagers() {
+        return this.taskManagers;
+    }
+
     /** Gets JobManager's hostname on the host machine. */
     public String getJobManagerHost() {
         return jobManager.getHost();

Reply via email to