Hi all!

I am trying to use Kinesis with Spark Streaming. When I run my program, I get
the following exception:

    com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain


Here is the relevant code:

    val credentials = new BasicAWSCredentials(KinesisProperties.AWS_ACCESS_KEY_ID,
        KinesisProperties.AWS_SECRET_KEY)

    var kinesisClient: AmazonKinesisClient = new AmazonKinesisClient(credentials)

    kinesisClient.setEndpoint(KinesisProperties.KINESIS_ENDPOINT_URL,
        KinesisProperties.KINESIS_SERVICE_NAME,
        KinesisProperties.KINESIS_REGION_ID)
    System.setProperty("aws.accessKeyId", KinesisProperties.AWS_ACCESS_KEY_ID)
    System.setProperty("aws.secretKey", KinesisProperties.AWS_SECRET_KEY)
    System.setProperty("AWS_ACCESS_KEY_ID", KinesisProperties.AWS_ACCESS_KEY_ID)
    System.setProperty("AWS_SECRET_KEY", KinesisProperties.AWS_SECRET_KEY)
    val numShards = kinesisClient.describeStream(KinesisProperties.MY_STREAM_NAME)
        .getStreamDescription().getShards().size()
    val numStreams = numShards
    val ssc = StreamingHelper.getStreamingInstance(
        new Duration(KinesisProperties.KINESIS_CHECKPOINT_INTERVAL))
    ssc.addStreamingListener(new MyStreamListener)
    val kinesisStreams = (0 until numStreams).map { i =>
        KinesisUtils.createStream(ssc,
            KinesisProperties.MY_STREAM_NAME,
            KinesisProperties.KINESIS_ENDPOINT_URL,
            new Duration(KinesisProperties.KINESIS_CHECKPOINT_INTERVAL),
            InitialPositionInStream.TRIM_HORIZON,
            StorageLevel.MEMORY_AND_DISK_2)
    }
    /* Union all the streams */
    val unionStreams = ssc.union(kinesisStreams)
    val tmp_stream = unionStreams.map(byteArray => new String(byteArray))
    val data = tmp_stream.window(Seconds(KinesisProperties.WINDOW_INTERVAL),
        Seconds(KinesisProperties.SLIDING_INTERVAL))
    data.foreachRDD((rdd: RDD[String], time: Time) => {
        if (rdd.take(1).size == 1) {
            rdd.saveAsTextFile(KinesisProperties.Sink + time.milliseconds)
        }
    })
    ssc.start()
    ssc.awaitTermination()



Any suggestions?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/com-amazonaws-AmazonClientException-Unable-to-load-AWS-credentials-from-any-provider-in-the-chain-tp21255.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
