Hello,

I tried to read parquet data in S3 using the filesystem connector but got
the below error. The jobmanager is not starting.
I tried the standalone-job in docker.
I have already included flink-s3-fs-hadoop and flink-s3-fs-presto as
plugins and they are working fine for checkpointing and Kubernetes HA. The
issue is when I am reading files from S3 using the Table API connector.

Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.conf.Configuration
at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown
Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at
org.apache.flink.formats.parquet.ParquetFileFormatFactory.getParquetConfiguration(ParquetFileFormatFactory.java:116)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at
org.apache.flink.formats.parquet.ParquetFileFormatFactory.access$000(ParquetFileFormatFactory.java:51)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at
org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeDecoder(ParquetFileFormatFactory.java:79)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at
org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeDecoder(ParquetFileFormatFactory.java:67)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at
org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:452)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:160)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:124)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:351)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:154)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:151)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:184)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:258)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:182)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.Iterator.foreach(Iterator.scala:943)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.Iterator.foreach$(Iterator.scala:943)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.TraversableLike.map(TraversableLike.scala:285)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:182)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.scala:297)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.scala:288)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:213)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:190)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.api.bridge.scala.TableConversions.toDataStream(TableConversions.scala:52)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.example.Job$.main(Job.scala:53) ~[?:?]
at org.example.Job.main(Job.scala) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:253)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
... 13 more



Here's my code:

// Minimal reproduction: read a Parquet data set from S3 through the Table API
// "filesystem" connector and print the rows via the DataStream API.
val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
    // Table environment bridged onto the DataStream environment created above.
    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    // Request batch execution. Note this is set after tEnv has been created.
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)

    // Inline table definition: filesystem connector, parquet format, s3a:// path,
    // with an explicit eight-column schema.
    val source = tEnv.from(TableDescriptor
      .forConnector("filesystem")
      .option("path", "s3a://source/data")
      .option("format", "parquet")
      .schema(Schema.newBuilder()
        .column("InvoiceNo", DataTypes.BIGINT())
        .column("InvoiceDate", DataTypes.BIGINT())
        .column("Quantity", DataTypes.BIGINT())
        .column("UnitPrice", DataTypes.BIGINT())
        .column("Description", DataTypes.STRING())
        .column("CustomerID", DataTypes.STRING())
        .column("Country", DataTypes.STRING())
        .column("currentTs", DataTypes.TIMESTAMP(3))
        .build()
      )
      .build())

    // Convert the table to a DataStream and print it. In the first stack trace
    // the failure is raised from inside toDataStream, while the planner asks the
    // parquet format factory for a runtime decoder.
    source.toDataStream.print()

    // Submit the job for execution.
    env.execute()

Then I added the hadoop-common dependency and got the below error, and the
jobmanager didn't start.

Exception in thread "main" java.lang.NoSuchMethodError:
'org.apache.commons.cli.Option$Builder
org.apache.commons.cli.Option.builder(java.lang.String)'
at
org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfigurationParserFactory.<clinit>(StandaloneApplicationClusterConfigurationParserFactory.java:49)
at
org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:57)



Then I built flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar from the Flink source
and included it in the lib directory. The jobmanager started, but I got the
below error and the job kept restarting.

Caused by: java.lang.ClassNotFoundException: Class
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback
not found
at
org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2310)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2398)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2420)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.security.Groups.<init>(Groups.java:107)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.security.Groups.<init>(Groups.java:102)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:337)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:304)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1860)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:718)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:668)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:579)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3564)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3554)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3391)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
at
org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:456)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:112)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at
org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:73)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
... 1 more

Reply via email to