JingsongLi opened a new pull request #17792:
URL: https://github.com/apache/flink/pull/17792


   ## What is the purpose of the change
   
   `ParquetFileSystemITCase.testLimitableBulkFormat` fails with the following exception:
   ```
   2021-11-03T22:10:11.5215786Z Nov 03 22:10:11 Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
   2021-11-03T22:10:11.5217523Z Nov 03 22:10:11         at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
   2021-11-03T22:10:11.5218577Z Nov 03 22:10:11         at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
   2021-11-03T22:10:11.5219513Z Nov 03 22:10:11         at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2780)
   2021-11-03T22:10:11.5220068Z Nov 03 22:10:11         at org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3036)
   2021-11-03T22:10:11.5220721Z Nov 03 22:10:11         at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995)
   2021-11-03T22:10:11.5221505Z Nov 03 22:10:11         at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2968)
   2021-11-03T22:10:11.5222138Z Nov 03 22:10:11         at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848)
   2021-11-03T22:10:11.5222733Z Nov 03 22:10:11         at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200)
   2021-11-03T22:10:11.5223230Z Nov 03 22:10:11         at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1254)
   2021-11-03T22:10:11.5223707Z Nov 03 22:10:11         at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1479)
   2021-11-03T22:10:11.5224231Z Nov 03 22:10:11         at org.apache.parquet.hadoop.codec.SnappyCodec.createInputStream(SnappyCodec.java:75)
   2021-11-03T22:10:11.5224772Z Nov 03 22:10:11         at org.apache.parquet.hadoop.CodecFactory$HeapBytesDecompressor.decompress(CodecFactory.java:109)
   2021-11-03T22:10:11.5225418Z Nov 03 22:10:11         at org.apache.parquet.hadoop.ColumnChunkPageReadStore$ColumnChunkPageReader.readDictionaryPage(ColumnChunkPageReadStore.java:196)
   2021-11-03T22:10:11.5226286Z Nov 03 22:10:11         at org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.<init>(AbstractColumnReader.java:110)
   2021-11-03T22:10:11.5226876Z Nov 03 22:10:11         at org.apache.flink.formats.parquet.vector.reader.IntColumnReader.<init>(IntColumnReader.java:33)
   2021-11-03T22:10:11.5227492Z Nov 03 22:10:11         at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createColumnReader(ParquetSplitReaderUtil.java:280)
   2021-11-03T22:10:11.5228185Z Nov 03 22:10:11         at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readNextRowGroup(ParquetVectorizedInputFormat.java:412)
   2021-11-03T22:10:11.5228961Z Nov 03 22:10:11         at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:381)
   2021-11-03T22:10:11.5229660Z Nov 03 22:10:11         at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:358)
   2021-11-03T22:10:11.5230333Z Nov 03 22:10:11         at org.apache.flink.table.filesystem.LimitableBulkFormat$LimitableReader.readBatch(LimitableBulkFormat.java:108)
   2021-11-03T22:10:11.5230939Z Nov 03 22:10:11         at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
   2021-11-03T22:10:11.5231515Z Nov 03 22:10:11         at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
   2021-11-03T22:10:11.5232095Z Nov 03 22:10:11         at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
   2021-11-03T22:10:11.5232614Z Nov 03 22:10:11         ... 6 more
   ```
   
   The cause is the shared `globalNumberRead` counter in `LimitableBulkFormat`. Once the limit is reached, a `LimitableIterator` is finished and returns null. The `LimitableReader` does not know this: the fetcher, which runs in a different thread, keeps reading records from the Hadoop files, but by then the classloader is closed.
   
   Therefore, once the iterator considers the read finished, the reader should exit normally even if it encounters an exception.
   
   ## Brief change log
   
   Swallow exceptions in `LimitableReader.readBatch` once the limit has been reached.
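
To illustrate the idea, here is a minimal, self-contained sketch; it is not the code in this PR. `BatchReader`, `LimitedReader`, and `LimitedReaderSketch` are hypothetical stand-ins for Flink's reader interfaces, and only the `globalNumberRead` counter name is taken from the description above. The sketch assumes the counter is shared with the consuming thread, so an exception raised after the limit is reached is treated as end of input, while any other exception is rethrown.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

public class LimitedReaderSketch {

    /** Hypothetical stand-in for a batch reader; returns null when exhausted. */
    interface BatchReader {
        int[] readBatch() throws IOException;
    }

    /** Wraps a reader and stops once the shared counter reaches the limit. */
    static final class LimitedReader implements BatchReader {
        private final BatchReader inner;
        private final long limit;
        // Shared with the consuming side, which increments it per record.
        private final AtomicLong globalNumberRead;

        LimitedReader(BatchReader inner, long limit, AtomicLong globalNumberRead) {
            this.inner = inner;
            this.limit = limit;
            this.globalNumberRead = globalNumberRead;
        }

        private boolean reachedLimit() {
            return globalNumberRead.get() >= limit;
        }

        @Override
        public int[] readBatch() throws IOException {
            if (reachedLimit()) {
                return null;
            }
            try {
                return inner.readBatch();
            } catch (Exception e) {
                // The limit may have been reached by another thread while this
                // read was in flight; in that case the failure is most likely a
                // side effect of shutdown, so report end of input instead.
                if (reachedLimit()) {
                    return null;
                }
                throw e; // a genuine failure: propagate it unchanged
            }
        }
    }

    public static void main(String[] args) throws IOException {
        AtomicLong read = new AtomicLong(0);
        // Simulates the race: the limit is reached mid-read, then the read fails.
        BatchReader failing = () -> {
            read.set(1);
            throw new IllegalStateException("Trying to access closed classloader.");
        };
        LimitedReader reader = new LimitedReader(failing, 1, read);
        System.out.println(reader.readBatch() == null); // prints "true"
    }
}
```

Re-checking the limit inside the `catch` block, rather than swallowing unconditionally, keeps real read failures visible when the limit has not been reached.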
   
   ## Verifying this change
   
   
   This change is verified by `LimitableBulkFormatTest.testSwallowExceptionWhenLimited`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)


