Re: [I] [SUPPORT] Executor executes action [commits the instant 20240202161708414] error [hudi]
Toroidals commented on issue #10608: URL: https://github.com/apache/hudi/issues/10608#issuecomment-1926104297

> The checkpoint must be enabled; we need to guarantee exactly-once semantics. However, enabling checkpointing results in the error: Executor executes action [commits the instant 20240202161708414] error

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
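For context, Hudi's Flink writer commits a new instant on each successful checkpoint, which is why checkpointing cannot be turned off for exactly-once writes. A minimal sketch of the required environment setup, assuming Flink's DataStream API (the interval and timeout values here are illustrative, not from the original job):

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once checkpointing every 60s; the Hudi sink commits the
        // in-flight instant when the checkpoint completes, so this interval
        // also drives the commit cadence on the Hudi timeline.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(600_000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    }
}
```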
Toroidals commented on issue #10608: URL: https://github.com/apache/hudi/issues/10608#issuecomment-1925610466

> Only 1.0 release supports concurrent streaming writers.

If Flink checkpointing is disabled, the job writes normally, but as soon as checkpointing is enabled the error occurs. Writing to a MOR table with Flink requires checkpointing to be enabled.

```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(TimeUnit.SECONDS.toMillis(Long.parseLong(confInfo.get("checkpoint_interval"))), CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.SECONDS.toMillis(Long.parseLong(confInfo.get("checkpoint_timeout"))));
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(Integer.parseInt(confInfo.get("checkpoint_max_concurrent")));
    env.getCheckpointConfig().setCheckpointStorage(confInfo.get("checkpoint_path"));
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.parseInt(confInfo.get("checkpoint_failure_number")));
    env.setRestartStrategy(RestartStrategies.noRestart());

    EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
    embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
    env.setStateBackend(embeddedRocksDBStateBackend);

    DataStreamSource dataStreamSource = env.addSource();

    HoodiePipeline.Builder builder = HoodiePipeline.builder(infoMap.get("hudi_table_name"));
    Map<String, String> options = new HashMap<>();

    String hudiFieldMap = infoMap.get("hudi_field_map").toLowerCase(Locale.ROOT);
    ArrayList<ArrayList<String>> fieldList = JSON.parseObject(hudiFieldMap, new TypeReference<ArrayList<ArrayList<String>>>() {
    });
    for (ArrayList<String> columnList : fieldList) {
        builder.column("`" + columnList.get(0) + "` " + columnList.get(1));
    }
    builder.column("_flink_cdc_connector string");
    builder.column("_flink_cdc_db string");
    builder.column("_flink_cdc_table string");
    builder.column("_flink_cdc_op string");
    builder.column("_flink_cdc_ts_ms timestamp");
    builder.pk(infoMap.get("hudi_primary_key"));

    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
    options.put(FlinkOptions.DATABASE_NAME.key(), infoMap.get("hudi_database_name"));
    options.put(FlinkOptions.TABLE_NAME.key(), infoMap.get("hudi_table_name"));
    options.put(FlinkOptions.PRE_COMBINE.key(), "true");
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), infoMap.get("hudi_precombine_field"));
    options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
    options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
    options.put(FlinkOptions.WRITE_TASKS.key(), infoMap.get("hudi_write_tasks"));
    options.put(FlinkOptions.BUCKET_ASSIGN_TASKS.key(), bucketAssignTasks);
    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), infoMap.get("hudi_bucket_index_num_buckets"));
    options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(), infoMap.get("hudi_bucket_index_engine_type"));
    options.put(FlinkOptions.COMPACTION_TASKS.key(), compactionTasks);
    options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), infoMap.get("hudi_compaction_trigger_strategy"));
    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), infoMap.get("hudi_compaction_delta_commits"));
    options.put(FlinkOptions.COMPACTION_DELTA_SECONDS.key(), infoMap.get("hudi_compaction_delta_seconds"));
    options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), infoMap.get("hudi_hive_sync_enabled"));
    options.put(FlinkOptions.HIVE_SYNC_MODE.key(), infoMap.get("hudi_hive_sync_mode"));
    options.put(FlinkOptions.HIVE_SYNC_DB.key(), infoMap.get("hudi_hive_sync_db"));
    options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), infoMap.get("hudi_hive_sync_table"));
    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "_flink_cdc_table");
    options.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "_flink_cdc_table");
    options.put(FlinkOptions.HIVE_SYNC_CONF_DIR.key(), "/etc/hive/conf");
    options.put(FlinkOptions.HIVE_SYNC_METASTORE_URIS.key(), connectInfo.get("hive_metastore_url"));
    options.put(FlinkOptions.HIVE_SYNC_JDBC_URL.key(), connectInfo.get("conn_url"));
    options.put(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP.key(), "true");
    options.put(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX.key(), "true");
    options.put(Flink
```
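The snippet above is cut off before the pipeline is wired up. A hedged sketch of how a `HoodiePipeline` job of this shape is typically finished, assuming Hudi's `HoodiePipeline.Builder#options` and `#sink(DataStream<RowData>, boolean)` API; the method and job names here are illustrative, not from the original program:

```java
import java.util.Map;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.util.HoodiePipeline;

public class HudiSinkSketch {
    // Sketch only: `builder`, `options`, `env`, and `dataStream` stand in for
    // the objects built in the snippet above.
    static void attachHudiSink(HoodiePipeline.Builder builder,
                               Map<String, String> options,
                               StreamExecutionEnvironment env,
                               DataStream<RowData> dataStream) throws Exception {
        builder.options(options);        // apply all collected FlinkOptions
        builder.sink(dataStream, false); // false = unbounded streaming input
        env.execute("hudi-mor-streaming-write");
    }
}
```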
Toroidals commented on issue #10608: URL: https://github.com/apache/hudi/issues/10608#issuecomment-1923451819

> **_Tips before filing an issue_**
>
> * Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
> * Join the mailing list to engage in conversations and get faster support at [dev-subscr...@hudi.apache.org](mailto:dev-subscr...@hudi.apache.org).
> * If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
>
> **Describe the problem you faced**
>
> When using two Flink programs to write to different partitions of the same Hudi table, with the parameter set as options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis())); the following error occurred: ![image](https://github.com/apache/hudi/assets/54655412/85b54504-073b-4580-890b-523b24948cac)

```
2024-02-02 17:21:12
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:101)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:322)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:574)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'consistent_bucket_write: default_database.hudi_rbs_rbscmfprd_cmf_wf_operation_log_cdc_qy_test' (operator ab5eb0c735d351ddaa2e080f1564920d).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:196)
    at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20240202171450091] error
    ... 6 more
Caused by: java.lang.IllegalArgumentException
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:33)
    at org.apache.
```
[I] [SUPPORT] Executor executes action [commits the instant 20240202161708414] error [hudi]
Toroidals opened a new issue, #10608: URL: https://github.com/apache/hudi/issues/10608

**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
- Join the mailing list to engage in conversations and get faster support at dev-subscr...@hudi.apache.org.
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

When using two Flink programs to write to different partitions of the same table in Hudi, and the parameter has been set as options.put(FlinkOptions.WRITE_CLIENT_ID.key(), String.valueOf(System.currentTimeMillis())); the following error occurred: ![image](https://github.com/apache/hudi/assets/54655412/85b54504-073b-4580-890b-523b24948cac)

```
2024-02-02 17:21:12
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:101)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:322)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:574)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'consistent_bucket_write: default_database.hudi_rbs_rbscmfprd_cmf_wf_operation_log_cdc_qy_test' (operator ab5eb0c735d351ddaa2e080f1564920d).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:196)
    at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20240202171450091] error
    ... 6 more
Caused by: java.lang.IllegalArgumentException
    at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:33)
    at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline
```
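The issue sets a distinct client ID per writer via FlinkOptions.WRITE_CLIENT_ID, so that two jobs writing different partitions of the same table keep separate heartbeats and ckp metadata. A small, self-contained sketch of that idea; the literal key string `"write.client.id"` is my assumption of what `FlinkOptions.WRITE_CLIENT_ID.key()` resolves to, and in a real job you would use the constant itself:

```java
import java.util.HashMap;
import java.util.Map;

public class ClientIdExample {
    // Hypothetical helper: build the per-writer option entry that would be
    // merged into the full Hudi options map of each Flink job.
    static Map<String, String> writerOptions(String clientId) {
        Map<String, String> options = new HashMap<>();
        options.put("write.client.id", clientId); // stand-in for FlinkOptions.WRITE_CLIENT_ID.key()
        return options;
    }

    public static void main(String[] args) {
        // Each concurrent writer must get a unique, stable ID.
        Map<String, String> writerA = writerOptions("writer-a");
        Map<String, String> writerB = writerOptions("writer-b");
        System.out.println(!writerA.get("write.client.id")
                .equals(writerB.get("write.client.id"))); // prints "true"
    }
}
```

Note that the issue derives the ID from `System.currentTimeMillis()`, which changes on every restart; a fixed per-job name keeps the ID stable across restarts of the same writer.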