Hi all,
I am probably missing something obvious, but I cannot find how to set SSL
properties for a Kafka Sink.
My assumption was that it would work just like the Kafka consumer (KafkaSource):
    KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setProperty("ssl.truststore.type", trustStoreType)
        .setProperty("ssl.truststore.password", trustStorePassword)
        .setProperty("ssl.truststore.location", trustStoreLocation)
        .setProperty("security.protocol", securityProtocol)
        .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
        .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
        .setGroupId(inputGroupId)
        .setClientIdPrefix(clientId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(KafkaRecordDeserializationSchema.of(
            new JSONKeyValueDeserializationSchema(fetchMetadata)))
        .setStartingOffsets(
            OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();
But that seems not to be the case.
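To make the question concrete, here is a minimal sketch of what I was hoping to do on the sink side. The Properties part is plain Java and only repeats the SSL keys from the source above. The class name, the helper name and the example values are made up for illustration, and the builder call mentioned in the comment is only my assumption about how a KafkaSink might accept producer properties.

```java
import java.util.Properties;

public class SinkSslProps {
    // Collects the same SSL settings used for the KafkaSource into a plain
    // java.util.Properties object. All argument values are placeholders.
    static Properties sslProps(String securityProtocol,
                               String trustStoreType,
                               String trustStoreLocation,
                               String trustStorePassword) {
        Properties props = new Properties();
        props.setProperty("security.protocol", securityProtocol);
        props.setProperty("ssl.truststore.type", trustStoreType);
        props.setProperty("ssl.truststore.location", trustStoreLocation);
        props.setProperty("ssl.truststore.password", trustStorePassword);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical example values, not my real configuration.
        Properties props = sslProps("SSL", "JKS", "/path/to/truststore.jks", "changeit");

        // Assumption: the sink builder would take these in one call, e.g.
        //   KafkaSink.<ObjectNode>builder().setKafkaProducerConfig(props)
        // I have not verified this against the Flink version I am running.
        System.out.println(props.getProperty("security.protocol"));
    }
}
```

If the sink accepts a Properties object like this, I could reuse the same values I already pass to the source.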
Any quick pointers?
Regards Hans-Peter