Re: Stopping the flink 1.18 program with savepoint seems to fail with timeout

2024-10-11 Thread Mate Czagany
Hi,

In the background this is a REST call to Flink. If creating the savepoint
takes too long, you might hit a timeout. You can increase it using the
configuration option client.timeout [1]. You can also use the --detached
option for the stop action, which will return as soon as it receives a
trigger ID from Flink. [2]
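
As a rough sketch of both options (the timeout value, REST port and trigger
ID below are only placeholders):

# Raise the client-side timeout in the Flink configuration (flink-conf.yaml)
client.timeout: 10 min

# Or return immediately with a trigger ID and let the savepoint finish in
# the background
./bin/flink stop --detached --savepointPath /tmp/flink-savepoints 0b3b584a298afa372491eff5e3d2160b

# The savepoint status can then be polled via the REST API using the
# returned trigger ID
curl http://<jobmanager>:8081/jobs/0b3b584a298afa372491eff5e3d2160b/savepoints/<trigger-id>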

Best,
Mate

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#client-timeout
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/cli/#creating-a-savepoint

Sachin Mittal wrote (on Fri, Oct 11, 2024, 6:19):

> Hello,
> I am running a Flink job which I stop with a savepoint:
>
> ./bin/flink stop --savepointPath /tmp/flink-savepoints
> 0b3b584a298afa372491eff5e3d2160b
> Suspending job "0b3b584a298afa372491eff5e3d2160b" with a CANONICAL
> savepoint.
>
>
> However, this is what I get in the CLI:
> 
> The program finished with the following exception:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
> "0b3b584a298afa372491eff5e3d2160b".
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:595)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1041)
> at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:578)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1110)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.util.concurrent.TimeoutException
> at
> java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1950)
> at
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2085)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$4(CliFrontend.java:591)
> ... 7 more
>
>
> What I also see is that a savepoint does actually get generated at the
> specified path, and my Flink job is also stopped after a while.
>
> Is there any setting that is causing the CLI program to time out, and is
> there a way to verify that the entire savepoint was generated at the
> specified path?
>
> Thanks
> Sachin
>
>


Re: Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread Mate Czagany
Oops, forgot the links, sorry about that

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
[2]
https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-python-example/Dockerfile
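
In case a starting point helps, here is a minimal sketch in the spirit of
[1] (assuming Flink 1.18.1 with a matching PyFlink version; the workdir and
src layout are taken from your own Dockerfile):

FROM flink:1.18.1

# Install Python 3 and pip, and expose python3 as "python"
RUN apt-get update && \
  apt-get install -y python3 python3-pip python3-dev && \
  apt-get clean && \
  rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# Install PyFlink matching the Flink version of the base image
RUN pip3 install --no-cache-dir apache-flink==1.18.1

# Add the job sources
WORKDIR /opt/myworkdir
COPY src .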

Mate Czagany wrote (on Fri, Jun 14, 2024, 18:30):

> Hi,
>
> You can refer to the example Dockerfile in the Flink docs [1] and you can
> also take a look at the example found in the Flink Kubernetes Operator repo
> [2]. Your second Dockerfile (the one based on the Python image) won't work
> because it is missing all the Flink libraries, if I am not mistaken.
>
> Regards,
> Mate
>
>  wrote (on Fri, Jun 14, 2024, 17:22):
>
>> Hello everyone.
>>
>>
>>
>> I am contacting you because I'm encountering some strange difficulties
>> with PyFlink on Kubernetes using the Flink operator.
>>
>> So, first things first, I was wondering which base image I should use for
>> my Python image that I will then deploy on my Kubernetes cluster?
>>
>>
>>
>> Can I use the official Flink image 1.18.1?
>>
>> FROM flink:1.18.1
>>
>>
>>
>> RUN mkdir -p /opt/myworkdir
>>
>> WORKDIR /opt/myworkdir
>>
>>
>>
>> RUN apt-get update && \
>>
>>   apt-get install -y python3 python3-pip python3-dev && \
>>
>>   apt-get clean && \
>>
>>   rm -rf /var/lib/apt/lists/*
>>
>>
>>
>> RUN alias python=python3 && ln -s /usr/bin/python3 /usr/bin/python
>>
>>
>>
>> COPY requirements.txt .
>>
>> RUN pip install --no-cache-dir -r requirements.txt
>>
>>
>>
>> COPY src .
>>
>> RUN chown -R root:root *
>>
>> RUN chmod -R 755 *
>>
>>
>>
>> Or can I use a base Python 3.11 image with an installation of Java 11?
>>
>> FROM dockerproxy.repos.tech.orange/python:3.11-slim
>>
>>
>>
>> RUN mkdir -p /opt/myworkdir
>>
>> WORKDIR /opt/myworkdir
>>
>>
>>
>> RUN apt-get update && apt-get install -y wget tar
>>
>>
>>
>> RUN wget -O /tmp/openjdk11.tar.gz
>> https://github.com/AdoptOpenJDK/openjdk11-upstream-binaries/releases/download/jdk-11.0.12%2B7/OpenJDK11U-jdk_x64_linux_11.0.12_7.tar.gz
>> \
>>
>> && mkdir -p /opt/java/openjdk \
>>
>> && tar -xvf /tmp/openjdk11.tar.gz -C /opt/java/openjdk
>> --strip-components=1 \
>>
>> && rm -rf /tmp/openjdk11.tar.gz
>>
>>
>>
>> ENV JAVA_HOME=/opt/java/openjdk
>>
>> ENV PATH="$JAVA_HOME/bin:$PATH"
>>
>>
>>
>> COPY requirements.txt .
>>
>> RUN pip install --no-cache-dir -r requirements.txt
>>
>>
>>
>> COPY src .
>>
>> COPY pyflink*.yaml .
>>
>> RUN chown -R root:root *
>>
>> RUN chmod -R 755 *
>>
>>
>>
>> Thank you in advance for your answer!
>>
>>
>>
>> Regards,
>>
>> *Gwenael Le Barzic *
>>
>>
>>
>>


Re: Which base image to use for pyflink on k8s with flink operator ?

2024-06-14 Thread Mate Czagany
Hi,

You can refer to the example Dockerfile in the Flink docs [1] and you can
also take a look at the example found in the Flink Kubernetes Operator repo
[2]. Your second Dockerfile (the one based on the Python image) won't work
because it is missing all the Flink libraries, if I am not mistaken.

Regards,
Mate

 wrote (on Fri, Jun 14, 2024, 17:22):

> Hello everyone.
>
>
>
> I am contacting you because I'm encountering some strange difficulties
> with PyFlink on Kubernetes using the Flink operator.
>
> So, first things first, I was wondering which base image I should use for
> my Python image that I will then deploy on my Kubernetes cluster?
>
>
>
> Can I use the official Flink image 1.18.1?
>
> FROM flink:1.18.1
>
>
>
> RUN mkdir -p /opt/myworkdir
>
> WORKDIR /opt/myworkdir
>
>
>
> RUN apt-get update && \
>
>   apt-get install -y python3 python3-pip python3-dev && \
>
>   apt-get clean && \
>
>   rm -rf /var/lib/apt/lists/*
>
>
>
> RUN alias python=python3 && ln -s /usr/bin/python3 /usr/bin/python
>
>
>
> COPY requirements.txt .
>
> RUN pip install --no-cache-dir -r requirements.txt
>
>
>
> COPY src .
>
> RUN chown -R root:root *
>
> RUN chmod -R 755 *
>
>
>
> Or can I use a base Python 3.11 image with an installation of Java 11?
>
> FROM dockerproxy.repos.tech.orange/python:3.11-slim
>
>
>
> RUN mkdir -p /opt/myworkdir
>
> WORKDIR /opt/myworkdir
>
>
>
> RUN apt-get update && apt-get install -y wget tar
>
>
>
> RUN wget -O /tmp/openjdk11.tar.gz
> https://github.com/AdoptOpenJDK/openjdk11-upstream-binaries/releases/download/jdk-11.0.12%2B7/OpenJDK11U-jdk_x64_linux_11.0.12_7.tar.gz
> \
>
> && mkdir -p /opt/java/openjdk \
>
> && tar -xvf /tmp/openjdk11.tar.gz -C /opt/java/openjdk
> --strip-components=1 \
>
> && rm -rf /tmp/openjdk11.tar.gz
>
>
>
> ENV JAVA_HOME=/opt/java/openjdk
>
> ENV PATH="$JAVA_HOME/bin:$PATH"
>
>
>
> COPY requirements.txt .
>
> RUN pip install --no-cache-dir -r requirements.txt
>
>
>
> COPY src .
>
> COPY pyflink*.yaml .
>
> RUN chown -R root:root *
>
> RUN chmod -R 755 *
>
>
>
> Thank you in advance for your answer!
>
>
>
> Regards,
>
> *Gwenael Le Barzic *
>
>
>
>
>


Re: Flink Kubernetes Operator - How can I use a jar that is hosted on a private maven repo for a FlinkSessionJob?

2024-05-12 Thread Mate Czagany
Hi Nathan,

Job submissions for FlinkSessionJob resources will always be done by first
uploading the JAR file itself from the Operator pod using the JobManager's
REST API, then starting a new job using the uploaded JAR. This means that
downloading the JAR file with an initContainer to the JobManager will not
help in your case.

You could look into the Operator config option
'kubernetes.operator.user.artifacts.http.header' to set the HTTP headers
used to download the artifacts. Please check FLINK-27483 [1] for more
information.

[1] https://issues.apache.org/jira/browse/FLINK-27483
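
As a sketch, it is a single entry in the Operator configuration (e.g. in its
default Flink configuration); the header name and token below are only
placeholders for whatever authentication scheme your repository expects, and
the exact key:value syntax is described in [1]:

kubernetes.operator.user.artifacts.http.header: Authorization:Bearer <your-token>

Keep in mind that the token would then live in the operator configuration,
so depending on your security requirements the filesystem-based approach you
describe (e.g. a GCS bucket plus the GCS filesystem plugin) may still be the
cleaner option.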

Regards,
Mate Czagany

Nathan T. A. Lewis wrote (on Thu, May 9, 2024, 19:00):

> Hello,
>
> I am trying to run a Flink Session Job with a jar that is hosted on a
> maven repository in Google's Artifact Registry.
>
> The first thing I tried was to just specify the `jarURI` directly:
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSessionJob
> metadata:
>   name: myJobName
> spec:
>   deploymentName: flink-session
>   job:
> jarURI: "https://mylocation-maven.pkg.dev/myGCPproject/myreponame/path/to/the.jar"
> entryClass: myentryclass
> parallelism: 1
> upgradeMode: savepoint
>
> But since it is a private repository, it unsurprisingly resulted in:
>
> java.io.IOException: Server returned HTTP response code: 401 for URL:
> https://mylocation-maven.pkg.dev/myGCPproject/myreponame/path/to/the.jar
>
> I didn't see anywhere in the FlinkSessionJob definition to put a bearer
> token and doubt it would be a good idea security-wise to store one there
> anyway, so I instead looked into using `initContainers` on the
> FlinkDeployment like in this example:
> https://github.com/apache/flink-kubernetes-operator/blob/main/examples/pod-template.yaml
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: flink-session
> spec:
>   flinkVersion: v1_18
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "2"
> state.checkpoints.dir: mycheckpointsdir
> state.savepoints.dir: mysavepointsdir
> state.backend: rocksdb
> state.backend.rocksdb.timer-service.factory: ROCKSDB
> state.backend.incremental: "true"
> execution.checkpointing.interval: "1m"
>   serviceAccount: flink
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 0.5
>   taskManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   podTemplate:
>   spec:
> initContainers:
>   - name: gcloud
> image: google/cloud-sdk:latest
> volumeMounts:
>   - mountPath: /opt/flink/downloads
> name: downloads
> command: ["sh", "-c", "gcloud artifacts files download
> --project=myGCPproject --repository=myreponame --location=mylocation
> --destination=/opt/flink/downloads path/to/the.jar"]
> containers:
>   - name: flink-main-container
> volumeMounts:
>   - mountPath: /opt/flink/downloads
> name: downloads
> volumes:
>   - name: downloads
> emptyDir: { }
>
> This worked well for getting the jar onto the jobManager pod, but it looks
> like the FlinkSessionJob actually looks for the jar on the pod of the Flink
> Kubernetes Operator itself. So in the end, the job still isn't being run.
>
> As a workaround for now, I'm planning to move my jar from Maven to a
> Google Cloud Storage bucket and then add the gcs filesystem plugin to the
> operator image. What I'd love to know is if I've overlooked some already
> implemented way to connect to a private maven repository for a
> FlinkSessionJob. I suppose in a worst case, we could write a filesystem
> plugin that handles the `artifactrepository://` scheme and uses Google's
> java libraries to handle authentication and download of the artifact.
> Again, I'm kind of hoping something already exists though, rather than
> having to build something new.
>
>
> Best regards,
> Nathan T.A. Lewis
>


Re: Flink Kubernetes Operator - Deadlock when Cluster Cleanup Fails

2024-02-13 Thread Mate Czagany
Hi,

I have opened a JIRA ticket [1] as I had the same error (AlreadyExists) last
week, and I could pinpoint the problem to the TaskManagers still being alive
when the new Deployment is created. In native mode we only check for the
JobManagers when we wait for the cluster to shut down, in contrast to
standalone mode.

That fix alone won't help mitigate this problem, but I intend to add some
logging to the 'waitForClusterShutdown' method with that PR.

Best regards,
Mate

[1] https://issues.apache.org/jira/browse/FLINK-34438
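
For reference, the workaround mentioned further below is a single operator
configuration entry; a sketch, assuming it is set through the operator's
default configuration (the 10 minute value is simply what Niklas chose, not
a recommendation):

kubernetes.operator.resource.cleanup.timeout: 10 min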


Gyula Fóra wrote (on Tue, Feb 13, 2024, 9:14):

> Hi Niklas!
>
> The best way to report the issue would be to open a JIRA ticket with the
> same detailed information.
>
> Otherwise I think your observations are correct and this is indeed a
> frequent problem that comes up, it would be good to improve on it. In
> addition to improving logging we could also increase the default timeout
> and if we could actually do something on the timeout that would be even
> better.
>
> Please open the JIRA ticket and if you have time to work on these
> improvements I will assign it to you.
>
> Cheers
> Gyula
>
> On Mon, Feb 12, 2024 at 11:59 PM Niklas Wilcke 
> wrote:
>
>> Hi Flink Kubernetes Operator Community,
>>
>> I hope this is the right way to report an issue with the Apache Flink
>> Kubernetes Operator. We are experiencing problems with some streaming job
>> clusters which end up in a terminated state, because of the operator not
>> behaving as expected. The problem is that the teardown of the Flink cluster
>> by the operator doesn't succeed in the default timeout of 1 minute. After
>> that the operator proceeds and tries to create a fresh cluster, which
>> fails, because parts of the cluster still exist. After that it tries to
>> fully remove the cluster including the HA metadata. After that it is stuck
>> in an error loop that manual recovery is necessary, since the HA metadata
>> is missing. At the very bottom of the mail you can find an condensed log
>> attached, which hopefully gives a more detailed impression about the
>> problem.
>>
>> The current workaround is to increase the 
>> "kubernetes.operator.resource.cleanup.timeout"
>> [0] to 10 minutes. Time will tell whether this workaround fixes the
>> problem for us.
>>
>> The main problem I see is that the
>> method AbstractFlinkService.waitForClusterShutdown(...) [1] isn't handling
>> a timeout at all. Please correct me in case I missed a detail, but this is
>> how we experience the problem. In case one of the services, the JobManagers
>> or the TaskManagers survives the cleanup timeout (of 1 minute), the
>> operator seems to proceed as if the entities had been removed properly. To
>> me this doesn't look good. From my point of view, at least an error should
>> be logged.
>>
>> Additionally the current logging makes it difficult to analyse the
>> problem and to be notified about the timeout. The following things could
>> possibly be improved or implemented.
>>
>>1. Successful removal of the entities should be logged.
>>2. Timing out isn't logged (An error should probably be logged here)
>>3. For some reason the logging of the waited seconds is somehow
>>incomplete (L944, further analysis needed)
>>
>> We use the following Flink and Operator versions:
>>
>> Flink Image: flink:1.17.1 (from Dockerhub)
>> Operator Version: 1.6.1
>>
>> I hope this description is well enough to get into touch and discuss the
>> matter. I'm open to provide additional information or with some guidance,
>> provide a patch to resolve the issue.
>> Thanks for your work on the Operator. It is highly appreciated!
>>
>> Cheers,
>> Niklas
>>
>>
>> [0]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/
>> [1]
>> https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L903
>>
>>
>>
>> #
>> # The job in the cluster failed
>> Event  | Info| JOBSTATUSCHANGED | Job status changed from RUNNING to
>> FAILED
>> Stopping failed Flink job...
>> Status | Error   | FAILED  |
>> {"type":"org.apache.flink.util.SerializedThrowable","message":"org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5, backoffTimeMS=3)","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.util.SerializedThrowable","message":"org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting down.","additionalMetadata":{}}]}
>>  Deleting JobManager deployment while preserving HA metadata.
>> Deleting cluster with Foreground propagation
>> Waiting for cluster shutdown... (10s)
>> Waiting for cluster shutdown... (30s)
>> Waiting for cluster shutdown... (40s)
>> Waiting for cluster shutdo

Re: changing the 'flink-main-container' name

2023-10-20 Thread Mate Czagany
Hi,

By naming the container flink-main-container, Flink knows which container
spec it should use for the main Flink container.
If you change the name, Flink won't know which container spec to apply to
the main container; it will most likely treat your container as a sidecar,
and a flink-main-container with the default spec will still be created.

There is unfortunately no way to rename this container as of now.
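
For illustration, a minimal sketch of how the name is used (as it would
appear e.g. under spec.podTemplate of a FlinkDeployment; the env var and
sidecar names below are made up):

podTemplate:
  spec:
    containers:
      # Merged into the JobManager/TaskManager main container -- keep this name
      - name: flink-main-container
        env:
          - name: MY_CUSTOM_ENV
            value: "example"
      # Any other name is treated as a sidecar next to the main container
      - name: my-sidecar
        image: busybox:1.36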

Regards,
Mate

Nuno wrote (on Fri, Oct 20, 2023, 14:39):

> Hello,
>
> We just adopted the flink operator. According to this link
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/
> it prescribes a pod template containing among other things the following:
>
> containers:
>   # Do not change the main container name
>   - name: flink-main-container
>
> I would actually like to change this name for each of my different flink 
> microservices, because having a bunch of microservices with the same 
> container name messes terribly with the container-based metrics and 
> dashboards of our monitoring system.
>
> I'd like to understand, if possible, why this comment is there and how
> seriously I should take it. What will break, concretely, if I change it?
>
> I tried going through the operator code itself but couldn't find anything 
> concrete.
>
> Any help to understand the underlying constraints will be very welcome.
>
> Thank you very much!
>
>


Re: Bloom Filter for Rocksdb

2023-10-20 Thread Mate Czagany
Hi,

There have been no reports about setting this configuration causing any
issues. I would guess it's off by default because it can increase the
memory usage by an unpredictable amount.

I would say feel free to enable it; from what you've said I also think that
this would improve the performance of your jobs. But make sure to configure
your jobs so that they will be able to accommodate the potential growth in
memory footprint. Also, please read the following resources to learn more
about RocksDB's bloom filter:
https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter
https://rocksdb.org/blog/2014/09/12/new-bloom-filter-format.html
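
If you decide to try it, it is a single configuration entry using the key
you already found (a sketch for flink-conf.yaml; any further RocksDB tuning
depends on your setup):

state.backend: rocksdb
state.backend.rocksdb.use-bloom-filter: true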

Regards,
Mate


Kenan Kılıçtepe wrote (on Fri, Oct 20, 2023, 15:50):

> Can someone tell me the exact performance effect of enabling the bloom
> filter? Might enabling it cause some unpredictable performance problems?
>
> I read what it is and how it works, and it makes sense, but I also asked
> myself why the default value of state.backend.rocksdb.use-bloom-filter is
> false.
>
> We have a 5-server Flink cluster processing real-time IoT data coming
> from 5 million devices, and for a lot of jobs we keep different states for
> each device.
>
> Sometimes we have performance issues, and when I check the flamegraph on
> the test server I always see that rocksdb.get() is the blocker. I just
> want to increase RocksDB performance.
>
> Thanks
>
>


Re: Flink HDFS with Flink Kubernetes Operator

2023-10-19 Thread Mate Czagany
Hello,

Please look into using 'kubernetes.decorator.hadoop-conf-mount.enabled' [1],
which was added for use cases where the user wishes to skip adding these
Hadoop mount decorators. It is true by default, but if you set it to false,
Flink won't add this mount.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/config/#kubernetes-decorator-hadoop-conf-mount-enabled
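
As a sketch, this goes into the flinkConfiguration section of your
FlinkDeployment (values in that map are plain strings):

flinkConfiguration:
  taskmanager.numberOfTaskSlots: "1"
  kubernetes.decorator.hadoop-conf-mount.enabled: "false"

With that in place, the native-mode TaskManagers should no longer try to
mount the hadoop-config-volume ConfigMap, and the HADOOP_* environment
variables from your pod template remain in effect.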

Regards,
Mate

Raihan Sunny via user wrote (on Thu, Oct 19, 2023, 11:48):

> Hi,
>
> I've been using HDFS with Flink for checkpoint and savepoint storage which
> works perfectly fine. Now I have another use case where I want to read and
> write to HDFS from the application code as well. For this, I'm using the
> "pyarrow" library which is already installed with PyFlink as a dependency.
>
> According to the pyarrow documentation [1], HADOOP_HOME and CLASSPATH
> environment variables are mandatory. As per the Flink documentation [2],
> HADOOP_CLASSPATH must be set.
>
> I'm using Flink Kubernetes Operator to deploy my application and the issue
> arises only when I'm using the native mode. When I deploy the application
> with all the variables above, the JobManager starts up but the TaskManager
> fails to start with the following error from Kubernetes:
>
> MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap
> "hadoop-config-flink-job" not found
>
> It also seems to set a HADOOP_CONF_DIR environment variable on the
> TaskManager with the value "/opt/hadoop/conf" which doesn't exist as my
> hadoop installation is elsewhere. If I run the job on standalone mode,
> everything seems to work fine as the TaskManager doesn't look for a
> "hadoop-config-volume" to mount. Here's the YAML file for reference:
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: flink-job
>   namespace: flink
> spec:
>   image: 
>   imagePullPolicy: Always
>   flinkVersion: v1_17
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "1"
> state.savepoints.dir:
> hdfs://hdfs-namenode-0.hdfs-namenodes.hdfs:8020/flink-data/savepoints
> state.checkpoints.dir:
> hdfs://hdfs-namenode-0.hdfs-namenodes.hdfs:8020/flink-data/checkpoints
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir:
> hdfs://hdfs-namenode-0.hdfs-namenodes.hdfs:8020/flink-data/ha
> execution.checkpointing.interval: "3s"
> execution.checkpointing.unaligned: "true"
> execution.checkpointing.timeout: "30m"
>   serviceAccount: flink
>   podTemplate:
> spec:
>   imagePullSecrets:
> - name: 
>   containers:
> - name: flink-main-container
>   env:
> - name: HADOOP_HOME
>   value: /hadoop-3.2.1
> - name: CLASSPATH
>   value: 
> - name: HADOOP_CLASSPATH
>   value: 
>   jobManager:
> resource:
>   memory: "1024m"
>   cpu: 0.5
>   taskManager:
> resource:
>   memory: "1024m"
>   cpu: 0.5
>   job:
> jarURI: local:///opt/flink/opt/flink-python_2.12-1.17.0.jar
> entryClass: "org.apache.flink.client.python.PythonDriver"
> args: ["python", "-pym", "flink_job"]
> parallelism: 1
> upgradeMode: savepoint
> state: running
> savepointTriggerNonce: 0
>
> Please note that I've also tried installing Hadoop to "/opt/" and
> symlinking the "conf" directory as expected by HADOOP_CONF_DIR. This also
> didn't work.
>
> As mentioned before, if I add "mode: standalone", this job runs without
> any problem. But since the autoscaling feature only works on the native
> mode, I need to get it working there. Any help is appreciated.
>
> Versions:
> Flink - 1.17.1
> PyFlink - 1.17.1
> Flink Kubernetes Operator - 1.5.0
>
>
> - [1]
> https://arrow.apache.org/docs/python/filesystems.html#hadoop-distributed-file-system-hdfs
> - [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/configuration/advanced/#hadoop-dependencies
>
>
> Thanks,
> Sunny
>

Re: Apache Atlas - Flink Integration

2023-08-01 Thread Mate Czagany
Hi,

Unfortunately the Atlas hook you've read about is only available in the
Cloudera Flink solution and has not been made open-source.

In the future, FLIP-314 [1] might offer a simple solution to implement the
Atlas integration.

Best Regards,
Mate

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener


arjun s wrote (on Tue, Aug 1, 2023, 16:23):

> I am looking to integrate Apache Atlas with Apache Flink to capture job
> lineage. I found some references around it from Cloudera (CDP), where they
> use an Atlas-Flink hook, but I am not able to find any documentation or
> implementation.
>
> I have gone through the JIRA link mentioned below, but the feature is
> still open.
>
> https://issues.apache.org/jira/browse/FLINK-6757
>
> I would like to know whether this Apache Atlas integration with Apache
> Flink has been released or not. If yes, could anyone share references for
> the integration with me?
>
> Thanks and Regards,
> Arjun S
>


Re: Questions on S3 File Sink Behavior

2023-03-29 Thread Mate Czagany
Hi,

1. In the case of the S3 FileSystem, Flink uses the multipart upload process
[1] for better performance. It might not be obvious at first by looking at
the docs, but it's noted at the bottom of the FileSystem page [2].
For more information you can also check FLINK-9751 and FLINK-9752.

2. In the case of the local FileSystem, it always starts with a dot
according to LocalRecoverableWriter [3], but make sure to check the
implementation of RecoverableWriter for the FileSystem you want to use.

Regards,
Mate

[1] https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/#s3-specific
[3]
https://github.com/apache/flink/blob/1e0b58aa8d962469fa9dd7b470037aeaece43500/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java#L129

Chirag Dewan via user wrote (on Wed, Mar 29, 2023, 9:07):

> Hi,
>
>
>
> We are trying to use Flink's File Sink to distribute files to AWS S3
> storage. We are using the Flink-provided Hadoop s3a connector as a plugin.
>
> We have some observations that we needed to clarify:
>
> 1. When using the file sink for local filesystem distribution, we can see
> that the sink creates 3 sets of files - in-progress, pending (on rolling)
> and finished (upon checkpointing). But with the S3 file sink we can see
> only the finished files in the S3 buckets.
>
> So we wanted to understand: where does the sink create the in-progress and
> pending files for the S3 file sink?
>
>
> 2. We can also see with local file system sink, the in-progress and
> pending file names follow the nomenclature:
> .--.inprogress.uid-
>
> There is a dot at the beginning of the filename; maybe Flink is trying to
> create these files as hidden files. But this is not mentioned in the Flink
> documentation.
>
> So can we assume that the in-progress and pending filenames will always
> start with a dot?
>
> thanks a lot in advance
>
>
>


Re: Table API function and expression vs SQL

2023-03-25 Thread Mate Czagany
Hi,

Please also keep in mind that restoring existing Table API jobs from
savepoints when upgrading to a newer minor version of Flink (e.g. 1.16 ->
1.17) is not supported, as the topology might change between these versions
due to optimizer changes.

See here for more information:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution

Regards,
Mate

Hang Ruan wrote (on Sat, Mar 25, 2023, 13:38):

> Hi,
>
> I think the SQL job is better. Flink SQL jobs can be easily shared with
> others for debugging, and SQL is more suitable for unified stream and
> batch processing. For the small part of jobs which cannot be expressed
> through SQL, we choose a job written with the DataStream API.
>
> Best,
> Hang
>
> ravi_suryavanshi.yahoo.com via user wrote (on Fri, Mar 24, 2023, 17:25):
>
>> Hello Team,
>> I need your advice on which method is recommended, considering that I
>> don't want to change my query code when Flink is updated/upgraded to a
>> higher version.
>>
>> Here I am seeking advice on whether to write the SQL using Java code
>> (Table API functions and expressions) or using pure SQL.
>>
>> I am assuming that SQL will not be impacted if upgraded to a higher
>> version.
>>
>> Thanks and Regards,
>> Ravi
>>
>