Hi,

I am facing the exception below when running the job, both from the Flink Dashboard and 
from the AKS cluster:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 'wasb'. The scheme is directly 
supported by Flink through the following plugin: flink-fs-azure-hadoop.

To connect to Azure Blob Storage we have done the following setup:

1. Put flink-azure-fs-hadoop.jar in the plugins folder on the AKS cluster (the expected layout is sketched after this list).

2. In the YAML configuration file we have added this account key:

fs.azure.account.key.pjmsimt1pfdbsbsa.blob.core.windows.net:    
zSw+bQfhHIZsN0sJHsa8Qz4oJjNhktdwuFWHkfA3tjxgSodnniLfOQ2ZKLmFOHcme8DwWk3s0C8+2r1wzlqNmQ==

3. We have also tried restarting the pods after applying the YAML configuration.

4. In the code, we have tried writing to Blob Storage with stream.writeAsText and also 
with StreamingFileSink.forRowFormat, but with no luck (a code sketch follows this list).
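
For reference, this is a minimal sketch of the layout and configuration we are aiming for, since the error message says each plugin must reside in its own subfolder under plugins/. It assumes Flink 1.13.0, a subfolder named azure-fs-hadoop, and that the key goes into flink-conf.yaml; the exact jar version, subfolder name, and config file in our image may differ, and <storage-account-key> is a placeholder:

    plugins/
      azure-fs-hadoop/
        flink-azure-fs-hadoop-1.13.0.jar    # in its own subfolder, not directly under plugins/

    # flink-conf.yaml
    fs.azure.account.key.pjmsimt1pfdbsbsa.blob.core.windows.net: <storage-account-key>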
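
And this is a minimal sketch of the sink code we are trying, assuming a DataStream[String] named stream; <container> and <output-path> are placeholders, not our real values:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

    // Row-format sink writing plain strings to the wasb:// path
    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(
        new Path("wasb://<container>@pjmsimt1pfdbsbsa.blob.core.windows.net/<output-path>"),
        new SimpleStringEncoder[String]("UTF-8"))
      .build()

    stream.addSink(sink)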





I have also attached the code and the error for reference. Could you please suggest how 
to get the Flink job connected to Azure Blob Storage?

Hoping to get your reply soon.



Regards

Simar

Attachment: BlobStorageSink.scala

Error as shown in the Flink Dashboard (version 1.13.0, commit f06faf1 @ 2021-04-23):
Job failed during initialization of JobManager
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
        at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
        at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: 
java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
'Source: Collection Source -> Sink: Unnamed': Could not find a file system 
implementation for scheme 'wasb'. The scheme is directly supported by Flink 
through the following plugin: flink-fs-azure-hadoop. Please ensure that each 
plugin resides within its own subfolder within the plugins directory. See 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for 
more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems. For 
a full list of supported file systems, please see 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
        at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
        at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
        ... 7 more
Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
'Source: Collection Source -> Sink: Unnamed': Could not find a file system 
implementation for scheme 'wasb'. The scheme is directly supported by Flink 
through the following plugin: flink-fs-azure-hadoop. Please ensure that each 
plugin resides within its own subfolder within the plugins directory. See 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for 
more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems. For 
a full list of supported file systems, please see 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
        at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        ... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
initialize task 'Source: Collection Source -> Sink: Unnamed': Could not find a 
file system implementation for scheme 'wasb'. The scheme is directly supported 
by Flink through the following plugin: flink-fs-azure-hadoop. Please ensure 
that each plugin resides within its own subfolder within the plugins directory. 
See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for 
more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems. For 
a full list of supported file systems, please see 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
        at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:178)
        at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:107)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
        at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
        at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
        at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
        at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
        at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317)
        at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
        at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
        at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
        ... 8 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 'wasb'. The scheme is directly 
supported by Flink through the following plugin: flink-fs-azure-hadoop. Please 
ensure that each plugin resides within its own subfolder within the plugins 
directory. See 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for 
more information. If you want to use a Hadoop file system for that scheme, 
please add the scheme to the configuration fs.allowed-fallback-filesystems. For 
a full list of supported file systems, please see 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
        at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:513)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:407)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
        at 
org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:288)
        at 
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:110)
        at 
org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:174)
        ... 19 more