Hi, thanks for reaching out to the Flink community. The tricky thing here is that the Google Cloud Storage connector is not supported by Flink's plugin system, as stated in [1]. There is a blog post on getting started with Flink on Google Cloud Platform [2]. In case you haven't seen it yet: it has a subsection "Advanced: Set up access to Google Cloud Storage for checkpoints and savepoints" describing the old way of adding file system support, using Google Cloud Storage as the example. There you're asked to copy the connector into Flink's lib/ directory instead of into plugins/. In addition, you have to add the Hadoop dependencies to lib/ as well. For that, it's advisable to use one of the pre-bundled Hadoop jars provided by the Flink community [3] to avoid name clashes on the classpath.
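To illustrate (the jar names below are only examples — the exact versions depend on what you download, and flink-shaded-hadoop-2-uber-* refers to the "Pre-bundled Hadoop" artifact from [3]), your lib/ directory would then look roughly like this, with the connector removed from plugins/ again:

lib
├── flink-dist_2.11-1.10.1.jar
├── flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
├── gcs-connector-hadoop2-latest.jar
└── ...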
I hope this helps.

Best,
Matthias

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#hadoop-file-system-hdfs-and-its-other-implementations
[2] https://www.ververica.com/blog/getting-started-with-da-platform-on-google-kubernetes-engine
[3] https://flink.apache.org/downloads.html#additional-components

On Fri, Nov 13, 2020 at 2:32 PM orionemail <orionem...@protonmail.com> wrote:

> Hi,
>
> I am running Flink 1.10.1, initially on my local development machine (a
> MacBook Pro). I'm struggling to understand how to write to Google Cloud
> Storage using the StreamingFileSink (S3 works fine).
>
> The error I am seeing:
>
> "org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'gs'. The scheme is not
> directly supported by Flink and no Hadoop file system to support this
> scheme could be loaded.
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:117)"
>
> I have put gcs-connector-hadoop2-latest.jar in a subdirectory of plugins/:
>
> plugins
> ├── gcs-connector
> │   └── gcs-connector-hadoop2-latest.jar
>
> In flink-conf.yaml I have added:
>
> fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
> google.cloud.auth.service.account.enable: true
> google.cloud.auth.service.account.json.keyfile: ~/key.json
>
> This mirrors the setup I used for S3 storage.
>
> My implementation is a simple test reading data from a Kinesis stream and
> outputting to GCP:
>
> import java.util.concurrent.TimeUnit;
>
> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
>
> DataStream<String> input = getKinesisSource(env, kinesisStream);
>
> final StreamingFileSink<String> sink = StreamingFileSink
>     .forRowFormat(new Path("gs://some-gcp-bucket"), new SimpleStringEncoder<String>("UTF-8"))
>     .withRollingPolicy(
>         DefaultRollingPolicy.builder()
>             .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))
>             .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
>             .withMaxPartSize(1024 * 1024 * 1024)
>             .build())
>     .build();
>
> // input.print();
> input.addSink(sink);
>
> Not sure what else to try. Any pointers appreciated.