Hi Matthias,
It should be probably be like this:
Properties SinkkafkaProps = new Properties();
SinkkafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
outputBrokers);
SinkkafkaProps.setProperty("ssl.truststore.type", trustStoreType);
SinkkafkaProps.setProperty("ssl.truststore.location", trustStoreLocation);
SinkkafkaProps.setProperty("partition.discovery.interval.ms",
partitionDiscoveryIntervalMs);
SinkkafkaProps.setProperty("commit.offsets.on.checkpoint",
commitOffsetsOnCheckpoint);
KafkaSink<String> kSink = KafkaSink.<String>builder()
.setBootstrapServers(outputBrokers)
.setKafkaProducerConfig(SinkkafkaProps)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(kafkaOutputTopic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
Op do 17 mrt. 2022 om 17:29 schreef Matthias Pohl <[email protected]>:
> Could you share more details on what's not working? Is the
> ssl.trustore.location accessible from the Flink nodes?
>
> Matthias
>
> On Thu, Mar 17, 2022 at 4:00 PM HG <[email protected]> wrote:
>
>> Hi all,
>> I am probably not the smartest but I cannot find how to set
>> ssl-properties for a Kafka Sink.
>> My assumption was that it would be just like the Kafka Consumer
>>
>> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>> .setProperties(kafkaProps)
>> .setProperty("ssl.truststore.type", trustStoreType)
>> .setProperty("ssl.truststore.password", trustStorePassword)
>> .setProperty("ssl.truststore.location", trustStoreLocation)
>> .setProperty("security.protocol", securityProtocol)
>> .setProperty("partition.discovery.interval.ms",
>> partitionDiscoveryIntervalMs)
>> .setProperty("commit.offsets.on.checkpoint",
>> commitOffsetsOnCheckpoint)
>> .setGroupId(inputGroupId)
>> .setClientIdPrefix(clientId)
>> .setTopics(kafkaInputTopic)
>> .setDeserializer(KafkaRecordDeserializationSchema.of(new
>> JSONKeyValueDeserializationSchema(fetchMetadata)))
>>
>> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>> .build();
>>
>>
>> But that seems not to be the case.
>>
>> Any quick pointers?
>>
>> Regards Hans-Peter
>>
>