This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 38026fde [Fix] Fix flink sql projection pushdown error (#428) 38026fde is described below commit 38026fde2e1b010b0776df6d4e9cdbb2e49966b6 Author: wudi <676366...@qq.com> AuthorDate: Mon Jul 15 15:36:49 2024 +0800 [Fix] Fix flink sql projection pushdown error (#428) --- .../doris/flink/table/DorisDynamicTableSource.java | 22 ++++++++----- .../doris/flink/source/DorisSourceITCase.java | 36 ++++++++++++++++++++++ 2 files changed, 50 insertions(+), 8 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index 9753361c..5827f879 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -94,6 +94,15 @@ public final class DorisDynamicTableSource String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND ")); readOptions.setFilterQuery(filterQuery); } + if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { + String[] selectFields = + DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); + readOptions.setReadFields( + Arrays.stream(selectFields) + .map(item -> String.format("`%s`", item.trim().replace("`", ""))) + .collect(Collectors.joining(", "))); + } + if (readOptions.getUseOldApi()) { List<PartitionDefinition> dorisPartitions; try { @@ -199,14 +208,11 @@ public final class DorisDynamicTableSource @Override public void applyProjection(int[][] projectedFields, DataType producedDataType) { this.physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType); - if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { - String[] selectFields = - DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); - this.readOptions.setReadFields( - Arrays.stream(selectFields) - .map(item -> String.format("`%s`", item.trim().replace("`", ""))) - .collect(Collectors.joining(", "))); - } + String[] selectFields = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); + this.readOptions.setReadFields( + Arrays.stream(selectFields) + .map(item -> String.format("`%s`", item.trim().replace("`", ""))) + .collect(Collectors.joining(", "))); } @VisibleForTesting diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index e13eeb36..027159db 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -49,6 +49,7 @@ public class DorisSourceITCase extends DorisTestBase { static final String TABLE_READ_TBL = "tbl_read_tbl"; static final String TABLE_READ_TBL_OLD_API = "tbl_read_tbl_old_api"; static final String TABLE_READ_TBL_ALL_OPTIONS = "tbl_read_tbl_all_options"; + static final String TABLE_READ_TBL_PUSH_DOWN = "tbl_read_tbl_push_down"; @Test public void testSource() throws Exception { @@ -231,6 +232,41 @@ public class DorisSourceITCase extends DorisTestBase { Assert.assertArrayEquals(expected, actual.toArray()); } + @Test + public void testTableSourceFilterAndProjectionPushDown() throws Exception { + initializeTable(TABLE_READ_TBL_PUSH_DOWN); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String sourceDDL = + String.format( + "CREATE TABLE doris_source (" + + " age INT" + + ") WITH (" + + " 'connector' = 'doris'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'" + + ")", + getFenodes(), + DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN, + USERNAME, + PASSWORD); + tEnv.executeSql(sourceDDL); + TableResult tableResult = tEnv.executeSql("SELECT age FROM doris_source where age = '18'"); + + List<String> actual = new ArrayList<>(); + try (CloseableIterator<Row> iterator = tableResult.collect()) { + while (iterator.hasNext()) { + actual.add(iterator.next().toString()); + } + } + String[] expected = new String[] {"+I[18]"}; + Assert.assertArrayEquals(expected, actual.toArray()); + } + private void initializeTable(String table) throws Exception { try (Connection connection = DriverManager.getConnection( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org