[ https://issues.apache.org/jira/browse/FLINK-27898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551373#comment-17551373 ]
zoucao edited comment on FLINK-27898 at 6/8/22 3:26 AM:
--------------------------------------------------------
Hi [~luoyuxia], I think this problem only exists in the Hive source under streaming reading. The filesystem source does not support streaming reading, so it always consumes the correct partitions.

Why does it exist in the Hive source under streaming reading? See *HiveSource*: in the constructor, the parameter *fileEnumerator* holds the partition info that was pushed down. In *HiveSource#createEnumerator*, under batch reading {{super.createEnumerator(enumContext)}} is invoked, so the fileEnumerator is used; but under streaming reading, the fileEnumerator is not used at all.

How to solve it?
1. As a simple fix, we can remove partition-pushdown support for the Hive source under streaming reading.
2. Otherwise, considering partitions committed in the future, we need to push down a boolean expression, not only the partition values, so that every partition to consume can be decided by the expression. This is a bit more complicated, and we need to change or add a public method; the following existing method cannot meet the requirement:
{code:java}
applyPartitions(List<Map<String, String>> remainingPartitions);
{code}
From my side, I think the second way is better and more flexible.


> fix PartitionPushDown in streaming mode for hive source
> -------------------------------------------------------
>
>                 Key: FLINK-27898
>                 URL: https://issues.apache.org/jira/browse/FLINK-27898
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>            Reporter: zoucao
>            Priority: Major
>
> In the Hive source, PartitionPushDown causes problems in streaming mode; we can add the following test to *HiveTableSourceITCase*:
> {code:java}
> @Test
> public void testPushDown() throws Exception {
>     final String catalogName = "hive";
>     final String dbName = "source_db";
>     final String tblName = "stream_test";
>     StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.enableCheckpointing(10 * 1000);
>     StreamTableEnvironment tEnv =
>             HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
>     tEnv.registerCatalog(catalogName, hiveCatalog);
>     tEnv.useCatalog(catalogName);
>     tEnv.executeSql(
>             "CREATE TABLE source_db.stream_test ("
>                     + " a INT,"
>                     + " b STRING"
>                     + ") PARTITIONED BY (ts int) TBLPROPERTIES ("
>                     + "'streaming-source.enable'='true',"
>                     + "'streaming-source.monitor-interval'='10s',"
>                     + "'streaming-source.consume-order'='partition-name',"
>                     + "'streaming-source.consume-start-offset'='ts=1'"
>                     + ")");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[]{0, "a0"})
>             .addRow(new Object[]{1, "a0"})
>             .commit("ts=0");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[]{1, "a1"})
>             .addRow(new Object[]{2, "a1"})
>             .commit("ts=1");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[]{1, "a2"})
>             .addRow(new Object[]{2, "a2"})
>             .commit("ts=2");
>     System.out.println(
>             tEnv.explainSql("select * from hive.source_db.stream_test where ts > 1"));
>     TableResult result =
>             tEnv.executeSql("select * from hive.source_db.stream_test where ts > 1");
>     result.print();
> }
> {code}
> {code:java}
> +----+-------------+--------------------------------+-------------+
> | op |           a |                              b |          ts |
> +----+-------------+--------------------------------+-------------+
> | +I |           1 |                             a2 |           2 |
> | +I |           2 |                             a2 |           2 |
> | +I |           1 |                             a1 |           1 |
> | +I |           2 |                             a1 |           1 |
> {code}
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(a=[$0], b=[$1], ts=[$2])
> +- LogicalFilter(condition=[>($2, 1)])
>    +- LogicalTableScan(table=[[hive, source_db, stream_test]])
>
> == Optimized Physical Plan ==
> TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], fields=[a, b, ts])
>
> == Optimized Execution Plan ==
> TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], fields=[a, b, ts])
> {code}
> The PartitionPushDown rule can generate the correct partitions to consume from the existing partitions. Once the partitions are pushed into the Hive source, the filter node is removed. But the Hive source does not use the pushed-down partition info in streaming mode, which I think causes the problem.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
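The difference between the two proposed fixes can be sketched in plain Java (the types and method names below are hypothetical, not Flink's actual API): a fixed {{remainingPartitions}} list, as accepted by {{applyPartitions}}, cannot admit partitions committed after planning, while a pushed-down boolean expression can still evaluate them.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch, not Flink's API: contrasts pushing down a fixed list
// of remaining partitions with pushing down a boolean partition predicate.
public class PartitionPruningSketch {

    // Stands in for the pushed-down expression `ts > 1`.
    static boolean matchesFilter(Map<String, String> partition) {
        return Integer.parseInt(partition.get("ts")) > 1;
    }

    public static void main(String[] args) {
        // Partitions known at planning time; the planner kept only ts=2.
        List<Map<String, String>> remainingPartitions = List.of(Map.of("ts", "2"));

        // A partition committed later, after planning.
        Map<String, String> latePartition = Map.of("ts", "3");

        // The fixed list cannot admit the late partition...
        System.out.println(remainingPartitions.contains(latePartition)); // false
        // ...but the predicate still classifies it correctly.
        Predicate<Map<String, String>> filter = PartitionPruningSketch::matchesFilter;
        System.out.println(filter.test(latePartition)); // true
    }
}
```

This is why a predicate-style pushdown is the more flexible option for a source that keeps discovering partitions.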
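Why the pushed-down partitions are honoured only in batch mode can also be modelled with a toy enumerator (again a hypothetical model, not the real *HiveSource*): the batch path reads exactly the pre-enumerated partitions, while the streaming monitor only applies the consume-start-offset and never consults the pushed-down list.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of the described behaviour, not the real HiveSource:
// batch reading honours the pushed-down partition list, while streaming
// reading only applies the consume-start-offset and ignores the pushdown.
public class StreamingEnumeratorSketch {

    static List<String> partitionsToConsume(boolean streaming,
                                            List<String> pushedDown,
                                            List<String> committed,
                                            String startOffset) {
        if (!streaming) {
            // Batch path: super.createEnumerator(...) uses the fileEnumerator,
            // so only the pushed-down partitions are read.
            return pushedDown;
        }
        // Streaming path: the monitor lists committed partitions from the
        // start offset onward; the pushed-down list is never consulted.
        return committed.stream()
                .filter(p -> p.compareTo(startOffset) >= 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> pushed = List.of("ts=2"); // planner kept ts > 1
        List<String> committed = List.of("ts=0", "ts=1", "ts=2");

        System.out.println(partitionsToConsume(false, pushed, committed, "ts=1"));
        // -> [ts=2]
        System.out.println(partitionsToConsume(true, pushed, committed, "ts=1"));
        // -> [ts=1, ts=2], matching the ts=1 rows in the observed result
    }
}
```

The streaming branch reproduces the reported output: rows from ts=1 appear even though the planner already removed the filter node for {{ts > 1}}.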