Hello everyone.
In fact, the problem was coming from FileSystem.get():
###
val fs = FileSystem.get(hadoopConfig)
###
When you want to interact with S3, you need to pass the filesystem URI as a first parameter, before the Hadoop config. Otherwise FileSystem.get() returns the default (local) filesystem, which is why the error below complains "Wrong FS: ... expected: file:///".
Something like this:
###
val s3uri = URI.create("s3a://mybucket")
val fs = FileSystem.get(s3uri, hadoopConfig)
###
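Applied to the job from my previous message below, that is essentially a one-line change; a minimal sketch (endpoint and credentials are placeholders, as before):
###
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

val hadoopConfig = new Configuration()
hadoopConfig.set("fs.s3a.access.key", "###")
hadoopConfig.set("fs.s3a.secret.key", "###")
hadoopConfig.set("fs.s3a.endpoint", "internal endpoint")

// Point FileSystem.get() at the s3a bucket instead of the default (local) filesystem
val fs = FileSystem.get(URI.create("s3a://mybucket"), hadoopConfig)
val s3Path = new Path("s3a://mybucket/myfolder/myfile.txt")

val inputStream = fs.open(s3Path)
val referenceData = Source.fromInputStream(inputStream).getLines().toSet
inputStream.close()
###
An equivalent alternative is s3Path.getFileSystem(hadoopConfig), which derives the filesystem from the path's scheme.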
Best regards.
Gwenael Le Barzic
From: LE BARZIC Gwenael DTSI/SI
Sent: Thursday, 11 July 2024 16:24
To: [email protected]
Subject: Trying to read a file from S3 with flink on kubernetes
Hey guys.
I'm trying to read a file from an internal S3 with Flink on Kubernetes, but I get a strange blocking error.
Here is the code:
MyFlinkJob.scala:
###
package com.example.flink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

object MyFlinkJob {
  def main(args: Array[String]): Unit = {
    try {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      val hadoopConfig = new Configuration()
      hadoopConfig.set("fs.s3a.access.key", "###")
      hadoopConfig.set("fs.s3a.secret.key", "###")
      hadoopConfig.set("fs.s3a.endpoint", "internal endpoint")
      val fs = FileSystem.get(hadoopConfig)
      val s3Path = new Path("s3a://mybucket/myfolder/myfile.txt")
      val inputStream = fs.open(s3Path)
      val referenceData = Source.fromInputStream(inputStream).getLines().toSet
      inputStream.close()
      println("Reference Data:")
      referenceData.foreach(println)
      env.execute("Flink S3 Simple Example")
    } catch {
      case e: Exception =>
        e.printStackTrace()
        println(s"Error: ${e.getMessage}")
    }
  }
}
###
And my build.sbt file:
###
import Dependencies._

name := "MyFlinkJob"
version := "0.1"
scalaVersion := "2.12.19"

ThisBuild / scalaVersion := "2.12.19"
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / organization := "com.example"
ThisBuild / organizationName := "example"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.18.1",
  "org.apache.flink" %% "flink-streaming-scala" % "1.18.1",
  "org.apache.flink" % "flink-s3-fs-hadoop" % "1.18.1"
)

assembly / assemblyOption ~= {
  _.withIncludeScala(false)
}
assembly / mainClass := Some("com.example.flink.MyFlinkJob")
assembly / assemblyJarName := "myflinkjob_2.12-0.1.jar"
assembly / assemblyMergeStrategy := {
  case path if path.contains("services") => MergeStrategy.concat
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}
###
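The assembly settings above come from the sbt-assembly plugin, so project/plugins.sbt would contain something along these lines (the version shown here is just an example, not necessarily the one I use):
###
// project/plugins.sbt -- sbt-assembly provides the "assembly" keys used in build.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.5")
###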
I'm using the following Docker image:
###
FROM flink:1.18-scala_2.12
USER root
RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \
    cp -p /opt/flink/opt/flink-s3-fs-hadoop-1.18.1.jar /opt/flink/plugins/s3-fs-hadoop/
RUN mkdir -p /opt/flink/log/ /opt/flink/conf /opt/flink/userlib
WORKDIR /opt/flink/userlib
COPY target/scala-2.12/myflinkjob_2.12-0.1.jar myflinkjob.jar
RUN chown -R flink:flink /opt/flink && \
chmod -R 755 /opt/flink
RUN chown -R flink:flink /opt/flink/userlib && \
chmod -R 755 /opt/flink/userlib
###
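I build and tag the image to match the deployment below, with something like:
###
docker build -t flink-s3:0.1 .
###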
And the following Kubernetes deployment:
###
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-s3
spec:
  image: flink-s3:0.1
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    classloader.resolve-order: parent-first
  serviceAccount: flink
  jobManager:
    resource:
      memory: 2048m
      cpu: 0.5
  taskManager:
    replicas: 2
    resource:
      memory: 2048m
      cpu: 0.5
  job:
    jarURI: "local:///opt/flink/userlib/myflinkjob.jar"
    parallelism: 2
    #upgradeMode: stateless # stateless or savepoint or last-state
    entryClass: "com.example.flink.MyFlinkJob"
    args: []
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: flink-s3
    spec:
      containers:
        - name: flink-main-container
          securityContext:
            runAsUser: 9999 # UID of a non-root user
            runAsNonRoot: true
          env: []
          volumeMounts: []
      volumes: []
###
I launch the Flink job like this:
###
kubectl apply -f kubernetes/FlinkDeployment.yml
###
I am using the Flink operator on Kubernetes.
And I get this error in the logs:
###
java.lang.IllegalArgumentException: Wrong FS: s3a://mybucket/myfolder/myfile.txt, expected: file:///
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:807)
    at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:105)
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:774)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
    at com.orange.flink.MyFlinkJob$.main(MyFlinkJob.scala:28)
    at com.orange.flink.MyFlinkJob.main(MyFlinkJob.scala)
###
I don't really understand why Flink is telling me this.
I followed the documentation here:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
Can someone help me understand what I am missing?
Best regards.
Gwenael Le Barzic