Hello, I am trying to read Parquet data from S3 using the filesystem connector of the Table API, but the job fails with the error below and the JobManager does not start. I am running a standalone-job cluster in Docker. I have already included flink-s3-fs-hadoop and flink-s3-fs-presto as plugins, and they work fine for checkpointing and Kubernetes HA; the problem only appears when reading files from S3 through the Table API connector.
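For context, the plugins are set up in my image roughly like this (a minimal Dockerfile sketch; the jar names follow the 1.14.2 distribution layout, the exact image tag is illustrative):

    FROM flink:1.14.2-scala_2.12
    # Standard plugin layout: each S3 filesystem jar goes into its own
    # sub-directory under plugins/ so it is loaded by a dedicated classloader
    RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop /opt/flink/plugins/s3-fs-presto && \
        cp /opt/flink/opt/flink-s3-fs-hadoop-1.14.2.jar /opt/flink/plugins/s3-fs-hadoop/ && \
        cp /opt/flink/opt/flink-s3-fs-presto-1.14.2.jar /opt/flink/plugins/s3-fs-presto/

This is the first error I get: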
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
    at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
    at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.formats.parquet.ParquetFileFormatFactory.getParquetConfiguration(ParquetFileFormatFactory.java:116) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at org.apache.flink.formats.parquet.ParquetFileFormatFactory.access$000(ParquetFileFormatFactory.java:51) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeDecoder(ParquetFileFormatFactory.java:79) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeDecoder(ParquetFileFormatFactory.java:67) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:452) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:160) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:124) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:351) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:154) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:151) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:184) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:258) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:182) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at scala.collection.Iterator.foreach(Iterator.scala:943) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at scala.collection.Iterator.foreach$(Iterator.scala:943) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at scala.collection.IterableLike.foreach(IterableLike.scala:74) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at scala.collection.TraversableLike.map(TraversableLike.scala:285) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at scala.collection.TraversableLike.map$(TraversableLike.scala:278) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at scala.collection.AbstractTraversable.map(Traversable.scala:108) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:182) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.scala:297) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.scala:288) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:213) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:190) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.table.api.bridge.scala.TableConversions.toDataStream(TableConversions.scala:52) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.example.Job$.main(Job.scala:53) ~[?:?]
    at org.example.Job.main(Job.scala) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:253) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
    ... 13 more

Here's my code:

    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.{DataTypes, Schema, TableDescriptor}
    import org.apache.flink.table.api.bridge.scala._

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    // Filesystem connector source reading Parquet files from S3
    val source = tEnv.from(
      TableDescriptor
        .forConnector("filesystem")
        .option("path", "s3a://source/data")
        .option("format", "parquet")
        .schema(
          Schema.newBuilder()
            .column("InvoiceNo", DataTypes.BIGINT())
            .column("InvoiceDate", DataTypes.BIGINT())
            .column("Quantity", DataTypes.BIGINT())
            .column("UnitPrice", DataTypes.BIGINT())
            .column("Description", DataTypes.STRING())
            .column("CustomerID", DataTypes.STRING())
            .column("Country", DataTypes.STRING())
            .column("currentTs", DataTypes.TIMESTAMP(3))
            .build())
        .build())

    source.toDataStream.print()
    env.execute()

Then I added the hadoop-common dependency to the job assembly (build.sbt sketch at the end of this message) and got the error below; the JobManager didn't start:

Exception in thread "main" java.lang.NoSuchMethodError: 'org.apache.commons.cli.Option$Builder org.apache.commons.cli.Option.builder(java.lang.String)'
    at org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfigurationParserFactory.<clinit>(StandaloneApplicationClusterConfigurationParserFactory.java:49)
    at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:57)

Then I built flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar from the Flink sources (build commands also at the end of this message) and put it into the lib directory. The JobManager started, but the job kept restarting with the error below:

Caused by: java.lang.ClassNotFoundException: Class org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2310) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2398) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2420) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:337) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:304) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1860) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:718) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:668) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:579) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3564) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3554) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3391) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
    at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:456) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:112) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:73) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
    at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-table_2.12-1.14.2.jar:1.14.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    ... 1 more
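For completeness, the hadoop-common dependency mentioned above was added to the job assembly roughly like this (build.sbt sketch; the version shown is illustrative, I picked a Hadoop 3 release to match the shaded hadoop3 classes in the trace):

    // build.sbt (sketch): pulls hadoop-common into the fat jar the job is packaged as;
    // the exact version I used is illustrative here
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.1.0"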
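And this is roughly how I built the shaded Hadoop jar from the Flink sources (a sketch, assuming the release-1.14 branch, which is what produces the 1.14-SNAPSHOT version seen in the trace):

    # clone the 1.14 line of Flink and build only the shaded-Hadoop module (plus its dependencies)
    git clone -b release-1.14 https://github.com/apache/flink.git
    cd flink
    mvn clean install -DskipTests -pl flink-filesystems/flink-fs-hadoop-shaded -am
    # drop the resulting jar into the cluster's lib directory
    cp flink-filesystems/flink-fs-hadoop-shaded/target/flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar /opt/flink/lib/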