Toroidals opened a new issue, #10608: URL: https://github.com/apache/hudi/issues/10608
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
- Join the mailing list to engage in conversations and get faster support at dev-subscr...@hudi.apache.org.
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

Two Flink programs write to different partitions of the same Hudi table. Each job sets a unique write client id:

`options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));`

Despite this, the following error occurred:

![image](https://github.com/apache/hudi/assets/54655412/85b54504-073b-4580-890b-523b24948cac)

```
2024-02-02 17:21:12
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:101)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:322)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:574)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'consistent_bucket_write: default_database.hudi_rbs_rbscmfprd_cmf_wf_operation_log_cdc_qy_test' (operator ab5eb0c735d351ddaa2e080f1564920d).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:196)
    at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20240202171450091] error
    ... 6 more
Caused by: java.lang.IllegalArgumentException
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:33)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:616)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:597)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:223)
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:286)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:236)
    at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:112)
    at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:75)
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:201)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:564)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:540)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:258)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
    ... 3 more
```
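The root cause visible in the trace is an `IllegalArgumentException` thrown by `ValidationUtils.checkArgument` inside `HoodieActiveTimeline.transitionState` while the coordinator completes instant `20240202171450091`, i.e. the commit could not be transitioned to the completed state. For reference, a minimal sketch of the setup this report describes: the two pipelines run in separate JVMs, so `System.currentTimeMillis()` yields a different client id per job. The stable ids `"job-a"`/`"job-b"` below are a hypothetical alternative, not something the report uses:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.configuration.FlinkOptions;

// Job A and Job B are submitted as separate Flink applications, each writing
// its own "_flink_cdc_table" partition of the same Hudi table.
Map<String, String> optionsJobA = new HashMap<>();
optionsJobA.put(FlinkOptions.WRITE_CLIENT_ID.key(), "job-a"); // hypothetical stable id

Map<String, String> optionsJobB = new HashMap<>();
optionsJobB.put(FlinkOptions.WRITE_CLIENT_ID.key(), "job-b"); // hypothetical stable id
```

A stable id per pipeline (instead of a fresh timestamp on every submission) keeps each writer's coordinator metadata path the same across restarts of the same job; whether that matters for this failure is part of the question here.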
**To Reproduce**

```java
public static HoodiePipeline.Builder getHoodieBuilder(HashMap<String, String> infoMap, HashMap<String, String> connectInfo) {
    HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));
    String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
    ArrayList<ArrayList<String>> fieldList = JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() {
    });
    for (ArrayList<String> columnList : fieldList) {
        builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
    }
    builder.column("_flink_cdc_connector string");
    builder.column("_flink_cdc_db string");
    builder.column("_flink_cdc_table string");
    builder.column("_flink_cdc_op string");
    builder.column("_flink_cdc_ts_ms timestamp");
    builder.pk(infoMap.get("hudi_primary_key"));

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
    options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));
    options.put(FlinkOptions.PRE_COMBINE.key(), "true");
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), infoMap.get("hudi_precombine_field"));
    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
    options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
    options.put(FlinkOptions.WRITE_TASKS.key(), writeTasks);
    // bucket assigner
    options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), bucketAssignTasks);
    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), infoMap.get("hudi_bucket_index_num_buckets"));
    options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), infoMap.get("hudi_bucket_index_engine_type"));
    // compaction
    options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), infoMap.get("hudi_compaction_trigger_strategy"));
    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), infoMap.get("hudi_compaction_delta_commits"));
    options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), infoMap.get("hudi_compaction_delta_seconds"));
    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "_flink_cdc_table");
    // hive sync
    options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), infoMap.get("hudi_hive_sync_enabled"));
    options.put(FlinkOptions.HIVE_SYNC_MODE.key(), infoMap.get("hudi_hive_sync_mode"));
    options.put(FlinkOptions.HIVE_SYNC_DB.key(), infoMap.get("hudi_hive_sync_db"));
    options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), infoMap.get("hudi_hive_sync_table"));
    options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "_flink_cdc_table");
    options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
    options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), connectInfo.get("hive_metastore_url"));
    options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), connectInfo.get("conn_url"));
    options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");
    // the setting in question: unique per submission, since each job is its own JVM
    options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));
    builder.options(options);
    return builder;
}
```
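How this builder is attached to the stream is not shown in the report; a minimal hypothetical wiring (the `attachHudiSink` method shape and the `cdcStream` source are assumptions, not from the report) would look like:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.util.HoodiePipeline;

public class HudiSinkWiring {
    // Hypothetical wiring: attach the builder from getHoodieBuilder(...) to an
    // upstream CDC stream of RowData; the source itself is out of scope here.
    static void attachHudiSink(StreamExecutionEnvironment env,
                               DataStream<RowData> cdcStream,
                               HoodiePipeline.Builder builder) {
        // Hudi commits on Flink checkpoints, so checkpointing must be enabled.
        env.enableCheckpointing(60_000L);
        // bounded = false for a continuous streaming write.
        builder.sink(cdcStream, false);
    }
}
```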
**Expected behavior**

With `FlinkOptions.WRITE_CLIENT_ID` set to a unique value per job, two Flink programs should be able to write to different partitions of the same Hudi table concurrently, without the commit failure above.

**Environment Description**

* Hudi version : 0.14.0
* Flink version : 1.15.2
* Hive version : 3.x
* Hadoop version : 3.x
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no
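**Additional context**

One observation that may help triage (an editorial assumption, not something the report configures): Hudi's documented multi-writer path pairs distinct writer ids with optimistic concurrency control and an external lock provider. A hedged sketch of those options, with hypothetical ZooKeeper endpoint values:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: Hudi's documented options for concurrent writers, not set in the
// reproduction above. All ZooKeeper values below are hypothetical placeholders.
Map<String, String> occOptions = new HashMap<>();
occOptions.put("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
occOptions.put("hoodie.cleaner.policy.failed.writes", "LAZY");
occOptions.put("hoodie.write.lock.provider",
    "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
occOptions.put("hoodie.write.lock.zookeeper.url", "zk-host");           // hypothetical
occOptions.put("hoodie.write.lock.zookeeper.port", "2181");             // hypothetical
occOptions.put("hoodie.write.lock.zookeeper.lock_key", "hudi_table");   // hypothetical
occOptions.put("hoodie.write.lock.zookeeper.base_path", "/hudi/locks"); // hypothetical
// These would be merged into the same map passed to builder.options(...).
```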