Hello,

We recently migrated our Flink jobs from version 1.9.0 to 1.14.3. These jobs consume from Kafka and produce to their respective sinks. We use MemoryStateBackend for checkpointing, with GCS as the remote filesystem. After the migration, we found that a few jobs whose SQL queries contain a LEFT JOIN started failing: their checkpoint size keeps increasing. We have not changed the SQL queries. The following is one of the queries affected by this issue:
SELECT
  table1.field1,
  table2.field2,
  table2.field3,
  table1.rowtime AS estimate_timestamp,
  table2.creation_time AS created_timestamp,
  CAST(table2.rowtime AS TIMESTAMP)
FROM table1
LEFT JOIN table2
  ON table1.field1 = COALESCE(NULLIF(table2.field4, ''), table2.field5)
  AND table2.rowtime BETWEEN table1.rowtime
                         AND table1.rowtime + INTERVAL '90' MINUTE
WHERE table2.field6 IS NOT TRUE

A few data points:
- On version 1.9.0 the job ran with a parallelism of 20; on 1.14.3 it cannot run even with 40.
- On version 1.9.0 the checkpoint size peaked at about 3.5 GB during peak hours. On 1.14.3 it just keeps increasing, reaching up to 30 GB, until the job eventually fails due to lack of resources.
- On version 1.9.0 we used org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; on 1.14.3 we have moved to the new KafkaSource.

Any help will be highly appreciated, as these are production jobs.

Thanks
Prakhar Mathur