Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

2020-01-30 Thread Tzu-Li (Gordon) Tai
Update:
I can confirm my previous guess, based on the changes in
https://issues.apache.org/jira/browse/FLINK-4280 that were merged for Flink
1.3.0.
When upgrading from Flink 1.2.x to 1.3.0, the new startup position
configuration was respected over the checkpointed offsets (but only once, for
the first restore after the upgrade).
After that, all restores from savepoints only ever respect the checkpointed
offsets (regardless of whether or not it is the first restore after an
upgrade).
This would explain the behaviour you encountered.
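
For reference, FLINK-4280 is the change that introduced the explicit startup
position setters on the consumer (they live on FlinkKafkaConsumerBase). A
minimal sketch, assuming a consumer instance named "consumer":

    consumer.setStartFromGroupOffsets(); // default: committed group offsets,
                                         // falling back to auto.offset.reset
    consumer.setStartFromEarliest();     // or: always start from the earliest offset
    consumer.setStartFromLatest();       // or: always start from the latest offset

On a restore from a savepoint, the offsets stored in the savepoint take
precedence over whichever of these was configured.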

If you prefer not to have your Kafka consumer's progress carried over after
the upgrade and want to simply start consuming from the latest offset,
one way to achieve that is to assign a new uid to the Kafka consumer operator
and allow non-restored state when restoring.
With this change, Flink will consider the Kafka consumer operator to have no
prior snapshotted state (i.e. offsets) and will respect the startup
configuration.
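
As a rough sketch of what that could look like (not tested against your job;
the topic name, group id and broker address below are placeholders):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
    props.setProperty("group.id", "my-consumer-group");   // placeholder

    FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
    consumer.setStartFromLatest(); // takes effect, since no prior state is matched

    env.addSource(consumer)
        .uid("kafka-source-v2")    // new uid: no snapshotted offsets are restored
        .name("Kafka source");

When resubmitting from the savepoint you would then pass --allowNonRestoredState
(short option: -n) to "flink run -s <savepointPath> ...", so that the state that
was snapshotted under the old Kafka source uid is allowed to be skipped.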

Let me know if this works for you!

Cheers,
Gordon

On Thu, Jan 23, 2020 at 9:12 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Somya,
>
> I'll have to take a closer look at the JIRA history to refresh my memory
> on potential past changes that caused this.
>
> My first suspicion is this:
> It is expected that the Kafka consumer will *ignore* the configured
> startup position if the job was restored from a savepoint.
> It will always use the offsets that were persisted at the time of the
> savepoint.
> Could this already explain what you are seeing?
>
> What I'm not sure of yet is whether this behavioural change occurred between
> versions 1.2.x and 1.3.x, or in a later version.
> I'll take a closer look once I'm back from travelling tomorrow and get
> back to you on that.
>
> Cheers,
> Gordon
>
> On Thu, Jan 23, 2020, 7:52 PM Chesnay Schepler  wrote:
>
>> @gordon Do you remember whether we changed any behavior of the Kafka 0.10
>> consumer after 1.3.3?
>>
>> On 23/01/2020 12:02, Somya Maithani wrote:
>>
>> Hey,
>>
>> Any ideas about this? We are blocked on the upgrade because we want async
>> timer checkpointing.
>>
>> Regards,
>>
>> Somya Maithani
>> Software Developer II
>> Helpshift Pvt Ltd
>>
>>
>> On Fri, Jan 17, 2020 at 10:37 AM Somya Maithani <
>> somyamaithan...@gmail.com> wrote:
>>
>>> Hey Team,
>>>
>>> *Problem*
>>> Recently, we were trying to upgrade our Flink infrastructure to version
>>> 1.9.1 and we noticed that a week-old offset was consumed from Kafka even
>>> though the configuration says latest.
>>>
>>> *Pretext*
>>> 1. Our current Flink version in production is 1.2.1.
>>> 2. We use RocksDB + Hadoop as our backend / checkpointing data store.
>>> 3. We consume and produce messages to / from Kafka.
>>>
>>> *Release Plan*
>>> 1. Upgrade Flink 1.2.1 to 1.3.3.
>>> 2. Upgrade Flink 1.3.3 to 1.9.1.
>>> Note: We have a transitioning version (1.3.3) because of the
>>> serialisation change in checkpointing.
>>>
>>> After performing step 1, the service was consuming the latest Kafka events,
>>> but after performing step 2 we noticed that it was consuming one-week-old
>>> Kafka messages from the source topic. We did not see any exceptions, but
>>> because the number of messages consumed by our Flink infrastructure
>>> increased so much, our task managers eventually started crashing.
>>>
>>> We did not change Kafka configuration in the service for the upgrade but
>>> we did upgrade the Flink dependencies for Kafka.
>>>
>>> Old dependency:
>>>
>>> 
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>
>>>
>>>
>>> New dependency:
>>>
>>> 
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.12</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.12</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
  <version>${flink.version}</version>
</dependency>
>>>
>>>
>>> Do we know why this would be happening?
>>>
>>> Regards,
>>>
>>> Somya Maithani
>>> Software Developer II
>>> Helpshift Pvt Ltd
>>>
>>
>>


Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

2020-01-23 Thread Tzu-Li (Gordon) Tai
Hi Somya,

I'll have to take a closer look at the JIRA history to refresh my memory on
potential past changes that caused this.

My first suspicion is this:
It is expected that the Kafka consumer will *ignore* the configured startup
position if the job was restored from a savepoint.
It will always use the offsets that were persisted at the time of the
savepoint.
Could this already explain what you are seeing?
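
To illustrate (a simplified sketch, not the consumer's exact internals; the
topic name and properties are placeholders):

    FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
    consumer.setStartFromLatest(); // only consulted on a fresh start; on a
                                   // savepoint restore, the offsets stored in
                                   // the savepoint are used instead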

What I'm not sure of yet is whether this behavioural change occurred between
versions 1.2.x and 1.3.x, or in a later version.
I'll take a closer look once I'm back from travelling tomorrow and get back
to you on that.

Cheers,
Gordon

On Thu, Jan 23, 2020, 7:52 PM Chesnay Schepler  wrote:

> @gordon Do you remember whether we changed any behavior of the Kafka 0.10
> consumer after 1.3.3?
>
> On 23/01/2020 12:02, Somya Maithani wrote:
>
> Hey,
>
> Any ideas about this? We are blocked on the upgrade because we want async
> timer checkpointing.
>
> Regards,
>
> Somya Maithani
> Software Developer II
> Helpshift Pvt Ltd
>
>
> On Fri, Jan 17, 2020 at 10:37 AM Somya Maithani 
> wrote:
>
>> Hey Team,
>>
>> *Problem*
>> Recently, we were trying to upgrade our Flink infrastructure to version 1.9.1
>> and we noticed that a week-old offset was consumed from Kafka even though
>> the configuration says latest.
>>
>> *Pretext*
>> 1. Our current Flink version in production is 1.2.1.
>> 2. We use RocksDB + Hadoop as our backend / checkpointing data store.
>> 3. We consume and produce messages to / from Kafka.
>>
>> *Release Plan*
>> 1. Upgrade Flink 1.2.1 to 1.3.3.
>> 2. Upgrade Flink 1.3.3 to 1.9.1.
>> Note: We have a transitioning version (1.3.3) because of the
>> serialisation change in checkpointing.
>>
>> After performing step 1, the service was consuming the latest Kafka events,
>> but after performing step 2 we noticed that it was consuming one-week-old
>> Kafka messages from the source topic. We did not see any exceptions, but
>> because the number of messages consumed by our Flink infrastructure
>> increased so much, our task managers eventually started crashing.
>>
>> We did not change Kafka configuration in the service for the upgrade but
>> we did upgrade the Flink dependencies for Kafka.
>>
>> Old dependency:
>>
>> 
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-streaming-java_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-clients_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>>
>>
>>
>> New dependency:
>>
>> 
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-streaming-java_2.12</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-clients_2.12</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
>>>   <version>${flink.version}</version>
>>> </dependency>
>>>
>>
>>
>> Do we know why this would be happening?
>>
>> Regards,
>>
>> Somya Maithani
>> Software Developer II
>> Helpshift Pvt Ltd
>>
>
>


Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

2020-01-23 Thread Somya Maithani
Hey,

Any ideas about this? We are blocked on the upgrade because we want async
timer checkpointing.

Regards,

Somya Maithani
Software Developer II
Helpshift Pvt Ltd


On Fri, Jan 17, 2020 at 10:37 AM Somya Maithani 
wrote:

> Hey Team,
>
> *Problem*
> Recently, we were trying to upgrade our Flink infrastructure to version 1.9.1
> and we noticed that a week-old offset was consumed from Kafka even though
> the configuration says latest.
>
> *Pretext*
> 1. Our current Flink version in production is 1.2.1.
> 2. We use RocksDB + Hadoop as our backend / checkpointing data store.
> 3. We consume and produce messages to / from Kafka.
>
> *Release Plan*
> 1. Upgrade Flink 1.2.1 to 1.3.3.
> 2. Upgrade Flink 1.3.3 to 1.9.1.
> Note: We have a transitioning version (1.3.3) because of the serialisation
> change in checkpointing.
>
> After performing step 1, the service was consuming the latest Kafka events,
> but after performing step 2 we noticed that it was consuming one-week-old
> Kafka messages from the source topic. We did not see any exceptions, but
> because the number of messages consumed by our Flink infrastructure
> increased so much, our task managers eventually started crashing.
>
> We did not change Kafka configuration in the service for the upgrade but
> we did upgrade the Flink dependencies for Kafka.
>
> Old dependency:
>
> 
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-java_2.10</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-clients_2.10</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>
>
>
> New dependency:
>
> 
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-java_2.12</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-clients_2.12</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>
>
>
> Do we know why this would be happening?
>
> Regards,
>
> Somya Maithani
> Software Developer II
> Helpshift Pvt Ltd
>


Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

2020-01-16 Thread Somya Maithani
Hey Team,

*Problem*
Recently, we were trying to upgrade our Flink infrastructure to version 1.9.1
and we noticed that a week-old offset was consumed from Kafka even though
the configuration says latest.
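
(For reference, by "the configuration says latest" we mean a consumer property
along these lines; the broker address and group id here are placeholders, not
our production values:)

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
    props.setProperty("group.id", "our-flink-service");    // placeholder
    props.setProperty("auto.offset.reset", "latest");      // "latest"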

*Pretext*
1. Our current Flink version in production is 1.2.1.
2. We use RocksDB + Hadoop as our backend / checkpointing data store.
3. We consume and produce messages to / from Kafka.

*Release Plan*
1. Upgrade Flink 1.2.1 to 1.3.3.
2. Upgrade Flink 1.3.3 to 1.9.1.
Note: We have a transitioning version (1.3.3) because of the serialisation
change in checkpointing.

After performing step 1, the service was consuming the latest Kafka events,
but after performing step 2 we noticed that it was consuming one-week-old
Kafka messages from the source topic. We did not see any exceptions, but
because the number of messages consumed by our Flink infrastructure increased
so much, our task managers eventually started crashing.

We did not change Kafka configuration in the service for the upgrade but we
did upgrade the Flink dependencies for Kafka.

Old dependency:


> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-java_2.10</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-clients_2.10</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>


New dependency:


> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-streaming-java_2.12</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-clients_2.12</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
>   <version>${flink.version}</version>
> </dependency>
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
>   <version>${flink.version}</version>
> </dependency>
>


Do we know why this would be happening?

Regards,

Somya Maithani
Software Developer II
Helpshift Pvt Ltd