Efrat Levitan created FLINK-38862:
-------------------------------------

             Summary: Partition discovery interval not configurable in upsert 
mode (tableAPI)
                 Key: FLINK-38862
                 URL: https://issues.apache.org/jira/browse/FLINK-38862
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: kafka-4.0.1
         Environment:  

 
            Reporter: Efrat Levitan


While it is possible to pass `scan.topic-partition-discovery.interval` to table 
API connector in append mode, (connector=kafka), upsert mode will error out:

```

Caused by: org.apache.flink.table.api.ValidationException: Unsupported options 
found for 'upsert-kafka'.

Unsupported options:

scan.topic-partition-discovery.interval

Supported options:

connector

key.fields-prefix

key.format

key.test-format.changelog-mode

key.test-format.delimiter

key.test-format.deprecated-delimiter (deprecated)

key.test-format.fail-on-missing

key.test-format.fallback-fail-on-missing

key.test-format.readable-metadata

properties.bootstrap.servers

property-version

scan.bounded.mode

scan.bounded.specific-offsets

scan.bounded.timestamp-millis

scan.parallelism

scan.watermark.alignment.group

scan.watermark.alignment.max-drift

scan.watermark.alignment.update-interval

scan.watermark.emit.strategy

scan.watermark.idle-timeout

sink.buffer-flush.interval

sink.buffer-flush.max-rows

sink.delivery-guarantee

sink.parallelism

sink.transaction-naming-strategy

sink.transactional-id-prefix

topic

topic-pattern

value.fields-include

value.format

value.test-format.changelog-mode

value.test-format.delimiter

value.test-format.deprecated-delimiter (deprecated)

value.test-format.fail-on-missing

value.test-format.fallback-fail-on-missing

value.test-format.readable-metadata

```

However it doesn't mean partition discovery isn't enabled for upsert mode, only 
that it [falls 
back|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java#L166-L170]
 to the default config ([currently 
5m|https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceOptions.java#L42]).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to