I'm adding Danny to this thread. He might be able to help with this topic.

Best,
Matthias

On Mon, Jan 3, 2022 at 4:57 PM Daniel Vol <vold...@gmail.com> wrote:

> I definitely did, and you can see in my initial post that this is the first
> thing I tried, but I got warnings and it doesn't use the credentials I supplied.
> You are right, though, that I did find a solution: using a credentialsProvider
> object and injecting the keys as Java system properties through:
> -yd "env.java.opts.taskmanager=-Daws.secretKey=xxx -Daws.accessKeyId=xxx"
> -yd "env.java.opts.jobmanager=-Daws.secretKey=xxx -Daws.accessKeyId=xxx"
>
> Still, I expect the producer to accept these parameters as documented
> (exactly as the consumer does), so it is probably a good idea to open a
> ticket for this behavior:
>
> val props = new Properties
>
> props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, kinesisConfig.accessKeyId.get)
>
> props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, kinesisConfig.secretKey.get)
>
> [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.credentials.provider.basic.secretkey ignored as there is no corresponding set method in KinesisProducerConfiguration
> [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.region ignored as there is no corresponding set method in KinesisProducerConfiguration
> [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.credentials.provider.basic.accesskeyid ignored as there is no corresponding set method in KinesisProducerConfiguration
>
> On Mon, Jan 3, 2022 at 5:34 PM Matthias Pohl <matth...@ververica.com> wrote:
>
>> Hi Daniel,
>> I'm assuming you already looked into the Flink documentation for this
>> topic [1]? I'm going to add Fabian to this thread. Maybe he's able to help
>> out here.
>>
>> Matthias
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kinesis.html#kinesis-producer
>>
>> On Fri, Dec 31, 2021 at 1:06 PM Daniel Vol <vold...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to run Flink on GCP with the current source and
>>> destination on Kinesis on AWS.
>>> I have configured the access key on AWS to be able to connect.
>>> I am running Flink 1.12.1.
>>> In Flink I use the following code (Scala 2.12.2):
>>>
>>> val props = new Properties
>>>
>>> props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, kinesisConfig.accessKeyId.get)
>>>
>>> props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, kinesisConfig.secretKey.get)
>>>
>>> This works fine for the consumer connection, but not for the producer.
>>>
>>> In the TaskManager stdout log I see the following:
>>>
>>> [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.credentials.provider.basic.secretkey ignored as there is no corresponding set method in KinesisProducerConfiguration
>>> [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.region ignored as there is no corresponding set method in KinesisProducerConfiguration
>>> [Window(EventTimeSessionWindows(1800000), EventTimeTrigger, ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration - Property aws.credentials.provider.basic.accesskeyid ignored as there is no corresponding set method in KinesisProducerConfiguration
>>>
>>> Then I tried a different approach: creating an AWSCredentialsProvider
>>> object with the key and secret and adding it to the producer config
>>> (as it has a setCredentialsProvider method):
>>>
>>> class CredentialsProvider(config: KinesisConfig) extends AWSCredentialsProvider with Serializable {
>>>   override def getCredentials: AWSCredentials =
>>>     new BasicAWSCredentials(config.accessKeyId.get, config.secretKey.get)
>>>
>>>   override def refresh(): Unit = {}
>>> }
>>>
>>> val credentialsProvider = new CredentialsProvider(kinesisConfig)
>>>
>>> producerConfig.put("CredentialsProvider", credentialsProvider)
>>>
>>> But then I get a different exception, saying that the process can't find
>>> the access key and secret key.
>>>
>>> [kpl-daemon-0000] ERROR o.a.f.k.s.c.a.services.kinesis.producer.KinesisProducer - Error in child process
>>> java.lang.RuntimeException: Error running child process
>>>   at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533)
>>>   at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:513)
>>>   at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.access$200(Daemon.java:63)
>>>   at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:135)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>   at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain:
>>> [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)),
>>> SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey),
>>> WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName,
>>> org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.ProfileCredentialsProvider@4ec6449f: profile file cannot be null,
>>> org.apache.flink.kinesis.shaded.com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@2a5774f5: The requested metadata is not found at http://169.254.169.254/latest/meta-data/iam/security-credentials/]
>>>
>>> It tries to get the credentials from either environment variables or
>>> Java system properties.
>>>
>>> So I tried to add those as follows:
>>>
>>> AWS_ACCESS_KEY_ID=xx AWS_SECRET_ACCESS_KEY=xx flink run [options] app.jar [options]
>>>
>>> I also tried:
>>>
>>> flink run [options] app.jar -DAWS_ACCESS_KEY_ID=xx -DAWS_SECRET_ACCESS_KEY=xx [options]
>>>
>>> but neither way works.
>>>
>>> Any idea how I can solve this?
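
A note on the credential lookup named in the stack trace: the exception lists environment variables (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY and their alternates) and Java system properties (aws.accessKeyId / aws.secretKey) as the first two places searched. The sketch below is a stdlib-only illustration of that lookup order, not the AWS SDK's actual code; `Creds`, `CredentialLookup` and `resolve` are hypothetical names introduced only for this example. It may help show why passing -DAWS_ACCESS_KEY_ID=xx after app.jar has no effect: anything after the jar is a program argument, and the system property names differ from the environment variable names.

```scala
// Illustrative only: mimics the first two lookups named in the error message.
final case class Creds(accessKeyId: String, secretKey: String, source: String)

object CredentialLookup {
  // env and props are passed in as plain maps so the lookup can be
  // exercised without touching the real process environment.
  def resolve(env: Map[String, String], props: Map[String, String]): Option[Creds] = {
    // Environment variables, with the alternate names from the error message.
    def fromEnv: Option[Creds] = for {
      id  <- env.get("AWS_ACCESS_KEY_ID").orElse(env.get("AWS_ACCESS_KEY"))
      key <- env.get("AWS_SECRET_KEY").orElse(env.get("AWS_SECRET_ACCESS_KEY"))
    } yield Creds(id, key, "environment")

    // Java system properties use different names than the environment variables.
    def fromProps: Option[Creds] = for {
      id  <- props.get("aws.accessKeyId")
      key <- props.get("aws.secretKey")
    } yield Creds(id, key, "system-properties")

    fromEnv.orElse(fromProps)
  }
}
```

Calling `CredentialLookup.resolve(sys.env, sys.props.toMap)` inside the JVM that needs the credentials shows what that JVM can actually see. The -yd "env.java.opts.taskmanager=..." and "env.java.opts.jobmanager=..." workaround quoted earlier in the thread sets exactly the aws.accessKeyId and aws.secretKey properties in those JVMs, whereas variables exported in the shell that runs `flink run` only reach the client process.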