I see, thanks for sharing.

The change you've made makes sense. Let me explain the details.
Every plugin has its own class loader. The reason behind that is
to avoid dependency collisions with Flink's main class loader.
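
To illustrate the mechanism (a minimal Java sketch, not the exact Flink or
Hadoop code; variable names are made up, types are java.util.ServiceLoader
and org.apache.hadoop.fs.FileSystem):

    // With no explicit loader argument, ServiceLoader uses the thread
    // context class loader. Inside Flink this is typically the user-code
    // class loader, which doesn't see the META-INF/services entries
    // shipped inside the plugin's own jars:
    ServiceLoader<FileSystem> viaContextLoader =
        ServiceLoader.load(FileSystem.class);

    // Passing the loader that defined FileSystem makes the lookup happen
    // in the class loader that also holds hadoop-common's service files,
    // which is effectively what your change does:
    ServiceLoader<FileSystem> viaDefiningLoader =
        ServiceLoader.load(FileSystem.class,
            FileSystem.class.getClassLoader());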

I think if the mentioned change also works when the filesystem is added as a
normal lib and not as a plugin, then the code can be merged to main as-is.
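
(Directory-wise that test is the difference between, for example,

    plugins/azure-fs-hadoop/flink-azure-fs-hadoop-1.18.1.jar   <- plugin class loader
    lib/flink-azure-fs-hadoop-1.18.1.jar                       <- main class loader

where the plugins subdirectory name is arbitrary and the jar version is just
your current one.)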

G


On Thu, Jun 27, 2024 at 5:30 AM Xiao Xu <ffxrqy...@gmail.com> wrote:

> Hi, Gabor,
>
> Thanks for the reply. I made sure there is no conflict in Maven: all the
> Hadoop dependencies are in provided scope,
> and I checked that my result jar does not contain any service files
> (src/main/resources/META-INF/services).
>
> This is my pom:
>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd">
>     <modelVersion>4.0.0</modelVersion>
>
>     <groupId>com.test.flink</groupId>
>     <artifactId>flink-sync</artifactId>
>     <version>1.0-SNAPSHOT</version>
>     <packaging>jar</packaging>
>
>     <name>Flink Quickstart Job</name>
>
>     <properties>
>           <maven.compiler.source>1.8</maven.compiler.source>
>           <maven.compiler.target>1.8</maven.compiler.target>
>           <flink.version>1.18.1</flink.version>
>           <java.version>1.8</java.version>
>           <scala.binary.version>2.12</scala.binary.version>
>           <kafka.version>3.2.0</kafka.version>
>           <hadoop.version>3.3.4</hadoop.version>
>           <log4j.version>2.16.0</log4j.version>
>           <shadeVersion>3.2.0</shadeVersion>
>     </properties>
>
>     <dependencies>
>        <dependency>
>           <groupId>org.apache.flink</groupId>
>           <artifactId>flink-java</artifactId>
>           <version>${flink.version}</version>
>           <scope>provided</scope>
>        </dependency>
>        <!-- 
> https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
>        <dependency>
>           <groupId>org.apache.flink</groupId>
>           <artifactId>flink-streaming-java</artifactId>
>           <version>${flink.version}</version>
>           <scope>provided</scope>
>        </dependency>
>        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients 
> -->
>        <dependency>
>           <groupId>org.apache.flink</groupId>
>           <artifactId>flink-clients</artifactId>
>           <version>${flink.version}</version>
>           <scope>provided</scope>
>        </dependency>
>        <!-- 
> https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
>        <dependency>
>           <groupId>org.apache.flink</groupId>
>           <artifactId>flink-connector-files</artifactId>
>           <version>${flink.version}</version>
>        </dependency>
>        <dependency>
>           <groupId>org.apache.flink</groupId>
>           <artifactId>flink-connector-kafka</artifactId>
>           <version>3.1.0-1.18</version>
>        </dependency>
>        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j-impl -->
>        <dependency>
>           <groupId>org.apache.logging.log4j</groupId>
>           <artifactId>log4j-slf4j-impl</artifactId>
>           <version>${log4j.version}</version>
>           <scope>runtime</scope>
>           <exclusions>
>              <exclusion>
>                 <artifactId>slf4j-api</artifactId>
>                 <groupId>org.slf4j</groupId>
>              </exclusion>
>           </exclusions>
>        </dependency>
>        <dependency>
>           <groupId>org.apache.logging.log4j</groupId>
>           <artifactId>log4j-api</artifactId>
>           <version>${log4j.version}</version>
>           <scope>runtime</scope>
>        </dependency>
>        <dependency>
>           <groupId>org.apache.logging.log4j</groupId>
>           <artifactId>log4j-core</artifactId>
>           <version>${log4j.version}</version>
>           <scope>runtime</scope>
>        </dependency>
>
>        <dependency>
>           <groupId>org.apache.flink</groupId>
>           <artifactId>flink-azure-fs-hadoop</artifactId>
>           <version>${flink.version}</version>
>           <scope>provided</scope>
>        </dependency>
>     </dependencies>
>     <build>
>        <plugins>
>           <plugin>
>              <groupId>org.apache.maven.plugins</groupId>
>              <artifactId>maven-assembly-plugin</artifactId>
>              <version>3.0.0</version>
>              <configuration>
>                 <appendAssemblyId>false</appendAssemblyId>
>                 <descriptorRefs>
>                    <descriptorRef>jar-with-dependencies</descriptorRef>
>                 </descriptorRefs>
>              </configuration>
>              <executions>
>                 <execution>
>                    <id>make-assembly</id>
>                    <phase>package</phase>
>                    <goals>
>                       <goal>single</goal>
>                    </goals>
>                 </execution>
>              </executions>
>           </plugin>
>        </plugins>
>     </build>
> </project>
>
>
> And as in my reply on Stack Overflow, I found that the hadoop-common code at
> https://github.com/apache/hadoop/blob/release-3.3.4-RC1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3374
> does not load any filesystem. Digging into the ServiceLoader.load(FileSystem.class)
> source code, it looks like a different class loader makes it not load
> any filesystem.
> I changed ServiceLoader.load(FileSystem.class) to
> ServiceLoader.load(FileSystem.class,
> FileSystem.class.getClassLoader()), replaced the jar in the
> flink-azure-fs-hadoop plugin, and it works now,
> so I'm not sure why it works.
>
On Wed, Jun 26, 2024 at 4:52 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
>> Hi Xiao,
>>
>> I'm not quite convinced that the Azure plugin ruined your workload; I
>> would take a look at the dependency graph you have in the pom.
>> Adding multiple deps can conflict in terms of class loader services
>> (src/main/resources/META-INF/services).
>>
>> As an example, you have 2 such dependencies where
>> org.apache.flink.core.fs.FileSystemFactory is in the jar.
>> Hadoop core contains "file" and the other one something different. Let's
>> say you don't use a service-merging plugin in your
>> Maven project. In such a case, Hadoop core's `file` entry will just be
>> overwritten by the second one.
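>>
>> For instance (an illustrative sketch, not your exact jars): suppose
>> hadoop-common ships META-INF/services/org.apache.hadoop.fs.FileSystem
>> with an entry like
>>
>>    org.apache.hadoop.fs.LocalFileSystem   (serves the "file" scheme)
>>
>> and another dependency ships its own service file for the same
>> interface. In a naively merged fat jar only one copy survives, so the
>> `file` entry can be lost, which matches the "No FileSystem for scheme
>> file" error.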
>>
>> Solution: either avoid deps with conflicting services or add the
>> ServicesResourceTransformer
>> to your Maven project.
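>>
>> Note that the ServicesResourceTransformer is a feature of the
>> maven-shade-plugin, so it only helps if the fat jar is built with the
>> shade plugin. A minimal sketch (the plugin version is just an example):
>>
>>    <plugin>
>>       <groupId>org.apache.maven.plugins</groupId>
>>       <artifactId>maven-shade-plugin</artifactId>
>>       <version>3.2.0</version>
>>       <executions>
>>          <execution>
>>             <phase>package</phase>
>>             <goals>
>>                <goal>shade</goal>
>>             </goals>
>>             <configuration>
>>                <transformers>
>>                   <!-- merge all META-INF/services entries instead of
>>                        letting one jar overwrite the others -->
>>                   <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
>>                </transformers>
>>             </configuration>
>>          </execution>
>>       </executions>
>>    </plugin>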
>>
>> G
>>
>>
>> On Wed, Jun 26, 2024 at 10:16 AM Xiao Xu <ffxrqy...@gmail.com> wrote:
>>
>>> Hi, all
>>>
>>> I'm trying to use Flink to write to Azure Blob Storage (ADLS). I put
>>> the flink-azure-fs-hadoop jar in the plugins directory, and when I start
>>> my write job it shows:
>>>
>>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
>>> FileSystem for scheme "file"
>>>         at
>>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:496)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:316)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:393)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.createTmpFileForWrite(DataBlocks.java:980)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.create(DataBlocks.java:960)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.createBlockIfNeeded(AbfsOutputStream.java:262)
>>> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.<init>(AbfsOutputStream.java:173)
>>> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createFile(AzureBlobFileSystemStore.java:580)
>>> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
>>>         at
>>> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.create(AzureBlobFileSystem.java:301)
>>> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
>>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1052)
>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>
>>> I searched, and the issue looks like this one:
>>> https://stackoverflow.com/questions/77238642/apache-flink-azure-abfs-file-sink-error-streaming-unsupportedfilesystemexcep
>>>
>>> my env:
>>> Flink: 1.18.1
>>> JDK: 1.8
>>>
>>> Does anyone else have the same problem?
>>>
>>
