Toroidals opened a new issue, #10608:
URL: https://github.com/apache/hudi/issues/10608

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   
   - Join the mailing list to engage in conversations and get faster support at 
dev-subscr...@hudi.apache.org.
   
   - If you have triaged this as a bug, then file an 
[issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   
   When two Flink programs write to different partitions of the same Hudi table, and each job sets the write client id via `options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));`, the following error occurs:
   
![image](https://github.com/apache/hudi/assets/54655412/85b54504-073b-4580-890b-523b24948cac)
   
```
2024-02-02 17:21:12
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:101)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:322)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:574)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'consistent_bucket_write: default_database.hudi_rbs_rbscmfprd_cmf_wf_operation_log_cdc_qy_test' (operator ab5eb0c735d351ddaa2e080f1564920d).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:196)
    at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20240202171450091] error
    ... 6 more
Caused by: java.lang.IllegalArgumentException
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:33)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:616)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:597)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:223)
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:286)
    at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:236)
    at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:112)
    at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:75)
    at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:201)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:564)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:540)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:258)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
    ... 3 more
```
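
The `IllegalArgumentException` above carries no message because it comes from a bare argument check in `ValidationUtils.checkArgument`. The snippet below is an approximate sketch of that guard for context, not the exact Hudi 0.14.0 source, and the comment about what `HoodieActiveTimeline.transitionState` verifies is an assumption, not verified code.

```java
// Approximate sketch (not copied from the Hudi source tree): checkArgument(boolean)
// throws a message-less IllegalArgumentException when its condition is false,
// which is why the trace shows "java.lang.IllegalArgumentException" with no detail.
public static void checkArgument(final boolean expression) {
    if (!expression) {
        throw new IllegalArgumentException();
    }
}

// Assumption about the failure mode: in HoodieActiveTimeline.transitionState the
// guarded condition relates the inflight instant being completed to the instant
// actually present on the timeline, so a second writer sharing the same timeline
// can invalidate it between the checkpoint and the commit.
```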
   
   
   **To Reproduce**
   
```java
public static HoodiePipeline.Builder getHoodieBuilder(HashMap<String, String> infoMap, HashMap<String, String> connectInfo) {

    HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));

    String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
    ArrayList<ArrayList<String>> fieldList = JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() {
    });
    for (ArrayList<String> columnList : fieldList) {
        builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
    }
    builder.column("_flink_cdc_connector string");
    builder.column("_flink_cdc_db string");
    builder.column("_flink_cdc_table string");
    builder.column("_flink_cdc_op string");
    builder.column("_flink_cdc_ts_ms timestamp");

    builder.pk(infoMap.get("hudi_primary_key"));

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());

    options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
    options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));

    options.put(FlinkOptions.PRE_COMBINE.key(), "true");
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), infoMap.get("hudi_precombine_field"));

    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
    options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
    // writeTasks and bucketAssignTasks are defined elsewhere in the original program
    options.put(FlinkOptions.WRITE_TASKS.key(), writeTasks);

    // bucket assigner
    options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), bucketAssignTasks);
    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), infoMap.get("hudi_bucket_index_num_buckets"));
    options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), infoMap.get("hudi_bucket_index_engine_type"));

    // compaction
    options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), infoMap.get("hudi_compaction_trigger_strategy"));
    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), infoMap.get("hudi_compaction_delta_commits"));
    options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), infoMap.get("hudi_compaction_delta_seconds"));

    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "_flink_cdc_table");

    // Hive sync
    options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), infoMap.get("hudi_hive_sync_enabled"));
    options.put(FlinkOptions.HIVE_SYNC_MODE.key(), infoMap.get("hudi_hive_sync_mode"));
    options.put(FlinkOptions.HIVE_SYNC_DB.key(), infoMap.get("hudi_hive_sync_db"));
    options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), infoMap.get("hudi_hive_sync_table"));
    options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "_flink_cdc_table");
    options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
    options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), connectInfo.get("hive_metastore_url"));
    options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), connectInfo.get("conn_url"));
    options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");

    // The option in question: give each job its own write client id
    options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));

    builder.options(options);
    return builder;
}
```
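
For context, a minimal sketch of how this builder might be driven in each of the two programs (the source stream, `env` setup, and helper names below are assumptions for illustration; only `getHoodieBuilder` and the client-id handling come from the code above):

```java
// Hypothetical driver for ONE of the two Flink jobs -- a sketch, not the actual
// production job. Both jobs target the same Hudi table; they differ only in the
// partitions their CDC streams feed and in WRITE_CLIENT_ID, which getHoodieBuilder
// sets to System.currentTimeMillis() so each job gets a distinct client id.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // assumed checkpoint interval

// buildCdcSource is a hypothetical helper that produces the RowData stream for
// this job's partitions (_flink_cdc_table values).
DataStream<RowData> rowDataStream = buildCdcSource(env, infoMap);

HoodiePipeline.Builder builder = getHoodieBuilder(infoMap, connectInfo);
builder.sink(rowDataStream, false); // false = streaming (unbounded) write

env.execute("hudi-writer-" + infoMap.get("hudi_table_name"));
```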
        
   
   
   **Expected behavior**
   
   After setting `options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis()));`, two Flink programs should be able to write to different partitions of the same Hudi table without error.
   
   **Environment Description**
   
   * Hudi version : 0.14.0
   
   * Flink version : 1.15.2
   
   * Hive version : 3.x
   
   * Hadoop version : 3.x
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : no
   
   
   
   **Stacktrace**
   
   The full stack trace is included in the problem description above.
   
   

