Re: listening to recursive folder structures in s3 using pyspark streaming (textFileStream)

2016-02-17 Thread Shixiong(Ryan) Zhu
textFileStream doesn't support that. It only supports monitoring one folder.
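
A common workaround is to create one textFileStream per folder and union
them; a rough, untested sketch using the paths from your example:

val dirs = Seq(
  "s3n://bucket/first/second/third1/",
  "s3n://bucket/first/second/third2/",
  "s3n://bucket/first/second/third3/")
// StreamingContext.union merges the per-folder streams into one DStream
val lines = ssc.union(dirs.map(dir => ssc.textFileStream(dir)))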

On Wed, Feb 17, 2016 at 7:20 AM, in4maniac  wrote:

> Hi all,
>
> I am new to pyspark streaming and I was following a tutorial I saw in the
> internet
> (
> https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/network_wordcount.py
> ).
> But I replaced the data input with an s3 directory path as:
>
> lines = ssc.textFileStream("s3n://bucket/first/second/third1/")
>
> When I run the code and upload a file to s3n://bucket/first/second/third1/
> (such as s3n://bucket/first/second/third1/test1.txt), the file gets
> processed as expected.
>
> Now I want it to listen to multiple directories and process files if they
> get uploaded to any of the directories:
> for example : [s3n://bucket/first/second/third1/,
> s3n://bucket/first/second/third2/ and s3n://bucket/first/second/third3/]
>
> I tried to use the pattern similar to sc.TextFile as :
>
> lines = ssc.textFileStream("s3n://bucket/first/second/*/")
>
> But this actually didn't work. Can someone please explain to me how I could
> achieve my objective?
>
> thanks in advance !!!
>
> in4maniac
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/listening-to-recursive-folder-structures-in-s3-using-pyspark-streaming-textFileStream-tp26247.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: SparkListener onApplicationEnd processing an RDD throws exception because of stopped SparkContext

2016-02-17 Thread Shixiong(Ryan) Zhu
`onApplicationEnd` is posted when SparkContext is stopping, and you cannot
submit any job to a stopping SparkContext. In general, SparkListener is
used to monitor job progress and collect job information, and you should
not submit jobs from it. Why not submit your jobs in the main thread?
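
For example, something along these lines in the driver (a rough sketch;
doJob and resultRdd stand for your own logic, and saveToCassandra comes
from the spark-cassandra-connector):

val resultRdd = doJob(sc)                             // the regular processing
resultRdd.saveToCassandra("my_keyspace", "my_table")  // post-processing while the context is still alive
sc.stop()                                             // stop only after everything has been submitted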

On Wed, Feb 17, 2016 at 7:11 AM, Sumona Routh  wrote:

> Can anyone provide some insight into the flow of SparkListeners,
> specifically onApplicationEnd? I'm having issues with the SparkContext
> being stopped before my final processing can complete.
>
> Thanks!
> Sumona
>
> On Mon, Feb 15, 2016 at 8:59 AM Sumona Routh  wrote:
>
>> Hi there,
>> I am trying to implement a listener that performs as a post-processor
>> which stores data about what was processed or erred. With this, I use an
>> RDD that may or may not change during the course of the application.
>>
>> My thought was to use onApplicationEnd and then saveToCassandra call to
>> persist this.
>>
>> From what I've gathered in my experiments,  onApplicationEnd  doesn't get
>> called until sparkContext.stop() is called. If I don't call stop in my
>> code, the listener won't be called. This works fine on my local tests -
>> stop gets called, the listener is called and then persisted to the db, and
>> everything works fine. However when I run this on our server,  the code in
>> onApplicationEnd throws the following exception:
>>
>> Task serialization failed: java.lang.IllegalStateException: Cannot call
>> methods on a stopped SparkContext
>>
>> What's the best way to resolve this? I can think of creating a new
>> SparkContext in the listener (I think I have to turn on allowing multiple
>> contexts, in case I try to create one before the other one is stopped). It
>> seems odd but might be doable. Additionally, what if I were to simply add
>> the code into my job in some sort of procedural block: doJob,
>> doPostProcessing, does that guarantee postProcessing will occur after the
>> other?
>>
>> We are currently using Spark 1.2 standalone at the moment.
>>
>> Please let me know if you require more details. Thanks for the assistance!
>> Sumona
>>
>>


Re: Skip empty batches - spark streaming

2016-02-11 Thread Shixiong(Ryan) Zhu
Are you using a custom input dstream? If so, you can make the `compute`
method return None to skip a batch.
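
A minimal sketch of what that can look like in a custom input DStream (the
class name and fetchRecords are made-up placeholders):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

class MyInputDStream(ssc_ : StreamingContext) extends InputDStream[String](ssc_) {
  override def start(): Unit = {}
  override def stop(): Unit = {}
  override def compute(validTime: Time): Option[RDD[String]] = {
    val records = fetchRecords(validTime)             // your own ingestion logic
    if (records.isEmpty) None                         // returning None skips this batch
    else Some(ssc_.sparkContext.parallelize(records))
  }
}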

On Thu, Feb 11, 2016 at 1:03 PM, Sebastian Piu 
wrote:

> I was wondering if there is there any way to skip batches with zero events
> when streaming?
> By skip I mean avoid the empty rdd from being created at all?
>


Re: Skip empty batches - spark streaming

2016-02-11 Thread Shixiong(Ryan) Zhu
Yeah, DirectKafkaInputDStream always returns an RDD even if it's empty. Feel
free to send a PR to improve it.
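
Until that is improved, a common workaround is to guard the output action
inside foreachRDD ("stream" below stands for your direct Kafka stream). The
empty RDD is still created, but no work is done for it:

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // ... your processing / writes here ...
  }
}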

On Thu, Feb 11, 2016 at 1:09 PM, Sebastian Piu <sebastian@gmail.com>
wrote:

> I'm using the Kafka direct stream api but I can have a look on extending
> it to have this behaviour
>
> Thanks!
> On 11 Feb 2016 9:07 p.m., "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
> wrote:
>
>> Are you using a custom input dstream? If so, you can make the `compute`
>> method return None to skip a batch.
>>
>> On Thu, Feb 11, 2016 at 1:03 PM, Sebastian Piu <sebastian@gmail.com>
>> wrote:
>>
>>> I was wondering if there is there any way to skip batches with zero
>>> events when streaming?
>>> By skip I mean avoid the empty rdd from being created at all?
>>>
>>
>>


Re: Spark Streaming : Limiting number of receivers per executor

2016-02-10 Thread Shixiong(Ryan) Zhu
You can't. The number of cores must be greater than the number of receivers.

On Wed, Feb 10, 2016 at 2:34 AM, ajay garg  wrote:

> Hi All,
>  I am running 3 executors in my spark streaming application with 3
> cores per executors. I have written my custom receiver for receiving
> network
> data.
>
> In my current configuration I am launching 3 receivers , one receiver per
> executor.
>
> In the run if 2 of my executor dies, I am left with only one executor and
> all 3 receivers are scheduled on that executor. Since this executor has
> only
> 3 cores and all cores are busy running 3 receivers, Action on accumulated
> window data(DStream) is not scheduled and my application hangs.
>
> Is there a way to restrict number of receivers per executor so that I am
> always left with some core to run action on DStream.
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Limiting-number-of-receivers-per-executor-tp26192.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: [Spark 1.5+] ReceiverTracker seems not to stop Kinesis receivers

2016-02-09 Thread Shixiong(Ryan) Zhu
Could you do a thread dump in the executor that runs the Kinesis receiver
and post it? It would be great if you could provide the executor log as well.

On Tue, Feb 9, 2016 at 3:14 PM, Roberto Coluccio  wrote:

> Hello,
>
> can anybody kindly help me out a little bit here? I just verified the
> problem is still there on Spark 1.6.0 and emr-4.3.0 as well. It's
> definitely a Kinesis-related issue, since with Spark 1.6.0 I'm successfully
> able to get Streaming drivers to terminate with no issue IF I don't use
> Kinesis and open any Receivers.
>
> Thank you!
>
> Roberto
>
>
> On Tue, Feb 2, 2016 at 4:40 PM, Roberto Coluccio <
> roberto.coluc...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm struggling around an issue ever since I tried to upgrade my Spark
>> Streaming solution from 1.4.1 to 1.5+.
>>
>> I have a Spark Streaming app which creates 3 ReceiverInputDStreams
>> leveraging KinesisUtils.createStream API.
>>
>> I used to leverage a timeout to terminate my app
>> (StreamingContext.awaitTerminationOrTimeout(timeout)) gracefully (SparkConf
>> spark.streaming.stopGracefullyOnShutdown=true).
>>
>> I used to submit my Spark app on EMR in yarn-cluster mode.
>>
>> Everything worked fine up to Spark 1.4.1 (on EMR AMI 3.9).
>>
>> Since I upgraded (tried with Spark 1.5.2 on emr-4.2.0 and Spark 1.6.0 on
>> emr-4.3.0) I can't get the app to actually terminate. Logs tells me it
>> tries to, but no confirmation of receivers stop is retrieved. Instead, when
>> the timer gets to the next period, the StreamingContext continues its
>> processing for a while (then it gets killed with a SIGTERM 15. YARN's vmem
>> and pmem killls disabled).
>>
>> ...
>>
>> 16/02/02 21:22:08 INFO ApplicationMaster: Final app status: SUCCEEDED, 
>> exitCode: 0
>> 16/02/02 21:22:08 INFO StreamingContext: Invoking stop(stopGracefully=true) 
>> from shutdown hook
>> 16/02/02 21:22:08 INFO ReceiverTracker: Sent stop signal to all 3 receivers
>> 16/02/02 21:22:18 INFO ReceiverTracker: Waiting for receiver job to 
>> terminate gracefully
>> 16/02/02 21:22:52 INFO ContextCleaner: Cleaned shuffle 141
>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
>> 172.31.3.140:50152 in memory (size: 23.9 KB, free: 2.1 GB)
>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
>> ip-172-31-3-141.ec2.internal:41776 in memory (size: 23.9 KB, free: 1224.9 MB)
>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
>> ip-172-31-3-140.ec2.internal:36295 in memory (size: 23.9 KB, free: 1224.0 MB)
>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
>> ip-172-31-3-141.ec2.internal:56428 in memory (size: 23.9 KB, free: 1224.9 MB)
>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
>> ip-172-31-3-140.ec2.internal:50542 in memory (size: 23.9 KB, free: 1224.7 MB)
>> 16/02/02 21:22:52 INFO ContextCleaner: Cleaned accumulator 184
>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
>> 172.31.3.140:50152 in memory (size: 3.0 KB, free: 2.1 GB)
>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
>> ip-172-31-3-141.ec2.internal:41776 in memory (size: 3.0 KB, free: 1224.9 MB)
>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
>> ip-172-31-3-141.ec2.internal:56428 in memory (size: 3.0 KB, free: 1224.9 MB)
>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
>> ip-172-31-3-140.ec2.internal:36295 in memory (size: 3.0 KB, free: 1224.0 MB)
>> 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
>> ip-172-31-3-140.ec2.internal:50542 in memory (size: 3.0 KB, free: 1224.7 MB)
>> 16/02/02 21:25:00 INFO StateDStream: Marking RDD 680 for time 145444830 
>> ms for checkpointing
>> 16/02/02 21:25:00 INFO StateDStream: Marking RDD 708 for time 145444830 
>> ms for checkpointing
>> 16/02/02 21:25:00 INFO TransformedDStream: Slicing from 145444800 ms to 
>> 145444830 ms (aligned to 145444800 ms and 145444830 ms)
>> 16/02/02 21:25:00 INFO StateDStream: Marking RDD 777 for time 145444830 
>> ms for checkpointing
>> 16/02/02 21:25:00 INFO StateDStream: Marking RDD 801 for time 145444830 
>> ms for checkpointing
>> 16/02/02 21:25:00 INFO JobScheduler: Added jobs for time 145444830 ms
>> 16/02/02 21:25:00 INFO JobGenerator: Checkpointing graph for time 
>> 145444830 ms
>> 16/02/02 21:25:00 INFO DStreamGraph: Updating checkpoint data for time 
>> 145444830 ms
>> 16/02/02 21:25:00 INFO JobScheduler: Starting job streaming job 
>> 145444830 ms.0 from job set of time 145444830 ms
>>
>> ...
>>
>>
>> Please, this is really blocking in the upgrade process to latest Spark
>> versions and I really don't know how to work it around.
>>
>> Any help would be very much appreciated.
>>
>> Thank you,
>>
>> Roberto
>>
>>
>>
>


Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-04 Thread Shixiong(Ryan) Zhu
I guess it may be a deadlock in BlockGenerator. Could you check that
yourself?

On Thu, Feb 4, 2016 at 4:14 PM, Udo Fholl <udofholl1...@gmail.com> wrote:

> Thank you for your response
>
> Unfortunately I cannot share  a thread dump. What are you looking for
> exactly?
>
> Here is the list of the 50 biggest objects (retained size order,
> descendent):
>
> java.util.concurrent.ArrayBlockingQueue#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> scala.concurrent.forkjoin.ForkJoinPool#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.storage.MemoryStore#
> java.util.LinkedHashMap#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
> scala.concurrent.forkjoin.ForkJoinTask[]#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
> scala.concurrent.forkjoin.ForkJoinTask[]#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
> scala.concurrent.forkjoin.ForkJoinTask[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> scala.collection.Iterator$$anon$
> org.apache.spark.InterruptibleIterator#
> scala.collection.IndexedSeqLike$Elements#
> scala.collection.mutable.ArrayOps$ofRef#
> java.lang.Object[]#
>
>
>
>
> On Thu, Feb 4, 2016 at 7:14 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Hey Udo,
>>
>> mapWithState usually uses much more memory than updateStateByKey since it
>> caches the states in memory.
>>
>> However, from your description, it looks like BlockGenerator cannot push
>> data into BlockManager; there may be something wrong in BlockGenerator.
>> Could you share the top 50 objects in the heap dump and the thread dump?
>>
>>
>> On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl <udofholl1...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
>>> see a huge increase of memory. Most of it is a massive "BlockGenerator"
>>> (which points to a massive "ArrayBlockingQueue" that in turns point to a
>>> huge "Object[]").
>>>
>>> I'm pretty sure it has to do with my code, but I barely changed anything
>>> in the code. Just adapted the function.
>>>
>>> Did anyone run into this?
>>>
>>> Best regards,
>>> Udo.
>>>
>>
>>
>


Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-04 Thread Shixiong(Ryan) Zhu
Hey Udo,

mapWithState usually uses much more memory than updateStateByKey since it
caches the states in memory.

However, from your description, it looks like BlockGenerator cannot push data
into BlockManager; there may be something wrong in BlockGenerator. Could you
share the top 50 objects in the heap dump and the thread dump?


On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl  wrote:

> Hi all,
>
> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
> see a huge increase of memory. Most of it is a massive "BlockGenerator"
> (which points to a massive "ArrayBlockingQueue" that in turns point to a
> huge "Object[]").
>
> I'm pretty sure it has to do with my code, but I barely changed anything
> in the code. Just adapted the function.
>
> Did anyone run into this?
>
> Best regards,
> Udo.
>


Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Shixiong(Ryan) Zhu
Thanks for reporting it. I will take a look.

On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov  wrote:

> Hi,
> I've been playing with the expiramental PairDStreamFunctions.mapWithState
> feature and I've seem to have stumbled across a bug, and was wondering if
> anyone else has been seeing this behavior.
>
> I've opened up an issue in the Spark JIRA, I simply want to pass this along
> in case anyone else is experiencing such a failure or perhaps someone has
> insightful information if this is actually a bug:  SPARK-13195
> 
>
> Using the new spark mapWithState API, I've encountered a bug when setting a
> timeout for mapWithState but no explicit state handling.
>
> h1. Steps to reproduce:
>
> 1. Create a method which conforms to the StateSpec signature, make sure to
> not update any state inside it using *state.update*. Simply create a "pass
> through" method, may even be empty.
> 2. Create a StateSpec object with method from step 1, which explicitly sets
> a timeout using *StateSpec.timeout* method.
> 3. Create a DStream pipeline that uses mapWithState with the given
> StateSpec.
> 4. Run code using spark-submit. You'll see that the method ends up throwing
> the following exception:
>
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in
> stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 136.0 (TID 176, ): java.util.NoSuchElementException: State is not set
> at org.apache.spark.streaming.StateImpl.get(State.scala:150)
> at
>
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
> at
>
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
>
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> at
>
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
> at
>
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {code}
>
> h1. Sample code to reproduce the issue:
>
> {code:Title=MainObject}
> import org.apache.spark.streaming._
> import org.apache.spark.{SparkConf, SparkContext}
> /**
>   * Created by yuvali on 04/02/2016.
>   */
> object Program {
>
>   def main(args: Array[String]): Unit = {
>
> val sc = new SparkConf().setAppName("mapWithState bug reproduce")
> val sparkContext = new SparkContext(sc)
>
> val ssc = new StreamingContext(sparkContext, Seconds(4))
> val stateSpec = StateSpec.function(trackStateFunc
> _).timeout(Seconds(60))
>
> // Create a stream that generates 1000 lines per second
> val stream = ssc.receiverStream(new DummySource(10))
>
> // Split the lines into words, and create a paired (key-value) dstream
> val wordStream = stream.flatMap {
>   _.split(" ")
> }.map(word => (word, 1))
>
> // This represents the emitted stream from the trackStateFunc. Since we
> emit every input record with the updated value,
> // this stream will contain the same # of records as the input dstream.
> val wordCountStateStream = wordStream.mapWithState(stateSpec)
> wordCountStateStream.print()
>
> ssc.remember(Minutes(1)) // To make sure data is not deleted by the
> time
> we query it interactively
>
> // Don't forget to set checkpoint directory
> ssc.checkpoint("")
> ssc.start()
> ssc.awaitTermination()
>   }
>
>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
> state: State[Long]): Option[(String, Long)] = {
> val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
> val output = (key, sum)
> Some(output)
>   }
> }
> {code}
>
> {code:Title=DummySource}
>
> /**
>   * Created by yuvali on 04/02/2016.
>   */
>
> import org.apache.spark.storage.StorageLevel
> import scala.util.Random
> import 

Re: local class incompatible: stream classdesc serialVersionUID

2016-02-01 Thread Shixiong(Ryan) Zhu
I guess he used client mode and the local Spark version is 1.5.2 but the
standalone Spark version is 1.5.1. In other words, he used a 1.5.2 driver
to talk to 1.5.1 executors.

On Mon, Feb 1, 2016 at 2:08 PM, Holden Karau  wrote:

> So I'm a little confused to exactly how this might have happened - but one
> quick guess is that maybe you've built an assembly jar with Spark core, can
> you mark it is a provided and or post your build file?
>
> On Fri, Jan 29, 2016 at 7:35 AM, Ted Yu  wrote:
>
>> I logged SPARK-13084
>>
>> For the moment, please consider running with 1.5.2 on all the nodes.
>>
>> On Fri, Jan 29, 2016 at 5:29 AM, Jason Plurad  wrote:
>>
>>> I agree with you, Ted, if RDD had a serial version UID this might not be
>>> an issue. So that could be a JIRA to submit to help avoid version
>>> mismatches in future Spark versions, but that doesn't help my current
>>> situation between 1.5.1 and 1.5.2.
>>>
>>> Any other ideas? Thanks.
>>> On Thu, Jan 28, 2016 at 5:06 PM Ted Yu  wrote:
>>>
 I am not Scala expert.

 RDD extends Serializable but doesn't have @SerialVersionUID()
 annotation.
 This may explain what you described.

 One approach is to add @SerialVersionUID so that RDD's have stable
 serial version UID.

 Cheers

 On Thu, Jan 28, 2016 at 1:38 PM, Jason Plurad 
 wrote:

> I've searched through the mailing list archive. It seems that if you
> try to run, for example, a Spark 1.5.2 program against a Spark 1.5.1
> standalone server, you will run into an exception like this:
>
> WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 0.0 in
> stage 0.0 (TID 0, 192.168.14.103): java.io.InvalidClassException:
> org.apache.spark.rdd.RDD; local class incompatible: stream classdesc
> serialVersionUID = -3343649307726848892, local class serialVersionUID =
> -3996494161745401652
>
> If my application is using a library that builds against Spark 1.5.2,
> does that mean that my application is now tied to that same Spark
> standalone server version?
>
> Is there a recommended way for that library to have a Spark dependency
> but keep it compatible against a wider set of versions, i.e. any version
> 1.5.x?
>
> Thanks!
>


>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Re: Spark Streaming from existing RDD

2016-01-29 Thread Shixiong(Ryan) Zhu
Do you just want to write some unit tests? If so, you can use "queueStream"
to create a DStream from a queue of RDDs. However, because it doesn't
support metadata checkpointing, it's better to only use it in unit tests.
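
A minimal sketch in Scala (the Java API has an equivalent
JavaStreamingContext.queueStream); sc and ssc are your SparkContext and
StreamingContext:

import scala.collection.mutable
import org.apache.spark.rdd.RDD

val rddQueue = new mutable.Queue[RDD[String]]()
rddQueue += sc.parallelize(Seq("a", "b", "c"))   // your existing RDD
val dstream = ssc.queueStream(rddQueue)          // oneAtATime = true by default
dstream.print()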

On Fri, Jan 29, 2016 at 7:35 AM, Sateesh Karuturi <
sateesh.karutu...@gmail.com> wrote:

> Anyone please  help me out how to create a DStream from existing RDD. My
> code is:
>
> JavaSparkContext ctx = new JavaSparkContext(conf);JavaRDD rddd = 
> ctx.parallelize(arraylist);
>
> Now i need to use these *rddd* as input to *JavaStreamingContext*.
>


Re: Spark streaming and ThreadLocal

2016-01-29 Thread Shixiong(Ryan) Zhu
It looks weird. Why don't you just pass "new SomeClass()" to "somefunc"?
You don't need a ThreadLocal if there aren't multiple threads in your
code.

On Fri, Jan 29, 2016 at 4:39 PM, N B <nb.nos...@gmail.com> wrote:

> Fixed a typo in the code to avoid any confusion Please comment on the
> code below...
>
> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>  public SomeClass initialValue() { return new SomeClass(); }
> };
> somefunc(p, d.get());
> d.remove();
> return p;
> }; );
>
> On Fri, Jan 29, 2016 at 4:32 PM, N B <nb.nos...@gmail.com> wrote:
>
>> So this use of ThreadLocal will be inside the code of a function
>> executing on the workers i.e. within a call from one of the lambdas. Would
>> it just look like this then:
>>
>> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>>  public SomeClass initialValue() { return new SomeClass(); }
>> };
>> somefunc(p, d.get());
>> d.remove();
>> return p;
>> }; );
>>
>> Will this make sure that all threads inside the worker clean up the
>> ThreadLocal once they are done with processing this task?
>>
>> Thanks
>> NB
>>
>>
>> On Fri, Jan 29, 2016 at 1:00 PM, Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>>> Spark Streaming uses thread pools, so you need to remove the ThreadLocal
>>> when you are done with it.
>>>
>>> On Fri, Jan 29, 2016 at 12:55 PM, N B <nb.nos...@gmail.com> wrote:
>>>
>>>> Thanks for the response Ryan. So I would say that it is in fact the
>>>> purpose of a ThreadLocal i.e. to have a copy of the variable as long as the
>>>> thread lives. I guess my concern is around usage of threadpools and whether
>>>> Spark streaming will internally create many threads that rotate between
>>>> tasks on purpose thereby holding onto ThreadLocals that may actually never
>>>> be used again.
>>>>
>>>> Thanks
>>>>
>>>> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
>>>> shixi...@databricks.com> wrote:
>>>>
>>>>> Of course. If you use a ThreadLocal in a long-lived thread and forget
>>>>> to remove it, it's definitely a memory leak.
>>>>>
>>>>> On Thu, Jan 28, 2016 at 9:31 PM, N B <nb.nos...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Does anyone know if there are any potential pitfalls associated with
>>>>>> using ThreadLocal variables in a Spark streaming application? One things 
>>>>>> I
>>>>>> have seen mentioned in the context of app servers that use thread pools 
>>>>>> is
>>>>>> that ThreadLocals can leak memory. Could this happen in Spark streaming
>>>>>> also?
>>>>>>
>>>>>> Thanks
>>>>>> Nikunj
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark streaming and ThreadLocal

2016-01-29 Thread Shixiong(Ryan) Zhu
I see. Then you should use `mapPartitions` rather than a ThreadLocal, so the
object is created once per partition. E.g. (Scala syntax; the Java API's
mapPartitions works the same way):

dstream.mapPartitions { iter =>
  val d = new SomeClass()   // one instance per partition instead of one per record
  iter.map { p =>
    somefunc(p, d)
    p
  }
}


On Fri, Jan 29, 2016 at 5:29 PM, N B <nb.nos...@gmail.com> wrote:

> Well won't the code in lambda execute inside multiple threads in the
> worker because it has to process many records? I would just want to have a
> single copy of SomeClass instantiated per thread rather than once per each
> record being processed. That was what triggered this thought anyways.
>
> Thanks
> NB
>
>
> On Fri, Jan 29, 2016 at 5:09 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> It looks weird. Why don't you just pass "new SomeClass()" to "somefunc"?
>> You don't need a ThreadLocal if there aren't multiple threads in your
>> code.
>>
>> On Fri, Jan 29, 2016 at 4:39 PM, N B <nb.nos...@gmail.com> wrote:
>>
>>> Fixed a typo in the code to avoid any confusion Please comment on
>>> the code below...
>>>
>>> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>>>  public SomeClass initialValue() { return new SomeClass(); }
>>> };
>>> somefunc(p, d.get());
>>> d.remove();
>>> return p;
>>> }; );
>>>
>>> On Fri, Jan 29, 2016 at 4:32 PM, N B <nb.nos...@gmail.com> wrote:
>>>
>>>> So this use of ThreadLocal will be inside the code of a function
>>>> executing on the workers i.e. within a call from one of the lambdas. Would
>>>> it just look like this then:
>>>>
>>>> dstream.map( p -> { ThreadLocal d = new ThreadLocal<>() {
>>>>  public SomeClass initialValue() { return new SomeClass(); }
>>>> };
>>>> somefunc(p, d.get());
>>>> d.remove();
>>>> return p;
>>>> }; );
>>>>
>>>> Will this make sure that all threads inside the worker clean up the
>>>> ThreadLocal once they are done with processing this task?
>>>>
>>>> Thanks
>>>> NB
>>>>
>>>>
>>>> On Fri, Jan 29, 2016 at 1:00 PM, Shixiong(Ryan) Zhu <
>>>> shixi...@databricks.com> wrote:
>>>>
>>>>> Spark Streaming uses thread pools, so you need to remove the
>>>>> ThreadLocal when you are done with it.
>>>>>
>>>>> On Fri, Jan 29, 2016 at 12:55 PM, N B <nb.nos...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the response Ryan. So I would say that it is in fact the
>>>>>> purpose of a ThreadLocal i.e. to have a copy of the variable as long as 
>>>>>> the
>>>>>> thread lives. I guess my concern is around usage of threadpools and 
>>>>>> whether
>>>>>> Spark streaming will internally create many threads that rotate between
>>>>>> tasks on purpose thereby holding onto ThreadLocals that may actually 
>>>>>> never
>>>>>> be used again.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
>>>>>> shixi...@databricks.com> wrote:
>>>>>>
>>>>>>> Of course. If you use a ThreadLocal in a long-lived thread and
>>>>>>> forget to remove it, it's definitely a memory leak.
>>>>>>>
>>>>>>> On Thu, Jan 28, 2016 at 9:31 PM, N B <nb.nos...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> Does anyone know if there are any potential pitfalls associated
>>>>>>>> with using ThreadLocal variables in a Spark streaming application? One
>>>>>>>> things I have seen mentioned in the context of app servers that use 
>>>>>>>> thread
>>>>>>>> pools is that ThreadLocals can leak memory. Could this happen in Spark
>>>>>>>> streaming also?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Nikunj
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: mapWithState: remove key

2016-01-29 Thread Shixiong(Ryan) Zhu
1. To remove a state, you need to call "state.remove()". If you return
None from the function, that only means nothing is emitted for that key in
the DStream's output; the state itself won't be removed unless you call
"state.remove()".

2. For NoSuchElementException, here is the doc for "State.get":

  /**
   * Get the state if it exists, otherwise it will throw
`java.util.NoSuchElementException`.
   * Check with `exists()` whether the state exists or not before calling
`get()`.
   *
   * @throws java.util.NoSuchElementException If the state does not exist.
   */
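
For example, a mapping function that removes the key once some condition is
met could look roughly like this (shouldDrop stands for your own logic):

import org.apache.spark.streaming.State

def mappingFunc(key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = {
  if (state.exists && shouldDrop(key)) {
    state.remove()        // actually removes the state for this key
    None                  // and emits nothing for it in this batch
  } else {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    state.update(sum)
    Some((key, sum))
  }
}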




On Fri, Jan 29, 2016 at 10:45 AM, Udo Fholl  wrote:

> Hi,
>
> From the signature of the "mapWithState" method I infer that by returning
> a "None.type" (in Scala) the key is removed from the state. Is that so?
> Sorry if it is in the docs, but it wasn't entirely clear to me.
>
> I'm chaining operations and calling "mapWithState" twice (one to
> consolidate, then I perform some operations that might, or might not
> succeed, and invoke "mapWithState" again). I'm getting this error[1] which
> I suppose is because I'm returning "None" in the "mapWithState" function.
>
> Thank you.
>
> Best regards,
> Udo.
>
> [1]: java.util.NoSuchElementException: State is not set
> at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>


Re: Getting Exceptions/WARN during random runs for same dataset

2016-01-29 Thread Shixiong(Ryan) Zhu
It's a known issue. See https://issues.apache.org/jira/browse/SPARK-10719

On Thu, Jan 28, 2016 at 5:44 PM, Khusro Siddiqui  wrote:

> It is happening on random executors on random nodes. Not on any specific
> node everytime.
> Or not happening at all
>
> On Thu, Jan 28, 2016 at 7:42 PM, Ted Yu  wrote:
>
>> Did the UnsupportedOperationException's happen from the executors on all the
>> nodes or only one node ?
>>
>> Thanks
>>
>> On Thu, Jan 28, 2016 at 5:13 PM, Khusro Siddiqui 
>> wrote:
>>
>>> Hi Everyone,
>>>
>>> Environment used: Datastax Enterprise 4.8.3 which is bundled with Spark
>>> 1.4.1 and scala 2.10.5.
>>>
>>> I am using Dataframes to query Cassandra, do processing and store the
>>> result back into Cassandra. The job is being submitted using spark-submit
>>> on a cluster of 3 nodes. While doing so I get three WARN messages:
>>>
>>> WARN  2016-01-28 19:08:18 org.apache.spark.scheduler.TaskSetManager:
>>> Lost task 99.0 in stage 2.0 (TID 107, 10.2.1.82):
>>> java.io.InvalidClassException: org.apache.spark.sql.types.TimestampType$;
>>> unable to create instance
>>>
>>> Caused by: java.lang.reflect.InvocationTargetException
>>>
>>> Caused by: java.lang.UnsupportedOperationException: tail of empty list
>>>
>>>
>>> For example, if I am running the same job, for the same input set of
>>> data, say 20 times,
>>>
>>> - 11 times it will run successfully without any WARN messages
>>>
>>> - 4 times it will run successfully with the above messages
>>>
>>> - 6 times it will run successfully by randomly giving one or two of
>>> the exceptions above
>>>
>>>
>>> In all the 20 runs, the output data is coming as expected and there is
>>> no error in that. My concern is, why is it not giving these messages every
>>> time I do a spark-submit but only at times. Also, the stack trace does not
>>> point to any specific point in my line of code. Full stack trace is as
>>> follows. Please let me know if you need any other information
>>>
>>>
>>> WARN  2016-01-28 19:08:24 org.apache.spark.scheduler.TaskSetManager:
>>> Lost task 188.0 in stage 16.0 (TID 637, 10.2.1.82):
>>> java.io.InvalidClassException: org.apache.spark.sql.types.TimestampType$;
>>> unable to create instance
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1788)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>
>>> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
>>>
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>>
>>> at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>>
>>> at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>>>
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>>
>>> at
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>>
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>>>
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>>
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>>

Re: Spark streaming and ThreadLocal

2016-01-29 Thread Shixiong(Ryan) Zhu
Of course. If you use a ThreadLocal in a long-lived thread and forget to
remove it, it's definitely a memory leak.

On Thu, Jan 28, 2016 at 9:31 PM, N B  wrote:

> Hello,
>
> Does anyone know if there are any potential pitfalls associated with using
> ThreadLocal variables in a Spark streaming application? One things I have
> seen mentioned in the context of app servers that use thread pools is that
> ThreadLocals can leak memory. Could this happen in Spark streaming also?
>
> Thanks
> Nikunj
>
>


Re: Spark streaming and ThreadLocal

2016-01-29 Thread Shixiong(Ryan) Zhu
Spark Streaming uses thread pools, so you need to remove the ThreadLocal
when you are done with it.

On Fri, Jan 29, 2016 at 12:55 PM, N B <nb.nos...@gmail.com> wrote:

> Thanks for the response Ryan. So I would say that it is in fact the
> purpose of a ThreadLocal i.e. to have a copy of the variable as long as the
> thread lives. I guess my concern is around usage of threadpools and whether
> Spark streaming will internally create many threads that rotate between
> tasks on purpose thereby holding onto ThreadLocals that may actually never
> be used again.
>
> Thanks
>
> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Of course. If you use a ThreadLocal in a long-lived thread and forget to
>> remove it, it's definitely a memory leak.
>>
>> On Thu, Jan 28, 2016 at 9:31 PM, N B <nb.nos...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Does anyone know if there are any potential pitfalls associated with
>>> using ThreadLocal variables in a Spark streaming application? One things I
>>> have seen mentioned in the context of app servers that use thread pools is
>>> that ThreadLocals can leak memory. Could this happen in Spark streaming
>>> also?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>
>


Re: streaming in 1.6.0 slower than 1.5.1

2016-01-28 Thread Shixiong(Ryan) Zhu
Hey Jesse,

Could you provide the operators you are using?

For the heap dump, it may not be a real memory leak. Since batches started
to queue up, the memory usage is expected to increase.

On Thu, Jan 28, 2016 at 11:54 AM, Ted Yu  wrote:

> bq. The total size by class B is 3GB in 1.5.1 and only 60MB in 1.6.0.
>
> From the information you posted, it seems the above is backwards.
>
> BTW [B is byte[], not class B.
>
> FYI
>
> On Thu, Jan 28, 2016 at 11:49 AM, Jesse F Chen  wrote:
>
>> I ran the same streaming application (compiled individually for 1.5.1 and
>> 1.6.0) that processes 5-second tweet batches.
>>
>> I noticed two things:
>>
>> 1. 10% regression in 1.6.0 vs 1.5.1
>>
>> Spark v1.6.0: 1,564 tweets/s
>> Spark v1.5.1: 1,747 tweets/s
>>
>> 2. 1.6.0 streaming seems to have a memory leak.
>>
>> 1.6.0, processing time gradually increases and eventually exceeds 5
>> seconds so batches started to queue up.
>> While in 1.5.1, no such slow down. See chart below to see the increasing
>> scheduling delay in 1.6:
>>
>>
>>
>> I captured heap dumps in two version and did a comparison. I noticed the
>> Byte base class is using 50X more space in 1.5.1.
>>
>> Here are some top classes in heap histogram and references.
>>
>> Heap Histogram
>>
>> All Classes (excluding platform)
>>
>> 1.6.0 Streaming (Class / Instance Count / Total Size):
>>   class [B                          8453    3,227,649,599
>>   class [C                         44682        4,255,502
>>   class java.lang.reflect.Method    9059        1,177,670
>>
>> 1.5.1 Streaming (Class / Instance Count / Total Size):
>>   class [B                          5095       62,938,466
>>   class [C                        130482       12,844,182
>>   class java.lang.String          130171        1,562,052
>>
>> References by Type:
>>   1.6.0: class [B [0x640039e38]
>>   1.5.1: class [B [0x6c020bb08]
>>
>> Referrers by Type (Class / Count):
>>   1.6.0: java.nio.HeapByteBuffer 3239, sun.security.util.DerInputBuffer 1233,
>>          sun.security.util.ObjectIdentifier 620, [Ljava.lang.Object; 408
>>   1.5.1: sun.security.util.DerInputBuffer 1233, sun.security.util.ObjectIdentifier 620,
>>          [[B 397, java.lang.reflect.Method 326
>> 
>>
>> The total size by class B is 3GB in 1.5.1 and only 60MB in 1.6.0.
>> The Java.nio.HeapByteBuffer referencing class did not show up in top in
>> 1.5.1.
>>
>> I have also placed jstack output for 1.5.1 and 1.6.0 online..you can get
>> them here
>>
>> https://ibm.box.com/sparkstreaming-jstack160
>> https://ibm.box.com/sparkstreaming-jstack151
>>
>> Jesse
>>
>>
>>
>>
>>
>>
>>
>


Re: Data not getting printed in Spark Streaming with print().

2016-01-28 Thread Shixiong(Ryan) Zhu
fileStream has a parameter "newFilesOnly". By default it's true, which means
only new files are processed and files already in the directory are ignored.
So you need to ***move*** the files into the directory; otherwise they are
treated as pre-existing files and ignored.

You can also set "newFilesOnly" to false. Then, in the first batch, it will
process all existing files.
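
For example, roughly matching the types in your snippet:

import org.apache.hadoop.fs.Path

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
    inputDirectory,
    (path: Path) => true,    // accept every file name
    newFilesOnly = false     // also process files already in the directory at start
  ).map { case (x, y) => (x.toString, y.toString) }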

On Thu, Jan 28, 2016 at 4:22 PM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> HI All,
>
> I am trying to run HdfsWordCount example from github.
>
>
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
>
> i am using ubuntu to run the program, but dont see any data getting
> printed after ,
> ---
> Time: 145402680 ms
> ---
>
> I dont see any errors, the program just runs, but i do not see any output
> of the data corresponding to the file used.
>
> object HdfsStream {
>
>   def main(args:Array[String]): Unit = {
>
> val sparkConf = new 
> SparkConf().setAppName("SpoolDirSpark").setMaster("local[5]")
> val ssc = new StreamingContext(sparkConf, Minutes(10))
>
> //val inputDirectory = "hdfs://localhost:9000/SpoolDirSpark"
> //val inputDirectory = "hdfs://localhost:9000/SpoolDirSpark/test.txt"
> val inputDirectory = "file:///home/satyajit/jsondata/"
>
> val lines = 
> ssc.fileStream[LongWritable,Text,TextInputFormat](inputDirectory).map{case(x,y)=>
>  (x.toString,y.toString)}
> //lines.saveAsTextFiles("hdfs://localhost:9000/SpoolDirSpark/datacheck")
> lines.saveAsTextFiles("file:///home/satyajit/jsondata/")
>
> println("check_data"+lines.print())
>
> ssc.start()
> ssc.awaitTermination()
>
> Would like to know if there is any workaround, or if there is something i
> am missing.
>
> Thanking in advance,
> Satyajit.
>


Re: spark.kryo.classesToRegister

2016-01-27 Thread Shixiong(Ryan) Zhu
It depends. The default Kryo serializer cannot handle all cases. If you
encounter any issue, you can follow the Kryo doc to set up a custom
serializer: https://github.com/EsotericSoftware/kryo/blob/master/README.md
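
Registering your classes is optional but usually worthwhile, because
unregistered classes make Kryo write the full class name with every object.
A minimal sketch (MyClass is a placeholder for your own type);
registerKryoClasses is the programmatic equivalent of
spark.kryo.classesToRegister:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyClass], classOf[Array[MyClass]]))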

On Wed, Jan 27, 2016 at 3:13 AM, amit tewari  wrote:

> This is what I have added in my code:
>
>
>
> rdd.persist(StorageLevel.MEMORY_ONLY_SER())
>
> conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");
>
>
>
> Do I compulsorily need to do anything via : spark.kryo.classesToRegister?
>
> Or the above code sufficient to achieve performance gain using Kryo
> serialization?
>
>
>
> Thanks
>
> Amit
>


Re: FAIR scheduler in Spark Streaming

2016-01-26 Thread Shixiong(Ryan) Zhu
The number of concurrent Streaming jobs is controlled by
"spark.streaming.concurrentJobs". It's 1 by default. However, keep in mind
that setting it to a bigger number allows jobs from several batches to run
at the same time. The behavior is hard to predict and can sometimes
surprise you.
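
If you still want to experiment with it, it is just a SparkConf entry (a
sketch; the value 2 is only an example):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.streaming.concurrentJobs", "2")   // >1 lets jobs from several batches overlap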

On Tue, Jan 26, 2016 at 9:57 AM, Sebastian Piu 
wrote:

> Hi,
>
> I'm trying to get *FAIR *scheduling to work in a spark streaming app
> (1.6.0).
>
> I've found a previous mailing list where it is indicated to do:
>
> dstream.foreachRDD { rdd =>
> rdd.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1") // set
> the pool rdd.count() // or whatever job }
>
> This seems to work, in the sense that If I have 5 foreachRDD in my code,
> each one is sent to a different queue, but they still get executed one
> after the other rather than at the same time.
> Am I missing something?
>
> The scheduler config and scheduler mode are being picked alright as I can
> see them on the Spark UI
>
> //Context config
>
> *spark.scheduler.mode=FAIR*
>
> Here is my scheduler config:
>
>
> (five pool entries; the XML tags were stripped by the mail archive)
> FAIR 2 1
> FAIR 1 0
> FAIR 1 0
> FAIR 1 0
> FAIR 2 1
>
>
> Any idea on what could be wrong?
>


Re: Need a sample code to load XML files into cassandra database using spark streaming

2016-01-26 Thread Shixiong(Ryan) Zhu
You can use spark-xml to read the XML files.
https://github.com/databricks/spark-xml has some examples.

To save your results to Cassandra, you can use the spark-cassandra-connector:
https://github.com/datastax/spark-cassandra-connector
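
A rough sketch of how the two libraries fit together (rowTag, the path,
keyspace and table names are placeholders, and sqlContext is assumed to
exist; check both READMEs for the options matching your versions):

val df = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "record")                 // placeholder: your XML row element
  .load("s3n://your-bucket/path/to/xml/")     // placeholder path

df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "your_keyspace", "table" -> "your_table"))
  .mode("append")
  .save()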

On Tue, Jan 26, 2016 at 10:10 AM, Sree Eedupuganti  wrote:

> Hello everyone, new to spark streaming, need a sample code to load xml
> files from AWS S3 server to cassandra database. Any suggesttions please,
> Thanks in advance.
>
> --
> Best Regards,
> Sreeharsha Eedupuganti
> Data Engineer
> innData Analytics Private Limited
>


Re: streaming textFileStream problem - got only ONE line

2016-01-25 Thread Shixiong(Ryan) Zhu
Did you move the file into "hdfs://helmhdfs/user/patcharee/cerdata/", or
write into it directly? `textFileStream` requires that files be written to
the monitored directory by "moving" them from another location within the
same file system.
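
If you are producing the files yourself, one pattern is to write them to a
staging directory on the same HDFS and then rename them into the monitored
directory. A sketch using the Hadoop FileSystem API (the tmp path is a
placeholder):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val tmp = new Path("hdfs://helmhdfs/user/patcharee/tmp/datetime_19617.txt")
val dst = new Path("hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt")
// rename is atomic within one HDFS, so the stream only ever sees complete files
fs.rename(tmp, dst)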

On Mon, Jan 25, 2016 at 6:30 AM, patcharee 
wrote:

> Hi,
>
> My streaming application is receiving data from file system and just
> prints the input count every 1 sec interval, as the code below:
>
> val sparkConf = new SparkConf()
> val ssc = new StreamingContext(sparkConf, Milliseconds(interval_ms))
> val lines = ssc.textFileStream(args(0))
> lines.count().print()
>
> The problem is sometimes the data received from scc.textFileStream is ONLY
> ONE line. But in fact there are multiple lines in the new file found in
> that interval. See log below which shows three intervals. In the 2nd
> interval, the new file is:
> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt. This file
> contains 6288 lines. The ssc.textFileStream returns ONLY ONE line (the
> header).
>
> Any ideas/suggestions what the problem is?
>
>
> -
> SPARK LOG
>
> -
>
> 16/01/25 15:11:11 INFO FileInputDStream: Cleared 1 old files that were
> older than 1453731011000 ms: 145373101 ms
> 16/01/25 15:11:11 INFO FileInputDStream: Cleared 0 old files that were
> older than 1453731011000 ms:
> 16/01/25 15:11:12 INFO FileInputDStream: Finding new files took 4 ms
> 16/01/25 15:11:12 INFO FileInputDStream: New files at time 1453731072000
> ms:
> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19616.txt
> ---
> Time: 1453731072000 ms
> ---
> 6288
>
> 16/01/25 15:11:12 INFO FileInputDStream: Cleared 1 old files that were
> older than 1453731012000 ms: 1453731011000 ms
> 16/01/25 15:11:12 INFO FileInputDStream: Cleared 0 old files that were
> older than 1453731012000 ms:
> 16/01/25 15:11:13 INFO FileInputDStream: Finding new files took 4 ms
> 16/01/25 15:11:13 INFO FileInputDStream: New files at time 1453731073000
> ms:
> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt
> ---
> Time: 1453731073000 ms
> ---
> 1
>
> 16/01/25 15:11:13 INFO FileInputDStream: Cleared 1 old files that were
> older than 1453731013000 ms: 1453731012000 ms
> 16/01/25 15:11:13 INFO FileInputDStream: Cleared 0 old files that were
> older than 1453731013000 ms:
> 16/01/25 15:11:14 INFO FileInputDStream: Finding new files took 3 ms
> 16/01/25 15:11:14 INFO FileInputDStream: New files at time 1453731074000
> ms:
> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19618.txt
> ---
> Time: 1453731074000 ms
> ---
> 6288
>
>
> Thanks,
> Patcharee
>


Re: mapWithState and context start when checkpoint exists

2016-01-25 Thread Shixiong(Ryan) Zhu
Hey Andrey,

`ConstantInputDStream` doesn't support checkpointing, as it contains an RDD
field. It cannot resume from checkpoints.

On Mon, Jan 25, 2016 at 1:12 PM, Andrey Yegorov 
wrote:

> Hi,
>
> I am new to spark (and scala) and hope someone can help me with the issue
> I got stuck on in my experiments/learning.
>
> mapWithState from spark 1.6 seems to be a great way for the task I want to
> implement with spark but unfortunately I am getting error "RDD
> transformations and actions can only be invoked by the driver, not inside
> of other transformations" on job restart when checkpoint already exists.
> Job starts and works ok if checkpoint is empty (this kind of defeats the
> point of having the checkpoint).
>
> I can reproduce it with ~65 lines of test code, see below.
> Is there something that I am doing wrong there?
>
> code:
> 
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.dstream.ConstantInputDStream
> import org.apache.spark.streaming.{Durations, StreamingContext, _}
>
> object TestJob {
>   def stateFunc(id: String,
> txt: Option[Iterable[String]],
> state: State[String]) : (String, Long) = {
> if (txt.nonEmpty) {
>   val aggregatedString = state.getOption().getOrElse("") + txt
>   state.update(aggregatedString)
>   (id, aggregatedString.length)
> } else { // happens when state is timing out? any other cases?
>   (id, 0)
> }
>   }
>
>   def createContext(checkpointDirectory: String): StreamingContext = {
> val sparkConf = new
> SparkConf().setMaster("local[2]").setAppName("test")
>
> val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
> ssc.checkpoint(checkpointDirectory)
>
> val input = Seq("1", "21", "321", "41", "42", "543", "67")
> val inputRdd = ssc.sparkContext.parallelize(input)
> val testStream = new ConstantInputDStream(ssc, inputRdd)
>
> val streamWithIds = testStream.map(x => (x.substring(0,1), x))
> val batched = streamWithIds.groupByKey()
>
> val stateSpec = StateSpec.function(stateFunc
> _).numPartitions(3).timeout(Minutes(3 * 60 * 24)) // 3 days
>
> val result = batched.mapWithState(stateSpec)
> result.print
> ssc
>   }
>
>   def main(args: Array[String]): Unit = {
> val checkpointDirectory = com.google.common.io.Files.createTempDir()
> checkpointDirectory.deleteOnExit()
> val checkpointDirectoryName = checkpointDirectory.getAbsolutePath
>
> val ssc = StreamingContext.getOrCreate(checkpointDirectoryName,
>   () => {
> createContext(checkpointDirectoryName)
>   })
>
> ssc.start()
> ssc.awaitTerminationOrTimeout(7000)
> ssc.stop()
> Thread.sleep(5000)
>
> val ssc2 = StreamingContext.getOrCreate(checkpointDirectoryName,
>   () => {
> createContext(checkpointDirectoryName)
>   })
>
> // terminates here with
> // Exception in thread "main" org.apache.spark.SparkException: RDD
> transformations and actions can only be invoked by the driver, not inside
> of other transformations; for example, rdd1.map(x => rdd2.values.count() *
> x) is invalid because the values transformation and count action cannot be
> performed inside of the rdd1.map transformation. For more information, see
> SPARK-5063.
> ssc2.start()
> ssc2.awaitTerminationOrTimeout(7000)
> ssc2.stop()
>   }
> }
>
> --
> Andrey Yegorov
>


Re: Spark Streaming Write Ahead Log (WAL) not replaying data after restart

2016-01-25 Thread Shixiong(Ryan) Zhu
You need to define a create function and use StreamingContext.getOrCreate.
See the example here:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#how-to-configure-checkpointing
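
The key point is that all the DStream setup has to happen inside the create
function, so it is either built fresh or fully recovered from the
checkpoint. A rough sketch based on your snippet (conf and
InMemoryStringReceiver are from your code; checkpointDir stands for your
checkpoint path):

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)
  // all DStream wiring must live inside this function
  val stream = ssc.receiverStream(new InMemoryStringReceiver())
  stream.foreachRDD { rdd => println(s"processed => [${rdd.collect().toList}]") }
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()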

On Thu, Jan 21, 2016 at 3:32 AM, Patrick McGloin 
wrote:

> Hi all,
>
> To have a simple way of testing the Spark Streaming Write Ahead Log I
> created a very simple Custom Input Receiver, which will generate strings
> and store those:
>
> class InMemoryStringReceiver extends 
> Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
>
>   val batchID = System.currentTimeMillis()
>
>   def onStart() {
> new Thread("InMemoryStringReceiver") {
>   override def run(): Unit = {
> var i = 0
> while(true) {
>   
> //http://spark.apache.org/docs/latest/streaming-custom-receivers.html
>   //To implement a reliable receiver, you have to use 
> store(multiple-records) to store data.
>   store(ArrayBuffer(s"$batchID-$i"))
>   println(s"Stored => [$batchID-$i)]")
>   Thread.sleep(1000L)
>   i = i + 1
> }
>   }
> }.start()
>   }
>
>   def onStop() {}
> }
>
> I then created a simple Application which will use the Custom Receiver to
> stream the data and process it:
>
> object DStreamResilienceTest extends App {
>
>   val conf = new 
> SparkConf().setMaster("local[*]").setAppName("DStreamResilienceTest").set("spark.streaming.receiver.writeAheadLog.enable",
>  "true")
>   val ssc = new StreamingContext(conf, Seconds(1))
>   
> ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
>   val customReceiverStream: ReceiverInputDStream[String] = 
> ssc.receiverStream(new InMemoryStringReceiver())
>   customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
> println(s"processed => [${rdd.collect().toList}]")
> Thread.sleep(2000L)
>   }
>   ssc.start()
>   ssc.awaitTermination()
>
> }
>
> As you can see the processing of each received RDD has sleep of 2 seconds
> while the Strings are stored every second. This creates a backlog and the
> new strings pile up, and should be stored in the WAL. Indeed, I can see the
> files in the checkpoint dirs getting updated. Running the app I get output
> like this:
>
> [info] Stored => [1453374654941-0)]
> [info] processed => [List(1453374654941-0)]
> [info] Stored => [1453374654941-1)]
> [info] Stored => [1453374654941-2)]
> [info] processed => [List(1453374654941-1)]
> [info] Stored => [1453374654941-3)]
> [info] Stored => [1453374654941-4)]
> [info] processed => [List(1453374654941-2)]
> [info] Stored => [1453374654941-5)]
> [info] Stored => [1453374654941-6)]
> [info] processed => [List(1453374654941-3)]
> [info] Stored => [1453374654941-7)]
> [info] Stored => [1453374654941-8)]
> [info] processed => [List(1453374654941-4)]
> [info] Stored => [1453374654941-9)]
> [info] Stored => [1453374654941-10)]
>
> As you would expect, the storing is out pacing the processing. So I kill
> the application and restart it. This time I commented out the sleep in the
> foreachRDD so that the processing can clear any backlog:
>
> [info] Stored => [1453374753946-0)]
> [info] processed => [List(1453374753946-0)]
> [info] Stored => [1453374753946-1)]
> [info] processed => [List(1453374753946-1)]
> [info] Stored => [1453374753946-2)]
> [info] processed => [List(1453374753946-2)]
> [info] Stored => [1453374753946-3)]
> [info] processed => [List(1453374753946-3)]
> [info] Stored => [1453374753946-4)]
> [info] processed => [List(1453374753946-4)]
>
> As you can see the new events are processed but none from the previous
> batch. The old WAL logs are cleared and I see log messages like this but
> the old data does not get processed.
>
> INFO WriteAheadLogManager : Recovered 1 write ahead log files from 
> hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0
>
> What am I doing wrong? I am using Spark 1.5.2.
>
> Best regards,
>
> Patrick
>


Re: updateStateByKey not persisting in Spark 1.5.1

2016-01-20 Thread Shixiong(Ryan) Zhu
Could you share your log?

On Wed, Jan 20, 2016 at 7:55 AM, Brian London 
wrote:

> I'm running a streaming job that has two calls to updateStateByKey.  When
> run in standalone mode both calls to updateStateByKey behave as expected.
> When run on a cluster, however, it appears that the first call is not being
> checkpointed as shown in this DAG image:
>
> http://i.imgur.com/zmQ8O2z.png
>
> The middle column continues to grow one level deeper every batch until I
> get a stack overflow error.  I'm guessing its a problem of the stateRDD not
> being persisted, but I can't imagine why they wouldn't be.  I thought
> updateStateByKey was supposed to just handle that for you internally.
>
> Any ideas?
>
> I'll post stack trace excperpts of the stack overflow if anyone is
> interested below:
>
> Job aborted due to stage failure: Task 7 in stage 195811.0 failed 4 times,
> most recent failure: Lost task 7.3 in stage 195811.0 (TID 213529,
> ip-10-168-177-216.ec2.internal): java.lang.StackOverflowError at
> java.lang.Exception.(Exception.java:102) at
> java.lang.ReflectiveOperationException.(ReflectiveOperationException.java:89)
> at
> java.lang.reflect.InvocationTargetException.(InvocationTargetException.java:72)
> at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606) at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897) at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997) at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921) at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> ...
>
> And
>
> scala.collection.immutable.$colon$colon in readObject at line 362
> scala.collection.immutable.$colon$colon in readObject at line 366
> scala.collection.immutable.$colon$colon in readObject at line 362
> scala.collection.immutable.$colon$colon in readObject at line 362
> scala.collection.immutable.$colon$colon in readObject at line 366
> scala.collection.immutable.$colon$colon in readObject at line 362
> scala.collection.immutable.$colon$colon in readObject at line 362
> ...
>
>
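
For comparison, here is a minimal stateful word count with checkpointing wired up
the way the streaming guide describes. It is only a sketch, and the app name,
batch interval, port, checkpoint interval and HDFS path are placeholders; the
point is that updateStateByKey relies on periodic checkpointing to a
fault-tolerant directory reachable from every executor, which is what lets Spark
cut the state RDD's lineage instead of letting the DAG grow one level per batch
as described above.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StatefulWordCount")
val ssc = new StreamingContext(conf, Seconds(10))

// Must be a directory all executors can write to (HDFS/S3 on a cluster, not a local path).
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")

val updateFunc = (values: Seq[Int], state: Option[Int]) =>
  Some(values.sum + state.getOrElse(0))

val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
val counts = words.map((_, 1)).updateStateByKey[Int](updateFunc)
counts.checkpoint(Seconds(100))  // optional: widen the checkpoint interval to trade lineage length for I/O
counts.print()

ssc.start()
ssc.awaitTermination()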


Re: Container exited with a non-zero exit code 1-SparkJOb on YARN

2016-01-20 Thread Shixiong(Ryan) Zhu
Could you share your log?

On Wed, Jan 20, 2016 at 5:37 AM, Siddharth Ubale <
siddharth.ub...@syncoms.com> wrote:

>
>
> Hi,
>
>
>
> I am running a Spark Job on the yarn cluster.
>
> The spark job is a spark streaming application which is reading JSON from
> a kafka topic , inserting the JSON values to hbase tables via Phoenix ,
> ands then sending out certain messages to a websocket if the JSON satisfies
> a certain criteria.
>
>
>
> My cluster is a 3 node cluster with 24GB ram and 24 cores in total.
>
>
>
> Now :
>
> 1. when I am submitting the job with 10GB memory, the application fails
> saying memory is insufficient to run the job
>
> 2. The job is submitted with 6G RAM. However, it does not always run
> successfully. Common issues faced:
>
> a. Container exited with a non-zero exit code 1 , and
> after multiple such warning the job is finished.
>
> d. The failed job notifies that it was unable to find a
> file in HDFS which is something like _*hadoop_conf*_xx.zip
>
>
>
> Can someone please let me know why I am seeing the above 2 issues.
>
>
>
> Thanks,
>
> Siddharth Ubale,
>
>
>
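
The thread does not include the YARN container logs, so the following is only a
sketch of the memory-related settings usually reviewed in this situation rather
than a diagnosis; the values are placeholders for a small 3-node cluster and
would need tuning, and the app name is made up. On YARN, each container has to
fit the executor heap plus the off-heap overhead.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("KafkaToPhoenixStreaming")               // illustrative name
  .set("spark.executor.memory", "4g")                  // heap per executor
  .set("spark.executor.cores", "4")
  .set("spark.yarn.executor.memoryOverhead", "768")    // MB of off-heap headroom per container
  .set("spark.driver.memory", "2g")                    // for the driver this usually has to be given at submit time instead
  .set("spark.yarn.driver.memoryOverhead", "512")

The missing _*hadoop_conf*_xx.zip is a separate symptom: that archive is staged
into the application's HDFS staging directory by spark-submit, so it can point
at a staging or HDFS problem rather than a memory one.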


Re: using spark context in map funciton TASk not serilizable error

2016-01-20 Thread Shixiong(Ryan) Zhu
You should not use SparkContext or RDD directly in your closures.

Could you show the code of "method1"? Maybe you only need a join or
something else. E.g.,

val cassandraRDD = sc.cassandraTable("keySpace", "tableName")
reRDD.join(cassandraRDD).map().saveAsTextFile(outputDir)


On Tue, Jan 19, 2016 at 4:12 AM, Ricardo Paiva  wrote:

> Did you try SparkContext.getOrCreate() ?
>
> You don't need to pass the sparkContext to the map function, you can
> retrieve it from the SparkContext singleton.
>
> Regards,
>
> Ricardo
>
>
> On Mon, Jan 18, 2016 at 6:29 PM, gpatcham [via Apache Spark User List] 
> <[hidden
> email] > wrote:
>
>> Hi,
>>
>> I have a use case where I need to pass sparkcontext in map function
>>
>> reRDD.map(row =>method1(row,sc)).saveAsTextFile(outputDir)
>>
>> Method1 needs spark context to query cassandra. But I see below error
>>
>> java.io.NotSerializableException: org.apache.spark.SparkContext
>>
>> Is there a way we can fix this ?
>>
>> Thanks
>>
>> --
>> If you reply to this email, your message will be added to the discussion
>> below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/using-spark-context-in-map-funciton-TASk-not-serilizable-error-tp25998.html
>> To start a new topic under Apache Spark User List, email [hidden email]
>> 
>> To unsubscribe from Apache Spark User List, click here.
>> NAML
>> 
>>
>
>
>
> --
> Ricardo Paiva
> Big Data
> *globo.com* 
>
> --
> View this message in context: Re: using spark context in map funciton
> TASk not serilizable error
> 
>
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>
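
Since "method1" is not shown, the following is only a sketch of the join-shaped
rewrite suggested above, assuming the DataStax spark-cassandra-connector is on
the classpath and that method1 was looking up Cassandra rows by some key. The
keyspace, table and column names are invented, and extractKey is a hypothetical
stand-in for however a key is derived from a row of reRDD.

import com.datastax.spark.connector._   // provides sc.cassandraTable

// Key both datasets by the same lookup key; the lookup then happens on the
// executors as a join, so no SparkContext is needed inside any closure.
val cassandraByKey = sc.cassandraTable("keySpace", "tableName")
  .map(row => (row.getString("id"), row.getString("value")))

val inputByKey = reRDD.map(row => (extractKey(row), row))   // extractKey is hypothetical

inputByKey.join(cassandraByKey)
  .map { case (key, (row, cassandraValue)) => s"$key\t$cassandraValue" }
  .saveAsTextFile(outputDir)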


Re: Calling SparkContext methods in scala Future

2016-01-18 Thread Shixiong(Ryan) Zhu
Hey Marco,

Since the code inside the Future runs asynchronously, you cannot call
"sparkContext.stop" at the end of "fetch": the Future may not have finished
by the time "fetch" returns.

However, the exception seems weird. Do you have a simple reproducer?


On Mon, Jan 18, 2016 at 9:13 AM, Ted Yu  wrote:

>   externalCallTwo map { dataTwo =>
> println("in map") // prints, so it gets here ...
> val rddOne = sparkContext.parallelize(dataOne)
>
> I don't think you should call method on sparkContext in map function.
> sparkContext lives on driver side.
>
> Cheers
>
> On Mon, Jan 18, 2016 at 6:27 AM, Marco  wrote:
>
>> Hello,
>>
>> I am using Spark 1.5.1 within SBT, and Scala 2.10.6 and I am facing an
>> issue with the SparkContext.
>>
>> Basically, I have an object that needs to do several things:
>>
>> - call an external service One (web api)
>> - call an external service Two (another api)
>> - read and produce an RDD from HDFS (Spark)
>> - parallelize the data obtained in the first two calls
>> - join these different rdds, do stuff with them...
>>
>> Now, I am trying to do it in an asynchronous way. This doesn't seem to
>> work, though. My guess is that Spark doesn't see the calls to .parallelize,
>> as they are made in different tasks (or Future, therefore this code is
>> called before/later or maybe with an unset Context (can it be?)). I have
>> tried different ways, one of these being the call to SparkEnv.set in the
>> calls to flatMap and map (in the Future). However, all I get is Cannot call
>> methods on a stopped SparkContext. It just doesnt'work - maybe I just
>> misunderstood what it does, therefore I removed it.
>>
>> This is the code I have written so far:
>>
>> object Fetcher {
>>
>>   def fetch(name, master, ...) = {
>> val externalCallOne: Future[WSResponse] = externalService1()
>> val externalCallTwo: Future[String] = externalService2()
>> // val sparkEnv = SparkEnv.get
>> val config = new SparkConf()
>> .setAppName(name)
>> .set("spark.master", master)
>> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>
>> val sparkContext = new SparkContext(config)
>> //val sparkEnv = SparkEnv.get
>>
>> val eventuallyJoinedData = externalCallOne flatMap { dataOne =>
>>   // SparkEnv.set(sparkEnv)
>>   externalCallTwo map { dataTwo =>
>> println("in map") // prints, so it gets here ...
>> val rddOne = sparkContext.parallelize(dataOne)
>> val rddTwo = sparkContext.parallelize(dataTwo)
>> // do stuff here ... foreach/println, and
>>
>> val joinedData = rddOne leftOuterJoin (rddTwo)
>>   }
>> }
>> eventuallyJoinedData onSuccess { case success => ...  }
>> eventuallyJoinedData onFailure { case error =>
>> println(error.getMessage) }
>> // sparkContext.stop
>>   }
>>
>> }
>> As you can see, I have also tried to comment the line to stop the
>> context, but then I get another issue:
>>
>> 13:09:14.929 [ForkJoinPool-1-worker-5] INFO
>>  org.apache.spark.SparkContext - Starting job: count at Fetcher.scala:38
>> 13:09:14.932 [shuffle-server-0] DEBUG io.netty.channel.nio.NioEventLoop -
>> Selector.select() returned prematurely because
>> Thread.currentThread().interrupt() was called. Use
>> NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
>> 13:09:14.936 [Spark Context Cleaner] ERROR
>> org.apache.spark.ContextCleaner - Error in cleaning thread
>> java.lang.InterruptedException: null
>> at java.lang.Object.wait(Native Method) ~[na:1.8.0_65]
>> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
>> ~[na:1.8.0_65]
>> at
>> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:157)
>> ~[spark-core_2.10-1.5.1.jar:1.5.1]
>> at
>> org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
>> [spark-core_2.10-1.5.1.jar:1.5.1]
>> at 
>> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
>> [spark-core_2.10-1.5.1.jar:1.5.1]
>> at
>> org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
>> [spark-core_2.10-1.5.1.jar:1.5.1]
>> 13:09:14.940 [db-async-netty-thread-1] DEBUG
>> io.netty.channel.nio.NioEventLoop - Selector.select() returned prematurely
>> because Thread.currentThread().interrupt() was called. Use
>> NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.
>> 13:09:14.943 [SparkListenerBus] ERROR org.apache.spark.util.Utils -
>> uncaught error in thread SparkListenerBus, stopping SparkContext
>> java.lang.InterruptedException: null
>> at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
>> ~[na:1.8.0_65]
>> at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>> ~[na:1.8.0_65]
>> at 
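
A minimal sketch of the ordering being described, not Marco's real code: keep
the two external calls asynchronous, but have the driver block on the combined
Future before stopping the context. The services are assumed here to yield plain
key/value sequences so that parallelize and the join type-check, and the timeout
is arbitrary.

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Assumed result types; the real services return WSResponse / String and would
// need to be parsed into something parallelizable first.
val callOne: Future[Seq[(String, Int)]] = externalService1()
val callTwo: Future[Seq[(String, Int)]] = externalService2()

val joined = for {
  dataOne <- callOne
  dataTwo <- callTwo
} yield {
  val rddOne = sparkContext.parallelize(dataOne)
  val rddTwo = sparkContext.parallelize(dataTwo)
  rddOne.leftOuterJoin(rddTwo).collect()
}

// Only after the asynchronous work has finished is it safe to stop the context.
val result = Await.result(joined, 10.minutes)
sparkContext.stop()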

Re: Spark Streaming: Does mapWithState implicitly partition the dsteram?

2016-01-18 Thread Shixiong(Ryan) Zhu
mapWithState uses HashPartitioner by default. You can use
"StateSpec.partitioner" to set your custom partitioner.

On Sun, Jan 17, 2016 at 11:00 AM, Lin Zhao  wrote:

> When the state is passed to the task that handles a mapWithState for a
> particular key, if the key is distributed, it seems extremely difficult to
> coordinate and synchronise the state. Is there a partition by key before a
> mapWithState? If not what exactly is the execution model?
>
> Thanks,
>
> Lin
>
>
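
A small sketch of that hook, with an invented counting function: the incoming
(key, value) stream is shuffled by key before the state function runs
(HashPartitioner by default), and StateSpec.partitioner overrides it. Here
pairStream stands for any DStream[(String, Int)].

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{State, StateSpec}

def trackCount(key: String, value: Option[Int], state: State[Long]): (String, Long) = {
  val newCount = state.getOption.getOrElse(0L) + value.getOrElse(0)
  state.update(newCount)
  (key, newCount)
}

val spec = StateSpec.function(trackCount _)
  .partitioner(new HashPartitioner(ssc.sparkContext.defaultParallelism))

val counts = pairStream.mapWithState(spec)

Because all records for a key land in the same partition, and the state for that
key lives with that partition, the state function never needs cross-task
synchronisation.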


Re: Incorrect timeline for Scheduling Delay in Streaming page in web UI?

2016-01-18 Thread Shixiong(Ryan) Zhu
Hey, did you mean that the scheduling delay timeline is incorrect because
it's too short and some values are missing? A batch won't have a scheduling
delay until it starts to run. In your example, a lot of batches are still
waiting, so they don't have a scheduling delay yet.

On Sun, Jan 17, 2016 at 4:49 AM, Jacek Laskowski  wrote:

> Hi,
>
> I'm trying to understand how Scheduling Delays are displayed in
> Streaming page in web UI and think the values are displayed
> incorrectly in the Timelines column. I'm only concerned with the
> scheduling delays (on y axis) per batch times (x axis). It appears
> that the values (on y axis) are correct, but not how they are
> displayed per batch times.
>
> See the second screenshot in
>
> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-streaming-webui.html#scheduling-delay
> .
>
> Can anyone explain how the delays for batches per batch time should be
> read? I'm specifically asking about the timeline (not histogram as it
> seems fine).
>
> Pozdrawiam,
> Jacek
>
> Jacek Laskowski | https://medium.com/@jaceklaskowski/
> Mastering Apache Spark
> ==> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Re: Re: Re: Re: spark streaming context trigger invoke stop why?

2016-01-18 Thread Shixiong(Ryan) Zhu
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:902)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:900)
>
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
>
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:900)
>
> at
> org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:222)
>
> at
> org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:47)
>
> at
> com.vip.ubt.spark.streaming.MarsScLogMetric$1.call(MarsScLogMetric.java:40)
>
> at
> com.vip.ubt.spark.streaming.MarsScLogMetric$1.call(MarsScLogMetric.java:36)
>
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
>
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
>
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
>
>at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
>
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>
> at
> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>
> at scala.util.Try$.apply(Try.scala:161)
>
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:218)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:218)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:218)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:217)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.Exception: Could not compute split, block
> input-22-1452641669000 not found
>
> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
> ... 3 more
>
> 16/01/13 07:35:42 INFO
> [org.apache.spark.streaming.StreamingContext---Thread-0]: Invoking
> stop(stopGracefully=false) from shutdown hook
>
> 16/01/13 07:35:42 INFO
> [org.apache.spark.streaming.scheduler.ReceiverTracker---sparkDriver-akka.actor.default-dispatcher-4]:
> Sent stop signal to all 42 receivers
>
>
>
>
>

Re: [Spark 1.6][Streaming] About the behavior of mapWithState

2016-01-15 Thread Shixiong(Ryan) Zhu
Hey Terry,

That's expected. If you want to only output (1, 3), you can use
"reduceByKey" before "mapWithState" like this:

dstream.reduceByKey(_ + _).mapWithState(spec)

On Fri, Jan 15, 2016 at 1:21 AM, Terry Hoo  wrote:

> Hi,
> I am doing a simple test with mapWithState, and get some events
> unexpected, is this correct?
>
> The test is very simple: sum the value of each key
>
> val mappingFunction = (key: Int, value: Option[Int], state: State[Int]) => {
>   state.update(state.getOption().getOrElse(0) + value.getOrElse(0))
>   (key, state.get())
> }
> val spec = StateSpec.function(mappingFunction)
> dstream.mapWithState(spec)
>
> I create two RDDs and insert into dstream:
> RDD((1,1), (1,2), (2,1))
> RDD((1,3))
>
> Get result like this:
> RDD(*(1,1)*, *(1,3)*, (2,1))
> RDD((1,6))
>
> You can see that the first batch will generate two items with the same key
> "1": (1,1) and (1,3), is this expected behavior? I would expect (1,3) only.
>
> Regards
> - Terry
>


Re: Re: Re: Re: spark streaming context trigger invoke stop why?

2016-01-15 Thread Shixiong(Ryan) Zhu
I see. So when your job fails, `jsc.awaitTermination();` will throw an
exception. Then your app's main method will exit, trigger the shutdown hook,
and call `jsc.stop()`.

On Thu, Jan 14, 2016 at 10:20 PM, Triones,Deng(vip.com) <
triones.d...@vipshop.com> wrote:

> Thanks for your response .
>
> Our code as below :
>
>
>
>
>
> public void process(){
>
> logger.info("streaming process start !!!");
>
>
>
> SparkConf sparkConf =
> createSparkConf(this.getClass().getSimpleName());
>
>
>
> JavaStreamingContext jsc =
> this.createJavaStreamingContext(sparkConf);
>
>
>
> if(this.streamingListener != null){
>
> jsc.addStreamingListener(this.streamingListener);
>
> }
>
> JavaPairDStream<String, String> allKafkaWindowData =
> this.sparkReceiverDStream.createReceiverDStream(jsc,this.streamingConf.getWindowDuration(),
>
> this.streamingConf.getSlideDuration());
>
>
>
> this.businessProcess(allKafkaWindowData);
>
> this.sleep();
>
>jsc.start();
>
> jsc.awaitTermination();
>
>
>
>
>
> *From:* Shixiong(Ryan) Zhu [mailto:shixi...@databricks.com]
> *Sent:* January 15, 2016 6:02
> *To:* 邓刚[技术中心]
> *Cc:* Yogesh Mahajan; user
> *Subject:* Re: Re: Re: spark streaming context trigger invoke stop why?
>
>
>
> Could you show your codes? Did you use
> `StreamingContext.awaitTermination`? If so, it will return if any exception
> happens.
>
>
>
> On Wed, Jan 13, 2016 at 11:47 PM, Triones,Deng(vip.com) <
> triones.d...@vipshop.com> wrote:
>
> What’s more, I am running a 7*24 hours job , so I won’t call System.exit()
> by myself. So I believe the driver kills itself somewhere
>
>
>
> *From:* 邓刚[技术中心]
> *Sent:* January 14, 2016 15:45
> *To:* 'Yogesh Mahajan'
> *Cc:* user
> *Subject:* Re: Re: spark streaming context trigger invoke stop why?
>
>
>
> Thanks for your response. ApplicationMaster is only for yarn mode; I am
> using standalone mode. Could you kindly let me know what triggers
> the shutdown hook?
>
>
>
> *From:* Yogesh Mahajan [mailto:ymaha...@snappydata.io
> <ymaha...@snappydata.io>]
> *Sent:* January 14, 2016 12:42
> *To:* 邓刚[技术中心]
> *Cc:* user
> *Subject:* Re: Re: spark streaming context trigger invoke stop why?
>
>
>
> All the action happens in ApplicationMaster, especially in its run method
>
> Check ApplicationMaster#startUserApplication : userThread(Driver) which
> invokes ApplicationMaster#finish method. You can also try System.exit in
> your program
>
>
>
> Regards,
>
> Yogesh Mahajan,
>
> SnappyData Inc, snappydata.io
>
>
>
> On Thu, Jan 14, 2016 at 9:56 AM, Yogesh Mahajan <ymaha...@snappydata.io>
> wrote:
>
> Hi Triones,
>
>
>
> Check the org.apache.spark.util.ShutdownHookManager : It adds this
> ShutDownHook when you start a StreamingContext
>
>
>
> Here is the code in StreamingContext.start()
>
>
>
> shutdownHookRef = ShutdownHookManager.addShutdownHook(
>
>   StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
>
>
>
> Also look at the following def in StreamingContext which actually stops
> the context from shutdown hook :
>
> private def stopOnShutdown(): Unit = {
>
> val stopGracefully =
> conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
>
> logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown
> hook")
>
> // Do not stop SparkContext, let its own shutdown hook stop it
>
> stop(stopSparkContext = false, stopGracefully = stopGracefully)
>
> }
>
>
>
> Regards,
>
> Yogesh Mahajan,
>
> SnappyData Inc, snappydata.io
>
>
>
> On Thu, Jan 14, 2016 at 8:55 AM, Triones,Deng(vip.com) <
> triones.d...@vipshop.com> wrote:
>
> More info
>
>
>
> I am using spark version 1.5.2
>
>
>
>
>
> *From:* Triones,Deng(vip.com) [mailto:triones.d...@vipshop.com]
> *Sent:* January 14, 2016 11:24
> *To:* user
> *Subject:* spark streaming context trigger invoke stop why?
>
>
>
> Hi all
>
>  As I saw in the driver log, a task failed 4 times in a stage, and the
> stage was dropped because the input block was deleted before it could be used.
> After that the StreamingContext invoked stop.  Does anyone know what kind of
> akka message triggers the stop, or which code triggers the shutdown hook?
>
>
>
>
>
> Thanks
>
>
>
>
>
>
>
>
>
> Driver log:
>
>
>
>  Job aborted due to stage failure: Task 410 in stage 215.0 failed 4 times
>
> [org.
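
Putting the two points above together, here is a sketch (in Scala for brevity;
the original job is Java) of making that path explicit: awaitTermination
rethrows whatever error killed the job, and spark.streaming.stopGracefullyOnShutdown
controls whether the stopOnShutdown hook shown earlier drains in-flight batches
before stopping. The app name and batch interval are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("MarsScLogMetric")
  // Let stopOnShutdown() stop gracefully instead of stop(stopGracefully = false).
  .set("spark.streaming.stopGracefullyOnShutdown", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// ... build receivers and the foreachRDD pipeline here ...
ssc.start()
try {
  ssc.awaitTermination()
} catch {
  case e: Exception =>
    // This is the failure (e.g. "Could not compute split, block ... not found")
    // that otherwise only shows up as the shutdown-hook log line.
    e.printStackTrace()
    throw e
}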

Re: Spark Streaming: custom actor receiver losing vast majority of data

2016-01-14 Thread Shixiong(Ryan) Zhu
Could you change MEMORY_ONLY_SER to MEMORY_AND_DISK_SER_2 and see if this
still happens? It may be because you don't have enough memory to cache the
events.

On Thu, Jan 14, 2016 at 4:06 PM, Lin Zhao  wrote:

> Hi,
>
> I'm testing spark streaming with actor receiver. The actor keeps calling
> store() to save a pair to Spark.
>
> Once the job is launched, on the UI everything looks good. Millions of
> events gets through every batch. However, I added logging to the first step
> and found that only 20 or 40 events in a batch actually gets to the first
> mapper. Any idea what might be causing this?
>
> I also have log in the custom receiver before "store()" call and it's
> really calling this function millions of times.
>
> The receiver definition looks like:
>
> val stream = ssc.actorStream[(String, 
> Message)](MessageRetriever.props("message-retriever",
>   mrSections.head, conf, flowControlDef, None, None), "Martini",
>   StorageLevel.MEMORY_ONLY_SER)
>
>
> The job looks like:
>
> stream.map { pair =>
> logger.info(s"before pipeline key=${pair._1}") // Only a handful gets 
> logged although there are over 1 million in a batch
> pair._2
> }.flatMap { m =>
>   // Event Builder
>   logger.info(s"event builder thread-id=${Thread.currentThread().getId} 
> user=${m.fields.getOrElse('user, "NA")}")
>   ebHelper(m)
> }.map { e =>
>   // Event Normalizer
>   logger.info(s"normalizer thread-id=${Thread.currentThread().getId} 
> user=${e.getFieldAsString('user)}")
>   DefaultEventNormalizer.normalizeFields(e)
> }.map { e =>
>   logger.info(s"resolver thread-id=${Thread.currentThread().getId} 
> user=${e.getFieldAsString('user)}")
>   resolver(e)
> }.flatMap { e =>
>   // Event Discarder
>   logger.info(s"discarder thread-id=${Thread.currentThread().getId} 
> user=${e.getFieldAsString('user)}")
>   discarder(e)
> }.map { e =>
>   ep(e)
> }
>
>
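
For reference, a minimal sketch of the change being suggested, reusing the
receiver definition from the post (MessageRetriever, mrSections, flowControlDef
and Message are the poster's own names): serialize received blocks and let them
spill to disk, with replication, instead of having them dropped when executor
memory runs out.

import org.apache.spark.storage.StorageLevel

val stream = ssc.actorStream[(String, Message)](
  MessageRetriever.props("message-retriever", mrSections.head, conf,
    flowControlDef, None, None),
  "Martini",
  StorageLevel.MEMORY_AND_DISK_SER_2)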


Re: How to bind webui to localhost?

2016-01-14 Thread Shixiong(Ryan) Zhu
Yeah, it's hard-coded as "0.0.0.0". Could you send a PR to add a
configuration for it?

On Thu, Jan 14, 2016 at 2:51 PM, Zee Chen  wrote:

> Hi, what is the easiest way to configure the Spark webui to bind to
> localhost or 127.0.0.1? I intend to use this with ssh socks proxy to
> provide a rudimentary "secured access". Unlike hadoop config options,
> Spark doesn't allow the user to directly specify the ip addr to bind
> services to.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming: custom actor receiver losing vast majority of data

2016-01-14 Thread Shixiong(Ryan) Zhu
Could you post the code of MessageRetriever? And by the way, could you
post a screenshot of the tasks for a batch and check the input size of those
tasks? Considering there are so many events, there should be a lot of
blocks as well as a lot of tasks.

On Thu, Jan 14, 2016 at 4:34 PM, Lin Zhao <l...@exabeam.com> wrote:

> Hi Shixiong,
>
> I tried this but it still happens. If it helps, it's 1.6.0 and runs on
> YARN. Batch duration is 20 seconds.
>
> Some logs seemingly related to block manager:
>
> 16/01/15 00:31:25 INFO receiver.BlockGenerator: Pushed block 
> input-0-1452817873000
> 16/01/15 00:31:27 INFO storage.MemoryStore: Block input-0-1452817879000 
> stored as bytes in memory (estimated size 60.1 MB, free 1563.4 MB)
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 31
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 30
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 29
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 28
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 27
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 26
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 25
> 16/01/15 00:31:28 INFO receiver.BlockGenerator: Pushed block 
> input-0-1452817879000
> 16/01/15 00:31:28 INFO context.TimeSpanHDFSReader: descr="Waiting to read 
> [win] message file(s) for 2015-12-17T21:00:00.000." 
> module=TIMESPAN_HDFS_READER
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 38
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 37
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 36
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 35
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 34
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 33
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 32
> 16/01/15 00:31:29 INFO context.MessageRetriever: descr="Processed 4,636,461 
> lines (80 s); 0 event/s (42,636 incoming msg/s) at 2015-12-17T21" 
> module=MESSAGE_RETRIEVER
> 16/01/15 00:31:30 INFO storage.MemoryStore: Block input-0-1452817879200 
> stored as bytes in memory (estimated size 93.3 MB, free 924.9 MB)
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 45
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 44
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 43
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 42
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 41
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 40
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 39
>
>
> From: "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
> Date: Thursday, January 14, 2016 at 4:13 PM
> To: Lin Zhao <l...@exabeam.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: Spark Streaming: custom actor receiver losing vast majority
> of data
>
> MEMORY_AND_DISK_SER_2
>


Re: NPE when using Joda DateTime

2016-01-14 Thread Shixiong(Ryan) Zhu
Could you try to use "Kryo.setDefaultSerializer" like this:

class YourKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.setDefaultSerializer(classOf[com.esotericsoftware.kryo.serializers.JavaSerializer])
  }
}


On Thu, Jan 14, 2016 at 12:54 PM, Durgesh Verma  wrote:

> Today is my day... Trying to go thru where I can pitch in. Let me know if
> below makes sense.
>
> I looked at joda Java Api source code (1.2.9) and traced that call in NPE.
> It looks like AssembledChronology class is being used, the iYears instance
> variable is defined as transient.
>
> DateTime.minusYears(int years) call trace:
> long instant = getChronology().years().subtract(getMillis(), years);
>
> Not sure how the suggested serializer would help if variable is transient.
>
> Thanks,
> -Durgesh
>
> On Jan 14, 2016, at 11:49 AM, Spencer, Alex (Santander) <
> alex.spen...@santander.co.uk.INVALID
> > wrote:
>
> I appreciate this – thank you.
>
>
>
> I’m not an admin on the box I’m using spark-shell on – so I’m not sure I
> can add them to that namespace. I’m hoping if I declare the
> JodaDateTimeSerializer class in my REPL that I can still get this to work.
> I think the INTERVAL part below may be key, I haven’t tried that yet.
>
>
>
> Kind Regards,
>
> Alex.
>
>
>
> *From:* Todd Nist [mailto:tsind...@gmail.com ]
> *Sent:* 14 January 2016 16:28
> *To:* Spencer, Alex (Santander)
> *Cc:* Sean Owen; user@spark.apache.org
> *Subject:* Re: NPE when using Joda DateTime
>
>
>
> I had a similar problem a while back and leveraged these Kryo serializers,
> https://github.com/magro/kryo-serializers.  I had to fallback to version
> 0.28, but that was a while back.  You can add these to the
>
> org.apache.spark.serializer.KryoRegistrator
>
> and then set your registrator in the spark config:
>
> sparkConfig.
> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> .set("spark.kryo.registrator", "com.yourpackage.YourKryoRegistrator")
> ...
>
> where YourKryoRegistrator is something like:
>
> class YourKryoRegistrator extends KryoRegistrator {
>
>   override def registerClasses(kryo: Kryo) {
> kryo.register(classOf[org.joda.time.DateTime], new
> JodaDateTimeSerializer)
> kryo.register(classOf[org.joda.time.Interval], new
> JodaIntervalSerializer)
>   }
> }
>
> HTH.
>
> -Todd
>
>
>
> On Thu, Jan 14, 2016 at 9:28 AM, Spencer, Alex (Santander) <
> alex.spen...@santander.co.uk.invalid> wrote:
>
> Hi,
>
> I tried take(1500) and test.collect and these both work on the "single"
> map statement.
>
> I'm very new to Kryo serialisation, I managed to find some code and I
> copied and pasted and that's what originally made the single map statement
> work:
>
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
> kryo.register(classOf[org.joda.time.DateTime])
>   }
> }
>
> Is it because the groupBy sees a different class type? Maybe
> Array[DateTime]? I don’t want to find the answer by trial and error though.
>
> Alex
>
> -Original Message-
> From: Sean Owen [mailto:so...@cloudera.com]
> Sent: 14 January 2016 14:07
> To: Spencer, Alex (Santander)
> Cc: user@spark.apache.org
> Subject: Re: NPE when using Joda DateTime
>
> It does look somehow like the state of the DateTime object isn't being
> recreated properly on deserialization somehow, given where the NPE occurs
> (look at the Joda source code). However the object is java.io.Serializable.
> Are you sure the Kryo serialization is correct?
>
> It doesn't quite explain why the map operation works by itself. It could
> be the difference between executing locally (take(1) will look at 1
> partition in 1 task which prefers to be local) and executing remotely
> (groupBy is going to need a shuffle).
>
> On Thu, Jan 14, 2016 at 1:01 PM, Spencer, Alex (Santander) <
> alex.spen...@santander.co.uk.invalid> wrote:
> > Hello,
> >
> >
> >
> > I was wondering if somebody is able to help me get to the bottom of a
> > null pointer exception I’m seeing in my code. I’ve managed to narrow
> > down a problem in a larger class to my use of Joda’s DateTime
> > functions. I’ve successfully run my code in scala, but I’ve hit a few
> > problems when adapting it to run in spark.
> >
> >
> >
> > Spark version: 1.3.0
> >
> > Scala version: 2.10.4
> >
> > Java HotSpot 1.7
> >
> >
> >
> > I have a small case class called Transaction, which looks something
> > like
> > this:
> >
> >
> >
> > case class Transaction(date : org.joda.time.DateTime = new
> > org.joda.time.DateTime())
> >
> >
> >
> > I have an RDD[Transactions] trans:
> >
> > org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[4] at map at
> > :44
> >
> >
> >
> > I am able to run this successfully:
> >
> >
> >
> > val test = trans.map(_.date.minusYears(10))
> >
> > test.take(1)
> >
> >
> >
> > However if I do:
> >
> >
> >
> > val groupedTrans = 

Re: Re: Re: spark streaming context trigger invoke stop why?

2016-01-14 Thread Shixiong(Ryan) Zhu
Could you show your code? Did you use `StreamingContext.awaitTermination`?
If so, it will return if any exception happens.

On Wed, Jan 13, 2016 at 11:47 PM, Triones,Deng(vip.com) <
triones.d...@vipshop.com> wrote:

> What’s more, I am running a 7*24 hours job , so I won’t call System.exit()
> by myself. So I believe the driver kills itself somewhere
>
>
>
> *From:* 邓刚[技术中心]
> *Sent:* January 14, 2016 15:45
> *To:* 'Yogesh Mahajan'
> *Cc:* user
> *Subject:* Re: Re: spark streaming context trigger invoke stop why?
>
>
>
> Thanks for your response. ApplicationMaster is only for yarn mode; I am
> using standalone mode. Could you kindly let me know what triggers
> the shutdown hook?
>
>
>
> *From:* Yogesh Mahajan [mailto:ymaha...@snappydata.io
> ]
> *Sent:* January 14, 2016 12:42
> *To:* 邓刚[技术中心]
> *Cc:* user
> *Subject:* Re: Re: spark streaming context trigger invoke stop why?
>
>
>
> All the action happens in ApplicationMaster, especially in its run method
>
> Check ApplicationMaster#startUserApplication : userThread(Driver) which
> invokes ApplicationMaster#finish method. You can also try System.exit in
> your program
>
>
>
> Regards,
>
> Yogesh Mahajan,
>
> SnappyData Inc, snappydata.io
>
>
>
> On Thu, Jan 14, 2016 at 9:56 AM, Yogesh Mahajan 
> wrote:
>
> Hi Triones,
>
>
>
> Check the org.apache.spark.util.ShutdownHookManager : It adds this
> ShutDownHook when you start a StreamingContext
>
>
>
> Here is the code in StreamingContext.start()
>
>
>
> shutdownHookRef = ShutdownHookManager.addShutdownHook(
>
>   StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
>
>
>
> Also look at the following def in StreamingContext which actually stops
> the context from shutdown hook :
>
> private def stopOnShutdown(): Unit = {
>
> val stopGracefully =
> conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
>
> logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown
> hook")
>
> // Do not stop SparkContext, let its own shutdown hook stop it
>
> stop(stopSparkContext = false, stopGracefully = stopGracefully)
>
> }
>
>
>
> Regards,
>
> Yogesh Mahajan,
>
> SnappyData Inc, snappydata.io
>
>
>
> On Thu, Jan 14, 2016 at 8:55 AM, Triones,Deng(vip.com) <
> triones.d...@vipshop.com> wrote:
>
> More info
>
>
>
> I am using spark version 1.5.2
>
>
>
>
>
> *From:* Triones,Deng(vip.com) [mailto:triones.d...@vipshop.com]
> *Sent:* January 14, 2016 11:24
> *To:* user
> *Subject:* spark streaming context trigger invoke stop why?
>
>
>
> Hi all
>
>  As I saw in the driver log, a task failed 4 times in a stage, and the
> stage was dropped because the input block was deleted before it could be used.
> After that the StreamingContext invoked stop.  Does anyone know what kind of
> akka message triggers the stop, or which code triggers the shutdown hook?
>
>
>
>
>
> Thanks
>
>
>
>
>
>
>
>
>
> Driver log:
>
>
>
>  Job aborted due to stage failure: Task 410 in stage 215.0 failed 4 times
>
> [org.apache.spark.streaming.StreamingContext---Thread-0]: Invoking
> stop(stopGracefully=false) from shutdown hook
>
>
>
>
>
>
>

Re: Too many tasks killed the scheduler

2016-01-11 Thread Shixiong(Ryan) Zhu
Could you use "coalesce" to reduce the number of partitions?


Shixiong Zhu

On Mon, Jan 11, 2016 at 12:21 AM, Gavin Yue  wrote:

> Here is more info.
>
> The job is stuck at:
> INFO cluster.YarnScheduler: Adding task set 1.0 with 79212 tasks
>
> Then got the error:
> Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out
> after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
>
> So I increased spark.network.timeout from 120s to 600s.  It sometimes
> works.
>
> Each task is a parquet file.  I could not repartition because of
> out-of-memory/GC problems.
>
> Is there any way I could to improve the performance?
>
> Thanks,
> Gavin
>
>
> On Sun, Jan 10, 2016 at 1:51 AM, Gavin Yue  wrote:
>
>> Hey,
>>
>> I have 10 days of data; each day has a parquet directory with over 7000
>> partitions.
>> So when I union all 10 days and do a count, it submits over 70K tasks.
>>
>> Then the job failed silently with one container exiting with code 1.  The
>> union with 5 or 6 days of data is fine.
>> In the spark-shell, it just hangs showing: Yarn scheduler submit 7+
>> tasks.
>>
>> I am running spark 1.6 over hadoop 2.7.  Is there any setting I could
>> change to make this work?
>>
>> Thanks,
>> Gavin
>>
>>
>>
>
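
A hedged sketch of that suggestion for the parquet/union case described in this
thread: read the ten daily directories and shrink the roughly 79K input
partitions before counting. The paths and the target partition count are
invented; coalesce merges partitions without a full shuffle, which is why it can
succeed where repartition ran into GC trouble.

// Paths and the coalesce target are placeholders.
val days = (1 to 10).map(d => s"hdfs:///events/day=$d")
val all  = sqlContext.read.parquet(days: _*)

// ~79K input partitions -> a few thousand tasks for the count.
val total = all.coalesce(4000).count()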


Re: Kryo serializer Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException:

2016-01-08 Thread Shixiong(Ryan) Zhu
Could you disable `spark.kryo.registrationRequired`? Some classes may not be
registered explicitly, but they still work well with Kryo's default serializer.

On Fri, Jan 8, 2016 at 8:58 AM, Ted Yu  wrote:

> bq. try adding scala.collection.mutable.WrappedArray
>
> But the hint said registering 
> scala.collection.mutable.WrappedArray$ofRef.class
> , right ?
>
> On Fri, Jan 8, 2016 at 8:52 AM, jiml  wrote:
>
>> (point of post is to see if anyone has ideas about errors at end of post)
>>
>> In addition, the real way to test if it's working is to force
>> serialization:
>>
>> In Java:
>>
>> Create array of all your classes:
>> // for kyro serializer it wants to register all classes that need to be
>> serialized
>> Class[] kryoClassArray = new Class[]{DropResult.class,
>> DropEvaluation.class,
>> PrintHetSharing.class};
>>
>> in the builder for your SparkConf (or in conf/spark-defaults.sh)
>> .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>> //require registration of all classes with Kyro
>> .set("spark.kryo.registrationRequired", "true")
>> // don't forget to register ALL classes or will get error
>> .registerKryoClasses(kryoClassArray);
>>
>> Then you will start to get neat errors like the one I am working on:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due
>> to stage failure: Failed to serialize task 0, not attempting to retry it.
>> Exception during serialization: java.io.IOException:
>> java.lang.IllegalArgumentException: Class is not registered:
>> scala.collection.mutable.WrappedArray$ofRef
>> Note: To register this class use:
>> kryo.register(scala.collection.mutable.WrappedArray$ofRef.class);
>>
>> I did try adding scala.collection.mutable.WrappedArray to the Class array
>> up
>> top but no luck. Thanks
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/kryos-serializer-tp16454p25921.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
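
In code, the two options look roughly like this (sketched in Scala; in Java the
same class can be obtained with Class.forName("scala.collection.mutable.WrappedArray$ofRef")).
The class list is illustrative, not exhaustive.

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Option 1: don't fail on classes that were never registered explicitly.
  .set("spark.kryo.registrationRequired", "false")
  // Option 2: keep the strict setting and register the class the error names.
  .registerKryoClasses(Array(
    classOf[scala.collection.mutable.WrappedArray.ofRef[_]]
  ))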


Re: Negative Number of Active Tasks in Spark UI

2016-01-05 Thread Shixiong(Ryan) Zhu
Did you enable "spark.speculation"?

On Tue, Jan 5, 2016 at 9:14 AM, Prasad Ravilla  wrote:

> I am using Spark 1.5.2.
>
> I am not using Dynamic allocation.
>
> Thanks,
> Prasad.
>
>
>
>
> On 1/5/16, 3:24 AM, "Ted Yu"  wrote:
>
> >Which version of Spark do you use ?
> >
> >This might be related:
> >
> https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_SPARK-2D8560=CwICAg=fa_WZs7nNMvOIDyLmzi2sMVHyyC4hN9WQl29lWJQ5Y4=-5JY3iMOXXyFuBleKruCQ-6rGWyZEyiHu8ySSzJdEHw=4v0Ji1ymhcVi2Ys2mzOne0cuiDxWMiYmeRYVUeF3hWU=9L2ltekpwnC0BDcJPW43_ctNL_G4qTXN4EY2H_Ys0nU=
> >
> >Do you use dynamic allocation ?
> >
> >Cheers
> >
> >> On Jan 4, 2016, at 10:05 PM, Prasad Ravilla  wrote:
> >>
> >> I am seeing negative active tasks in the Spark UI.
> >>
> >> Is anyone seeing this?
> >> How is this possible?
> >>
> >> Thanks,
> >> Prasad.
> >> 
> >> 
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Double Counting When Using Accumulators with Spark Streaming

2016-01-05 Thread Shixiong(Ryan) Zhu
Hey Rachana,

There are actually two jobs in your code: `rdd.isEmpty` and
`rdd.saveAsTextFile`. Since you don't cache or checkpoint this RDD, your map
function is executed twice for each record.

You can move "accum.add(1)" into the job that runs "rdd.saveAsTextFile", like this:

JavaDStream<String> lines = messages.map(
    new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        LOG.info("#  Input json stream data  # " + tuple2._2);
        return tuple2._2();
      }
    });
lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
  public Void call(JavaRDD<String> rdd) throws Exception {
    if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
      rdd.map(new Function<String, String>() {
        public String call(String str) {
          accum.add(1);
          return str;
        }
      }).saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
    }
    System.out.println(" & COUNT OF ACCUMULATOR IS " + accum.value());
    return null;
  }
});



On Tue, Jan 5, 2016 at 8:37 AM, Rachana Srivastava <
rachana.srivast...@markmonitor.com> wrote:

> Thanks a lot for your prompt response.  I am pushing one message.
>
>
>
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", "localhost:9092");
> kafkaParams.put("zookeeper.connect", "localhost:2181");
>
> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>     jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
>     kafkaParams, topicsSet);
>
> final Accumulator<Integer> accum = jssc.sparkContext().accumulator(0);
>
> JavaDStream<String> lines = messages.map(
>     new Function<Tuple2<String, String>, String>() {
>       public String call(Tuple2<String, String> tuple2) {
>         LOG.info("#  Input json stream data  # " + tuple2._2);
>         accum.add(1);
>         return tuple2._2();
>       }
>     });
>
> lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
>   public Void call(JavaRDD<String> rdd) throws Exception {
>     if (!rdd.isEmpty() || !rdd.partitions().isEmpty()) {
>       rdd.saveAsTextFile("hdfs://quickstart.cloudera:8020/user/cloudera/testDirJan4/test1.text");
>     }
>     System.out.println(" & COUNT OF ACCUMULATOR IS " + accum.value());
>     return null;
>   }
> });
>
> jssc.start();
>
>
>
> If I remove the saveAsTextFile call I get the correct count; with this line
> I am getting double counting.
>
>
>
> *Here is the stack trace with the saveAsTextFile statement; please see the
> double counting below:*
>
>
>
> &&& BEFORE COUNT OF ACCUMULATOR IS &&& 0
>
> INFO : org.apache.spark.SparkContext - Starting job: foreachRDD at
> KafkaURLStreaming.java:90
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Got job 0 (foreachRDD at
> KafkaURLStreaming.java:90) with 1 output partitions
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage
> 0(foreachRDD at KafkaURLStreaming.java:90)
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Parents of final stage:
> List()
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 0
> (MapPartitionsRDD[1] at map at KafkaURLStreaming.java:83), which has no
> missing parents
>
> INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(3856) called
> with curMem=0, maxMem=1893865881
>
> INFO : org.apache.spark.storage.MemoryStore - Block broadcast_0 stored as
> values in memory (estimated size 3.8 KB, free 1806.1 MB)
>
> INFO : org.apache.spark.storage.MemoryStore - ensureFreeSpace(2225) called
> with curMem=3856, maxMem=1893865881
>
> INFO : org.apache.spark.storage.MemoryStore - Block broadcast_0_piece0
> stored as bytes in memory (estimated size 2.2 KB, free 1806.1 MB)
>
> INFO : org.apache.spark.storage.BlockManagerInfo - Added
> broadcast_0_piece0 in memory on localhost:51637 (size: 2.2 KB, free: 1806.1
> MB)
>
> INFO : org.apache.spark.SparkContext - Created broadcast 0 from broadcast
> at DAGScheduler.scala:861
>
> INFO : org.apache.spark.scheduler.DAGScheduler - Submitting 1 missing
> tasks from ResultStage 0 (MapPartitionsRDD[1] at map at
> KafkaURLStreaming.java:83)
>
> INFO : org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0
> with 1 tasks
>
> INFO : org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in
> stage 0.0 (TID 0, localhost, ANY, 2026 bytes)
>
> INFO : org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0
> (TID 0)
>
> INFO : org.apache.spark.streaming.kafka.KafkaRDD - Computing topic test11,
> partition 0 offsets 36 -> 37
>
> INFO : kafka.utils.VerifiableProperties - Verifying properties
>
> INFO : kafka.utils.VerifiableProperties - Property fetch.message.max.bytes
> is 

Re: How to register a Tuple3 with KryoSerializer?

2015-12-30 Thread Shixiong(Ryan) Zhu
You can use "_", e.g.,
sparkConf.registerKryoClasses(Array(classOf[scala.Tuple3[_, _, _]]))

Best Regards,
Shixiong(Ryan) Zhu

Software Engineer

Databricks Inc.

shixi...@databricks.com

databricks.com

<http://databricks.com/>

On Wed, Dec 30, 2015 at 10:16 AM, Russ <russ.br...@yahoo.com.invalid> wrote:

> I need to register with KryoSerializer a Tuple3 that is generated by a
> call to the sortBy() method that eventually calls collect() from
> Partitioner.RangePartitioner.sketch().
>
> The IntelliJ Idea debugger indicates that the for the Tuple3 are
> java.lang.Integer, java.lang.Integer and long[].  So, the question is, how
> should I specify the long[] type?
>
> I have tried the following from my Scala code:
>
> sparkConf.registerKryoClasses(Array(classOf[scala.Tuple3[java.lang.Integer,
> java.lang.Integer, Array[java.lang.Long]]]))
>
> However, that approach throws the following exception which indicates that
> I have failed to register the Tuple3 correctly:
>
> java.lang.IllegalArgumentException: Class is not registered: scala.Tuple3[]
>
> Can anyone suggest the correct way to register this Tuple3?  I suppose
> that I could create register the tuple from a Java method but it would be
> nice to avoid having to introduce any Java into my code.
>
> Thanks.
>
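
Building on the wildcard answer above, a short sketch: the second entry covers
the long[] that the debugger showed as the third tuple element, in case Kryo
complains about it next. Whether it is actually needed depends on what the
strict registration setting reports.

sparkConf.registerKryoClasses(Array(
  classOf[scala.Tuple3[_, _, _]],
  classOf[Array[Long]]   // long[] in JVM terms
))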


Re: Spark Streaming : Output frequency different from read frequency in StatefulNetworkWordCount

2015-12-30 Thread Shixiong(Ryan) Zhu
You can use "reduceByKeyAndWindow", e.g.,

val lines = ssc.socketTextStream("localhost", )
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((x: Int,
y: Int) => x + y, Seconds(60), Seconds(60))
wordCounts.print()


On Wed, Dec 30, 2015 at 12:00 PM, Soumitra Johri <
soumitra.siddha...@gmail.com> wrote:

> Hi, in the example :
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
>
> the streaming frequency is 1 second; however, I want to print the contents
> of the word counts only once every minute and reset the word counts back
> to 0 every minute. How can I do that?
>
> I have to print per-minute word counts with a streaming frequency of 1
> second. I thought of using Scala schedulers, but then there can be
> concurrency issues.
>
> My algorithm is as follows :
>
>1. Read the words every 1 second
>2. Do cumulative word counts for 60 seconds
>3. After the end of every 60 seconds (1 minute), print the word counts
>and reset the counters to zero.
>
> Any help would be appreciated!
>
> Thanks
>
> Warm Regards
>

