Hi,
thanks for reaching out to the Flink community. The tricky thing here is
that the Google Cloud Storage connector is not supported by Flink's plugin
system, as stated in [1]. There is a blog post on getting started with
Flink on Google Cloud Platform [2]. In case you haven't seen it yet, it has
a subsection "Advanced: Set up access to Google Cloud Storage for
checkpoints and savepoints." describing the older way of adding file system
support, using Google Cloud Storage as the example. Instead of placing the
connector under plugins/, you copy it into Flink's lib/ directory. In
addition to that, you have to add the Hadoop dependencies to the lib/
folder as well. For that, it's advisable to use one of the pre-bundled
Hadoop jars provided by the Flink community [3] to avoid name clashes on
the classpath.
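
To illustrate, after copying the jars your lib/ folder would look roughly
like this (the Hadoop jar name is just an example; pick the pre-bundled
Hadoop version from [3] that matches your setup):

lib
├── flink-dist_2.11-1.10.1.jar
├── ... (other jars shipped with the distribution)
├── flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
└── gcs-connector-hadoop2-latest.jar

The jars in lib/ are only picked up on startup, so restart your (local)
cluster afterwards.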

I hope this helps.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#hadoop-file-system-hdfs-and-its-other-implementations
[2]
https://www.ververica.com/blog/getting-started-with-da-platform-on-google-kubernetes-engine
[3] https://flink.apache.org/downloads.html#additional-components

On Fri, Nov 13, 2020 at 2:32 PM orionemail <orionem...@protonmail.com>
wrote:

> Hi,
>
> I am running Flink 1.10.1, initially on my local development machine (a
> MacBook Pro). I'm struggling to understand how to write to Google Cloud
> Storage using the StreamingFileSink (S3 works fine).
>
> The error I am seeing:
>
> "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'gs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded.
>     at
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
>     at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:117)"
>
>
> I have put the gcs-connector-hadoop2-latest.jar in a subdir in plugins/
>
> plugins
> ├── gcs-connector
> │   └── gcs-connector-hadoop2-latest.jar
>
> In flink-conf.yaml I have added:
>
> fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
> google.cloud.auth.service.account.enable: true
> google.cloud.auth.service.account.json.keyfile: ~/key.json
>
> This mirrors the setup I used for S3 storage.
>
> My implementation is a simple test reading data from a Kinesis stream and
> outputting to GCP.
>
> DataStream<String> input = getKinesisSource(env, kinesisStream);
>
> final StreamingFileSink<String> sink = StreamingFileSink
>         .forRowFormat(new Path("gs://some-gcp-bucket"),
>                 new SimpleStringEncoder<String>("UTF-8"))
>         .withRollingPolicy(
>                 DefaultRollingPolicy.builder()
>                         .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))
>                         .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
>                         .withMaxPartSize(1024 * 1024 * 1024)
>                         .build())
>         .build();
>
> //input.print();
> input.addSink(sink);
>
>
> Not sure what else to try.  Any pointers appreciated.
>
>
>

-- 

Matthias Pohl | Engineer

