kkrugler opened a new issue, #8148: URL: https://github.com/apache/hudi/issues/8148
**Describe the problem you faced**

When running a Flink workflow that writes to a Hudi table, metaspace is leaked whenever the job restarts from a checkpoint. Additionally, if a persistent (not session or application mode) cluster is being used, running a job twice triggers a `ClassCastException`.

**To Reproduce**

Steps to reproduce the behavior:

1. Start up a local Flink cluster on your dev machine.
2. Run a Flink job that writes to a Hudi table.
3. Cancel the job.
4. Run the same job again.

**Expected behavior**

It should run the second time without an exception.

**Environment Description**

* Hudi version : 0.12.2
* Flink version : 1.15.1

**Additional context**

After dumping stack traces for the Task Manager (using the Flink web UI) and examining heap dumps with YourKit, I found two issues, both stemming from Hudi's use of the Hadoop `FileSystem`:

First, Hadoop fires up a daemon thread that cleans up weak references to file system statistics. This thread holds a reference to one of Flink's `ChildFirstClassLoader`s. I was able to get rid of this issue by adding code in one of my Flink function's `close()` methods to find and interrupt that thread.

Second, Hadoop sets up a Java shutdown hook, where it registers a `ShutdownHookManager` lambda thread. This has its own list of hooks, of which there's only one that I see: the Hadoop `FileSystem$Cache$ClientFinalizer`, which finalizes (removes) Hadoop `FileSystem$Cache` entries. There are two of these (when I run locally), for `LocalFileSystem` and `HoodieWrapperFileSystem`. I believe the problem here is two-fold. First, this hook never gets called, because the JVM isn't terminating when a job is being restarted, so we leak any classes held by the classloader that's used by this hook. And second, this hook is being set up with a `ChildFirstClassLoader` context.
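The `close()`-time workaround for the first issue can be sketched as follows. This is a minimal sketch, not the exact code from the issue: the class and method names are mine, and the cleaner-thread name is an assumption based on Hadoop's `FileSystem$Statistics` source.

```java
/**
 * Sketch of the workaround described above: from a Flink function's
 * close(), locate Hadoop's statistics-cleaner daemon thread and
 * interrupt it, so it stops pinning the job's ChildFirstClassLoader.
 * Thread/class names here are assumptions, not Hudi/Flink API.
 */
public class HadoopThreadCleaner {

    // Hadoop (2.x/3.x) names this daemon thread after its runnable class.
    static final String CLEANER_NAME_FRAGMENT = "StatisticsDataReferenceCleaner";

    /** Find a live thread whose name contains the given fragment, or null. */
    public static Thread findThreadByName(String fragment) {
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().contains(fragment)) {
                return t;
            }
        }
        return null;
    }

    /** Call from your function's close(): interrupt the cleaner if present. */
    public static void interruptStatisticsCleaner() {
        Thread cleaner = findThreadByName(CLEANER_NAME_FRAGMENT);
        if (cleaner != null) {
            cleaner.interrupt();
        }
    }
}
```

Interrupting the thread ends its wait loop, letting the thread (and its reference to the classloader) be collected.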
Every time Flink runs a job, we get a new `ChildFirstClassLoader` (versus just having the one system classloader), and that's why I see the error about the `HoodieWrapperFileSystem` class not being able to be cast to the same class loaded in a different `ChildFirstClassLoader`. It's also interesting that when I dump the JVM shutdown hooks in my operator's `close()` method, the Hadoop hook is the only one that's not in the "main" thread group; it's in the "Flink Task Threads" group:

```
2023-03-06 12:46:11,299 WARN  org.krugler.functions.ConvertToRowData [] - Shutdown hook Thread[Thread-8,5,Flink Task Threads] of class class org.apache.hadoop.util.ShutdownHookManager$1
```

**Stacktrace**

```
2022-12-16 14:01:15
java.io.IOException: java.io.IOException: Exception happened when bulk insert.
	at org.apache.hudi.sink.bulk.BulkInsertWriterHelper.write(BulkInsertWriterHelper.java:118)
	at org.apache.hudi.sink.bulk.BulkInsertWriteFunction.processElement(BulkInsertWriteFunction.java:124)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.bloomberg.idp.enrichment.functions.AddIpBasedEnrichments.processElement(AddIpBasedEnrichments.java:248)
	at com.bloomberg.idp.enrichment.functions.AddIpBasedEnrichments.processElement(AddIpBasedEnrichments.java:42)
	at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.processElement1(CoBroadcastWithNonKeyedOperator.java:110)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Exception happened when bulk insert.
	at org.apache.hudi.sink.bulk.BulkInsertWriterHelper.write(BulkInsertWriterHelper.java:116)
	... 39 more
Caused by: java.lang.ClassCastException: class org.apache.hudi.common.fs.HoodieWrapperFileSystem cannot be cast to class org.apache.hudi.common.fs.HoodieWrapperFileSystem (org.apache.hudi.common.fs.HoodieWrapperFileSystem is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @22441eda; org.apache.hudi.common.fs.HoodieWrapperFileSystem is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @c949447)
	at org.apache.hudi.io.storage.row.HoodieRowDataParquetWriter.<init>(HoodieRowDataParquetWriter.java:51)
	at org.apache.hudi.io.storage.row.HoodieRowDataFileWriterFactory.newParquetInternalRowFileWriter(HoodieRowDataFileWriterFactory.java:79)
	at org.apache.hudi.io.storage.row.HoodieRowDataFileWriterFactory.getRowDataFileWriter(HoodieRowDataFileWriterFactory.java:55)
	at org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle.createNewFileWriter(HoodieRowDataCreateHandle.java:211)
	at org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle.<init>(HoodieRowDataCreateHandle.java:103)
	at org.apache.hudi.sink.bulk.BulkInsertWriterHelper.getRowCreateHandle(BulkInsertWriterHelper.java:133)
	at org.apache.hudi.sink.bulk.BulkInsertWriterHelper.write(BulkInsertWriterHelper.java:111)
	... 39 more
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
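The thread-group observation above (the hook showing up as `Thread[Thread-8,5,Flink Task Threads]`, which is the `Thread.toString()` format `Thread[name,priority,group]`) can be reproduced with plain JDK calls. A diagnostic sketch; the class and method names here are mine, not Hudi or Flink API:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Diagnostic sketch: list live threads whose thread group is NOT the
 * given one, printed in the Thread[name,priority,group] shape that
 * Thread.toString() uses and that appears in the log line above.
 */
public class ThreadGroupAudit {

    /** One Thread[name,priority,group] line per live thread outside groupName. */
    public static List<String> threadsOutsideGroup(String groupName) {
        List<String> lines = new ArrayList<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            ThreadGroup g = t.getThreadGroup(); // null once a thread has died
            if (g != null && !groupName.equals(g.getName())) {
                lines.add("Thread[" + t.getName() + "," + t.getPriority()
                        + "," + g.getName() + "]");
            }
        }
        return lines;
    }
}
```

Running this from an operator's `close()` with `"main"` as the group name would surface any hook-like threads parked in the "Flink Task Threads" group, as described in the report.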