This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8d64d419 [bugfix]Fix a mismatch between the type of jdbc query result
and the datatype (#514)
8d64d419 is described below
commit 8d64d419a45e5fd438b8ec38adf2ccc1b130bfb7
Author: Zhumengze <[email protected]>
AuthorDate: Wed Nov 20 10:26:09 2024 +0800
[bugfix]Fix a mismatch between the type of jdbc query result and the
datatype (#514)
The value type obtained by rs.getObject(index + 1) is inconsistent with the
field type obtained from the Flink Context, resulting in a conversion failure
when encapsulating the Flink RowData. For example, the Java type corresponding
to TINYINT is java.lang.Byte, while the Java type obtained by rs.getObject(index
+ 1) is java.lang.Integer, which causes a type conversion problem. SMALLINT has
the same problem.
---
.../java/org/apache/doris/flink/lookup/Worker.java | 7 +-
.../doris/flink/lookup/DorisLookupTableITCase.java | 141 +++++++++++++++++++++
2 files changed, 146 insertions(+), 2 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
index e4e08f7e..59cfd85d 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
@@ -18,6 +18,7 @@
package org.apache.doris.flink.lookup;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.types.DataType;
import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
@@ -200,8 +201,10 @@ public class Worker implements Runnable {
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
Record record = new Record(schema);
- for (int index = 0; index <
schema.getFieldTypes().length; index++) {
- record.setObject(index, rs.getObject(index +
1));
+ DataType[] fieldTypes = schema.getFieldTypes();
+ for (int index = 0; index < fieldTypes.length;
index++) {
+ Class<?> conversionClass =
fieldTypes[index].getConversionClass();
+ record.setObject(index, rs.getObject(index +
1, conversionClass));
}
List<Record> records =
resultRecordMap.computeIfAbsent(
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
new file mode 100644
index 00000000..6d569bcf
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/lookup/DorisLookupTableITCase.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.lookup;
+
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.doris.flink.container.AbstractITCaseService;
+import org.apache.doris.flink.container.ContainerUtils;
+import org.apache.doris.flink.table.DorisConfigOptions;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class DorisLookupTableITCase extends AbstractITCaseService {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DorisLookupTableITCase.class);
+ private static final String DATABASE = "test_lookup";
+ private static final String TABLE_READ_TBL = "tbl_read_tbl";
+
+ @Test
+ public void testLookupTable() throws Exception {
+ initializeTable();
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ DataStreamSource<Integer> sourceStream = env.fromElements(1, 2, 3, 4);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ Schema schema =
+ Schema.newBuilder()
+ .column("f0", DataTypes.INT())
+ .columnByExpression("proctime", "PROCTIME()")
+ .build();
+ Table table = tEnv.fromDataStream(sourceStream, schema);
+ tEnv.createTemporaryView("source", table);
+
+ String lookupDDL =
+ String.format(
+ "CREATE TABLE `doris_lookup`("
+ + " `id` INTEGER,"
+ + " `tinyintColumn` TINYINT,"
+ + " `smallintColumn` SMALLINT,"
+ + " `bigintColumn` BIGINT,"
+ + " PRIMARY KEY (`id`) NOT ENFORCED"
+ + ") WITH ("
+ + "'connector' = '"
+ + DorisConfigOptions.IDENTIFIER
+ + "',"
+ + "'fenodes' = '%s',"
+ + "'jdbc-url' = '%s',"
+ + "'table.identifier' = '%s',"
+ + "'username' = '%s',"
+ + "'password' = '%s',"
+ + "'lookup.cache.max-rows' = '100'"
+ + ")",
+ getFenodes(),
+ getDorisQueryUrl(),
+ DATABASE + "." + TABLE_READ_TBL,
+ getDorisUsername(),
+ getDorisPassword());
+ tEnv.executeSql(lookupDDL);
+ TableResult tableResult =
+ tEnv.executeSql(
+ "select source.f0,"
+ + "tinyintColumn,"
+ + "smallintColumn,"
+ + "bigintColumn"
+ + " from `source`"
+ + " inner join `doris_lookup` FOR SYSTEM_TIME
AS OF source.proctime on source.f0 = doris_lookup.id");
+
+ List<String> actual = new ArrayList<>();
+ try (CloseableIterator<Row> iterator = tableResult.collect()) {
+ while (iterator.hasNext()) {
+ actual.add(iterator.next().toString());
+ }
+ }
+
+ String[] expected =
+ new String[] {
+ "+I[1, 97, 27479, 8670353564751764000]",
+ "+I[2, 79, 17119, -4381380624467725000]",
+ "+I[3, -106, -14878, 1466614815449373200]"
+ };
+ assertEqualsInAnyOrder(Arrays.asList(expected),
Arrays.asList(actual.toArray()));
+ }
+
+ private void initializeTable() {
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection(),
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format(
+ "DROP TABLE IF EXISTS %s.%s",
+ DATABASE, DorisLookupTableITCase.TABLE_READ_TBL),
+ String.format(
+ "CREATE TABLE %s.%s ( \n"
+ + "`id` int(11),\n"
+ + "`tinyintColumn` tinyint(4),\n"
+ + "`smallintColumn` smallint(6),\n"
+ + "`bigintColumn` bigint(20),\n"
+ + ") DISTRIBUTED BY HASH(`id`) BUCKETS 10\n"
+ + "PROPERTIES (\n"
+ + "\"replication_num\" = \"1\"\n"
+ + ")\n",
+ DATABASE, DorisLookupTableITCase.TABLE_READ_TBL),
+ String.format(
+ "insert into %s.%s values
(1,97,27479,8670353564751764000)",
+ DATABASE, DorisLookupTableITCase.TABLE_READ_TBL),
+ String.format(
+ "insert into %s.%s values
(2,79,17119,-4381380624467725000)",
+ DATABASE, DorisLookupTableITCase.TABLE_READ_TBL),
+ String.format(
+ "insert into %s.%s values
(3,-106,-14878,1466614815449373200)",
+ DATABASE, DorisLookupTableITCase.TABLE_READ_TBL));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]