Hi,

I am trying to run a Flink on GCP with the current source and
destination on Kinesis on AWS.
I have configured the access key on AWS to be able to connect.
I am running Flink 1.12.1
In flink I use the following code (Scala 2.12.2)

val props = new Properties

props.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, kinesisConfig.accessKeyId.get)

props.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, kinesisConfig.secretKey.get)


It works just fine to get connection to consumer, but not to producer.

In TaskManager stdout log I see the following:

[Window(EventTimeSessionWindows(1800000), EventTimeTrigger,
ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN
o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration  - Property
aws.credentials.provider.basic.secretkey ignored as there is no
corresponding set method in KinesisProducerConfiguration
[Window(EventTimeSessionWindows(1800000), EventTimeTrigger,
ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN
o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration  - Property
aws.region ignored as there is no corresponding set method in
KinesisProducerConfiguration
[Window(EventTimeSessionWindows(1800000), EventTimeTrigger,
ScalaProcessWindowFunctionWrapper) -> Sink: Unnamed (1/1)#0] WARN
o.a.f.k.s.c.a.s.k.producer.KinesisProducerConfiguration  - Property
aws.credentials.provider.basic.accesskeyid ignored as there is no
corresponding set method in KinesisProducerConfiguration

Then I have tried a different approach: to create
AWSCredentialsProvider object with key + secret and add it by:

(as it have setCredentialsProvider method)

class CredentialsProvider(config: KinesisConfig) extends
AWSCredentialsProvider with Serializable {
  override def getCredentials: AWSCredentials =
    new BasicAWSCredentials(config.accessKeyId.get, config.secretKey.get)

  override def refresh(): Unit = {}
}

val credentialsProvider = new CredentialsProvider(kinesisConfig)

producerConfig.put("CredentialsProvider", credentialsProvider)

But then I get different exceptions that the process can't find
access_key and secret key.

[kpl-daemon-0000] ERROR
o.a.f.k.s.c.a.services.kinesis.producer.KinesisProducer  - Error in
child process
java.lang.RuntimeException: Error running child process
    at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:533)
    at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:513)
    at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.access$200(Daemon.java:63)
    at 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:135)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException:
Unable to load AWS credentials from any provider in the chain:
[EnvironmentVariableCredentialsProvider: Unable to load AWS
credentials from environment variables (AWS_ACCESS_KEY_ID (or
AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)),
SystemPropertiesCredentialsProvider: Unable to load AWS credentials
from Java system properties (aws.accessKeyId and aws.secretKey),
WebIdentityTokenCredentialsProvider: You must specify a value for
roleArn and roleSessionName,
org.apache.flink.kinesis.shaded.com.amazonaws.auth.profile.ProfileCredentialsProvider@4ec6449f:
profile file cannot be null,
org.apache.flink.kinesis.shaded.com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@2a5774f5:
The requested metadata is not found at
http://169.254.169.254/latest/meta-data/iam/security-credentials/]

It tries to get either from env or java env.

So I tried to add those as following:

AWS_ACCESS_KEY_ID=xx AWS_SECRET_ACCESS_KEY=xx flink run [options]
app.jar [options]

I tried

flink run [options] app.jar -DAWS_ACCESS_KEY_ID=xx
-DAWS_SECRET_ACCESS_KEY=xx [options]

but neither way is not working.

Any idea how I am going to solve it?

Reply via email to