RE: Problem reading a CSV file with pyflink datastream in k8s with Flink operator

2024-06-18 Thread gwenael . lebarzic
Hello Rob.

This workaround does indeed work!

Regards,
Gwenael Le Barzic


From: Robert Young
Sent: Tuesday, June 18, 2024 03:54
To: LE BARZIC Gwenael DTSI/SI
Cc: user@flink.apache.org
Subject: Re: Problem reading a CSV file with pyflink datastream in k8s with Flink operator

Hi Gwenael,

From the logs I thought it was a JVM module opens/exports issue, but I found 
a similar error with a Java 8 base image too. I think the issue is that 
PythonCsvUtils is not permitted to call the package-private constructor of 
CsvReaderFormat across class loaders.

One workaround I found is to add `RUN cp /opt/flink/opt/flink-python* /opt/flink/lib/` 
to the Dockerfile, so that flink-python-1.18.1.jar is present in both 
/opt/flink/opt and /opt/flink/lib. Then, when Flink tries to load 
org.apache.flink.formats.csv.PythonCsvUtils, the class will be available to the 
app class loader.

Thanks
Rob Young

Re: Problem reading a CSV file with pyflink datastream in k8s with Flink operator

2024-06-17 Thread Robert Young
Hi Gwenael,

From the logs I thought it was a JVM module opens/exports issue, but I
found a similar error with a Java 8 base image too. I think the issue is
that PythonCsvUtils is not permitted to call the package-private
constructor of CsvReaderFormat across class loaders.
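
The access rule behind this error can be illustrated with a toy Python model (an illustration only, not Flink or JVM code). Per the JVM specification, a class's run-time package is identified by its package name together with its defining class loader, and a package-private member such as the CsvReaderFormat constructor is only accessible from the same run-time package. The `LoadedClass` type and both functions are hypothetical; the loader names are the ones reported in the IllegalAccessError in this thread, shortened.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LoadedClass:
    """A class as the JVM sees it: binary name plus defining class loader (toy model)."""
    name: str
    loader: str

    @property
    def package(self) -> str:
        # Package name = everything before the last dot of the binary name.
        return self.name.rsplit(".", 1)[0]


def same_runtime_package(a: LoadedClass, b: LoadedClass) -> bool:
    # A run-time package is identified by (package name, defining class loader).
    return a.package == b.package and a.loader == b.loader


def can_access_package_private(caller: LoadedClass, target: LoadedClass) -> bool:
    # Toy model only: real JVM checks also consider public/protected/private and modules.
    return same_runtime_package(caller, target)


# Loaders as reported in the IllegalAccessError in this thread (names shortened).
utils = LoadedClass("org.apache.flink.formats.csv.PythonCsvUtils", "ChildFirstClassLoader")
fmt = LoadedClass("org.apache.flink.formats.csv.CsvReaderFormat", "app")
print(can_access_package_private(utils, fmt))  # False: same package name, different loaders

# Assumed outcome of the workaround: the 'app' loader now defines PythonCsvUtils too.
utils_fixed = LoadedClass(utils.name, "app")
print(can_access_package_private(utils_fixed, fmt))  # True
```

The second check reflects the workaround as described here: once the flink-python jar is on the app class path, both classes share a loader and therefore a run-time package.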

One workaround I found is to add `RUN cp /opt/flink/opt/flink-python* /opt/flink/lib/`
to the Dockerfile, so that flink-python-1.18.1.jar is present in both
/opt/flink/opt and /opt/flink/lib. Then, when Flink tries to load
org.apache.flink.formats.csv.PythonCsvUtils, the class will be available
to the app class loader.
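
To confirm that the workaround actually made it into the image, one option is to check that the same flink-python jar file name appears under both directories. Below is a minimal sketch assuming the default /opt/flink layout of the flink:1.18.1 image used in this thread; the helper names are hypothetical, and the check only verifies that the file was copied, not which class loader ends up loading it.

```python
from pathlib import Path
from typing import Iterable, List, Set


def flink_python_jars(directory: Path) -> List[str]:
    """Names of flink-python*.jar files in one directory (empty if it does not exist)."""
    if not directory.is_dir():
        return []
    return sorted(p.name for p in directory.glob("flink-python*.jar"))


def jars_in_both(opt_jars: Iterable[str], lib_jars: Iterable[str]) -> Set[str]:
    # The workaround copies the jar, so the same file name should appear in both places.
    return set(opt_jars) & set(lib_jars)


def workaround_applied(flink_home: str = "/opt/flink") -> bool:
    home = Path(flink_home)
    return bool(jars_in_both(flink_python_jars(home / "opt"),
                             flink_python_jars(home / "lib")))


if __name__ == "__main__":
    print("workaround applied:", workaround_applied())
```

Because it only inspects file names, it is cheap to run inside the container, for example from a shell opened with `kubectl exec`.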

Thanks
Rob Young

RE: Problem reading a CSV file with pyflink datastream in k8s with Flink operator

2024-06-17 Thread gwenael . lebarzic
Hello everyone.

Does anyone know how to solve this, please?

Regards,

Gwenael Le Barzic
Technical Engineer, Big Data Technologies
Orange/OF/DTSI/SI/DATA-IA/SOLID/CXP
gwenael.lebar...@orange.com

From: LE BARZIC Gwenael DTSI/SI
Sent: Friday, June 14, 2024 22:02
To: user@flink.apache.org
Subject: Problem reading a CSV file with pyflink datastream in k8s with Flink operator

Hello everyone.

I get the following error when trying to read a CSV file with the PyFlink 
DataStream API in a k8s environment using the Flink operator.
###
  File "/opt/myworkdir/myscript.py", line 30, in <module>
    run_flink_job(myfile)
  File "/opt/myworkdir/myscript.py", line 21, in run_flink_job
    csvshem = CsvReaderFormat.for_schema(file_csv_schema)
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/formats/csv.py", line 322, in for_schema
    items = list(charFrequency[char].items())
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat.
: java.lang.IllegalAccessError: class org.apache.flink.formats.csv.PythonCsvUtils tried to access method 'void org.apache.flink.formats.csv.CsvReaderFormat.<init>(org.apache.flink.util.function.SerializableSupplier, org.apache.flink.util.function.SerializableFunction, java.lang.Class, org.apache.flink.formats.common.Converter, org.apache.flink.api.common.typeinfo.TypeInformation, boolean)' (org.apache.flink.formats.csv.PythonCsvUtils is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a; org.apache.flink.formats.csv.CsvReaderFormat is in unnamed module of loader 'app')
	at org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat(PythonCsvUtils.java:48)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Unknown Source)
###
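
The diagnostic part of this error is the parenthetical naming each class's loader: the two classes share a package name but were defined by different loaders. A small sketch for spotting the same symptom in other logs is shown below; the helper names are hypothetical, and the regex targets the message wording printed by the JVM in this thread, which may differ between JDK versions.

```python
import re
from typing import Dict

# Matches "<class> is in unnamed module of loader <loader>" (or "module <name>"),
# the wording used in the IllegalAccessError quoted in this thread.
_LOADER_RE = re.compile(
    r"([\w.$]+) is in (?:unnamed module|module [\w.]+) of loader ('[^']*'|[\w.$]+)"
)


def loaders_in_error(message: str) -> Dict[str, str]:
    """Map each class named in the message to the class loader reported for it."""
    return {cls: loader.strip("'") for cls, loader in _LOADER_RE.findall(message)}


def is_cross_loader(message: str) -> bool:
    # More than one distinct loader means the classes are not in the same run-time package.
    return len(set(loaders_in_error(message).values())) > 1


# Parenthetical copied from the IllegalAccessError in this thread.
ERROR_FROM_THREAD = (
    "(org.apache.flink.formats.csv.PythonCsvUtils is in unnamed module of loader "
    "org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a; "
    "org.apache.flink.formats.csv.CsvReaderFormat is in unnamed module of loader 'app')"
)

for cls, loader in loaders_in_error(ERROR_FROM_THREAD).items():
    print(f"{cls} -> {loader}")
print("cross-loader access:", is_cross_loader(ERROR_FROM_THREAD))
```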

Here is my Dockerfile:
###
FROM flink:1.18.1

RUN apt-get update -y && \
    apt-get install -y python3 python3-pip python3-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

RUN mkdir -p /opt/myworkdir
WORKDIR /opt/myworkdir

# No effect: shell aliases do not persist beyond a single RUN step
# (the symlink above already provides `python`).
RUN alias python=python3

COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

COPY src .
RUN chown -R flink:flink /opt/myworkdir
RUN chmod -R 755 /opt/myworkdir

###

Here is my FlinkDeployment custom resource:
###
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  finalizers:
    - flinkdeployments.flink.apache.org/finalizer
  name: myscript
  namespace: flink
spec:
  image: myscript:0.1
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
    state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
    state.savepoints.dir: file:///checkpoints/flink/savepoints
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/myworkdir/myscript.py
    entryClass