[ 
https://issues.apache.org/jira/browse/FLINK-22437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17348139#comment-17348139
 ] 

godfrey he edited comment on FLINK-22437 at 5/20/21, 8:03 AM:
--------------------------------------------------------------

[~zoucao] yes, I think that is the easiest approach. Assigned to you.


was (Author: godfreyhe):
[~zoucao] yes, I think that is the easiest approach

> Miss adding parallelism for filter operator in batch mode
> ---------------------------------------------------------
>
>                 Key: FLINK-22437
>                 URL: https://issues.apache.org/jira/browse/FLINK-22437
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.2
>            Reporter: zoucao
>            Assignee: zoucao
>            Priority: Major
>
> When I execute the batch SQL below in Flink 1.12.2, I find lots of small
> files in HDFS. The filesystem connector uses `GroupedPartitionWriter`, which
> closes the current partition whenever a new record does not belong to it.
> Small files therefore appear when records from more than one partition are
> sent to the filesystem sink at the same time. The Hive source can determine
> its parallelism from the number of files and partitions, and the sort
> operator inherits that parallelism. However, in
> `CommonPhysicalSink#createSinkTransformation` a filter operator is added
> to support `SinkNotNullEnforcer`, and no parallelism is set for it, so the
> filesystem sink operator cannot get the correct parallelism from its input.
> {code:java}
> CREATE CATALOG myHive with (
>     'type'='hive',
>     'property-version'='1',
>     'default-database' = 'flink_sql_online_test'
> );
> -- SET table.sql-dialect=hive;
> -- CREATE TABLE IF NOT EXISTS myHive.flink_sql_online_test.hive_sink (
> --    `timestamp` BIGINT,
> --    `time` STRING,
> --    id BIGINT,
> --    product STRING,
> --    price DOUBLE,
> --    canSell STRING,
> --    selledNum BIGINT
> -- ) PARTITIONED BY (
> --    dt STRING,
> --    `hour` STRING,
> --   `min` STRING
> -- ) TBLPROPERTIES (
> --    'partition.time-extractor.timestamp-pattern'='$dt $hr:$min:00',
> --    'sink.partition-commit.trigger'='partition-time',
> --    'sink.partition-commit.delay'='1 min',
> --    'sink.partition-commit.policy.kind'='metastore,success-file'
> -- );
> create table fs_sink (
>     `timestamp` BIGINT,
>     `time` STRING,
>     id BIGINT,
>     product STRING,
>     price DOUBLE,
>     canSell STRING,
>     selledNum BIGINT,
>     dt STRING,
>     `hour` STRING,
>     `min` STRING
> ) PARTITIONED BY (dt, `hour`, `min`) with (
>     'connector'='filesystem',
>     'path'='hdfs://XXXX',
>     'format'='csv'
> );
> insert into fs_sink
> select * from myHive.flink_sql_online_test.hive_sink;
> {code}
> I think this problem can be fixed by setting the parallelism on the filter operator, like this:
> {code:java}
> val dataStream = new DataStream(env, inputTransformation).filter(enforcer)
>   .setParallelism(inputTransformation.getParallelism)
> {code}
>  
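The small-file effect described in the issue can be illustrated with a minimal sketch. It assumes only what the description states: a grouped writer keeps one partition open at a time and closes it when a record for a different partition arrives. The class `GroupedWriterSketch` and its helper `GroupedWriter` are hypothetical stand-ins written for illustration, not Flink's `GroupedPartitionWriter`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GroupedWriterSketch {

    /** Hypothetical stand-in for a grouped writer: only one partition is open at a time. */
    static final class GroupedWriter {
        private String currentPartition;                       // partition of the open file
        private final List<String> files = new ArrayList<>();  // one entry per file opened

        void write(String partition) {
            if (!partition.equals(currentPartition)) {
                // A record for a different partition closes the open file and starts a new one.
                currentPartition = partition;
                files.add(partition + "/part-" + files.size());
            }
        }

        int fileCount() {
            return files.size();
        }
    }

    /** Feeds the partition key of each record, in arrival order, to a fresh writer. */
    static int countFiles(List<String> partitionsInArrivalOrder) {
        GroupedWriter writer = new GroupedWriter();
        for (String partition : partitionsInArrivalOrder) {
            writer.write(partition);
        }
        return writer.fileCount();
    }

    public static void main(String[] args) {
        // Records arrive grouped by partition: one file per partition.
        List<String> grouped = Arrays.asList("dt=1", "dt=1", "dt=1", "dt=2", "dt=2", "dt=2");
        // Records from two partitions arrive interleaved: a new file for every switch.
        List<String> interleaved = Arrays.asList("dt=1", "dt=2", "dt=1", "dt=2", "dt=1", "dt=2");

        System.out.println("grouped input:     " + countFiles(grouped) + " files");
        System.out.println("interleaved input: " + countFiles(interleaved) + " files");
    }
}
```

In this toy model the same six records produce two files when they arrive grouped by partition and six files when they arrive interleaved, which is consistent with the report that the sink must receive correctly partitioned, sorted input to avoid small files.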



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
