[ 
https://issues.apache.org/jira/browse/FLINK-32241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17728393#comment-17728393
 ] 

Steve Loughran commented on FLINK-32241:
----------------------------------------

this is odd as the buffer to disk code is derived from what's been in the s3a 
connector for years and it never blew up.

on a host without the abfs change, can you grab the cloudstore module and run 
its storediag command to get a view of the world, including probes of temp dirs 
... look for the output under "Output Buffering" to see

https://github.com/steveloughran/cloudstore


> UnsupportedFileSystemException when using the ABFS Hadoop driver for 
> checkpointing in Flink 1.17
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32241
>                 URL: https://issues.apache.org/jira/browse/FLINK-32241
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.17.0, 1.17.1
>         Environment: Flink 1.17.1
> Hadoop 3.3.4
> Flink Operator 1.4.0
> Kubernetes 1.24
>            Reporter: Anton Ippolitov
>            Priority: Minor
>
> https://issues.apache.org/jira/browse/HADOOP-18707 introduced a new 
> functionality in the ABFS Hadoop client which buffers data on local disk by 
> default.
> It looks like this breaks with Flink 1.17 in a scenario where:
>  * ABFS is used for checkpointing
>  * JobManager HA is enabled
>  * First JobManager leader dies and a stand-by JobManager takes over
> I can reliably reproduce this with Flink 1.17.1 running on Kubernetes by 
> simply killing the JM leader pod. Once the stand-by JobManager takes over, 
> all checkpoints consistently fail with the following error:
> {noformat}
> org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize 
> checkpoint. at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1424)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1310)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1202)
>  at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>  at 
> org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:829) Caused by: 
> org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme 
> "file" at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443) at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466) at 
> org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174) at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574) at 
> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521) at 
> org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540) at 
> org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:496) at 
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:316)
>  at 
> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:393)
>  at 
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>  at 
> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>  at 
> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.createTmpFileForWrite(DataBlocks.java:980)
>  at 
> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.create(DataBlocks.java:960)
>  at 
> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.createBlockIfNeeded(AbfsOutputStream.java:262)
>  at 
> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.<init>(AbfsOutputStream.java:173)
>  at 
> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createFile(AzureBlobFileSystemStore.java:580)
>  at 
> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.create(AzureBlobFileSystem.java:301)
>  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195) at 
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175) at 
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064) at 
> org.apache.flink.fs.azure.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:154)
>  at 
> org.apache.flink.fs.azure.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37)
>  at 
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
>  at 
> org.apache.flink.runtime.state.filesystem.FSDataOutputStreamWrapper.<init>(FSDataOutputStreamWrapper.java:42)
>  at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:179)
>  at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.<init>(FsCheckpointMetadataOutputStream.java:64)
>  at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.createMetadataOutputStream(FsCheckpointStorageLocation.java:116)
>  at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:330)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1410)
>  ... 7 common frames omitted{noformat}
> I am not exactly sure why this works with the first JM and why all of a 
> sudden the local Hadoop FS implementation cannot be loaded on a stand-by 
> JM... I am running with default settings when it comes to Hadoop FS etc.
> Anyway, I found a workaround thanks to 
> https://issues.apache.org/jira/browse/SPARK-43188: simply setting 
> {{fs.azure.data.blocks.buffer}} to {{array}} seems to solve the issue. So 
> this is minor priority.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to