Problem reading a CSV file with pyflink datastream in k8s with Flink operator

2024-06-14 Thread gwenael . lebarzic
Hello everyone.

I get the following error when trying to read a CSV file with pyflink 
datastream in a k8s environment using the flink operator.
###
  File "/opt/myworkdir/myscript.py", line 30, in <module>
run_flink_job(myfile)
  File "/opt/myworkdir/myscript.py", line 21, in run_flink_job
csvshem = CsvReaderFormat.for_schema(file_csv_schema)
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/formats/csv.py", 
line 322, in for_schema
items = list(charFrequency[char].items())
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 
1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 
146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 
326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat.
: java.lang.IllegalAccessError: class 
org.apache.flink.formats.csv.PythonCsvUtils tried to access method 'void 
org.apache.flink.formats.csv.CsvReaderFormat.<init>(org.apache.flink.util.function.SerializableSupplier,
 org.apache.flink.util.function.SerializableFunction, java.lang.Class, 
org.apache.flink.formats.common.Converter, 
org.apache.flink.api.common.typeinfo.TypeInformation, boolean)' 
(org.apache.flink.formats.csv.PythonCsvUtils is in unnamed module of loader 
org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a; 
org.apache.flink.formats.csv.CsvReaderFormat is in unnamed module of loader 
'app')
at 
org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat(PythonCsvUtils.java:48)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
###

Here is my dockerfile :
###
FROM flink:1.18.1

RUN apt-get update -y && \
    apt-get install -y python3 python3-pip python3-dev && \
    rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

RUN mkdir -p /opt/myworkdir
WORKDIR /opt/myworkdir

# note: an 'alias' in a RUN layer does not persist; the python symlink created above is what matters

COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

COPY src .
RUN chown -R flink:flink /opt/myworkdir
RUN chmod -R 755 /opt/myworkdir

###
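For comparison, the PyFlink-on-Docker section of the Flink documentation (linked elsewhere in this thread) builds the image by pip-installing PyFlink on top of the official base image. A minimal sketch along those lines, assuming the pip version is pinned to match the base image:

```dockerfile
FROM flink:1.18.1

# the official Flink image ships without Python
RUN apt-get update -y && \
    apt-get install -y python3 python3-pip python3-dev && \
    rm -rf /var/lib/apt/lists/* && \
    ln -s /usr/bin/python3 /usr/bin/python

# install PyFlink, pinned to the same version as the base image
RUN pip3 install --no-cache-dir apache-flink==1.18.1
```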

Here is my flinkdeployment custom resource :
###
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
 finalizers:
  - flinkdeployments.flink.apache.org/finalizer
 name: myscript
 namespace: flink
spec:
 image: myscript:0.1
 flinkVersion: v1_18
 flinkConfiguration:
   taskmanager.numberOfTaskSlots: "2"
   state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
   state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
   state.savepoints.dir: file:///checkpoints/flink/savepoints
   job.autoscaler.enabled: "true"
   job.autoscaler.stabilization.interval: 1m
   job.autoscaler.metrics.window: 5m
   job.autoscaler.target.utilization: "0.6"
   job.autoscaler.target.utilization.boundary: "0.2"
   job.autoscaler.restart.time: 2m
   job.autoscaler.catch-up.duration: 5m
   pipeline.max-parallelism: "720"
 serviceAccount: flink
 jobManager:
   resource:
     memory: "2048m"
     cpu: 1
 taskManager:
   resource:
     memory: "2048m"
     cpu: 1
 job:
   jarURI: local:///opt/myworkdir/myscript.py
   entryClass: "org.apache.flink.client.python.PythonDriver"
   args: ["-pyclientexec", "/usr/bin/python", "-py", 
"/opt/myworkdir/myscript.py"]
   parallelism: 1

###

Here is myscript.py :
###
import os

from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
from pyflink.datastream.connectors.file_system import FileSource
from pyflink.common import Types, Row, WatermarkStrategy
import time

def run_flink_job(myfile):

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

    file_csv_schema = CsvSchema.builder() \
        .add_string_column('column_1') \
        .add_string_column('column_2') \
        .set_column_separator(';') \
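The archive truncates the script at this point (the builder chain is missing at least its closing `.build()` call and the rest of the job). For illustration only, the schema being built — two string columns split on ';' — corresponds to input rows like the following, sketched here with the standard csv module rather than Flink; the sample data is invented:

```python
import csv
import io

# hypothetical sample matching the schema above: two string columns, ';' separator
sample = "hello;world\nfoo;bar\n"

reader = csv.DictReader(
    io.StringIO(sample),
    fieldnames=["column_1", "column_2"],  # names taken from the snippet above
    delimiter=";",
)
rows = list(reader)
print(rows)
```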

Re: Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread Gunnar Morling
Hey,

I ran into some issues with PyFlink on Kubernetes myself a while ago.
Blogged about it here, perhaps it's useful:

  https://www.decodable.co/blog/getting-started-with-pyflink-on-kubernetes

Best,

--Gunnar




RE: Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread gwenael . lebarzic
To confirm, I can see this in the docs:
# install PyFlink

COPY apache-flink*.tar.gz /
RUN pip3 install /apache-flink-libraries*.tar.gz && pip3 install /apache-flink*.tar.gz

Is the result the same as with the command below:
RUN pip install --no-cache-dir -r requirements.txt

with requirements.txt containing:
apache-flink==1.18.1

Regards,

Gwenael Le Barzic
Technical engineer, BigData
Orange/OF/DTSI/SI/DATA-IA/SOLID/CXP

Mobile : +33 6 48 70 85 75

gwenael.lebar...@orange.com

New link to the CXP ticket tracking portal

From: Mate Czagany
Sent: Friday, 14 June 2024 18:53
To: LE BARZIC Gwenael DTSI/SI
Cc: user@flink.apache.org
Subject: Re: Which base image to use for pyflink on k8s with flink operator ?



RE: Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread gwenael . lebarzic
Thank you for your answer, Mate o/

Regards,

From: Mate Czagany
Sent: Friday, 14 June 2024 18:30
To: LE BARZIC Gwenael DTSI/SI
Cc: user@flink.apache.org
Subject: Re: Which base image to use for pyflink on k8s with flink operator ?



Re: Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread Mate Czagany
Oops, forgot the links, sorry about that

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
[2]
https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-python-example/Dockerfile


Problems with multiple sinks using postgres-cdc connector

2024-06-14 Thread David Bryson
Hi,

I have a stream reading from postgres-cdc connector version 3.1.0. I read
from two tables:

flink.cleaned_migrations
public.cleaned

I convert the tables into a datastream, do some processing, then write it
to a sink at the end of my stream:

joined_table_result = joined_with_metadata.execute_insert(daily_sink_property_map['flink_table_name'])

This works well, however I recently tried to add a second table which
contains state reached in the middle of my stream:

continuous_metrics_table = table_env.execute_sql("SELECT f1, f2, f3 from joined_processed_table")
continuous_metrics_table.execute_insert(continuous_sink_property_map['flink_table_name'])

When I add this second sink, the postgres-cdc connector appears to add a
second reader from the replication log, but with the same slot name. It
seems to behave this way regardless of the sink connector I use, and seems
to happen in addition to the existing slot that is already allocated to the
stream.  This second reader of course cannot use the same replication slot,
and so the connector eventually times out.  Is this expected behavior from
the connector? It seems strange the connector would attempt to use a slot
twice.

I am using incremental snapshots, and I am passing a unique slot per table
connector.
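For reference, passing a unique slot per table connector typically means a distinct 'slot.name' in each source table's options. A sketch, assuming the Flink CDC SQL connector, with invented connection details and column lists elided:

```sql
CREATE TABLE cleaned_migrations_src ( ... ) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'db-host', 'port' = '5432',
  'username' = 'flink', 'password' = '***',
  'database-name' = 'mydb',
  'schema-name' = 'flink', 'table-name' = 'cleaned_migrations',
  'slot.name' = 'slot_cleaned_migrations',
  'scan.incremental.snapshot.enabled' = 'true'
);

CREATE TABLE cleaned_src ( ... ) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'db-host', 'port' = '5432',
  'username' = 'flink', 'password' = '***',
  'database-name' = 'mydb',
  'schema-name' = 'public', 'table-name' = 'cleaned',
  'slot.name' = 'slot_cleaned',
  'scan.incremental.snapshot.enabled' = 'true'
);
```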

Logs below:

2024-06-14 09:23:59,600 INFO
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
[] - Postgres captured tables : flink.cleaned_migrations .

2024-06-14 09:23:59,603 INFO  io.debezium.jdbc.JdbcConnection
[] - Connection gracefully closed

2024-06-14 09:24:00,198 INFO
org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils
[] - Postgres captured tables : public.cleaned .

2024-06-14 09:24:00,199 INFO  io.debezium.jdbc.JdbcConnection
[] - Connection gracefully closed

2024-06-14 09:24:00,224 INFO
io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
[] - Creating initial offset context

2024-06-14 09:24:00,417 INFO
io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
[] - Read xlogStart at 'LSN{6/C9806378}' from transaction '73559679'

2024-06-14 09:24:00,712 INFO  io.debezium.jdbc.JdbcConnection
[] - Connection gracefully closed

2024-06-14 09:24:00,712 INFO
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
[] - Source reader 0 discovers table schema for stream split stream-split
success

2024-06-14 09:24:00,712 INFO
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
[] - Source reader 0 received the stream split :
StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98060F8},
txId=73559674, lastCommitTs=-9223372036854775808],
endOffset=Offset{lsn=LSN{/}, txId=null,
lastCommitTs=-9223372036853775810], isSuspended=false}.

2024-06-14 09:24:00,714 INFO
org.apache.flink.connector.base.source.reader.SourceReaderBase
[] - Adding split(s) to reader: [StreamSplit{splitId='stream-split',
offset=Offset{lsn=LSN{6/C98060F8}, txId=73559674,
lastCommitTs=-9223372036854775808],
endOffset=Offset{lsn=LSN{/}, txId=null,
lastCommitTs=-9223372036853775810], isSuspended=false}]

2024-06-14 09:24:00,714 INFO
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher
[] - Starting split fetcher 0

2024-06-14 09:24:00,716 INFO
org.apache.flink.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator
[] - The enumerator receives notice from subtask 0 for the stream split
assignment.

2024-06-14 09:24:00,721 INFO
org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext
[] - PostgresConnectorConfig is

2024-06-14 09:24:00,847 INFO
io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
[] - Creating initial offset context

2024-06-14 09:24:01,000 INFO
io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource
[] - Read xlogStart at 'LSN{6/C9806430}' from transaction '73559682'

2024-06-14 09:24:01,270 INFO  io.debezium.jdbc.JdbcConnection
[] - Connection gracefully closed

2024-06-14 09:24:01,271 INFO
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
[] - Source reader 0 discovers table schema for stream split stream-split
success

2024-06-14 09:24:01,271 INFO
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
[] - Source reader 0 received the stream split :
StreamSplit{splitId='stream-split', offset=Offset{lsn=LSN{6/C98061B0},
txId=73559676, lastCommitTs=-9223372036854775808],
endOffset=Offset{lsn=LSN{/}, txId=null,
lastCommitTs=-9223372036853775810], isSuspended=false}.

2024-06-14 09:24:01,272 INFO
org.apache.flink.connector.base.source.reader.SourceReaderBase
[] - Adding split(s) to reader: [StreamSplit{splitId='stream-split',
offset=Offset{lsn=LSN{6/C98061B0}, txId=73559676,
lastCommitTs=-9223372036854775808],
endOffset=Offset{lsn=LSN{/}, txId=null,
lastCommitTs=-9223372036853775810], 

Re: Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread Mate Czagany
Hi,

You can refer to the example Dockerfile in the Flink docs [1] and you can
also take a look at the example found in the Flink Kubernetes Operator repo
[2]. The second Dockerfile won't work because it is missing all Flink
libraries if I am not mistaken.

Regards,
Mate



Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread gwenael . lebarzic
Hello everyone.

I'm contacting you because I'm encountering some strange difficulties with pyflink
on Kubernetes using the flink operator.
So, first things first: which base image should I use for the Python image that I
will then deploy on my Kubernetes cluster?

Can I use the official Flink 1.18.1 image?
FROM flink:1.18.1

RUN mkdir -p /opt/myworkdir
WORKDIR /opt/myworkdir

RUN apt-get update && \
  apt-get install -y python3 python3-pip python3-dev && \
  apt-get clean && \
  rm -rf /var/lib/apt/lists/*

RUN alias python=python3 && ln -s /usr/bin/python3 /usr/bin/python

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src .
RUN chown -R root:root *
RUN chmod -R 755 *

Or can I use a base Python 3.11 image with Java 11 installed?
FROM dockerproxy.repos.tech.orange/python:3.11-slim

RUN mkdir -p /opt/myworkdir
WORKDIR /opt/myworkdir

RUN apt-get update && apt-get install -y wget tar

RUN wget -O /tmp/openjdk11.tar.gz \
    https://github.com/AdoptOpenJDK/openjdk11-upstream-binaries/releases/download/jdk-11.0.12%2B7/OpenJDK11U-jdk_x64_linux_11.0.12_7.tar.gz \
 && mkdir -p /opt/java/openjdk \
 && tar -xvf /tmp/openjdk11.tar.gz -C /opt/java/openjdk --strip-components=1 \
 && rm -rf /tmp/openjdk11.tar.gz

ENV JAVA_HOME=/opt/java/openjdk
ENV PATH="$JAVA_HOME/bin:$PATH"

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src .
COPY pyflink*.yaml .
RUN chown -R root:root *
RUN chmod -R 755 *

Thank you in advance for your answer!

Regards,
Gwenael Le Barzic


Orange Restricted




Ce message et ses pieces jointes peuvent contenir des informations 
confidentielles ou privilegiees et ne doivent donc
pas etre diffuses, exploites ou copies sans autorisation. Si vous avez recu ce 
message par erreur, veuillez le signaler
a l'expediteur et le detruire ainsi que les pieces jointes. Les messages 
electroniques etant susceptibles d'alteration,
Orange decline toute responsabilite si ce message a ete altere, deforme ou 
falsifie. Merci.

This message and its attachments may contain confidential or privileged 
information that may be protected by law;
they should not be distributed, used or copied without authorisation.
If you have received this email in error, please notify the sender and delete 
this message and its attachments.
As emails may be altered, Orange is not liable for messages that have been 
modified, changed or falsified.
Thank you.


Re: When inserting data with the HBase connector, how to update only one column when a column family has multiple columns

2024-06-14 Thread zboyu0104
How do I unsubscribe?
from Alibaba Mail for iPhone
--
From: Xie Xiandong
Date: 2024-06-06 16:07:05
To:
Subject: When inserting data with the HBase connector, how to update only one column when a column family has multiple columns

Hello everyone:


Flink version: 1.13.6
I am using the flink-connector-hbase connector to write data to HBase via Flink SQL. The HBase table is created as follows:


CREATE TABLE hbase_test_db_test_table_xxd (
rowkey STRING,
cf1 ROW,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
'connector' = 'hbase-2.2',
'table-name' = 'test_db:test_table_t1',
'zookeeper.quorum' = 'xxx:2181',
'zookeeper.znode.parent' = '/hbase',
'null-string-literal' = '',
'sink.parallelism' = '2'
);


The cf1 column family in HBase has three columns. Per the official website's examples, inserting data requires building a ROW value that includes all columns in the column family:
INSERT INTO hbase_test_db_test_table_xxd  select '002' as rowkey, row('xxd_2', 'boy', '10') as cf1;




If I only want to update one of those columns, how can I do that? Do I need to create a new HBase table in Flink?