Slow restart from savepoint with large broadcast state when increasing parallelism

2022-12-14 Thread Ken Krugler
Hi all,

I have a job with a large amount of broadcast state (62MB).

I took a savepoint when my workflow was running with parallelism 300.

I then restarted the workflow with parallelism 400.

The first 297 sub-tasks restored their broadcast state fairly quickly, but 
after that it slowed to a crawl (maybe 2 sub-tasks finished per minute).

After 10 minutes we killed the job, so I don’t know if it would have ultimately 
succeeded.

Is this expected? Seems like it could lead to a bad situation, where it would 
take an hour to restart the workflow.
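
For reference, broadcast state is registered with a MapStateDescriptor and (as far
as I understand) every parallel subtask keeps and restores its own full copy, so at
parallelism 400 that is roughly 400 x 62MB read back from the savepoint. A minimal
sketch in Java (the descriptor name and String types are made up; ruleStream and
events are assumed DataStreams):

MapStateDescriptor<String, String> ruleDescriptor =
        new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

BroadcastStream<String> rules = ruleStream.broadcast(ruleDescriptor);

events.connect(rules)
        .process(new BroadcastProcessFunction<String, String, String>() {
            @Override
            public void processElement(String event, ReadOnlyContext ctx,
                    Collector<String> out) throws Exception {
                // read-only lookups against ctx.getBroadcastState(ruleDescriptor) go here
                out.collect(event);
            }

            @Override
            public void processBroadcastElement(String rule, Context ctx,
                    Collector<String> out) throws Exception {
                // every parallel instance applies the same update and holds the full state
                ctx.getBroadcastState(ruleDescriptor).put(rule, rule);
            }
        });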

Thanks,

— Ken

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch



[ANNOUNCE] Apache Flink Kubernetes Operator 1.3.0 released

2022-12-14 Thread Őrhidi Mátyás
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.3.0.

Release highlights:

   - Upgrade to Fabric8 6.x.x and JOSDK 4.x.x
   - Restart unhealthy Flink clusters
   - Contribute the Flink Kubernetes Operator to OperatorHub
   - Publish flink-kubernetes-operator-api module separately

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2022/12/14/release-kubernetes-operator-1.3.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator applications can be
found at: https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352322

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,

Matyas Orhidi


Re: AsyncDataStream: Retries keep executing after timeout

2022-12-14 Thread Lincoln Lee
Hi,
   Is this case running like an IT case locally, or as a streaming job running
on a cluster? If it's the former, one thing I can think of is that local testing
with a bounded data source (only a few test records) ends its input very quickly
and then triggers the endOfInput logic of AsyncWaitOperator, which finishes all
in-flight delayed retry items immediately (asyncInvoke is called one last time
before the operator exits, and that result is taken as final, regardless of
whether the timeout has fired). This can mean one more attempt than when the job
does not end during normal running.
   For a long-running job, the retry also starts from scratch when the job
recovers from a restart (regardless of how many attempts were made before),
which can likewise result in more attempts and a longer retry time for those
elements.
   If you can provide more information about the test, maybe we can clarify
further what the problem is.
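   As a quick local check, here is a minimal sketch in Java (using Flink's legacy
SourceFunction; the class name and timings are made up) of a test source that
stays open long enough for the retry timers and the timeout to fire before
endOfInput:

public class SlowFinishingSource implements SourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        ctx.collect("foo");  // emit the test record(s)
        // Keep the source open so endOfInput does not flush in-flight retries early.
        long deadline = System.currentTimeMillis() + 10_000;
        while (running && System.currentTimeMillis() < deadline) {
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}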

Best,
Lincoln Lee


Yoni Gibbs  于2022年12月13日周二 23:46写道:

> Hi,
>
> I've got a Kinesis consumer which reacts to each record by doing some
> async work using an implementation of RichAsyncFunction. I'm adding a
> retry strategy. After x failed attempts I want this to time out and give up
> by returning no data (i.e. not be treated as a failure).
>
> Here is a cut down version of my code, which works as expected (in Kotlin,
> I hope that's OK - can supply Java translation if required):
>
> val targetStream = AsyncDataStream
>     .unorderedWaitWithRetry(
>         inputStream,
>         object : RichAsyncFunction<String, String>() {
>             override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
>                 println("Received input: $input")
>                 resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
>             }
>
>             override fun timeout(input: String, resultFuture: ResultFuture<String>) {
>                 println("Timeout")
>                 resultFuture.complete(listOf())
>             }
>         },
>         4,
>         TimeUnit.SECONDS,
>         100,
>         AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<String>(5, 2_000)
>             .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
>             .build()
>     )
>
> This will time out after 4 seconds, and the retry strategy is set to retry
> every two seconds. If I run that I get the output I expect, namely:
>
> Received input: foo
> Received input: foo
> Timeout
>
> Importantly, I see that asyncInvoke is only called twice, because by the
> time the third invocation is due to occur, the timeout has already kicked
> in and marked this record as handled.
>
> However the above is clearly unrealistic as it calls
> resultFuture.completeExceptionally immediately rather than asynchronously
> after some work as taken place. So now I replace the asyncInvoke 
> implementation
> above with the following:
>
> override fun asyncInvoke(input: String, resultFuture: ResultFuture<String>) {
>     println("Received input: $input")
>     CompletableFuture.supplyAsync {
>         Thread.sleep(500)
>         resultFuture.completeExceptionally(Exception("Error from inside CompletableFuture"))
>     }
> }
>
> Now I get output which I don't expect, which shows that after the timeout,
> asyncInvoke continues to be called a few more times.
>
> That seems wrong to me: shouldn't it stop being called because timeout has
> already been invoked and it called resultFuture.complete()?
>
> I might well just be misunderstanding something here.
>
> Thanks in advance,
>
> Yoni.
>


Re: [SURVEY] Drop Share and Key_Shared subscription support in Pulsar connector

2022-12-14 Thread 盛宇帆
Hi Zili,

Thanks for picking up this discussion. Here is my answer:

I agree with your first point. If the problems are related to Pulsar
itself, they should also be reported to the Pulsar repo. But these flaky
tests only occur with the Shared or Key_Shared subscription combined with
transactions, and I can't reproduce them on my local machine, so I don't
know how to file useful issues for them.

The performance issue is due to the internal implementation of the
Pulsar transaction: Pulsar has to log the ack status in a separate
topic, which makes performance extremely slow at high throughput.

The only reason I can recall for supporting the Shared subscription in
the first place is that it allows more consumers on the same partition
to increase processing speed. But Flink can get the same effect by
increasing the parallelism of the downstream operators; with an
Exclusive subscription, consuming messages from Pulsar isn't the
bottleneck. This means we don't need the Shared subscription for
performance.

The Key_Shared subscription is only used to distribute messages by key
hash across different consumers in Pulsar, which can be achieved with
Flink's keyBy() (see the sketch below). And if we want to consume only a
subset of the key hash range, we have to use an Exclusive subscription
with key ranges anyway. This makes Key_Shared support meaningless.
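
A minimal sketch of that idea in Java (the URLs, topic, subscription name,
key selector, and MyKeyedProcessFunction are made up; assuming the
PulsarSource builder from flink-connector-pulsar):

PulsarSource<String> source = PulsarSource.builder()
        .setServiceUrl("pulsar://localhost:6650")
        .setAdminUrl("http://localhost:8080")
        .setTopics("my-topic")
        .setSubscriptionName("flink-sub")
        .setSubscriptionType(SubscriptionType.Exclusive)  // no Shared/Key_Shared needed
        .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "pulsar-source")
        .keyBy(msg -> msg.hashCode() % 128)       // Flink-side key distribution replaces Key_Shared
        .process(new MyKeyedProcessFunction());   // hypothetical keyed processing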

So I prefer to remove them in order to provide better Pulsar support in Flink.

Best,
Yufan

On Wed, Dec 14, 2022 at 8:49 PM Zili Chen  wrote:
>
> Hi Yufan,
>
> Thanks for starting this discussion. My two cents:
>
> 1. It would help the upstream fix the transaction issues if the instability
> and performance issues were also submitted to the Pulsar repo.
> 2. Could you elaborate on whether and (if so) why we should drop the Shared
> and Key_Shared subscription support in Flink?
>
> Best,
> tison.
>
> On 2022/12/14 10:00:56 盛宇帆 wrote:
> > Hi, I'm the maintainer of flink-connector-pulsar. I would like to
> > start a survey on a functional change proposal for
> > flink-connector-pulsar.
> >
> > I have created a ticket on JIRA and pasted its description here:
> >
> > A lot of the Pulsar connector's unstable tests are related to the Shared
> > and Key_Shared subscriptions, because these two subscription types are
> > designed to consume records in an unordered way and allow multiple
> > consumers on the same topic partition. But this feature leads to some
> > drawbacks in the connector.
> >
> > 1. Performance
> >
> > Flink is a true stream processor with strong correctness guarantees.
> > Supporting multiple consumers requires even stronger correctness, which
> > depends on Pulsar transactions. But the internal implementation of
> > Pulsar transactions on the source side records messages one by one and
> > stores all pending ack statuses on the client side, which is slow and
> > memory-inefficient.
> >
> > This means we can only use Shared and Key_Shared with Flink at low
> > throughput, which defeats our intention in supporting these two
> > subscriptions, namely that adding multiple consumers to the same
> > partition can increase consumption speed.
> >
> > 2. Stability
> >
> > Pulsar transactions acknowledge messages one by one in an internal
> > Pulsar topic, but this is not stable enough to work reliably. A lot of
> > pending issues in the Flink JIRA are related to Pulsar transactions,
> > and we don't have any workaround.
> >
> > 3. Complexity
> >
> > Supporting the Shared and Key_Shared subscriptions makes the connector's
> > code more complex than we would like: every part of the code has to
> > handle both an ordered and an unordered path, which is hard for
> > maintainers to understand.
> >
> > 4. Necessity
> >
> > The current implementation of Shared and Key_Shared is unusable in a
> > production environment, and for users this functionality is not
> > necessary: there is no bottleneck in consuming data from Pulsar, the
> > bottleneck is in processing the data, which we can address by
> > increasing the parallelism of the processing operators.
> >


Re: [SURVEY] Drop Share and Key_Shared subscription support in Pulsar connector

2022-12-14 Thread Zili Chen
Hi Yufan,

Thanks for starting this discussion. My two cents:

1. It would help the upstream fix the transaction issues if the instability
and performance issues were also submitted to the Pulsar repo.
2. Could you elaborate on whether and (if so) why we should drop the Shared
and Key_Shared subscription support in Flink?

Best,
tison.

On 2022/12/14 10:00:56 盛宇帆 wrote:
> Hi, I'm the maintainer of flink-connector-pulsar. I would like to
> start a survey on a functional change proposal for
> flink-connector-pulsar.
> 
> I have created a ticket on JIRA and pasted its description here:
> 
> A lot of the Pulsar connector's unstable tests are related to the Shared
> and Key_Shared subscriptions, because these two subscription types are
> designed to consume records in an unordered way and allow multiple
> consumers on the same topic partition. But this feature leads to some
> drawbacks in the connector.
> 
> 1. Performance
> 
> Flink is a true stream processor with strong correctness guarantees.
> Supporting multiple consumers requires even stronger correctness, which
> depends on Pulsar transactions. But the internal implementation of
> Pulsar transactions on the source side records messages one by one and
> stores all pending ack statuses on the client side, which is slow and
> memory-inefficient.
> 
> This means we can only use Shared and Key_Shared with Flink at low
> throughput, which defeats our intention in supporting these two
> subscriptions, namely that adding multiple consumers to the same
> partition can increase consumption speed.
> 
> 2. Stability
> 
> Pulsar transactions acknowledge messages one by one in an internal
> Pulsar topic, but this is not stable enough to work reliably. A lot of
> pending issues in the Flink JIRA are related to Pulsar transactions,
> and we don't have any workaround.
> 
> 3. Complexity
> 
> Supporting the Shared and Key_Shared subscriptions makes the connector's
> code more complex than we would like: every part of the code has to
> handle both an ordered and an unordered path, which is hard for
> maintainers to understand.
> 
> 4. Necessity
> 
> The current implementation of Shared and Key_Shared is unusable in a
> production environment, and for users this functionality is not
> necessary: there is no bottleneck in consuming data from Pulsar, the
> bottleneck is in processing the data, which we can address by
> increasing the parallelism of the processing operators.
> 


Re: Could not restore keyed state backend for KeyedProcessOperator

2022-12-14 Thread Lars Skjærven
As far as I understand we are not specifying anything for the restore mode, so I
guess the default (NO_CLAIM) is what we're using.
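
For reference, a minimal sketch of where this could be set explicitly (assuming
Flink 1.15's execution.savepoint-restore-mode / execution.savepoint.path options;
the path is made up, and whether the platform honors these is a separate question):

Configuration conf = new Configuration();
conf.setString("execution.savepoint-restore-mode", "NO_CLAIM");  // or CLAIM / LEGACY
conf.setString("execution.savepoint.path", "gs://bucketname/savepoints/savepoint-123");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);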

We're using the Ververica Platform to handle deploys, and it's a bit opaque
what happens underneath.

It happened again this morning:

Caused by: java.io.FileNotFoundException: Item not found:
'gs://bucketname/namespace/flink-jobs/namespaces/default/jobs/fbdde9e7-cf5a-44b4-a3d4-d3ed517432a0/checkpoints/fbdde9e7cf5a44b4a3d4d3ed517432a0/shared/ae551eda-a588-45be-ba08-32bfbc50e965'.
Note, it is possible that the live version is still available but the
requested generation is deleted.


On Tue, Dec 13, 2022 at 11:37 PM Martijn Visser 
wrote:

> Hi Lars,
>
> Have you used any of the new restore modes that were introduced with 1.15?
> https://flink.apache.org/2022/05/06/restore-modes.html
>
> Best regards,
>
> Martijn
>
> On Fri, Dec 9, 2022 at 2:52 PM Lars Skjærven  wrote:
>
>> Lifecycle rules: None
>>
>> On Fri, Dec 9, 2022 at 3:17 AM Hangxiang Yu  wrote:
>>
>>> Hi, Lars.
>>> Could you check whether you have configured the lifecycle of google
>>> cloud storage[1] which is not recommended in the flink checkpoint usage?
>>>
>>> [1] https://cloud.google.com/storage/docs/lifecycle
>>>
>>> On Fri, Dec 9, 2022 at 2:02 AM Lars Skjærven  wrote:
>>>
 Hello,
 We had an incident today with a job that could not restore after a crash
 (for an unknown reason). Specifically, it fails due to a missing checkpoint
 file. We've experienced this a total of three times with Flink 1.15.2, but
 never with 1.14.x. Last time was during a node upgrade, but that was not
 the case this time.

 I've not been able to reproduce this issue. I've checked that I can
 kill the taskmanager and jobmanager (using kubectl delete pod), and the job
 restores as expected.

 The job is running with kubernetes high availability, rocksdb and
 incremental checkpointing.

 Any tips are highly appreciated.

 Thanks,
 Lars

 Caused by: org.apache.flink.util.FlinkException: Could not restore
 keyed state backend for
 KeyedProcessOperator_bf374b554824ef28e76619f4fa153430_(2/2) from any of the
 1 provided restore options.
 at
 org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
 at
 org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
 at
 org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
 ... 11 more
 Caused by: org.apache.flink.runtime.state.BackendBuildingException:
 Caught unexpected exception.
 at
 org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:395)
 at
 org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:483)
 at
 org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
 at
 org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
 at
 org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
 at
 org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
 ... 13 more
 Caused by: java.io.FileNotFoundException: Item not found:
 'gs://some-bucket-name/flink-jobs/namespaces/default/jobs/d60a6c94-ddbc-42a1-947e-90f62749835a/checkpoints/d60a6c94ddbc42a1947e90f62749835a/shared/3cb2bb55-b4b0-44e5-948a-5d38ec088253'.
 Note, it is possible that the live version is still available but the
 requested generation is deleted.
 at
 com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException(GoogleCloudStorageExceptions.java:46)


>>>
>>> --
>>> Best,
>>> Hangxiang.
>>>
>>


[SURVEY] Drop Share and Key_Shared subscription support in Pulsar connector

2022-12-14 Thread 盛宇帆
Hi, I'm the maintainer of flink-connector-pulsar. I would like to
start a survey on a functional change proposal for
flink-connector-pulsar.

I have created a ticket on JIRA and pasted its description here:

A lot of the Pulsar connector's unstable tests are related to the Shared
and Key_Shared subscriptions, because these two subscription types are
designed to consume records in an unordered way and allow multiple
consumers on the same topic partition. But this feature leads to some
drawbacks in the connector.

1. Performance

Flink is a true stream processor with strong correctness guarantees.
Supporting multiple consumers requires even stronger correctness, which
depends on Pulsar transactions. But the internal implementation of
Pulsar transactions on the source side records messages one by one and
stores all pending ack statuses on the client side, which is slow and
memory-inefficient.

This means we can only use Shared and Key_Shared with Flink at low
throughput, which defeats our intention in supporting these two
subscriptions, namely that adding multiple consumers to the same
partition can increase consumption speed.

2. Stability

Pulsar transactions acknowledge messages one by one in an internal
Pulsar topic, but this is not stable enough to work reliably. A lot of
pending issues in the Flink JIRA are related to Pulsar transactions,
and we don't have any workaround.

3. Complexity

Supporting the Shared and Key_Shared subscriptions makes the connector's
code more complex than we would like: every part of the code has to
handle both an ordered and an unordered path, which is hard for
maintainers to understand.

4. Necessity

The current implementation of Shared and Key_Shared is unusable in a
production environment, and for users this functionality is not
necessary: there is no bottleneck in consuming data from Pulsar, the
bottleneck is in processing the data, which we can address by
increasing the parallelism of the processing operators.
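
A minimal sketch of that last point in Java (the source, sink, and ParseFunction
here are hypothetical): keep the Exclusive-subscription source at a modest
parallelism and scale the processing operator independently:

DataStream<String> records = env
        .fromSource(source, WatermarkStrategy.noWatermarks(), "pulsar-source")
        .setParallelism(4);                 // one consumer per topic partition is enough

records.map(new ParseFunction())            // the heavy processing step
        .setParallelism(64)                 // scale the actual bottleneck
        .sinkTo(sink);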


Re: Can't use nested attributes as watermarks in Table

2022-12-14 Thread Theodor Wübker
Actually, this behaviour is documented (see the Watermarks section, where it is
stated that the column must be a “top-level” column), so I suppose there is a
reason. Nevertheless it is quite a limiting factor, since it prevents me from
using window queries with the desired timestamp as the watermark. I suppose one
workaround could be to transform the table so that the attribute is at the top
level and then use it as a watermark, but in my case this would be quite an effort.
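
Something like this with the Schema builder might work (a minimal sketch; the
nested payload.meta.ts layout and the 5-second bound are just for illustration):

Schema schema = Schema.newBuilder()
        .column("payload", DataTypes.ROW(
                DataTypes.FIELD("meta", DataTypes.ROW(
                        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3))))))
        .columnByExpression("event_time", "payload.meta.ts")          // promote the nested field
        .watermark("event_time", "event_time - INTERVAL '5' SECOND")  // now a top-level column
        .build();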

My question remains: what is the reason for this behaviour? And are there any
good workarounds for it?

Thanks,

-Theo

> On 14. Dec 2022, at 08:13, Theodor Wübker  wrote:
> 
> Hey everyone,
> 
> I have encountered a problem with my Table API program. I am trying to use a 
> nested attribute as a watermark. The structure of my schema is a row, which 
> itself has 3 rows as attributes, and these again have some attributes, 
> in particular the timestamp that I want to use as a watermark. Flink does not 
> let me reference it using the dot operator, sadly. I checked the source code 
> and tracked it down to this (part of a) method in the DefaultSchemaResolver:
> 
> private Column validateTimeColumn(String columnName, List<Column> columns) {
>     final Optional<Column> timeColumn =
>             columns.stream().filter(c -> c.getName().equals(columnName)).findFirst();
>     if (!timeColumn.isPresent()) {
>         throw new ValidationException(
>                 String.format(
>                         "Invalid column name '%s' for rowtime attribute in watermark declaration. Available columns are: %s",
>                         columnName,
>                         columns.stream().map(Column::getName).collect(Collectors.toList())));
>     } ...
> The list of available columns is just the 3 rows and none of the nested 
> attributes. Is there a reason for nested columns being unavailable for 
> watermark declaration? Or am I overlooking something / doing something wrong?
> 
> -Theo


