This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new 724e17c [BAHIR-321] Make KuduFilterInfo handle String literals
724e17c is described below
commit 724e17c4481fb12c1ab14608dd04b974d6f5e090
Author: Balazs Varga <[email protected]>
AuthorDate: Tue Jan 3 18:07:55 2023 +0100
[BAHIR-321] Make KuduFilterInfo handle String literals
---
.../connectors/kudu/connector/KuduFilterInfo.java | 4 +-
.../function/lookup/KuduRowDataLookupFunction.java | 8 +--
.../kudu/connector/KuduFilterInfoTest.java | 41 ++++++++++++++++
.../connectors/kudu/connector/KuduTestBase.java | 2 +-
.../kudu/table/dynamic/KuduDynamicSourceTest.java | 57 ++++++++++++++++++++++
5 files changed, 107 insertions(+), 5 deletions(-)
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
index 08fa86b..e7a8d16 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
@@ -17,6 +17,7 @@
package org.apache.flink.connectors.kudu.connector;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
@@ -65,7 +66,8 @@ public class KuduFilterInfo implements Serializable {
switch (column.getType()) {
case STRING:
- predicate = KuduPredicate.newComparisonPredicate(column, comparison, (String) this.value);
+ predicate = KuduPredicate.newComparisonPredicate(column, comparison,
+ (this.value instanceof BinaryStringData) ? this.value.toString() : (String) this.value);
break;
case FLOAT:
predicate = KuduPredicate.newComparisonPredicate(column, comparison, (float) this.value);
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
index 4a4a952..d54f23a 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
@@ -178,9 +178,11 @@ public class KuduRowDataLookupFunction extends TableFunction<RowData> {
if (null != this.kuduReader) {
try {
this.kuduReader.close();
- this.cache.cleanUp();
- // help gc
- this.cache = null;
+ if (cache != null) {
+ this.cache.cleanUp();
+ // help gc
+ this.cache = null;
+ }
this.kuduReader = null;
} catch (IOException e) {
// ignore exception when close.
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java
new file mode 100644
index 0000000..a6c2ff7
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector;
+
+import org.apache.flink.table.data.binary.BinaryStringData;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Type;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+public class KuduFilterInfoTest {
+
+ @Test
+ void testKuduFilterInfoWithBinaryStringData() {
+ String filterValue = "someValue";
+
+ KuduFilterInfo kuduFilterInfo = KuduFilterInfo.Builder.create("col")
+ .equalTo(BinaryStringData.fromString(filterValue))
+ .build();
+
+ ColumnSchema colSchema = new ColumnSchema.ColumnSchemaBuilder("col", Type.STRING).build();
+ assertDoesNotThrow(() -> kuduFilterInfo.toPredicate(colSchema));
+ }
+
+}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
index 90b0746..bcc9b2d 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
@@ -261,7 +261,7 @@ public class KuduTestBase {
});
kuduWriter.close();
} catch (Exception e) {
- Assertions.fail();
+ Assertions.fail(e.getMessage());
}
}
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java
index 2cab282..377ac2d 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java
@@ -161,4 +161,61 @@ public class KuduDynamicSourceTest extends KuduTestBase {
.collect(Collectors.toList());
assertEquals(1, result.size());
}
+
+ @Test
+ public void testLookupJoin() {
+ tEnv.executeSql(
+ "CREATE TABLE "
+ + INPUT_TABLE
+ + "("
+ + "id int,"
+ + "title string,"
+ + "author string,"
+ + "price double,"
+ + "quantity int"
+ + ") WITH ("
+ + " 'connector'='kudu',"
+ + " 'kudu.masters'='"
+ + getMasterAddress()
+ + "',"
+ + " 'kudu.table'='"
+ + INPUT_TABLE
+ + "',"
+ + "'kudu.scan.row-size'='10'," +
+ "'kudu.primary-key-columns'='id'"
+ + ")");
+
+ tEnv.executeSql(
+ "CREATE TABLE datagen"
+ + "("
+ + "id int,"
+ + "isbn string,"
+ + "proctime as PROCTIME()"
+ + ") WITH ("
+ + " 'connector'='datagen',"
+ + " 'number-of-rows'='5',"
+ + " 'fields.id.kind'='sequence',"
+ + " 'fields.isbn.kind'='sequence',"
+ + " 'fields.id.start'='1001',"
+ + " 'fields.isbn.start'='1',"
+ + " 'fields.id.end'='1005',"
+ + " 'fields.isbn.end'='5'"
+ + ")");
+
+ Iterator<Row> collected =
+ tEnv.executeSql("SELECT d.id, isbn, title FROM datagen as d"
+ + " JOIN " + INPUT_TABLE
+ + " FOR SYSTEM_TIME AS OF d.proctime AS k"
+ + " ON d.id=k.id"
+ + " WHERE k.title='Java for dummies'")
+ .collect();
+ assertNotNull(collected);
+ List<String> result =
+ CollectionUtil.iteratorToList(collected).stream()
+ .map(Row::toString)
+ .sorted()
+ .collect(Collectors.toList());
+ assertEquals(1, result.size());
+ assertEquals("+I[1001, 1, Java for dummies]", result.get(0));
+ }
}