Hi Piotr,

unfortunately this is a known long-standing issue [1]. The problem is that
ORC format is not using Flink's filesystem abstraction for actual reading
of the underlying file, so you have to adjust your Hadoop config
accordingly. There is also a corresponding SO question [2] covering this.

I think a proper fix would actually require changing the interface on ORC
side, because currently there seems to be now easy way to switch the FS
implementation (I've just quickly checked OrcFile class, so this might not
be 100% accurate).

[1] https://issues.apache.org/jira/browse/FLINK-10989
[2] https://stackoverflow.com/a/53435359

Best,
D.

On Sat, Aug 14, 2021 at 11:40 AM Piotr Jagielski <p...@touk.pl> wrote:

> Hi,
> I want to use Flink SQL filesystem to read ORC file via S3 filesystem on
> Flink 1.13. My table definition looks like this:
>
> create or replace table xxx
>  (..., startdate string)
>  partitioned by (startdate) with ('connector'='filesystem',
> 'format'='orc', 'path'='s3://xxx/orc/yyy')
>
> I followed Flink's S3 guide and installed S3 libs as plugin. I have MinIO
> as S3 provider and it works for Flinks checkpoints and HA files.
> The SQL connector also works when I use CSV or Avro formats. The problems
> start with ORC
>
> 1. If I just put flink-orc on job's classpath I get error on JobManager:
> Caused by: java.lang.NoClassDefFoundError:
> org/apache/hadoop/conf/Configuration
>         at
> org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121)
> ~[?:?]
>         at
> org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88)
> ~[?:?]
>         at
> org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118)
> ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
>
> 2. I managed to put hadoop common libs on the classpath by this maven
> setup:
>
>                 <dependency>
>                         <groupId>org.apache.flink</groupId>
>
> <artifactId>flink-orc_${scala.binary.version}</artifactId>
>                         <version>${flink.version}</version>
>                         <exclusions>
>                                 <exclusion>
>                                         <groupId>org.apache.orc</groupId>
>                                         <artifactId>orc-core</artifactId>
>                                 </exclusion>
>                         </exclusions>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.orc</groupId>
>                         <artifactId>orc-core</artifactId>
>                         <version>1.5.6</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>org.apache.orc</groupId>
>                         <artifactId>orc-shims</artifactId>
>                         <version>1.5.6</version>
>                 </dependency>
>                 <dependency>
>                         <groupId>net.java.dev.jets3t</groupId>
>                         <artifactId>jets3t</artifactId>
>                         <version>0.9.0</version>
>                 </dependency>
>
> No the job is accepted by JobManager, but execution fails with lack of AWS
> credentials:
> Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and
> Secret Access Key must be specified as the username or password
> (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or
> fs.s3.awsSecretAccessKey properties (respectively).
>         at
> org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
>         at
> org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
>         at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>         at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source)
>         at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source)
>         at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>         at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>         at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>         at com.sun.proxy.$Proxy76.initialize(Unknown Source)
>         at
> org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
>         at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
>         at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
>         at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
>         at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
>         at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
>         at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
>         at
> org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
>         at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
>         at org.apache.orc.OrcFile.createReader(OrcFile.java:343)
>
> I guess that ORC reader tries to recreate s3 filesystem in job's
> classloader and cannot use credentials from flink-conf.yaml. However I can
> see in the logs that it earlier managed to list the files on MinIO:
>
> 2021-08-14 09:35:48,285 INFO
> org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner []
> - Assigning remote split to requesting host '172':
> Optional[FileSourceSplit:
> s3://xxx/orc/yyy/startdate=2021-08-10/3cf3afae-1050-4591-a5af-98d231879687.orc
> [0, 144607)  hosts=[localhost] ID=0000000002 position=null]
>
>
> So I think the issue is in ORCReader when it tries to read specific file.
>
> Any ideas hao can I modify the setup or pass the credentials to Jets3t?
>
> Regards,
> Piotr
>
>

Reply via email to