I have it working without any issues (tried with 5 shards), except my Java
version was 1.7.
Here's the piece of code that I used.
    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
    import com.amazonaws.services.kinesis.AmazonKinesisClient
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.kinesis.KinesisUtils

    System.setProperty("AWS_ACCESS_KEY_ID", this.kConf.getOrElse("access_key", ""))
    System.setProperty("AWS_SECRET_KEY", this.kConf.getOrElse("secret", ""))

    val streamName = this.kConf.getOrElse("stream", "")
    val endpointUrl = this.kConf.getOrElse("end_point", "https://kinesis.us-east-1.amazonaws.com/")

    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpointUrl)

    // Create one DStream (receiver) per shard
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
    val numStreams = numShards
    val kinesisCheckpointInterval = Seconds(this.kConf.getOrElse("duration", "").toInt)

    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, streamName, endpointUrl,
        kinesisCheckpointInterval, InitialPositionInStream.LATEST,
        StorageLevel.MEMORY_AND_DISK_2)
    }

    /* Union all the streams */
    val unionStreams = ssc.union(kinesisStreams)
    val tmp_stream = unionStreams.map(byteArray => new String(byteArray))
    tmp_stream.print()
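One more thing worth checking when going from 1 shard to several: each createStream call above launches its own receiver, and every receiver permanently occupies one core, so the application needs at least (number of receivers + 1) cores or the union never gets processed and the job just sits there renewing leases. A minimal sketch of the context setup that accounts for this (the app name and batch interval are placeholder assumptions, not from my code above):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Suppose describeStream reported this many shards, hence this many receivers.
    val numShards = 5

    // Reserve one core per receiver, plus at least one core for processing;
    // with local[n] where n <= numShards, no batches will ever complete.
    val conf = new SparkConf()
      .setAppName("KinesisConsumer")
      .setMaster(s"local[${numShards + 1}]")

    val ssc = new StreamingContext(conf, Seconds(2))

The same rule applies on a cluster: total executor cores must exceed the number of receivers.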
Thanks
Best Regards
On Wed, Nov 26, 2014 at 5:53 PM, A.K.M. Ashrafuzzaman <
[email protected]> wrote:
> Hi guys,
> When we are using Kinesis with 1 shard then it works fine. But when we use
> more than 1, it falls into an infinite loop and no data is processed by
> the Spark streaming. In the Kinesis DynamoDB table, I can see that it keeps
> increasing the leaseCounter. But it does not start processing.
>
> I am using,
> scala: 2.10.4
> java version: 1.8.0_25
> Spark: 1.1.0
> spark-streaming-kinesis-asl: 1.1.0
>
> A.K.M. Ashrafuzzaman
> Lead Software Engineer
> NewsCred <http://www.newscred.com/>
>
> (M) 880-175-5592433
> Twitter <https://twitter.com/ashrafuzzaman> | Blog
> <http://jitu-blog.blogspot.com/> | Facebook
> <https://www.facebook.com/ashrafuzzaman.jitu>
>