zoucao created FLINK-27898:
------------------------------

             Summary: fix PartitionPushDown in streaming mode for hive source
                 Key: FLINK-27898
                 URL: https://issues.apache.org/jira/browse/FLINK-27898
             Project: Flink
          Issue Type: Bug
            Reporter: zoucao


In hive source, the PartitionPushDown will cause some problems in 
streaming-mode, we can this test in {*}HiveTableSourceITCase{*}

{code:java}
@Test
public void testPushDown() throws Exception {
final String catalogName = "hive";
final String dbName = "source_db";
final String tblName = "stream_test";
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10 * 1000);
StreamTableEnvironment tEnv =
HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
tEnv.registerCatalog(catalogName, hiveCatalog);
tEnv.useCatalog(catalogName);
tEnv.executeSql(
"CREATE TABLE source_db.stream_test ("
+ " a INT,"
+ " b STRING"
+ ") PARTITIONED BY (ts int) TBLPROPERTIES ("
+ "'streaming-source.enable'='true',"
+ "'streaming-source.monitor-interval'='10s',"
+ "'streaming-source.consume-order'='partition-name',"
+ "'streaming-source.consume-start-offset'='ts=1'"
+ ")");

HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
.addRow(new Object[]{0, "a0"})
.addRow(new Object[]{1, "a0"})
.commit("ts=0");
HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
.addRow(new Object[]{1, "a1"})
.addRow(new Object[]{2, "a1"})
.commit("ts=1");

HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
.addRow(new Object[]{1, "a2"})
.addRow(new Object[]{2, "a2"})
.commit("ts=2");
System.out.println(tEnv.explainSql("select * from hive.source_db.stream_test 
where ts > 1"));
TableResult result = tEnv.executeSql("select * from hive.source_db.stream_test 
where ts > 1");
result.print();
)
{code}

{code:java}
+----+-------------+--------------------------------+-------------+
| op |           a |                              b |          ts |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                             a2 |           2 |
| +I |           2 |                             a2 |           2 |
| +I |           1 |                             a1 |           1 |
| +I |           2 |                             a1 |           1 |
{code}

{code:java}
== Abstract Syntax Tree ==
LogicalProject(a=[$0], b=[$1], ts=[$2])
+- LogicalFilter(condition=[>($2, 1)])
   +- LogicalTableScan(table=[[hive, source_db, stream_test]])

== Optimized Physical Plan ==
TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], 
fields=[a, b, ts])

== Optimized Execution Plan ==
TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], 
fields=[a, b, ts])
{code}

The PartitionPushDown rule can generate the correct partitions that need to 
consume by using the existing partition. If the partitions are pushed to the 
hive source, the filter node will be removed. But hive source will not use the 
partition info which is pushed down in streaming mode, I think it causes some 
problems.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to