Re: Problems with reading ORC files with S3 filesystem
Hi David,

Thanks for your answer. I finally managed to read ORC files by:
- switching to s3a:// in my Flink SQL table path parameter
- providing all the required properties in Hadoop's core-site.xml file (fs.s3a.endpoint, fs.s3a.path.style.access, fs.s3a.aws.credentials.provider, fs.s3a.access.key, fs.s3a.secret.key) (see the sketch after this message)
- setting the HADOOP_CONF_DIR env variable to point to the directory containing core-site.xml

Regards,
Piotr

On 2021/08/16 09:07:48, David Morávek wrote:
> Hi Piotr,
>
> unfortunately this is a known long-standing issue [1]. The problem is that
> the ORC format does not use Flink's filesystem abstraction for the actual
> reading of the underlying file, so you have to adjust your Hadoop config
> accordingly. There is also a corresponding SO question [2] covering this.
>
> I think a proper fix would actually require changing the interface on the
> ORC side, because currently there seems to be no easy way to switch the FS
> implementation (I've just quickly checked the OrcFile class, so this might
> not be 100% accurate).
>
> [1] https://issues.apache.org/jira/browse/FLINK-10989
> [2] https://stackoverflow.com/a/53435359
>
> Best,
> D.
>
> On Sat, Aug 14, 2021 at 11:40 AM Piotr Jagielski wrote:
> >
> > Hi,
> > I want to use the Flink SQL filesystem connector to read ORC files via the
> > S3 filesystem on Flink 1.13. My table definition looks like this:
> >
> > create or replace table xxx
> > (..., startdate string)
> > partitioned by (startdate) with ('connector'='filesystem',
> > 'format'='orc', 'path'='s3://xxx/orc/yyy')
> >
> > I followed Flink's S3 guide and installed the S3 libs as a plugin. I have
> > MinIO as the S3 provider and it works for Flink's checkpoints and HA files.
> > The SQL connector also works when I use the CSV or Avro formats. The
> > problems start with ORC.
> >
> > 1. If I just put flink-orc on the job's classpath I get an error on the
> > JobManager:
> >
> > Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
> >     at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121) ~[?:?]
> >     at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88) ~[?:?]
> >     at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
> >
> > 2. I managed to put the Hadoop common libs on the classpath with this Maven
> > setup:
> >
> > <dependency>
> >     <groupId>org.apache.flink</groupId>
> >     <artifactId>flink-orc_${scala.binary.version}</artifactId>
> >     <version>${flink.version}</version>
> >     <exclusions>
> >         <exclusion>
> >             <groupId>org.apache.orc</groupId>
> >             <artifactId>orc-core</artifactId>
> >         </exclusion>
> >     </exclusions>
> > </dependency>
> > <dependency>
> >     <groupId>org.apache.orc</groupId>
> >     <artifactId>orc-core</artifactId>
> >     <version>1.5.6</version>
> > </dependency>
> > <dependency>
> >     <groupId>org.apache.orc</groupId>
> >     <artifactId>orc-shims</artifactId>
> >     <version>1.5.6</version>
> > </dependency>
> > <dependency>
> >     <groupId>net.java.dev.jets3t</groupId>
> >     <artifactId>jets3t</artifactId>
> >     <version>0.9.0</version>
> > </dependency>
> >
> > Now the job is accepted by the JobManager, but execution fails for lack of
> > AWS credentials:
> >
> > Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
> >     at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
> >     at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
> >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> >     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> >     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> >     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> >     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> >     at com.sun.proxy.$Proxy76.initialize(Unknown Source)
> >     at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
> >     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
> >     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
> >     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
> >     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)

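To make the fix above concrete, a minimal core-site.xml along the lines Piotr describes might look like the sketch below. Only the property names come from his message; the endpoint and key values are placeholder assumptions for a MinIO setup, and SimpleAWSCredentialsProvider is just one common choice of credentials provider for static keys:

    <?xml version="1.0"?>
    <configuration>
      <!-- Placeholder values for a MinIO-backed s3a setup; adjust to your environment. -->
      <property>
        <name>fs.s3a.endpoint</name>
        <value>http://minio:9000</value>
      </property>
      <property>
        <name>fs.s3a.path.style.access</name>
        <value>true</value>
      </property>
      <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
      </property>
      <property>
        <name>fs.s3a.access.key</name>
        <value>minio-access-key</value>
      </property>
      <property>
        <name>fs.s3a.secret.key</name>
        <value>minio-secret-key</value>
      </property>
    </configuration>

As Piotr notes, HADOOP_CONF_DIR then needs to point at the directory containing this file so that the Hadoop code used by the ORC reader can pick it up.
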
Re: Problems with reading ORC files with S3 filesystem
Hi Piotr,

unfortunately this is a known long-standing issue [1]. The problem is that the ORC format does not use Flink's filesystem abstraction for the actual reading of the underlying file, so you have to adjust your Hadoop config accordingly. There is also a corresponding SO question [2] covering this.

I think a proper fix would actually require changing the interface on the ORC side, because currently there seems to be no easy way to switch the FS implementation (I've just quickly checked the OrcFile class, so this might not be 100% accurate).

[1] https://issues.apache.org/jira/browse/FLINK-10989
[2] https://stackoverflow.com/a/53435359

Best,
D.

On Sat, Aug 14, 2021 at 11:40 AM Piotr Jagielski wrote:
>
> Hi,
> I want to use the Flink SQL filesystem connector to read ORC files via the
> S3 filesystem on Flink 1.13. My table definition looks like this:
>
> create or replace table xxx
> (..., startdate string)
> partitioned by (startdate) with ('connector'='filesystem',
> 'format'='orc', 'path'='s3://xxx/orc/yyy')
>
> I followed Flink's S3 guide and installed the S3 libs as a plugin. I have
> MinIO as the S3 provider and it works for Flink's checkpoints and HA files.
> The SQL connector also works when I use the CSV or Avro formats. The
> problems start with ORC.
>
> 1. If I just put flink-orc on the job's classpath I get an error on the
> JobManager:
>
> Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
>     at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121) ~[?:?]
>     at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88) ~[?:?]
>     at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
>
> 2. I managed to put the Hadoop common libs on the classpath with this Maven
> setup:
>
> <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-orc_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
>     <exclusions>
>         <exclusion>
>             <groupId>org.apache.orc</groupId>
>             <artifactId>orc-core</artifactId>
>         </exclusion>
>     </exclusions>
> </dependency>
> <dependency>
>     <groupId>org.apache.orc</groupId>
>     <artifactId>orc-core</artifactId>
>     <version>1.5.6</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.orc</groupId>
>     <artifactId>orc-shims</artifactId>
>     <version>1.5.6</version>
> </dependency>
> <dependency>
>     <groupId>net.java.dev.jets3t</groupId>
>     <artifactId>jets3t</artifactId>
>     <version>0.9.0</version>
> </dependency>
>
> Now the job is accepted by the JobManager, but execution fails for lack of
> AWS credentials:
>
> Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
>     at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
>     at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>     at com.sun.proxy.$Proxy76.initialize(Unknown Source)
>     at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
>     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
>     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
>     at org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
>     at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
>     at org.apache.orc.OrcFile.createReader(OrcFile.java:343)
>
> I guess that the ORC reader tries to recreate the s3 filesystem in the job's
> classloader and cannot use the credentials from flink-conf.yaml. However I
> can see in the logs that it earlier managed to list the files on MinIO:
>
> 2021-08-14 09:35:48,285 INFO org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - Assigning remote split to requesting host '172': Optional[FileSourceSplit: s3://xxx/orc/yyy/startdate=2021-08-10/3cf3afae-1050-4591-a5af-98d231879687.orc [0, 144607) hosts=[localhost] ID=02 position=null]

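To make David's point concrete: the ORC reader opens files through Hadoop's own FileSystem API, driven by a Hadoop Configuration, rather than through Flink's filesystem plugins, which is why the s3a credentials have to be visible on the Hadoop side. The sketch below roughly mirrors the OrcFile.createReader call visible at the bottom of the stack trace; the endpoint, keys, and path are placeholder assumptions, and hadoop-aws plus the AWS SDK need to be on the classpath for the s3a scheme to resolve.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;

    public class OrcS3aReadSketch {
        public static void main(String[] args) throws Exception {
            // The ORC reader works off a Hadoop Configuration; without the fs.s3a.*
            // settings (from core-site.xml or set explicitly) it cannot authenticate,
            // regardless of what is configured in flink-conf.yaml.
            Configuration conf = new Configuration();
            conf.set("fs.s3a.endpoint", "http://minio:9000");   // placeholder MinIO endpoint
            conf.set("fs.s3a.path.style.access", "true");
            conf.set("fs.s3a.access.key", "minio-access-key");  // placeholder
            conf.set("fs.s3a.secret.key", "minio-secret-key");  // placeholder

            // Mirrors the call in the stack trace: ORC resolves the FileSystem itself
            // via Path.getFileSystem(conf) when creating the reader.
            Reader reader = OrcFile.createReader(
                    new Path("s3a://xxx/orc/yyy/startdate=2021-08-10/some-file.orc"), // hypothetical file
                    OrcFile.readerOptions(conf));
            System.out.println("rows: " + reader.getNumberOfRows());
        }
    }
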
Problems with reading ORC files with S3 filesystem
Hi,

I want to use the Flink SQL filesystem connector to read ORC files via the S3 filesystem on Flink 1.13. My table definition looks like this:

create or replace table xxx
(..., startdate string)
partitioned by (startdate) with ('connector'='filesystem',
'format'='orc', 'path'='s3://xxx/orc/yyy')

I followed Flink's S3 guide and installed the S3 libs as a plugin. I have MinIO as the S3 provider and it works for Flink's checkpoints and HA files. The SQL connector also works when I use the CSV or Avro formats. The problems start with ORC.

1. If I just put flink-orc on the job's classpath I get an error on the JobManager:

Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
    at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121) ~[?:?]
    at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88) ~[?:?]
    at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]

2. I managed to put the Hadoop common libs on the classpath with this Maven setup:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.orc</groupId>
            <artifactId>orc-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-core</artifactId>
    <version>1.5.6</version>
</dependency>
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-shims</artifactId>
    <version>1.5.6</version>
</dependency>
<dependency>
    <groupId>net.java.dev.jets3t</groupId>
    <artifactId>jets3t</artifactId>
    <version>0.9.0</version>
</dependency>

Now the job is accepted by the JobManager, but execution fails for lack of AWS credentials:

Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
    at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy76.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
    at org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
    at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
    at org.apache.orc.OrcFile.createReader(OrcFile.java:343)

I guess that the ORC reader tries to recreate the s3 filesystem in the job's classloader and cannot use the credentials from flink-conf.yaml. However I can see in the logs that it earlier managed to list the files on MinIO:

2021-08-14 09:35:48,285 INFO org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - Assigning remote split to requesting host '172': Optional[FileSourceSplit: s3://xxx/orc/yyy/startdate=2021-08-10/3cf3afae-1050-4591-a5af-98d231879687.orc [0, 144607) hosts=[localhost] ID=02 position=null]

So I think the issue is in the ORC reader when it tries to read a specific file. Any ideas how I can modify the setup or pass the credentials to Jets3t?

Regards,
Piotr
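
For reference, the resolution at the top of the thread amounts to pointing this same table definition at the s3a:// scheme (together with the core-site.xml settings shown earlier); the bucket, path, and elided column list below are the placeholders from the original question:

    create or replace table xxx
    (..., startdate string)
    partitioned by (startdate) with (
      'connector'='filesystem',
      'format'='orc',
      'path'='s3a://xxx/orc/yyy'
    )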