Toroidals commented on issue #10608:
URL: https://github.com/apache/hudi/issues/10608#issuecomment-1923451819

   > **_Tips before filing an issue_**
   > 
   > * Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   > * Join the mailing list to engage in conversations and get faster support at [dev-subscr...@hudi.apache.org](mailto:dev-subscr...@hudi.apache.org).
   > * If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   > 
   > **Describe the problem you faced**
   > 
   > When two Flink programs write to different partitions of the same Hudi table, with the parameter set to `options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));`, the following error occurs:
   > 
   > ![image](https://private-user-images.githubusercontent.com/54655412/301797895-85b54504-073b-4580-890b-523b24948cac.png)
   > 
   > 2024-02-02 17:21:12
   > 
   > ```
   > org.apache.flink.runtime.JobException:
   >     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
   >     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:101)
   >     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:322)
   >     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:574)
   >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443)
   >     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
   >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443)
   >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
   >     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
   >     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
   >     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
   >     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
   >     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
   >     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
   >     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
   >     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   >     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
   >     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
   >     at akka.actor.Actor.aroundReceive(Actor.scala:537)
   >     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
   >     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
   >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
   >     at akka.actor.ActorCell.invoke(ActorCell.scala:548)
   >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
   >     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
   >     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
   >     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
   >     at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
   >     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
   >     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
   > Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'consistent_bucket_write:default_database.hudi_rbs_rbscmfprd_cmf_wf_operation_log_cdc_qy_test' (operator ab5eb0c735d351ddaa2e080f1564920d).
   >     at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
   >     at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:196)
   >     at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
   >     at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
   >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   >     at java.lang.Thread.run(Thread.java:748)
   > Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20240202171450091] error
   >     ... 6 more
   > Caused by: java.lang.IllegalArgumentException
   >     at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:33)
   >     at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:616)
   >     at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:597)
   >     at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:223)
   >     at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:286)
   >     at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:236)
   >     at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:112)
   >     at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:75)
   >     at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:201)
   >     at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:564)
   >     at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:540)
   >     at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:258)
   >     at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
   >     ... 3 more
   > ```
   > 
   > **To Reproduce**
   > 
   > ```
   > public static HoodiePipeline.Builder getHoodieBuilder(HashMap<String, String> infoMap, HashMap<String, String> connectInfo) {
   > 
   >     HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));
   > 
   >     String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
   >     ArrayList<ArrayList<String>> fieldList = JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() {
   >     });
   >     for (ArrayList<String> columnList : fieldList) {
   >         builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
   >     }
   >     builder.column("_flink_cdc_connector string");
   >     builder.column("_flink_cdc_db string");
   >     builder.column("_flink_cdc_table string");
   >     builder.column("_flink_cdc_op string");
   >     builder.column("_flink_cdc_ts_ms timestamp");
   > 
   >     builder.pk(infoMap.get("hudi_primary_key"));
   > 
   >     Map<String, String> options = new HashMap<>();
   >     options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
   > 
   >     options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
   >     options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));
   > 
   >     options.put(FlinkOptions.PRE_COMBINE.key(), "true");
   >     options.put(FlinkOptions.PRECOMBINE_FIELD.key(), infoMap.get("hudi_precombine_field"));
   > 
   >     options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
   >     options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
   >     options.put(FlinkOptions.WRITE_TASKS.key(), writeTasks);
   > 
   >     // bucket assigner
   >     options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), bucketAssignTasks);
   >     options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), infoMap.get("hudi_bucket_index_num_buckets"));
   >     options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), infoMap.get("hudi_bucket_index_engine_type"));
   > 
   >     // COMPACTION
   >     options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), infoMap.get("hudi_compaction_trigger_strategy"));
   >     options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), infoMap.get("hudi_compaction_delta_commits"));
   >     options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), infoMap.get("hudi_compaction_delta_seconds"));
   > 
   >     options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "_flink_cdc_table");
   > 
   >     // HIVE_SYNC
   >     options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), infoMap.get("hudi_hive_sync_enabled"));
   >     options.put(FlinkOptions.HIVE_SYNC_MODE.key(), infoMap.get("hudi_hive_sync_mode"));
   >     options.put(FlinkOptions.HIVE_SYNC_DB.key(), infoMap.get("hudi_hive_sync_db"));
   >     options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), infoMap.get("hudi_hive_sync_table"));
   >     options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "_flink_cdc_table");
   >     options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
   >     options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), connectInfo.get("hive_metastore_url"));
   >     options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), connectInfo.get("conn_url"));
   >     options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
   >     options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");
   > 
   >     // write client id, set to a unique value per job
   >     options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));
   > 
   >     builder.options(options);
   >     return builder;
   > }
   > ```
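   > 
   > For readers trying to reproduce this, a minimal sketch of the `infoMap` consumed by the method above may help; the shape of `hudi_field_map` is inferred from the `ArrayList<ArrayList<String>>` parse, and every key/value below is illustrative rather than taken from the report:
   > 
   > ```
   > import java.util.HashMap;
   > 
   > // Hypothetical input for getHoodieBuilder above. The JSON layout of "hudi_field_map"
   > // is inferred from the parse into ArrayList<ArrayList<String>>: a list of
   > // [column name, column type] pairs. All values are made up for illustration.
   > class HoodieBuilderInputExample {
   > 
   >     static HashMap<String, String> sampleInfoMap() {
   >         HashMap<String, String> infoMap = new HashMap<>();
   >         infoMap.put("hudi_table_name", "hudi_demo_table");
   >         infoMap.put("hudi_database_name", "default_database");
   >         infoMap.put("hudi_primary_key", "id");
   >         infoMap.put("hudi_precombine_field", "_flink_cdc_ts_ms");
   >         infoMap.put("hudi_field_map", "[[\"id\",\"bigint\"],[\"name\",\"string\"]]");
   >         infoMap.put("hudi_bucket_index_num_buckets", "8");
   >         infoMap.put("hudi_bucket_index_engine_type", "CONSISTENT_HASHING");
   >         infoMap.put("hudi_compaction_trigger_strategy", "num_commits");
   >         infoMap.put("hudi_compaction_delta_commits", "5");
   >         infoMap.put("hudi_compaction_delta_seconds", "3600");
   >         infoMap.put("hudi_hive_sync_enabled", "false");
   >         // remaining hudi_hive_sync_* keys omitted for brevity
   >         return infoMap;
   >     }
   > }
   > ```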
   > 
   > 
   > **Expected behavior**
   > 
   > After setting `options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));`, two Flink programs should be able to write to different partitions of the same Hudi table without errors.
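   > 
   > A minimal sketch of that setup, assuming each job simply passes its own fixed id (the identifiers below are hypothetical, not from this report):
   > 
   > ```
   > import java.util.HashMap;
   > import java.util.Map;
   > 
   > import org.apache.hudi.configuration.FlinkOptions;
   > 
   > // Illustrative only: give each of the two Flink jobs a distinct, stable
   > // write client id (e.g. "job-a" and "job-b") instead of a timestamp, so the
   > // id also stays the same when a job restarts.
   > class WriteClientIdExample {
   > 
   >     static Map<String, String> clientIdOption(String clientId) {
   >         Map<String, String> options = new HashMap<>();
   >         options.put(FlinkOptions.WRITE_CLIENT_ID.key(), clientId);
   >         return options;
   >     }
   > }
   > ```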
   > 
   > **Environment Description**
   > 
   > * Hudi version : 0.14.0
   > * Flink version : 1.15.2
   > * Hive version : 3.x
   > * Hadoop version : 3.x
   > * Storage (HDFS/S3/GCS..) : HDFS
   > * Running on Docker? (yes/no) : no
   > 
   > **Additional context**
   > 
   > Add any other context about the problem here.
   > 
   > **Stacktrace**
   > 
   > `Add the stacktrace of the error.`
   
![image](https://github.com/apache/hudi/assets/54655412/56db6fe9-1d6f-4b44-935a-39aae875cfd4)
   When two Flink programs write to different partitions of the same Hudi table, only one of the jobs consistently hits this error.
   

