hudi-bot opened a new issue, #15261:
URL: https://github.com/apache/hudi/issues/15261

   Currently, when we create a Flink DataStream to write to a Hudi table, we usually use the org.apache.hudi.streamer.HoodieFlinkStreamer entry class.
   
   The generated Flink DAG contains a BucketAssignFunction (required), a StreamWriteFunction (required), and some other optional operators. BucketAssignFunction assigns a bucket to each incoming record, then StreamWriteFunction handles the stream, which is keyed by bucket.
   {code:java}
   DataStream<Object> pipeline = hoodieDataStream
       // Key by record key, to avoid multiple subtasks writing to the same bucket at the same time
       .keyBy(HoodieRecord::getRecordKey)
       .transform(
           "bucket_assigner",
           TypeInformation.of(HoodieRecord.class),
           new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
       .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
       .uid("uid_bucket_assigner")
       // Shuffle by fileId (bucket id)
       .keyBy(record -> record.getCurrentLocation().getFileId())
       .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
       .uid("uid_hoodie_stream_write")
       .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
   {code}
   To reduce the number of small files, BucketAssignFunction rolls over to a new bucket every 500,000 records by default. So most of the time BucketAssignFunction holds a number of open buckets equal to its parallelism. And although BucketAssignFunction usually has the same parallelism as StreamWriteFunction, we cannot guarantee a 1:1 mapping between buckets and StreamWriteFunction tasks: the keyBy on fileId hashes buckets onto subtasks, so several buckets may land on one writer subtask while others sit idle.
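   The hash collision described above can be sketched with a small, self-contained simulation. Note this is a simplified model: Flink actually routes keys through key groups (KeyGroupRangeAssignment.assignKeyToParallelOperator), not a plain hashCode modulo, and the class and method names below (BucketSkewDemo, assignSubtask) are illustrative, not Hudi/Flink APIs. The effect shown is the same: with as many live buckets as writer subtasks, hashing gives no guarantee that each subtask gets exactly one bucket.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Simplified model of how a keyBy shuffle maps bucket ids (fileIds) to
   // writer subtasks. Real Flink uses key groups; plain hashCode % p is an
   // assumption made here to keep the demo self-contained.
   public class BucketSkewDemo {
     static int assignSubtask(String fileId, int parallelism) {
       return Math.floorMod(fileId.hashCode(), parallelism);
     }

     public static void main(String[] args) {
       int parallelism = 4;
       // Each of the 4 bucket-assigner subtasks holds roughly one open bucket,
       // so ~4 distinct fileIds are live at any time (names are hypothetical).
       String[] fileIds = {"fileId-0", "fileId-1", "fileId-2", "fileId-3"};
       Map<Integer, Integer> load = new HashMap<>();
       for (String id : fileIds) {
         load.merge(assignSubtask(id, parallelism), 1, Integer::sum);
       }
       // Hashing gives no 1:1 bucket -> subtask guarantee: some writer
       // subtasks can receive several buckets while others receive none.
       System.out.println("writer subtask -> bucket count: " + load);
     }
   }
   ```

   Running this with real UUID-style fileIds shows the same behavior: the distribution depends entirely on the hash, so skew is possible at any parallelism.
   
   
   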
   
   And finally we get a data skew case like this:
   
   !image-2022-06-28-20-00-39-158.png!!image-2022-06-28-20-00-39-181.png!
   
   The data skew may cause backpressure, which makes checkpoints time out, and the Flink Hudi write pipeline strongly depends on checkpoint completion to commit the instant.
   
   I think we should chain the two operators when BucketAssignFunction's parallelism equals StreamWriteFunction's parallelism.
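   The routing effect of the proposed chaining can be sketched as follows. This models only the partitioning behavior, not the actual job-graph change: with forward (chained) partitioning, bucket-assigner subtask i hands its records straight to writer subtask i, so when each assigner subtask holds one open bucket, each writer holds exactly one bucket and the hash-induced skew disappears. The class name ChainedRoutingDemo is illustrative, not a Hudi/Flink API.

   ```java
   // Sketch of the proposed behavior under forward/chained partitioning:
   // downstream subtask index equals upstream subtask index, instead of
   // being derived from a hash of the bucket id.
   public class ChainedRoutingDemo {
     static int forwardSubtask(int assignerSubtask) {
       // FORWARD partitioning: same-index subtask downstream, no shuffle.
       return assignerSubtask;
     }

     public static void main(String[] args) {
       int parallelism = 4;
       for (int task = 0; task < parallelism; task++) {
         System.out.println("assigner subtask " + task
             + " -> writer subtask " + forwardSubtask(task));
       }
     }
   }
   ```

   Chaining also removes the network exchange between the two operators entirely, which is where the claimed reduction in network overhead comes from.
   
   
   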
   
   In our testing, this greatly improves the performance and stability of the write job: it resolves the data skew and reduces the network overhead.
   
   ## JIRA info
   
   - Link: https://issues.apache.org/jira/browse/HUDI-4338
   - Type: Improvement
   - Attachment(s):
     - 28/Jun/22 12:00; liufangqi; image-2022-06-28-20-00-39-158.png; https://issues.apache.org/jira/secure/attachment/13045811/image-2022-06-28-20-00-39-158.png
     - 28/Jun/22 12:00; liufangqi; image-2022-06-28-20-00-39-181.png; https://issues.apache.org/jira/secure/attachment/13045810/image-2022-06-28-20-00-39-181.png


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
