Hello Matthias and others
I am trying to configure a Kafka Sink with SSL properties as shown further
below.
But in the logs I see warnings:
2022-03-21 12:30:17,108 WARN
org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'group.id' was supplied but isn't a known config.
2022-03-21 12:30:17,109 WARN
org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'partition.discovery.interval.ms' was supplied but isn't a
known config.
2022-03-21 12:30:17,109 WARN
org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'auto.commit.interval.ms' was supplied but isn't a known
config.
2022-03-21 12:30:17,109 WARN
org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'ssl.truststore.type' was supplied but isn't a known config.
2022-03-21 12:30:17,111 WARN
org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'ssl.truststore.location' was supplied but isn't a known
config.
2022-03-21 12:30:17,115 WARN
org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'ssl.truststore.password' was supplied but isn't a known
config.
2022-03-21 12:30:17,115 WARN
org.apache.kafka.clients.admin.AdminClientConfig [] - The
configuration 'auto.offset.reset' was supplied but isn't a known config.
It seems that they are bogus.
Regards Hans-Peter
Properties sinkkafkaProps = new Properties();
sinkkafkaProps.setProperty("ssl.truststore.type", outputTrustStoreType);
sinkkafkaProps.setProperty("ssl.truststore.location", outputTrustStoreLocation);
sinkkafkaProps.setProperty("ssl.truststore.password", outputTrustStorePassword);
sinkkafkaProps.setProperty("security.protocol", outputSecurityProtocol);
sinkkafkaProps.setProperty("max.request.size", maxRequestSize);
sinkkafkaProps.setProperty("partition.discovery.interval.ms",
partitionDiscoveryIntervalMs);
sinkkafkaProps.setProperty("commit.offsets.on.checkpoint",
commitOffsetsOnCheckpoint);
if (kafkaOutputDisabled.equals("false")) {
KafkaSink<String> kSink = KafkaSink.<String>builder()
.setBootstrapServers(outputBrokers)
.setKafkaProducerConfig(sinkkafkaProps)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(kafkaOutputTopic)
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
Op do 17 mrt. 2022 om 17:29 schreef Matthias Pohl <[email protected]>:
> Could you share more details on what's not working? Is the
> ssl.trustore.location accessible from the Flink nodes?
>
> Matthias
>
> On Thu, Mar 17, 2022 at 4:00 PM HG <[email protected]> wrote:
>
>> Hi all,
>> I am probably not the smartest but I cannot find how to set
>> ssl-properties for a Kafka Sink.
>> My assumption was that it would be just like the Kafka Consumer
>>
>> KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
>> .setProperties(kafkaProps)
>> .setProperty("ssl.truststore.type", trustStoreType)
>> .setProperty("ssl.truststore.password", trustStorePassword)
>> .setProperty("ssl.truststore.location", trustStoreLocation)
>> .setProperty("security.protocol", securityProtocol)
>> .setProperty("partition.discovery.interval.ms",
>> partitionDiscoveryIntervalMs)
>> .setProperty("commit.offsets.on.checkpoint",
>> commitOffsetsOnCheckpoint)
>> .setGroupId(inputGroupId)
>> .setClientIdPrefix(clientId)
>> .setTopics(kafkaInputTopic)
>> .setDeserializer(KafkaRecordDeserializationSchema.of(new
>> JSONKeyValueDeserializationSchema(fetchMetadata)))
>>
>> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>> .build();
>>
>>
>> But that seems not to be the case.
>>
>> Any quick pointers?
>>
>> Regards Hans-Peter
>>
>