Glad to hear that you worked it out.

Indeed, the path has to be accessible by the worker nodes. A common
solution is also to put it on some DFS like HDFS and reference that. Then
you only need to update one file if the key changes.

On Thu, Nov 19, 2020 at 2:14 AM Fanbin Bu <fanbin...@coinbase.com> wrote:

> i have to put the keystore file to the nodes.
>
> On Wed, Nov 18, 2020 at 4:29 PM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>> Hi,
>>
>> This is a repost with modified subject per Sri Tummala's suggestion.
>>
>> I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I
>> tried to put keystore.jks location under /usr/lib/flink/... like:
>>
>> export
>> SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>
>> Notice that this is on EMR master(master) node. Both JM and TMs are on
>> EMR core(slave) nodes.
>>
>> Here is the code snippet:
>>
>> val stmt = s"""
>>   |create table ${table.name} (${schema}, ${watermark})
>>   |with(
>>   |'connector' = 'kafka',
>>   |'topic' = '${table.topic}',
>>   |'scan.startup.mode'= '${table.scanStartUpMode}',
>>   |'properties.zookeeper.connect'='xxx',
>>   |'properties.bootstrap.servers'='xxx',
>>  *
>> |'properties.ssl.keystore.location'='/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks',*
>>   |'properties.ssl.keystore.password'='xxx',
>>   |'properties.ssl.key.password'='xxx',
>>   |'properties.security.protocol'='SSL',
>>   |'properties.ssl.keystore.type'='JKS',
>>   |'properties.ssl.truststore.type'='JKS',
>>   |'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1',
>>   |'properties.group.id' = '${table.name}_group_id',
>>   |'format' = 'json',
>>   |'json.ignore-parse-errors' = 'true'
>>   |)
>> """.stripMargin
>>
>> tEnv.executeSql(stmt)
>>
>>
>> However, I got exception: *Caused by: java.nio.file.NoSuchFileException:
>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks*
>> even though the file is there
>> [hadoop@ip-10-200-41-39 flink]$ ll
>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>> -rw-r--r-- 1 root root 5565 Nov 17 22:24
>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>
>> Things i tried:
>> 1. the keystore.jks file itself works since I can use console-consumer to
>> read kafka topics on EMR master.
>> 2. set the location to be s3://my-bucket/keystore.jks, not working
>>
>> What value should I set the keystore location to?
>> Thanks!
>> Fanbin
>>
>> Also attached the full exception log:
>>
>> 2020-11-17 09:35:49
>> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.kafka.common.KafkaException:
>> org.apache.kafka.common.KafkaException:
>> org.apache.kafka.common.KafkaException: Failed to load SSL keystore
>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>> of type JKS
>> at
>> org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71)
>> at
>> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
>> at
>> org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
>> at
>> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
>> ... 15 more
>> Caused by: org.apache.kafka.common.KafkaException:
>> org.apache.kafka.common.KafkaException: Failed to load SSL keystore
>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>> of type JKS
>> at
>> org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
>> at
>> org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
>> at
>> org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
>> at
>> org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:69)
>> ... 19 more
>> Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL
>> keystore
>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>> of type JKS
>> at
>> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
>> at
>> org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144)
>> ... 22 more
>> *Caused by: java.nio.file.NoSuchFileException:
>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks*
>> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>> at
>> sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
>> at java.nio.file.Files.newByteChannel(Files.java:361)
>> at java.nio.file.Files.newByteChannel(Files.java:407)
>> at
>> java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
>> at java.nio.file.Files.newInputStream(Files.java:152)
>> at
>> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
>>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to