This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 38026fde [Fix] Fix flink sql projection pushdown error (#428)
38026fde is described below

commit 38026fde2e1b010b0776df6d4e9cdbb2e49966b6
Author: wudi <676366...@qq.com>
AuthorDate: Mon Jul 15 15:36:49 2024 +0800

    [Fix] Fix flink sql projection pushdown error (#428)
---
 .../doris/flink/table/DorisDynamicTableSource.java | 22 ++++++++-----
 .../doris/flink/source/DorisSourceITCase.java      | 36 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 9753361c..5827f879 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -94,6 +94,15 @@ public final class DorisDynamicTableSource
             String filterQuery = 
resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
             readOptions.setFilterQuery(filterQuery);
         }
+        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
+            String[] selectFields =
+                    DataType.getFieldNames(physicalRowDataType).toArray(new 
String[0]);
+            readOptions.setReadFields(
+                    Arrays.stream(selectFields)
+                            .map(item -> String.format("`%s`", 
item.trim().replace("`", "")))
+                            .collect(Collectors.joining(", ")));
+        }
+
         if (readOptions.getUseOldApi()) {
             List<PartitionDefinition> dorisPartitions;
             try {
@@ -199,14 +208,11 @@ public final class DorisDynamicTableSource
     @Override
     public void applyProjection(int[][] projectedFields, DataType 
producedDataType) {
         this.physicalRowDataType = 
Projection.of(projectedFields).project(physicalRowDataType);
-        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
-            String[] selectFields =
-                    DataType.getFieldNames(physicalRowDataType).toArray(new 
String[0]);
-            this.readOptions.setReadFields(
-                    Arrays.stream(selectFields)
-                            .map(item -> String.format("`%s`", 
item.trim().replace("`", "")))
-                            .collect(Collectors.joining(", ")));
-        }
+        String[] selectFields = 
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
+        this.readOptions.setReadFields(
+                Arrays.stream(selectFields)
+                        .map(item -> String.format("`%s`", 
item.trim().replace("`", "")))
+                        .collect(Collectors.joining(", ")));
     }
 
     @VisibleForTesting
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index e13eeb36..027159db 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -49,6 +49,7 @@ public class DorisSourceITCase extends DorisTestBase {
     static final String TABLE_READ_TBL = "tbl_read_tbl";
     static final String TABLE_READ_TBL_OLD_API = "tbl_read_tbl_old_api";
     static final String TABLE_READ_TBL_ALL_OPTIONS = 
"tbl_read_tbl_all_options";
+    static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";
 
     @Test
     public void testSource() throws Exception {
@@ -231,6 +232,41 @@ public class DorisSourceITCase extends DorisTestBase {
         Assert.assertArrayEquals(expected, actual.toArray());
     }
 
+    @Test
+    public void testTableSourceFilterAndProjectionPushDown() throws Exception {
+        initializeTable(TABLE_READ_TBL_PUSH_DOWN);
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE doris_source ("
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = 'doris',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN,
+                        USERNAME,
+                        PASSWORD);
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult = tEnv.executeSql("SELECT age FROM 
doris_source where age = '18'");
+
+        List<String> actual = new ArrayList<>();
+        try (CloseableIterator<Row> iterator = tableResult.collect()) {
+            while (iterator.hasNext()) {
+                actual.add(iterator.next().toString());
+            }
+        }
+        String[] expected = new String[] {"+I[18]"};
+        Assert.assertArrayEquals(expected, actual.toArray());
+    }
+
     private void initializeTable(String table) throws Exception {
         try (Connection connection =
                         DriverManager.getConnection(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to