各位好:
了解实际运行是要复制jar到plugin下,但是调试的话用该怎么初始化s3p这个文件系统了?
flink版本 1.14,win10
项目通过flink-quick-start创建,在pom中添加了如下依赖
org.apache.flink
flink-s3-fs-presto
${flink.version}
初始代码类似如下:
Configuration fileSystemConf = new Configuration();
fileSystemConf.setBoolean("presto.s3.connection.ssl.enabled", false);
fileSystemConf.setString("presto.s3.access-key", "minioadmin");
fileSystemConf.setString("presto.s3.secret-key", "minioadmin");
fileSystemConf.setString("presto.s3.endpoint", "http://127.0.0.1:9000";);
FileSystem.initialize(fileSystemConf);
Path path = new Path("s3p://test/");
System.out.println(path.getFileSystem().exists(path));
但是会抛出如下异常:
Exception in thread "main"
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
find a file system implementation for scheme 's3p'. The scheme is directly
supported by Flink through the following plugin: flink-s3-fs-presto. Please
ensure that each plugin resides within its own subfolder within the plugins
directory. See
https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for
more information. If you want to use a Hadoop file system for that scheme,
please add the scheme to the configuration fs.allowed-fallback-filesystems.
For a full list of supported file systems, please see
https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at org.example.StreamingJob.main(StreamingJob.java:58)
但是神奇的是,我可以用s3a
初始化配置如下:
fileSystemConf.setBoolean("fs.s3a.connection.ssl.enabled", false);
fileSystemConf.setString("fs.s3a.endpoint", "http://127.0.0.1:9000";);
fileSystemConf.setString("fs.s3a.access.key", "minioadmin");
fileSystemConf.setString("fs.s3a.secret.key", "minioadmin");
fileSystemConf.setString("fs.s3a.path.style.access", "true");
fileSystemConf.setString("fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
谢谢!