Thanks Chris! I was just looking to get back to Spark + Kinesis
integration. Will be in touch shortly.

Vadim
ᐧ

On Sun, May 10, 2015 at 12:14 AM, Chris Fregly <ch...@fregly.com> wrote:

> hey vadim-
>
> sorry for the delay.
>
> if you're interested in trying to get Kinesis working one-on-one, shoot me
> a direct email and we'll get it going off-list.
>
> we can circle back and summarize our findings here.
>
> lots of people are using Spark Streaming+Kinesis successfully.
>
> would love to help you through this - albeit a month later!  the goal is
> to have this working out of the box, so i'd like to implement anything i
> can do to make that happen.
>
> lemme know.
>
> btw, Spark 1.4 will have some improvements to the Kinesis Spark Streaming.
>
> TD and I have been working together on this.
>
> thanks!
>
> -chris
>
> On Tue, Apr 7, 2015 at 6:17 PM, Vadim Bichutskiy <
> vadim.bichuts...@gmail.com> wrote:
>
>> Hey y'all,
>>
>> While I haven't been able to get Spark + Kinesis integration working, I
>> pivoted to plan B: I now push data to S3 where I set up a DStream to
>> monitor an S3 bucket with textFileStream, and that works great.
>>
>> I <3 Spark!
>>
>> Best,
>> Vadim
>>
>>
>> ᐧ
>>
>> On Mon, Apr 6, 2015 at 12:23 PM, Vadim Bichutskiy <
>> vadim.bichuts...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I am wondering, has anyone on this list been able to successfully
>>> implement Spark on top of Kinesis?
>>>
>>> Best,
>>> Vadim
>>>
>>> On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy <
>>> vadim.bichuts...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Below is the output that I am getting. My Kinesis stream has 1 shard,
>>>> and my Spark cluster on EC2 has 2 slaves (I think that's fine?).
>>>> I should mention that my Kinesis producer is written in Python where I
>>>> followed the example
>>>> http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python
>>>>
>>>> I also wrote a Python consumer, again using the example at the above
>>>> link, that works fine. But I am unable to display output from my Spark
>>>> consumer.
>>>>
>>>> I'd appreciate any help.
>>>>
>>>> Thanks,
>>>> Vadim
>>>>
>>>> -------------------------------------------
>>>>
>>>> Time: 1428254090000 ms
>>>>
>>>> -------------------------------------------
>>>>
>>>>
>>>> 15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming
>>>> job 1428254090000 ms.0 from job set of time 1428254090000 ms
>>>>
>>>> 15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for
>>>> time 1428254090000 ms (execution: 0.090 s)
>>>>
>>>> 15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from
>>>> persistence list
>>>>
>>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63
>>>>
>>>> 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from
>>>> persistence list
>>>>
>>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62
>>>>
>>>> 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from
>>>> persistence list
>>>>
>>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61
>>>>
>>>> 15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence
>>>> list
>>>>
>>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60
>>>>
>>>> 15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence
>>>> list
>>>>
>>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59
>>>>
>>>> 15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks
>>>> of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time
>>>> 1428254090000 ms
>>>>
>>>> ***********
>>>>
>>>> 15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches
>>>> ArrayBuffer(1428254070000 ms)
>>>> On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy <
>>>> vadim.bichuts...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> More good news! I was able to utilize mergeStrategy to assembly my
>>>>> Kinesis consumer into an "uber jar"
>>>>>
>>>>> Here's what I added to* build.sbt:*
>>>>>
>>>>> *mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>*
>>>>> *  {*
>>>>> *  case PathList("com", "esotericsoftware", "minlog", xs @ _*) =>
>>>>> MergeStrategy.first*
>>>>> *  case PathList("com", "google", "common", "base", xs @ _*) =>
>>>>> MergeStrategy.first*
>>>>> *  case PathList("org", "apache", "commons", xs @ _*) =>
>>>>> MergeStrategy.last*
>>>>> *  case PathList("org", "apache", "hadoop", xs @ _*) =>
>>>>> MergeStrategy.first*
>>>>> *  case PathList("org", "apache", "spark", "unused", xs @ _*) =>
>>>>> MergeStrategy.first*
>>>>> *        case x => old(x)*
>>>>> *  }*
>>>>> *}*
>>>>>
>>>>> Everything appears to be working fine. Right now my producer is
>>>>> pushing simple strings through Kinesis,
>>>>> which my consumer is trying to print (using Spark's print() method for
>>>>> now).
>>>>>
>>>>> However, instead of displaying my strings, I get the following:
>>>>>
>>>>> *15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting
>>>>> batches ArrayBuffer(1428173848000 ms)*
>>>>>
>>>>> Any idea on what might be going on?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Vadim
>>>>>
>>>>> Here's my consumer code (adapted from the WordCount example):
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *private object MyConsumer extends Logging {  def main(args:
>>>>> Array[String]) {    /* Check that all required args were passed in. */
>>>>> if (args.length < 2) {      System.err.println(        """          
>>>>> |Usage:
>>>>> KinesisWordCount <stream-name> <endpoint-url>          |    <stream-name>
>>>>> is the name of the Kinesis stream          |    <endpoint-url> is the
>>>>> endpoint of the Kinesis service          |                   (e.g.
>>>>> https://kinesis.us-east-1.amazonaws.com
>>>>> <https://kinesis.us-east-1.amazonaws.com>)        """.stripMargin)
>>>>> System.exit(1)    }    /* Populate the appropriate variables from the 
>>>>> given
>>>>> args */    val Array(streamName, endpointUrl) = args    /* Determine the
>>>>> number of shards from the stream */    val kinesisClient = new
>>>>> AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
>>>>> kinesisClient.setEndpoint(endpointUrl)    val numShards =
>>>>> kinesisClient.describeStream(streamName).getStreamDescription().getShards()
>>>>>     .size()    System.out.println("Num shards: " + numShards)    /* In 
>>>>> this
>>>>> example, we're going to create 1 Kinesis Worker/Receiver/DStream for each
>>>>> shard. */    val numStreams = numShards    /* Setup the and SparkConfig 
>>>>> and
>>>>> StreamingContext */    /* Spark Streaming batch interval */    val
>>>>> batchInterval = Milliseconds(2000)    val sparkConfig = new
>>>>> SparkConf().setAppName("MyConsumer")    val ssc = new
>>>>> StreamingContext(sparkConfig, batchInterval)    /* Kinesis checkpoint
>>>>> interval.  Same as batchInterval for this example. */    val
>>>>> kinesisCheckpointInterval = batchInterval    /* Create the same number of
>>>>> Kinesis DStreams/Receivers as Kinesis stream's shards */    val
>>>>> kinesisStreams = (0 until numStreams).map { i =>
>>>>> KinesisUtils.createStream(ssc, streamName, endpointUrl,
>>>>> kinesisCheckpointInterval,          InitialPositionInStream.LATEST,
>>>>> StorageLevel.MEMORY_AND_DISK_2)    }    /* Union all the streams */    val
>>>>> unionStreams  = ssc.union(kinesisStreams).map(byteArray => new
>>>>> String(byteArray))    unionStreams.print()    ssc.start()
>>>>> ssc.awaitTermination()  }}*
>>>>>
>>>>>
>>>>> On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das <t...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Just remove "provided" for spark-streaming-kinesis-asl
>>>>>>
>>>>>> libraryDependencies += "org.apache.spark" %%
>>>>>> "spark-streaming-kinesis-asl" % "1.3.0"
>>>>>>
>>>>>> On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy <
>>>>>> vadim.bichuts...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks. So how do I fix it?
>>>>>>>
>>>>>>> On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <jonat...@amazon.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>>   spark-streaming-kinesis-asl is not part of the Spark
>>>>>>>> distribution on your cluster, so you cannot have it be just a 
>>>>>>>> "provided"
>>>>>>>> dependency.  This is also why the KCL and its dependencies were not
>>>>>>>> included in the assembly (but yes, they should be).
>>>>>>>>
>>>>>>>>
>>>>>>>>  ~ Jonathan Kelly
>>>>>>>>
>>>>>>>>   From: Vadim Bichutskiy <vadim.bichuts...@gmail.com>
>>>>>>>> Date: Friday, April 3, 2015 at 12:26 PM
>>>>>>>> To: Jonathan Kelly <jonat...@amazon.com>
>>>>>>>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>>>>>>>> Subject: Re: Spark + Kinesis
>>>>>>>>
>>>>>>>>   Hi all,
>>>>>>>>
>>>>>>>>  Good news! I was able to create a Kinesis consumer and assemble
>>>>>>>> it into an "uber jar" following
>>>>>>>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>>>>>>> <http://t.signauxtrois.com/e1t/c/5/f18dQhb0S7lC8dDMPbW2n0x6l2B9nMJW7t5XZs653q_MN8rBNbzRbv22W8r4TLx56dCDWf13Gc8R02?t=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fstreaming-kinesis-integration.html&si=5533377798602752&pi=8898f7f0-ede6-4e6c-9bfd-00cf2101f0e9>
>>>>>>>> and example
>>>>>>>> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>>>>>>>> <http://t.signauxtrois.com/e1t/c/5/f18dQhb0S7lC8dDMPbW2n0x6l2B9nMJW7t5XZs653q_MN8rBNbzRbv22W8r4TLx56dCDWf13Gc8R02?t=https%3A%2F%2Fgithub.com%2Fapache%2Fspark%2Fblob%2Fmaster%2Fextras%2Fkinesis-asl%2Fsrc%2Fmain%2Fscala%2Forg%2Fapache%2Fspark%2Fexamples%2Fstreaming%2FKinesisWordCountASL.scala&si=5533377798602752&pi=8898f7f0-ede6-4e6c-9bfd-00cf2101f0e9>
>>>>>>>> .
>>>>>>>>
>>>>>>>>  However when I try to spark-submit it I get the following
>>>>>>>> exception:
>>>>>>>>
>>>>>>>>  *Exception in thread "main" java.lang.NoClassDefFoundError:
>>>>>>>> com/amazonaws/auth/AWSCredentialsProvider*
>>>>>>>>
>>>>>>>>  Do I need to include KCL dependency in *build.sbt*, here's what
>>>>>>>> it looks like currently:
>>>>>>>>
>>>>>>>>  import AssemblyKeys._
>>>>>>>> name := "Kinesis Consumer"
>>>>>>>> version := "1.0"
>>>>>>>> organization := "com.myconsumer"
>>>>>>>> scalaVersion := "2.11.5"
>>>>>>>>
>>>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-core" %
>>>>>>>> "1.3.0" % "provided"
>>>>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" %
>>>>>>>> "1.3.0" % "provided"
>>>>>>>> libraryDependencies += "org.apache.spark" %%
>>>>>>>> "spark-streaming-kinesis-asl" % "1.3.0" % "provided"
>>>>>>>>
>>>>>>>>  assemblySettings
>>>>>>>> jarName in assembly :=  "consumer-assembly.jar"
>>>>>>>> assemblyOption in assembly := (assemblyOption in
>>>>>>>> assembly).value.copy(includeScala=false)
>>>>>>>>
>>>>>>>>  Any help appreciated.
>>>>>>>>
>>>>>>>>  Thanks,
>>>>>>>> Vadim
>>>>>>>>
>>>>>>>> On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan <
>>>>>>>> jonat...@amazon.com> wrote:
>>>>>>>>
>>>>>>>>>  It looks like you're attempting to mix Scala versions, so that's
>>>>>>>>> going to cause some problems.  If you really want to use Scala 
>>>>>>>>> 2.11.5, you
>>>>>>>>> must also use Spark package versions built for Scala 2.11 rather than
>>>>>>>>> 2.10.  Anyway, that's not quite the correct way to specify Scala
>>>>>>>>> dependencies in build.sbt.  Instead of placing the Scala version 
>>>>>>>>> after the
>>>>>>>>> artifactId (like "spark-core_2.10"), what you actually want is to use 
>>>>>>>>> just
>>>>>>>>> "spark-core" with two percent signs before it.  Using two percent 
>>>>>>>>> signs
>>>>>>>>> will make it use the version of Scala that matches your declared
>>>>>>>>> scalaVersion.  For example:
>>>>>>>>>
>>>>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-core" %
>>>>>>>>> "1.3.0" % "provided"
>>>>>>>>>
>>>>>>>>>  libraryDependencies += "org.apache.spark" %% "spark-streaming" %
>>>>>>>>> "1.3.0" % "provided"
>>>>>>>>>
>>>>>>>>>  libraryDependencies += "org.apache.spark" %%
>>>>>>>>> "spark-streaming-kinesis-asl" % "1.3.0"
>>>>>>>>>
>>>>>>>>>  I think that may get you a little closer, though I think you're
>>>>>>>>> probably going to run into the same problems I ran into in this 
>>>>>>>>> thread:
>>>>>>>>> https://www.mail-archive.com/user@spark.apache.org/msg23891.html
>>>>>>>>> I never really got an answer for that, and I temporarily moved on to 
>>>>>>>>> other
>>>>>>>>> things for now.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>  ~ Jonathan Kelly
>>>>>>>>>
>>>>>>>>>   From: 'Vadim Bichutskiy' <vadim.bichuts...@gmail.com>
>>>>>>>>> Date: Thursday, April 2, 2015 at 9:53 AM
>>>>>>>>> To: "user@spark.apache.org" <user@spark.apache.org>
>>>>>>>>> Subject: Spark + Kinesis
>>>>>>>>>
>>>>>>>>>   Hi all,
>>>>>>>>>
>>>>>>>>>  I am trying to write an Amazon Kinesis consumer Scala app that
>>>>>>>>> processes data in the
>>>>>>>>> Kinesis stream. Is this the correct way to specify *build.sbt*:
>>>>>>>>>
>>>>>>>>>  -------
>>>>>>>>> *import AssemblyKeys._*
>>>>>>>>> *name := "Kinesis Consumer"*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *version := "1.0" organization := "com.myconsumer" scalaVersion :=
>>>>>>>>> "2.11.5" libraryDependencies ++= Seq("org.apache.spark" % 
>>>>>>>>> "spark-core_2.10"
>>>>>>>>> % "1.3.0" % "provided", "org.apache.spark" % "spark-streaming_2.10" %
>>>>>>>>> "1.3.0" "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % 
>>>>>>>>> "1.3.0")*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> * assemblySettings jarName in assembly :=  "consumer-assembly.jar"
>>>>>>>>> assemblyOption in assembly := (assemblyOption in
>>>>>>>>> assembly).value.copy(includeScala=false)*
>>>>>>>>> --------
>>>>>>>>>
>>>>>>>>>  In *project/assembly.sbt* I have only the following line:
>>>>>>>>>
>>>>>>>>>  *addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")*
>>>>>>>>>
>>>>>>>>>  I am using sbt 0.13.7. I adapted Example 7.7 in the Learning
>>>>>>>>> Spark book.
>>>>>>>>>
>>>>>>>>>  Thanks,
>>>>>>>>> Vadim
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to