@gordon Do you remember whether we changed any behavior of the Kafka 0.10 consumer after 1.3.3?

On 23/01/2020 12:02, Somya Maithani wrote:
Hey,

Any ideas about this? We are blocked on the upgrade because we want async timer checkpointing.

Regards,

Somya Maithani
Software Developer II
Helpshift Pvt Ltd


On Fri, Jan 17, 2020 at 10:37 AM Somya Maithani <somyamaithan...@gmail.com> wrote:

    Hey Team,

    *Problem*
    Recently, while upgrading our Flink infrastructure to
    version 1.9.1, we noticed that week-old offsets were consumed
    from Kafka even though the configuration says latest.

    *Pretext*
    1. Our current Flink version in production is 1.2.1.
    2. We use RocksDB as our state backend, with checkpoints stored
    in Hadoop.
    3. We consume messages from and produce messages to Kafka.
    *Release Plan*
    1. Upgrade Flink 1.2.1 to 1.3.3.
    2. Upgrade Flink 1.3.3 to 1.9.1.
    Note: We need the transitional version (1.3.3) because of the
    serialisation change in checkpointing.

    After step 1, the service consumed the latest Kafka events, but
    after step 2 we noticed that it was consuming week-old messages
    from the source topic. We did not see any exceptions, but the
    number of consumed messages increased so much that our task
    managers eventually started crashing.

    We did not change the service's Kafka configuration for the
    upgrade, but we did upgrade the Flink Kafka dependencies.

    Old dependencies:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>

    New dependencies:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
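
    The new list above mixes Scala binary versions: the streaming,
    clients and RocksDB artifacts use _2.12, while the Kafka connector
    uses _2.11. Scala-suffixed Flink artifacts are generally expected
    to share one Scala version on a classpath. A minimal sketch of the
    same four dependencies with the suffix pinned by a single property,
    assuming a _2.12 build of the Kafka 0.10 connector is published for
    1.9.1. The property name scala.binary.version is an illustrative
    choice that follows the convention of Flink's Maven quickstart.

```xml
<!-- Sketch only: POM fragment assuming Flink 1.9.1 and Scala 2.12 for
     every Scala-suffixed Flink artifact. Property names are illustrative. -->
<properties>
    <flink.version>1.9.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Same Scala suffix as the other Flink artifacts (assumes a
         _2.12 build of this connector exists for this Flink version). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```

    With the suffix in one property, changing scala.binary.version
    switches all four artifacts together, so they cannot drift apart.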


    Does anyone know why this might be happening?

    Regards,

    Somya Maithani
    Software Developer II
    Helpshift Pvt Ltd

