Hi all,

I am wondering, has anyone on this list been able to successfully implement
Spark on top of Kinesis?

Best,
Vadim

On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com
> wrote:

> Hi all,
>
> Below is the output that I am getting. My Kinesis stream has 1 shard, and
> my Spark cluster on EC2 has 2 slaves (I think that's fine?).
> I should mention that my Kinesis producer is written in Python, following the example at
> http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python
>
> I also wrote a Python consumer, again using the example at the above link, and it works fine.
> But I am unable to display any output from my Spark consumer.
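>
> For reference, one minimal way to check whether any records arrive per batch
> (a sketch against the unioned DStream in my consumer code further down this
> thread; swapping print() for foreachRDD here is just an illustration, not
> something I have verified) would be:
>
> unionStreams.foreachRDD { rdd =>
>   println("Records in this batch: " + rdd.count())
>   rdd.take(10).foreach(println)
> }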
>
> I'd appreciate any help.
>
> Thanks,
> Vadim
>
> -------------------------------------------
>
> Time: 1428254090000 ms
>
> -------------------------------------------
>
>
> 15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job
> 1428254090000 ms.0 from job set of time 1428254090000 ms
>
> 15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for
> time 1428254090000 ms (execution: 0.090 s)
>
> 15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence
> list
>
> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63
>
> 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from
> persistence list
>
> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62
>
> 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from
> persistence list
>
> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61
>
> 15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list
>
> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60
>
> 15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list
>
> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59
>
> 15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of
> RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time
> 1428254090000 ms
>
> ***********
>
> 15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches
> ArrayBuffer(1428254070000 ms)
> On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy <
> vadim.bichuts...@gmail.com> wrote:
>
>> Hi all,
>>
>> More good news! I was able to use mergeStrategy to assemble my
>> Kinesis consumer into an "uber jar".
>>
>> Here's what I added to build.sbt:
>>
>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>   {
>>     case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
>>     case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
>>     case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
>>     case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
>>     case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
>>     case x => old(x)
>>   }
>> }
>>
>> Everything appears to be working fine. Right now my producer is pushing
>> simple strings through Kinesis,
>> which my consumer is trying to print (using Spark's print() method for
>> now).
>>
>> However, instead of displaying my strings, I get the following:
>>
>> 15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches
>> ArrayBuffer(1428173848000 ms)
>>
>> Any idea on what might be going on?
>>
>> Thanks,
>>
>> Vadim
>>
>> Here's my consumer code (adapted from the WordCount example):
>>
>> import org.apache.spark.{Logging, SparkConf}
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.{Milliseconds, StreamingContext}
>> import org.apache.spark.streaming.kinesis.KinesisUtils
>> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
>> import com.amazonaws.services.kinesis.AmazonKinesisClient
>> import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
>>
>> private object MyConsumer extends Logging {
>>   def main(args: Array[String]) {
>>     /* Check that all required args were passed in. */
>>     if (args.length < 2) {
>>       System.err.println(
>>         """
>>           |Usage: KinesisWordCount <stream-name> <endpoint-url>
>>           |    <stream-name> is the name of the Kinesis stream
>>           |    <endpoint-url> is the endpoint of the Kinesis service
>>           |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
>>         """.stripMargin)
>>       System.exit(1)
>>     }
>>
>>     /* Populate the appropriate variables from the given args */
>>     val Array(streamName, endpointUrl) = args
>>
>>     /* Determine the number of shards from the stream */
>>     val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
>>     kinesisClient.setEndpoint(endpointUrl)
>>     val numShards =
>>       kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
>>     System.out.println("Num shards: " + numShards)
>>
>>     /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
>>     val numStreams = numShards
>>
>>     /* Set up the SparkConf and StreamingContext */
>>     /* Spark Streaming batch interval */
>>     val batchInterval = Milliseconds(2000)
>>     val sparkConfig = new SparkConf().setAppName("MyConsumer")
>>     val ssc = new StreamingContext(sparkConfig, batchInterval)
>>
>>     /* Kinesis checkpoint interval. Same as batchInterval for this example. */
>>     val kinesisCheckpointInterval = batchInterval
>>
>>     /* Create the same number of Kinesis DStreams/Receivers as the Kinesis stream's shards */
>>     val kinesisStreams = (0 until numStreams).map { i =>
>>       KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
>>         InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
>>     }
>>
>>     /* Union all the streams and decode each record as a string */
>>     val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))
>>     unionStreams.print()
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
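>>
>> For completeness, I'm launching it with spark-submit roughly like this (the
>> master URL and class name here are placeholders, not my exact command):
>>
>> bin/spark-submit --class MyConsumer --master spark://<master>:7077 \
>>   consumer-assembly.jar <stream-name> <endpoint-url>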
>>
>>
>> On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> Just remove "provided" for spark-streaming-kinesis-asl
>>>
>>> libraryDependencies += "org.apache.spark" %%
>>> "spark-streaming-kinesis-asl" % "1.3.0"
>>>
>>> On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy <
>>> vadim.bichuts...@gmail.com> wrote:
>>>
>>>> Thanks. So how do I fix it?
>>>>
>>>> On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <jonat...@amazon.com>
>>>> wrote:
>>>>
>>>>>   spark-streaming-kinesis-asl is not part of the Spark distribution
>>>>> on your cluster, so you cannot have it be just a "provided" dependency.
>>>>> This is also why the KCL and its dependencies were not included in the
>>>>> assembly (but yes, they should be).
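>>>>>
>>>>>  In other words, a build.sbt sketch along these lines (versions taken from
>>>>> this thread; treat it as an example rather than a definitive recipe):
>>>>>
>>>>>  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"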
>>>>>
>>>>>
>>>>>  ~ Jonathan Kelly
>>>>>
>>>>>   From: Vadim Bichutskiy <vadim.bichuts...@gmail.com>
>>>>> Date: Friday, April 3, 2015 at 12:26 PM
>>>>> To: Jonathan Kelly <jonat...@amazon.com>
>>>>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>>>>> Subject: Re: Spark + Kinesis
>>>>>
>>>>>   Hi all,
>>>>>
>>>>>  Good news! I was able to create a Kinesis consumer and assemble it
>>>>> into an "uber jar" following
>>>>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>>>> and example
>>>>> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>>>>> .
>>>>>
>>>>>  However when I try to spark-submit it I get the following exception:
>>>>>
>>>>>  Exception in thread "main" java.lang.NoClassDefFoundError:
>>>>> com/amazonaws/auth/AWSCredentialsProvider
>>>>>
>>>>>  Do I need to include the KCL dependency in build.sbt? Here's what it
>>>>> looks like currently:
>>>>>
>>>>>  import AssemblyKeys._
>>>>> name := "Kinesis Consumer"
>>>>> version := "1.0"
>>>>> organization := "com.myconsumer"
>>>>> scalaVersion := "2.11.5"
>>>>>
>>>>>  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"
>>>>> % "provided"
>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" %
>>>>> "1.3.0" % "provided"
>>>>> libraryDependencies += "org.apache.spark" %%
>>>>> "spark-streaming-kinesis-asl" % "1.3.0" % "provided"
>>>>>
>>>>>  assemblySettings
>>>>> jarName in assembly :=  "consumer-assembly.jar"
>>>>> assemblyOption in assembly := (assemblyOption in
>>>>> assembly).value.copy(includeScala=false)
>>>>>
>>>>>  Any help appreciated.
>>>>>
>>>>>  Thanks,
>>>>> Vadim
>>>>>
>>>>> On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan <jonat...@amazon.com>
>>>>> wrote:
>>>>>
>>>>>>  It looks like you're attempting to mix Scala versions, so that's
>>>>>> going to cause some problems.  If you really want to use Scala 2.11.5, 
>>>>>> you
>>>>>> must also use Spark package versions built for Scala 2.11 rather than
>>>>>> 2.10.  Anyway, that's not quite the correct way to specify Scala
>>>>>> dependencies in build.sbt.  Instead of placing the Scala version after 
>>>>>> the
>>>>>> artifactId (like "spark-core_2.10"), what you actually want is to use 
>>>>>> just
>>>>>> "spark-core" with two percent signs before it.  Using two percent signs
>>>>>> will make it use the version of Scala that matches your declared
>>>>>> scalaVersion.  For example:
>>>>>>
>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"
>>>>>> % "provided"
>>>>>>
>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-streaming" %
>>>>>> "1.3.0" % "provided"
>>>>>>
>>>>>>  libraryDependencies += "org.apache.spark" %%
>>>>>> "spark-streaming-kinesis-asl" % "1.3.0"
>>>>>>
>>>>>>  I think that may get you a little closer, though I think you're
>>>>>> probably going to run into the same problems I ran into in this thread:
>>>>>> https://www.mail-archive.com/user@spark.apache.org/msg23891.html  I
>>>>>> never really got an answer for that, and I temporarily moved on to other
>>>>>> things for now.
>>>>>>
>>>>>>
>>>>>>  ~ Jonathan Kelly
>>>>>>
>>>>>>   From: 'Vadim Bichutskiy' <vadim.bichuts...@gmail.com>
>>>>>> Date: Thursday, April 2, 2015 at 9:53 AM
>>>>>> To: "user@spark.apache.org" <user@spark.apache.org>
>>>>>> Subject: Spark + Kinesis
>>>>>>
>>>>>>   Hi all,
>>>>>>
>>>>>>  I am trying to write an Amazon Kinesis consumer Scala app that
>>>>>> processes data in the
>>>>>> Kinesis stream. Is this the correct way to specify build.sbt:
>>>>>>
>>>>>>  -------
>>>>>> import AssemblyKeys._
>>>>>>
>>>>>> name := "Kinesis Consumer"
>>>>>> version := "1.0"
>>>>>> organization := "com.myconsumer"
>>>>>> scalaVersion := "2.11.5"
>>>>>>
>>>>>> libraryDependencies ++= Seq(
>>>>>>   "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
>>>>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
>>>>>>   "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")
>>>>>>
>>>>>> assemblySettings
>>>>>> jarName in assembly := "consumer-assembly.jar"
>>>>>> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)
>>>>>> --------
>>>>>>
>>>>>>  In project/assembly.sbt I have only the following line:
>>>>>>
>>>>>>  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
>>>>>>
>>>>>>  I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark
>>>>>> book.
>>>>>>
>>>>>>  Thanks,
>>>>>> Vadim
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
