kkrugler opened a new issue, #8148:
URL: https://github.com/apache/hudi/issues/8148

   **Describe the problem you faced**
   
   When running a Flink workflow that writes to a Hudi table, metaspace is 
leaked whenever the job restarts from a checkpoint.
   
   Additionally, if a persistent (not session or application mode) cluster is 
being used, running a job twice triggers a `ClassCastException`.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Start up a local Flink cluster on your dev machine.
   2. Run a Flink job that writes to a Hudi table.
   3. Cancel the job.
   4. Run the same job again.
   
   **Expected behavior**
   
   It should run the second time without an exception.
   
   **Environment Description**
   
   * Hudi version : 0.12.2
   
   * Flink version : 1.15.1
   
   **Additional context**
   
   After dumping stack traces for the Task Manager (using the Flink web UI) and examining heap dumps with YourKit, I found two issues, both stemming from Hudi's use of the Hadoop FileSystem:
   
   First, Hadoop starts a daemon thread that cleans up weak references to file system statistics. This thread holds a reference to one of Flink's ChildFirstClassLoaders. I was able to get rid of this issue by adding code in one of my Flink function's `close()` methods to find and interrupt that thread.
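
   The interrupt workaround looks roughly like this. It is a sketch, not a Hudi or Hadoop API: the thread-name constant is an assumption based on what the heap dump showed, and `ThreadCleanup` / `interruptCleanerThreads` are illustrative names:

```java
public class ThreadCleanup {

    // Assumption: Hadoop names its statistics cleaner thread after the
    // class that runs it, matching what showed up in the heap dump.
    private static final String CLEANER_THREAD_NAME =
        "org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner";

    /**
     * Scan all live threads and interrupt any Hadoop statistics cleaner
     * threads. Returns the number of threads interrupted (0 when Hadoop
     * never started one, e.g. in a plain JVM).
     */
    public static int interruptCleanerThreads() {
        int interrupted = 0;
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (CLEANER_THREAD_NAME.equals(t.getName())) {
                t.interrupt();
                interrupted++;
            }
        }
        return interrupted;
    }
}
```

   Calling something like `ThreadCleanup.interruptCleanerThreads()` from the function's `close()` lets the ChildFirstClassLoader be collected once the job is torn down.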
   
   Second, Hadoop sets up a Java shutdown hook by registering a `ShutdownHookManager` lambda thread. This manager keeps its own list of hooks, of which I see only one: the Hadoop `FileSystem$Cache$ClientFinalizer`, which finalizes (removes) Hadoop `FileSystem$Cache` entries. When I run locally there are two such entries, for LocalFileSystem and HoodieWrapperFileSystem.
   
   I believe the problem here is two-fold. First, this hook never gets called, because the JVM isn't terminating when a job restarts, so we leak every class held by the classloader that the hook references. Second, the hook is registered with a ChildFirstClassLoader as its context classloader. Every time Flink runs a job we get a new one of these (rather than the single system classloader), which is why the error says the HoodieWrapperFileSystem class cannot be cast to the same class loaded by a different ChildFirstClassLoader.
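
   The `ClassCastException` itself is standard JVM behavior: the same class defined by two different loaders yields two distinct runtime types, even with identical bytes. A self-contained sketch of that (the names `IsolatingLoader` and `Payload` are hypothetical stand-ins for Flink's ChildFirstClassLoader and HoodieWrapperFileSystem):

```java
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderCastDemo {

    /** Marker type that IsolatingLoader below re-defines child-first. */
    public static class Payload {}

    /** Child-first loader: defines its own copy of Payload from the .class bytes. */
    static class IsolatingLoader extends ClassLoader {
        IsolatingLoader() {
            super(ClassLoaderCastDemo.class.getClassLoader());
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.equals(Payload.class.getName())) {
                try (InputStream in =
                         getParent().getResourceAsStream(name.replace('.', '/') + ".class")) {
                    byte[] bytes = in.readAllBytes();
                    return defineClass(name, bytes, 0, bytes.length);
                } catch (IOException e) {
                    throw new ClassNotFoundException(name, e);
                }
            }
            return super.loadClass(name, resolve);
        }
    }

    /** Returns true if casting an instance across the two loaders throws CCE. */
    public static boolean castAcrossLoadersFails() {
        try {
            // Instantiate Payload as defined by the child-first loader.
            Object fromOtherLoader = new IsolatingLoader()
                .loadClass(Payload.class.getName())
                .getDeclaredConstructor()
                .newInstance();
            try {
                // Cast to the Payload type defined by this class's own loader:
                // same name, different defining loader, so the cast fails.
                Payload ignored = (Payload) fromOtherLoader;
                return false;
            } catch (ClassCastException expected) {
                return true;
            }
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}
```

   This is exactly the shape of the stack-trace error below: `HoodieWrapperFileSystem` cached under one ChildFirstClassLoader, then cast to `HoodieWrapperFileSystem` from the next job's loader.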
   
   It's also interesting that when I dump the JVM shutdown hooks in my operator's `close()` method, the Hadoop hook is the only one that is not in the "main" thread group; it is in the "Flink Task Threads" group:

   ```
   2023-03-06 12:46:11,299 WARN org.krugler.functions.ConvertToRowData [] - Shutdown hook Thread[Thread-8,5,Flink Task Threads] of class class org.apache.hadoop.util.ShutdownHookManager$1
   ```
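
   The dump itself is best-effort reflection over the JDK-internal `java.lang.ApplicationShutdownHooks`; roughly like the sketch below (`ShutdownHookDumper` is an illustrative name, and on Java 9+ this may need `--add-opens java.base/java.lang=ALL-UNNAMED` to succeed):

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ShutdownHookDumper {

    /**
     * Best-effort listing of the registered JVM shutdown hooks, read via
     * reflection from java.lang.ApplicationShutdownHooks. If the reflective
     * access is blocked (strong encapsulation on newer JDKs), an empty list
     * is returned instead of failing.
     */
    @SuppressWarnings("unchecked")
    public static List<String> dumpShutdownHooks() {
        List<String> result = new ArrayList<>();
        try {
            Class<?> clazz = Class.forName("java.lang.ApplicationShutdownHooks");
            Field field = clazz.getDeclaredField("hooks");
            field.setAccessible(true);
            Map<Thread, Thread> hooks = (Map<Thread, Thread>) field.get(null);
            synchronized (clazz) {
                for (Thread hook : hooks.keySet()) {
                    result.add("Shutdown hook " + hook + " of class " + hook.getClass()
                        + " in group " + hook.getThreadGroup().getName());
                }
            }
        } catch (ReflectiveOperationException | RuntimeException e) {
            // Reflection into java.base was denied; nothing to report.
        }
        return result;
    }
}
```

   Logging each entry with its thread group is what surfaced the log line above.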
   
   **Stacktrace**
   
   ```
   2022-12-16 14:01:15
   java.io.IOException: java.io.IOException: Exception happened when bulk 
insert.
       at 
org.apache.hudi.sink.bulk.BulkInsertWriterHelper.write(BulkInsertWriterHelper.java:118)
       at 
org.apache.hudi.sink.bulk.BulkInsertWriteFunction.processElement(BulkInsertWriteFunction.java:124)
       at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
       at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
       at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
       at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
       at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
       at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
       at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
       at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
       at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
       at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
       at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
       at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
       at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
       at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
       at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
       at 
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
       at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
       at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
       at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
       at 
com.bloomberg.idp.enrichment.functions.AddIpBasedEnrichments.processElement(AddIpBasedEnrichments.java:248)
       at 
com.bloomberg.idp.enrichment.functions.AddIpBasedEnrichments.processElement(AddIpBasedEnrichments.java:42)
       at 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.processElement1(CoBroadcastWithNonKeyedOperator.java:110)
       at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
       at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
       at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
       at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
       at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
       at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
       at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
       at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
       at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
       at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
       at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
       at java.base/java.lang.Thread.run(Thread.java:834)
   Caused by: java.io.IOException: Exception happened when bulk insert.
       at 
org.apache.hudi.sink.bulk.BulkInsertWriterHelper.write(BulkInsertWriterHelper.java:116)
       ... 39 more
   Caused by: java.lang.ClassCastException: class 
org.apache.hudi.common.fs.HoodieWrapperFileSystem cannot be cast to class 
org.apache.hudi.common.fs.HoodieWrapperFileSystem 
(org.apache.hudi.common.fs.HoodieWrapperFileSystem is in unnamed module of 
loader org.apache.flink.util.ChildFirstClassLoader @22441eda; 
org.apache.hudi.common.fs.HoodieWrapperFileSystem is in unnamed module of 
loader org.apache.flink.util.ChildFirstClassLoader @c949447)
       at 
org.apache.hudi.io.storage.row.HoodieRowDataParquetWriter.<init>(HoodieRowDataParquetWriter.java:51)
       at 
org.apache.hudi.io.storage.row.HoodieRowDataFileWriterFactory.newParquetInternalRowFileWriter(HoodieRowDataFileWriterFactory.java:79)
       at 
org.apache.hudi.io.storage.row.HoodieRowDataFileWriterFactory.getRowDataFileWriter(HoodieRowDataFileWriterFactory.java:55)
       at 
org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle.createNewFileWriter(HoodieRowDataCreateHandle.java:211)
       at 
org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle.<init>(HoodieRowDataCreateHandle.java:103)
       at 
org.apache.hudi.sink.bulk.BulkInsertWriterHelper.getRowCreateHandle(BulkInsertWriterHelper.java:133)
       at 
org.apache.hudi.sink.bulk.BulkInsertWriterHelper.write(BulkInsertWriterHelper.java:111)
       ... 39 more
   ```
   
   

