This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8160dd2e [Fix] Fix SupportsFilterPushDown bug when flinksql unionall (#479)
8160dd2e is described below

commit 8160dd2e9d69f3ba32e981e4670482eb9f152ede
Author: xiayang <xyws5...@qq.com>
AuthorDate: Fri Aug 30 17:45:28 2024 +0800

    [Fix] Fix SupportsFilterPushDown bug when flinksql unionall (#479)
---
 .../org/apache/doris/flink/source/DorisSource.java | 23 ++++++++-
 .../doris/flink/table/DorisDynamicTableSource.java |  5 +-
 .../doris/flink/source/DorisSourceITCase.java      | 55 ++++++++++++++++++++--
 3 files changed, 73 insertions(+), 10 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
index 3c71c068..6faed87d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.StringUtils;
 
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -64,14 +65,18 @@ public class DorisSource<OUT>
     private final Boundedness boundedness;
     private final DorisDeserializationSchema<OUT> deserializer;
 
+    private final List<String> resolvedFilterQuery;
+
     public DorisSource(
             DorisOptions options,
             DorisReadOptions readOptions,
             Boundedness boundedness,
+            List<String> resolvedFilterQuery,
             DorisDeserializationSchema<OUT> deserializer) {
         this.options = options;
         this.readOptions = readOptions;
         this.boundedness = boundedness;
+        this.resolvedFilterQuery = resolvedFilterQuery;
         this.deserializer = deserializer;
     }
 
@@ -95,6 +100,15 @@ public class DorisSource<OUT>
     public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> createEnumerator(
             SplitEnumeratorContext<DorisSourceSplit> context) throws Exception {
         List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>();
+        if (!resolvedFilterQuery.isEmpty()) {
+            String filterQuery = String.join(" AND ", resolvedFilterQuery);
+            if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
+                readOptions.setFilterQuery(filterQuery);
+            } else {
+                readOptions.setFilterQuery(
+                        String.join(" AND ", readOptions.getFilterQuery(), filterQuery));
+            }
+        }
         List<PartitionDefinition> partitions =
                 RestService.findPartitions(options, readOptions, LOG);
         for (int index = 0; index < partitions.size(); index++) {
@@ -147,6 +161,7 @@ public class DorisSource<OUT>
         // Boundedness
         private Boundedness boundedness;
         private DorisDeserializationSchema<OUT> deserializer;
+        private List<String> resolvedFilterQuery = new ArrayList<>();
 
         DorisSourceBuilder() {
             boundedness = Boundedness.BOUNDED;
@@ -173,11 +188,17 @@ public class DorisSource<OUT>
             return this;
         }
 
+        public DorisSourceBuilder<OUT> setResolvedFilterQuery(List<String> resolvedFilterQuery) {
+            this.resolvedFilterQuery = resolvedFilterQuery;
+            return this;
+        }
+
         public DorisSource<OUT> build() {
             if (readOptions == null) {
                 readOptions = DorisReadOptions.builder().build();
             }
-            return new DorisSource<>(options, readOptions, boundedness, deserializer);
+            return new DorisSource<>(
+                    options, readOptions, boundedness, resolvedFilterQuery, deserializer);
         }
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 5827f879..e55d3631 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -90,10 +90,6 @@ public final class DorisDynamicTableSource
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
-        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
-            String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
-            readOptions.setFilterQuery(filterQuery);
-        }
         if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
             String[] selectFields =
                     DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
@@ -127,6 +123,7 @@ public final class DorisDynamicTableSource
                     DorisSource.<RowData>builder()
                             .setDorisReadOptions(readOptions)
                             .setDorisOptions(options)
+                            .setResolvedFilterQuery(resolvedFilterQuery)
                             .setDeserializer(
                                     new RowDataDeserializationSchema(
                                             (RowType) physicalRowDataType.getLogicalType()))
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
index 027159db..a13e96f7 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java
@@ -50,6 +50,8 @@ public class DorisSourceITCase extends DorisTestBase {
     static final String TABLE_READ_TBL_OLD_API = "tbl_read_tbl_old_api";
     static final String TABLE_READ_TBL_ALL_OPTIONS = "tbl_read_tbl_all_options";
     static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down";
+    static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL =
+            "tbl_read_tbl_push_down_with_union_all";
 
     @Test
     public void testSource() throws Exception {
@@ -77,7 +79,7 @@ public class DorisSourceITCase extends DorisTestBase {
                 actual.add(iterator.next().toString());
             }
         }
-        List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]");
+        List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]");
         Assert.assertArrayEquals(actual.toArray(), expected.toArray());
     }
 
@@ -102,7 +104,7 @@ public class DorisSourceITCase extends DorisTestBase {
                 actual.add(iterator.next().toString());
             }
         }
-        List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]");
+        List<String> expected = Arrays.asList("[doris, 18]", "[flink, 10]", "[apache, 12]");
         Assert.assertArrayEquals(actual.toArray(), expected.toArray());
     }
 
@@ -136,7 +138,7 @@ public class DorisSourceITCase extends DorisTestBase {
                 actual.add(iterator.next().toString());
             }
         }
-        String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
+        String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"};
         Assert.assertArrayEquals(expected, actual.toArray());
 
         // fitler query
@@ -182,7 +184,7 @@ public class DorisSourceITCase extends DorisTestBase {
                 actual.add(iterator.next().toString());
             }
         }
-        String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
+        String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"};
         Assert.assertArrayEquals(expected, actual.toArray());
     }
 
@@ -228,7 +230,7 @@ public class DorisSourceITCase extends DorisTestBase {
                 actual.add(iterator.next().toString());
             }
         }
-        String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
+        String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]", "+I[apache, 12]"};
         Assert.assertArrayEquals(expected, actual.toArray());
     }
 
@@ -242,6 +244,7 @@ public class DorisSourceITCase extends DorisTestBase {
         String sourceDDL =
                 String.format(
                         "CREATE TABLE doris_source ("
+                                + " name STRING,"
                                 + " age INT"
                                 + ") WITH ("
                                 + " 'connector' = 'doris',"
@@ -267,6 +270,46 @@ public class DorisSourceITCase extends DorisTestBase {
         Assert.assertArrayEquals(expected, actual.toArray());
     }
 
+    @Test
+    public void testTableSourceFilterWithUnionAll() throws Exception {
+        initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE doris_source ("
+                                + " name STRING,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = 'doris',"
+                                + " 'fenodes' = '%s',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s'"
+                                + ")",
+                        getFenodes(),
+                        DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL,
+                        USERNAME,
+                        PASSWORD);
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "  SELECT * FROM doris_source where age = '18'"
+                                + " UNION ALL "
+                                + "SELECT * FROM doris_source where age = '10'  ");
+
+        List<String> actual = new ArrayList<>();
+        try (CloseableIterator<Row> iterator = tableResult.collect()) {
+            while (iterator.hasNext()) {
+                actual.add(iterator.next().toString());
+            }
+        }
+        String[] expected = new String[] {"+I[doris, 18]", "+I[flink, 10]"};
+        Assert.assertArrayEquals(expected, actual.toArray());
+    }
+
     private void initializeTable(String table) throws Exception {
         try (Connection connection =
                         DriverManager.getConnection(
@@ -288,6 +331,8 @@ public class DorisSourceITCase extends DorisTestBase {
                     String.format("insert into %s.%s  values ('doris',18)", DATABASE, table));
             statement.execute(
                     String.format("insert into %s.%s  values ('flink',10)", DATABASE, table));
+            statement.execute(
+                    String.format("insert into %s.%s  values ('apache',12)", DATABASE, table));
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to