Hello,

We recently migrated our Flink jobs from version 1.9.0 to 1.14.3. These
jobs consume from Kafka and produce to their respective sinks. We use the
MemoryStateBackend for checkpointing, with GCS as our remote filesystem
(a sketch of the setup follows the query below). After the migration, we
found that a few jobs with a LEFT JOIN in their SQL query started failing:
their checkpoint size keeps growing. We have not changed the SQL queries
themselves. Below is one of the queries that exhibits the issue.

SELECT
  table1.field1,
  table2.field2,
  table2.field3,
  table1.rowtime AS estimate_timestamp,
  table2.creation_time AS created_timestamp,
  CAST(table2.rowtime AS TIMESTAMP)
FROM table1
LEFT JOIN table2
  ON table1.field1 = COALESCE(NULLIF(table2.field4, ''), table2.field5)
  AND table2.rowtime BETWEEN table1.rowtime
                         AND table1.rowtime + INTERVAL '90' MINUTE
WHERE table2.field6 IS NOT TRUE
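
For reference, here is a minimal sketch of our checkpointing setup on
1.14.3. The bucket path and checkpoint interval are placeholders, not
our actual values:

import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint once a minute (the interval here is illustrative)
env.enableCheckpointing(60_000L);
// Heap-based state backend; checkpoint data is written out to GCS
// ("gs://our-bucket/checkpoints" is a placeholder path)
env.setStateBackend(new MemoryStateBackend("gs://our-bucket/checkpoints", null));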

A few data points:

   - On 1.9.0 the job ran at a parallelism of 20; on 1.14.3 it cannot
   keep up even at 40.
   - On 1.9.0 the checkpoint size peaked at about 3.5 GB during peak
   hours. On 1.14.3 it keeps growing, reaches about 30 GB, and the job
   eventually fails due to lack of resources.
   - On 1.9.0 we were using
   org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; on
   1.14.3 we have moved to the new KafkaSource (see the sketch after
   this list).
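
Roughly, the new source is wired up as follows. The broker address,
topic, group id, deserializer, and watermark bound are placeholders,
not our actual values:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("broker:9092")                 // placeholder
    .setTopics("input-topic")                           // placeholder
    .setGroupId("consumer-group")                       // placeholder
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    .setValueOnlyDeserializer(new SimpleStringSchema()) // placeholder
    .build();

// Watermarks drive the rowtime interval join; the bound is illustrative
DataStream<String> stream = env.fromSource(
    source,
    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMinutes(1)),
    "kafka-source");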

Any help would be highly appreciated, as these are production jobs.

Thanks
Prakhar Mathur
