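Hi Gwenael,

From the logs I thought it was a JVM module opens/exports issue, but I got a similar error with a Java 8 base image too, and Java 8 has no module system. I think the real issue is that PythonCsvUtils is not permitted to call the package-private constructor of CsvReaderFormat across class loaders: the log shows PythonCsvUtils was loaded by Flink's ChildFirstClassLoader, while CsvReaderFormat was loaded by the 'app' class loader.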
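For package-private access, the JVM treats two classes as being in the same package only when they share both the package name and the defining class loader (a "run-time package"). The Python sketch below models that rule using the two loader names from the IllegalAccessError in the quoted log. It only illustrates the check and is not Flink or JVM code; the `LoadedClass` type and the helper names are made up for this example.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class LoadedClass:
    """Illustrative model of a loaded class: binary name plus defining loader."""
    name: str
    loader: str

    @property
    def package(self) -> str:
        # Package name is everything before the last dot of the binary name.
        return self.name.rpartition(".")[0]


def runtime_package(cls: LoadedClass) -> tuple[str, str]:
    # A run-time package is identified by (package name, defining class loader).
    return (cls.package, cls.loader)


def may_access_package_private(caller: LoadedClass, target: LoadedClass) -> bool:
    # Package-private members are accessible only within the same run-time package.
    return runtime_package(caller) == runtime_package(target)


# Loader names copied from the IllegalAccessError message in the quoted log.
utils = LoadedClass("org.apache.flink.formats.csv.PythonCsvUtils",
                    "org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a")
reader = LoadedClass("org.apache.flink.formats.csv.CsvReaderFormat", "app")

# Same package name, different loaders: access is denied.
print(may_access_package_private(utils, reader))  # False

# If both classes were defined by the app loader, access would be allowed.
utils_in_app = LoadedClass(utils.name, "app")
print(may_access_package_private(utils_in_app, reader))  # True
```

The first check mirrors the failing call; the second shows why getting PythonCsvUtils defined by the same loader as CsvReaderFormat makes the constructor call legal.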
One workaround I found is to add `RUN cp /opt/flink/opt/flink-python* /opt/flink/lib/` to the Dockerfile, so that flink-python-1.18.1.jar is present in both /opt/flink/opt and /opt/flink/lib. Then, when Flink loads org.apache.flink.formats.csv.PythonCsvUtils, the class is available to the app class loader, the same loader that defines CsvReaderFormat, so the package-private constructor call is allowed.

Thanks,
Rob Young

On Mon, Jun 17, 2024 at 11:53 PM <gwenael.lebar...@orange.com> wrote:
> Hello everyone.
>
> Does anyone know how to solve this, please?
>
> Regards,
>
> Gwenael Le Barzic
> Technical Engineer, Big Data technologies
> Orange/OF/DTSI/SI/DATA-IA/SOLID/CXP
>
> From: LE BARZIC Gwenael DTSI/SI
> Sent: Friday, June 14, 2024 22:02
> To: user@flink.apache.org
> Subject: Problem reading a CSV file with pyflink datastream in k8s with Flink operator
>
> Hello everyone.
>
> I get the following error when trying to read a CSV file with the PyFlink DataStream API in a k8s environment using the Flink operator.
>
> ###
>   File "/opt/myworkdir/myscript.py", line 30, in <module>
>     run_flink_job(myfile)
>   File "/opt/myworkdir/myscript.py", line 21, in run_flink_job
>     csvshem = CsvReaderFormat.for_schema(file_csv_schema)
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/formats/csv.py", line 322, in for_schema
>     items = list(charFrequency[char].items())
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat.
> : java.lang.IllegalAccessError: class org.apache.flink.formats.csv.PythonCsvUtils tried to access method 'void org.apache.flink.formats.csv.CsvReaderFormat.<init>(org.apache.flink.util.function.SerializableSupplier, org.apache.flink.util.function.SerializableFunction, java.lang.Class, org.apache.flink.formats.common.Converter, org.apache.flink.api.common.typeinfo.TypeInformation, boolean)' (org.apache.flink.formats.csv.PythonCsvUtils is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a; org.apache.flink.formats.csv.CsvReaderFormat is in unnamed module of loader 'app')
>     at org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat(PythonCsvUtils.java:48)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.base/java.lang.Thread.run(Unknown Source)
> ###
>
> Here is my Dockerfile:
>
> ###
> FROM flink:1.18.1
>
> RUN apt-get update -y && \
>     apt-get install -y python3 python3-pip python3-dev && rm -rf /var/lib/apt/lists/*
> RUN ln -s /usr/bin/python3 /usr/bin/python
>
> RUN mkdir -p /opt/myworkdir
> WORKDIR /opt/myworkdir
>
> # Note: an alias set in a RUN step does not persist to later steps;
> # the symlink above already provides "python".
> RUN alias python=python3
>
> COPY requirements.txt .
> RUN pip3 install --no-cache-dir -r requirements.txt
>
> COPY src .
> RUN chown -R flink:flink /opt/myworkdir
> RUN chmod -R 755 /opt/myworkdir
> ###
>
> Here is my FlinkDeployment custom resource:
>
> ###
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   finalizers:
>     - flinkdeployments.flink.apache.org/finalizer
>   name: myscript
>   namespace: flink
> spec:
>   image: myscript:0.1
>   flinkVersion: v1_18
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "2"
>     state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
>     state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
>     state.savepoints.dir: file:///checkpoints/flink/savepoints
>     job.autoscaler.enabled: "true"
>     job.autoscaler.stabilization.interval: 1m
>     job.autoscaler.metrics.window: 5m
>     job.autoscaler.target.utilization: "0.6"
>     job.autoscaler.target.utilization.boundary: "0.2"
>     job.autoscaler.restart.time: 2m
>     job.autoscaler.catch-up.duration: 5m
>     pipeline.max-parallelism: "720"
>   serviceAccount: flink
>   jobManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   taskManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   job:
>     jarURI: local:///opt/myworkdir/myscript.py
>     entryClass: "org.apache.flink.client.python.PythonDriver"
>     args: ["-pyclientexec", "/usr/bin/python", "-py", "/opt/myworkdir/myscript.py"]
>     parallelism: 1
> ###
>
> Here is myscript.py:
>
> ###
> import os
>
> from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
> from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
> from pyflink.datastream.connectors.file_system import FileSource
> from pyflink.common import Types, Row, WatermarkStrategy
> import time
>
>
> def run_flink_job(myfile):
>
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
>
>     file_csv_schema = CsvSchema.builder() \
>         .add_string_column('column_1') \
>         .add_string_column('column_2') \
>         .set_column_separator(';') \
>         .set_skip_first_data_row(True) \
>         .build()
>
>     csvshem = CsvReaderFormat.for_schema(file_csv_schema)
>     f_source = FileSource.for_record_stream_format(csvshem, myfile).build()
>     ds_f = env.from_source(f_source, WatermarkStrategy.no_watermarks(), 'csv_source').map(lambda x: x)
>     ds_f.print()
>     env.execute()
>
>
> if __name__ == '__main__':
>     myfile = '%s/src/input/dummy.csv' % os.getcwd()
>     run_flink_job(myfile)
> ###
>
> I tried another Flink deployment that reads a Kafka topic, and it works fine. This problem seems to occur only when I try to read a CSV file.
>
> Here is the CSV file:
>
> ###
> column_1;column_2
> toto;10
> tyty;20
> tata;30
> tutu;40
> tete;50
> ###
>
> I installed the Flink Kubernetes operator as described in the official documentation, and I'm using minikube on my workstation.
>
> Thank you in advance for your help!
>
> Regards,
> Gwenael Le Barzic
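To check that the `RUN cp /opt/flink/opt/flink-python* /opt/flink/lib/` step suggested at the top of this thread actually took effect in the built image, a small script like the following could be run with `python` inside the container. It is a sketch that assumes FLINK_HOME is /opt/flink, as in the official flink:1.18.1 image; `flink_python_jars` and `workaround_applied` are hypothetical helper names, not part of Flink.

```python
from __future__ import annotations

from pathlib import Path


def flink_python_jars(flink_home: str = "/opt/flink") -> dict[str, list[str]]:
    """List flink-python jars found under FLINK_HOME/opt and FLINK_HOME/lib."""
    home = Path(flink_home)
    # A missing directory simply yields an empty list.
    return {
        sub: sorted(p.name for p in (home / sub).glob("flink-python*.jar"))
        for sub in ("opt", "lib")
    }


def workaround_applied(flink_home: str = "/opt/flink") -> bool:
    # The workaround relies on the jar being on the app class path, i.e. in FLINK_HOME/lib.
    return bool(flink_python_jars(flink_home)["lib"])


if __name__ == "__main__":
    jars = flink_python_jars()
    print("opt:", jars["opt"])
    print("lib:", jars["lib"])
    print("workaround applied:", workaround_applied())
```

Checking only for the jar's presence in lib keeps the script independent of the exact Flink version in the file name.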