Hey Team,

*Problem*
Recently, we were trying to upgrade our Flink infrastructure to version 1.9.1
and we noticed that week-old offsets were consumed from Kafka even though
the configuration says latest.
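
For illustration only (this is not our actual service code), here is a minimal sketch of the kind of setting meant by "latest", assuming it refers to the Kafka consumer property auto.offset.reset. The class name, broker address and group id are hypothetical placeholders.

```java
import java.util.Properties;

public class LatestOffsetConfigSketch {

    // Builds Kafka consumer properties with the offset reset policy set to
    // "latest". All values except the policy are placeholders (assumptions).
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "example-group");           // placeholder group id
        props.setProperty("auto.offset.reset", "latest");         // the setting in question
        return props;
    }

    public static void main(String[] args) {
        // Print the configured reset policy so it can be checked at a glance.
        System.out.println(consumerProperties().getProperty("auto.offset.reset"));
    }
}
```

In a real job these properties would be handed to the Flink Kafka consumer; that wiring is left out here so the sketch needs nothing beyond the JDK.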

*Context*
1. Our current Flink version in production is 1.2.1.
2. We use RocksDB + Hadoop as our backend / checkpointing data store.
3. We consume messages from and produce messages to Kafka.

*Release Plan*
1. Upgrade Flink 1.2.1 to 1.3.3.
2. Upgrade Flink 1.3.3 to 1.9.1.
Note: We have a transitional version (1.3.3) because of the serialisation
change in checkpointing.

After performing step 1, the service was consuming the latest Kafka events,
but after performing step 2 we noticed that it was consuming one-week-old
Kafka messages from the source topic. We did not see any exceptions, but
because the number of messages consumed increased sharply for our Flink
infrastructure, our task managers eventually started crashing.

We did not change the Kafka configuration in the service for the upgrade, but
we did upgrade the Flink dependencies for Kafka.

Old dependency:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>


New dependency:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>


Do we know why this would be happening?

Regards,

Somya Maithani
Software Developer II
Helpshift Pvt Ltd
