This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git
commit 2e75a60111bd48d670e94b8b6fc1b2eb52fd70fe Author: Balazs Varga <bva...@cloudera.com> AuthorDate: Tue Jan 3 18:07:55 2023 +0100 [BAHIR-321] Make KuduFilterInfo handle String literals --- .../connectors/kudu/connector/KuduFilterInfo.java | 4 +- .../function/lookup/KuduRowDataLookupFunction.java | 8 +-- .../kudu/connector/KuduFilterInfoTest.java | 41 ++++++++++++++++ .../connectors/kudu/connector/KuduTestBase.java | 2 +- .../kudu/table/dynamic/KuduDynamicSourceTest.java | 57 ++++++++++++++++++++++ 5 files changed, 107 insertions(+), 5 deletions(-) diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java index 08fa86b..e7a8d16 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java @@ -17,6 +17,7 @@ package org.apache.flink.connectors.kudu.connector; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.data.binary.BinaryStringData; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; @@ -65,7 +66,8 @@ public class KuduFilterInfo implements Serializable { switch (column.getType()) { case STRING: - predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String) this.value); + predicate = KuduPredicate.newComparisonPredicate(column, comparison, + (this.value instanceof BinaryStringData) ? this.value.toString() : (String) this.value); break; case FLOAT: predicate = KuduPredicate.newComparisonPredicate(column, comparison, (float) this.value); diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java index 4a4a952..d54f23a 100644 --- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java +++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java @@ -178,9 +178,11 @@ public class KuduRowDataLookupFunction extends TableFunction<RowData> { if (null != this.kuduReader) { try { this.kuduReader.close(); - this.cache.cleanUp(); - // help gc - this.cache = null; + if (cache != null) { + this.cache.cleanUp(); + // help gc + this.cache = null; + } this.kuduReader = null; } catch (IOException e) { // ignore exception when close. diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java new file mode 100644 index 0000000..a6c2ff7 --- /dev/null +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.connectors.kudu.connector; + +import org.apache.flink.table.data.binary.BinaryStringData; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Type; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +public class KuduFilterInfoTest { + + @Test + void testKuduFilterInfoWithBinaryStringData() { + String filterValue = "someValue"; + + KuduFilterInfo kuduFilterInfo = KuduFilterInfo.Builder.create("col") + .equalTo(BinaryStringData.fromString(filterValue)) + .build(); + + ColumnSchema colSchema = new ColumnSchema.ColumnSchemaBuilder("col", Type.STRING).build(); + assertDoesNotThrow(() -> kuduFilterInfo.toPredicate(colSchema)); + } + +} diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java index 90b0746..bcc9b2d 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java @@ -261,7 +261,7 @@ public class KuduTestBase { }); kuduWriter.close(); } catch (Exception e) { - Assertions.fail(); + Assertions.fail(e.getMessage()); } } diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java index 2cab282..377ac2d 100644 --- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java +++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java @@ -161,4 +161,61 @@ public class KuduDynamicSourceTest extends KuduTestBase { .collect(Collectors.toList()); assertEquals(1, result.size()); } + + @Test + public void testLookupJoin() { + tEnv.executeSql( + "CREATE TABLE " + + INPUT_TABLE + + "(" + + "id int," + + "title string," + + "author string," + + "price double," + + "quantity int" + + ") WITH (" + + " 'connector'='kudu'," + + " 'kudu.masters'='" + + getMasterAddress() + + "'," + + " 'kudu.table'='" + + INPUT_TABLE + + "'," + + "'kudu.scan.row-size'='10'," + + "'kudu.primary-key-columns'='id'" + + ")"); + + tEnv.executeSql( + "CREATE TABLE datagen" + + "(" + + "id int," + + "isbn string," + + "proctime as PROCTIME()" + + ") WITH (" + + " 'connector'='datagen'," + + " 'number-of-rows'='5'," + + " 'fields.id.kind'='sequence'," + + " 'fields.isbn.kind'='sequence'," + + " 'fields.id.start'='1001'," + + " 'fields.isbn.start'='1'," + + " 'fields.id.end'='1005'," + + " 'fields.isbn.end'='5'" + + ")"); + + Iterator<Row> collected = + tEnv.executeSql("SELECT d.id, isbn, title FROM datagen as d" + + " JOIN " + INPUT_TABLE + + " FOR SYSTEM_TIME AS OF d.proctime AS k" + + " ON d.id=k.id" + + " WHERE k.title='Java for dummies'") + .collect(); + assertNotNull(collected); + List<String> result = + CollectionUtil.iteratorToList(collected).stream() + .map(Row::toString) + .sorted() + .collect(Collectors.toList()); + assertEquals(1, result.size()); + assertEquals("+I[1001, 1, Java for dummies]", result.get(0)); + } }