Hi David,

Thanks for your answer. I finally managed to read ORC files by:
- switching to s3a:// in my Flink SQL table path parameter
- providing all the properties in Hadoop's core-site.xml file (fs.s3a.endpoint, 
fs.s3a.path.style.access, fs.s3a.aws.credentials.provider, fs.s3a.access.key, 
fs.s3a.secret.key)
- setting the HADOOP_CONF_DIR environment variable to the directory containing 
core-site.xml
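
For anyone hitting the same problem, a core-site.xml along these lines is a minimal sketch of the setup described above. The property names are the ones listed; every value is a placeholder or an assumption (the MinIO endpoint URL, the keys, path-style access set to true, and SimpleAWSCredentialsProvider as the provider), not the configuration actually used here.

```xml
<?xml version="1.0"?>
<configuration>
  <!-- Assumed placeholder: URL of the MinIO (S3-compatible) endpoint -->
  <property>
    <name>fs.s3a.endpoint</name>
    <value>http://minio.example.internal:9000</value>
  </property>
  <!-- Assumption: MinIO is addressed path-style rather than by bucket subdomain -->
  <property>
    <name>fs.s3a.path.style.access</name>
    <value>true</value>
  </property>
  <!-- Assumption: static keys read from the two properties below -->
  <property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
  </property>
  <!-- Placeholders: replace with real credentials -->
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
```

With HADOOP_CONF_DIR pointing at the directory that holds this file, the table path would then use the s3a scheme, e.g. 'path'='s3a://xxx/orc/yyy'.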

Regards,
Piotr

On 2021/08/16 09:07:48, David Morávek <d...@apache.org> wrote: 
> Hi Piotr,
> 
> unfortunately this is a known long-standing issue [1]. The problem is that
> the ORC format does not use Flink's filesystem abstraction to actually read
> the underlying file, so you have to adjust your Hadoop config
> accordingly. There is also a corresponding SO question [2] covering this.
> 
> I think a proper fix would actually require changing the interface on the ORC
> side, because currently there seems to be no easy way to switch the FS
> implementation (I've only quickly checked the OrcFile class, so this might not
> be 100% accurate).
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10989
> [2] https://stackoverflow.com/a/53435359
> 
> Best,
> D.
> 
> On Sat, Aug 14, 2021 at 11:40 AM Piotr Jagielski <p...@touk.pl> wrote:
> 
> > Hi,
> > I want to use the Flink SQL filesystem connector to read ORC files from S3
> > on Flink 1.13. My table definition looks like this:
> >
> > create or replace table xxx
> >  (..., startdate string)
> >  partitioned by (startdate) with ('connector'='filesystem',
> > 'format'='orc', 'path'='s3://xxx/orc/yyy')
> >
> > I followed Flink's S3 guide and installed the S3 libs as a plugin. I use
> > MinIO as the S3 provider, and it works for Flink's checkpoints and HA files.
> > The SQL connector also works when I use the CSV or Avro formats. The problems
> > start with ORC.
> >
> > 1. If I just put flink-orc on the job's classpath, I get an error on the JobManager:
> > Caused by: java.lang.NoClassDefFoundError:
> > org/apache/hadoop/conf/Configuration
> >         at
> > org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121)
> > ~[?:?]
> >         at
> > org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88)
> > ~[?:?]
> >         at
> > org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118)
> > ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
> >
> > 2. I managed to put the Hadoop common libs on the classpath with this Maven
> > setup:
> >
> >                 <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >                         <artifactId>flink-orc_${scala.binary.version}</artifactId>
> >                         <version>${flink.version}</version>
> >                         <exclusions>
> >                                 <exclusion>
> >                                         <groupId>org.apache.orc</groupId>
> >                                         <artifactId>orc-core</artifactId>
> >                                 </exclusion>
> >                         </exclusions>
> >                 </dependency>
> >                 <dependency>
> >                         <groupId>org.apache.orc</groupId>
> >                         <artifactId>orc-core</artifactId>
> >                         <version>1.5.6</version>
> >                 </dependency>
> >                 <dependency>
> >                         <groupId>org.apache.orc</groupId>
> >                         <artifactId>orc-shims</artifactId>
> >                         <version>1.5.6</version>
> >                 </dependency>
> >                 <dependency>
> >                         <groupId>net.java.dev.jets3t</groupId>
> >                         <artifactId>jets3t</artifactId>
> >                         <version>0.9.0</version>
> >                 </dependency>
> >
> > Now the job is accepted by the JobManager, but execution fails because of
> > missing AWS credentials:
> > Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and
> > Secret Access Key must be specified as the username or password
> > (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or
> > fs.s3.awsSecretAccessKey properties (respectively).
> >         at
> > org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
> >         at
> > org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
> >         at
> > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> > Method)
> >         at
> > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> > Source)
> >         at
> > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> > Source)
> >         at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> >         at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> >         at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> >         at com.sun.proxy.$Proxy76.initialize(Unknown Source)
> >         at
> > org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
> >         at
> > org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
> >         at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
> >         at
> > org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
> >         at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
> >         at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
> >         at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
> >         at
> > org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
> >         at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
> >         at org.apache.orc.OrcFile.createReader(OrcFile.java:343)
> >
> > I guess that the ORC reader tries to recreate the s3 filesystem in the job's
> > classloader and cannot use the credentials from flink-conf.yaml. However, I can
> > see in the logs that it earlier managed to list the files on MinIO:
> >
> > 2021-08-14 09:35:48,285 INFO
> > org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner []
> > - Assigning remote split to requesting host '172':
> > Optional[FileSourceSplit:
> > s3://xxx/orc/yyy/startdate=2021-08-10/3cf3afae-1050-4591-a5af-98d231879687.orc
> > [0, 144607)  hosts=[localhost] ID=0000000002 position=null]
> >
> >
> > So I think the issue is in the ORC reader when it tries to read a specific file.
> >
> > Any ideas how I can modify the setup or pass the credentials to Jets3t?
> >
> > Regards,
> > Piotr
> >
> >
> 
