I'm trying to use a BucketingSink to write files to S3 in my Flink job.

I have the Hadoop dependencies I need packaged in my user application jar. 
However, on running the job I get the following error (from the taskmanager):

java.lang.RuntimeException: Error while creating FileSystem when initializing 
the state of the BucketingSink.
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
        at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
        at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could 
not find a file system implementation for scheme 's3a'. The scheme is not 
directly supported by Flink and no Hadoop file system to support this scheme 
could be loaded.
        at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1125)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
        at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
        ... 9 common frames omitted
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: 
Hadoop is not in the classpath/dependencies.
        at 
org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
        at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
        ... 13 common frames omitted

What's the right way to do this?

Reply via email to