[ https://issues.apache.org/jira/browse/FLINK-22437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330904#comment-17330904 ]
zoucao edited comment on FLINK-22437 at 4/23/21, 4:36 PM:
----------------------------------------------------------

== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.fs_sink], fields=[timestamp, time, id, product, price, cansell, sellednum, dt, hour, min])
+- LogicalProject(timestamp=[$0], time=[$1], id=[$2], product=[$3], price=[$4], cansell=[$5], sellednum=[$6], dt=[$7], hour=[$8], min=[$9])
   +- LogicalTableScan(table=[[myHive, flink_sql_online_test, hive_sink]])

== Optimized Logical Plan ==
Sink(table=[default_catalog.default_database.fs_sink], fields=[timestamp, time, id, product, price, cansell, sellednum, dt, hour, min])
+- Sort(orderBy=[dt ASC, hour ASC, min ASC])
   +- TableSourceScan(table=[[myHive, flink_sql_online_test, hive_sink]], fields=[timestamp, time, id, product, price, cansell, sellednum, dt, hour, min])

== Physical Execution Plan ==
Stage 1 : Data Source
    content : Source: HiveSource-flink_sql_online_test.hive_sink

Stage 2 : Operator
    content : Sort(orderBy=[dt ASC, hour ASC, min ASC])
    ship_strategy : FORWARD

Stage 3 : Operator
    content : Filter
    ship_strategy : REBALANCE

Stage 4 : Data Sink
    content : Sink: Unnamed
    ship_strategy : FORWARD

> Missing parallelism for filter operator in batch mode
> ------------------------------------------------------
>
>                 Key: FLINK-22437
>                 URL: https://issues.apache.org/jira/browse/FLINK-22437
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.2
>            Reporter: zoucao
>            Priority: Major
>
> When I execute the batch SQL below in flink-1.12.2, I find lots of small
> files in HDFS. In the filesystem connector, `GroupedPartitionWriter` is
> used, and it closes the last partition whenever a new record does not
> belong to the currently open partition. The problem appears when records
> of more than one partition are sent to the filesystem sink at the same
> time. The Hive source can determine its parallelism from the number of
> files and partitions, and that parallelism is propagated through the sort
> operator, but in `CommonPhysicalSink#createSinkTransformation` a filter
> operator is added to support `SinkNotNullEnforcer` with no parallelism
> set for it, so the filesystem sink operator cannot get the correct
> parallelism from its input stream.
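The small-files mechanism described above can be sketched as follows. This is a minimal, hypothetical simulation (not Flink's actual `GroupedPartitionWriter`; the class, method, and partition keys here are illustrative only): the writer keeps one file open per current partition and closes it as soon as a record for a different partition arrives, so interleaved partition keys at a sink subtask open a new file on almost every record, while grouped (sorted) input yields one file per partition.

```java
import java.util.Arrays;
import java.util.List;

public class PartitionWriterSketch {
    // Count the files a grouped-partition-style writer would create:
    // a new file is opened every time the incoming record's partition
    // differs from the partition currently being written.
    static int countFiles(List<String> partitionKeys) {
        int files = 0;
        String current = null;
        for (String key : partitionKeys) {
            if (!key.equals(current)) {
                files++;          // close the previous file, open a new one
                current = key;
            }
        }
        return files;
    }

    public static void main(String[] args) {
        // Interleaved partitions: what the sink sees when the sort order
        // is lost (e.g. by a REBALANCE edge before the sink).
        List<String> interleaved = Arrays.asList("dt=1", "dt=2", "dt=1", "dt=2", "dt=1");
        // Grouped partitions: what the upstream Sort was meant to guarantee.
        List<String> grouped = Arrays.asList("dt=1", "dt=1", "dt=1", "dt=2", "dt=2");
        System.out.println("interleaved -> " + countFiles(interleaved) + " files"); // 5 files
        System.out.println("grouped     -> " + countFiles(grouped) + " files");     // 2 files
    }
}
```

Both inputs carry the same five records over two partitions, yet the interleaved order produces five files where the grouped order produces two, which is why losing the sort's parallelism/ordering at the filter fans records out into many small files.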
> {code:java}
> CREATE CATALOG myHive with (
>   'type'='hive',
>   'property-version'='1',
>   'default-database' = 'flink_sql_online_test'
> );
> -- SET table.sql-dialect=hive;
> -- CREATE TABLE IF NOT EXISTS myHive.flink_sql_online_test.hive_sink (
> --   `timestamp` BIGINT,
> --   `time` STRING,
> --   id BIGINT,
> --   product STRING,
> --   price DOUBLE,
> --   canSell STRING,
> --   selledNum BIGINT
> -- ) PARTITIONED BY (
> --   dt STRING,
> --   `hour` STRING,
> --   `min` STRING
> -- ) TBLPROPERTIES (
> --   'partition.time-extractor.timestamp-pattern'='$dt $hr:$min:00',
> --   'sink.partition-commit.trigger'='partition-time',
> --   'sink.partition-commit.delay'='1 min',
> --   'sink.partition-commit.policy.kind'='metastore,success-file'
> -- );
> create table fs_sink (
>   `timestamp` BIGINT,
>   `time` STRING,
>   id BIGINT,
>   product STRING,
>   price DOUBLE,
>   canSell STRING,
>   selledNum BIGINT,
>   dt STRING,
>   `hour` STRING,
>   `min` STRING
> ) PARTITIONED BY (dt, `hour`, `min`) with (
>   'connector'='filesystem',
>   'path'='hdfs://XXXX',
>   'format'='csv'
> );
> insert into fs_sink
> select * from myHive.flink_sql_online_test.hive_sink;
> {code}
> I think this problem can be fixed by adding a parallelism for it, just like:
> {code:java}
> val dataStream = new DataStream(env, inputTransformation).filter(enforcer)
>   .setParallelism(inputTransformation.getParallelism)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)