Re: Flink Kinesis Producer con't connect with AWS credentials
Hi Daniel, I'm assuming you already looked into the Flink documentation for this topic [1]? I'm gonna add Fabian to this thread. Maybe, he's able to help out here. Matthias [1] https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kinesis.html#kinesis-producer On Fri, Dec 31, 2021 at 1:06 PM Daniel Vol wrote: > Hi, > > I am trying to run a Flink on GCP with the current source and > destination on Kinesis on AWS. > I have configured the access key on AWS to be able to connect. > I am running Flink 1.12.1 > In flink I use the following code (Scala 2.12.2) > > val props = new Properties > > props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, kinesisConfig.accessKeyId.get) > > props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, > kinesisConfig.secretKey.get) > > > It works just fine to get connection to consumer, but not to producer. > > In TaskManager stdout log I see the following: > > [Window(EventTimeSessionWindows(180), EventTimeTrigger, > ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN > o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property > aws.credentials.provider.basic.secretkey ignored as there is no corresponding > set method in KinesisProducerConfiguration > [Window(EventTimeSessionWindows(180), EventTimeTrigger, > ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN > o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property > aws.region ignored as there is no corresponding set method in > KinesisProducerConfiguration > [Window(EventTimeSessionWindows(180), EventTimeTrigger, > ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN > o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property > aws.credentials.provider.basic.accesskeyid ignored as there is no > corresponding set method in KinesisProducerConfiguration > > Then I have tried a different approach: to create AWSCredentialsProvider > object with key + secret and add it by: > > (as it have setCredentialsProvider method) > > class CredentialsProvider(config: KinesisConfig) extends > AWSCredentialsProvider with Serializable { > override def getCredentials: AWSCredentials = > new BasicAWSCredentials(config.accessKeyId.get, config.secretKey.get) > > override def refresh(): Unit = {} > } > > val credentialsProvider = new CredentialsProvider(kinesisConfig) > > producerConfig.put("CredentialsProvider", credentialsProvider) > > But then I get different exceptions that the process can't find access_key > and secret key. > > [kpl-daemon-] ERROR > o.a.f.k.s.c.a.services.kinesis.producer.KinesisProducer - Error in child > process > java.lang.RuntimeException: Error running child process > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:513) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.access$200(Daemon.java:63) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:135) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException: > Unable to load AWS credentials from any provider in the chain: > [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from > environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and > AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), > SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java > system properties (aws.accessKeyId and aws.secretKey), > WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and > roleSessionName, > org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.ProfileCredentialsProvider@4ec6449f: > profile file cannot be null, > org.apache.flink.kinesis.shaded.com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@2a5774f5: > The requested metadata is not found at > http://169.254.169.254/latest/meta-data/iam/security-credentials/] > > It tries to get either from env or java env. > > So I tried to add those as following: > > AWS_ACCESS_KEY_ID=xx AWS_SECRET_ACCESS_KEY=xx flink run [options] app.jar > [options] > > I tried > > flink run [options] app.jar -DAWS_ACCESS_KEY_ID=xx -DAWS_SECRET_ACCESS_KEY=xx > [options] > > but neither way is not working. > > Any idea how I am going to solve it? > >
Re: Flink Kinesis Producer con't connect with AWS credentials
I definitely do, and you can see in my initial post that this is the first thing I tried but I got warnings and it doesn't use credentials I supplied. Though you are right that I do find a solution - using credentialProvider object and injecting keys as a java env variables through: -yd "env.java.opts.taskmanager=-Daws.secretKey=xxx -Daws.accessKeyId=xxx" -yd "env.java.opts.jobmanager=-Daws.secretKey=xxx -Daws.accessKeyId=xxx" Though I do expect from producer to be able to get parameters as per documentation (exactly as consumer do) so probably it is a good idea to open a ticket for this behavior: val props = new Properties props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, kinesisConfig.accessKeyId.get) props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, kinesisConfig.secretKey.get) [Window(EventTimeSessionWindows(180), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.credentials.provider.basic.secretkey ignored as there is no corresponding set method in KinesisProducerConfiguration [Window(EventTimeSessionWindows(180), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.region ignored as there is no corresponding set method in KinesisProducerConfiguration [Window(EventTimeSessionWindows(180), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.credentials.provider.basic.accesskeyid ignored as there is no corresponding set method in KinesisProducerConfiguration On Mon, Jan 3, 2022 at 5:34 PM Matthias Pohl wrote: > Hi Daniel, > I'm assuming you already looked into the Flink documentation for this > topic [1]? I'm gonna add Fabian to this thread. Maybe, he's able to help > out here. > > Matthias > > [1] > https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kinesis.html#kinesis-producer > > On Fri, Dec 31, 2021 at 1:06 PM Daniel Vol wrote: > >> Hi, >> >> I am trying to run a Flink on GCP with the current source and >> destination on Kinesis on AWS. >> I have configured the access key on AWS to be able to connect. >> I am running Flink 1.12.1 >> In flink I use the following code (Scala 2.12.2) >> >> val props = new Properties >> >> props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, >> kinesisConfig.accessKeyId.get) >> >> props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, >> kinesisConfig.secretKey.get) >> >> >> It works just fine to get connection to consumer, but not to producer. >> >> In TaskManager stdout log I see the following: >> >> [Window(EventTimeSessionWindows(180), EventTimeTrigger, >> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN >> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property >> aws.credentials.provider.basic.secretkey ignored as there is no >> corresponding set method in KinesisProducerConfiguration >> [Window(EventTimeSessionWindows(180), EventTimeTrigger, >> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN >> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property >> aws.region ignored as there is no corresponding set method in >> KinesisProducerConfiguration >> [Window(EventTimeSessionWindows(180), EventTimeTrigger, >> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN >> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property >> aws.credentials.provider.basic.accesskeyid ignored as there is no >> corresponding set method in KinesisProducerConfiguration >> >> Then I have tried a different approach: to create AWSCredentialsProvider >> object with key + secret and add it by: >> >> (as it have setCredentialsProvider method) >> >> class CredentialsProvider(config: KinesisConfig) extends >> AWSCredentialsProvider with Serializable { >> override def getCredentials: AWSCredentials = >> new BasicAWSCredentials(config.accessKeyId.get, config.secretKey.get) >> >> override def refresh(): Unit = {} >> } >> >> val credentialsProvider = new CredentialsProvider(kinesisConfig) >> >> producerConfig.put("CredentialsProvider", credentialsProvider) >> >> But then I get different exceptions that the process can't find access_key >> and secret key. >> >> [kpl-daemon-] ERROR >> o.a.f.k.s.c.a.services.kinesis.producer.KinesisProducer - Error in child >> process >> java.lang.RuntimeException: Error running child process >> at >> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533) >> at >> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:513) >> at >> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.access$200(Daemon.java:63) >> at >> org.apache.flink.kinesis.shaded.com.amazon
Re: Flink Kinesis Producer con't connect with AWS credentials
I'm adding Danny to this thread. He might be able to help on this topic. Best, Matthias On Mon, Jan 3, 2022 at 4:57 PM Daniel Vol wrote: > I definitely do, and you can see in my initial post that this is the first > thing I tried but I got warnings and it doesn't use credentials I supplied. > Though you are right that I do find a solution - using credentialProvider > object and injecting keys as a java env variables through: > -yd "env.java.opts.taskmanager=-Daws.secretKey=xxx -Daws.accessKeyId=xxx" > -yd "env.java.opts.jobmanager=-Daws.secretKey=xxx -Daws.accessKeyId=xxx" > > Though I do expect from producer to be able to get parameters as per > documentation (exactly as consumer do) so probably it is a good idea to > open a ticket for this behavior: > > val props = new Properties > > props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, kinesisConfig.accessKeyId.get) > > props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, > kinesisConfig.secretKey.get) > > [Window(EventTimeSessionWindows(180), EventTimeTrigger, > ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN > o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property > aws.credentials.provider.basic.secretkey ignored as there is no > corresponding set method in KinesisProducerConfiguration > [Window(EventTimeSessionWindows(180), EventTimeTrigger, > ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN > o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property > aws.region ignored as there is no corresponding set method in > KinesisProducerConfiguration > [Window(EventTimeSessionWindows(180), EventTimeTrigger, > ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN > o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property > aws.credentials.provider.basic.accesskeyid ignored as there is no > corresponding set method in KinesisProducerConfiguration > > On Mon, Jan 3, 2022 at 5:34 PM Matthias Pohl > wrote: > >> Hi Daniel, >> I'm assuming you already looked into the Flink documentation for this >> topic [1]? I'm gonna add Fabian to this thread. Maybe, he's able to help >> out here. >> >> Matthias >> >> [1] >> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kinesis.html#kinesis-producer >> >> On Fri, Dec 31, 2021 at 1:06 PM Daniel Vol wrote: >> >>> Hi, >>> >>> I am trying to run a Flink on GCP with the current source and >>> destination on Kinesis on AWS. >>> I have configured the access key on AWS to be able to connect. >>> I am running Flink 1.12.1 >>> In flink I use the following code (Scala 2.12.2) >>> >>> val props = new Properties >>> >>> props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, >>> kinesisConfig.accessKeyId.get) >>> >>> props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, >>> kinesisConfig.secretKey.get) >>> >>> >>> It works just fine to get connection to consumer, but not to producer. >>> >>> In TaskManager stdout log I see the following: >>> >>> [Window(EventTimeSessionWindows(180), EventTimeTrigger, >>> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN >>> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property >>> aws.credentials.provider.basic.secretkey ignored as there is no >>> corresponding set method in KinesisProducerConfiguration >>> [Window(EventTimeSessionWindows(180), EventTimeTrigger, >>> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN >>> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property >>> aws.region ignored as there is no corresponding set method in >>> KinesisProducerConfiguration >>> [Window(EventTimeSessionWindows(180), EventTimeTrigger, >>> ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN >>> o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property >>> aws.credentials.provider.basic.accesskeyid ignored as there is no >>> corresponding set method in KinesisProducerConfiguration >>> >>> Then I have tried a different approach: to create AWSCredentialsProvider >>> object with key + secret and add it by: >>> >>> (as it have setCredentialsProvider method) >>> >>> class CredentialsProvider(config: KinesisConfig) extends >>> AWSCredentialsProvider with Serializable { >>> override def getCredentials: AWSCredentials = >>> new BasicAWSCredentials(config.accessKeyId.get, config.secretKey.get) >>> >>> override def refresh(): Unit = {} >>> } >>> >>> val credentialsProvider = new CredentialsProvider(kinesisConfig) >>> >>> producerConfig.put("CredentialsProvider", credentialsProvider) >>> >>> But then I get different exceptions that the process can't find access_key >>> and secret key. >>> >>> [kpl-daemon-] ERROR >>> o.a.f.k.s.c.a.services.kinesis.producer.KinesisProducer - Error in child >>> process >>> java.lang.RuntimeException: Error running child process >>> at >>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533) >>> at >