Hi, Expert I want to consumes data from kinesis stream using spark streaming.
I am trying to  create kinesis stream using scala code. Here is my code

def main(args: Array[String]) {
        println("Stream creation started")
        if(create(2))
            println("Stream is created successfully")
    }
    def create(shardCount: Int): Boolean = {
        val credentials = new
BasicAWSCredentials(KinesisProperties.AWS_ACCESS_KEY_ID,
KinesisProperties.AWS_SECRET_KEY)

        var kinesisClient: AmazonKinesisClient = new
AmazonKinesisClient(credentials)
        kinesisClient.setEndpoint(KinesisProperties.KINESIS_ENDPOINT_URL,
            KinesisProperties.KINESIS_SERVICE_NAME,
            KinesisProperties.KINESIS_REGION_ID)
        val createStreamRequest = new CreateStreamRequest()
        createStreamRequest.setStreamName(KinesisProperties.myStreamName);
        createStreamRequest.setShardCount(shardCount)
        val describeStreamRequest = new DescribeStreamRequest()
        describeStreamRequest.setStreamName(KinesisProperties.myStreamName)
        try {
            Thread.sleep(120000)
        } catch {
            case e: Exception =>
        }
        var streamStatus = "not active"
        while (!streamStatus.equalsIgnoreCase("ACTIVE")) {
            try {
                Thread.sleep(1000)
            } catch {
                case e: Exception => e.printStackTrace()
            }
            try {
                val describeStreamResponse =
kinesisClient.describeStream(describeStreamRequest)
                streamStatus =
describeStreamResponse.getStreamDescription.getStreamStatus
            } catch {
                case e: Exception => e.printStackTrace()
            }
        }
        if (streamStatus.equalsIgnoreCase("ACTIVE"))
            true
        else
            false
    }


When I run this code I get following exception

Exception in thread "main" java.lang.NoSuchMethodError:
org.joda.time.format.DateTimeFormatter.withZoneUTC()Lorg/joda/time/format/DateTimeFormatter;
        at com.amazonaws.auth.AWS4Signer.<clinit>(AWS4Signer.java:44)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
        at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:379)
        at
com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
        at
com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
        at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
        at
com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
        at
com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
        at
com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
        at
com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102)
        at
com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:216)
        at
com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:139)
        at
com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:116)
        at
com.platalytics.platform.connectors.Kinesis.App$.create(App.scala:32)
        at
com.platalytics.platform.connectors.Kinesis.App$.main(App.scala:26)
        at com.platalytics.platform.connectors.Kinesis.App.main(App.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)



I have following maven dependency
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
        <version>1.2.0</version>
</dependency> 


Any suggestion?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/kinesis-creating-stream-scala-code-exception-tp21154.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to