[ 
https://issues.apache.org/jira/browse/HUDI-7763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-7763:
---------------------------------
    Labels: metrics pull-request-available  (was: metrics)

> Fix JMX reporter failing to initialize when the metadata table is enabled
> -------------------------------------------------------------------------
>
>                 Key: HUDI-7763
>                 URL: https://issues.apache.org/jira/browse/HUDI-7763
>             Project: Apache Hudi
>          Issue Type: Bug
>         Environment: Hudi 0.14.1, Spark 3.2
>            Reporter: Jihwan Lee
>            Priority: Major
>              Labels: metrics, pull-request-available
>
> When the JMX metrics option is enabled, the port setting can be given as a 
> range.
>  
> Because the metadata table is also written as a Hudi table, the writer 
> needs multiple metrics instances. (Otherwise the exception 'ObjID already 
> in use' occurs.)
> Each JmxReporterServer can use only one port.
> So the JMX server should be able to initialize on multiple ports.
>  
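A minimal sketch of the idea described above, not Hudi's actual implementation: try each port in a configured range and keep the first one on which an RMI registry can be created. The class name `JmxPortRangeSketch`, both method names, and the `first-last` range syntax are hypothetical and chosen only for illustration. The only library call used is `LocateRegistry.createRegistry`, which also appears in the stack trace below.

```java
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; none of these names come from Hudi.
public class JmxPortRangeSketch {

    // Parses "9889" or "9889-9891" (assumed syntax) into a list of ports.
    static List<Integer> parsePortRange(String spec) {
        String[] parts = spec.trim().split("-");
        int first = Integer.parseInt(parts[0].trim());
        int last = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : first;
        List<Integer> ports = new ArrayList<>();
        for (int port = first; port <= last; port++) {
            ports.add(port);
        }
        return ports;
    }

    // Creates an RMI registry on the first port in the range that accepts it
    // and returns that port. ExportException (for example "ObjID already in
    // use") is a subclass of RemoteException, so a port that is already
    // exported is skipped and the next one is tried.
    static int startRegistryOnFirstFreePort(String spec) throws RemoteException {
        RemoteException lastFailure = null;
        for (int port : parsePortRange(spec)) {
            try {
                LocateRegistry.createRegistry(port);
                return port;
            } catch (RemoteException e) {
                lastFailure = e;
            }
        }
        throw new RemoteException("No usable port in range " + spec, lastFailure);
    }
}
```

Under this sketch, two reporters in the same JVM that both call `startRegistryOnFirstFreePort("9889-9891")` would be expected to end up on different ports rather than the second one failing on 9889.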
> Error log:
>  (the JMX reporter for the metadata table is initialized first, then the 
> reporter for the data table throws an exception)
> {code:java}
> 24/05/13 20:28:27 INFO table.HoodieTableMetaClient: Loading 
> HoodieTableMetaClient from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:27 INFO table.HoodieTableConfig: Loading table properties from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/hoodie.properties
> 24/05/13 20:28:27 INFO table.HoodieTableMetaClient: Finished Loading Table of 
> type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:27 INFO table.HoodieTableMetaClient: Loading 
> HoodieTableMetaClient from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:27 INFO table.HoodieTableConfig: Loading table properties from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/hoodie.properties
> 24/05/13 20:28:27 INFO table.HoodieTableMetaClient: Finished Loading Table of 
> type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:27 INFO timeline.HoodieActiveTimeline: Loaded instants upto : 
> Option{val=[==>20240513195519782__deltacommit__REQUESTED__20240513195521160]}
> 24/05/13 20:28:28 INFO config.HoodieWriteConfig: Automatically set 
> hoodie.cleaner.policy.failed.writes=LAZY since optimistic concurrency control 
> is used
> 24/05/13 20:28:28 INFO metrics.JmxMetricsReporter: Started JMX server on port 
> 9889.
> 24/05/13 20:28:28 INFO metrics.JmxMetricsReporter: Configured JMXReporter 
> with {port:9889}
> 24/05/13 20:28:28 INFO embedded.EmbeddedTimelineService: Overriding hostIp to 
> (feeder-affiliate-book-svc-sink-09c3c08f71b47a5d-driver-svc.csp.svc) found in 
> spark-conf. It was null
> 24/05/13 20:28:28 INFO view.FileSystemViewManager: Creating View Manager with 
> storage type :MEMORY
> 24/05/13 20:28:28 INFO view.FileSystemViewManager: Creating in-memory based 
> Table View
> 24/05/13 20:28:28 INFO util.log: Logging initialized @53678ms to 
> org.apache.hudi.org.apache.jetty.util.log.Slf4jLog
> 24/05/13 20:28:28 INFO javalin.Javalin:
>        __                      __ _            __ __
>       / /____ _ _   __ ____ _ / /(_)____      / // /
>  __  / // __ `/| | / // __ `// // // __ \    / // /_
> / /_/ // /_/ / | |/ // /_/ // // // / / /   /__  __/
> \____/ \__,_/  |___/ \__,_//_//_//_/ /_/      /_/
>           https://javalin.io/documentation
> 24/05/13 20:28:28 INFO javalin.Javalin: Starting Javalin ...
> 24/05/13 20:28:28 INFO javalin.Javalin: You are running Javalin 4.6.7 
> (released October 24, 2022. Your Javalin version is 567 days old. Consider 
> checking for a newer version.).
> 24/05/13 20:28:28 INFO server.Server: jetty-9.4.48.v20220622; built: 
> 2022-06-21T20:42:25.880Z; git: 6b67c5719d1f4371b33655ff2d047d24e171e49a; jvm 
> 11.0.20.1+1
> 24/05/13 20:28:28 INFO server.Server: Started @54065ms
> 24/05/13 20:28:28 INFO javalin.Javalin: Listening on http://localhost:35071/
> 24/05/13 20:28:28 INFO javalin.Javalin: Javalin started in 177ms \o/
> 24/05/13 20:28:28 INFO service.TimelineService: Starting Timeline server on 
> port :35071
> 24/05/13 20:28:28 INFO embedded.EmbeddedTimelineService: Started embedded 
> timeline server at 
> feeder-affiliate-book-svc-sink-09c3c08f71b47a5d-driver-svc.csp.svc:35071
> 24/05/13 20:28:28 INFO client.BaseHoodieClient: Timeline Server already 
> running. Not restarting the service
> 24/05/13 20:28:28 INFO hudi.HoodieSparkSqlWriterInternal: 
> Config.inlineCompactionEnabled ? true
> 24/05/13 20:28:28 INFO hudi.HoodieSparkSqlWriterInternal: 
> Config.asyncClusteringEnabled ? false
> 24/05/13 20:28:28 INFO codegen.CodeGenerator: Code generated in 77.42891 ms
> 24/05/13 20:28:28 INFO table.HoodieTableMetaClient: Loading 
> HoodieTableMetaClient from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:28 INFO table.HoodieTableConfig: Loading table properties from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/hoodie.properties
> 24/05/13 20:28:28 INFO table.HoodieTableMetaClient: Finished Loading Table of 
> type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:28 INFO table.HoodieTableMetaClient: Loading Active commit 
> timeline for /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:28 INFO timeline.HoodieActiveTimeline: Loaded instants upto : 
> Option{val=[==>20240513195519782__deltacommit__REQUESTED__20240513195521160]}
> 24/05/13 20:28:28 INFO client.BaseHoodieWriteClient: Generate a new instant 
> time: 20240513202827651 action: deltacommit
> 24/05/13 20:28:28 INFO heartbeat.HoodieHeartbeatClient: Received request to 
> start heartbeat for instant time 20240513202827651
> 24/05/13 20:28:28 INFO timeline.HoodieActiveTimeline: Creating a new instant 
> [==>20240513202827651__deltacommit__REQUESTED]
> 24/05/13 20:28:28 INFO table.HoodieTableMetaClient: Loading 
> HoodieTableMetaClient from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:28 INFO table.HoodieTableConfig: Loading table properties from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/hoodie.properties
> 24/05/13 20:28:28 INFO table.HoodieTableMetaClient: Finished Loading Table of 
> type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:28 INFO table.HoodieTableMetaClient: Loading Active commit 
> timeline for /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:28 INFO timeline.HoodieActiveTimeline: Loaded instants upto : 
> Option{val=[==>20240513202827651__deltacommit__REQUESTED__20240513202828906]}
> 24/05/13 20:28:28 INFO transaction.TransactionManager: Transaction starting 
> for Option{val=[==>20240513202827651__deltacommit__INFLIGHT]} with latest 
> completed transaction instant Optional.empty
> 24/05/13 20:28:28 INFO lock.LockManager: LockProvider 
> org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider
> 24/05/13 20:28:29 INFO transaction.TransactionManager: Transaction started 
> for Option{val=[==>20240513202827651__deltacommit__INFLIGHT]} with latest 
> completed transaction instant Optional.empty
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Loading 
> HoodieTableMetaClient from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:29 INFO table.HoodieTableConfig: Loading table properties from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/hoodie.properties
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Finished Loading Table of 
> type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Loading 
> HoodieTableMetaClient from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/metadata
> 24/05/13 20:28:29 INFO table.HoodieTableConfig: Loading table properties from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/metadata/.hoodie/hoodie.properties
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Finished Loading Table of 
> type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/metadata
> 24/05/13 20:28:29 INFO timeline.HoodieActiveTimeline: Loaded instants upto : 
> Option{val=[20240513182607590__deltacommit__COMPLETED__20240513182612582]}
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Loading 
> HoodieTableMetaClient from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:29 INFO table.HoodieTableConfig: Loading table properties from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/hoodie.properties
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Finished Loading Table of 
> type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Loading 
> HoodieTableMetaClient from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/metadata
> 24/05/13 20:28:29 INFO table.HoodieTableConfig: Loading table properties from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/metadata/.hoodie/hoodie.properties
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Finished Loading Table of 
> type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from 
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/metadata
> 24/05/13 20:28:29 INFO timeline.HoodieActiveTimeline: Loaded instants upto : 
> Option{val=[20240513182607590__deltacommit__COMPLETED__20240513182612582]}
> 24/05/13 20:28:29 INFO view.AbstractTableFileSystemView: Took 1 ms to read  0 
> instants, 0 replaced file groups
> 24/05/13 20:28:29 INFO util.ClusteringUtils: Found 0 files in pending 
> clustering operations
> 24/05/13 20:28:29 ERROR metrics.JmxMetricsReporter: Jmx initialize failed:
> org.apache.hudi.exception.HoodieException: Jmx service url created 
> service:jmx:rmi://localhost:9889/jndi/rmi://localhost:9889/jmxrmi
>       at 
> org.apache.hudi.metrics.JmxReporterServer.<init>(JmxReporterServer.java:107)
>       at 
> org.apache.hudi.metrics.JmxReporterServer$Builder.build(JmxReporterServer.java:88)
>       at 
> org.apache.hudi.metrics.JmxMetricsReporter.createJmxReport(JmxMetricsReporter.java:104)
>       at 
> org.apache.hudi.metrics.JmxMetricsReporter.<init>(JmxMetricsReporter.java:58)
>       at 
> org.apache.hudi.metrics.MetricsReporterFactory.createReporter(MetricsReporterFactory.java:75)
>       at org.apache.hudi.metrics.Metrics.<init>(Metrics.java:58)
>       at org.apache.hudi.metrics.Metrics.getInstance(Metrics.java:82)
>       at org.apache.hudi.metrics.HoodieMetrics.<init>(HoodieMetrics.java:95)
>       at 
> org.apache.hudi.client.BaseHoodieClient.<init>(BaseHoodieClient.java:96)
>       at 
> org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:163)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:86)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:75)
>       at 
> org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.initializeWriteClient(SparkHoodieBackedTableMetadataWriter.java:146)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.getWriteClient(HoodieBackedTableMetadataWriter.java:1474)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.performTableServices(HoodieBackedTableMetadataWriter.java:1186)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:290)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:273)
>       at 
> org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1257)
>       at 
> org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1297)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:139)
>       at 
> org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
>       at 
> org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
>       at 
> org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
>       at 
> org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
>       at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
>       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
>       at 
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
>       at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>       at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
>       at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
>       at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
>       at 
> org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
>       at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
>       at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
>       at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
>       at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
>       at 
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
>       at 
> com.naver.csp.data.cores.common.spark.datasets.hudi.HudiWriter$DataFrameWriterImplicits.hudi(HudiWriter.scala:35)
>       at 
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.batchAsCdc(KafkaHudiSink.scala:160)
>       at 
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.$anonfun$startCdcStreaming$1(KafkaHudiSink.scala:78)
>       at 
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.$anonfun$startCdcStreaming$1$adapted(KafkaHudiSink.scala:78)
>       at 
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>       at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
> Caused by: java.rmi.server.ExportException: internal error: ObjID already in 
> use
>       at 
> java.rmi/sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:184)
>       at java.rmi/sun.rmi.transport.Transport.exportObject(Transport.java:106)
>       at 
> java.rmi/sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:254)
>       at 
> java.rmi/sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:412)
>       at java.rmi/sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
>       at 
> java.rmi/sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:234)
>       at java.rmi/sun.rmi.registry.RegistryImpl.setup(RegistryImpl.java:220)
>       at java.rmi/sun.rmi.registry.RegistryImpl.<init>(RegistryImpl.java:205)
>       at 
> java.rmi/java.rmi.registry.LocateRegistry.createRegistry(LocateRegistry.java:203)
>       at 
> org.apache.hudi.metrics.JmxReporterServer.<init>(JmxReporterServer.java:104)
>       ... 83 more
> 24/05/13 20:28:29 INFO transaction.TransactionManager: Transaction ending 
> with transaction owner 
> Option{val=[==>20240513202827651__deltacommit__INFLIGHT]}
> 24/05/13 20:28:29 INFO lock.LockManager: Released connection created for 
> acquiring lock
> 24/05/13 20:28:29 INFO transaction.TransactionManager: Transaction ended with 
> transaction owner Option{val=[==>20240513202827651__deltacommit__INFLIGHT]}
> 24/05/13 20:28:29 INFO hudi.HoodieSparkSqlWriterInternal: 
> Config.inlineCompactionEnabled ? true
> 24/05/13 20:28:29 INFO hudi.HoodieSparkSqlWriterInternal: 
> Config.asyncClusteringEnabled ? false
> 24/05/13 20:28:29 WARN hudi.HoodieSparkSqlWriterInternal: Closing write client
> 24/05/13 20:28:29 INFO client.BaseHoodieClient: Stopping Timeline service !!
> 24/05/13 20:28:29 INFO embedded.EmbeddedTimelineService: Closing Timeline 
> server
> 24/05/13 20:28:29 INFO service.TimelineService: Closing Timeline Service
> 24/05/13 20:28:29 INFO javalin.Javalin: Stopping Javalin ...
> 24/05/13 20:28:29 INFO javalin.Javalin: Javalin has stopped
> 24/05/13 20:28:29 INFO service.TimelineService: Closed Timeline Service
> 24/05/13 20:28:29 INFO embedded.EmbeddedTimelineService: Closed Timeline 
> server
> 24/05/13 20:28:29 INFO heartbeat.HoodieHeartbeatClient: Stopping heartbeat 
> for instant 20240513202827651
> 24/05/13 20:28:29 INFO heartbeat.HoodieHeartbeatClient: Stopped heartbeat for 
> instant 20240513202827651
> 24/05/13 20:28:29 INFO transaction.TransactionManager: Transaction manager 
> closed
> 24/05/13 20:28:29 INFO transaction.TransactionManager: Transaction manager 
> closed
> 24/05/13 20:28:29 ERROR streaming.MicroBatchExecution: Query [id = 
> 7b545afa-8957-4e93-8c53-438e7996b501, runId = 
> 98e99ae3-61b3-4e76-b80b-bbaff8af971e] terminated with error
> org.apache.hudi.exception.HoodieException: Failed to instantiate Metadata 
> table
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:293)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:273)
>       at 
> org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1257)
>       at 
> org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1297)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:139)
>       at 
> org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
>       at 
> org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
>       at 
> org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
>       at 
> org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
>       at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
>       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
>       at 
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
>       at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>       at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
>       at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
>       at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
>       at 
> org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
>       at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
>       at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
>       at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
>       at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
>       at 
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
>       at 
> com.naver.csp.data.cores.common.spark.datasets.hudi.HudiWriter$DataFrameWriterImplicits.hudi(HudiWriter.scala:35)
>       at 
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.batchAsCdc(KafkaHudiSink.scala:160)
>       at 
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.$anonfun$startCdcStreaming$1(KafkaHudiSink.scala:78)
>       at 
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.$anonfun$startCdcStreaming$1$adapted(KafkaHudiSink.scala:78)
>       at 
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>       at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
> Caused by: org.apache.hudi.exception.HoodieException: Jmx initialize failed:
>       at 
> org.apache.hudi.metrics.JmxMetricsReporter.<init>(JmxMetricsReporter.java:71)
>       at 
> org.apache.hudi.metrics.MetricsReporterFactory.createReporter(MetricsReporterFactory.java:75)
>       at org.apache.hudi.metrics.Metrics.<init>(Metrics.java:58)
>       at org.apache.hudi.metrics.Metrics.getInstance(Metrics.java:82)
>       at org.apache.hudi.metrics.HoodieMetrics.<init>(HoodieMetrics.java:95)
>       at 
> org.apache.hudi.client.BaseHoodieClient.<init>(BaseHoodieClient.java:96)
>       at 
> org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:163)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:86)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:75)
>       at 
> org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.initializeWriteClient(SparkHoodieBackedTableMetadataWriter.java:146)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.getWriteClient(HoodieBackedTableMetadataWriter.java:1474)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.performTableServices(HoodieBackedTableMetadataWriter.java:1186)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:290)
>       ... 68 more
> Caused by: org.apache.hudi.exception.HoodieException: Jmx service url created 
> service:jmx:rmi://localhost:9889/jndi/rmi://localhost:9889/jmxrmi
>       at 
> org.apache.hudi.metrics.JmxReporterServer.<init>(JmxReporterServer.java:107)
>       at 
> org.apache.hudi.metrics.JmxReporterServer$Builder.build(JmxReporterServer.java:88)
>       at 
> org.apache.hudi.metrics.JmxMetricsReporter.createJmxReport(JmxMetricsReporter.java:104)
>       at 
> org.apache.hudi.metrics.JmxMetricsReporter.<init>(JmxMetricsReporter.java:58)
>       ... 80 more
> Caused by: java.rmi.server.ExportException: internal error: ObjID already in 
> use
>       at 
> java.rmi/sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:184)
>       at java.rmi/sun.rmi.transport.Transport.exportObject(Transport.java:106)
>       at 
> java.rmi/sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:254)
>       at 
> java.rmi/sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:412)
>       at java.rmi/sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
>       at 
> java.rmi/sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:234)
>       at java.rmi/sun.rmi.registry.RegistryImpl.setup(RegistryImpl.java:220)
>       at java.rmi/sun.rmi.registry.RegistryImpl.<init>(RegistryImpl.java:205)
>       at 
> java.rmi/java.rmi.registry.LocateRegistry.createRegistry(LocateRegistry.java:203)
>       at 
> org.apache.hudi.metrics.JmxReporterServer.<init>(JmxReporterServer.java:104)
>       ... 83 more
> 24/05/13 20:28:29 INFO server.AbstractConnector: Stopped 
> Spark@376498da{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
> 24/05/13 20:28:29 INFO ui.SparkUI: Stopped Spark web UI at 
> http://feeder-affiliate-book-svc-sink-09c3c08f71b47a5d-driver-svc.csp.svc:4040
> 24/05/13 20:28:29 INFO k8s.KubernetesClusterSchedulerBackend: Shutting down 
> all executors
> 24/05/13 20:28:29 INFO 
> k8s.KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each 
> executor to shut down
> 24/05/13 20:28:29 WARN k8s.ExecutorPodsWatchSnapshotSource: Kubernetes client 
> has been closed.
> 24/05/13 20:28:29 ERROR util.Utils: Uncaught exception in thread main
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: 
> GET at: 
> https://kubernetes.default.svc/api/v1/namespaces/csp/persistentvolumeclaims?labelSelector=spark-app-selector%3Dspark-52b31b8f89564fee94b07a763597d89f.
>  Message: Forbidden!Configured service account doesn't have access. Service 
> account may have been revoked. persistentvolumeclaims is forbidden: User 
> "system:serviceaccount:csp:spark" cannot list resource 
> "persistentvolumeclaims" in API group "" in the namespace "csp".
>       at 
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:639)
>       at 
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:576)
>       at 
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:543)
>       at 
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:504)
>       at 
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:487)
>       at 
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.listRequestHelper(BaseOperation.java:163)
>       at 
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.list(BaseOperation.java:672)
>       at 
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.deleteList(BaseOperation.java:786)
>       at 
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.delete(BaseOperation.java:704)
>       at 
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.$anonfun$stop$6(KubernetesClusterSchedulerBackend.scala:138)
>       at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1471)
>       at 
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.stop(KubernetesClusterSchedulerBackend.scala:139)
>       at 
> org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:927)
>       at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2567)
>       at 
> org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2086)
>       at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1471)
>       at org.apache.spark.SparkContext.stop(SparkContext.scala:2086)
>       at 
> org.apache.spark.deploy.SparkSubmit.$anonfun$runMain$13(SparkSubmit.scala:963)
>       at 
> org.apache.spark.deploy.SparkSubmit.$anonfun$runMain$13$adapted(SparkSubmit.scala:963)
>       at scala.Option.foreach(Option.scala:407)
>       at 
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:963)
>       at 
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
>       at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
>       at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
>       at 
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
>       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
>       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 24/05/13 20:28:29 INFO spark.MapOutputTrackerMasterEndpoint: 
> MapOutputTrackerMasterEndpoint stopped!
> 24/05/13 20:28:29 INFO memory.MemoryStore: MemoryStore cleared
> 24/05/13 20:28:29 INFO storage.BlockManager: BlockManager stopped
> 24/05/13 20:28:29 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
> 24/05/13 20:28:29 INFO 
> scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
> OutputCommitCoordinator stopped!
> 24/05/13 20:28:29 INFO spark.SparkContext: Successfully stopped SparkContext
> Exception in thread "main" 
> org.apache.spark.sql.streaming.StreamingQueryException: Failed to instantiate 
> Metadata table
> === Streaming Query ===
> Identifier: [id = 7b545afa-8957-4e93-8c53-438e7996b501, runId = 
> 98e99ae3-61b3-4e76-b80b-bbaff8af971e]
> Current Committed Offsets: {KafkaV2[Subscribe[CDC_CDC_GSHOPAPPO_BOOK]]: 
> {"CDC_CDC_GSHOPAPPO_BOOK":{"23":4040666,"8":4020522,"17":4016708,"11":4011011,"2":3988721,"20":4040743,"5":3984751,"14":4013617,"4":3989875,"13":4014484,"22":4049439,"7":4009753,"16":4044345,"10":4015180,"1":4014228,"19":4081496,"9":3999227,"18":4046117,"12":3990938,"3":4016564,"21":4039580,"15":4071459,"6":3986614,"24":4051505,"0":3996595}}}
> Current Available Offsets: {KafkaV2[Subscribe[CDC_CDC_GSHOPAPPO_BOOK]]: 
> {"CDC_CDC_GSHOPAPPO_BOOK":{"23":4040667,"8":4020522,"17":4016708,"11":4011011,"2":3988721,"20":4040743,"5":3984751,"14":4013617,"4":3989875,"13":4014484,"22":4049439,"7":4009753,"16":4044345,"10":4015180,"1":4014228,"19":4081496,"9":3999227,"18":4046117,"12":3990938,"3":4016564,"21":4039580,"15":4071459,"6":3986614,"24":4051505,"0":3996595}}}
> Current State: ACTIVE
> Thread State: RUNNABLE
> Logical Plan:
> StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, 
> offset#11L, timestamp#12, timestampType#13], 
> org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@325fe569, 
> KafkaV2[Subscribe[CDC_CDC_GSHOPAPPO_BOOK]]
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:325)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
> Caused by: org.apache.hudi.exception.HoodieException: Failed to instantiate 
> Metadata table
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:293)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:273)
>       at 
> org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1257)
>       at 
> org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1297)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:139)
>       at 
> org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
>       at 
> org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
>       at 
> org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
>       at 
> org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
>       at 
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
>       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
>       at 
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
>       at 
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
>       at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>       at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
>       at 
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
>       at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>       at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
>       at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
>       at 
> org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
>       at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
>       at 
> org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
>       at 
> org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
>       at 
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
>       at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
>       at 
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
>       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
>       at 
> com.naver.csp.data.cores.common.spark.datasets.hudi.HudiWriter$DataFrameWriterImplicits.hudi(HudiWriter.scala:35)
>       at 
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.batchAsCdc(KafkaHudiSink.scala:160)
>       at 
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.$anonfun$startCdcStreaming$1(KafkaHudiSink.scala:78)
>       at 
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.$anonfun$startCdcStreaming$1$adapted(KafkaHudiSink.scala:78)
>       at 
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
>       at 
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
>       at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
>       at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
>       at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
>       ... 1 more
> Caused by: org.apache.hudi.exception.HoodieException: Jmx initialize failed:
>       at 
> org.apache.hudi.metrics.JmxMetricsReporter.<init>(JmxMetricsReporter.java:71)
>       at 
> org.apache.hudi.metrics.MetricsReporterFactory.createReporter(MetricsReporterFactory.java:75)
>       at org.apache.hudi.metrics.Metrics.<init>(Metrics.java:58)
>       at org.apache.hudi.metrics.Metrics.getInstance(Metrics.java:82)
>       at org.apache.hudi.metrics.HoodieMetrics.<init>(HoodieMetrics.java:95)
>       at 
> org.apache.hudi.client.BaseHoodieClient.<init>(BaseHoodieClient.java:96)
>       at 
> org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:163)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:86)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:75)
>       at 
> org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.initializeWriteClient(SparkHoodieBackedTableMetadataWriter.java:146)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.getWriteClient(HoodieBackedTableMetadataWriter.java:1474)
>       at 
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.performTableServices(HoodieBackedTableMetadataWriter.java:1186)
>       at 
> org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:290)
>       ... 68 more
> Caused by: org.apache.hudi.exception.HoodieException: Jmx service url created 
> service:jmx:rmi://localhost:9889/jndi/rmi://localhost:9889/jmxrmi
>       at 
> org.apache.hudi.metrics.JmxReporterServer.<init>(JmxReporterServer.java:107)
>       at 
> org.apache.hudi.metrics.JmxReporterServer$Builder.build(JmxReporterServer.java:88)
>       at 
> org.apache.hudi.metrics.JmxMetricsReporter.createJmxReport(JmxMetricsReporter.java:104)
>       at 
> org.apache.hudi.metrics.JmxMetricsReporter.<init>(JmxMetricsReporter.java:58)
>       ... 80 more
> Caused by: java.rmi.server.ExportException: internal error: ObjID already in 
> use
>       at 
> java.rmi/sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:184)
>       at java.rmi/sun.rmi.transport.Transport.exportObject(Transport.java:106)
>       at 
> java.rmi/sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:254)
>       at 
> java.rmi/sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:412)
>       at java.rmi/sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
>       at 
> java.rmi/sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:234)
>       at java.rmi/sun.rmi.registry.RegistryImpl.setup(RegistryImpl.java:220)
>       at java.rmi/sun.rmi.registry.RegistryImpl.<init>(RegistryImpl.java:205)
>       at 
> java.rmi/java.rmi.registry.LocateRegistry.createRegistry(LocateRegistry.java:203)
>       at 
> org.apache.hudi.metrics.JmxReporterServer.<init>(JmxReporterServer.java:104)
>       ... 83 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
