[ https://issues.apache.org/jira/browse/FLINK-27898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556225#comment-17556225 ]
zoucao commented on FLINK-27898:
--------------------------------

Hi [~luoyuxia], I think you're right: if we choose the second way, a FLIP is necessary. I will do some preparation and then start a discussion about this to collect opinions from others. WDYT?

> fix PartitionPushDown in streaming mode for hive source
> -------------------------------------------------------
>
>                 Key: FLINK-27898
>                 URL: https://issues.apache.org/jira/browse/FLINK-27898
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>            Reporter: zoucao
>            Priority: Major
>
> In the hive source, PartitionPushDown causes problems in streaming mode. The behavior can be reproduced by adding the following test to {*}HiveTableSourceITCase{*}:
> {code:java}
> @Test
> public void testPushDown() throws Exception {
>     final String catalogName = "hive";
>     final String dbName = "source_db";
>     final String tblName = "stream_test";
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.enableCheckpointing(10 * 1000);
>     StreamTableEnvironment tEnv =
>             HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
>     tEnv.registerCatalog(catalogName, hiveCatalog);
>     tEnv.useCatalog(catalogName);
>     tEnv.executeSql(
>             "CREATE TABLE source_db.stream_test ("
>                     + " a INT,"
>                     + " b STRING"
>                     + ") PARTITIONED BY (ts INT) TBLPROPERTIES ("
>                     + "'streaming-source.enable'='true',"
>                     + "'streaming-source.monitor-interval'='10s',"
>                     + "'streaming-source.consume-order'='partition-name',"
>                     + "'streaming-source.consume-start-offset'='ts=1'"
>                     + ")");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[] {0, "a0"})
>             .addRow(new Object[] {1, "a0"})
>             .commit("ts=0");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[] {1, "a1"})
>             .addRow(new Object[] {2, "a1"})
>             .commit("ts=1");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[] {1, "a2"})
>             .addRow(new Object[] {2, "a2"})
>             .commit("ts=2");
>     System.out.println(
>             tEnv.explainSql("select * from hive.source_db.stream_test where ts > 1"));
>     TableResult result =
>             tEnv.executeSql("select * from hive.source_db.stream_test where ts > 1");
>     result.print();
> }
> {code}
> Although the query filters on {{ts > 1}}, the rows from partition ts=1 are emitted as well:
> {code:java}
> +----+-------------+--------------------------------+-------------+
> | op |           a |                              b |          ts |
> +----+-------------+--------------------------------+-------------+
> | +I |           1 |                             a2 |           2 |
> | +I |           2 |                             a2 |           2 |
> | +I |           1 |                             a1 |           1 |
> | +I |           2 |                             a1 |           1 |
> {code}
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(a=[$0], b=[$1], ts=[$2])
> +- LogicalFilter(condition=[>($2, 1)])
>    +- LogicalTableScan(table=[[hive, source_db, stream_test]])
>
> == Optimized Physical Plan ==
> TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], fields=[a, b, ts])
>
> == Optimized Execution Plan ==
> TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], fields=[a, b, ts])
> {code}
> The PartitionPushDown rule computes the correct set of partitions to consume from the partitions that exist at planning time. Once those partitions are pushed into the hive source, the Filter node is removed from the plan. In streaming mode, however, the hive source does not use the pushed-down partition info, so the predicate is silently dropped, which I think causes the problem shown above.
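For context on why the removed Filter node matters, here is a minimal sketch of the {{SupportsPartitionPushDown}} contract the planner relies on. This is illustrative only, not the actual {{HiveTableSource}} code; the class name {{SketchPartitionedSource}} is made up. The point is that after the planner calls {{applyPartitions(...)}} it prunes the partition predicate from the plan, so a source that ignores the remaining-partitions list (as the hive source does in streaming mode) effectively drops the filter.

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;

// Hypothetical source, for illustration only -- not the real HiveTableSource.
public class SketchPartitionedSource implements ScanTableSource, SupportsPartitionPushDown {

    // Partitions remaining after the planner evaluated the partition predicate.
    private List<Map<String, String>> remainingPartitions;

    @Override
    public Optional<List<Map<String, String>>> listPartitions() {
        // Returning empty tells the planner to list partitions via the catalog instead.
        return Optional.empty();
    }

    @Override
    public void applyPartitions(List<Map<String, String>> remainingPartitions) {
        // After this call the planner removes the Filter node, so the source is
        // responsible for reading ONLY these partitions. A streaming source that
        // keeps monitoring all partitions instead of honoring this list will
        // re-introduce the filtered-out rows -- the behavior reported above.
        this.remainingPartitions = remainingPartitions;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // A real source would build a provider that reads only remainingPartitions.
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public DynamicTableSource copy() {
        SketchPartitionedSource copy = new SketchPartitionedSource();
        copy.remainingPartitions = remainingPartitions;
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "SketchPartitionedSource";
    }
}
{code}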