[ 
https://issues.apache.org/jira/browse/FLINK-27898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551373#comment-17551373
 ] 

zoucao edited comment on FLINK-27898 at 6/8/22 3:26 AM:
--------------------------------------------------------

Hi [~luoyuxia], I think this problem only exists in the hive source for 
streaming reading; the filesystem source does not support streaming reading, 
so it always gets the right partitions to consume.

Why does it exist in the hive source for streaming reading?
See *HiveSource*: in the constructor, the parameter *fileEnumerator* holds the 
partition info that was pushed down. In *HiveSource#createEnumerator*, under 
batch reading, `super.createEnumerator(enumContext)` is invoked, so the 
fileEnumerator can be used; but under streaming reading, the fileEnumerator is 
never used.
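
To make that control flow concrete, here is a minimal, self-contained sketch; the class, fields and return values below are simplified stand-ins for illustration, not the actual Flink code:
{code:java}
import java.util.List;
import java.util.Map;

/**
 * Toy model of the behaviour described above: the pushed-down partitions are
 * only honoured on the batch path; the streaming path builds a different
 * enumerator that never looks at them.
 */
public class HiveSourceSketch {

    // stands in for the fileEnumerator that holds the pushed-down partitions
    private final List<Map<String, String>> pushedPartitions;
    private final boolean streamingRead;

    HiveSourceSketch(List<Map<String, String>> pushedPartitions, boolean streamingRead) {
        this.pushedPartitions = pushedPartitions;
        this.streamingRead = streamingRead;
    }

    String createEnumerator() {
        if (!streamingRead) {
            // batch reading: delegate to the parent, which uses the enumerator
            // holding the pushed-down partitions
            return "static enumerator over " + pushedPartitions;
        }
        // streaming reading: a continuous partition-monitoring enumerator is
        // created instead, and the pushed-down partitions are ignored
        return "continuous enumerator monitoring all partitions";
    }

    public static void main(String[] args) {
        List<Map<String, String>> pushed = List.of(Map.of("ts", "2"));
        System.out.println(new HiveSourceSketch(pushed, false).createEnumerator());
        System.out.println(new HiveSourceSketch(pushed, true).createEnumerator());
    }
}
{code}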

How to solve?
1. As a simple fix, we can remove the partition pushdown support for the hive 
source under streaming reading.
2. Otherwise, considering partitions committed in the future, we need to push 
down a boolean expression, not only the partition values; all partitions that 
should be consumed would then be decided by the expression. It's a little bit 
more complicated, and we need to change or add a public method, because the 
following method cannot meet the condition:
{code:java}
applyPartitions(List<Map<String, String>> remainingPartitions); 
{code}
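
For illustration only, here is a rough sketch of what the second way could look like; the interface name *SupportsPartitionFilterPushDown*, the method *applyPartitionFilter* and the demo class are hypothetical, not an existing Flink API:
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
 * Hypothetical ability interface: push down a predicate over the partition
 * spec instead of a fixed list of remaining partitions.
 */
interface SupportsPartitionFilterPushDown {
    void applyPartitionFilter(Predicate<Map<String, String>> partitionFilter);
}

/**
 * Toy stand-in for the streaming split enumerator: every newly discovered
 * partition is tested against the pushed-down predicate, so partitions
 * committed in the future can also be decided correctly.
 */
public class PartitionFilterDemo implements SupportsPartitionFilterPushDown {

    private Predicate<Map<String, String>> partitionFilter = p -> true;
    private final List<Map<String, String>> consumed = new ArrayList<>();

    @Override
    public void applyPartitionFilter(Predicate<Map<String, String>> partitionFilter) {
        this.partitionFilter = partitionFilter;
    }

    // called whenever the continuous monitor discovers a partition,
    // including partitions committed after the job started
    void onPartitionDiscovered(Map<String, String> partition) {
        if (partitionFilter.test(partition)) {
            consumed.add(partition);
        }
    }

    public static void main(String[] args) {
        PartitionFilterDemo source = new PartitionFilterDemo();
        // predicate derived at planning time from the condition "ts > 1"
        source.applyPartitionFilter(p -> Integer.parseInt(p.get("ts")) > 1);

        source.onPartitionDiscovered(Map.of("ts", "0")); // filtered out
        source.onPartitionDiscovered(Map.of("ts", "2")); // consumed
        source.onPartitionDiscovered(Map.of("ts", "3")); // consumed (future partition)

        System.out.println(source.consumed); // [{ts=2}, {ts=3}]
    }
}
{code}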

From my side, I think the second way is better and more flexible.

> fix PartitionPushDown in streaming mode for hive source
> -------------------------------------------------------
>
>                 Key: FLINK-27898
>                 URL: https://issues.apache.org/jira/browse/FLINK-27898
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>            Reporter: zoucao
>            Priority: Major
>
> In the hive source, partition pushdown causes some problems in streaming 
> mode. We can add the following test to {*}HiveTableSourceITCase{*}:
> {code:java}
> @Test
> public void testPushDown() throws Exception {
>     final String catalogName = "hive";
>     final String dbName = "source_db";
>     final String tblName = "stream_test";
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.enableCheckpointing(10 * 1000);
>     StreamTableEnvironment tEnv =
>             HiveTestUtils.createTableEnvInStreamingMode(env, SqlDialect.HIVE);
>     tEnv.registerCatalog(catalogName, hiveCatalog);
>     tEnv.useCatalog(catalogName);
>     tEnv.executeSql(
>             "CREATE TABLE source_db.stream_test ("
>                     + " a INT,"
>                     + " b STRING"
>                     + ") PARTITIONED BY (ts int) TBLPROPERTIES ("
>                     + "'streaming-source.enable'='true',"
>                     + "'streaming-source.monitor-interval'='10s',"
>                     + "'streaming-source.consume-order'='partition-name',"
>                     + "'streaming-source.consume-start-offset'='ts=1'"
>                     + ")");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[]{0, "a0"})
>             .addRow(new Object[]{1, "a0"})
>             .commit("ts=0");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[]{1, "a1"})
>             .addRow(new Object[]{2, "a1"})
>             .commit("ts=1");
>     HiveTestUtils.createTextTableInserter(hiveCatalog, dbName, tblName)
>             .addRow(new Object[]{1, "a2"})
>             .addRow(new Object[]{2, "a2"})
>             .commit("ts=2");
>     System.out.println(
>             tEnv.explainSql("select * from hive.source_db.stream_test where ts > 1"));
>     TableResult result =
>             tEnv.executeSql("select * from hive.source_db.stream_test where ts > 1");
>     result.print();
> }
> {code}
> {code:java}
> +----+-------------+--------------------------------+-------------+
> | op |           a |                              b |          ts |
> +----+-------------+--------------------------------+-------------+
> | +I |           1 |                             a2 |           2 |
> | +I |           2 |                             a2 |           2 |
> | +I |           1 |                             a1 |           1 |
> | +I |           2 |                             a1 |           1 |
> {code}
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(a=[$0], b=[$1], ts=[$2])
> +- LogicalFilter(condition=[>($2, 1)])
>    +- LogicalTableScan(table=[[hive, source_db, stream_test]])
>
> == Optimized Physical Plan ==
> TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], fields=[a, b, ts])
>
> == Optimized Execution Plan ==
> TableSourceScan(table=[[hive, source_db, stream_test, partitions=[{ts=2}]]], fields=[a, b, ts])
> {code}
> The PartitionPushDown rule can generate the correct partitions to consume 
> based on the partitions that already exist. Once the partitions are pushed 
> down to the hive source, the filter node is removed from the plan. But in 
> streaming mode the hive source does not use the pushed-down partition info, 
> which I think is what causes these problems.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
