Hello everyone.
I get the following error when trying to read a CSV file with the PyFlink
DataStream API in a Kubernetes environment using the Flink Kubernetes Operator:
###
  File "/opt/myworkdir/myscript.py", line 30, in <module>
    run_flink_job(myfile)
  File "/opt/myworkdir/myscript.py", line 21, in run_flink_job
    csvshem = CsvReaderFormat.for_schema(file_csv_schema)
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/formats/csv.py", line 322, in for_schema
    items = list(charFrequency[char].items())
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat.
: java.lang.IllegalAccessError: class org.apache.flink.formats.csv.PythonCsvUtils tried to access method 'void org.apache.flink.formats.csv.CsvReaderFormat.<init>(org.apache.flink.util.function.SerializableSupplier, org.apache.flink.util.function.SerializableFunction, java.lang.Class, org.apache.flink.formats.common.Converter, org.apache.flink.api.common.typeinfo.TypeInformation, boolean)' (org.apache.flink.formats.csv.PythonCsvUtils is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a; org.apache.flink.formats.csv.CsvReaderFormat is in unnamed module of loader 'app')
    at org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat(PythonCsvUtils.java:48)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Unknown Source)
###
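What stands out to me in the error message above is that the two classes are reported under different class loaders. The small sketch below only pulls the class/loader pairs out of that message so they are easier to compare; the regular expression and the helper name `loaders_by_class` are my own, hypothetical choices. My understanding, which I have not verified for this setup, is that the JVM refuses non-public access between classes of the same package when they come from different loaders.

```python
import re

# The relevant part of the error message, copied from the stack trace above.
ERROR = (
    "(org.apache.flink.formats.csv.PythonCsvUtils is in unnamed module of loader "
    "org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a; "
    "org.apache.flink.formats.csv.CsvReaderFormat is in unnamed module of loader 'app')"
)

# Hypothetical pattern: "<class> is in unnamed module of loader <loader>",
# where the loader name may or may not be wrapped in single quotes.
PATTERN = re.compile(r"([\w.$]+) is in unnamed module of loader '?([\w.$]+)'?")


def loaders_by_class(message):
    """Map each class named in the message to the loader reported for it."""
    return {cls: loader for cls, loader in PATTERN.findall(message)}


loaders = loaders_by_class(ERROR)
for cls, loader in loaders.items():
    print(cls, "->", loader)

# True when the classes were reported under more than one loader.
print("different loaders:", len(set(loaders.values())) > 1)
```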
Here is my Dockerfile:
###
FROM flink:1.18.1
RUN apt-get update -y && \
    apt-get install -y python3 python3-pip python3-dev && \
    rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python
RUN mkdir -p /opt/myworkdir
WORKDIR /opt/myworkdir
RUN alias python=python3
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
COPY src .
RUN chown -R flink:flink /opt/myworkdir
RUN chmod -R 755 /opt/myworkdir
###
Here is my FlinkDeployment custom resource:
###
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  finalizers:
    - flinkdeployments.flink.apache.org/finalizer
  name: myscript
  namespace: flink
spec:
  image: myscript:0.1
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
    state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
    state.savepoints.dir: file:///checkpoints/flink/savepoints
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/myworkdir/myscript.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python", "-py", "/opt/myworkdir/myscript.py"]
    parallelism: 1
###
Here is myscript.py:
###
import os
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
from pyflink.datastream.connectors.file_system import FileSource
from pyflink.common import Types, Row, WatermarkStrategy
import time


def run_flink_job(myfile):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    file_csv_schema = CsvSchema.builder() \
        .add_string_column('column_1') \
        .add_string_column('column_2') \
        .set_column_separator(';') \
        .set_skip_first_data_row(True) \
        .build()
    csvshem = CsvReaderFormat.for_schema(file_csv_schema)
    f_source = FileSource.for_record_stream_format(csvshem, myfile).build()
    ds_f = env.from_source(f_source, WatermarkStrategy.no_watermarks(),
                           'csv_source').map(lambda x: x)
    ds_f.print()
    env.execute()


if __name__ == '__main__':
    myfile = '%s/src/input/dummy.csv' % os.getcwd()
    run_flink_job(myfile)
###
I tried another Flink deployment that reads from a Kafka topic, and it works fine.
The problem seems to occur only when I try to read a CSV file.
Here is the CSV file:
###
column_1;column_2
toto;10
tyty;20
tata;30
tutu;40
tete;50
###
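To rule out the file itself, here is a minimal sketch that parses the same content with Python's standard `csv` module, independently of Flink. It mirrors the schema settings from the script (two string columns, `;` as separator, first row skipped). The helper name `read_rows` and the inlined `SAMPLE` string are only illustrative.

```python
import csv
import io

# Sample content copied from the CSV file shown above.
SAMPLE = """column_1;column_2
toto;10
tyty;20
tata;30
tutu;40
tete;50
"""


def read_rows(text, separator=";", skip_first_row=True):
    """Parse delimited text into rows of strings, optionally dropping the header."""
    reader = csv.reader(io.StringIO(text), delimiter=separator)
    rows = [row for row in reader if row]  # ignore empty lines
    return rows[1:] if skip_first_row else rows


rows = read_rows(SAMPLE)

# Every data row should have exactly two fields, matching the two-column schema.
assert all(len(row) == 2 for row in rows)

for row in rows:
    print(row)
```

If this parses cleanly, the file content is probably not what triggers the error, which would be consistent with the exception being raised in `CsvReaderFormat.for_schema`, before any file is read.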
I installed the Flink Kubernetes Operator as described in the official
documentation, and I'm using minikube on my workstation.
Thank you in advance for your help!
Regards,
Gwenael Le Barzic