Hi Gwenael,

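From the logs I first thought it was a JVM module opens/exports issue, but I
hit the same failure with a java8 base image too. I think the actual problem
is that PythonCsvUtils is not permitted to call the package-private
constructor of CsvReaderFormat across class loaders: the stack trace shows
PythonCsvUtils loaded by the ChildFirstClassLoader and CsvReaderFormat by
the 'app' loader, and the JVM treats the same package name under two
different loaders as two different runtime packages.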

One workaround I found is to add a `RUN cp /opt/flink/opt/flink-python*
/opt/flink/lib/` to the Dockerfile, so that the flink-python-1.18.1.jar is
present in both /opt/flink/opt and /opt/flink/lib. Then when Flink tries to
load org.apache.flink.formats.csv.PythonCsvUtils, the class will be available
to the app classloader.
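
To check that the copy took effect inside the image, something like the
sketch below might help. It only lists the flink-python jars under the opt
and lib directories; the function name is made up for illustration, and the
default /opt/flink location is an assumption taken from the paths in this
thread.

```python
from pathlib import Path


def find_flink_python_jars(flink_home="/opt/flink"):
    """Return the flink-python jar names found under <flink_home>/opt and <flink_home>/lib.

    Hypothetical helper: the /opt/flink default is assumed from the paths
    quoted in this thread, not from any Flink API.
    """
    home = Path(flink_home)
    return {
        sub: sorted(p.name for p in (home / sub).glob("flink-python*.jar"))
        for sub in ("opt", "lib")
    }


if __name__ == "__main__":
    # After the workaround, the jar should be listed for both directories.
    for sub, jars in find_flink_python_jars().items():
        print(f"{sub}: {jars or 'no flink-python jar found'}")
```

Running it in the container (for example with `python3` via `docker run`)
should show the jar under both opt and lib once the `RUN cp` line is in place.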

Rob Young

On Mon, Jun 17, 2024 at 11:53 PM <gwenael.lebar...@orange.com> wrote:

> Hello everyone.
> Does anyone know how to solve this, please?
> Regards.
> From: LE BARZIC Gwenael DTSI/SI
> *Sent:* Friday, June 14, 2024 22:02
> *To:* user@flink.apache.org
> *Subject:* Problem reading a CSV file with pyflink datastream in k8s with
> Flink operator
> Hello everyone.
> I get the following error when trying to read a CSV file with pyflink
> datastream in a k8s environment using the flink operator.
> ###
>   File "/opt/myworkdir/myscript.py", line 30, in <module>
>     run_flink_job(myfile)
>   File "/opt/myworkdir/myscript.py", line 21, in run_flink_job
>     csvshem = CsvReaderFormat.for_schema(file_csv_schema)
>   File
> "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/formats/csv.py", line
> 322, in for_schema
>     items = list(charFrequency[char].items())
>   File "/opt/flink/opt/python/py4j-",
> line 1322, in __call__
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 146, in deco
>   File "/opt/flink/opt/python/py4j-",
> line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat.
> : java.lang.IllegalAccessError: class
> org.apache.flink.formats.csv.PythonCsvUtils tried to access method 'void
> org.apache.flink.formats.csv.CsvReaderFormat.<init>(org.apache.flink.util.function.SerializableSupplier,
> org.apache.flink.util.function.SerializableFunction, java.lang.Class,
> org.apache.flink.formats.common.Converter,
> org.apache.flink.api.common.typeinfo.TypeInformation, boolean)'
> (org.apache.flink.formats.csv.PythonCsvUtils is in unnamed module of loader
> org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a;
> org.apache.flink.formats.csv.CsvReaderFormat is in unnamed module of loader
> 'app')
>         at
> org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat(PythonCsvUtils.java:48)
>         at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>         at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source)
>         at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source)
>         at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>         at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>         at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>         at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>         at java.base/java.lang.Thread.run(Unknown Source)
> ###
> Here is my Dockerfile:
> ###
> FROM flink:1.18.1
> RUN apt-get update -y && \
>     apt-get install -y python3 python3-pip python3-dev && \
>     rm -rf /var/lib/apt/lists/*
> RUN ln -s /usr/bin/python3 /usr/bin/python
> RUN mkdir -p /opt/myworkdir
> WORKDIR /opt/myworkdir
> RUN alias python=python3
> COPY requirements.txt .
> RUN pip3 install --no-cache-dir -r requirements.txt
> COPY src .
> RUN chown -R flink:flink /opt/myworkdir
> RUN chmod -R 755 /opt/myworkdir
> ###
> Here is my FlinkDeployment custom resource:
> ###
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>  finalizers:
>   - flinkdeployments.flink.apache.org/finalizer
>  name: myscript
>  namespace: flink
> spec:
>  image: myscript:0.1
>  flinkVersion: v1_18
>  flinkConfiguration:
>    taskmanager.numberOfTaskSlots: "2"
>    state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
>    state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
>    state.savepoints.dir: file:///checkpoints/flink/savepoints
>    job.autoscaler.enabled: "true"
>    job.autoscaler.stabilization.interval: 1m
>    job.autoscaler.metrics.window: 5m
>    job.autoscaler.target.utilization: "0.6"
>    job.autoscaler.target.utilization.boundary: "0.2"
>    job.autoscaler.restart.time: 2m
>    job.autoscaler.catch-up.duration: 5m
>    pipeline.max-parallelism: "720"
>  serviceAccount: flink
>  jobManager:
>    resource:
>      memory: "2048m"
>      cpu: 1
>  taskManager:
>    resource:
>      memory: "2048m"
>      cpu: 1
>  job:
>    jarURI: local:///opt/myworkdir/myscript.py
>    entryClass: "org.apache.flink.client.python.PythonDriver"
>    args: ["-pyclientexec", "/usr/bin/python", "-py", "/opt/myworkdir/myscript.py"]
>    parallelism: 1
> ###
> Here is myscript.py:
> ###
> import os
> from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
> from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
> from pyflink.datastream.connectors.file_system import FileSource
> from pyflink.common import Types, Row, WatermarkStrategy
> import time
> def run_flink_job(myfile):
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
>     file_csv_schema = CsvSchema.builder() \
>         .add_string_column('column_1') \
>         .add_string_column('column_2') \
>         .set_column_separator(';') \
>         .set_skip_first_data_row(True) \
>         .build()
>     csvshem = CsvReaderFormat.for_schema(file_csv_schema)
>     f_source = FileSource.for_record_stream_format(csvshem, myfile).build()
>     ds_f = env.from_source(f_source, WatermarkStrategy.no_watermarks(),
>                            'csv_source').map(lambda x: x)
>     ds_f.print()
>     env.execute()
> if __name__ == '__main__':
>     myfile = '%s/src/input/dummy.csv' % os.getcwd()
>     run_flink_job(myfile)
> ###
> I tried another Flink deployment reading a Kafka topic, and it works fine.
> This problem seems to occur only when I try to read a CSV file.
> Here is the CSV file:
> ###
> column_1;column_2
> toto;10
> tyty;20
> tata;30
> tutu;40
> tete;50
> ###
> I installed the Flink Kubernetes operator as described in the official doc
> and I’m using minikube on my workstation.
> Thank you in advance for your help!
> Regards.
