Request for sample codes for Dockerizing Java application

2024-02-13 Thread Fidea Lidea
Hi Team,

I request you to provide some sample code for Dockerizing a Flink Java
application.
My application has only one job as of now.
Awaiting your response.

Thanks & Regards
Nida Shaikh


Re: Request for sample codes for Dockerizing Java application

2024-02-13 Thread Marco Villalobos
Hi Nida,

I request that you read 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/
in order to learn how to Dockerize your Flink job.
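
For reference, a minimal Dockerfile along the lines of that guide could look
roughly like the following (the image tag and the JAR name are placeholders,
not something taken from this thread):

    # Start from the official Flink image; pick the tag matching your Flink version.
    FROM flink:1.17.1

    # Copy the application fat JAR into the directory scanned by the
    # application-mode entrypoint of the image.
    RUN mkdir -p /opt/flink/usrlib
    COPY target/my-flink-job.jar /opt/flink/usrlib/my-flink-job.jar

The linked page also describes how to start such an image in Application Mode
(the standalone-job entrypoint with --job-classname) next to one or more
taskmanager containers.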

You're Welcome & Regards
Marco A. Villalobos




Re: Flink Kubernetes Operator - Deadlock when Cluster Cleanup Fails

2024-02-13 Thread Gyula Fóra
Hi Niklas!

The best way to report the issue would be to open a JIRA ticket with the
same detailed information.

Otherwise I think your observations are correct; this is indeed a frequent
problem that comes up, and it would be good to improve on it. In addition to
improving the logging, we could also increase the default timeout, and if we
could actually do something on the timeout, that would be even better.

Please open the JIRA ticket and if you have time to work on these
improvements I will assign it to you.

Cheers
Gyula

On Mon, Feb 12, 2024 at 11:59 PM Niklas Wilcke 
wrote:

> Hi Flink Kubernetes Operator Community,
>
> I hope this is the right way to report an issue with the Apache Flink
> Kubernetes Operator. We are experiencing problems with some streaming job
> clusters which end up in a terminated state because the operator is not
> behaving as expected. The problem is that the teardown of the Flink cluster
> by the operator doesn't succeed within the default timeout of 1 minute. After
> that the operator proceeds and tries to create a fresh cluster, which
> fails because parts of the cluster still exist. After that it tries to
> fully remove the cluster, including the HA metadata. After that it is stuck
> in an error loop stating that manual recovery is necessary, since the HA
> metadata is missing. At the very bottom of the mail you can find a condensed
> log attached, which hopefully gives a more detailed impression of the
> problem.
>
> The current workaround is to increase the 
> "kubernetes.operator.resource.cleanup.timeout"
> [0] to 10 minutes. Time will tell whether this workaround fixes the
> problem for us.
>
> The main problem I see is that the method
> AbstractFlinkService.waitForClusterShutdown(...) [1] isn't handling a
> timeout at all. Please correct me in case I missed a detail, but this is
> how we experience the problem. In case one of the entities (the service, the
> jobmanagers, or the taskmanagers) survives the cleanup timeout (of 1 minute),
> the operator seems to proceed as if the entities had been removed properly.
> To me this doesn't look good. From my point of view, at least an error
> should be logged.
>
> Additionally the current logging makes it difficult to analyse the problem
> and to be notified about the timeout. The following things could possibly
> be improved or implemented.
>
>1. Successful removal of the entities should be logged.
>2. Timing out isn't logged (An error should probably be logged here)
>3. For some reason the logging of the waited seconds is somehow
>incomplete (L944, further analysis needed)
>
> We use the following Flink and Operator versions:
>
> Flink Image: flink:1.17.1 (from Dockerhub)
> Operator Version: 1.6.1
>
> I hope this description is good enough to get in touch and discuss the
> matter. I'm open to providing additional information or, with some
> guidance, providing a patch to resolve the issue.
> Thanks for your work on the Operator. It is highly appreciated!
>
> Cheers,
> Niklas
>
>
> [0]
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/
> [1]
> https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L903
>
>
>
> #
> # The job in the cluster failed
> Event  | Info| JOBSTATUSCHANGED | Job status changed from RUNNING to FAILED
> Stopping failed Flink job...
> Status | Error   | FAILED  |
> {"type":"org.apache.flink.util.SerializedThrowable","message":"org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5, backoffTimeMS=3)","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.util.SerializedThrowable","message":"org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting down.","additionalMetadata":{}}]}
>  Deleting JobManager deployment while preserving HA metadata.
> Deleting cluster with Foreground propagation
> Waiting for cluster shutdown... (10s)
> Waiting for cluster shutdown... (30s)
> Waiting for cluster shutdown... (40s)
> Waiting for cluster shutdown... (45s)
> Waiting for cluster shutdown... (50s)
> Resubmitting Flink job...
> Cluster shutdown completed.
> Deploying application cluster
> Event  | Info| SUBMIT  | Starting deployment
> Submitting application in 'Application Mode
> Deploying application cluster
> ...
> Event  | Warning | CLUSTERDEPLOYMENTEXCEPTION | Could not create
> Kubernetes cluster 
>  Status | Error   | FAILED  |
> {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
> Could not create Kubernetes cluster
> \" not create Kubernetes cluster
> \"\".","additionalMetadata"

Re: [EXTERNAL]Re: Flink Kubernetes Operator - Deadlock when Cluster Cleanup Fails

2024-02-13 Thread Niklas Wilcke
Hi Gyula,

thanks for the advice. I requested a Jira account and will try to open a ticket
as soon as I get access.

Cheers,
Niklas

Continuous transfer of data from a partitioned table

2024-02-13 Thread К В
Hello!

We need to read data from an Oracle database table in order to pass it to
Kafka.
Data is inserted in the table periodically.
The table has multiple partitions.
Data should be processed in parallel; each task should consume one partition
in the database.

Can this be done using a Flink task?
How will Flink determine which records it has already read and which it
hasn't?
Can Flink work with composite table partitioning?
Could you please give an example of how to perform this kind of task?

Best Regards,
Konstantin.


Re: Continuous transfer of data from a partitioned table

2024-02-13 Thread Giannis Polyzos
You can check the Oracle CDC connector, which provides this functionality:
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html
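
For the DataStream API, a rough sketch along the lines of the examples in
that documentation could look like the code below (hostname, credentials,
schema, table and topic names are placeholders; please double-check the
builder options against the docs for your connector version):

    import com.ververica.cdc.connectors.oracle.OracleSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class OracleToKafkaJob {

        public static void main(String[] args) throws Exception {
            // Debezium-based CDC source: it snapshots the table and then follows
            // the redo log; the read position is stored in Flink checkpoints,
            // which is how "already read" records are tracked.
            SourceFunction<String> source = OracleSource.<String>builder()
                    .hostname("oracle-host")            // placeholder
                    .port(1521)
                    .database("ORCLCDB")                // placeholder database
                    .schemaList("INVENTORY")            // placeholder schema
                    .tableList("INVENTORY.PRODUCTS")    // placeholder (partitioned) table
                    .username("flinkuser")              // placeholder
                    .password("flinkpw")                // placeholder
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    .build();

            KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setBootstrapServers("kafka:9092")  // placeholder
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("oracle-products") // placeholder
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build())
                    .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // persist the CDC read position regularly

            env.addSource(source)
                    .name("oracle-cdc-source")
                    .sinkTo(sink);

            env.execute("Oracle CDC to Kafka");
        }
    }

As far as I know, the Debezium-based DataStream source runs with a
parallelism of 1, so downstream parallelism comes from keying/rebalancing the
stream rather than from reading the Oracle table partitions in parallel.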

Best,
G.



Re: Flink Kubernetes Operator - Deadlock when Cluster Cleanup Fails

2024-02-13 Thread Mate Czagany
Hi,

I have opened a JIRA [1] as I had the same error (AlreadyExists) last week,
and I could pinpoint the problem to the TaskManagers still being alive when
the new Deployment is created. In native mode we only check for the
JobManagers when we wait for the cluster to shut down, in contrast to
standalone mode.

That fix alone won't help mitigate this problem, but I intend to add some
logs to the 'waitForClusterShutdown' method with that PR.

Best regards,
Mate

[1] https://issues.apache.org/jira/browse/FLINK-34438



Re: [EXTERNAL]Re: Flink Kubernetes Operator - Deadlock when Cluster Cleanup Fails

2024-02-13 Thread Niklas Wilcke
Hi Mate,

thanks for creating the issue and pointing it out. I think the issue you
created is a bit more specific than my overall point; it rather focuses on the
TaskManagers, which is of course fine. From my point of view the following two
things are the low-hanging fruit:

1. Improving the logging
2. Changing the default cleanup timeout

Resolving these two points could already help a lot. The harder part could be
to correctly handle the case when a cleanup timeout occurs, which is currently
unhandled.
Please let me know if you need any assistance with your PR, and let me know
when you have opened it. Unfortunately I'm still waiting for my Jira account.

Cheers,
Niklas



Stream enrichment with ingest mode

2024-02-13 Thread Lars Skjærven
Dear all,

A recurring challenge we have with stream enrichment in Flink is finding a
robust mechanism to determine that all messages from the source(s) have been
consumed/processed before output is collected.

A simple example is two sources of catalogue metadata:
- source A delivers products,
- source B delivers product categories,

For a process function to enrich the categories with the number of products
in each category, we would use a KeyedCoProcessFunction (or a
RichCoFlatMap), keyed by category ID, and put both the category and the
products in state. We then count all products for each key and collect the
result.

Typically, however, we don't want to start counting before all products are
included in state (to avoid emitting incomplete aggregations downstream).
Therefore we use the event lag time (i.e. processing time - current
watermark) to indicate that the processor is in "ingest mode" (e.g. lag
time > 30 seconds). While in "ingest mode" we register a timer and return
without collecting; the timer then fires once the watermark has advanced
sufficiently.
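
For illustration, a stripped-down sketch of this pattern could look like the
following (Category and Product are hypothetical POJOs, Category having an
`id` field, and the 30-second threshold is just an example; this is not our
exact production code):

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    public class CategoryCountEnricher
            extends KeyedCoProcessFunction<String, Category, Product, Tuple2<String, Long>> {

        private static final long INGEST_LAG_MS = 30_000L; // "ingest mode" threshold

        private transient ValueState<Category> categoryState;
        private transient ListState<Product> productState;

        @Override
        public void open(Configuration parameters) {
            categoryState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("category", Category.class));
            productState = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("products", Product.class));
        }

        @Override
        public void processElement1(Category category, Context ctx,
                                    Collector<Tuple2<String, Long>> out) throws Exception {
            categoryState.update(category);
            emitOrDefer(ctx, out);
        }

        @Override
        public void processElement2(Product product, Context ctx,
                                    Collector<Tuple2<String, Long>> out) throws Exception {
            productState.add(product);
            emitOrDefer(ctx, out);
        }

        private void emitOrDefer(Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            long watermark = ctx.timerService().currentWatermark();
            long lag = ctx.timerService().currentProcessingTime() - watermark;
            if (lag > INGEST_LAG_MS) {
                // Still in "ingest mode": defer output until the watermark has advanced.
                // (A real implementation would also guard against watermark == Long.MIN_VALUE.)
                ctx.timerService().registerEventTimeTimer(watermark + INGEST_LAG_MS);
            } else {
                emitCount(out);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx,
                            Collector<Tuple2<String, Long>> out) throws Exception {
            emitCount(out);
        }

        private void emitCount(Collector<Tuple2<String, Long>> out) throws Exception {
            Category category = categoryState.value();
            if (category == null) {
                return; // category not seen yet, nothing to enrich
            }
            long count = 0;
            Iterable<Product> products = productState.get();
            if (products != null) {
                for (Product ignored : products) {
                    count++;
                }
            }
            out.collect(Tuple2.of(category.id, count));
        }
    }

The two keyed streams are connected via connect(...).keyBy(...) on the
category ID (key selectors again hypothetical) and processed with
.process(new CategoryCountEnricher()).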

This strategy of "ingest mode" (and timers) becomes more complicated when
you have multiple process functions (with the same need for an ingest mode)
downstream of the first processor. The reason seems to be that watermarks
are forwarded by the first process function even though no elements are
collected. Therefore, when elements finally arrive at the second process
function, the current watermark has already advanced, so the same
watermark-based strategy is less robust.

I'm curious how others in the community handle this "challenge" of initial
ingest. Any ideas are greatly appreciated.

Note: we use a custom watermark generator that emits watermarks derived
from event time and advances the watermark when the source has been idle for
a longer period (e.g. 30 seconds).
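
For reference, a rough sketch of that kind of generator (the 30-second idle
threshold and the emitted values are illustrative, not our exact
implementation) could look like:

    import org.apache.flink.api.common.eventtime.Watermark;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkOutput;

    // Emits watermarks derived from event time, but advances the watermark to
    // (roughly) the current processing time once the source has been idle a while.
    public class IdleAdvancingWatermarkGenerator<T> implements WatermarkGenerator<T> {

        private static final long IDLE_TIMEOUT_MS = 30_000L;

        private long maxEventTimestamp = Long.MIN_VALUE;
        private long lastEventSeenAt = System.currentTimeMillis();

        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            maxEventTimestamp = Math.max(maxEventTimestamp, eventTimestamp);
            lastEventSeenAt = System.currentTimeMillis();
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            if (maxEventTimestamp == Long.MIN_VALUE) {
                return; // no events seen yet, emit nothing
            }
            long now = System.currentTimeMillis();
            if (now - lastEventSeenAt > IDLE_TIMEOUT_MS) {
                // Source looks idle: push the watermark forward so downstream timers fire.
                output.emitWatermark(new Watermark(now));
            } else {
                output.emitWatermark(new Watermark(maxEventTimestamp - 1));
            }
        }
    }

It would be plugged in via WatermarkStrategy.forGenerator(ctx -> new
IdleAdvancingWatermarkGenerator<>()) together with a timestamp assigner.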

Thanks !

L