Re: Flink Kinesis Producer can't connect with AWS credentials

2022-01-03 Thread Matthias Pohl
Hi Daniel,
I'm assuming you have already looked at the Flink documentation on this topic
[1]? I'm adding Fabian to this thread. Maybe he's able to help out here.

Matthias

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kinesis.html#kinesis-producer

On Fri, Dec 31, 2021 at 1:06 PM Daniel Vol wrote:

> Hi,
>
> I am trying to run Flink on GCP, with the current source and
> destination on Kinesis in AWS.
> I have configured an access key on AWS to be able to connect.
> I am running Flink 1.12.1.
> In Flink I use the following code (Scala 2.12.2):
>
> val props = new Properties
> props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, kinesisConfig.accessKeyId.get)
> props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, kinesisConfig.secretKey.get)
>
>
> This works fine for the connection to the consumer, but not for the producer.
>
> In the TaskManager stdout log I see the following:
>
> [Window(EventTimeSessionWindows(180), EventTimeTrigger, 
> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN  
> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration  - Property 
> aws.credentials.provider.basic.secretkey ignored as there is no corresponding 
> set method in KinesisProducerConfiguration
> [Window(EventTimeSessionWindows(180), EventTimeTrigger, 
> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN  
> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration  - Property 
> aws.region ignored as there is no corresponding set method in 
> KinesisProducerConfiguration
> [Window(EventTimeSessionWindows(180), EventTimeTrigger, 
> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN  
> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration  - Property 
> aws.credentials.provider.basic.accesskeyid ignored as there is no 
> corresponding set method in KinesisProducerConfiguration
>
> Then I tried a different approach: creating an AWSCredentialsProvider
> object with the key and secret and adding it to the producer config
> (as it has a setCredentialsProvider method):
>
> class CredentialsProvider(config: KinesisConfig)
>     extends AWSCredentialsProvider with Serializable {
>   override def getCredentials: AWSCredentials =
>     new BasicAWSCredentials(config.accessKeyId.get, config.secretKey.get)
>
>   override def refresh(): Unit = {}
> }
>
> val credentialsProvider = new CredentialsProvider(kinesisConfig)
> producerConfig.put("CredentialsProvider", credentialsProvider)
>
> But then I get a different exception, saying that the process can't find
> the access key and secret key:
>
> [kpl-daemon-] ERROR o.a.f.k.s.c.a.services.kinesis.producer.KinesisProducer  - Error in child process
> java.lang.RuntimeException: Error running child process
>     at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:513)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.access$200(Daemon.java:63)
>     at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:135)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException: 
> Unable to load AWS credentials from any provider in the chain: 
> [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from 
> environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and 
> AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), 
> SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java 
> system properties (aws.accessKeyId and aws.secretKey), 
> WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and 
> roleSessionName, 
> org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.ProfileCredentialsProvider@4ec6449f:
>  profile file cannot be null, 
> org.apache.flink.kinesis.shaded.com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@2a5774f5:
>  The requested metadata is not found at 
> http://169.254.169.254/latest/meta-data/iam/security-credentials/]
>
> It tries to read the credentials from either environment variables or
> Java system properties.
>
> So I tried to add those as follows:
>
> AWS_ACCESS_KEY_ID=xx AWS_SECRET_ACCESS_KEY=xx flink run [options] app.jar [options]
>
> I also tried:
>
> flink run [options] app.jar -DAWS_ACCESS_KEY_ID=xx -DAWS_SECRET_ACCESS_KEY=xx [options]
>
> but neither way works.
>
> Any idea how I can solve this?
>
>
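
The three property names in the WARN lines above (aws.region and the two aws.credentials.provider.basic.* keys) can be checked before the Properties object is handed to the producer. The following is only a minimal sketch, assuming nothing beyond those three key names and the standard library; the object ProducerPropsCheck and the helper missingKeys are hypothetical names, and the region and key values are placeholders.

```scala
import java.util.Properties

object ProducerPropsCheck {
  // Key names copied from the WARN lines in the log; nothing else is assumed.
  val RequiredKeys: Seq[String] = Seq(
    "aws.region",
    "aws.credentials.provider.basic.accesskeyid",
    "aws.credentials.provider.basic.secretkey"
  )

  // Hypothetical helper: returns the required keys that are absent or blank.
  // Properties.getProperty also returns null when a value was put as a non-String.
  def missingKeys(props: Properties): Seq[String] =
    RequiredKeys.filter { key =>
      val value = props.getProperty(key)
      value == null || value.trim.isEmpty
    }

  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.setProperty("aws.region", "us-east-1") // placeholder region
    props.setProperty("aws.credentials.provider.basic.accesskeyid", "xx")
    // The secret key is left out on purpose so that the check reports it.
    println(missingKeys(props).mkString(", "))
  }
}
```

A check like this only confirms that the values reach the Properties object as strings; it does not say how the producer consumes them.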


Re: Flink Kinesis Producer can't connect with AWS credentials

2022-01-03 Thread Daniel Vol
I definitely did, and you can see in my initial post that it was the first
thing I tried, but I got warnings and the producer doesn't use the
credentials I supplied.
That said, I did find a solution: using the credentials provider object and
injecting the keys as Java system properties through:
-yD "env.java.opts.taskmanager=-Daws.secretKey=xxx -Daws.accessKeyId=xxx"
-yD "env.java.opts.jobmanager=-Daws.secretKey=xxx -Daws.accessKeyId=xxx"
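
The two options above set the Java system properties aws.secretKey and aws.accessKeyId, which are the names SystemPropertiesCredentialsProvider reports looking for in the stack trace from my first post. Arguments placed after app.jar on the flink run command line are passed to the job's main method rather than to the JVM, so -D flags in that position never become system properties. A minimal sketch, assuming only the Scala standard library (AwsSystemPropsCheck and report are hypothetical names), for confirming from inside a given JVM that both properties are visible without printing their values:

```scala
object AwsSystemPropsCheck {
  // Property names taken from the SystemPropertiesCredentialsProvider message.
  val Names: Seq[String] = Seq("aws.accessKeyId", "aws.secretKey")

  // Hypothetical helper: reports only whether each property is set and
  // non-empty, never the value itself. The lookup is a parameter so the
  // check can be exercised without touching real system properties.
  def report(lookup: String => Option[String]): Map[String, Boolean] =
    Names.map(name => name -> lookup(name).exists(_.nonEmpty)).toMap

  def main(args: Array[String]): Unit =
    report(sys.props.get).foreach { case (name, present) =>
      println(s"$name set: $present")
    }
}
```

The result only describes the JVM the code runs in, so the call would have to be made from code executing on the TaskManager to say anything about the producer.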

Still, I would expect the producer to accept these parameters as per the
documentation (exactly as the consumer does), so it is probably a good idea
to open a ticket for this behavior:

val props = new Properties

props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, kinesisConfig.accessKeyId.get)

props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, kinesisConfig.secretKey.get)

[Window(EventTimeSessionWindows(180), EventTimeTrigger,
ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN
o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property
aws.credentials.provider.basic.secretkey ignored as there is no
corresponding set method in KinesisProducerConfiguration
[Window(EventTimeSessionWindows(180), EventTimeTrigger,
ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN
o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property
aws.region ignored as there is no corresponding set method in
KinesisProducerConfiguration
[Window(EventTimeSessionWindows(180), EventTimeTrigger,
ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN
o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property
aws.credentials.provider.basic.accesskeyid ignored as there is no
corresponding set method in KinesisProducerConfiguration


Re: Flink Kinesis Producer can't connect with AWS credentials

2022-01-07 Thread Matthias Pohl
I'm adding Danny to this thread. He might be able to help on this topic.

Best,
Matthias
