Hi Somya,

I'll have to take a closer look at the JIRA history to refresh my memory on
potential past changes that caused this.

My first suspicion is this:
It is expected that the Kafka consumer will *ignore* the configured startup
position if the job was restored from a savepoint.
It will always use the offsets that were persisted at the time of the
savepoint.
Could that already explain what you are seeing?
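
To make that precedence concrete, here is a toy model in plain Java. It is
only a sketch of the rule described above, not Flink's actual code: the
class, enum, and method names are hypothetical, and the offsets are made-up
numbers. It shows that a restored offset, when present, wins over whatever
startup position was configured.

```java
import java.util.Optional;

// Hypothetical model of the precedence rule; none of these names are Flink APIs.
final class StartOffsetModel {

    // Simplified stand-in for the configurable startup positions.
    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS }

    // Returns the offset a partition would start reading from.
    // restoredOffset is empty on a fresh start and present after a savepoint restore.
    static long resolveStartOffset(StartupMode configured,
                                   Optional<Long> restoredOffset,
                                   long earliestOffset,
                                   long latestOffset,
                                   long committedGroupOffset) {
        // An offset persisted in the savepoint overrides the configured mode.
        if (restoredOffset.isPresent()) {
            return restoredOffset.get();
        }
        // Only a fresh start consults the configured startup position.
        switch (configured) {
            case EARLIEST:
                return earliestOffset;
            case LATEST:
                return latestOffset;
            default:
                return committedGroupOffset;
        }
    }

    public static void main(String[] args) {
        // Fresh start: "latest" is honoured.
        long fresh = resolveStartOffset(
                StartupMode.LATEST, Optional.empty(), 0L, 1_000_000L, 900_000L);
        // Restore from a savepoint taken at offset 250000: "latest" is ignored.
        long restored = resolveStartOffset(
                StartupMode.LATEST, Optional.of(250_000L), 0L, 1_000_000L, 900_000L);
        System.out.println("fresh start:     " + fresh);
        System.out.println("savepoint start: " + restored);
    }
}
```

If the savepoint used for the upgrade was about a week old, this rule alone
would produce a week's worth of replayed messages even with "latest"
configured.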

What I'm not sure of yet is whether this behaviour changed between
versions 1.2.x and 1.3.x, or in a later version.
I'll take a closer look once I'm back from travelling tomorrow and get back
to you on that.

Cheers,
Gordon

On Thu, Jan 23, 2020, 7:52 PM Chesnay Schepler <ches...@apache.org> wrote:

> @gordon Do you remember whether we changed any behavior of the Kafka 0.10
> consumer after 1.3.3?
>
> On 23/01/2020 12:02, Somya Maithani wrote:
>
> Hey,
>
> Any ideas about this? We are blocked on the upgrade because we want async
> timer checkpointing.
>
> Regards,
>
> Somya Maithani
> Software Developer II
> Helpshift Pvt Ltd
>
>
> On Fri, Jan 17, 2020 at 10:37 AM Somya Maithani <somyamaithan...@gmail.com>
> wrote:
>
>> Hey Team,
>>
>> *Problem*
>> Recently, we tried to upgrade our Flink infrastructure to version 1.9.1
>> and noticed that the job consumed from a week-old Kafka offset even though
>> the configuration says latest.
>>
>> *Context*
>> 1. Our current Flink version in production is 1.2.1.
>> 2. We use RocksDB + Hadoop as our backend / checkpointing data store.
>> 3. We consume messages from and produce messages to Kafka.
>>
>> *Release Plan*
>> 1. Upgrade Flink 1.2.1 to 1.3.
>> 2. Upgrade Flink 1.3.3 to 1.9.1
>> Note: We have a transitioning version (1.3.3) because of the
>> serialisation change in checkpointing.
>>
>> After performing step 1, the service was consuming the latest Kafka events,
>> but after performing step 2 we noticed that the service was consuming
>> week-old Kafka messages from the source topic. We did not see any
>> exceptions, but because the number of messages consumed increased sharply,
>> our task managers eventually started crashing.
>>
>> We did not change Kafka configuration in the service for the upgrade but
>> we did upgrade the Flink dependencies for Kafka.
>>
>> Old dependency:
>>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-streaming-java_2.10</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-clients_2.10</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>
>>
>> New dependency:
>>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-streaming-java_2.12</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-clients_2.12</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>
>>
>> Do we know why this would be happening?
>>
>> Regards,
>>
>> Somya Maithani
>> Software Developer II
>> Helpshift Pvt Ltd
>>
>
>
