This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
commit 7b5d717b609ca68cd8e812773585c2c295946619 Author: Gyula Fora <gyf...@apache.org> AuthorDate: Thu Sep 24 20:16:10 2020 +0200 [BAHIR-241] Upgrade all connectors to Flink 1.11 (#99) Co-authored-by: Gyula Fora <gy...@cloudera.com> --- flink-connector-kudu/pom.xml | 32 ++++--- .../kudu/connector/reader/KuduReader.java | 2 +- .../kudu/connector/writer/KuduWriter.java | 2 +- .../connectors/kudu/table/KuduTableFactory.java | 55 +++--------- .../flink/connectors/kudu/table/KuduTableSink.java | 3 - ...utFormatTest.java => KuduOutputFormatTest.java} | 4 +- .../connectors/kudu/streaming/KuduSinkTest.java | 2 +- .../connectors/kudu/table/KuduCatalogTest.java | 98 +++++++++++++--------- .../kudu/table/KuduTableFactoryTest.java | 89 +++++++++++--------- .../kudu/table/KuduTableSourceITCase.java | 16 ++-- .../connectors/kudu/table/KuduTableSourceTest.java | 5 +- .../connectors/kudu/table/KuduTableTestUtils.java | 2 +- .../{log4j.properties => log4j2-test.properties} | 17 ++-- 13 files changed, 170 insertions(+), 157 deletions(-) diff --git a/flink-connector-kudu/pom.xml b/flink-connector-kudu/pom.xml index bbf168e..a76102e 100644 --- a/flink-connector-kudu/pom.xml +++ b/flink-connector-kudu/pom.xml @@ -30,7 +30,7 @@ <packaging>jar</packaging> <properties> - <kudu.version>1.11.1</kudu.version> + <kudu.version>1.13.0</kudu.version> <mockito.version>1.10.19</mockito.version> </properties> @@ -79,6 +79,13 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <!-- this is added because test cluster use @Rule from junit4 --> <dependency> <groupId>org.junit.jupiter</groupId> @@ -95,17 +102,22 @@ </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>${slf4j.version}</version> - <scope>runtime</scope> + <groupId>org.apache.logging.log4j</groupId> + 
<artifactId>log4j-api</artifactId> + <version>${log4j2.version}</version> + <scope>test</scope> </dependency> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <version>${log4j.version}</version> - <scope>runtime</scope> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>${log4j2.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> + <version>${log4j2.version}</version> + <scope>test</scope> </dependency> </dependencies> diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java index 51ab748..d7a0c61 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/reader/KuduReader.java @@ -82,7 +82,7 @@ public class KuduReader implements AutoCloseable { if (tableInfo.getCreateTableIfNotExists()) { return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions()); } - throw new UnsupportedOperationException("table not exists and is marketed to not be created"); + throw new RuntimeException("Table " + tableName + " does not exist."); } public KuduReaderIterator scanner(byte[] token) throws IOException { diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java index 7233478..03c37ea 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/writer/KuduWriter.java @@ -83,7 +83,7 @@ 
public class KuduWriter<T> implements AutoCloseable { if (tableInfo.getCreateTableIfNotExists()) { return client.createTable(tableName, tableInfo.getSchema(), tableInfo.getCreateTableOptions()); } - throw new UnsupportedOperationException("table not exists and is marketed to not be created"); + throw new RuntimeException("Table " + tableName + " does not exist."); } public void write(T input) throws IOException { diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java index eb72205..1961aad 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableFactory.java @@ -30,7 +30,6 @@ import org.apache.flink.table.descriptors.SchemaValidator; import org.apache.flink.table.factories.TableSinkFactory; import org.apache.flink.table.factories.TableSourceFactory; import org.apache.flink.types.Row; -import static org.apache.flink.util.Preconditions.checkNotNull; import java.util.ArrayList; import java.util.HashMap; @@ -38,25 +37,10 @@ import java.util.List; import java.util.Map; import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.TABLE_SCHEMA_EXPR; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM; -import 
static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; +import static org.apache.flink.table.descriptors.DescriptorProperties.*; +import static org.apache.flink.table.descriptors.Rowtime.*; +import static org.apache.flink.table.descriptors.Schema.*; +import static org.apache.flink.util.Preconditions.checkNotNull; public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Tuple2<Boolean, Row>> { @@ -87,7 +71,7 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto properties.add(SCHEMA + ".#." + SCHEMA_NAME); properties.add(SCHEMA + ".#." + SCHEMA_FROM); // computed column - properties.add(SCHEMA + ".#." + TABLE_SCHEMA_EXPR); + properties.add(SCHEMA + ".#." + EXPR); // time attributes properties.add(SCHEMA + ".#." 
+ SCHEMA_PROCTIME); @@ -107,26 +91,20 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto return properties; } - private DescriptorProperties getValidatedProps(Map<String, String> properties) { + private DescriptorProperties validateTable(CatalogTable table) { + Map<String, String> properties = table.toProperties(); checkNotNull(properties.get(KUDU_MASTERS), "Missing required property " + KUDU_MASTERS); - checkNotNull(properties.get(KUDU_TABLE), "Missing required property " + KUDU_TABLE); + final DescriptorProperties descriptorProperties = new DescriptorProperties(true); descriptorProperties.putProperties(properties); new SchemaValidator(true, false, false).validate(descriptorProperties); return descriptorProperties; } - @Override - public KuduTableSource createTableSource(Map<String, String> properties) { - DescriptorProperties descriptorProperties = getValidatedProps(properties); - String tableName = descriptorProperties.getString(KUDU_TABLE); - TableSchema schema = descriptorProperties.getTableSchema(SCHEMA); - return createTableSource(tableName, schema, properties); - } - @Override public KuduTableSource createTableSource(ObjectPath tablePath, CatalogTable table) { - String tableName = tablePath.getObjectName(); + validateTable(table); + String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName()); return createTableSource(tableName, table.getSchema(), table.getProperties()); } @@ -141,18 +119,11 @@ public class KuduTableFactory implements TableSourceFactory<Row>, TableSinkFacto return new KuduTableSource(configBuilder, tableInfo, physicalSchema, null, null); } - @Override - public KuduTableSink createTableSink(Map<String, String> properties) { - DescriptorProperties descriptorProperties = getValidatedProps(properties); - String tableName = descriptorProperties.getString(KUDU_TABLE); - TableSchema schema = descriptorProperties.getTableSchema(SCHEMA); - - return createTableSink(tableName, schema, 
properties); - } - @Override public KuduTableSink createTableSink(ObjectPath tablePath, CatalogTable table) { - return createTableSink(tablePath.getObjectName(), table.getSchema(), table.getProperties()); + validateTable(table); + String tableName = table.toProperties().getOrDefault(KUDU_TABLE, tablePath.getObjectName()); + return createTableSink(tableName, table.getSchema(), table.getProperties()); } private KuduTableSink createTableSink(String tableName, TableSchema schema, Map<String, String> props) { diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java index 8ff517a..99325c0 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduTableSink.java @@ -51,9 +51,6 @@ public class KuduTableSink implements UpsertStreamTableSink<Row> { @Override public TypeInformation<Row> getRecordType() { return flinkSchema.toRowType(); } - @Override - public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStreamTuple) { consumeDataStream(dataStreamTuple); } - @Override public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStreamTuple) { KuduSink upsertKuduSink = new KuduSink(writerConfigBuilder.build(), tableInfo, new UpsertOperationMapper(getTableSchema().getFieldNames())); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java similarity index 96% rename from flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java rename to flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java index abf8a30..22fa0a4 100644 --- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOuputFormatTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/batch/KuduOutputFormatTest.java @@ -29,7 +29,7 @@ import org.junit.jupiter.api.Test; import java.util.List; import java.util.UUID; -class KuduOuputFormatTest extends KuduTestBase { +class KuduOutputFormatTest extends KuduTestBase { @Test void testInvalidKuduMaster() { @@ -50,7 +50,7 @@ class KuduOuputFormatTest extends KuduTestBase { KuduTableInfo tableInfo = booksTableInfo(UUID.randomUUID().toString(), false); KuduWriterConfig writerConfig = KuduWriterConfig.Builder.setMasters(masterAddresses).build(); KuduOutputFormat<Row> outputFormat = new KuduOutputFormat<>(writerConfig, tableInfo, new RowOperationMapper(KuduTestBase.columns, AbstractSingleOperationMapper.KuduOperation.INSERT)); - Assertions.assertThrows(UnsupportedOperationException.class, () -> outputFormat.open(0, 1)); + Assertions.assertThrows(RuntimeException.class, () -> outputFormat.open(0, 1)); } @Test diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java index 077306d..4d74fda 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/streaming/KuduSinkTest.java @@ -73,7 +73,7 @@ public class KuduSinkTest extends KuduTestBase { KuduSink<Row> sink = new KuduSink<>(writerConfig, tableInfo, new RowOperationMapper(columns, AbstractSingleOperationMapper.KuduOperation.INSERT)); sink.setRuntimeContext(context); - Assertions.assertThrows(UnsupportedOperationException.class, () -> sink.open(new Configuration())); + Assertions.assertThrows(RuntimeException.class, () -> sink.open(new Configuration())); } @Test diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java index 3cef2d8..4bb1871 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduCatalogTest.java @@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; @@ -52,6 +52,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertFalse; import static org.junit.jupiter.api.Assertions.*; @@ -72,78 +73,89 @@ public class KuduCatalogTest extends KuduTestBase { @Test public void testCreateAlterDrop() throws Exception { - tableEnv.sqlUpdate("CREATE TABLE TestTable1 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')"); - tableEnv.sqlUpdate("INSERT INTO TestTable1 VALUES ('f', 's')"); + tableEnv.executeSql("CREATE TABLE TestTable1 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')"); + tableEnv.executeSql("INSERT INTO TestTable1 VALUES ('f', 's')") + .getJobClient() + .get() + .getJobExecutionResult(getClass().getClassLoader()) + .get(1, TimeUnit.MINUTES); // Add this once Primary key support has been enabled // tableEnv.sqlUpdate("CREATE TABLE TestTable2 (`first` STRING, `second` String, 
PRIMARY KEY(`first`)) WITH ('kudu.hash-columns' = 'first')"); // tableEnv.sqlUpdate("INSERT INTO TestTable2 VALUES ('f', 's')"); - tableEnv.execute("test"); validateSingleKey("TestTable1"); // validateSingleKey("TestTable2"); - tableEnv.sqlUpdate("ALTER TABLE TestTable1 RENAME TO TestTable1R"); + tableEnv.executeSql("ALTER TABLE TestTable1 RENAME TO TestTable1R"); validateSingleKey("TestTable1R"); - tableEnv.sqlUpdate("DROP TABLE TestTable1R"); + tableEnv.executeSql("DROP TABLE TestTable1R"); assertFalse(harness.getClient().tableExists("TestTable1R")); } @Test public void testCreateAndInsertMultiKey() throws Exception { - tableEnv.sqlUpdate("CREATE TABLE TestTable3 (`first` STRING, `second` INT, third STRING) WITH ('kudu.hash-columns' = 'first,second', 'kudu.primary-key-columns' = 'first,second')"); - tableEnv.sqlUpdate("INSERT INTO TestTable3 VALUES ('f', 2, 't')"); - - // Add this once Primary key support has been enabled - // tableEnv.sqlUpdate("CREATE TABLE TestTable4 (`first` STRING, `second` INT, `third` STRING) PRIMARY KEY (`first`, `second`) WITH ('kudu.hash-columns' = 'first,second')"); - // tableEnv.sqlUpdate("INSERT INTO TestTable4 VALUES ('f', 2, 't')"); - - tableEnv.execute("test"); + tableEnv.executeSql("CREATE TABLE TestTable3 (`first` STRING, `second` INT, third STRING) WITH ('kudu.hash-columns' = 'first,second', 'kudu.primary-key-columns' = 'first,second')"); + tableEnv.executeSql("INSERT INTO TestTable3 VALUES ('f', 2, 't')") + .getJobClient() + .get() + .getJobExecutionResult(getClass().getClassLoader()) + .get(1, TimeUnit.MINUTES); validateMultiKey("TestTable3"); - // validateMultiKey("TestTable4"); } @Test public void testSourceProjection() throws Exception { - tableEnv.sqlUpdate("CREATE TABLE TestTable5 (`second` String, `first` STRING, `third` String) WITH ('kudu.hash-columns' = 'second', 'kudu.primary-key-columns' = 'second')"); - tableEnv.sqlUpdate("INSERT INTO TestTable5 VALUES ('s', 'f', 't')"); - tableEnv.execute("test"); - - 
tableEnv.sqlUpdate("CREATE TABLE TestTable6 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')"); - tableEnv.sqlUpdate("INSERT INTO TestTable6 (SELECT `first`, `second` FROM TestTable5)"); - tableEnv.execute("test"); + tableEnv.executeSql("CREATE TABLE TestTable5 (`second` String, `first` STRING, `third` String) WITH ('kudu.hash-columns' = 'second', 'kudu.primary-key-columns' = 'second')"); + tableEnv.executeSql("INSERT INTO TestTable5 VALUES ('s', 'f', 't')") + .getJobClient() + .get() + .getJobExecutionResult(getClass().getClassLoader()) + .get(1, TimeUnit.MINUTES); + + tableEnv.executeSql("CREATE TABLE TestTable6 (`first` STRING, `second` String) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')"); + tableEnv.executeSql("INSERT INTO TestTable6 (SELECT `first`, `second` FROM TestTable5)") + .getJobClient() + .get() + .getJobExecutionResult(getClass().getClassLoader()) + .get(1, TimeUnit.MINUTES); validateSingleKey("TestTable6"); } @Test public void testEmptyProjection() throws Exception { - tableEnv.sqlUpdate("CREATE TABLE TestTableEP (`first` STRING, `second` STRING) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')"); - tableEnv.sqlUpdate("INSERT INTO TestTableEP VALUES ('f','s')"); - tableEnv.sqlUpdate("INSERT INTO TestTableEP VALUES ('f2','s2')"); - tableEnv.execute("test"); + CollectionSink.output.clear(); + tableEnv.executeSql("CREATE TABLE TestTableEP (`first` STRING, `second` STRING) WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')"); + tableEnv.executeSql("INSERT INTO TestTableEP VALUES ('f','s')") + .getJobClient() + .get() + .getJobExecutionResult(getClass().getClassLoader()) + .get(1, TimeUnit.MINUTES); + tableEnv.executeSql("INSERT INTO TestTableEP VALUES ('f2','s2')") + .getJobClient() + .get() + .getJobExecutionResult(getClass().getClassLoader()) + .get(1, TimeUnit.MINUTES); Table result = tableEnv.sqlQuery("SELECT 
COUNT(*) FROM TestTableEP"); DataStream<Tuple2<Boolean, Row>> resultDataStream = tableEnv.toRetractStream(result, Types.ROW(Types.LONG)); - CollectionSink.output.clear(); - resultDataStream .map(t -> Tuple2.of(t.f0, t.f1.getField(0))) .returns(Types.TUPLE(Types.BOOLEAN, Types.LONG)) .addSink(new CollectionSink<>()).setParallelism(1); - tableEnv.execute("test"); + resultDataStream.getExecutionEnvironment().execute(); List<Tuple2<Boolean, Long>> expected = Lists.newArrayList(Tuple2.of(true, 1L), Tuple2.of(false, 1L), Tuple2.of(true, 2L)); assertEquals(new HashSet<>(expected), new HashSet<>(CollectionSink.output)); CollectionSink.output.clear(); - } @Test @@ -208,10 +220,13 @@ public class KuduCatalogTest extends KuduTestBase { @Test public void testTimestamp() throws Exception { - tableEnv.sqlUpdate("CREATE TABLE TestTableTsC (`first` STRING, `second` TIMESTAMP(3)) " + + tableEnv.executeSql("CREATE TABLE TestTableTsC (`first` STRING, `second` TIMESTAMP(3)) " + "WITH ('kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')"); - tableEnv.sqlUpdate("INSERT INTO TestTableTsC values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')"); - tableEnv.execute("test"); + tableEnv.executeSql("INSERT INTO TestTableTsC values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')") + .getJobClient() + .get() + .getJobExecutionResult(getClass().getClassLoader()) + .get(1, TimeUnit.MINUTES); KuduTable kuduTable = harness.getClient().openTable("TestTableTsC"); assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType()); @@ -227,29 +242,32 @@ public class KuduCatalogTest extends KuduTestBase { @Test public void testDatatypes() throws Exception { - tableEnv.sqlUpdate("CREATE TABLE TestTable8 (`first` STRING, `second` BOOLEAN, `third` BYTES," + + tableEnv.executeSql("CREATE TABLE TestTable8 (`first` STRING, `second` BOOLEAN, `third` BYTES," + "`fourth` TINYINT, `fifth` SMALLINT, `sixth` INT, `seventh` BIGINT, `eighth` FLOAT, `ninth` DOUBLE, " + "`tenth` 
TIMESTAMP)" + "WITH ('kudu.hash-columns' = 'first', 'kudu.primary-key-columns' = 'first')"); - tableEnv.sqlUpdate("INSERT INTO TestTable8 values ('f', false, cast('bbbb' as BYTES), cast(12 as TINYINT)," + + tableEnv.executeSql("INSERT INTO TestTable8 values ('f', false, cast('bbbb' as BYTES), cast(12 as TINYINT)," + "cast(34 as SMALLINT), 56, cast(78 as BIGINT), cast(3.14 as FLOAT), cast(1.2345 as DOUBLE)," + - "TIMESTAMP '2020-04-15 12:34:56.123') "); + "TIMESTAMP '2020-04-15 12:34:56.123') ") + .getJobClient() + .get() + .getJobExecutionResult(getClass().getClassLoader()) + .get(1, TimeUnit.MINUTES); - tableEnv.execute("test"); validateManyTypes("TestTable8"); } @Test public void testMissingPropertiesCatalog() throws Exception { assertThrows(TableException.class, - () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9a (`first` STRING, `second` String) " + + () -> tableEnv.executeSql("CREATE TABLE TestTable9a (`first` STRING, `second` String) " + "WITH ('kudu.primary-key-columns' = 'second')")); assertThrows(TableException.class, - () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9b (`first` STRING, `second` String) " + + () -> tableEnv.executeSql("CREATE TABLE TestTable9b (`first` STRING, `second` String) " + "WITH ('kudu.hash-columns' = 'first')")); assertThrows(TableException.class, - () -> tableEnv.sqlUpdate("CREATE TABLE TestTable9b (`first` STRING, `second` String) " + + () -> tableEnv.executeSql("CREATE TABLE TestTable9b (`first` STRING, `second` String) " + "WITH ('kudu.primary-key-columns' = 'second', 'kudu.hash-columns' = 'first')")); } diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java index 3a2f776..d852f8e 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java +++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableFactoryTest.java @@ -17,8 +17,11 @@ package org.apache.flink.connectors.kudu.table; import org.apache.flink.connectors.kudu.connector.KuduTestBase; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.kudu.Type; import org.apache.kudu.client.KuduScanner; import org.apache.kudu.client.KuduTable; @@ -31,9 +34,10 @@ import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.*; public class KuduTableFactoryTest extends KuduTestBase { private StreamTableEnvironment tableEnv; @@ -46,57 +50,60 @@ public class KuduTableFactoryTest extends KuduTestBase { kuduMasters = harness.getMasterAddressesAsString(); } - @Test - public void testMissingTable() throws Exception { - tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " + - "WITH ('connector.type'='kudu', 'kudu.masters'='" + kuduMasters + "')"); - assertThrows(NullPointerException.class, - () -> tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)")); - } - @Test public void testMissingMasters() throws Exception { - tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " + + tableEnv.executeSql("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " + "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11')"); - 
assertThrows(NullPointerException.class, - () -> tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)")); + assertThrows(TableException.class, + () -> tableEnv.executeSql("INSERT INTO TestTable11 values ('f', 1)")); } @Test public void testNonExistingTable() throws Exception { - tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " + + tableEnv.executeSql("CREATE TABLE TestTable11 (`first` STRING, `second` INT) " + "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "')"); - tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 1)"); - assertThrows(java.util.concurrent.ExecutionException.class, - () -> tableEnv.execute("test")); + JobClient jobClient = tableEnv.executeSql("INSERT INTO TestTable11 values ('f', 1)").getJobClient().get(); + try { + jobClient.getJobExecutionResult(getClass().getClassLoader()).get(); + fail(); + } catch (ExecutionException ee) { + assertTrue(ee.getCause() instanceof JobExecutionException); + } } @Test public void testCreateTable() throws Exception { - tableEnv.sqlUpdate("CREATE TABLE TestTable11 (`first` STRING, `second` STRING) " + + tableEnv.executeSql("CREATE TABLE TestTable11 (`first` STRING, `second` STRING) " + "WITH ('connector.type'='kudu', 'kudu.table'='TestTable11', 'kudu.masters'='" + kuduMasters + "', " + "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')"); - tableEnv.sqlUpdate("INSERT INTO TestTable11 values ('f', 's')"); - tableEnv.execute("test"); + + tableEnv.executeSql("INSERT INTO TestTable11 values ('f', 's')") + .getJobClient() + .get() + .getJobExecutionResult(getClass().getClassLoader()) + .get(1, TimeUnit.MINUTES); validateSingleKey("TestTable11"); } + @Test public void testTimestamp() throws Exception { // Timestamp should be bridged to sql.Timestamp // Test it when creating the table... 
- tableEnv.sqlUpdate("CREATE TABLE TestTableTs (`first` STRING, `second` TIMESTAMP(3)) " + - "WITH ('connector.type'='kudu', 'kudu.table'='TestTableTs', 'kudu.masters'='" + kuduMasters + "', " + + tableEnv.executeSql("CREATE TABLE TestTableTs (`first` STRING, `second` TIMESTAMP(3)) " + + "WITH ('connector.type'='kudu', 'kudu.masters'='" + kuduMasters + "', " + "'kudu.hash-columns'='first', 'kudu.primary-key-columns'='first')"); - tableEnv.sqlUpdate("INSERT INTO TestTableTs values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')"); - tableEnv.execute("test"); - - // And also when inserting into existing table - tableEnv.sqlUpdate("CREATE TABLE TestTableTsE (`first` STRING, `second` TIMESTAMP(3)) " + - "WITH ('connector.type'='kudu', 'kudu.table'='TestTableTs', 'kudu.masters'='" + kuduMasters + "')"); - - tableEnv.sqlUpdate("INSERT INTO TestTableTsE values ('s', TIMESTAMP '2020-02-02 23:23:23')"); - tableEnv.execute("test"); + tableEnv.executeSql("INSERT INTO TestTableTs values ('f', TIMESTAMP '2020-01-01 12:12:12.123456')") + .getJobClient() + .get() + .getJobExecutionResult(getClass().getClassLoader()) + .get(1, TimeUnit.MINUTES); + + tableEnv.executeSql("INSERT INTO TestTableTs values ('s', TIMESTAMP '2020-02-02 23:23:23')") + .getJobClient() + .get() + .getJobExecutionResult(getClass().getClassLoader()) + .get(1, TimeUnit.MINUTES); KuduTable kuduTable = harness.getClient().openTable("TestTableTs"); assertEquals(Type.UNIXTIME_MICROS, kuduTable.getSchema().getColumn("second").getType()); @@ -115,18 +122,24 @@ public class KuduTableFactoryTest extends KuduTestBase { @Test public void testExistingTable() throws Exception { // Creating a table - tableEnv.sqlUpdate("CREATE TABLE TestTable12 (`first` STRING, `second` STRING) " + + tableEnv.executeSql("CREATE TABLE TestTable12 (`first` STRING, `second` STRING) " + "WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "', " + "'kudu.hash-columns'='first', 
'kudu.primary-key-columns'='first')"); - tableEnv.sqlUpdate("INSERT INTO TestTable12 values ('f', 's')"); - tableEnv.execute("test"); + tableEnv.executeSql("INSERT INTO TestTable12 values ('f', 's')") + .getJobClient() + .get() + .getJobExecutionResult(getClass().getClassLoader()) + .get(1, TimeUnit.MINUTES); // Then another one in SQL that refers to the previously created one - tableEnv.sqlUpdate("CREATE TABLE TestTable12b (`first` STRING, `second` STRING) " + + tableEnv.executeSql("CREATE TABLE TestTable12b (`first` STRING, `second` STRING) " + "WITH ('connector.type'='kudu', 'kudu.table'='TestTable12', 'kudu.masters'='" + kuduMasters + "')"); - tableEnv.sqlUpdate("INSERT INTO TestTable12b values ('f2','s2')"); - tableEnv.execute("test2"); + tableEnv.executeSql("INSERT INTO TestTable12b values ('f2','s2')") + .getJobClient() + .get() + .getJobExecutionResult(getClass().getClassLoader()) + .get(1, TimeUnit.MINUTES); // Validate that both insertions were into the same table KuduTable kuduTable = harness.getClient().openTable("TestTable12"); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java index f5939fc..b83497c 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceITCase.java @@ -18,16 +18,16 @@ package org.apache.flink.connectors.kudu.table; import org.apache.flink.connectors.kudu.connector.KuduTableInfo; import org.apache.flink.connectors.kudu.connector.KuduTestBase; -import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableUtils; import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.BeforeEach; import 
org.junit.jupiter.api.Test; +import java.util.ArrayList; import java.util.List; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; /** * Integration tests for {@link KuduTableSource}. @@ -48,8 +48,9 @@ public class KuduTableSourceITCase extends KuduTestBase { @Test void testFullBatchScan() throws Exception { - Table query = tableEnv.sqlQuery("select * from books order by id"); - List<Row> results = TableUtils.collectToList(query); + CloseableIterator<Row> it = tableEnv.executeSql("select * from books order by id").collect(); + List<Row> results = new ArrayList<>(); + it.forEachRemaining(results::add); assertEquals(5, results.size()); assertEquals("1001,Java for dummies,Tan Ah Teck,11.11,11", results.get(0).toString()); tableEnv.sqlUpdate("DROP TABLE books"); @@ -58,8 +59,9 @@ public class KuduTableSourceITCase extends KuduTestBase { @Test void testScanWithProjectionAndFilter() throws Exception { // (price > 30 and price < 40) - Table table = tableEnv.sqlQuery("SELECT title FROM books WHERE id IN (1003, 1004) and quantity < 40"); - List<Row> results = TableUtils.collectToList(table); + CloseableIterator<Row> it = tableEnv.executeSql("SELECT title FROM books WHERE id IN (1003, 1004) and quantity < 40").collect(); + List<Row> results = new ArrayList<>(); + it.forEachRemaining(results::add); assertEquals(1, results.size()); assertEquals("More Java for more dummies", results.get(0).toString()); tableEnv.sqlUpdate("DROP TABLE books"); diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java index f4bb6ae..2dfb71b 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableSourceTest.java @@ -121,8 +121,7 @@ public class 
KuduTableSourceTest extends KuduTestBase { // expressions for supported predicates FieldReferenceExpression fieldReferenceExpression = new FieldReferenceExpression( "id", DataTypes.INT(), 0, 0); - ValueLiteralExpression valueLiteralExpression = new ValueLiteralExpression( - 1, DataTypes.INT()); + ValueLiteralExpression valueLiteralExpression = new ValueLiteralExpression(1); List<ResolvedExpression> args = new ArrayList<>( Arrays.asList(fieldReferenceExpression, valueLiteralExpression)); Expression supportedPred = new CallExpression( @@ -132,7 +131,7 @@ public class KuduTableSourceTest extends KuduTestBase { // unsupported predicate Expression unsupportedPred = new CallExpression( new ScalarFunctionDefinition("dummy", DUMMY_FUNCTION), - singletonList(new ValueLiteralExpression(1, DataTypes.INT())), + singletonList(new ValueLiteralExpression(1)), DataTypes.INT()); // invalid predicate Expression invalidPred = new CallExpression( diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java index 54854fb..4eae7bf 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/KuduTableTestUtils.java @@ -19,7 +19,7 @@ package org.apache.flink.connectors.kudu.table; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM; diff --git a/flink-connector-kudu/src/test/resources/log4j.properties 
b/flink-connector-kudu/src/test/resources/log4j2-test.properties similarity index 70% rename from flink-connector-kudu/src/test/resources/log4j.properties rename to flink-connector-kudu/src/test/resources/log4j2-test.properties index 15efe08..e463a0e 100644 --- a/flink-connector-kudu/src/test/resources/log4j.properties +++ b/flink-connector-kudu/src/test/resources/log4j2-test.properties @@ -16,12 +16,13 @@ # limitations under the License. ################################################################################ -# This file ensures that tests executed from the IDE show log output +# Root logger level is set to INFO for debugging purposes; +# set it manually to OFF to not flood build logs +rootLogger.level = INFO +rootLogger.appenderRef.test.ref = TestLogger -log4j.rootLogger=WARN, console - -# Log all infos in the given file -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.target=System.err -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n