Hi all,

We're running a session cluster, and I submit around 20 jobs to it at the
same time by creating FlinkSessionJob Kubernetes resources. After
sufficient time there are 20 jobs created and running healthily. However,
some jobs appear to be started with the wrong configuration: as a result,
some jobs are created multiple times while others are missing completely.
Each job runs the same logic and differs only in a single argument, the
country code, which determines the Kafka topic to read from and the sink
name.

The job code looks essentially like this:


    parser = argparse.ArgumentParser(description="Process some input arguments.")
    parser.add_argument("--country", required=True, help="Country code to process")
    parser.add_argument("--pyFiles", required=False, help="Python files")
    args = parser.parse_args()
    country = args.country
    if country is None:
        raise ValueError("Country argument (--country) not provided.")
    t_env.execute_sql(f"""
        CREATE OR REPLACE TABLE source_kafka (
            raw_payload BYTES,
            data AS parse(raw_payload),
            `timestamp` AS parse(raw_payload).`timestamp`,
            WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'events.{country}',
            ...
        )
    """)
    table_result = t_env.sql_query("""
       ...
    """)
    topic_name = "courier_states"
    sink_name = f"{topic_name}_{country}"
    t_env.create_temporary_table(
        sink_name,
        TableDescriptor.for_connector("kafka").schema(...).option("topic", topic_name).option(...),
    )
    table_result.execute_insert(sink_name).wait()


The created FlinkSessionJob resources look mostly as I expect them to:

 Spec:
   Deployment Name:  pyflink-streaming-job-courier-states
   Job:
     Args:
       -py
       /opt/flink/entrypoint.py
       --country
       hu
       --pyFiles
       /opt/flink
     Entry Class:   org.apache.flink.client.python.PythonDriver
     Parallelism:   1
     State:         running
     Upgrade Mode:  savepoint
 Status:
   Error:  {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.util.FlinkRuntimeException: java.util.concurrent.TimeoutException","throwableList":[{"type":"org.apache.flink.util.FlinkRuntimeException","message":"java.util.concurrent.TimeoutException"},{"type":"java.util.concurrent.TimeoutException"}]}
   Job Status:
     Job Id:    000000000504be940000000000000001
     Job Name:  insert-into_default_catalog.default_database.courier_states_eg


Only the generated "Job Name" is wrong for some of them, because the
corresponding job was started with the wrong configuration: given the
"--country hu" argument above, the Job Name should end in "_hu", not
"_eg". As you can see, there are some TimeoutExceptions, some
ReconciliationExceptions, and occasionally also "RestHandlerException:
File e37b911b-eec9-4e75-a6db-befc132a9c2b_empty.jar does not exist", but
none of them would explain why a job is started with the wrong
configuration.
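
One thing I considered, though I haven't confirmed it helps, is giving the
operator more headroom when many session jobs are reconciled at once. If the
TimeoutExceptions are just the operator's Flink client giving up under load,
something like the following in the operator's Helm values might at least
change the symptoms (the option names are what I took from the operator
docs; the values are guesses, not a confirmed fix):

```yaml
# values.yaml for the flink-kubernetes-operator Helm chart (sketch).
defaultConfiguration:
  create: true
  append: true
  flink-conf.yaml: |+
    # How long the operator waits on Flink REST calls before throwing
    # the TimeoutExceptions seen in the resource status above.
    kubernetes.operator.flink.client.timeout: 30 s
    # How many resources the operator reconciles in parallel.
    kubernetes.operator.reconcile.parallelism: 30
```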

We're using Flink 1.17.1 and Flink Kubernetes Operator 1.8.0 (we saw the
same issue with 1.5.0).

For context:
Our use case requires running the same Flink job for many countries. For
every country there is a separate Kafka topic to read from, and every
computation is partitioned by country, e.g. all GROUP BY operations include
the country code among their grouping columns. Having one Kafka source
subscribed to multiple topics (e.g. via the topic-pattern parameter) causes
problems with watermarks: the topics of countries with fewer messages are
consumed much faster than those of countries with more messages, so all
messages from the high-volume countries are considered late, yielding wrong
window aggregation results. What we'd need is per-key watermarking, which
isn't supported. I initially assumed that watermark alignment would help
here, but I didn't get it working. Hence running a session cluster with one
job per country, all sharing the same Flink code, seemed like a convenient
approach.
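
For reference, my watermark-alignment attempt with a single multi-topic
source looked roughly like this (heavily simplified; the topic pattern,
group name, and drift values are placeholders, and the option names are what
I took from the Flink 1.17 SQL docs on watermark alignment):

```python
# Sketch of a single multi-topic Kafka source with SQL watermark alignment
# (added to Flink SQL in 1.17). Table name, topic pattern, and alignment
# values below are illustrative, not our real configuration.
def multi_topic_source_ddl() -> str:
    return """
        CREATE TABLE source_kafka_all (
            raw_payload BYTES,
            `timestamp` TIMESTAMP(3),
            WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '1' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic-pattern' = 'events\\..*',
            -- All splits in the same group are aligned so that no topic's
            -- watermark runs more than max-drift ahead of the others.
            'scan.watermark.alignment.group' = 'events-group',
            'scan.watermark.alignment.max-drift' = '30 s',
            'scan.watermark.alignment.update-interval' = '1 s'
        )
    """

# Then: t_env.execute_sql(multi_topic_source_ddl())
```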

As far as I can tell, this looks like a bug in the Flink Kubernetes
Operator. The only workaround I see would be to submit the jobs one by one,
but that's not really practical for 20+ different jobs.
Does anyone have a good idea how to fix this?
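
If one-by-one submission really is the only option, the best I can think of
is scripting it: build the FlinkSessionJob manifests in Python (fields
mirror the spec shown above) and apply them with a delay. The country list,
resource names, delay, and the `apply` callback below are placeholders:

```python
import time

COUNTRIES = ["hu", "eg", "de"]  # illustrative subset of our ~20 codes

def session_job_manifest(country: str) -> dict:
    """Build one FlinkSessionJob manifest, mirroring the spec shown above."""
    return {
        "apiVersion": "flink.apache.org/v1beta1",
        "kind": "FlinkSessionJob",
        "metadata": {"name": f"courier-states-{country}"},
        "spec": {
            "deploymentName": "pyflink-streaming-job-courier-states",
            "job": {
                "entryClass": "org.apache.flink.client.python.PythonDriver",
                "args": [
                    "-py", "/opt/flink/entrypoint.py",
                    "--country", country,
                    "--pyFiles", "/opt/flink",
                ],
                "parallelism": 1,
                "upgradeMode": "savepoint",
            },
        },
    }

def submit_all(apply, delay_seconds: float = 30.0) -> None:
    """Apply one manifest at a time, pausing between submissions.

    `apply` would be e.g. a kubernetes-client create call or a thin
    wrapper around `kubectl apply` -- left abstract here.
    """
    for country in COUNTRIES:
        apply(session_job_manifest(country))
        time.sleep(delay_seconds)
```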
