I've overridden v1.11.3 FlinkKafkaConsumer's 'open' method in order to set
TLS configuration for kafka from the task manager node where the kafka
consumer is running (TLS configuration differs between job manager and each
task manager in our environment, which is why we use 'open' vs. setting
configuration on construct).

However, when I do the same for FlinkKafkaProducer, I've noticed that
'initializeState', which creates a kafka producer, is called prior to
calling 'open'. This causes an exception since a kafka producer is created
with the wrong configuration (i.e. the config from the job manager where
FlinkKafkaProducer was constructed).

What is the appropriate way to set up configuration for a
FlinkKafkaProducer, allowing me to read config from the taskManager node
where the producer is executing? I can override both 'open' and
'initializeState' to set up config; this solution works, but is there a
better alternative (e.g. 'createProducer', etc.)? What about v1.4.x
KafkaSource/KafkaSink?

Thanks,
Darius

Reply via email to