Maybe a naive question: why are you creating 1 Dstream per shard? It should be one Dstream corresponding to kinesis stream, isn't it?
On Fri, Jan 27, 2017 at 8:09 PM, Takeshi Yamamuro <linguin....@gmail.com> wrote: > Hi, > > Just a guess though, Kinesis shards sometimes have skew data. > So, before you compute something from kinesis RDDs, you'd be better to > repartition them > for better parallelism. > > // maropu > > On Fri, Jan 27, 2017 at 2:54 PM, Graham Clark <grcl...@gmail.com> wrote: > >> Hi everyone - I am building a small prototype in Spark 1.6.0 (cloudera) >> to read information from Kinesis and write it to HDFS in parquet format. >> The write seems very slow, and if I understood Spark's diagnostics >> correctly, always seemed to run from the same executor, one partition after >> the other, serially. So I stripped the program down to this: >> >> >> val kinesisStreams = (0 until numShards).map { i => { >> >> KinesisUtils.createStream(streamingContext, sparkApplicationName, >> >> kinesisStreamName, kinesisUrl, awsRegion, >> InitialPositionInStream.LATEST) >> >> new Duration(streamingInterval.millis), >> StorageLevel.MEMORY_AND_DISK_SER, >> >> awsCredentials.accessKey, awsCredentials.secretKey) >> >> }} >> >> val allKinesisStreams = streamingContext.union(kinesisStreams) >> >> allKinesisStreams.foreachRDD { >> >> rdd => { >> >> info("total for this batch is " + rdd.count()) >> >> } >> } >> >> The Kinesis stream has 20 shards (overprovisioned for this small test). I >> confirmed using a small boto program that data is periodically written to >> all 20 of the shards. I can see that Spark has created 20 executors, one >> for each Kinesis shard. It also creates one other executor, tied to a >> particular worker node, and that node seems to do the RDD counting. The >> streaming interval is 1 minute, during which time several shards have >> received data. Each minute interval, for this particular example, the >> driver prints out between 20 and 30 for the count value. I expected to see >> the count operation parallelized across the cluster. I think I must just be >> misunderstanding something fundamental! Can anyone point out where I'm >> going wrong? >> >> Yours in confusion, >> Graham >> >> > > > -- > --- > Takeshi Yamamuro > -- Best Regards, Ayan Guha