This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git

commit 2e75a60111bd48d670e94b8b6fc1b2eb52fd70fe
Author: Balazs Varga <bva...@cloudera.com>
AuthorDate: Tue Jan 3 18:07:55 2023 +0100

    [BAHIR-321] Make KuduFilterInfo handle String literals
---
 .../connectors/kudu/connector/KuduFilterInfo.java  |  4 +-
 .../function/lookup/KuduRowDataLookupFunction.java |  8 +--
 .../kudu/connector/KuduFilterInfoTest.java         | 41 ++++++++++++++++
 .../connectors/kudu/connector/KuduTestBase.java    |  2 +-
 .../kudu/table/dynamic/KuduDynamicSourceTest.java  | 57 ++++++++++++++++++++++
 5 files changed, 107 insertions(+), 5 deletions(-)

diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
index 08fa86b..e7a8d16 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfo.java
@@ -17,6 +17,7 @@
 package org.apache.flink.connectors.kudu.connector;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.binary.BinaryStringData;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
@@ -65,7 +66,8 @@ public class KuduFilterInfo implements Serializable {
 
         switch (column.getType()) {
             case STRING:
-                predicate = KuduPredicate.newComparisonPredicate(column, 
comparison, (String) this.value);
+                predicate = KuduPredicate.newComparisonPredicate(column, 
comparison,
+                        (this.value instanceof BinaryStringData) ? 
this.value.toString() : (String) this.value);
                 break;
             case FLOAT:
                 predicate = KuduPredicate.newComparisonPredicate(column, 
comparison, (float) this.value);
diff --git 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
index 4a4a952..d54f23a 100644
--- 
a/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
+++ 
b/flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/function/lookup/KuduRowDataLookupFunction.java
@@ -178,9 +178,11 @@ public class KuduRowDataLookupFunction extends 
TableFunction<RowData> {
         if (null != this.kuduReader) {
             try {
                 this.kuduReader.close();
-                this.cache.cleanUp();
-                // help gc
-                this.cache = null;
+                if (cache != null) {
+                    this.cache.cleanUp();
+                    // help gc
+                    this.cache = null;
+                }
                 this.kuduReader = null;
             } catch (IOException e) {
                 // ignore exception when close.
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java
new file mode 100644
index 0000000..a6c2ff7
--- /dev/null
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduFilterInfoTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.connectors.kudu.connector;
+
+import org.apache.flink.table.data.binary.BinaryStringData;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Type;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+public class KuduFilterInfoTest {
+
+    @Test
+    void testKuduFilterInfoWithBinaryStringData() {
+        String filterValue = "someValue";
+
+        KuduFilterInfo kuduFilterInfo = KuduFilterInfo.Builder.create("col")
+                .equalTo(BinaryStringData.fromString(filterValue))
+                .build();
+
+        ColumnSchema colSchema = new ColumnSchema.ColumnSchemaBuilder("col", 
Type.STRING).build();
+        assertDoesNotThrow(() -> kuduFilterInfo.toPredicate(colSchema));
+    }
+
+}
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
index 90b0746..bcc9b2d 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/connector/KuduTestBase.java
@@ -261,7 +261,7 @@ public class KuduTestBase {
             });
             kuduWriter.close();
         } catch (Exception e) {
-            Assertions.fail();
+            Assertions.fail(e.getMessage());
         }
     }
 
diff --git 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java
 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java
index 2cab282..377ac2d 100644
--- 
a/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java
+++ 
b/flink-connector-kudu/src/test/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicSourceTest.java
@@ -161,4 +161,61 @@ public class KuduDynamicSourceTest extends KuduTestBase {
                         .collect(Collectors.toList());
         assertEquals(1, result.size());
     }
+
+    @Test
+    public void testLookupJoin() {
+        tEnv.executeSql(
+                "CREATE TABLE "
+                        + INPUT_TABLE
+                        + "("
+                        + "id int,"
+                        + "title string,"
+                        + "author string,"
+                        + "price double,"
+                        + "quantity int"
+                        + ") WITH ("
+                        + "  'connector'='kudu',"
+                        + "  'kudu.masters'='"
+                        + getMasterAddress()
+                        + "',"
+                        + "  'kudu.table'='"
+                        + INPUT_TABLE
+                        + "',"
+                        + "'kudu.scan.row-size'='10'," +
+                        "'kudu.primary-key-columns'='id'"
+                        + ")");
+
+        tEnv.executeSql(
+                "CREATE TABLE datagen"
+                        + "("
+                        + "id int,"
+                        + "isbn string,"
+                        + "proctime as PROCTIME()"
+                        + ") WITH ("
+                        + "  'connector'='datagen',"
+                        + "  'number-of-rows'='5',"
+                        + "  'fields.id.kind'='sequence',"
+                        + "  'fields.isbn.kind'='sequence',"
+                        + "  'fields.id.start'='1001',"
+                        + "  'fields.isbn.start'='1',"
+                        + "  'fields.id.end'='1005',"
+                        + "  'fields.isbn.end'='5'"
+                        + ")");
+
+        Iterator<Row> collected =
+                tEnv.executeSql("SELECT d.id, isbn, title FROM datagen as d"
+                                + " JOIN " + INPUT_TABLE
+                                + " FOR SYSTEM_TIME AS OF d.proctime AS k"
+                                + " ON d.id=k.id"
+                                + " WHERE k.title='Java for dummies'")
+                        .collect();
+        assertNotNull(collected);
+        List<String> result =
+                CollectionUtil.iteratorToList(collected).stream()
+                        .map(Row::toString)
+                        .sorted()
+                        .collect(Collectors.toList());
+        assertEquals(1, result.size());
+        assertEquals("+I[1001, 1, Java for dummies]", result.get(0));
+    }
 }

Reply via email to