[ https://issues.apache.org/jira/browse/FLINK-22437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330904#comment-17330904 ]

zoucao edited comment on FLINK-22437 at 4/23/21, 4:36 PM:
----------------------------------------------------------

== Abstract Syntax Tree ==

LogicalSink(table=[default_catalog.default_database.fs_sink], 
fields=[timestamp, time, id, product, price, cansell, sellednum, dt, hour, min])

 +- LogicalProject(timestamp=[$0], time=[$1], id=[$2], product=[$3], 
price=[$4], cansell=[$5], sellednum=[$6], dt=[$7], hour=[$8], min=[$9])

    +- LogicalTableScan(table=[[myHive, flink_sql_online_test, hive_sink]])

== Optimized Logical Plan ==

Sink(table=[default_catalog.default_database.fs_sink], fields=[timestamp, time, 
id, product, price, cansell, sellednum, dt, hour, min])

    +- Sort(orderBy=[dt ASC, hour ASC, min ASC])

        +- TableSourceScan(table=[[myHive, flink_sql_online_test, hive_sink]], 
fields=[timestamp, time, id, product, price, cansell, sellednum, dt, hour, min])

== Physical Execution Plan ==

Stage 1 : Data Source content : Source: 
HiveSource-flink_sql_online_test.hive_sink

  Stage 2 : Operator content : Sort(orderBy=[dt ASC, hour ASC, min ASC]) 
ship_strategy : FORWARD

    Stage 3 : Operator content : Filter ship_strategy : REBALANCE

      Stage 4 : Data Sink content : Sink: Unnamed ship_strategy : FORWARD



> Missing parallelism for filter operator in batch mode
> -----------------------------------------------------
>
>                 Key: FLINK-22437
>                 URL: https://issues.apache.org/jira/browse/FLINK-22437
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.2
>            Reporter: zoucao
>            Priority: Major
>
> When I execute the batch SQL below in Flink 1.12.2, I found lots of small 
> files in HDFS. In the filesystem connector, `GroupedPartitionWriter` is 
> used, and it closes the last partition whenever a new record does not belong 
> to the existing partition. This happens when records from more than one 
> partition are sent to the filesystem sink at the same time. The Hive source 
> can determine its parallelism from the number of files and partitions, and 
> that parallelism is propagated through the sort operator, but in 
> `CommonPhysicalSink#createSinkTransformation` a filter operator is added to 
> support `SinkNotNullEnforcer`, and no parallelism is set for it, so the 
> filesystem sink operator cannot get the correct parallelism from its input.
> {code:java}
> CREATE CATALOG myHive with (
>     'type'='hive',
>     'property-version'='1',
>     'default-database' = 'flink_sql_online_test'
> );
> -- SET table.sql-dialect=hive;
> -- CREATE TABLE IF NOT EXISTS myHive.flink_sql_online_test.hive_sink (
> --    `timestamp` BIGINT,
> --    `time` STRING,
> --    id BIGINT,
> --    product STRING,
> --    price DOUBLE,
> --    canSell STRING,
> --    selledNum BIGINT
> -- ) PARTITIONED BY (
> --    dt STRING,
> --    `hour` STRING,
> --   `min` STRING
> -- ) TBLPROPERTIES (
> --    'partition.time-extractor.timestamp-pattern'='$dt $hr:$min:00',
> --    'sink.partition-commit.trigger'='partition-time',
> --    'sink.partition-commit.delay'='1 min',
> --    'sink.partition-commit.policy.kind'='metastore,success-file'
> -- );
> create table fs_sink (
>     `timestamp` BIGINT,
>     `time` STRING,
>     id BIGINT,
>     product STRING,
>     price DOUBLE,
>     canSell STRING,
>     selledNum BIGINT,
>     dt STRING,
>     `hour` STRING,
>     `min` STRING
> ) PARTITIONED BY (dt, `hour`, `min`) with (
>     'connector'='filesystem',
>     'path'='hdfs://XXXX',
>     'format'='csv'
> );
> insert into fs_sink
> select * from myHive.flink_sql_online_test.hive_sink;
> {code}
> I think this problem can be fixed by setting a parallelism for it, just like:
> {code:java}
> val dataStream = new DataStream(env, inputTransformation).filter(enforcer)
>   .setParallelism(inputTransformation.getParallelism)
> {code}
>  
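The small-file effect can be illustrated without Flink. The sketch below (plain Java; `countFiles` is a hypothetical stand-in for the close-on-key-change behavior the report attributes to `GroupedPartitionWriter`) shows that sorted input produces one file per partition, while the same records interleaved by a REBALANCE produce one file per contiguous run of partition keys:

{code:java}
import java.util.Arrays;
import java.util.List;

public class GroupedWriterSketch {

    // One file per contiguous run of identical partition keys: the writer
    // closes the current file as soon as the incoming key changes.
    static int countFiles(List<String> partitionKeys) {
        int files = 0;
        String current = null;
        for (String key : partitionKeys) {
            if (!key.equals(current)) { // key change: close old file, open a new one
                files++;
                current = key;
            }
        }
        return files;
    }

    public static void main(String[] args) {
        // Sorted input, as guaranteed by the Sort operator: one file per partition.
        List<String> sorted = Arrays.asList("dt=1", "dt=1", "dt=1", "dt=2", "dt=2", "dt=3");
        // The same records after an interleaving redistribution: every run of
        // length one opens a fresh file.
        List<String> interleaved = Arrays.asList("dt=1", "dt=2", "dt=1", "dt=3", "dt=2", "dt=1");
        System.out.println(countFiles(sorted));      // prints 3
        System.out.println(countFiles(interleaved)); // prints 6
    }
}
{code}

This is why forwarding the input parallelism to the not-null-enforcer filter matters: with matching parallelism the connection stays FORWARD, each sink subtask sees its sorted slice intact, and the file count stays at one per partition per subtask.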



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
