Hello everyone.

Does anyone know how to solve this, please?

Regards,

Gwenael Le Barzic
Big Data Technical Engineer
Orange/OF/DTSI/SI/DATA-IA/SOLID/CXP

From: LE BARZIC Gwenael DTSI/SI
Sent: Friday, 14 June 2024 22:02
To: user@flink.apache.org
Subject: Problem reading a CSV file with pyflink datastream in k8s with Flink operator

Hello everyone.

I get the following error when trying to read a CSV file with the PyFlink 
DataStream API in a Kubernetes environment using the Flink Kubernetes operator.
###
  File "/opt/myworkdir/myscript.py", line 30, in <module>
    run_flink_job(myfile)
  File "/opt/myworkdir/myscript.py", line 21, in run_flink_job
    csvshem = CsvReaderFormat.for_schema(file_csv_schema)
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/formats/csv.py", line 322, in for_schema
    items = list(charFrequency[char].items())
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat.
: java.lang.IllegalAccessError: class org.apache.flink.formats.csv.PythonCsvUtils tried to access method 'void org.apache.flink.formats.csv.CsvReaderFormat.<init>(org.apache.flink.util.function.SerializableSupplier, org.apache.flink.util.function.SerializableFunction, java.lang.Class, org.apache.flink.formats.common.Converter, org.apache.flink.api.common.typeinfo.TypeInformation, boolean)' (org.apache.flink.formats.csv.PythonCsvUtils is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a; org.apache.flink.formats.csv.CsvReaderFormat is in unnamed module of loader 'app')
        at org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat(PythonCsvUtils.java:48)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Unknown Source)
###

Here is my Dockerfile:
###
FROM flink:1.18.1

RUN apt-get update -y && \
    apt-get install -y python3 python3-pip python3-dev && \
    rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

RUN mkdir -p /opt/myworkdir
WORKDIR /opt/myworkdir

RUN alias python=python3

COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

COPY src .
RUN chown -R flink:flink /opt/myworkdir
RUN chmod -R 755 /opt/myworkdir

###

Here is my FlinkDeployment custom resource:
###
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
 finalizers:
  - flinkdeployments.flink.apache.org/finalizer
 name: myscript
 namespace: flink
spec:
 image: myscript:0.1
 flinkVersion: v1_18
 flinkConfiguration:
   taskmanager.numberOfTaskSlots: "2"
   state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
   state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
   state.savepoints.dir: file:///checkpoints/flink/savepoints
   job.autoscaler.enabled: "true"
   job.autoscaler.stabilization.interval: 1m
   job.autoscaler.metrics.window: 5m
   job.autoscaler.target.utilization: "0.6"
   job.autoscaler.target.utilization.boundary: "0.2"
   job.autoscaler.restart.time: 2m
   job.autoscaler.catch-up.duration: 5m
   pipeline.max-parallelism: "720"
 serviceAccount: flink
 jobManager:
   resource:
     memory: "2048m"
     cpu: 1
 taskManager:
   resource:
     memory: "2048m"
     cpu: 1
 job:
   jarURI: local:///opt/myworkdir/myscript.py
   entryClass: "org.apache.flink.client.python.PythonDriver"
   args: ["-pyclientexec", "/usr/bin/python", "-py", "/opt/myworkdir/myscript.py"]
   parallelism: 1

###

Here is myscript.py:
###
import os

from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
from pyflink.datastream.connectors.file_system import FileSource
from pyflink.common import Types, Row, WatermarkStrategy
import time

def run_flink_job(myfile):

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

    file_csv_schema = CsvSchema.builder() \
        .add_string_column('column_1') \
        .add_string_column('column_2') \
        .set_column_separator(';') \
        .set_skip_first_data_row(True) \
        .build()

    csvshem = CsvReaderFormat.for_schema(file_csv_schema)
    f_source = FileSource.for_record_stream_format(csvshem, myfile).build()
    ds_f = env.from_source(f_source, WatermarkStrategy.no_watermarks(),
                           'csv_source').map(lambda x: x)
    ds_f.print()
    env.execute()

if __name__ == '__main__':
    myfile = '%s/src/input/dummy.csv' % os.getcwd()
    run_flink_job(myfile)

###

I tried another Flink deployment that reads from a Kafka topic, and it works fine.
This problem seems to occur only when I try to read a CSV file.

Here is the CSV file:
###
column_1;column_2
toto;10
tyty;20
tata;30
tutu;40
tete;50
###
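
As a sanity check that the file content itself is well-formed, the same data can be parsed outside Flink with Python's standard csv module, using the same separator and header-skipping settings as the schema above. This is only a minimal sketch: parse_rows and SAMPLE are hypothetical names introduced for illustration, and the inline string stands in for dummy.csv.

```python
import csv
import io


def parse_rows(text, separator=';', skip_header=True):
    """Parse delimited text into a list of string tuples, one per data row."""
    reader = csv.reader(io.StringIO(text), delimiter=separator)
    # Ignore completely empty lines, e.g. a trailing newline at end of file.
    rows = [tuple(row) for row in reader if row]
    # Mirror set_skip_first_data_row(True): drop the header line.
    return rows[1:] if skip_header else rows


# Hypothetical stand-in for the contents of dummy.csv shown above.
SAMPLE = "column_1;column_2\ntoto;10\ntyty;20\ntata;30\ntutu;40\ntete;50\n"

rows = parse_rows(SAMPLE)
for column_1, column_2 in rows:
    print(column_1, column_2)
```

Both columns are kept as strings here, matching the two add_string_column calls in the schema.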

I installed the Flink Kubernetes operator as described in the official 
documentation, and I'm using minikube on my workstation.

Thank you in advance for your help!

Regards,
Gwenael Le Barzic
