This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch pull/19923/head in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0767370306c0966dc6fe217cda642219b472315e Author: Alexander Preuß <11444089+alp...@users.noreply.github.com> AuthorDate: Tue Jun 7 10:24:45 2022 +0200 WIP --- .../flink-sql-client-test/pom.xml | 12 ++ .../src/test/java/SqlClientITCase.java | 129 +++++++++++++++++++++ .../src/test/resources/log4j2-test.properties | 34 ++++++ .../testframe/container/FlinkContainers.java | 5 + 4 files changed, 180 insertions(+) diff --git a/flink-end-to-end-tests/flink-sql-client-test/pom.xml b/flink-end-to-end-tests/flink-sql-client-test/pom.xml index 8ed8004ce65..06d20839527 100644 --- a/flink-end-to-end-tests/flink-sql-client-test/pom.xml +++ b/flink-end-to-end-tests/flink-sql-client-test/pom.xml @@ -38,6 +38,12 @@ under the License. <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-end-to-end-tests-common</artifactId> + <version>${project.version}</version> + </dependency> + <!-- The following dependencies are for connector/format sql-jars that we copy using the maven-dependency-plugin. When extending the test to cover more connectors/formats, add a dependency here and an entry @@ -159,6 +165,12 @@ under the License. <version>${project.version}</version> <type>jar</type> </artifactItem> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <type>jar</type> + </artifactItem> </artifactItems> </configuration> </execution> diff --git a/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java b/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java new file mode 100644 index 00000000000..5e7e19683b3 --- /dev/null +++ b/flink-end-to-end-tests/flink-sql-client-test/src/test/java/SqlClientITCase.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.connector.upserttest.sink.UpsertTestFileUtil; +import org.apache.flink.tests.util.TestUtils; +import org.apache.flink.tests.util.flink.SQLJobSubmission; +import org.apache.flink.tests.util.flink.container.FlinkContainers; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** E2E Test for SqlClient. */ +public class SqlClientITCase { + + private static final Logger LOG = LoggerFactory.getLogger(SqlClientITCase.class); + + private static final Path sqlToolBoxJar = TestUtils.getResource(".*SqlToolbox.jar"); + + private final Path sqlConnectorUpsertTestJar = + TestUtils.getResource(".*flink-test-utils.*.jar"); + + public static final Network NETWORK = Network.newNetwork(); + + public final FlinkContainers flink = + FlinkContainers.builder() + .setNumTaskManagers(1) + .setNetwork(NETWORK) + .setLogger(LOG) /*.dependsOn(KAFKA)*/ + .build(); + + @TempDir private File tempDir; + + @BeforeEach + public void setup() throws Exception { + flink.start(); + } + + @AfterEach + public void tearDown() { + flink.stop(); + } + + @Test + public void testUpsert() throws Exception { + String outputFilepath = "/flink/records.out"; + + List<String> sqlLines = + Arrays.asList( + "SET 'execution.runtime-mode' = 'batch';", + "", + "CREATE TABLE UpsertSinkTable (", + " user_id INT,", + " user_name STRING,", + " user_count BIGINT,", + " PRIMARY KEY (user_id) NOT ENFORCED", + " ) WITH (", + " 'connector' = 'upsert-files',", + " 'key.format' = 'json',", + " 'value.format' = 'json',", + " 'output-filepath' = '" + outputFilepath + "'", + " );", + "", + "INSERT INTO UpsertSinkTable(", + " SELECT user_id, user_name, COUNT(*) AS user_count", + " FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob'))", + " AS UserCountTable(user_id, user_name)", + " GROUP BY user_id, user_name);"); + executeSql(sqlLines); + + /* + | user_id | user_name | user_count | + | ------- | --------- | ---------- | + | 1 | Bob | 2 | + | 22 | Tom | 1 | + | 42 | Kim | 3 | + */ + + Thread.sleep(5000); // prevent NotFoundException: Status 404 + + verifyNumberOfResultRecords(outputFilepath, 3); + } + + private void verifyNumberOfResultRecords(String resultFilePath, int expectedNumberOfRecords) + throws IOException { + File tempOutputFile = new File(tempDir, "records.out"); + String tempOutputFilepath = tempOutputFile.toString(); + GenericContainer<?> taskManager = flink.getTaskManagers().get(0); + taskManager.copyFileFromContainer(resultFilePath, tempOutputFilepath); + + int numberOfResultRecords = UpsertTestFileUtil.getNumberOfRecords(tempOutputFile); + assertThat(numberOfResultRecords).isEqualTo(expectedNumberOfRecords); + } + + private void executeSql(List<String> sqlLines) throws Exception { + flink.submitSQLJob( + new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines) + .addJars(sqlConnectorUpsertTestJar, sqlToolBoxJar) + .build()); + } +} diff --git a/flink-end-to-end-tests/flink-sql-client-test/src/test/resources/log4j2-test.properties b/flink-end-to-end-tests/flink-sql-client-test/src/test/resources/log4j2-test.properties new file mode 100644 index 00000000000..365eff4286a --- /dev/null +++ b/flink-end-to-end-tests/flink-sql-client-test/src/test/resources/log4j2-test.properties @@ -0,0 +1,34 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = INFO +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n + +# It is recommended to uncomment these lines when enabling the logger. The below package used +# by testcontainers is quite verbose +#logger.yarn.name = org.testcontainers.shaded.com.github.dockerjava.core +#logger.yarn.level = WARN +#logger.yarn.appenderRef.console.ref = TestLogger diff --git a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainers.java b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainers.java index b44266bbe8b..67564e1c6a9 100644 --- a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainers.java +++ b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/container/FlinkContainers.java @@ -255,6 +255,11 @@ public class FlinkContainers implements BeforeAllCallback, AfterAllCallback { return this.jobManager; } + /** Gets TaskManager containers. */ + public List<GenericContainer<?>> getTaskManagers() { + return this.taskManagers; + } + /** Gets JobManager's hostname on the host machine. */ public String getJobManagerHost() { return jobManager.getHost();