Hi all, we're running a session cluster and submitting around 20 jobs to it at the same time by creating FlinkSessionJob Kubernetes resources. After sufficient time, all 20 jobs are created and running healthy. However, it appears that some jobs are started with the wrong config: as a result, some jobs are created multiple times while others are missing completely. Each job runs the same logic, differing only in one argument that specifies the country code, which determines the Kafka topic to read from and the sink name.
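For reference, the per-country resources are generated from a template roughly like this (a sketch; the namespace, names, and paths are illustrative, not our actual values):

```python
def session_job_manifest(country: str) -> dict:
    """Build one FlinkSessionJob resource for a single country code.

    All jobs target the same session deployment and entrypoint; only the
    --country argument and the resource name differ.
    """
    return {
        "apiVersion": "flink.apache.org/v1beta1",
        "kind": "FlinkSessionJob",
        "metadata": {"name": f"pyflink-streaming-job-courier-states-{country}"},
        "spec": {
            "deploymentName": "pyflink-streaming-job-courier-states",
            "job": {
                "entryClass": "org.apache.flink.client.python.PythonDriver",
                "args": [
                    "-py", "/opt/flink/entrypoint.py",
                    "--country", country,
                    "--pyFiles", "/opt/flink",
                ],
                "parallelism": 1,
                "upgradeMode": "savepoint",
            },
        },
    }

# All ~20 manifests are applied at roughly the same time, e.g.:
manifests = [session_job_manifest(c) for c in ["hu", "eg", "de"]]
```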
The job code looks essentially like this:

    import argparse

    # t_env is the StreamTableEnvironment created earlier in entrypoint.py

    parser = argparse.ArgumentParser(description="Process some input arguments.")
    parser.add_argument("--country", required=True, help="Country code to process")
    parser.add_argument("--pyFiles", required=False, help="Python files")
    args = parser.parse_args()
    country = args.country
    if country is None:
        raise ValueError("Country argument (--country) not provided.")

    t_env.execute_sql(f"""
        CREATE OR REPLACE TABLE source_kafka (
            raw_payload BYTES,
            data AS parse(raw_payload),
            `timestamp` AS parse(raw_payload).`timestamp`,
            WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'events.{country}',
            ...
        )
    """)

    table_result = t_env.sql_query("""
        ...
    """)

    topic_name = "courier_states"
    sink_name = f"{topic_name}_{country}"
    t_env.create_temporary_table(
        sink_name,
        TableDescriptor.for_connector("kafka")
            .schema(...)
            .option("topic", topic_name)
            .option(...))
    table_result.execute_insert(sink_name).wait()

The created FlinkSessionJob resources look mostly as I expect them to:

    Spec:
      Deployment Name:  pyflink-streaming-job-courier-states
      Job:
        Args:
          -py
          /opt/flink/entrypoint.py
          --country
          hu
          --pyFiles
          /opt/flink
        Entry Class:   org.apache.flink.client.python.PythonDriver
        Parallelism:   1
        State:         running
        Upgrade Mode:  savepoint
    Status:
      Error:  {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.TimeoutException","throwableList":[{"type":"org.apache.flink.util.FlinkRuntimeException","message":"java.util.concurrent.TimeoutException"},{"type":"java.util.concurrent.TimeoutException"}]}
      Job Status:
        Job Id:    000000000504be940000000000000001
        Job Name:  insert-into_default_catalog.default_database.courier_states_eg

Only the generated "Job Name" is incorrect for some of them because the corresponding job is
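As an aside, since the PythonDriver forwards all program arguments to the script, a slightly more forgiving variant of the argument handling uses `parse_known_args` so that operator-supplied flags such as `--pyFiles` don't each need to be declared explicitly (a sketch, not our exact code):

```python
import argparse

def parse_country(argv=None) -> str:
    """Extract --country from the program args, ignoring any other flags
    (e.g. --pyFiles) that the operator appends to the argument list."""
    parser = argparse.ArgumentParser(description="Process some input arguments.")
    parser.add_argument("--country", required=True, help="Country code to process")
    args, _unknown = parser.parse_known_args(argv)  # unrecognized flags are ignored
    return args.country

# e.g. parse_country(["--country", "hu", "--pyFiles", "/opt/flink"]) -> "hu"
```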
started with the wrong configuration; the Job Name should have the suffix "_hu". As you can see, there are some TimeoutExceptions and ReconciliationExceptions, and occasionally also "RestHandlerException: File e37b911b-eec9-4e75-a6db-befc132a9c2b_empty.jar does not exist", but none of them would explain why a job is started with the wrong configuration. We're using Flink 1.17.1 and Flink Kubernetes Operator 1.8.0 (1.5.0 shows the same issue).

For context: our use case requires running the same Flink job for many countries, with a separate Kafka topic to read from per country. All computation is separated by country, e.g. every group-by operation includes the country code among its grouping columns. Having one Kafka source subscribed to multiple Kafka topics (e.g. via the topic-pattern parameter) has issues regarding watermarks: the topics of countries with fewer messages are consumed much faster than those of countries with more messages. That makes all messages from the high-volume countries be considered late, yielding wrong window aggregation results. What we'd need is per-key watermarking, which is not supported. I also initially assumed that watermark alignment would be helpful here, but I didn't get it working. Hence running a session cluster for the same Flink code with one job per country sounded like a convenient idea to me.

As far as I can see, this looks like a bug in the Flink Kubernetes Operator. The only workaround I see would be to submit the jobs one by one, but that's not really feasible for 20+ different jobs. Does anyone have a good idea how to fix this?
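For completeness, the one-by-one workaround I'm considering would look roughly like the following (a sketch: `submit` and `job_state` are placeholders for, e.g., applying the manifest with kubectl and reading `.status.jobStatus.state` from the resource):

```python
import time

def submit_sequentially(countries, submit, job_state,
                        poll_interval=1.0, timeout=300.0):
    """Submit one FlinkSessionJob at a time: apply the resource for a country,
    then poll until its job reports RUNNING before submitting the next one."""
    for country in countries:
        submit(country)
        deadline = time.monotonic() + timeout
        while job_state(country) != "RUNNING":
            if time.monotonic() > deadline:
                raise TimeoutError(f"job for {country} not RUNNING in time")
            time.sleep(poll_interval)
```

This avoids concurrent submissions entirely, but serializes ~20 jobs behind each other's startup time, which is why it doesn't feel feasible at this scale.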