Hello Matthias and others

I am trying to configure a Kafka Sink with SSL properties as shown further
below.

But in the logs I see warnings:

2022-03-21 12:30:17,108 WARN  org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'group.id' was supplied but isn't a known config.
2022-03-21 12:30:17,109 WARN  org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.
2022-03-21 12:30:17,109 WARN  org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'auto.commit.interval.ms' was supplied but isn't a known config.
2022-03-21 12:30:17,109 WARN  org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'ssl.truststore.type' was supplied but isn't a known config.
2022-03-21 12:30:17,111 WARN  org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'ssl.truststore.location' was supplied but isn't a known config.
2022-03-21 12:30:17,115 WARN  org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'ssl.truststore.password' was supplied but isn't a known config.
2022-03-21 12:30:17,115 WARN  org.apache.kafka.clients.admin.AdminClientConfig [] - The configuration 'auto.offset.reset' was supplied but isn't a known config.

These warnings seem to be bogus.

Regards Hans-Peter

Properties sinkkafkaProps  = new Properties();
sinkkafkaProps.setProperty("ssl.truststore.type", outputTrustStoreType);
sinkkafkaProps.setProperty("ssl.truststore.location", outputTrustStoreLocation);
sinkkafkaProps.setProperty("ssl.truststore.password", outputTrustStorePassword);
sinkkafkaProps.setProperty("security.protocol", outputSecurityProtocol);
sinkkafkaProps.setProperty("max.request.size", maxRequestSize);
sinkkafkaProps.setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs);
sinkkafkaProps.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint);

if (kafkaOutputDisabled.equals("false")) {
    KafkaSink<String> kSink = KafkaSink.<String>builder()
            .setBootstrapServers(outputBrokers)
            .setKafkaProducerConfig(sinkkafkaProps)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(kafkaOutputTopic)
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build()
            )
            .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
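
Some of the keys in the warnings (group.id, auto.offset.reset, auto.commit.interval.ms) are consumer settings, and partition.discovery.interval.ms and commit.offsets.on.checkpoint are KafkaSource options, so they probably do not belong in a producer configuration. A minimal sketch of filtering such keys out before the Properties reach the sink could look like this. It uses only java.util; the class name SinkProps, the method producerOnly and the key list are hypothetical names chosen for illustration, not part of the Flink or Kafka API.

```java
import java.util.Properties;
import java.util.Set;

final class SinkProps {
    // Keys that configure the Flink KafkaSource or a Kafka consumer, not a producer.
    // Assumption: this list is illustrative and not exhaustive.
    static final Set<String> SOURCE_ONLY_KEYS = Set.of(
            "partition.discovery.interval.ms",
            "commit.offsets.on.checkpoint",
            "group.id",
            "auto.offset.reset",
            "auto.commit.interval.ms");

    // Returns a copy of the input that omits the source-only keys.
    static Properties producerOnly(Properties input) {
        Properties result = new Properties();
        for (String key : input.stringPropertyNames()) {
            if (!SOURCE_ONLY_KEYS.contains(key)) {
                result.setProperty(key, input.getProperty(key));
            }
        }
        return result;
    }
}
```

The filtered result could then be passed to setKafkaProducerConfig(...) in place of sinkkafkaProps, so that the SSL and security.protocol settings are kept and the source-only keys are dropped.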



On Thu, 17 Mar 2022 at 17:29, Matthias Pohl <matth...@ververica.com> wrote:

> Could you share more details on what's not working? Is the
> ssl.truststore.location accessible from the Flink nodes?
>
> Matthias
>
> On Thu, Mar 17, 2022 at 4:00 PM HG <hanspeter.sl...@gmail.com> wrote:
>
>> Hi all,
>> I am probably not the smartest, but I cannot find out how to set
>> SSL properties for a Kafka Sink.
>> My assumption was that it would work just like the Kafka Consumer:
>>
>> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>>         .setProperties(kafkaProps)
>>         .setProperty("ssl.truststore.type", trustStoreType)
>>         .setProperty("ssl.truststore.password", trustStorePassword)
>>         .setProperty("ssl.truststore.location", trustStoreLocation)
>>         .setProperty("security.protocol", securityProtocol)
>>         .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
>>         .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
>>         .setGroupId(inputGroupId)
>>         .setClientIdPrefix(clientId)
>>         .setTopics(kafkaInputTopic)
>>         .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
>>         .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>>         .build();
>>
>>
>> But that seems not to be the case.
>>
>> Any quick pointers?
>>
>> Regards Hans-Peter
>>
>
