Re: Problems with reading ORC files with S3 filesystem

2021-08-17 Thread Piotr Jagielski
Hi David,

Thanks for your answer. I finally managed to read ORC files by:
- switching to s3a:// in my Flink SQL table path parameter
- providing all the properties in Hadoop's core-site.xml file (fs.s3a.endpoint, 
fs.s3a.path.style.access, fs.s3a.aws.credentials.provider, fs.s3a.access.key, 
fs.s3a.secret.key)
- setting the HADOOP_CONF_DIR env variable to point to the directory containing
core-site.xml
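
For anyone who hits this later: the switch to s3a:// matters because, as the
stack trace in my original message shows, the plain s3:// scheme resolves to
Hadoop's legacy jets3t-backed S3FileSystem, which only reads
fs.s3.awsAccessKeyId / fs.s3.awsSecretAccessKey, while s3a:// resolves to
S3AFileSystem, which honors the fs.s3a.* properties. A minimal sketch of the
result (the endpoint and keys are placeholders for my MinIO setup, and the
credentials provider class is one possible choice, not necessarily the only
one):

create or replace table xxx
 (..., startdate string)
 partitioned by (startdate) with ('connector'='filesystem', 'format'='orc',
'path'='s3a://xxx/orc/yyy')

and in core-site.xml:

<configuration>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>http://minio:9000</value> <!-- placeholder MinIO endpoint -->
  </property>
  <property>
    <name>fs.s3a.path.style.access</name>
    <value>true</value>
  </property>
  <property>
    <!-- assumption: the static-key provider from hadoop-aws -->
    <name>fs.s3a.aws.credentials.provider</name>
    <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>REPLACE_ME</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>REPLACE_ME</value>
  </property>
</configuration>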

Regards,
Piotr

On 2021/08/16 09:07:48, David Morávek  wrote: 
> Hi Piotr,
> 
> unfortunately, this is a known long-standing issue [1]. The problem is that
> the ORC format does not use Flink's filesystem abstraction for the actual
> reading of the underlying file, so you have to adjust your Hadoop config
> accordingly. There is also a corresponding SO question [2] covering this.
> 
> I think a proper fix would actually require changing the interface on the
> ORC side, because currently there seems to be no easy way to switch the FS
> implementation (I've just quickly checked the OrcFile class, so this might
> not be 100% accurate).
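> 
> To illustrate (a sketch reconstructed from the stack trace in your message,
> not a verified walkthrough of the ORC internals): the reader resolves the
> filesystem from the Hadoop Configuration it is handed, so Flink's
> plugin-loaded S3 filesystem never comes into play:
> 
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.orc.OrcFile;
> import org.apache.orc.Reader;
> 
> // Per the stack trace: ReaderImpl calls path.getFileSystem(conf), which
> // maps the URI scheme ("s3", "s3a", ...) to a Hadoop FileSystem using
> // only the Hadoop configuration, not Flink's filesystem plugins.
> Configuration conf = new Configuration(); // loads core-site.xml if present
> Path path = new Path("s3://bucket/file.orc");
> Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));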
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10989
> [2] https://stackoverflow.com/a/53435359
> 
> Best,
> D.
> 

Problems with reading ORC files with S3 filesystem

2021-08-14 Thread Piotr Jagielski
Hi,
I want to use the Flink SQL filesystem connector to read ORC files via the S3
filesystem on Flink 1.13. My table definition looks like this:

create or replace table xxx 
 (..., startdate string)
 partitioned by (startdate) with ('connector'='filesystem', 'format'='orc', 
'path'='s3://xxx/orc/yyy')

I followed Flink's S3 guide and installed the S3 libs as a plugin. I have MinIO
as the S3 provider, and it works for Flink's checkpoints and HA files.
The SQL connector also works when I use CSV or Avro formats. The problems start
with ORC.

1. If I just put flink-orc on the job's classpath, I get an error on the JobManager:
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121) ~[?:?]
at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88) ~[?:?]
at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]

2. I managed to put the Hadoop common libs on the classpath with this Maven setup:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-core</artifactId>
    <version>1.5.6</version>
</dependency>
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-shims</artifactId>
    <version>1.5.6</version>
</dependency>
<dependency>
    <groupId>net.java.dev.jets3t</groupId>
    <artifactId>jets3t</artifactId>
    <version>0.9.0</version>
</dependency>


Now the job is accepted by the JobManager, but execution fails for lack of AWS
credentials:
Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy76.initialize(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
at org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
at org.apache.orc.OrcFile.createReader(OrcFile.java:343)

I guess the ORC reader tries to recreate the S3 filesystem in the job's
classloader and cannot use the credentials from flink-conf.yaml. However, I can
see in the logs that it earlier managed to list the files on MinIO:

2021-08-14 09:35:48,285 INFO  org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - Assigning remote split to requesting host '172': Optional[FileSourceSplit: s3://xxx/orc/yyy/startdate=2021-08-10/3cf3afae-1050-4591-a5af-98d231879687.orc [0, 144607)  hosts=[localhost] ID=02 position=null]


So I think the issue is in the ORC reader when it tries to read a specific file.
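
From the exception message, the legacy jets3t-backed filesystem looks for
credentials only in the two fs.s3.* properties it names. A sketch of handing
them over (untested on my side; the property names are taken verbatim from the
exception above):

import org.apache.hadoop.conf.Configuration;

// Hypothetical: feed the legacy s3:// filesystem its credentials via the
// exact properties named in the IllegalArgumentException.
Configuration conf = new Configuration();
conf.set("fs.s3.awsAccessKeyId", "REPLACE_ME");
conf.set("fs.s3.awsSecretAccessKey", "REPLACE_ME");

Presumably the same two properties could also live in core-site.xml, since new
Configuration() picks that file up, but I don't see a hook in the SQL connector
to pass such a Configuration to the ORC reader directly.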

Any ideas how I can modify the setup or pass the credentials to Jets3t?

Regards,
Piotr