[ https://issues.apache.org/jira/browse/FLINK-22437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348139#comment-17348139 ]
godfrey he edited comment on FLINK-22437 at 5/20/21, 8:03 AM:
--------------------------------------------------------------

[~zoucao] yes, I think that is the easiest approach. Assigned to you.

was (Author: godfreyhe):
[~zoucao] yes, I think that is the easiest approach

> Miss adding parallesim for filter operator in batch mode
> --------------------------------------------------------
>
>                 Key: FLINK-22437
>                 URL: https://issues.apache.org/jira/browse/FLINK-22437
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.2
>            Reporter: zoucao
>            Assignee: zoucao
>            Priority: Major
>
> When I execute the batch SQL below in Flink 1.12.2, I find lots of small
> files in HDFS. The filesystem connector uses `GroupedPartitionWriter`, which
> closes the current partition's file as soon as a record arrives that does not
> belong to that partition. The problem therefore occurs when records from more
> than one partition reach the filesystem sink at the same time.
>
> The Hive source determines its parallelism from the number of files and
> partitions, and the sort operator inherits that parallelism. However,
> `CommonPhysicalSink#createSinkTransformation` adds a filter operator to
> support `SinkNotNullEnforcer` without setting its parallelism, so the
> filesystem sink operator cannot get the correct parallelism from its input
> stream.
> {code:sql}
> CREATE CATALOG myHive with (
>   'type'='hive',
>   'property-version'='1',
>   'default-database' = 'flink_sql_online_test'
> );
>
> -- SET table.sql-dialect=hive;
> -- CREATE TABLE IF NOT EXISTS myHive.flink_sql_online_test.hive_sink (
> --   `timestamp` BIGINT,
> --   `time` STRING,
> --   id BIGINT,
> --   product STRING,
> --   price DOUBLE,
> --   canSell STRING,
> --   selledNum BIGINT
> -- ) PARTITIONED BY (
> --   dt STRING,
> --   `hour` STRING,
> --   `min` STRING
> -- ) TBLPROPERTIES (
> --   'partition.time-extractor.timestamp-pattern'='$dt $hour:$min:00',
> --   'sink.partition-commit.trigger'='partition-time',
> --   'sink.partition-commit.delay'='1 min',
> --   'sink.partition-commit.policy.kind'='metastore,success-file'
> -- );
>
> create table fs_sink (
>   `timestamp` BIGINT,
>   `time` STRING,
>   id BIGINT,
>   product STRING,
>   price DOUBLE,
>   canSell STRING,
>   selledNum BIGINT,
>   dt STRING,
>   `hour` STRING,
>   `min` STRING
> ) PARTITIONED BY (dt, `hour`, `min`) with (
>   'connector'='filesystem',
>   'path'='hdfs://XXXX',
>   'format'='csv'
> );
>
> insert into fs_sink
> select * from myHive.flink_sql_online_test.hive_sink;
> {code}
>
> I think this problem can be fixed by setting the filter's parallelism
> explicitly, for example:
> {code:scala}
> // Give the not-null enforcer filter the same parallelism as its input,
> // so the downstream filesystem sink inherits the correct parallelism.
> val dataStream = new DataStream(env, inputTransformation)
>   .filter(enforcer)
>   .setParallelism(inputTransformation.getParallelism)
> {code}
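The small-file effect described in the report can be illustrated with a toy model. This is a hypothetical sketch, not Flink code: `GroupedWriterSimulation`, `countFiles` and `interleave` are illustrative names. It assumes a writer that keeps one file open and rolls to a new file whenever the partition changes, as the report describes for `GroupedPartitionWriter`. It also assumes that, once the filter's parallelism differs from the sort's, records from several sorted upstream subtasks reach one sink subtask mixed together (Flink connects operators of different parallelism with a rebalance exchange by default); `interleave` crudely approximates this by taking one record from each input in turn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GroupedWriterSimulation {

    // Hypothetical, simplified model of a "grouped" partition writer:
    // it keeps exactly one file open and closes it as soon as a record
    // for a different partition arrives. Returns the number of files opened.
    static int countFiles(List<String> partitionsInArrivalOrder) {
        int files = 0;
        String current = null;
        for (String p : partitionsInArrivalOrder) {
            if (!p.equals(current)) {
                // Close the previous file (if any) and open a new one.
                files++;
                current = p;
            }
        }
        return files;
    }

    // Interleaves several upstream streams one record at a time, a crude
    // stand-in for records from different upstream subtasks arriving
    // mixed together at a single downstream subtask.
    static List<String> interleave(List<List<String>> upstreams) {
        List<String> out = new ArrayList<>();
        int maxLen = 0;
        for (List<String> u : upstreams) {
            maxLen = Math.max(maxLen, u.size());
        }
        for (int i = 0; i < maxLen; i++) {
            for (List<String> u : upstreams) {
                if (i < u.size()) {
                    out.add(u.get(i));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two upstream sort subtasks, each emitting records grouped by partition.
        List<String> upstream0 = Arrays.asList("dt=1", "dt=1", "dt=1", "dt=2", "dt=2", "dt=2");
        List<String> upstream1 = Arrays.asList("dt=3", "dt=3", "dt=3", "dt=4", "dt=4", "dt=4");

        // Same parallelism: each sink subtask reads exactly one sorted input.
        int forwardFiles = countFiles(upstream0) + countFiles(upstream1);

        // Different parallelism: one sink subtask receives both inputs mixed.
        int mixedFiles = countFiles(interleave(Arrays.asList(upstream0, upstream1)));

        System.out.println("forward: " + forwardFiles + " files");
        System.out.println("mixed: " + mixedFiles + " files");
    }
}
```

Running `main` reports 4 files for the forward case and 12 for the mixed case: once the two grouped inputs are interleaved, every record switches partition and opens a new file, which matches the small-file symptom in the report.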