Dear Flink developers,

I wanted to check whether there is a way to control the parallelism of the
auto-generated operators in a Flink SQL job graph.

In the Java API, it is possible to fully control the parallelism of each
operator.

In Flink SQL, some source and sink connectors support `source.parallelism`
and `sink.parallelism`, and the parallelism of the remaining operators can be
set via `parallelism.default`.
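For example, a sink's parallelism can be pinned per table while every other operator falls back to the job-wide default. This is only a sketch: the `enhancedEventsSink` schema, topic, and broker address are hypothetical, and `sink.parallelism` takes effect only for connectors that implement it (the upstream Kafka connector does).

```sql
-- Job-wide default for operators without an explicit parallelism
SET 'parallelism.default' = '3';

-- Hypothetical sink definition; column types, topic and broker are placeholders
CREATE TABLE enhancedEventsSink (
    x STRING,
    y STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'enhanced-events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'sink.parallelism' = '2'  -- applies to the sink operator only
);
```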

In this particular scenario, `enhancedEvents` gets chained to the
KafkaSource operator. It can be separated by calling `disableChaining()` on
the KafkaSource stream on the Kafka connector side, but even with chaining
disabled on the source stream, the `enhancedEvents` operator parallelism is
still set to 5 (the same as the KafkaSource operator parallelism) instead of
3 (the default parallelism):

```sql
SET 'parallelism.default' = '3';

CREATE TABLE input_kafka_table
(
    ...
    ts AS TO_TIMESTAMP_LTZ(CAST(`timestamp` AS BIGINT),3),
    WATERMARK FOR ts AS ts - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'kafka',
    'source.parallelism' = '5', -- supported by our customization of the Kafka connector
    ...
);

CREATE TEMPORARY VIEW enhancedEvents AS (
     SELECT x, y
     FROM input_kafka_table, LATERAL TABLE(udf.doIt(x, y))
);

CREATE TABLE other_table_source (...) WITH(...);
CREATE TABLE other_table_sink (...) WITH(...);

BEGIN STATEMENT SET;
    INSERT INTO enhancedEventsSink SELECT * FROM enhancedEvents;
    INSERT INTO other_table_sink SELECT z FROM other_table_source;
END;
```

Is there a way to force an override of the parallelism of auto-generated
operators in a Flink SQL pipeline?

Or is it expected behavior that some operators' parallelism is not taken
from the default parallelism but inherited from another operator's parallelism?

I want to understand whether this is a bug or intended behavior.

Thank you.
