Re: Modify savepoints in Flink

2022-10-21 Thread Piotr Nowojski
Hi Sriram,

You can read and modify savepoints using StateProcessor API [1].
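If you only need to drop the state of a whole operator (and removing source
offsets often comes down to that), here is a minimal sketch, assuming the 1.15
`SavepointWriter` from the StateProcessor API. The paths and the uid are
placeholders, and the exact signatures have changed between releases, so
please double-check them against the docs [1]:

```
import org.apache.flink.state.api.SavepointWriter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ModifySavepoint {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        SavepointWriter
                // placeholder path of the savepoint to modify
                .fromExistingSavepoint("s3://bucket/savepoints/savepoint-xxx")
                // drops all state of the operator with this (placeholder) uid
                .removeOperator("my-source-uid")
                // state of all other operators is carried over unchanged
                .write("s3://bucket/savepoints/savepoint-modified");

        env.execute("modify-savepoint");
    }
}
```

If you only want to rewrite part of an operator's state, you would instead
read it back and bootstrap a filtered version of it, as described in [1].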

Alternatively, you can modify the code of the function/operator whose
state you want to change. For example, in the
`org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
method you could add some code that migrates your old state
to a new one.

```
// OldType/NewType and migrate() are placeholders for your actual types
private transient ValueState<OldType> oldState;
private transient ValueState<NewType> newState;
(...)
initializeState(...) {
  (...)
  if (newState.value() == null && oldState.value() != null) {
    // code to migrate from the old state to the new one
    newState.update(migrate(oldState.value()));
    oldState.clear();
  }
}
```

And you can drop such code later, in the next savepoint.

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/

On Fri, 21 Oct 2022 at 10:05, Sriram Ganesh wrote:

> Hi All,
>
> I am working on a scenario where I need to modify the existing savepoint
> operator state. E.g. I want to remove some offsets from the savepoint.
>
> What is the best practice for these scenarios? Could you please help me
> with an example?
>
> Thanks in advance.
>
> --
> *Sriram G*
> *Tech*
>
>


Re: Modify savepoints in Flink

2022-10-21 Thread Piotr Nowojski
Oops.

> Alternatively, you can modify the code of the function/operator whose
state you want to change. For example, in the
`org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
method you could add some code that migrates your old state
to a new one.
> And you can drop such code later, in the next savepoint.

That was not entirely true. This would work for non-keyed state. For
keyed state there is no easy alternative (you would have to iterate
through all of the keys, which I think is not exposed via the Public API) -
best to use the StateProcessor API.
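For the non-keyed case, here is a rough self-contained sketch of what I meant
(the state names and types are placeholders):

```
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class MigratingFunction implements CheckpointedFunction {

    private transient ListState<Long> oldOffsets;
    private transient ListState<Long> newOffsets;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        oldOffsets = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("old-offsets", Long.class));
        newOffsets = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("new-offsets", Long.class));

        // one-time migration: rewrite the old entries into the new state,
        // dropping whatever is no longer wanted, then clear the old state;
        // this block can be removed after the next savepoint
        if (!newOffsets.get().iterator().hasNext()) {
            List<Long> migrated = new ArrayList<>();
            for (Long offset : oldOffsets.get()) {
                migrated.add(offset); // or filter out unwanted entries here
            }
            newOffsets.update(migrated);
            oldOffsets.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // nothing special; registered state is snapshotted automatically
    }
}
```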

Best,
Piotrek

On Fri, 21 Oct 2022 at 10:54, Sriram Ganesh wrote:

> Thanks! Will try this.
>
> On Fri, Oct 21, 2022 at 2:22 PM Piotr Nowojski 
> wrote:
>
>> Hi Sriram,
>>
>> You can read and modify savepoints using StateProcessor API [1].
>>
>> Alternatively, you can modify the code of the function/operator whose
>> state you want to change. For example, in the
>> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
>> method you could add some code that migrates your old state
>> to a new one.
>>
>> ```
>> // OldType/NewType and migrate() are placeholders for your actual types
>> private transient ValueState<OldType> oldState;
>> private transient ValueState<NewType> newState;
>> (...)
>> initializeState(...) {
>>   (...)
>>   if (newState.value() == null && oldState.value() != null) {
>>     // code to migrate from the old state to the new one
>>     newState.update(migrate(oldState.value()));
>>     oldState.clear();
>>   }
>> }
>> ```
>>
>> And you can drop such code later, in the next savepoint.
>>
>> Best,
>> Piotrek
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/
>>
>> On Fri, 21 Oct 2022 at 10:05, Sriram Ganesh wrote:
>>
>>> Hi All,
>>>
>>> I am working on a scenario where I need to modify the existing savepoint
>>> operator state. E.g. I want to remove some offsets from the savepoint.
>>>
>>> What is the best practice for these scenarios? Could you please help
>>> me with an example?
>>>
>>> Thanks in advance.
>>>
>>> --
>>> *Sriram G*
>>> *Tech*
>>>
>>>
>
> --
> *Sriram G*
> *Tech*
>
>


Re: Modify savepoints in Flink

2022-10-21 Thread Piotr Nowojski
Hi,

Yes and no. The StateProcessor API can read any Flink state, but you have to
describe the state you want it to access. Take a look at the example in the
docs [1].

First you have an example of a theoretical production function,
`StatefulFunctionWithTime`, whose state you want to modify. Note the
`ValueState` and `ListState` fields and their descriptors. That's the state
of that particular function. Descriptors determine how the state is
serialised; usually they are pretty simple.
Below it is the `ReaderFunction` that you use to access/modify the
state via the StateProcessor API. To do so, you have to specify the state
you want to access and effectively mimic/copy-paste the state descriptors
from the production code.
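For illustration, here is a minimal sketch of such a reader, assuming a
hypothetical production function that keeps a `ValueState<Integer>` registered
under the name "state" (the name and types are placeholders, loosely following
the docs example [1]):

```
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReaderFunction
        extends KeyedStateReaderFunction<Integer, Tuple2<Integer, Integer>> {

    private ValueState<Integer> state;

    @Override
    public void open(Configuration parameters) {
        // must match the production descriptor exactly (name + type/serializer)
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state", Integer.class));
    }

    @Override
    public void readKey(
            Integer key,
            Context ctx,
            Collector<Tuple2<Integer, Integer>> out) throws Exception {
        // emit one (key, value) pair per key found in the savepoint
        out.collect(Tuple2.of(key, state.value()));
    }
}
```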

If you want to modify the state of a source/sink function, you would have
to first take a look into the source code of such a connector to know what
to modify and copy its descriptors. Also note that for source/sink the
state is most likely non-keyed.

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/#keyed-state

On Fri, 21 Oct 2022 at 14:37, Sriram Ganesh wrote:

> I have a question on this. Different connectors can have different
> serialisation and de-serialisation techniques, right? Wouldn't that have an
> impact? If I use the StateProcessor API, would that be agnostic to all the
> sources and sinks?
>
> On Fri, Oct 21, 2022, 18:00 Piotr Nowojski  wrote:
>
>> Oops.
>>
>> > Alternatively, you can modify the code of the function/operator whose
>> state you want to change. For example, in the
>> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
>> method you could add some code that migrates your old state
>> to a new one.
>> > And you can drop such code later, in the next savepoint.
>>
>> That was not entirely true. This would work for non-keyed state. For
>> keyed state there is no easy alternative (you would have to iterate
>> through all of the keys, which I think is not exposed via the Public API) -
>> best to use the StateProcessor API.
>>
>> Best,
>> Piotrek
>>
>> On Fri, 21 Oct 2022 at 10:54, Sriram Ganesh wrote:
>>
>>> Thanks! Will try this.
>>>
>>> On Fri, Oct 21, 2022 at 2:22 PM Piotr Nowojski 
>>> wrote:
>>>
>>>> Hi Sriram,
>>>>
>>>> You can read and modify savepoints using StateProcessor API [1].
>>>>
>>>> Alternatively, you can modify the code of the function/operator whose
>>>> state you want to change. For example, in the
>>>> `org.apache.flink.streaming.api.checkpoint.CheckpointedFunction#initializeState`
>>>> method you could add some code that migrates your old state
>>>> to a new one.
>>>>
>>>> ```
>>>> // OldType/NewType and migrate() are placeholders for your actual types
>>>> private transient ValueState<OldType> oldState;
>>>> private transient ValueState<NewType> newState;
>>>> (...)
>>>> initializeState(...) {
>>>>   (...)
>>>>   if (newState.value() == null && oldState.value() != null) {
>>>>     // code to migrate from the old state to the new one
>>>>     newState.update(migrate(oldState.value()));
>>>>     oldState.clear();
>>>>   }
>>>> }
>>>> ```
>>>>
>>>> And you can drop such code later, in the next savepoint.
>>>>
>>>> Best,
>>>> Piotrek
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/
>>>>
>>>> On Fri, 21 Oct 2022 at 10:05, Sriram Ganesh wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am working on a scenario where I need to modify the existing
>>>>> savepoint operator state. E.g. I want to remove some offsets from the
>>>>> savepoint.
>>>>>
>>>>> What is the best practice for these scenarios? Could you please
>>>>> help me with an example?
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> --
>>>>> *Sriram G*
>>>>> *Tech*
>>>>>
>>>>>
>>>
>>> --
>>> *Sriram G*
>>> *Tech*
>>>
>>>


Re: Kafka transactions drastically limit usability of Flink savepoints

2022-11-17 Thread Piotr Nowojski
Hi Yordan,

Indeed, it looks like a missing feature. Probably whoever implemented the
new KafkaSink didn't realize how important this is. I've created a ticket
to track this issue [1], but I don't know when it will be fixed, or by whom.

I think a workaround might be to create a new `KafkaSink` instance that
will have a new, different operator uid, and simply drop/ignore the old
instance and its state (by using the `allowNonRestoredState` option [2]).
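Something like this rough sketch (bootstrap servers, topic, transactional id
prefix and uids are all placeholders):

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;

public class NewSinkWorkaround {

    static void attachReplacementSink(DataStream<String> stream) {
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                // fresh transactional id prefix for the replacement sink
                .setTransactionalIdPrefix("my-app-v2")
                .build();

        // a new uid, so nothing is restored for this operator from the savepoint
        stream.sinkTo(sink).uid("kafka-sink-v2");
    }
}
```

You would then restore the job with `flink run -s <savepointPath>
--allowNonRestoredState ...`, so that the state of the old sink is simply
dropped.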

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-30068
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#allowing-non-restored-state


On Wed, 16 Nov 2022 at 11:36, Yordan Pavlov wrote:

> Hi Piotr,
>
> the option you mention is applicable only to the deprecated
> FlinkKafkaProducer; is there an equivalent for the modern KafkaSink? I found
> this article comparing the behavior of the two:
>
> https://ververica.zendesk.com/hc/en-us/articles/360013269680-Best-Practices-for-Using-Kafka-Sources-Sinks-in-Flink-Jobs
>
> It suggests that the default behavior of KafkaSink would be: "The
> recovery continues with an ERROR message like the following is
> logged:". However, this is not what I observe; instead, the job fails. I
> am attaching the relevant part of the log. This error happens upon
> trying to recover from a one-month-old savepoint.
>
> Regards,
> Yordan
>
> On Tue, 15 Nov 2022 at 18:53, Piotr Nowojski  wrote:
> >
> > Hi Yordan,
> >
> > I don't understand where the problem is. Why do you think savepoints are
> unusable? If you recover with `ignoreFailuresAfterTransactionTimeout`
> enabled, the current Flink behaviour shouldn't cause any problems (except
> for maybe some logged errors).
> >
> > Best,
> > Piotrek
> >
> > On Tue, 15 Nov 2022 at 15:36, Yordan Pavlov wrote:
> >>
> >> Hi,
> >> we are using Flink savepoints as a recovery tool and want to store
> >> multiple ones for the past months. However, as we use Kafka
> >> transactions for our KafkaSink, this puts an expiration time on our
> >> savepoints. We can only use a savepoint as old as our Kafka
> >> transaction timeout. The problem is explained in this issue:
> >> https://issues.apache.org/jira/browse/FLINK-16419
> >> the relevant comment being this one:
> >> "FlinkKafkaProducer or KafkaSink do not know during recovery if they
> >> have to recover and commit or if it has already happened. Due to that,
> >> they are always attempting to recover and commit transactions during
> >> startup."
> >> I'm surprised that more people are not hitting this problem, as this
> >> makes savepoints pretty much unusable as a recovery mechanism.
>


Re: Kafka transactions drastically limit usability of Flink savepoints

2022-11-15 Thread Piotr Nowojski
Hi Yordan,

I don't understand where the problem is. Why do you think savepoints are
unusable? If you recover with `ignoreFailuresAfterTransactionTimeout`
enabled, the current Flink behaviour shouldn't cause any problems (except
for maybe some logged errors).

Best,
Piotrek

On Tue, 15 Nov 2022 at 15:36, Yordan Pavlov wrote:

> Hi,
> we are using Flink savepoints as a recovery tool and want to store
> multiple ones for the past months. However, as we use Kafka
> transactions for our KafkaSink, this puts an expiration time on our
> savepoints. We can only use a savepoint as old as our Kafka
> transaction timeout. The problem is explained in this issue:
> https://issues.apache.org/jira/browse/FLINK-16419
> the relevant comment being this one:
> "FlinkKafkaProducer or KafkaSink do not know during recovery if they
> have to recover and commit or if it has already happened. Due to that,
> they are always attempting to recover and commit transactions during
> startup."
> I'm surprised that more people are not hitting this problem, as this
> makes savepoints pretty much unusable as a recovery mechanism.
>


Re: [Discussion] - Release major Flink version to support JDK 17 (LTS)

2023-03-30 Thread Piotr Nowojski
Hey,

> 1. The Flink community agrees that we upgrade Kryo to a later version,
which means breaking all checkpoint/savepoint compatibility and releasing a
Flink 2.0 with Java 17 support added and Java 8 and Flink Scala API support
dropped. This is probably the quickest way, but would still mean that we
expose Kryo in the Flink APIs, which is the main reason why we haven't been
able to upgrade Kryo at all.

This sounds pretty bad to me.

Has anyone looked into what it would take to provide a smooth migration
from Kryo2 -> Kryo5?
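For reference, the Kryo migration guide (linked below in Martijn's mail)
mentions that Kryo 5 is also published as a "versioned" artifact whose classes
live under `com.esotericsoftware.kryo.kryo5`, so both versions can coexist on
one classpath. The per-value conversion such a migration would perform could
then look roughly like this sketch (it deliberately ignores Flink's own
serialization framing, which a real migration would also have to handle):

```
// Kryo 2 under its original package, Kryo 5 under the relocated
// package from the "versioned" kryo5 artifact.
public final class KryoValueMigrator {

    private final com.esotericsoftware.kryo.Kryo kryo2 =
            new com.esotericsoftware.kryo.Kryo();
    private final com.esotericsoftware.kryo.kryo5.Kryo kryo5 =
            new com.esotericsoftware.kryo.kryo5.Kryo();

    public KryoValueMigrator() {
        // Kryo 5 requires class registration by default; relaxed for the sketch
        kryo5.setRegistrationRequired(false);
    }

    public <T> byte[] migrate(byte[] v2Bytes, Class<T> type) {
        // 1. read the value with the old serializer ...
        T value = kryo2.readObject(
                new com.esotericsoftware.kryo.io.Input(v2Bytes), type);
        // 2. ... and write it back with the new one
        com.esotericsoftware.kryo.kryo5.io.Output out =
                new com.esotericsoftware.kryo.kryo5.io.Output(64, -1);
        kryo5.writeObject(out, value);
        return out.toBytes();
    }
}
```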

Best,
Piotrek

On Thu, 30 Mar 2023 at 16:54, Alexis Sarda-Espinosa wrote:

> Hi Martijn,
>
> just to be sure, if all state-related classes use a POJO serializer, Kryo
> will never come into play, right? Given FLINK-16686 [1], I wonder how many
> users actually have jobs with Kryo and RocksDB, but even if there aren't
> many, that still leaves those who don't use RocksDB for
> checkpoints/savepoints.
>
> If Kryo were to stay in the Flink APIs in v1.X, is it impossible to let
> users choose between v2/v5 jars by separating them like log4j2 jars?
>
> [1] https://issues.apache.org/jira/browse/FLINK-16686
>
> Regards,
> Alexis.
>
> On Thu, 30 Mar 2023 at 14:26, Martijn Visser <
> martijnvis...@apache.org> wrote:
>
>> Hi all,
>>
>> I also saw a thread on this topic from Clayton Wohl [1],
>> which I'm including in this discussion thread so that it doesn't get lost.
>>
>> From my perspective, there are two main ways to get to Java 17:
>>
>> 1. The Flink community agrees that we upgrade Kryo to a later version,
>> which means breaking all checkpoint/savepoint compatibility and releasing a
>> Flink 2.0 with Java 17 support added and Java 8 and Flink Scala API support
>> dropped. This is probably the quickest way, but would still mean that we
>> expose Kryo in the Flink APIs, which is the main reason why we haven't been
>> able to upgrade Kryo at all.
>> 2. A contributor makes a contribution that bumps Kryo and
>> either a) automagically reads in all old checkpoints/savepoints using
>> Kryo v2 and writes them to new snapshots using Kryo v5 (as mentioned
>> in the Kryo migration guide [2][3]), or b) provides an offline tool that
>> allows users who are interested to migrate their snapshots manually
>> before starting from a newer version. That could potentially prevent the
>> need to introduce a new Flink major version. In both scenarios, ideally the
>> contributor would also help with avoiding the exposure of Kryo so that we
>> will be in better shape in the future.
>>
>> It would be good to get the opinion of the community for either of these
>> two options, or potentially for another one that I haven't mentioned. If it
>> appears that there's an overall agreement on the direction, I would propose
>> that a FLIP gets created which describes the entire process.
>>
>> Looking forward to the thoughts of others, including the Users (therefore
>> including the User ML).
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]  https://lists.apache.org/thread/qcw8wy9dv8szxx9bh49nz7jnth22p1v2
>> [2] https://lists.apache.org/thread/gv49jfkhmbshxdvzzozh017ntkst3sgq
>> [3] https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5
>>
>> On Sun, Mar 19, 2023 at 8:16 AM Tamir Sagi 
>> wrote:
>>
>>> I agree, there are several options to mitigate the migration from v2 to
>>> v5.
>>> Yet, Oracle's roadmap is to end JDK 11 support in September this year.
>>>
>>>
>>>
>>> 
>>> From: ConradJam 
>>> Sent: Thursday, March 16, 2023 4:36 AM
>>> To: d...@flink.apache.org 
>>> Subject: Re: [Discussion] - Release major Flink version to support JDK
>>> 17 (LTS)
>>>
>>>
>>> Thanks for starting this discussion.
>>>
>>>
>>> I have been tracking this problem for a long time, until I saw a
>>> conversation in an issue a few days ago and learned that the Kryo version
>>> problem will affect snapshots under JDK 17 [1] (FLINK-24998).
>>>
>>> As @cherry said, it ruined our whole effort towards JDK 17.
>>>
>>> I am in favor of providing an external tool that migrates checkpoints from
>>> the old Kryo version to the new one in a single pass (maybe this tool could
>>> start in Flink 2.0?). Does this tool currently have any plans or ideas
>>> worth discussing?
>>>
>>>
>>> I think it should not be difficult to be compatible with both JDK 11 and
>>> JDK 17. We should indeed abandon JDK 8 in 2.0.0; it is also mentioned in
>>> the doc that it is marked as deprecated [2].
>>>
>>>
>>> I would add that we need to pay attention to the Scala version and its
>>> compatibility with JDK 17.
>>>
>>>
>>> [1] FLINK-24998  SIGSEGV in Kryo / C2 CompilerThread on Java 17
>>> https://issues.apache.org/jira/browse/FLINK-24998
>>>
>>> [2] FLINK-30501 Update Flink build instruction to deprecate Java 8
>>> instead
>>> of requiring Java 11  https://issues.apache.org/jira/browse/FLINK-30501
>>>
On Thu, 16 Mar 2023 at 00:54, Tamir Sagi wrote:
>>>
>>> > Hey dev community,
>>> >
>>> > I'm writing 

Re: java.lang.StackOverflowError

2020-01-23 Thread Piotr Nowojski
Hi,

Thanks for reporting the issue. Could you first try to upgrade to Flink 1.6.4? 
This might be a known issue fixed in a later bug fix release [1].

Also, are you sure you are using (unmodified) Flink 1.6.2? The stack traces somehow
do not match the 1.6.2 release tag in the repository; for example, this one:

at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
 

Doesn't match the code [2]. (But first, try to upgrade Flink.)

Piotrek


[1] https://issues.apache.org/jira/browse/FLINK-10367 

[2] 
https://github.com/apache/flink/blob/3456ad0dcacb3163e8aecf8fbe2dd684404cf37d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java#L380
 


> On 22 Jan 2020, at 03:24, 刘建刚  wrote:
> 
> I am using Flink 1.6.2 on YARN. The state backend is RocksDB.
> 
>> On 22 Jan 2020, at 10:15, 刘建刚 wrote:
>> 
>>   I have a Flink job which fails occasionally. I am eager to avoid this
>> problem. Can anyone help me? The error stack trace is as follows:
>> java.io.IOException: java.lang.StackOverflowError
>>  at 
>> org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:191)
>>  at 
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:194)
>>  at 
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:589)
>>  at 
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:546)
>>  at 
>> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:175)
>>  at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:236)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:754)
>>  at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.StackOverflowError
>>  at 
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.notifyChannelNonEmpty(SingleInputGate.java:656)
>>  at 
>> org.apache.flink.runtime.io.network.partition.consumer.InputChannel.notifyChannelNonEmpty(InputChannel.java:125)
>>  at 
>> org.apache.flink.runtime.io.network.partition.consumer.InputChannel.setError(InputChannel.java:203)
>>  at 
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:403)
>>  at 
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>>  at 
>> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>>  at 
>> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>>  at 
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>>  at 
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>>  at 
>> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>>  at 
>> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>>  at 
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>>  at 
>> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>>  at 
>> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>>  at 
>> 
