Hey guys.

I'm trying to read a file from an internal S3 endpoint with Flink on Kubernetes, but I get a strange blocking error.

Here is the code:

MyFlinkJob.scala:
###
package com.example.flink

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

object MyFlinkJob {
    def main(args: Array[String]): Unit = {
        try {
            val env = StreamExecutionEnvironment.getExecutionEnvironment

            val hadoopConfig = new Configuration()
            hadoopConfig.set("fs.s3a.access.key", "###")
            hadoopConfig.set("fs.s3a.secret.key", "###")
            hadoopConfig.set("fs.s3a.endpoint", "internal endpoint")

            val fs = FileSystem.get(hadoopConfig)
            val s3Path = new Path("s3a://mybucket/myfolder/myfile.txt")
            val inputStream = fs.open(s3Path)
            val referenceData = Source.fromInputStream(inputStream).getLines().toSet
            inputStream.close()

            println("Reference Data:")
            referenceData.foreach(println)

            env.execute("Flink S3 Simple Example")
        } catch {
            case e: Exception =>
                e.printStackTrace()
                println(s"Error: ${e.getMessage}")
        }
    }
}

###

And my build.sbt file:
###
import Dependencies._

name := "MyFlinkJob"

version := "0.1"

scalaVersion := "2.12.19"

ThisBuild / scalaVersion     := "2.12.19"
ThisBuild / version          := "0.1.0-SNAPSHOT"
ThisBuild / organization     := "com.example"
ThisBuild / organizationName := "example"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.18.1",
  "org.apache.flink" %% "flink-streaming-scala" % "1.18.1",
  "org.apache.flink" % "flink-s3-fs-hadoop" % "1.18.1"
)

assembly / assemblyOption ~= {
  _.withIncludeScala(false)
}

assembly / mainClass := Some("com.example.flink.MyFlinkJob")

assembly / assemblyJarName := "myflinkjob_2.12-0.1.jar"

assembly / assemblyMergeStrategy := {
  case path if path.contains("services") => MergeStrategy.concat
  case PathList("META-INF", _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}

###
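
I build the fat jar with sbt-assembly; running the command below is what produces target/scala-2.12/myflinkjob_2.12-0.1.jar, the file copied into the image in the Dockerfile further down:
###
sbt clean assembly
###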

I'm using the following Docker image:
###
FROM flink:1.18-scala_2.12

USER root

RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \
    cp -p /opt/flink/opt/flink-s3-fs-hadoop-1.18.1.jar /opt/flink/plugins/s3-fs-hadoop/

RUN mkdir -p /opt/flink/log/ /opt/flink/conf /opt/flink/userlib

WORKDIR /opt/flink/userlib
COPY target/scala-2.12/myflinkjob_2.12-0.1.jar myflinkjob.jar

RUN chown -R flink:flink /opt/flink && \
    chmod -R 755 /opt/flink

RUN chown -R flink:flink /opt/flink/userlib && \
    chmod -R 755 /opt/flink/userlib
###
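
As far as I understand, Flink's plugin mechanism expects each filesystem plugin in its own sub-directory, so the intent of the Dockerfile is to end up with this layout inside the image:
###
/opt/flink/plugins/
└── s3-fs-hadoop/
    └── flink-s3-fs-hadoop-1.18.1.jar
###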

And the following Kubernetes deployment:
###
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-s3
spec:
  image: flink-s3:0.1
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    classloader.resolve-order: parent-first
  serviceAccount: flink
  jobManager:
    resource:
      memory: 2048m
      cpu: 0.5
  taskManager:
    replicas: 2
    resource:
      memory: 2048m
      cpu: 0.5
  job:
    jarURI: "local:///opt/flink/userlib/myflinkjob.jar"
    parallelism: 2
    #upgradeMode: stateless  # stateless or savepoint or last-state
    entryClass: "com.example.flink.MyFlinkJob"
    args: []
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: flink-s3
    spec:
      containers:
        - name: flink-main-container
          securityContext:
            runAsUser: 9999 # UID of a non-root user
            runAsNonRoot: true
          env: []
          volumeMounts: []
      volumes: []

###
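
The S3 access key, secret key and endpoint are hardcoded in the job for now. From the S3 documentation linked further down, my understanding is that they could instead be set under flinkConfiguration, roughly like this (untested on my side):
###
  flinkConfiguration:
    ...
    s3.access-key: "###"
    s3.secret-key: "###"
    s3.endpoint: "internal endpoint"
###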

I launch the Flink job like this:
###
kubectl apply -f kubernetes/FlinkDeployment.yml
###

I am using the Flink Kubernetes Operator.

And I get this error in the logs:
###
java.lang.IllegalArgumentException: Wrong FS: s3a://mybucket/myfolder/myfile.txt, expected: file:///
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:807)
        at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:105)
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:774)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
        at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
        at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:976)
        at com.example.flink.MyFlinkJob$.main(MyFlinkJob.scala:28)
        at com.example.flink.MyFlinkJob.main(MyFlinkJob.scala)

I don't really understand why Flink is telling me this: the stack trace shows Hadoop's RawLocalFileSystem handling the path, as if the s3a:// scheme were being resolved against the local file system instead of S3.

I followed the documentation here:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
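
My understanding of that page is that, once the plugin jar sits in /opt/flink/plugins/s3-fs-hadoop/, s3a:// paths should be usable through Flink's own filesystem abstraction, with the credentials coming from the s3.* configuration keys rather than from the code. Something like the sketch below is what I would have expected to work (this is only my reading of the docs, not code I have running):
###
import org.apache.flink.core.fs.{Path => FlinkPath}
import scala.io.Source

// Let Flink resolve the filesystem registered for the "s3a" scheme
// (i.e. the s3-fs-hadoop plugin) instead of going through Hadoop's
// FileSystem API directly.
val path = new FlinkPath("s3a://mybucket/myfolder/myfile.txt")
val fs = path.getFileSystem()
val in = fs.open(path)
val referenceData = Source.fromInputStream(in).getLines().toSet
in.close()
###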

Could someone help me understand what I am missing?

Best regards,
Gwenael Le Barzic



