Hi,

> Since we have "flink-s3-fs-hadoop" at the plugins folder and therefore being 
> dynamically loaded upon task/job manager(s) startup (also, we are keeping 
> Flink's default inverted class loading strategy), shouldn't Hadoop 
> dependencies be loaded by parent-first? (based on 
> classloader.parent-first-patterns.default)

I think you are misunderstanding plugins. The fact that you have added s3 
FileSystem plugin, doesn’t mean that your code can access it’s dependencies. 
The whole point of plugins class loading is to completely isolate plugins 
between one another, and to isolate them from any user code. Plugin classes are 
not loaded to the parent class loader, but to a separate class loader that’s 
independent from the FlinkUserClassLoader (containing user’s jars).

> ---------------------------------
> import org.apache.hadoop.fs.Path
> import org.apache.parquet.avro.AvroParquetWriter
> import org.apache.parquet.hadoop.ParquetFileWriter
> // ...
> Try {
> val writer = AvroParquetWriter
> .builder[GenericRecord](new Path(finalFilePath))
> .withSchema(new Schema.Parser().parse(schema))
> .withDataModel(GenericData.get)
> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
> .build()
> 
> elements.foreach(element => writer.write(element))
> writer.close()
> }
> // ...
> ---------------------------------


Also in the first place, you probably shouldn’t be using AvroParquetWriter 
directly, but use StreamingFileSink [1] to write Parquet files. Example can be 
found here [2]. 

If you are using `org.apache.parquet.avro.AvroParquetWriter` directly, you will 
not have any checkpointing support (potential data loss or data duplication 
issues). Even I’m not sure if your code can be executed in parallel (aren’t you 
trying to share one instance of org.apache.parquet.avro.AvroParquetWriter among 
multiple operators?). 

But let’s say that you have to use AvroParquetWriter directly for some reason. 
In that case you would have to add all of the required dependencies to your 
job’s fat jar (or usrlib directory?), and you should be using 
TwoPhaseCommitSinkFunction as a base class for your writer [3]. Implementing 
properly an exactly-once sink is not that trivial - unless you know what you 
are doing.

Piotrek

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
 
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html>
[2] 
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
 
<https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java>
[3] 
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html
 
<https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>

> On 26 Feb 2020, at 18:52, Ricardo Cardante <carda...@tutanota.com> wrote:
> 
> Hi!
> 
> We're working on a project where data is being written to S3 within a Flink 
> application.
> Running the integration tests locally / IntelliJ (using 
> MiniClusterWithClientResource) all the dependencies are correctly resolved 
> and the program executes as expected. However, when fat JAR is submitted to a 
> Flink setup running on docker, we're getting the following exception:
> 
> ---------------------------------
> java.lang.NoClassDefFoundError: org/apache/hadoop/fs/Path
> ---------------------------------
> 
> Which refers to the usage of that class in a RichSinkFunction while building 
> an AvroParquetWriter
> 
> ---------------------------------
> import org.apache.hadoop.fs.Path
> import org.apache.parquet.avro.AvroParquetWriter
> import org.apache.parquet.hadoop.ParquetFileWriter
> // ...
> Try {
> val writer = AvroParquetWriter
> .builder[GenericRecord](new Path(finalFilePath))
> .withSchema(new Schema.Parser().parse(schema))
> .withDataModel(GenericData.get)
> .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
> .build()
> 
> elements.foreach(element => writer.write(element))
> writer.close()
> }
> // ...
> ---------------------------------
> 
> Since we have "flink-s3-fs-hadoop" at the plugins folder and therefore being 
> dynamically loaded upon task/job manager(s) startup (also, we are keeping 
> Flink's default inverted class loading strategy), shouldn't Hadoop 
> dependencies be loaded by parent-first? (based on 
> classloader.parent-first-patterns.default)
> 
> We also tried to put "flink-shaded-hadoop-2-uber-2.8.3-10.0.jar" on Flink's 
> /lib folder, but when doing that we got this error instead:
> 
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
> org.apache.hadoop.fs.s3a.S3AFileSystem not found
> 
> The only way we are being able to make the application work as expected is to 
> include the dependency "hadoop-aws" with compile scope, but we get a fat JAR 
> and transitive dependencies on Hadoop libraries that we would like to avoid.
> 
> What would be the most appropriate way to take advantage of cluster's 
> "flink-s3-fs-hadoop" and avoid to deliver any Hadoop related library on our 
> application JAR?
> 
> The dependencies we're using in the build.sbt file:
> ---------------------------------
> lazy val dependencies =
> new {
> val flinkVersion = "1.10.0"
> val parquetAvroVersion = "1.10.1"
> val hadoopVersion = "3.2.1"
> val circeVersion = "0.12.3"
> val rogachVersion = "3.3.1"
> val loggingVersion = "3.7.2"
> val scalatestVersion = "3.0.5"
> val mockitoVersion = "1.10.0"
> val kafkaVersion = "2.2.0"
> val scalajVersion = "2.4.2"
> val snakeYamlVersion = "1.25"
> val slf4jVersion = "1.7.30"
> val beanUtilsVersion = "1.9.4"
> val shadedHadoopVersion = "2.8.3-10.0"
> 
> // Core libraries provided at runtime
> val flink = "org.apache.flink" %% "flink-scala" % flinkVersion % "provided"
> val flinkStreaming = "org.apache.flink" %% "flink-streaming-scala" % 
> flinkVersion % "provided"
> val flinks3Hadoop = "org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion 
> % "provided"
> 
> // Application specific dependencies.
> val flinkConnectorKafka = "org.apache.flink" %% "flink-connector-kafka" % 
> flinkVersion
> val flinkStateBackendRocksDb = "org.apache.flink" %% 
> "flink-statebackend-rocksdb" % flinkVersion
> val flinkParquet = "org.apache.flink" %% "flink-parquet" % flinkVersion
> val flinkDropwizard = "org.apache.flink" % "flink-metrics-dropwizard" % 
> flinkVersion
> val parquetAvro = "org.apache.parquet" % "parquet-avro" % parquetAvroVersion
> val circeCore = "io.circe" %% "circe-core" % circeVersion
> val circeParser = "io.circe" %% "circe-parser" % circeVersion
> val circeGeneric = "io.circe" %% "circe-generic" % circeVersion
> val scallop = "org.rogach" %% "scallop" % rogachVersion
> val logging = "com.typesafe.scala-logging" %% "scala-logging" % loggingVersion
> val snakeYaml = "org.yaml" % "snakeyaml" % snakeYamlVersion
> val slf4j = "org.slf4j" % "slf4j-log4j12" % slf4jVersion
> val beanUtils = "commons-beanutils" % "commons-beanutils" % beanUtilsVersion
> 
> // Test libraries
> val scalatest = "org.scalatest" %% "scalatest" % scalatestVersion % "test"
> val mockito = "org.mockito" %% "mockito-scala" % mockitoVersion % "test"
> val flinkTestUtils = "org.apache.flink" %% "flink-test-utils" % flinkVersion 
> % "test"
> val kafkaStreams = "org.apache.kafka" % "kafka-streams" % kafkaVersion % 
> "test"
> val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion % 
> "test"
> val kafka = "org.apache.kafka" %% "kafka" % kafkaVersion % "test"
> val hadoopClient = "org.apache.hadoop" % "hadoop-client" % hadoopVersion % 
> "test"
> 
> // Test classifiers only
> val flinkRuntimeTest = "org.apache.flink" %% "flink-runtime" % flinkVersion % 
> "test" classifier "tests"
> val kafkaTest = "org.apache.kafka" %% "kafka" % kafkaVersion % "test" 
> classifier "test"
> val kafkaStreamsTest = "org.apache.kafka" % "kafka-streams" % kafkaVersion % 
> "test" classifier "test"
> val kafkaClientsTest = "org.apache.kafka" % "kafka-clients" % kafkaVersion % 
> "test" classifier "test"
> }
> ---------------------------------
> 
> 
> 
> This is the Dockerfile:
> ---------------------------------
> FROM flink:1.10.0-scala_2.12
> RUN cp /opt/flink/opt/flink-metrics-prometheus-1.10.0.jar /opt/flink/lib
> RUN mkdir /opt/flink/plugins/flink-s3-fs-presto 
> /opt/flink/plugins/flink-s3-fs-hadoop
> RUN cp /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar 
> /opt/flink/plugins/flink-s3-fs-presto/
> RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar 
> /opt/flink/plugins/flink-s3-fs-hadoop/
> RUN chown -R flink:flink /opt/flink/plugins/flink-s3-fs-presto/
> RUN chown -R flink:flink /opt/flink/plugins/flink-s3-fs-hadoop/
> ---------------------------------
> 
> --
> Best regards,
> Ricardo Cardante.

Reply via email to