Hi,

I am trying to run a Flink job on GCP whose source and destination are both Kinesis streams on AWS. I have configured an AWS access key so that the job can connect. I am running Flink 1.12.1. In Flink I use the following code (Scala 2.12.2):

    import java.util.Properties
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants

    val props = new Properties
    props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, kinesisConfig.accessKeyId.get)
    props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, kinesisConfig.secretKey.get)

This works fine for the connection to the consumer, but not for the producer. In the TaskManager stdout log I see the following:

    [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.credentials.provider.basic.secretkey ignored as there is no corresponding set method in KinesisProducerConfiguration
    [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.region ignored as there is no corresponding set method in KinesisProducerConfiguration
    [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.credentials.provider.basic.accesskeyid ignored as there is no corresponding set method in KinesisProducerConfiguration

Then I tried a different approach: creating an AWSCredentialsProvider object from the key and secret and adding it to the producer configuration (since KinesisProducerConfiguration has a setCredentialsProvider method):

    class CredentialsProvider(config: KinesisConfig) extends AWSCredentialsProvider with Serializable {
      override def getCredentials: AWSCredentials =
        new BasicAWSCredentials(config.accessKeyId.get, config.secretKey.get)
      override def refresh(): Unit = {}
    }

    val credentialsProvider = new CredentialsProvider(kinesisConfig)
    producerConfig.put("CredentialsProvider", credentialsProvider)

But then I get a different exception, saying that the process cannot find the access key and secret key:

    [kpl-daemon-0000] ERROR o.a.f.k.s.c.a.services.kinesis.producer.KinesisProducer - Error in child process
    java.lang.RuntimeException: Error running child process
        at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533)
        at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:513)
        at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.access$200(Daemon.java:63)
        at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:135)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName, org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.ProfileCredentialsProvider@4ec6449f: profile file cannot be null, org.apache.flink.kinesis.shaded.com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@2a5774f5: The requested metadata is not found at http://169.254.169.254/latest/meta-data/iam/security-credentials/]

It tries to load the credentials from either environment variables or Java system properties. So I tried to supply them as environment variables:

    AWS_ACCESS_KEY_ID=xx AWS_SECRET_ACCESS_KEY=xx flink run [options] app.jar [options]

I also tried passing them on the command line:

    flink run [options] app.jar -DAWS_ACCESS_KEY_ID=xx -DAWS_SECRET_ACCESS_KEY=xx [options]

but neither approach works.
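
A note on the two flink run attempts, offered as a sketch under assumptions rather than a confirmed diagnosis. A VAR=value prefix puts the variable into the environment of the launched command only, which here is the flink run client. If the TaskManagers are separate, already-running processes (an assumption about this cluster), the producer's child process would not inherit it. Likewise, anything placed after app.jar is handed to the program as an ordinary argument rather than set as a JVM system property, and the exception above names the system properties as aws.accessKeyId and aws.secretKey. The snippet below uses a hypothetical variable, DEMO_KEY, purely to show the scoping:

```shell
# DEMO_KEY is a hypothetical variable used only for this illustration.
unset DEMO_KEY

# A prefix assignment is placed in the environment of the launched command only.
child_view=$(DEMO_KEY=xx sh -c 'printf %s "${DEMO_KEY:-unset}"')

# The launching shell, like any process started separately, does not see it.
parent_view=${DEMO_KEY:-unset}

echo "child=$child_view parent=$parent_view"
```

The launched command sees the value while the surrounding shell does not, so credentials passed this way may need to be set in the environment of the TaskManager processes themselves.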

Any idea how I can solve this?