[ https://issues.apache.org/jira/browse/FLINK-27898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556225#comment-17556225 ]

zoucao commented on FLINK-27898:
--------------------------------

Hi [~luoyuxia], I think you're right: if we choose the second way, a FLIP is 
necessary. I will do some preparation and then start a discussion to collect 
opinions from others. WDYT?

> fix PartitionPushDown in streaming mode for hive source
> -------------------------------------------------------
>
>                 Key: FLINK-27898
>                 URL: https://issues.apache.org/jira/browse/FLINK-27898
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>            Reporter: zoucao
>            Priority: Major
>
> In the Hive source, PartitionPushDown causes problems in streaming mode. The 
> following test in {*}HiveTableSourceITCase{*} reproduces the issue:
> {code:java}
> @Test
> public void testPushDown() throws Exception {
>     final String catalogName = "hive";
>     final String dbName = "source_db";
>     final String tblName = "stream_test";
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.enableCheckpointing(10 * 1000);
>     StreamTableEnvironment tEnv =
>             HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
>     tEnv.registerCatalog(catalogName, hiveCatalog);
>     tEnv.useCatalog(catalogName);
>     // create a partitioned table consumed as an unbounded stream,
>     // starting from partition ts=1
>     tEnv.executeSql(
>             "CREATE TABLE source_db.stream_test ("
>                     + " a INT,"
>                     + " b STRING"
>                     + ") PARTITIONED BY (ts int) TBLPROPERTIES ("
>                     + "'streaming-source.enable'='true',"
>                     + "'streaming-source.monitor-interval'='10s',"
>                     + "'streaming-source.consume-order'='partition-name',"
>                     + "'streaming-source.consume-start-offset'='ts=1'"
>                     + ")");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[] {0, "a0"})
>             .addRow(new Object[] {1, "a0"})
>             .commit("ts=0");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[] {1, "a1"})
>             .addRow(new Object[] {2, "a1"})
>             .commit("ts=1");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[] {1, "a2"})
>             .addRow(new Object[] {2, "a2"})
>             .commit("ts=2");
>     // the filter ts > 1 should restrict consumption to partition ts=2
>     System.out.println(
>             tEnv.explainSql("select * from hive.source_db.stream_test where ts > 1"));
>     TableResult result =
>             tEnv.executeSql("select * from hive.source_db.stream_test where ts > 1");
>     result.print();
> }
> {code}
> Running the test prints rows from partition ts=1 even though the query filters on ts > 1:
> {code:java}
> +----+-------------+--------------------------------+-------------+
> | op |           a |                              b |          ts |
> +----+-------------+--------------------------------+-------------+
> | +I |           1 |                             a2 |           2 |
> | +I |           2 |                             a2 |           2 |
> | +I |           1 |                             a1 |           1 |
> | +I |           2 |                             a1 |           1 |
> {code}
> The explain output shows that PartitionPushDown computed partitions=[{ts=2}] and removed the Filter node:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(a=[$0], b=[$1], ts=[$2])
> +- LogicalFilter(condition=[>($2, 1)])
>    +- LogicalTableScan(table=[[hive, source_db, stream_test]])
> 
> == Optimized Physical Plan ==
> TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], fields=[a, b, ts])
> 
> == Optimized Execution Plan ==
> TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], fields=[a, b, ts])
> {code}
> The PartitionPushDown rule derives the correct set of partitions to consume 
> from the partitions that exist at planning time. Once they are pushed into 
> the Hive source, the Filter node is removed from the plan. But in streaming 
> mode the Hive source ignores the pushed-down partition info and consumes 
> every newly discovered partition, so rows the filter should have excluded 
> (here, partition ts=1) are emitted.
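> A self-contained sketch of why simply honoring the pushed-down partition 
> list in streaming mode may not be enough (the class and data below are 
> illustrative assumptions, not Flink code): the list is computed from the 
> partitions that exist at planning time, so a partition added later that also 
> satisfies the predicate would be dropped by a plain containment check. This 
> is part of why a FLIP-level change is being discussed above.
> {code:java}
> import java.util.List;
> import java.util.Map;
> 
> // Illustrative only: mimics a streaming source that re-applies the partition
> // list pushed down at planning time (partitions=[{ts=2}] in the plan above).
> public class PushedDownPartitionSketch {
> 
>     // partitions computed by PartitionPushDown when only ts=0..2 existed
>     static final List<Map<String, String>> PUSHED_DOWN = List.of(Map.of("ts", "2"));
> 
>     // hypothetical check a streaming partition monitor could apply
>     static boolean keepPartition(Map<String, String> partitionSpec) {
>         return PUSHED_DOWN.contains(partitionSpec);
>     }
> 
>     public static void main(String[] args) {
>         // existed at planning time and matches ts > 1: correctly kept
>         System.out.println(keepPartition(Map.of("ts", "2"))); // true
>         // added after the job started, also matches ts > 1: wrongly dropped
>         System.out.println(keepPartition(Map.of("ts", "3"))); // false
>     }
> }
> {code}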



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
