Thanks Chris! I was just looking to get back to Spark + Kinesis
integration. Will be in touch shortly.


On Sun, May 10, 2015 at 12:14 AM, Chris Fregly <> wrote:

> hey vadim-
> sorry for the delay.
> if you're interested in trying to get Kinesis working one-on-one, shoot me
> a direct email and we'll get it going off-list.
> we can circle back and summarize our findings here.
> lots of people are using Spark Streaming+Kinesis successfully.
> would love to help you through this - albeit a month later!  the goal is
> to have this working out of the box, so i'd like to implement anything i
> can do to make that happen.
> lemme know.
> btw, Spark 1.4 will have some improvements to the Kinesis integration for
> Spark Streaming.
> TD and I have been working together on this.
> thanks!
> -chris
> On Tue, Apr 7, 2015 at 6:17 PM, Vadim Bichutskiy <
>> wrote:
>> Hey y'all,
>> While I haven't been able to get Spark + Kinesis integration working, I
>> pivoted to plan B: I now push data to S3 where I set up a DStream to
>> monitor an S3 bucket with textFileStream, and that works great.
>> I <3 Spark!
>> Best,
>> Vadim
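A minimal sketch of the plan-B setup described above, not Vadim's actual code. The bucket name, prefix, batch interval, and the `s3n` URI scheme are assumptions for illustration, and AWS credentials are assumed to be configured for the Hadoop S3 filesystem already. `textFileStream` only picks up files that appear in the directory after the stream starts.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object S3TextMonitor {
  // Builds the directory URI that textFileStream will watch.
  // The "s3n" scheme and the bucket/prefix layout are assumptions.
  def inputPath(bucket: String, prefix: String): String =
    s"s3n://$bucket/$prefix/"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("S3TextMonitor")
    // 10-second batches; the interval is an arbitrary choice for this sketch.
    val ssc = new StreamingContext(conf, Seconds(10))

    // Each new file under the watched directory becomes lines in the DStream.
    val lines = ssc.textFileStream(inputPath("my-bucket", "incoming"))
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```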
>> On Mon, Apr 6, 2015 at 12:23 PM, Vadim Bichutskiy <
>>> wrote:
>>> Hi all,
>>> I am wondering, has anyone on this list been able to successfully
>>> implement Spark on top of Kinesis?
>>> Best,
>>> Vadim
>>> On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy <
>>>> wrote:
>>>> Hi all,
>>>> Below is the output that I am getting. My Kinesis stream has 1 shard,
>>>> and my Spark cluster on EC2 has 2 slaves (I think that's fine?).
>>>> I should mention that my Kinesis producer is written in Python where I
>>>> followed the example
>>>> I also wrote a Python consumer, again using the example at the above
>>>> link, that works fine. But I am unable to display output from my Spark
>>>> consumer.
>>>> I'd appreciate any help.
>>>> Thanks,
>>>> Vadim
>>>> -------------------------------------------
>>>> Time: 1428254090000 ms
>>>> -------------------------------------------
>>>> 15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming
>>>> job 1428254090000 ms.0 from job set of time 1428254090000 ms
>>>> 15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for
>>>> time 1428254090000 ms (execution: 0.090 s)
>>>> 15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from
>>>> persistence list
>>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63
>>>> 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from
>>>> persistence list
>>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62
>>>> 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from
>>>> persistence list
>>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61
>>>> 15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence
>>>> list
>>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60
>>>> 15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence
>>>> list
>>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59
>>>> 15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks
>>>> of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time
>>>> 1428254090000 ms
>>>> ***********
>>>> 15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches
>>>> ArrayBuffer(1428254070000 ms)
>>>> On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy <
>>>>> wrote:
>>>>> Hi all,
>>>>> More good news! I was able to use mergeStrategy to assemble my
>>>>> Kinesis consumer into an "uber jar".
>>>>> Here's what I added to *build.sbt*:
>>>>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>>>>   {
>>>>>     case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
>>>>>     case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
>>>>>     case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
>>>>>     case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
>>>>>     case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
>>>>>     case x => old(x)
>>>>>   }
>>>>> }
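For readers on a later sbt-assembly release: the same rules can be sketched with the `assemblyMergeStrategy` key that the plugin's 0.14.x README documents. This is an assumed equivalent, not something posted in this thread, and it has not been checked against the 0.13.0 plugin version used here.

```scala
// build.sbt fragment, assuming an sbt-assembly release that provides
// the assemblyMergeStrategy key (as documented in the 0.14.x README).
assemblyMergeStrategy in assembly := {
  case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
  case PathList("com", "google", "common", "base", xs @ _*)   => MergeStrategy.first
  case PathList("org", "apache", "commons", xs @ _*)          => MergeStrategy.last
  case PathList("org", "apache", "hadoop", xs @ _*)           => MergeStrategy.first
  case PathList("org", "apache", "spark", "unused", xs @ _*)  => MergeStrategy.first
  case x =>
    // Fall back to the plugin's default strategy for every other path.
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
```

The cases are tried top to bottom, so only paths that match none of the listed prefixes reach the default strategy.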
>>>>> Everything appears to be working fine. Right now my producer is
>>>>> pushing simple strings through Kinesis,
>>>>> which my consumer is trying to print (using Spark's print() method for
>>>>> now).
>>>>> However, instead of displaying my strings, I get the following:
>>>>> *15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting
>>>>> batches ArrayBuffer(1428173848000 ms)*
>>>>> Any idea on what might be going on?
>>>>> Thanks,
>>>>> Vadim
>>>>> Here's my consumer code (adapted from the WordCount example):
>>>>> private object MyConsumer extends Logging {
>>>>>   def main(args: Array[String]) {
>>>>>     /* Check that all required args were passed in. */
>>>>>     if (args.length < 2) {
>>>>>       System.err.println(
>>>>>         """
>>>>>           |Usage: KinesisWordCount <stream-name> <endpoint-url>
>>>>>           |    <stream-name> is the name of the Kinesis stream
>>>>>           |    <endpoint-url> is the endpoint of the Kinesis service
>>>>>           |                   (e.g. <>)
>>>>>         """.stripMargin)
>>>>>       System.exit(1)
>>>>>     }
>>>>>
>>>>>     /* Populate the appropriate variables from the given args */
>>>>>     val Array(streamName, endpointUrl) = args
>>>>>
>>>>>     /* Determine the number of shards from the stream */
>>>>>     val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
>>>>>     kinesisClient.setEndpoint(endpointUrl)
>>>>>     val numShards =
>>>>>       kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
>>>>>     System.out.println("Num shards: " + numShards)
>>>>>
>>>>>     /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
>>>>>     val numStreams = numShards
>>>>>
>>>>>     /* Set up the SparkConfig and StreamingContext */
>>>>>     /* Spark Streaming batch interval */
>>>>>     val batchInterval = Milliseconds(2000)
>>>>>     val sparkConfig = new SparkConf().setAppName("MyConsumer")
>>>>>     val ssc = new StreamingContext(sparkConfig, batchInterval)
>>>>>
>>>>>     /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
>>>>>     val kinesisCheckpointInterval = batchInterval
>>>>>
>>>>>     /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
>>>>>     val kinesisStreams = (0 until numStreams).map { i =>
>>>>>       KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
>>>>>         InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
>>>>>     }
>>>>>
>>>>>     /* Union all the streams */
>>>>>     val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))
>>>>>     unionStreams.print()
>>>>>     ssc.start()
>>>>>     ssc.awaitTermination()
>>>>>   }
>>>>> }
>>>>> On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das <>
>>>>> wrote:
>>>>>> Just remove "provided" for spark-streaming-kinesis-asl
>>>>>> libraryDependencies += "org.apache.spark" %%
>>>>>> "spark-streaming-kinesis-asl" % "1.3.0"
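Applied to the build.sbt quoted further down, TD's change would leave the dependency section looking roughly like this. It is a sketch assembled from the lines already in this thread, written as a single `Seq` for readability.

```scala
// build.sbt fragment: Spark core and streaming are already on the cluster,
// so they stay "provided". The Kinesis connector is not, so it carries no
// "provided" scope and gets bundled (with the KCL) into the assembly jar.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                  % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming"             % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"
)
```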
>>>>>> On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy <
>>>>>>> wrote:
>>>>>>> Thanks. So how do I fix it?
>>>>>>> On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <
>>>>>>> > wrote:
>>>>>>>>   spark-streaming-kinesis-asl is not part of the Spark
>>>>>>>> distribution on your cluster, so you cannot have it be just a
>>>>>>>> "provided" dependency.  This is also why the KCL and its
>>>>>>>> dependencies were not included in the assembly (but yes, they
>>>>>>>> should be).
>>>>>>>>  ~ Jonathan Kelly
>>>>>>>>   From: Vadim Bichutskiy <>
>>>>>>>> Date: Friday, April 3, 2015 at 12:26 PM
>>>>>>>> To: Jonathan Kelly <>
>>>>>>>> Cc: "" <>
>>>>>>>> Subject: Re: Spark + Kinesis
>>>>>>>>   Hi all,
>>>>>>>>  Good news! I was able to create a Kinesis consumer and assemble
>>>>>>>> it into an "uber jar" following
>>>>>>>> <>
>>>>>>>> and example
>>>>>>>> <>
>>>>>>>> .
>>>>>>>>  However when I try to spark-submit it I get the following
>>>>>>>> exception:
>>>>>>>>  *Exception in thread "main" java.lang.NoClassDefFoundError:
>>>>>>>> com/amazonaws/auth/AWSCredentialsProvider*
>>>>>>>>  Do I need to include the KCL dependency in *build.sbt*? Here's
>>>>>>>> what it looks like currently:
>>>>>>>>  import AssemblyKeys._
>>>>>>>> name := "Kinesis Consumer"
>>>>>>>> version := "1.0"
>>>>>>>> organization := "com.myconsumer"
>>>>>>>> scalaVersion := "2.11.5"
>>>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-core" %
>>>>>>>> "1.3.0" % "provided"
>>>>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" %
>>>>>>>> "1.3.0" % "provided"
>>>>>>>> libraryDependencies += "org.apache.spark" %%
>>>>>>>> "spark-streaming-kinesis-asl" % "1.3.0" % "provided"
>>>>>>>>  assemblySettings
>>>>>>>> jarName in assembly :=  "consumer-assembly.jar"
>>>>>>>> assemblyOption in assembly := (assemblyOption in
>>>>>>>> assembly).value.copy(includeScala=false)
>>>>>>>>  Any help appreciated.
>>>>>>>>  Thanks,
>>>>>>>> Vadim
>>>>>>>> On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan <
>>>>>>>>> wrote:
>>>>>>>>>  It looks like you're attempting to mix Scala versions, so that's
>>>>>>>>> going to cause some problems.  If you really want to use Scala
>>>>>>>>> 2.11.5, you must also use Spark package versions built for Scala
>>>>>>>>> 2.11 rather than 2.10.  Anyway, that's not quite the correct way to
>>>>>>>>> specify Scala dependencies in build.sbt.  Instead of placing the
>>>>>>>>> Scala version after the artifactId (like "spark-core_2.10"), what
>>>>>>>>> you actually want is to use just "spark-core" with two percent
>>>>>>>>> signs before it.  Using two percent signs will make it use the
>>>>>>>>> version of Scala that matches your declared scalaVersion.  For
>>>>>>>>> example:
>>>>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-core" %
>>>>>>>>> "1.3.0" % "provided"
>>>>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-streaming" %
>>>>>>>>> "1.3.0" % "provided"
>>>>>>>>>  libraryDependencies += "org.apache.spark" %%
>>>>>>>>> "spark-streaming-kinesis-asl" % "1.3.0"
>>>>>>>>>  I think that may get you a little closer, though I think you're
>>>>>>>>> probably going to run into the same problems I ran into in this
>>>>>>>>> thread:
>>>>>>>>> I never really got an answer for that, and I temporarily moved on
>>>>>>>>> to other things for now.
>>>>>>>>>  ~ Jonathan Kelly
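To make the `%%` point concrete, here is a small sketch of how sbt expands it. The `scalaVersion` of 2.10.4 is an assumption, chosen only so the result lines up with the `_2.10` artifacts named in the original question.

```scala
// build.sbt fragment. scalaVersion 2.10.4 is an assumption for illustration.
scalaVersion := "2.10.4"

// With %%, sbt appends the Scala binary version ("_2.10" here) to the
// artifact name when it resolves the dependency.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"

// The line above therefore resolves the same artifact as this explicit form:
// libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided"
```

Changing `scalaVersion` then changes the suffix everywhere at once, which is what avoids the mixed-version build described above.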
>>>>>>>>>   From: 'Vadim Bichutskiy' <>
>>>>>>>>> Date: Thursday, April 2, 2015 at 9:53 AM
>>>>>>>>> To: "" <>
>>>>>>>>> Subject: Spark + Kinesis
>>>>>>>>>   Hi all,
>>>>>>>>>  I am trying to write an Amazon Kinesis consumer Scala app that
>>>>>>>>> processes data in the
>>>>>>>>> Kinesis stream. Is this the correct way to specify *build.sbt*:
>>>>>>>>>  -------
>>>>>>>>> import AssemblyKeys._
>>>>>>>>> name := "Kinesis Consumer"
>>>>>>>>> version := "1.0"
>>>>>>>>> organization := "com.myconsumer"
>>>>>>>>> scalaVersion := "2.11.5"
>>>>>>>>> libraryDependencies ++= Seq(
>>>>>>>>>   "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
>>>>>>>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
>>>>>>>>>   "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")
>>>>>>>>> assemblySettings
>>>>>>>>> jarName in assembly :=  "consumer-assembly.jar"
>>>>>>>>> assemblyOption in assembly := (assemblyOption in
>>>>>>>>> assembly).value.copy(includeScala=false)
>>>>>>>>> --------
>>>>>>>>>  In *project/assembly.sbt* I have only the following line:
>>>>>>>>>  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
>>>>>>>>>  I am using sbt 0.13.7. I adapted Example 7.7 in the Learning
>>>>>>>>> Spark book.
>>>>>>>>>  Thanks,
>>>>>>>>> Vadim
