Re: Problems with reading ORC files with S3 filesystem
Hi David,

Thanks for your answer. I finally managed to read ORC files by:
- switching to s3a:// in my Flink SQL table path parameter
- providing all the required properties in Hadoop's core-site.xml file (fs.s3a.endpoint, fs.s3a.path.style.access, fs.s3a.aws.credentials.provider, fs.s3a.access.key, fs.s3a.secret.key)
- setting the HADOOP_CONF_DIR env variable to point to the directory containing core-site.xml

Regards,
Piotr

On 2021/08/16 09:07:48, David Morávek wrote:
> Hi Piotr,
>
> unfortunately this is a known long-standing issue [1]. The problem is that
> the ORC format does not use Flink's filesystem abstraction for the actual
> reading of the underlying file, so you have to adjust your Hadoop config
> accordingly. There is also a corresponding SO question [2] covering this.
>
> I think a proper fix would actually require changing the interface on the
> ORC side, because currently there seems to be no easy way to switch the FS
> implementation (I've just quickly checked the OrcFile class, so this might
> not be 100% accurate).
>
> [1] https://issues.apache.org/jira/browse/FLINK-10989
> [2] https://stackoverflow.com/a/53435359
>
> Best,
> D.
>
> On Sat, Aug 14, 2021 at 11:40 AM Piotr Jagielski wrote:
>
> > Hi,
> > I want to use the Flink SQL filesystem connector to read ORC files via
> > the S3 filesystem on Flink 1.13. My table definition looks like this:
> >
> > create or replace table xxx
> > (..., startdate string)
> > partitioned by (startdate) with ('connector'='filesystem',
> > 'format'='orc', 'path'='s3://xxx/orc/yyy')
> >
> > I followed Flink's S3 guide and installed the S3 libs as a plugin. I
> > have MinIO as the S3 provider and it works for Flink's checkpoints and
> > HA files. The SQL connector also works when I use CSV or Avro formats.
> > The problems start with ORC.
> >
> > 1.
> > If I just put flink-orc on the job's classpath I get an error on the JobManager:
> >
> > Caused by: java.lang.NoClassDefFoundError:
> > org/apache/hadoop/conf/Configuration
> >   at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121) ~[?:?]
> >   at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88) ~[?:?]
> >   at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
> >
> > 2. I managed to put the Hadoop common libs on the classpath with this Maven setup:
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-orc_${scala.binary.version}</artifactId>
> >   <version>${flink.version}</version>
> >   <exclusions>
> >     <exclusion>
> >       <groupId>org.apache.orc</groupId>
> >       <artifactId>orc-core</artifactId>
> >     </exclusion>
> >   </exclusions>
> > </dependency>
> > <dependency>
> >   <groupId>org.apache.orc</groupId>
> >   <artifactId>orc-core</artifactId>
> >   <version>1.5.6</version>
> > </dependency>
> > <dependency>
> >   <groupId>org.apache.orc</groupId>
> >   <artifactId>orc-shims</artifactId>
> >   <version>1.5.6</version>
> > </dependency>
> > <dependency>
> >   <groupId>net.java.dev.jets3t</groupId>
> >   <artifactId>jets3t</artifactId>
> >   <version>0.9.0</version>
> > </dependency>
> >
> > Now the job is accepted by the JobManager, but execution fails for lack of AWS credentials:
> >
> > Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and
> > Secret Access Key must be specified as the username or password
> > (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or
> > fs.s3.awsSecretAccessKey properties (respectively).
> >   at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
> >   at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
> >   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> >   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> >   at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> >   at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> >   at org.apa
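The core-site.xml properties Piotr lists in his fix could look like the following; a minimal sketch for a MinIO-style endpoint, where the endpoint URL and keys are placeholders:

```xml
<?xml version="1.0"?>
<!-- core-site.xml, picked up via HADOOP_CONF_DIR; all values are placeholders -->
<configuration>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>http://minio:9000</value>
  </property>
  <property>
    <!-- MinIO is typically addressed by path, not by virtual-host bucket names -->
    <name>fs.s3a.path.style.access</name>
    <value>true</value>
  </property>
  <property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>SECRET_KEY</value>
  </property>
</configuration>
```

With HADOOP_CONF_DIR pointing at the directory holding this file, the Hadoop FileSystem that the ORC reader creates on its own can resolve the s3a:// scheme and credentials without going through Flink's filesystem plugins.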
Problems with reading ORC files with S3 filesystem
Hi,

I want to use the Flink SQL filesystem connector to read ORC files via the S3 filesystem on Flink 1.13. My table definition looks like this:

create or replace table xxx
(..., startdate string)
partitioned by (startdate) with ('connector'='filesystem',
'format'='orc', 'path'='s3://xxx/orc/yyy')

I followed Flink's S3 guide and installed the S3 libs as a plugin. I have MinIO as the S3 provider and it works for Flink's checkpoints and HA files. The SQL connector also works when I use CSV or Avro formats. The problems start with ORC.

1. If I just put flink-orc on the job's classpath I get an error on the JobManager:

Caused by: java.lang.NoClassDefFoundError:
org/apache/hadoop/conf/Configuration
  at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121) ~[?:?]
  at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88) ~[?:?]
  at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]

2. I managed to put the Hadoop common libs on the classpath with this Maven setup:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-orc_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.orc</groupId>
      <artifactId>orc-core</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.orc</groupId>
  <artifactId>orc-core</artifactId>
  <version>1.5.6</version>
</dependency>
<dependency>
  <groupId>org.apache.orc</groupId>
  <artifactId>orc-shims</artifactId>
  <version>1.5.6</version>
</dependency>
<dependency>
  <groupId>net.java.dev.jets3t</groupId>
  <artifactId>jets3t</artifactId>
  <version>0.9.0</version>
</dependency>

Now the job is accepted by the JobManager, but execution fails for lack of AWS credentials:

Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
  at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
  at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
  at java.base/java.lang.reflect.Method.invoke(Unknown Source)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
  at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
  at com.sun.proxy.$Proxy76.initialize(Unknown Source)
  at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
  at org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
  at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
  at org.apache.orc.OrcFile.createReader(OrcFile.java:343)

I guess that the ORC reader tries to recreate the S3 filesystem in the job's classloader and cannot use the credentials from flink-conf.yaml. However, I can see in the logs that it earlier managed to list the files on MinIO:

2021-08-14 09:35:48,285 INFO org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - Assigning remote split to requesting host '172': Optional[FileSourceSplit: s3://xxx/orc/yyy/startdate=2021-08-10/3cf3afae-1050-4591-a5af-98d231879687.orc [0, 144607) hosts=[localhost] ID=02 position=null]

So I think the issue is in the ORC reader when it tries to read a specific file.
Any ideas how I can modify the setup or pass the credentials to Jets3t?

Regards,
Piotr
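For reference, the exception above names the exact properties the legacy jets3t-backed s3:// scheme reads. Had one stayed on that scheme instead of switching to s3a:// (which is how the thread was ultimately resolved), a core-site.xml fragment along these lines is what Hadoop's S3Credentials would look for; the key values are placeholders:

```xml
<!-- core-site.xml fragment for the legacy s3:// (jets3t) filesystem; values are placeholders -->
<configuration>
  <property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>SECRET_KEY</value>
  </property>
</configuration>
```

Note that this legacy scheme has no straightforward way to target a custom endpoint such as MinIO, which is another reason the switch to s3a:// was the practical fix here.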