[ 
https://issues.apache.org/jira/browse/FLINK-33955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801001#comment-17801001
 ] 

Alek Łańduch commented on FLINK-33955:
--------------------------------------

[~martijnvisser] Thanks for pointing it out. I was testing the settings of the 
file class and AbstractFileSystem, but I didn't notice that with the array.

> UnsupportedFileSystemException when trying to save data to Azure's abfss File 
> System
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-33955
>                 URL: https://issues.apache.org/jira/browse/FLINK-33955
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.18.0, 1.17.1
>         Environment: Flink 1.17.1 & Flink 1.18.0 with Java 11, ADLS Gen.2 
> with hierarchical namespace enabled
>            Reporter: Alek Łańduch
>            Priority: Major
>         Attachments: error.log, pom.xml, success.log
>
>
> When using Azure's File System connector to read and write files on Azure 
> Data Lake Storage Gen2, the Flink job fails while writing files with the 
> following error:
>  
> {noformat}
> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: 
> org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme 
> "file"{noformat}
>  
> Full logs from the Job Manager, along with the stack trace, are attached as 
> the [^error.log] file.
> The connection itself seems to work, as the job successfully creates the 
> desired structure inside ADLS (including the `.part` file), but the file 
> itself is empty.
> The job is simple, as its only purpose is to save events `a`, `b` and `c` 
> into a file on ADLS. The whole code is presented below:
> {code:java}
> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
> import org.apache.flink.connector.file.sink.FileSink;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> public class DataStreamJob {
>   public static void main(String[] args) throws Exception {
>     final FileSink<String> sink = FileSink
>         .forRowFormat(
>             new Path("abfss://t...@stads2dev01.dfs.core.windows.net/output"),
>             new SimpleStringEncoder<String>("UTF-8"))
>         .build();
>     final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     env.fromElements("a", "b", "c").sinkTo(sink);
>     env.execute("Test");
>   }
> }
> {code}
> The code is run locally using Flink 1.18.0 (the same behavior was present in 
> version 1.17.1). The only change made to `flink-conf.yaml` was adding the 
> keys for accessing Azure:
>  
> {code:java}
> fs.azure.account.auth.type.stads2dev01.dfs.core.windows.net: SharedKey
> fs.azure.account.key.stads2dev01.dfs.core.windows.net: ******{code}
>  
> The [^pom.xml] file was created by using [Getting 
> Started|https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#getting-started]
>  documentation - the only thing I added was `flink-azure-fs-hadoop` 
> connector. The whole [^pom.xml] file is attached. The connector JAR was also 
> copied from `opt` directory to `plugins/azure-fs-hadoop` in cluster files 
> according to the documentation.
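> The plugin installation described above can be sketched as follows (the JAR 
> version and the assumption that commands run from the Flink distribution 
> root are inferred from the Flink 1.18.0 setup mentioned in this report):
> {code:bash}
> # From the Flink distribution root: each plugin must live in its own
> # subdirectory under plugins/, where it is loaded in an isolated classloader.
> mkdir -p plugins/azure-fs-hadoop
> cp opt/flink-azure-fs-hadoop-1.18.0.jar plugins/azure-fs-hadoop/
> {code}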
> Interestingly, the deprecated method `writeAsText` (instead of FileSink) not 
> only works and creates the desired file on ADLS, but *the subsequent jobs 
> that use FileSink, which previously failed, now work and create files 
> successfully* (until the cluster is restarted). The logs from the job with 
> the deprecated method are also attached as the [^success.log] file.
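> For reference, the workaround mentioned above looks roughly like this (a 
> sketch only; `writeAsText` is deprecated, and the output path is the same 
> truncated one as in the FileSink job above):
> {code:java}
> // Replacing the FileSink with the deprecated writeAsText call; after this
> // job runs once, subsequent FileSink jobs succeed until the cluster restarts.
> env.fromElements("a", "b", "c")
>     .writeAsText("abfss://t...@stads2dev01.dfs.core.windows.net/output");
> env.execute("Test");
> {code}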
> I suspect that this is somehow connected to how the Azure File System is 
> initialized, and that the new FileSink path initializes it incorrectly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
