Hi Gwenael,

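From the logs I first thought this was a JVM module opens/exports issue, but
I found the same error with a Java 8 base image too. I think the issue is
that PythonCsvUtils is not permitted to call the package-private constructor
of CsvReaderFormat across class loaders. The error shows PythonCsvUtils
loaded by the ChildFirstClassLoader and CsvReaderFormat loaded by the 'app'
class loader, so the JVM treats them as belonging to different runtime
packages even though the package name is the same.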

One workaround I found is to add a `RUN cp /opt/flink/opt/flink-python*
/opt/flink/lib/` to the Dockerfile, so that the flink-python-1.18.1.jar is
present in both /opt/flink/opt and /opt/flink/lib. Then when Flink tries to
load org.apache.flink.formats.csv.PythonCsvUtils, the class will be available
to the app class loader, the same loader that loads CsvReaderFormat.
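
To confirm the copy step took effect inside the built image, a small check
like the one below may help. It is only a sketch: the helper name
find_flink_python_jars is made up for illustration, and it assumes Flink is
installed under /opt/flink as in the official image. It simply lists the
flink-python jars found in opt/ and lib/.

```python
import glob
import os


def find_flink_python_jars(flink_home="/opt/flink"):
    """Return the flink-python jar file names found under opt/ and lib/.

    Hypothetical helper for illustration; flink_home defaults to the
    install path assumed for the official Flink image.
    """
    found = {}
    for subdir in ("opt", "lib"):
        pattern = os.path.join(flink_home, subdir, "flink-python*.jar")
        found[subdir] = sorted(os.path.basename(p) for p in glob.glob(pattern))
    return found


if __name__ == "__main__":
    jars = find_flink_python_jars()
    print(jars)
    if not jars["lib"]:
        # With the workaround applied, lib/ should contain the jar as well.
        print("flink-python jar not found in lib/; the cp step may not have run")
```

Running it in the container (for example via kubectl exec on the JobManager
pod) should show the jar under both keys once the workaround is in place.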

Thanks
Rob Young

On Mon, Jun 17, 2024 at 11:53 PM <gwenael.lebar...@orange.com> wrote:

> Hello everyone.
>
>
>
> Does anyone know how to solve this, please?
>
>
>
> Regards.
>
> *Gwenael Le Barzic *
> Technical engineer, Big Data technologies
> Orange/OF/DTSI/SI/DATA-IA/SOLID/CXP
>
>
>
> Orange Restricted
> From: LE BARZIC Gwenael DTSI/SI
> *Sent:* Friday, June 14, 2024 22:02
> *To:* user@flink.apache.org
> *Subject:* Problem reading a CSV file with pyflink datastream in k8s with
> Flink operator
>
>
>
> Hello everyone.
>
>
>
> I get the following error when trying to read a CSV file with pyflink
> datastream in a k8s environment using the flink operator.
>
> ###
>
>   File "/opt/myworkdir/myscript.py", line 30, in <module>
>     run_flink_job(myfile)
>   File "/opt/myworkdir/myscript.py", line 21, in run_flink_job
>     csvshem = CsvReaderFormat.for_schema(file_csv_schema)
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/formats/csv.py", line 322, in for_schema
>     items = list(charFrequency[char].items())
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat.
> : java.lang.IllegalAccessError: class org.apache.flink.formats.csv.PythonCsvUtils tried to access method 'void org.apache.flink.formats.csv.CsvReaderFormat.<init>(org.apache.flink.util.function.SerializableSupplier, org.apache.flink.util.function.SerializableFunction, java.lang.Class, org.apache.flink.formats.common.Converter, org.apache.flink.api.common.typeinfo.TypeInformation, boolean)' (org.apache.flink.formats.csv.PythonCsvUtils is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a; org.apache.flink.formats.csv.CsvReaderFormat is in unnamed module of loader 'app')
>         at org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat(PythonCsvUtils.java:48)
>         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>         at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>         at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>         at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>         at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>         at java.base/java.lang.Thread.run(Unknown Source)
>
> ###
>
>
>
> Here is my Dockerfile:
>
> ###
>
> FROM flink:1.18.1
>
> RUN apt-get update -y && \
>     apt-get install -y python3 python3-pip python3-dev && rm -rf /var/lib/apt/lists/*
> RUN ln -s /usr/bin/python3 /usr/bin/python
>
> RUN mkdir -p /opt/myworkdir
> WORKDIR /opt/myworkdir
>
> RUN alias python=python3
>
> COPY requirements.txt .
> RUN pip3 install --no-cache-dir -r requirements.txt
>
> COPY src .
> RUN chown -R flink:flink /opt/myworkdir
> RUN chmod -R 755 /opt/myworkdir
>
>
>
> ###
>
>
>
> Here is my FlinkDeployment custom resource:
>
> ###
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>  finalizers:
>   - flinkdeployments.flink.apache.org/finalizer
>  name: myscript
>  namespace: flink
> spec:
>  image: myscript:0.1
>  flinkVersion: v1_18
>  flinkConfiguration:
>    taskmanager.numberOfTaskSlots: "2"
>    state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
>    state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
>    state.savepoints.dir: file:///checkpoints/flink/savepoints
>    job.autoscaler.enabled: "true"
>    job.autoscaler.stabilization.interval: 1m
>    job.autoscaler.metrics.window: 5m
>    job.autoscaler.target.utilization: "0.6"
>    job.autoscaler.target.utilization.boundary: "0.2"
>    job.autoscaler.restart.time: 2m
>    job.autoscaler.catch-up.duration: 5m
>    pipeline.max-parallelism: "720"
>  serviceAccount: flink
>  jobManager:
>    resource:
>      memory: "2048m"
>      cpu: 1
>  taskManager:
>    resource:
>      memory: "2048m"
>      cpu: 1
>  job:
>    jarURI: local:///opt/myworkdir/myscript.py
>    entryClass: "org.apache.flink.client.python.PythonDriver"
>    args: ["-pyclientexec", "/usr/bin/python", "-py", "/opt/myworkdir/myscript.py"]
>    parallelism: 1
>
>
>
> ###
>
>
>
> Here is myscript.py:
>
> ###
>
> import os
>
> from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
> from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
> from pyflink.datastream.connectors.file_system import FileSource
> from pyflink.common import Types, Row, WatermarkStrategy
> import time
>
> def run_flink_job(myfile):
>
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
>
>     file_csv_schema = CsvSchema.builder() \
>         .add_string_column('column_1') \
>         .add_string_column('column_2') \
>         .set_column_separator(';') \
>         .set_skip_first_data_row(True) \
>         .build()
>
>     csvshem = CsvReaderFormat.for_schema(file_csv_schema)
>     f_source = FileSource.for_record_stream_format(csvshem, myfile).build()
>     ds_f = env.from_source(f_source, WatermarkStrategy.no_watermarks(), 'csv_source').map(lambda x: x)
>     ds_f.print()
>     env.execute()
>
> if __name__ == '__main__':
>     myfile = '%s/src/input/dummy.csv' % os.getcwd()
>     run_flink_job(myfile)
>
>
>
> ###
>
>
>
> I tried another Flink deployment reading a Kafka topic, and it works fine.
>
> This problem seems to occur only when I try to read a CSV file.
>
>
>
> Here is the CSV file:
>
> ###
>
> column_1;column_2
> toto;10
> tyty;20
> tata;30
> tutu;40
> tete;50
>
> ###
>
>
>
> I installed the Flink Kubernetes Operator as described in the official doc
> and I'm using minikube on my workstation.
>
>
>
> Thank you in advance for your help!
>
>
>
> Regards.
>
> *Gwenael Le Barzic *
>
>
>
>
>
