Hi Daniel, I'm assuming you already looked into the Flink documentation for this topic [1]? I'm gonna add Fabian to this thread. Maybe, he's able to help out here.
Matthias [1] https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kinesis.html#kinesis-producer On Fri, Dec 31, 2021 at 1:06 PM Daniel Vol <vold...@gmail.com> wrote: > Hi, > > I am trying to run a Flink on GCP with the current source and > destination on Kinesis on AWS. > I have configured the access key on AWS to be able to connect. > I am running Flink 1.12.1 > In flink I use the following code (Scala 2.12.2) > > val props = new Properties > > props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, kinesisConfig.accessKeyId.get) > > props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, > kinesisConfig.secretKey.get) > > > It works just fine to get connection to consumer, but not to producer. > > In TaskManager stdout log I see the following: > > [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, > ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN > o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property > aws.credentials.provider.basic.secretkey ignored as there is no corresponding > set method in KinesisProducerConfiguration > [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, > ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN > o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property > aws.region ignored as there is no corresponding set method in > KinesisProducerConfiguration > [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, > ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN > o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property > aws.credentials.provider.basic.accesskeyid ignored as there is no > corresponding set method in KinesisProducerConfiguration > > Then I have tried a different approach: to create AWSCredentialsProvider > object with key + secret and add it by: > > (as it have setCredentialsProvider method) > > class CredentialsProvider(config: KinesisConfig) extends > AWSCredentialsProvider with Serializable { > override def getCredentials: AWSCredentials = > new BasicAWSCredentials(config.accessKeyId.get, config.secretKey.get) > > override def refresh(): Unit = {} > } > > val credentialsProvider = new CredentialsProvider(kinesisConfig) > > producerConfig.put("CredentialsProvider", credentialsProvider) > > But then I get different exceptions that the process can't find access_key > and secret key. > > [kpl-daemon-0000] ERROR > o.a.f.k.s.c.a.services.kinesis.producer.KinesisProducer - Error in child > process > java.lang.RuntimeException: Error running child process > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:513) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.access$200(Daemon.java:63) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:135) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException: > Unable to load AWS credentials from any provider in the chain: > [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from > environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and > AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), > SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java > system properties (aws.accessKeyId and aws.secretKey), > WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and > roleSessionName, > org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.ProfileCredentialsProvider@4ec6449f: > profile file cannot be null, > org.apache.flink.kinesis.shaded.com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@2a5774f5: > The requested metadata is not found at > http://169.254.169.254/latest/meta-data/iam/security-credentials/] > > It tries to get either from env or java env. > > So I tried to add those as following: > > AWS_ACCESS_KEY_ID=xx AWS_SECRET_ACCESS_KEY=xx flink run [options] app.jar > [options] > > I tried > > flink run [options] app.jar -DAWS_ACCESS_KEY_ID=xx -DAWS_SECRET_ACCESS_KEY=xx > [options] > > but neither way is not working. > > Any idea how I am going to solve it? > >