I definitely did, and you can see in my initial post that this is the first thing I tried, but I got warnings and the producer did not use the credentials I supplied. You are right, though, that I did find a solution: using the credentials provider object and injecting the keys as Java system properties through:

-yd "env.java.opts.taskmanager=-Daws.secretKey=xxx -Daws.accessKeyId=xxx"
-yd "env.java.opts.jobmanager=-Daws.secretKey=xxx -Daws.accessKeyId=xxx"
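A minimal sketch for checking that the two -D values are visible inside a JVM, assuming the property names aws.accessKeyId and aws.secretKey quoted in the SystemPropertiesCredentialsProvider message of the stack trace. The object AwsSystemPropsCheck and its missing method are hypothetical names used only for illustration; they are not part of Flink or the AWS SDK, and the check does not validate the credentials themselves.

```scala
// Hypothetical diagnostic helper; not part of Flink or the AWS SDK.
object AwsSystemPropsCheck {

  // Property names as reported by SystemPropertiesCredentialsProvider
  // in the stack trace quoted in this thread.
  val RequiredProps: Seq[String] = Seq("aws.accessKeyId", "aws.secretKey")

  // Returns the required properties that are absent or blank.
  // The lookup function is a parameter so the logic can be exercised
  // without touching the real JVM system properties.
  def missing(lookup: String => Option[String]): Seq[String] =
    RequiredProps.filter(name => lookup(name).forall(_.trim.isEmpty))

  def main(args: Array[String]): Unit = {
    // Option(...) turns a null from System.getProperty into None.
    val absent = missing(name => Option(System.getProperty(name)))
    if (absent.isEmpty) println("AWS system properties are set")
    else println(s"Missing system properties: ${absent.mkString(", ")}")
  }
}
```

Calling AwsSystemPropsCheck.missing(name => Option(System.getProperty(name))) from the job's own code, for example when the sink is opened, would show whether the env.java.opts values reached that particular JVM.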
Still, I would expect the producer to accept these parameters as described in the documentation (exactly as the consumer does), so it is probably a good idea to open a ticket for this behavior:

val props = new Properties
props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, kinesisConfig.accessKeyId.get)
props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, kinesisConfig.secretKey.get)

[Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.credentials.provider.basic.secretkey ignored as there is no corresponding set method in KinesisProducerConfiguration
[Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.region ignored as there is no corresponding set method in KinesisProducerConfiguration
[Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.credentials.provider.basic.accesskeyid ignored as there is no corresponding set method in KinesisProducerConfiguration

On Mon, Jan 3, 2022 at 5:34 PM Matthias Pohl <matth...@ververica.com> wrote:

> Hi Daniel,
> I'm assuming you already looked into the Flink documentation for this
> topic [1]? I'm gonna add Fabian to this thread. Maybe, he's able to help
> out here.
>
> Matthias
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kinesis.html#kinesis-producer
>
> On Fri, Dec 31, 2021 at 1:06 PM Daniel Vol <vold...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to run a Flink on GCP with the current source and
>> destination on Kinesis on AWS.
>> I have configured the access key on AWS to be able to connect.
>> I am running Flink 1.12.1
>> In flink I use the following code (Scala 2.12.2)
>>
>> val props = new Properties
>>
>> props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, kinesisConfig.accessKeyId.get)
>>
>> props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, kinesisConfig.secretKey.get)
>>
>>
>> It works just fine to get connection to consumer, but not to producer.
>>
>> In TaskManager stdout log I see the following:
>>
>> [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.credentials.provider.basic.secretkey ignored as there is no corresponding set method in KinesisProducerConfiguration
>> [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.region ignored as there is no corresponding set method in KinesisProducerConfiguration
>> [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.credentials.provider.basic.accesskeyid ignored as there is no corresponding set method in KinesisProducerConfiguration
>>
>> Then I have tried a different approach: to create AWSCredentialsProvider
>> object with key + secret and add it by:
>>
>> (as it have setCredentialsProvider method)
>>
>> class CredentialsProvider(config: KinesisConfig) extends AWSCredentialsProvider with Serializable {
>>   override def getCredentials: AWSCredentials =
>>     new BasicAWSCredentials(config.accessKeyId.get, config.secretKey.get)
>>
>>   override def refresh(): Unit = {}
>> }
>>
>> val credentialsProvider = new CredentialsProvider(kinesisConfig)
>>
>> producerConfig.put("CredentialsProvider", credentialsProvider)
>>
>> But then I get different exceptions that the process can't find access_key
>> and secret key.
>>
>> [kpl-daemon-0000] ERROR o.a.f.k.s.c.a.services.kinesis.producer.KinesisProducer - Error in child process
>> java.lang.RuntimeException: Error running child process
>>     at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533)
>>     at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:513)
>>     at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.access$200(Daemon.java:63)
>>     at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:135)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain:
>> [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)),
>> SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey),
>> WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName,
>> org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.ProfileCredentialsProvider@4ec6449f: profile file cannot be null,
>> org.apache.flink.kinesis.shaded.com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@2a5774f5: The requested metadata is not found at http://169.254.169.254/latest/meta-data/iam/security-credentials/]
>>
>> It tries to get either from env or java env.
>>
>> So I tried to add those as following:
>>
>> AWS_ACCESS_KEY_ID=xx AWS_SECRET_ACCESS_KEY=xx flink run [options] app.jar [options]
>>
>> I tried
>>
>> flink run [options] app.jar -DAWS_ACCESS_KEY_ID=xx -DAWS_SECRET_ACCESS_KEY=xx [options]
>>
>> but neither way is not working.
>>
>> Any idea how I am going to solve it?
>>
>>