Re: application failed on large dataset

2015-09-18 Thread 周千昊
Hi,
 The issue turns out to be a memory issue. Thanks for the guidance.
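
A minimal sketch of enabling the GC output discussed below, via the executor
JVM options (the HotSpot flag names are standard, but they are an assumption
here, not quoted from the thread):

import org.apache.spark.SparkConf

// Turn on verbose GC logging for every executor JVM.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")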

周千昊 wrote on Thu, Sep 17, 2015, at 12:39 PM:

> Indeed, the operation in this stage is quite memory-consuming.
> We are trying to enable the PrintGCDetails option and see what is going on.
>
> java8964 wrote on Wed, Sep 16, 2015, at 11:47 PM:
>
>> This sounds like a memory issue.
>>
>> Have you enabled GC output? When this is happening, are your executors
>> doing full GCs? How long do the full GCs take?
>>
>> Yong
>>
>> --
>> From: qhz...@apache.org
>> Date: Wed, 16 Sep 2015 13:52:25 +
>>
>> Subject: Re: application failed on large dataset
>> To: java8...@hotmail.com; user@spark.apache.org
>>
>> Hi,
>>  I have switched 'spark.shuffle.blockTransferService' to 'nio', but the
>> problem still exists. However, the stack trace is a little different:
>> PART one:
>> 15/09/16 06:20:32 ERROR executor.Executor: Exception in task 1.2 in stage
>> 15.0 (TID 5341)
>> java.io.IOException: Failed without being ACK'd
>> at
>> org.apache.spark.network.nio.ConnectionManager$MessageStatus.failWithoutAck(ConnectionManager.scala:72)
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:533)
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$removeConnection$3.apply(ConnectionManager.scala:531)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.network.nio.ConnectionManager.removeConnection(ConnectionManager.scala:531)
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$addListeners$3.apply(ConnectionManager.scala:510)
>> at
>> org.apache.spark.network.nio.Connection.callOnCloseCallback(Connection.scala:162)
>> at
>> org.apache.spark.network.nio.Connection.close(Connection.scala:130)
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anonfun$stop$1.apply(ConnectionManager.scala:1000)
>> at
>> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>> at
>> scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:107)
>> at
>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>> at
>> scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:107)
>> at
>> org.apache.spark.network.nio.ConnectionManager.stop(ConnectionManager.scala:1000)
>> at
>> org.apache.spark.network.nio.NioBlockTransferService.close(NioBlockTransferService.scala:78)
>> at
>> org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1228)
>> at org.apache.spark.SparkEnv.stop(SparkEnv.scala:100)
>> at org.apache.spark.executor.Executor.stop(Executor.scala:144)
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:113)
>> at org.apache.spark.rpc.akka.AkkaRpcEnv.org
>> $apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:177)
>> at
>> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:126)
>> at org.apache.spark.rpc.akka.AkkaRpcEnv.org
>> $apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:197)
>> at
>> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:125)
>> at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>> at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>> at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>> at
>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
>> at
>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>> at
>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>> at
>> org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> at
>> org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:92)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>   

Re: Checkpointing with Kinesis

2015-09-18 Thread Nick Pentreath
Are you doing actual transformations / aggregation in Spark Streaming? Or
just using it to bulk write to S3?

If the latter, then you could just use your AWS Lambda function to read
directly from the Kinesis stream. If the former, then perhaps either look
into the WAL option that Aniket mentioned, or write the processed RDD back
to Kinesis and have the Lambda function read the Kinesis stream and write
to Redshift?
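
A minimal sketch of the write-back option, assuming the AWS Java SDK is on
the classpath and that `processed` is a DStream[String]; the stream name and
the partition-key choice are illustrative:

import java.nio.ByteBuffer
import com.amazonaws.services.kinesis.AmazonKinesisClient

processed.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // One client per partition; credentials come from the default chain.
    val kinesis = new AmazonKinesisClient()
    records.foreach { rec =>
      kinesis.putRecord("processed-stream",                   // hypothetical stream
        ByteBuffer.wrap(rec.getBytes("UTF-8")), rec.take(64)) // partition key
    }
  }
}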

On Thu, Sep 17, 2015 at 5:48 PM, Alan Dipert  wrote:

> Hello,
> We are using Spark Streaming 1.4.1 in AWS EMR to process records from
> Kinesis.  Our Spark program saves RDDs to S3, after which the records are
> picked up by a Lambda function that loads them into Redshift.  That no data
> is lost during processing is important to us.
>
> We have set our Kinesis checkpoint interval to 15 minutes, which is also
> our window size.
>
> Unfortunately, checkpointing happens after receiving data from Kinesis,
> not after we have successfully written to S3.  If batches back up in Spark,
> and the cluster is terminated, whatever data was in-memory will be lost
> because it was checkpointed but not actually saved to S3.
>
> We are considering forking and modifying the kinesis-asl library with
> changes that would allow us to perform the checkpoint manually and at the
> right time.  We'd rather not do this.
>
> Are we overlooking an easier way to deal with this problem?  Thank you in
> advance for your insight!
>
> Alan
>


Re: WAL on S3

2015-09-18 Thread Steve Loughran

> On 17 Sep 2015, at 21:40, Tathagata Das  wrote:
> 
> Actually, the current WAL implementation (as of Spark 1.5) does not work with 
> S3 because S3 does not support flushing. Basically, the current 
> implementation assumes that after write + flush, the data is immediately 
> durable, and readable if the system crashes without closing the WAL file. 
> This does not work with S3 as data is durable if and only if the S3 file 
> output stream is cleanly closed. 
> 


More precisely, unless you turn multipart uploads on, the s3n/s3a clients 
Spark uses *don't even upload anything to S3*.

It's not a filesystem, and you have to bear that in mind.

Amazon's own s3 client used in EMR behaves differently; it may be usable as a 
destination (I haven't tested)
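
In Hadoop FileSystem terms, the assumption the WAL makes looks roughly like
the sketch below (the path is illustrative; the comments mark the step the
s3n/s3a clients cannot honor):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val path = new Path("s3n://bucket/wal/segment-0")  // illustrative
val out = path.getFileSystem(new Configuration()).create(path)
out.write("record".getBytes("UTF-8"))
out.flush()  // the WAL assumes the data is durable and readable here
out.close()  // with s3n/s3a, only close() actually uploads the object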





Running the deep-learning application on a cluster:

2015-09-18 Thread Angel Angel
Hello,


Re: WAL on S3

2015-09-18 Thread Tathagata Das
I don't think it would work with multipart upload either. The file is not
visible until the multipart upload is explicitly completed. So even if each
write is a part upload, none of the parts are visible until the multipart
upload is completed.

TD
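
For reference, a sketch of the multipart lifecycle being described, using the
AWS Java SDK (bucket, key, and the part file are placeholders):

import java.io.File
import scala.collection.JavaConverters._
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model._

val s3 = new AmazonS3Client()
val file = new File("/tmp/part-0")  // placeholder part data
val init = s3.initiateMultipartUpload(
  new InitiateMultipartUploadRequest("bucket", "key"))
val part = s3.uploadPart(new UploadPartRequest()
  .withBucketName("bucket").withKey("key")
  .withUploadId(init.getUploadId)
  .withPartNumber(1).withFile(file).withPartSize(file.length))
// The object at "key" is still invisible here, no matter how many
// parts have already been uploaded.
s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
  "bucket", "key", init.getUploadId, List(part.getPartETag).asJava))
// Only now does the object become visible and readable.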

On Fri, Sep 18, 2015 at 1:55 AM, Steve Loughran 
wrote:

>
> > On 17 Sep 2015, at 21:40, Tathagata Das  wrote:
> >
> > Actually, the current WAL implementation (as of Spark 1.5) does not work
> with S3 because S3 does not support flushing. Basically, the current
> implementation assumes that after write + flush, the data is immediately
> durable, and readable if the system crashes without closing the WAL file.
> This does not work with S3 as data is durable if and only if the S3 file
> output stream is cleanly closed.
> >
>
>
> More precisely, unless you turn multipart uploads on, the s3n/s3a
> clients Spark uses *don't even upload anything to S3*.
>
> It's not a filesystem, and you have to bear that in mind.
>
> Amazon's own s3 client used in EMR behaves differently; it may be usable
> as a destination (I haven't tested)
>
>


Re: KafkaDirectStream can't be recovered from checkpoint

2015-09-18 Thread Tathagata Das
Also, giving more log4j messages around the error area would be useful.

On Thu, Sep 17, 2015 at 1:02 PM, Cody Koeninger  wrote:

> Is there a particular reason you're calling checkpoint on the stream in
> addition to the streaming context?
>
> On Thu, Sep 17, 2015 at 2:36 PM, Petr Novak  wrote:
>
>> Hi all,
>> it throws FileBasedWriteAheadLogReader: Error reading next item, EOF
>> reached
>> java.io.EOFException
>>   at java.io.DataInputStream.readInt(DataInputStream.java:392)
>>   at
>> org.apache.spark.streaming.util.FileBasedWriteAheadLogReader.hasNext(FileBasedWriteAheadLogReader.scala:47)
>>
>> WAL is not enabled in config, it is default, hence false.
>>
>> The code is based on the examples and is quite simple, for testing (I'm
>> aware that the file save isn't idempotent). Or do I have something wrong
>> there? It was tried on Spark 1.5.0.
>>
>> import kafka.serializer.StringDecoder
>> import org.apache.spark.SparkConf
>> import org.apache.spark.sql.SQLContext
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>> import org.apache.spark.streaming.kafka.KafkaUtils
>>
>> object Loader {
>>   def main(args: Array[String]): Unit = {
>>
>> val checkpointDir = "/dfs/spark/checkpoints"
>>
>> val sparkConf = new SparkConf()
>>   .setAppName("Spark Loader")
>>   .setIfMissing("spark.master", "local[2]")
>>   .setIfMissing("spark.streaming.kafka.maxRatePerPartition", "1000")
>>
>> val ssc = StreamingContext.getOrCreate(
>>   checkpointDir,
>>   createStreamingContext(sparkConf, checkpointDir))
>>
>> ssc.start()
>> ssc.awaitTermination()
>>   }
>>
>>   def createStreamingContext(conf: SparkConf, checkpointDir: String)(): 
>> StreamingContext = {
>> val ssc = new StreamingContext(conf, Seconds(60))
>>
>> val sc = ssc.sparkContext
>> val sqlc = new SQLContext(sc)
>>
>> ssc.checkpoint(checkpointDir)
>>
>> import sqlc.implicits._
>>
>> val kafkaParams = Map[String, String](
>>   "metadata.broker.list" -> "tesla1:9092,tesla2:9092,tesla3:9092",
>>   "auto.offset.reset" -> "smallest")
>>
>> val topics = Set("topic-p03-r01")
>>
>> val stream = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder, StringDecoder](
>> ssc, kafkaParams, topics)
>>
>> stream
>>   .checkpoint(Seconds(60))
>>   .foreachRDD { (rdd, time) =>
>>   rdd.toDF()
>> .write
>> .json(s"/dfs/spark/agg/${time.milliseconds / 1000}")
>> }
>>
>> ssc
>>   }
>> }
>>
>>
>> Many thanks for any idea,
>> Petr
>>
>
>
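
For reference, the two checkpoint calls discussed above are separate APIs; a
minimal sketch of the distinction, reusing the names from the code in this
thread:

// Checkpoint directory for the whole context; required for driver
// recovery via StreamingContext.getOrCreate.
ssc.checkpoint("/dfs/spark/checkpoints")

// Per-DStream checkpoint interval; an extra knob that is usually not
// needed on top of the context-level setting.
stream.checkpoint(Seconds(60))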


Zeppelin on Yarn : org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.

2015-09-18 Thread shahab
Hi,

Probably I have a wrong Zeppelin configuration, because I get the following
error when I execute Spark statements in Zeppelin:

org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't
running on a cluster. Deployment to YARN is not supported directly by
SparkContext. Please use spark-submit.


Does anyone know the solution to this?

best,
/Shahab


Re: Zeppelin on Yarn : org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit

2015-09-18 Thread Aniket Bhatnagar
Can you try yarn-client mode?

On Fri, Sep 18, 2015, 3:38 PM shahab  wrote:

> Hi,
>
> Probably I have a wrong Zeppelin configuration, because I get the following
> error when I execute Spark statements in Zeppelin:
>
> org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't
> running on a cluster. Deployment to YARN is not supported directly by
> SparkContext. Please use spark-submit.
>
>
> Does anyone know the solution to this?
>
> best,
> /Shahab
>


Constant Spark execution time with different # of slaves

2015-09-18 Thread Warfish
Hi everyone,

For research purposes I wanted to see how Spark scales for my algorithm with
regard to different cluster sizes. I have a cluster with 10 nodes with 6
cores and 45 GB of RAM each. My algorithm takes approximately 15 minutes to
execute on all nodes (as seen in Spark UI, each node was running). Here's
the weird thing: I gradually reduced the number of slave nodes down to 1 and
the execution time for my application stayed exactly the same. I expected
the execution time to go up linearly/exponentially as the number of slave
nodes goes down, but it doesn't. Now I have no idea how to debug this
"issue" or what to look for in the UI.

Things I have tried:
- Reduce the amount of RAM on each machine down to 1GB.
- Reduce the number of cores on each machine down to 1.
- Increase the amount of data I am processing.

I see that when I reduce resources on each machine (i.e. reduce RAM/CPU),
the computation time goes up, but the time taken still remains almost
constant with different numbers of slaves. 

Can someone give me a hint on what to look for? This behaviour seems very
strange to me.






Re: master hung after killing the streaming sc

2015-09-18 Thread Tathagata Das
Do you have event logs enabled?
Streaming + event logs enabled  can hang master -
https://issues.apache.org/jira/browse/SPARK-6270

On Thu, Sep 17, 2015 at 11:35 PM, ZhuGe  wrote:

> Hi there:
> we recently deployed a streaming application in our standalone cluster, and
> we found an issue when trying to stop the streaming context (it had been
> working for several days) with the kill command in the Spark UI.
> By kill command, I mean the 'kill' button in the "Submission ID" column
> of the "Running Drivers" table.
> It caused the master to hang; the UI could not work any more as all
> requests would time out.
> I used jstat to print the GC info of the master; there are 3-5 young GCs
> per second.
>
> Log is attached below:
> [INFO 2015-09-18 12:26:41 (Logging.scala:59)] Asked to kill driver
> driver-20150916172518-0002
> [INFO 2015-09-18 12:26:41 (Logging.scala:59)] Kill request for
> driver-20150916172518-0002 submitted
> [INFO 2015-09-18 12:26:43 (Logging.scala:59)] Received unregister request
> from application app-20150916172521-0055
> [INFO 2015-09-18 12:26:43 (Logging.scala:59)] Removing app
> app-20150916172521-0055
> [WARN 2015-09-18 12:26:43 (Logging.scala:71)] Application
> GuessitDirectKafkaStreaming is still in progress, it may be terminated
> abnormally.
> [INFO 2015-09-18 12:26:43 (Logging.scala:59)] Changing view acls to:
> spdcadmin
> [INFO 2015-09-18 12:26:43 (Logging.scala:59)] Changing modify acls to:
> spdcadmin
> [INFO 2015-09-18 12:26:43 (Logging.scala:59)] SecurityManager:
> authentication disabled; ui acls disabled; users with view permissions:
> Set(spdcadmin); users with modify permissions: Set(spdcadmin)
> [WARN 2015-09-18 12:28:46 (Logging.scala:92)] GET / failed:
> java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
> java.util.concurrent.TimeoutException: Futures timed out after [120
> seconds]
> at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at
> org.apache.spark.deploy.master.ui.MasterPage.getMasterState(MasterPage.scala:40)
> at
> org.apache.spark.deploy.master.ui.MasterPage.render(MasterPage.scala:74)
> at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
> at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
> at
> org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:69)
> at javax.servlet.http.HttpServlet.service(HttpServlet.java:735)
> at javax.servlet.http.HttpServlet.service(HttpServlet.java:848)
> at
> org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
> at
> org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
> at
> org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
> at
> org.spark-project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
> at
> org.spark-project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
> at
> org.spark-project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
> at
> org.spark-project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
> at
> org.spark-project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
> at org.spark-project.jetty.server.Server.handle(Server.java:370)
> at
> org.spark-project.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
> at
> org.spark-project.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
> at
> org.spark-project.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
> at
> org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:644)
> at
> org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
> at
> org.spark-project.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
> at
> org.spark-project.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
> at
> org.spark-project.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
> at
> org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
> at
> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> We got stuck in this situation several times and could not find any
> reference to this.
> Any help would be appreciated!
>
> Cheers
> Ge Zhu
>


Re: Zeppelin on Yarn : org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit

2015-09-18 Thread shahab
It works using yarn-client, but I want to make it run in cluster mode. Is
there any way to do so?

best,
/Shahab

On Fri, Sep 18, 2015 at 12:54 PM, Aniket Bhatnagar <
aniket.bhatna...@gmail.com> wrote:

> Can you try yarn-client mode?
>
> On Fri, Sep 18, 2015, 3:38 PM shahab  wrote:
>
>> Hi,
>>
>> Probably I have a wrong Zeppelin configuration, because I get the
>> following error when I execute Spark statements in Zeppelin:
>>
>> org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't
>> running on a cluster. Deployment to YARN is not supported directly by
>> SparkContext. Please use spark-submit.
>>
>>
>> Does anyone know the solution to this?
>>
>> best,
>> /Shahab
>>
>


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-18 Thread Vipul Rai
Hi Nick/Igor,

Is there any solution for this?
I am having the same issue, and copying the jar to each executor is not
feasible if we use a lot of jars.

Thanks,
Vipul


Re: Zeppelin on Yarn : org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit

2015-09-18 Thread Aniket Bhatnagar
I don't think yarn-cluster mode is currently supported. You may want to ask
the Zeppelin community for confirmation, though.

On Fri, Sep 18, 2015, 5:41 PM shahab  wrote:

> It works using yarn-client, but I want to make it run in cluster mode. Is
> there any way to do so?
>
> best,
> /Shahab
>
> On Fri, Sep 18, 2015 at 12:54 PM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> Can you try yarn-client mode?
>>
>> On Fri, Sep 18, 2015, 3:38 PM shahab  wrote:
>>
>>> Hi,
>>>
>>> Probably I have a wrong Zeppelin configuration, because I get the
>>> following error when I execute Spark statements in Zeppelin:
>>>
>>> org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't
>>> running on a cluster. Deployment to YARN is not supported directly by
>>> SparkContext. Please use spark-submit.
>>>
>>>
>>> Does anyone know the solution to this?
>>>
>>> best,
>>> /Shahab
>>>
>>
>


GraphX to work with Streaming

2015-09-18 Thread Rohit Kumar
 

Hi,

I have a setup where edges arrive as a stream, and I want to create a graph
in GraphX that updates its structure as new edges arrive. Is there any way
to do this using Spark Streaming and GraphX?

Regards,

Rohit
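
A minimal sketch of one simple (non-incremental) approach: accumulate the
edges seen so far and rebuild the graph each batch. The name edgeStream and
the Int edge attribute are assumptions:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

var edges: RDD[Edge[Int]] = sc.emptyRDD[Edge[Int]]
edgeStream.foreachRDD { rdd =>
  // Fold each new batch of edges into the running edge set.
  edges = edges.union(rdd).cache()
  val graph = Graph.fromEdges(edges, defaultValue = 0)
  // ... run GraphX computations on the updated graph ...
}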



Re: KafkaDirectStream can't be recovered from checkpoint

2015-09-18 Thread Petr Novak
This one is generated, I suppose, after Ctrl+C

15/09/18 14:38:25 INFO Worker: Asked to kill executor
app-20150918143823-0001/0
15/09/18 14:38:25 INFO Worker: Asked to kill executor
app-20150918143823-0001/0
15/09/18 14:38:25 DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
Actor[akka://sparkWorker/deadLetters]
15/09/18 14:38:25 DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
Actor[akka://sparkWorker/deadLetters]
15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
app-20150918143823-0001/0 interrupted
15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
app-20150918143823-0001/0 interrupted
15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
15/09/18 14:38:25 ERROR FileAppender: Error writing stream to file
/dfs/spark/work/app-20150918143823-0001/0/stderr
java.io.IOException: Stream closed
at
java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.FilterInputStream.read(FilterInputStream.java:107)
at
org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at
org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
15/09/18 14:38:25 ERROR FileAppender: Error writing stream to file
/dfs/spark/work/app-20150918143823-0001/0/stderr
java.io.IOException: Stream closed
at
java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.FilterInputStream.read(FilterInputStream.java:107)
at
org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
at
org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at
org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
15/09/18 14:38:25 DEBUG FileAppender: Closed file
/dfs/spark/work/app-20150918143823-0001/0/stderr
15/09/18 14:38:25 DEBUG FileAppender: Closed file
/dfs/spark/work/app-20150918143823-0001/0/stderr

Petr


Re: KafkaDirectStream can't be recovered from checkpoint

2015-09-18 Thread Petr Novak
I have tried it on Spark 1.3.0 2.10 and it works. The same code doesn't on
Spark 1.5.0 2.11. It would be nice if anybody could try on another
installation to ensure it is something wrong on my cluster.

Many thanks,
Petr

On Fri, Sep 18, 2015 at 4:07 PM, Petr Novak  wrote:

> This one is generated, I suppose, after Ctrl+C
>
> 15/09/18 14:38:25 INFO Worker: Asked to kill executor
> app-20150918143823-0001/0
> 15/09/18 14:38:25 INFO Worker: Asked to kill executor
> app-20150918143823-0001/0
> 15/09/18 14:38:25 DEBUG
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
> message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
> Actor[akka://sparkWorker/deadLetters]
> 15/09/18 14:38:25 DEBUG
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
> message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
> Actor[akka://sparkWorker/deadLetters]
> 15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
> app-20150918143823-0001/0 interrupted
> 15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
> app-20150918143823-0001/0 interrupted
> 15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
> 15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
> 15/09/18 14:38:25 ERROR FileAppender: Error writing stream to file
> /dfs/spark/work/app-20150918143823-0001/0/stderr
> java.io.IOException: Stream closed
> at
> java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
> at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
> at java.io.FilterInputStream.read(FilterInputStream.java:107)
> at
> org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
> at
> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
> at
> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
> at
> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
> at
> org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
> 15/09/18 14:38:25 ERROR FileAppender: Error writing stream to file
> /dfs/spark/work/app-20150918143823-0001/0/stderr
> java.io.IOException: Stream closed
> at
> java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
> at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
> at java.io.FilterInputStream.read(FilterInputStream.java:107)
> at
> org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
> at
> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
> at
> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
> at
> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
> at
> org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
> 15/09/18 14:38:25 DEBUG FileAppender: Closed file
> /dfs/spark/work/app-20150918143823-0001/0/stderr
> 15/09/18 14:38:25 DEBUG FileAppender: Closed file
> /dfs/spark/work/app-20150918143823-0001/0/stderr
>
> Petr
>


Re: KafkaDirectStream can't be recovered from checkpoint

2015-09-18 Thread Petr Novak
...to ensure it is not something wrong on my cluster.

On Fri, Sep 18, 2015 at 4:09 PM, Petr Novak  wrote:

> I have tried it on Spark 1.3.0 2.10 and it works. The same code doesn't on
> Spark 1.5.0 2.11. It would be nice if anybody could try on another
> installation to ensure it is something wrong on my cluster.
>
> Many thanks,
> Petr
>
> On Fri, Sep 18, 2015 at 4:07 PM, Petr Novak  wrote:
>
>> This one is generated, I suppose, after Ctrl+C
>>
>> 15/09/18 14:38:25 INFO Worker: Asked to kill executor
>> app-20150918143823-0001/0
>> 15/09/18 14:38:25 INFO Worker: Asked to kill executor
>> app-20150918143823-0001/0
>> 15/09/18 14:38:25 DEBUG
>> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
>> message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
>> Actor[akka://sparkWorker/deadLetters]
>> 15/09/18 14:38:25 DEBUG
>> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
>> message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
>> Actor[akka://sparkWorker/deadLetters]
>> 15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
>> app-20150918143823-0001/0 interrupted
>> 15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
>> app-20150918143823-0001/0 interrupted
>> 15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
>> 15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
>> 15/09/18 14:38:25 ERROR FileAppender: Error writing stream to file
>> /dfs/spark/work/app-20150918143823-0001/0/stderr
>> java.io.IOException: Stream closed
>> at
>> java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
>> at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
>> at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>> at java.io.FilterInputStream.read(FilterInputStream.java:107)
>> at
>> org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
>> at
>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
>> at
>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>> at
>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>> at
>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>> at
>> org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
>> 15/09/18 14:38:25 ERROR FileAppender: Error writing stream to file
>> /dfs/spark/work/app-20150918143823-0001/0/stderr
>> java.io.IOException: Stream closed
>> at
>> java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
>> at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
>> at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>> at java.io.FilterInputStream.read(FilterInputStream.java:107)
>> at
>> org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
>> at
>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
>> at
>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>> at
>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>> at
>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>> at
>> org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
>> 15/09/18 14:38:25 DEBUG FileAppender: Closed file
>> /dfs/spark/work/app-20150918143823-0001/0/stderr
>> 15/09/18 14:38:25 DEBUG FileAppender: Closed file
>> /dfs/spark/work/app-20150918143823-0001/0/stderr
>>
>> Petr
>>
>
>


Re: Caching intermediate results in Spark ML pipeline?

2015-09-18 Thread Jingchu Liu
Thanks buddy, I'll try it out in my project.

Best,
Lewis
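
A minimal sketch of the CrossValidator route suggested below (the stages and
the parameter grid are illustrative; Spark 1.5 ml API):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression()
val pipeline = new Pipeline().setStages(Array(hashingTF, lr))
val grid = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(1000, 10000))
  .addGrid(lr.regParam, Array(0.01, 0.1))
  .build()
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setEstimatorParamMaps(grid)
  .setNumFolds(3)
val model = cv.fit(trainingDF)  // trainingDF assumed to have "words"/"label"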

2015-09-16 13:29 GMT+08:00 Feynman Liang :

> If you're doing hyperparameter grid search, consider using
> ml.tuning.CrossValidator which does cache the dataset
> 
> .
>
> Otherwise, perhaps you can elaborate more on your particular use case for
> caching intermediate results and if the current API doesn't support it we
> can create a JIRA for it.
>
> On Tue, Sep 15, 2015 at 10:26 PM, Jingchu Liu 
> wrote:
>
>> Yeah, I understand that on the low level we should do as you said.
>>
>> But since the ML pipeline is a high-level API, it is pretty natural to expect
>> the ability to recognize overlapping parameters between successive runs.
>> (Actually, this happens A LOT when we have lots of hyper-params to search
>> for.)
>>
>> I can also imagine the implementation appending parameter information
>> to the cached results. Let's say we implemented an "equals" method for
>> param1. By comparing param1 with the previous run's, the program would know
>> data1 is reusable, and the time used for generating data1 could be saved.
>>
>> Best,
>> Lewis
>>
>> 2015-09-15 23:05 GMT+08:00 Feynman Liang :
>>
>>> Nope, and that's intentional. There is no guarantee that rawData did not
>>> change between intermediate calls to searchRun, so reusing a cached data1
>>> would be incorrect.
>>>
>>> If you want data1 to be cached between multiple runs, you have a few
>>> options:
>>> * cache it first and pass it in as an argument to searchRun
>>> * use a creational pattern like singleton to ensure only one
>>> instantiation
>>>
>>> On Tue, Sep 15, 2015 at 12:49 AM, Jingchu Liu 
>>> wrote:
>>>
 Hey Feynman,

 I doubt DF persistence will work in my case. Let's use the following
 code:
 ==
 def searchRun( params = [param1, param2] )
   data1 = hashing1.transform(rawData, param1)
   data1.cache()
   data2 = hashing2.transform(data1, param2)
   data2.someAction()
 ==
 Say we run "searchRun()" twice with the same "param1" but
 different "param2". Will Spark recognize that the two local variables
 "data1" in consecutive runs have the same content?


 Best,
 Lewis

 2015-09-15 13:58 GMT+08:00 Feynman Liang :

> You can persist the transformed Dataframes, for example
>
> val data : DF = ...
> val hashedData = hashingTF.transform(data)
> hashedData.cache() // to cache DataFrame in memory
>
> Future usages of hashedData will read from the in-memory cache now.
>
> You can also persist to disk, eg:
>
> hashedData.write.parquet(FilePath) // to write DataFrame in Parquet
> format to disk
> ...
> val savedHashedData = sqlContext.read.parquet(FilePath)
>
> Future uses of savedHashedData will read from disk.
>
> Like my earlier response, this will still require you call each
> PipelineStage's `transform` method (i.e. to NOT use the overall
> Pipeline.setStages API)
>
> On Mon, Sep 14, 2015 at 10:45 PM, Jingchu Liu 
> wrote:
>
>> Hey Feynman,
>>
>> Thanks for your response, but I'm afraid "model save/load" is not
>> exactly the feature I'm looking for.
>>
>> What I need to cache and reuse are the intermediate outputs of
>> transformations, not the transformers themselves. Do you know of any related
>> dev activities or plans?
>>
>> Best,
>> Lewis
>>
>> 2015-09-15 13:03 GMT+08:00 Feynman Liang :
>>
>>> Lewis,
>>>
>>> Many pipeline stages implement save/load methods, which can be used
>>> if you instantiate and call the underlying pipeline stages' `transform`
>>> methods individually (instead of using the Pipeline.setStages API). See
>>> associated JIRAs .
>>>
>>> Pipeline persistence is on the 1.6 roadmap, JIRA here
>>> .
>>>
>>> Feynman
>>>
>>> On Mon, Sep 14, 2015 at 9:20 PM, Jingchu Liu 
>>> wrote:
>>>
 Hi all,

 I have a question regarding the ability of ML pipeline to cache
 intermediate results. I've posted this question on stackoverflow
 
 but got no answer, hope someone here can help me out.

 ===
 Lately I'm planning to migrate my standalone python ML code to
 spark. The ML pipeline in spark.ml turns out quite handy, with
 streamlined API for chaining up algorithm stages and hyper-parameter 
 grid
 search.

 Still, I found its support for one important feature obscure in
 existing documents: caching of inter

Re: Spark Streaming stop gracefully doesn't return to command line after upgrade to 1.4.0 and beyond

2015-09-18 Thread Petr Novak
I removed the custom shutdown hook and it still doesn't work. I'm using
KafkaDirectStream.

I sometimes get java.lang.InterruptedException on Ctrl+C; sometimes it goes
through fine.

I have this code now:

... some stream processing ...
ssc.start()
ssc.awaitTermination()
ssc.stop(stopSparkContext = false, stopGracefully = true)

It seems there is an undocumented config parameter
spark.streaming.stopGracefullyOnShutdown (Boolean).
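
A minimal sketch of setting it:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .set("spark.streaming.stopGracefullyOnShutdown", "true")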

Based on the source code for StreamingContext.scala where there is:

def start()
  match {
case INITIALIZED =>
  ...
  shutdownHookRef = ShutdownHookManager.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
  ...
...
  }
}

def stopOnShutdown() = {
  val stopGracefully =
conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
  stop(stopSparkContext = false, stopGracefully = stopGracefully)
}

I tried to set spark.streaming.stopGracefullyOnShutdown=true in SparkConf.
And removed ssc.stop(stopSparkContext = false, stopGracefully = true) from
my code.

On Ctrl+C it seems to gracefully shut down without any error, but the job
can't be restarted: it fails with the same error that DirectKafkaInputDStream
has not been initialized. Only when the checkpoints are deleted can it be
started again.

Can anybody confirm that graceful shutdown works with
DirectKafkaInputDStream?

Here is the error output for InterruptedException for Ctrl+C:

[2015-09-11 14:47:16,662] INFO Starting task 0.0 in stage 1.0 (TID 1,
10.29.52.111, ANY, 2026 bytes) (org.apache.spark.scheduler.TaskSetManager)
[2015-09-11 14:47:16,662] INFO ensureFreeSpace(1600) called with
curMem=6769, maxMem=556038881 (org.apache.spark.storage.MemoryStore)
[2015-09-11 14:47:16,662] INFO Block broadcast_2 stored as values in memory
(estimated size 1600.0 B, free 530.3 MB)
(org.apache.spark.storage.MemoryStore)
[2015-09-11 14:47:16,663] INFO ensureFreeSpace(1020) called with
curMem=8369, maxMem=556038881 (org.apache.spark.storage.MemoryStore)
[2015-09-11 14:47:16,663] INFO Block broadcast_2_piece0 stored as bytes in
memory (estimated size 1020.0 B, free 530.3 MB)
(org.apache.spark.storage.MemoryStore)
[2015-09-11 14:47:16,671] INFO Added broadcast_2_piece0 in memory on
10.29.52.111:51578 (size: 1020.0 B, free: 530.3 MB)
(org.apache.spark.storage.BlockManagerInfo)
[2015-09-11 14:47:16,671] INFO Created broadcast 2 from broadcast at
DAGScheduler.scala:861 (org.apache.spark.SparkContext)
[2015-09-11 14:47:16,672] INFO Submitting 2 missing tasks from
ShuffleMapStage 2 (ParallelCollectionRDD[0] at parallelize at
Aggregator.scala:174) (org.apache.spark.scheduler.DAGScheduler)
[2015-09-11 14:47:16,672] INFO Adding task set 2.0 with 2 tasks
(org.apache.spark.scheduler.TaskSchedulerImpl)
[2015-09-11 14:47:16,673] INFO Added broadcast_1_piece0 in memory on
10.29.52.111:50459 (size: 2.4 KB, free: 530.3 MB)
(org.apache.spark.storage.BlockManagerInfo)
^C[2015-09-11 14:47:19,896] INFO Invoking stop(stopGracefully=false) from
shutdown hook (org.apache.spark.streaming.StreamingContext)
[2015-09-11 14:47:19,896] INFO Stopping JobGenerator immediately
(org.apache.spark.streaming.scheduler.JobGenerator)
[2015-09-11 14:47:19,897] INFO Stopped timer for JobGenerator after time
144197562 (org.apache.spark.streaming.util.RecurringTimer)
[2015-09-11 14:47:19,897] INFO CheckpointWriter executor terminated ? true,
waited for 0 ms. (org.apache.spark.streaming.CheckpointWriter)
[2015-09-11 14:47:19,898] INFO Stopped JobGenerator
(org.apache.spark.streaming.scheduler.JobGenerator)
[2015-09-11 14:47:21,899] ERROR Aborting job.
(org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation)
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:503)
at
org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:559)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
 

Re: KafkaDirectStream can't be recovered from checkpoint

2015-09-18 Thread Petr Novak
It might be connected to my problems with graceful shutdown in Spark 1.5.0
2.11:
https://mail.google.com/mail/#search/petr/14fb6bd5166f9395

Maybe Ctrl+C corrupts checkpoints and breaks graceful shutdown?

Petr

On Fri, Sep 18, 2015 at 4:10 PM, Petr Novak  wrote:

> ...to ensure it is not something wrong on my cluster.
>
> On Fri, Sep 18, 2015 at 4:09 PM, Petr Novak  wrote:
>
>> I have tried it on Spark 1.3.0 2.10 and it works. The same code doesn't
>> on Spark 1.5.0 2.11. It would be nice if anybody could try on another
>> installation to ensure it is something wrong on my cluster.
>>
>> Many thanks,
>> Petr
>>
>> On Fri, Sep 18, 2015 at 4:07 PM, Petr Novak  wrote:
>>
>>> This one is generated, I suppose, after Ctrl+C
>>>
>>> 15/09/18 14:38:25 INFO Worker: Asked to kill executor
>>> app-20150918143823-0001/0
>>> 15/09/18 14:38:25 INFO Worker: Asked to kill executor
>>> app-20150918143823-0001/0
>>> 15/09/18 14:38:25 DEBUG
>>> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
>>> message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
>>> Actor[akka://sparkWorker/deadLetters]
>>> 15/09/18 14:38:25 DEBUG
>>> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
>>> message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
>>> Actor[akka://sparkWorker/deadLetters]
>>> 15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
>>> app-20150918143823-0001/0 interrupted
>>> 15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
>>> app-20150918143823-0001/0 interrupted
>>> 15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
>>> 15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
>>> 15/09/18 14:38:25 ERROR FileAppender: Error writing stream to file
>>> /dfs/spark/work/app-20150918143823-0001/0/stderr
>>> java.io.IOException: Stream closed
>>> at
>>> java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
>>> at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
>>> at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>> at java.io.FilterInputStream.read(FilterInputStream.java:107)
>>> at
>>> org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
>>> at
>>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
>>> at
>>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>>> at
>>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>>> at
>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>> at
>>> org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
>>> 15/09/18 14:38:25 ERROR FileAppender: Error writing stream to file
>>> /dfs/spark/work/app-20150918143823-0001/0/stderr
>>> java.io.IOException: Stream closed
>>> at
>>> java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
>>> at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
>>> at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
>>> at java.io.FilterInputStream.read(FilterInputStream.java:107)
>>> at
>>> org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
>>> at
>>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
>>> at
>>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>>> at
>>> org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
>>> at
>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>> at
>>> org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
>>> 15/09/18 14:38:25 DEBUG FileAppender: Closed file
>>> /dfs/spark/work/app-20150918143823-0001/0/stderr
>>> 15/09/18 14:38:25 DEBUG FileAppender: Closed file
>>> /dfs/spark/work/app-20150918143823-0001/0/stderr
>>>
>>> Petr
>>>
>>
>>
>


spark 1.5, ML Pipeline Decision Tree Dataframe Problem

2015-09-18 Thread Yasemin Kaya
Hi,

I am using *Spark 1.5, ML Pipeline Decision Tree* to get the tree's
probabilities, but I have to convert my data to DataFrame type. While
creating the model there is no problem, but when I use the model on my
data there is a problem converting to DataFrame type. My data type is
*JavaPairRDD*; when I am creating the DataFrame:
DataFrame production = sqlContext.createDataFrame(
unlabeledTest.values(), Vector.class);

*The error says:*
Exception in thread "main" java.lang.ClassCastException:
org.apache.spark.mllib.linalg.VectorUDT cannot be cast to
org.apache.spark.sql.types.StructType

I know that if I give a LabeledPoint type, there will be no problem. But the
data has no label; I want to predict the label, which is why I use the model
on it.

Is there a way to handle my problem?
Thanks.


Best,
yasemin
-- 
hiç ender hiç
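
A commonly suggested workaround, sketched here in Scala (the wrapper class
and helper are assumptions, not from this thread): wrap each vector in a
product type so createDataFrame can derive a schema, instead of passing
Vector.class directly.

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

case class UnlabeledPoint(features: Vector)  // hypothetical wrapper

def toDF(sqlContext: SQLContext, vectors: RDD[Vector]) =
  sqlContext.createDataFrame(vectors.map(v => UnlabeledPoint(v)))

// val predictions = model.transform(toDF(sqlContext, unlabeledTest.values.rdd))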


Python UDF and explode error

2015-09-18 Thread Pavel Burdanov
from pyspark.sql.types import *
import pyspark.sql.functions as F
import re

re_expr = re.compile('P([0-9]+)')
expr_split = F.udf(lambda x: list(map(int, re_expr.findall(x))),
ArrayType(IntegerType()))
c = sc.parallelize([(i, 'P1&P2&P3&P4&P5') for i in range(10 ** 6)],
1).toDF(['a', 'b'])
#sqlContext.sql('drop table ccc')
sqlContext.sql('create table ccc(a int, b string)')
c.insertInto('ccc', True)
c = sqlContext.sql('select * from ccc')
c = c.filter(c.a % 100 == 85)
c = c.select(c.a, F.explode(expr_split(c.b)).alias('d'))
c = c.filter(c.a == 85)
c.take(5)

I use Spark 1.5 (1.4.1 has this error too) and Hive 1.1 from CDH 5.4.2 as
the warehouse. This code yields "org.apache.spark.SparkException: Can only
zip RDDs with same number of elements in each partition", but tweaking
numbers can result in unexpected EOF, wrong data returned or no error.

If I add "stored as parquet" into the create table statement, the code works
ok.

15/09/18 17:30:33 INFO scheduler.DAGScheduler: Job 10 failed: take at
:1, took 1.622074 s
Traceback (most recent call last):
  File "", line 1, in 
  File "/home/burdanov/spark/python/pyspark/sql/dataframe.py", line 303, in
take
return self.limit(num).collect()
  File "/home/burdanov/spark/python/pyspark/sql/dataframe.py", line 279, in
collect
port =
self._sc._jvm.PythonRDD.collectAndServe(self._jdf.javaToPython().rdd())
  File
"/home/burdanov/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 538, in __call__
  File "/home/burdanov/spark/python/pyspark/sql/utils.py", line 36, in deco
return f(*a, **kw)
  File
"/home/burdanov/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage
12.0 (TID 73, localhost): org.apache.spark.SparkException: Can only zip RDDs
with same number of elements in each partition
at
org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.hasNext(RDD.scala:832)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:99)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
at org.apac

Breakpoints not hit with Scalatest + intelliJ

2015-09-18 Thread Michel Lemay
Hi,

I'm adding unit tests to some utility functions that are using SparkContext
but I'm unable to debug code and hit breakpoints when running under
IntelliJ.

My test creates a SparkContext from a SparkConf().setMaster("local[*]")...

However, when I place a breakpoint in the functions inside a spark
function, it's never hit.

for instance:

rdd.mapPartitions(p => {
  myFct(...)
})

Any help would be appreciated.


Re: Breakpoints not hit with Scalatest + intelliJ

2015-09-18 Thread Stephen Boesch
Hi Michel, please try local[1] and report back whether the breakpoints are hit.
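
A minimal sketch of the suggested change, everything else unchanged:

import org.apache.spark.SparkConf

// local[1] runs a single in-process worker thread, which makes IDE
// breakpoints inside partition functions deterministic to hit.
val conf = new SparkConf().setMaster("local[1]").setAppName("test")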

2015-09-18 7:37 GMT-07:00 Michel Lemay :

> Hi,
>
> I'm adding unit tests to some utility functions that are using
> SparkContext but I'm unable to debug code and hit breakpoints when running
> under IntelliJ.
>
> My test creates a SparkContext from a SparkConf().setMaster("local[*]")...
>
> However, when I place a breakpoint in the functions inside a spark
> function, it's never hit.
>
> for instance:
>
> rdd.mapPartitions(p => {
>   myFct(...)
> })
>
> Any help would be appreciated.
>
>
>


Re: SparkR - calling as.vector() with rdd dataframe causes error

2015-09-18 Thread Ellen Kraffmiller
Thanks for your response.  Is there a reason why this thread isn't
appearing on the mailing list?  So far, I only see my post, with no
answers, although I have received 2 answers via email.  It would be nice if
other people could see these answers as well.

On Thu, Sep 17, 2015 at 2:22 AM, Sun, Rui  wrote:

> The existing algorithms operating on an R data.frame can't simply operate on
> a SparkR DataFrame. They have to be re-implemented on top of the SparkR
> DataFrame API.
>
> -Original Message-
> From: ekraffmiller [mailto:ellen.kraffmil...@gmail.com]
> Sent: Thursday, September 17, 2015 3:30 AM
> To: user@spark.apache.org
> Subject: SparkR - calling as.vector() with rdd dataframe causes error
>
> Hi,
> I have a library of clustering algorithms that I'm trying to run in the
> SparkR interactive shell. (I am working on a proof of concept for a
> document classification tool.) Each algorithm takes a term document matrix
> in the form of a dataframe.  When I pass the method a local dataframe, the
> clustering algorithm works correctly, but when I pass it a spark rdd, it
> gives an error trying to coerce the data into a vector.  Here is the code,
> that I'm calling within SparkR:
>
> # get matrix from a file
> file <-
>
> "/Applications/spark-1.5.0-bin-hadoop2.6/examples/src/main/resources/matrix.csv"
>
> #read it into variable
>  raw_data <- read.csv(file,sep=',',header=FALSE)
>
> #convert to a local dataframe
> localDF = data.frame(raw_data)
>
> # create a Spark DataFrame (named rdd here)
> rdd  <- createDataFrame(sqlContext,localDF)
>
> #call the algorithm with the localDF - this works
> result <- galileo(localDF, model='hclust',dist='euclidean',link='ward',K=5)
>
> #call with the rdd - this produces an error
> result <- galileo(rdd, model='hclust',dist='euclidean',link='ward',K=5)
>
> Error in as.vector(data) :
>   no method for coercing this S4 class to a vector
>
>
> I get the same error if I try to directly call as.vector(rdd) as well.
>
> Is there a reason why this works for localDF and not rdd?  Should I be
> doing something else to coerce the object into a vector?
>
> Thanks,
> Ellen
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-calling-as-vector-with-rdd-dataframe-causes-error-tp24717.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>
>


Re: Null Value in DecimalType column of DataFrame

2015-09-18 Thread Dirceu Semighini Filho
Hi Yin, I got that part.
I just think that instead of returning null, throwing an exception would be
better. In the exception message we can explain that the DecimalType used
can't fit the number that is being converted, due to the precision and scale
values used to create it.
It would be easier for the user to find the reason why that error is
happening, instead of receiving a NullPointerException in another part of
his code. We can also better document the DecimalType classes to explain
this behavior. What do you think?
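
A small sketch of the behavior under discussion (the column name and values
are illustrative):

import org.apache.spark.sql.types.DecimalType

// Precision 10, scale 10: all ten digits are fractional, so the
// representable range is [0, 1).
df.select(df("d").cast(DecimalType(10, 10)))  // 10.5 -> null, 0.1 -> 0.1000000000
df.select(df("d").cast(DecimalType(12, 10)))  // 10.5 -> 10.5000000000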




2015-09-17 18:52 GMT-03:00 Yin Huai :

> As I mentioned before, the range of values of DecimalType(10, 10) is [0,
> 1). If you have a value 10.5 and you want to cast it to DecimalType(10,
> 10), I do not think there is any better returned value except of null.
> Looks like DecimalType(10, 10) is not the right type for your use case. You
> need a decimal type that has precision - scale >= 2.
>
> On Tue, Sep 15, 2015 at 6:39 AM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
>>
>> Hi Yin, posted here because I think it's a bug.
>> So, it will return null and I can get a NullPointerException, as I was
>> getting. Is this really the expected behavior? I've never seen something
>> return null in other Scala tools that I've used.
>>
>> Regards,
>>
>>
>> 2015-09-14 18:54 GMT-03:00 Yin Huai :
>>
>>> btw, move it to user list.
>>>
>>> On Mon, Sep 14, 2015 at 2:54 PM, Yin Huai  wrote:
>>>
 A scale of 10 means that there are 10 digits at the right of the
 decimal point. If you also have precision 10, the range of your data will
 be [0, 1) and casting "10.5" to DecimalType(10, 10) will return null, which
 is expected.

 On Mon, Sep 14, 2015 at 1:42 PM, Dirceu Semighini Filho <
 dirceu.semigh...@gmail.com> wrote:

> Hi all,
> I'm moving from spark 1.4 to 1.5, and one of my tests is failing.
> It seems that there was some changes in org.apache.spark.sql.types.
> DecimalType
>
> This ugly code is a little sample to reproduce the error; don't use it
> in your project.
>
> test("spark test") {
>   val file = 
> context.sparkContext().textFile(s"${defaultFilePath}Test.csv").map(f => 
> Row.fromSeq({
> val values = f.split(",")
> 
> Seq(values.head.toString.toInt,values.tail.head.toString.toInt,BigDecimal(values.tail.tail.head),
> values.tail.tail.tail.head)}))
>
>   val structType = StructType(Seq(StructField("id", IntegerType, false),
> StructField("int2", IntegerType, false), StructField("double",
>
>  DecimalType(10,10), false),
>
>
> StructField("str2", StringType, false)))
>
>   val df = context.sqlContext.createDataFrame(file,structType)
>   df.first
> }
>
> The content of the file is:
>
> 1,5,10.5,va
> 2,1,0.1,vb
> 3,8,10.0,vc
>
> The problem resides in DecimalType: before 1.5 the scale wasn't
> required. Now, using DecimalType(12, 10) works fine, but with
> DecimalType(10, 10) the decimal value 10.5 becomes null, while 0.1 works.
>
> Is there anybody working with DecimalType for 1.5.1?
>
> Regards,
> Dirceu
>
>

>>>
>>
>>
>
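A minimal sketch of the rule discussed above, using Spark's internal Decimal
type (the values are illustrative):

    import org.apache.spark.sql.types.Decimal

    // precision = total digits, scale = digits to the right of the decimal
    // point, so DecimalType(10, 10) has zero integer digits: its range is [0, 1).
    val fits     = Decimal(BigDecimal("0.1")).changePrecision(10, 10)   // true
    val overflow = Decimal(BigDecimal("10.5")).changePrecision(10, 10)  // false; a cast yields null

10.5 needs two integer digits, hence the precision - scale >= 2 requirement.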


Re: Spark on YARN / aws - executor lost on node restart

2015-09-18 Thread Adrian Tanase
Hi guys,

Digging up this question after spending some more time trying to replicate it.
It seems to be an issue with the YARN-Spark integration - is there a bug
already tracking this?

If I just kill the executor process on the machine, YARN detects that the
container is dead and the Spark framework requests a new container to be deployed.
If the machine goes away completely, Spark sees that the executor is lost, but
YarnAllocator never tries to request the container again. I wonder if there's
an implicit assumption that it would be notified by YARN, which might not
happen if the node dies completely?

If there are no ideas on the list, I’ll prepare some logs and follow up with an 
issue.

Thanks,
-adrian

From: Adrian Tanase
Date: Wednesday, September 16, 2015 at 6:01 PM
To: "user@spark.apache.org"
Subject: Spark on YARN / aws - executor lost on node restart

Hi all,

We’re using spark streaming (1.4.0), deployed on AWS through yarn. It’s a 
stateful app that reads from kafka (with the new direct API) and we’re 
checkpointing to HDFS.

During some resilience testing, we restarted one of the machines and brought it 
back online. During the offline period, the Yarn cluster would not have 
resources to re-create the missing executor.
After starting all the services on the machine, it correctly joined the Yarn 
cluster, however the spark streaming app does not seem to notice that the 
resources are back and has not re-created the missing executor.

The app is correctly running with 6 out of 7 executors; however, it's running
under capacity.
If we manually kill the driver and re-submit the app to YARN, all the state is
correctly recreated from checkpoint and all 7 executors come back online -
however, this seems like a brutal workaround.

So, here are some questions:

  *   Isn't the driver supposed to auto-heal after a machine is completely lost
and then comes back after some time?
  *   Are there any configuration settings that influence how the Spark driver
polls YARN to check back on resources being available again?
  *   Is there a tool one can run to “force” the driver to re-create missing
workers/executors?

Lastly, another issue was that the driver also crashed and YARN successfully
restarted it - I'm not sure yet if it's because of some retry setting or
another exception; I'll post the logs after I recreate the problem.

Thanks in advance for any ideas,
-adrian


Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Philip Weaver
Here's a specific example of what I want to do. My Spark application is
running with total-executor-cores=8. A request comes in, it spawns a thread
to handle that request, and starts a job. That job should use only 4 cores,
not all 8 of the cores available to the cluster. When the first job is
scheduled, it should take only 4 cores, not all 8 of the cores that are
available to the driver.

Is there any way to accomplish this? This is on Mesos.

In order to support the use cases described in
https://spark.apache.org/docs/latest/job-scheduling.html, where a spark
application runs for a long time and handles requests from multiple users,
I believe what I'm asking about is a very important feature. One of the
goals is to get lower latency for each request, but if the first request
takes all resources and we can't guarantee any free resources for the
second request, then that defeats the purpose. Does that make sense?

Thanks in advance for any advice you can provide!

- Philip

On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver 
wrote:

> I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR
> scheduler, so I can define a long-running application capable of executing
> multiple simultaneous spark jobs.
>
> The kind of jobs that I'm running do not benefit from more than 4 cores,
> but I want my application to be able to take several times that in order to
> run multiple jobs at the same time.
>
> I suppose my question is more basic: How can I limit the number of cores
> used to load an RDD or DataFrame? I can immediately repartition or coalesce
> my RDD or DataFrame to 4 partitions after I load it, but that doesn't stop
> Spark from using more cores to load it.
>
> Does it make sense what I am trying to accomplish, and is there any way to
> do it?
>
> - Philip
>
>


Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Philip Weaver
(whoops, redundant sentence in that first paragraph)

On Fri, Sep 18, 2015 at 8:36 AM, Philip Weaver 
wrote:

> Here's a specific example of what I want to do. My Spark application is
> running with total-executor-cores=8. A request comes in, it spawns a thread
> to handle that request, and starts a job. That job should use only 4 cores,
> not all 8 of the cores available to the cluster. When the first job is
> scheduled, it should take only 4 cores, not all 8 of the cores that are
> available to the driver.
>
> Is there any way to accomplish this? This is on mesos.
>
> In order to support the use cases described in
> https://spark.apache.org/docs/latest/job-scheduling.html, where a spark
> application runs for a long time and handles requests from multiple users,
> I believe what I'm asking about is a very important feature. One of the
> goals is to get lower latency for each request, but if the first request
> takes all resources and we can't guarantee any free resources for the
> second request, then that defeats the purpose. Does that make sense?
>
> Thanks in advance for any advice you can provide!
>
> - Philip
>
> On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver 
> wrote:
>
>> I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR
>> scheduler, so I can define a long-running application capable of executing
>> multiple simultaneous spark jobs.
>>
>> The kind of jobs that I'm running do not benefit from more than 4 cores,
>> but I want my application to be able to take several times that in order to
>> run multiple jobs at the same time.
>>
>> I suppose my question is more basic: How can I limit the number of cores
>> used to load an RDD or DataFrame? I can immediately repartition or coalesce
>> my RDD or DataFrame to 4 partitions after I load it, but that doesn't stop
>> Spark from using more cores to load it.
>>
>> Does it make sense what I am trying to accomplish, and is there any way
>> to do it?
>>
>> - Philip
>>
>>
>


Spark Streaming checkpoint recovery throws Stack Overflow Error

2015-09-18 Thread swetha
Hi,

When I try to recover my Spark Streaming job from a checkpoint directory, I
get a StackOverflowError, as shown below. Any idea why this is
happening?

15/09/18 09:02:20 ERROR streaming.StreamingContext: Error starting the
context, marking it as stopped
java.lang.StackOverflowError
at java.util.Date.getTimeImpl(Date.java:887)
at java.util.Date.getTime(Date.java:883)
at java.util.Calendar.setTime(Calendar.java:1106)
at java.text.SimpleDateFormat.format(SimpleDateFormat.java:955)
at java.text.SimpleDateFormat.format(SimpleDateFormat.java:948)
at java.text.DateFormat.format(DateFormat.java:298)
at java.text.Format.format(Format.java:157)
at
org.apache.spark.streaming.ui.UIUtils$.formatBatchTime(UIUtils.scala:113)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$makeScope$1.apply(DStream.scala:137)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$makeScope$1.apply(DStream.scala:136)
at scala.Option.map(Option.scala:145)
at 
org.apache.spark.streaming.dstream.DStream.makeScope(DStream.scala:136)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:394)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:67)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:67)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-throws-Stack-Overflow-Error-tp24737.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming checkpoint recovery throws Stack Overflow Error

2015-09-18 Thread Ted Yu
Which version of Java are you using?

And which release of Spark, please?

Thanks

On Fri, Sep 18, 2015 at 9:15 AM, swetha  wrote:

> Hi,
>
> When I try to recover my Spark Streaming job from a checkpoint directory, I
> get a StackOverFlow Error as shown below. Any idea as to why this is
> happening?
>
> 15/09/18 09:02:20 ERROR streaming.StreamingContext: Error starting the
> context, marking it as stopped
> java.lang.StackOverflowError
> at java.util.Date.getTimeImpl(Date.java:887)
> at java.util.Date.getTime(Date.java:883)
> at java.util.Calendar.setTime(Calendar.java:1106)
> at java.text.SimpleDateFormat.format(SimpleDateFormat.java:955)
> at java.text.SimpleDateFormat.format(SimpleDateFormat.java:948)
> at java.text.DateFormat.format(DateFormat.java:298)
> at java.text.Format.format(Format.java:157)
> at
> org.apache.spark.streaming.ui.UIUtils$.formatBatchTime(UIUtils.scala:113)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$makeScope$1.apply(DStream.scala:137)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$makeScope$1.apply(DStream.scala:136)
> at scala.Option.map(Option.scala:145)
> at
> org.apache.spark.streaming.dstream.DStream.makeScope(DStream.scala:136)
> at
>
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:394)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
>
> org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:67)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
>
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
>
> org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:67)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
>
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-throws-Stack-Overflow-Error-tp24737.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: SparkR - calling as.vector() with rdd dataframe causes error

2015-09-18 Thread Luciano Resende
I see the thread, with all the responses at the bottom, at mail-archive:

https://www.mail-archive.com/user%40spark.apache.org/msg36882.html

On Fri, Sep 18, 2015 at 7:58 AM, Ellen Kraffmiller <
ellen.kraffmil...@gmail.com> wrote:

> Thanks for your response.  Is there a reason why this thread isn't
> appearing on the mailing list?  So far, I only see my post, with no
> answers, although I have received 2 answers via email.  It would be nice if
> other people could see these answers as well.
>
> On Thu, Sep 17, 2015 at 2:22 AM, Sun, Rui  wrote:
>
>> The existing algorithms operating on an R data.frame can't simply operate on
>> a SparkR DataFrame. They have to be re-implemented on top of the SparkR
>> DataFrame API.
>>
>> -Original Message-
>> From: ekraffmiller [mailto:ellen.kraffmil...@gmail.com]
>> Sent: Thursday, September 17, 2015 3:30 AM
>> To: user@spark.apache.org
>> Subject: SparkR - calling as.vector() with rdd dataframe causes error
>>
>> Hi,
>> I have a library of clustering algorithms that I'm trying to run in the
>> SparkR interactive shell. (I am working on a proof of concept for a
>> document classification tool.) Each algorithm takes a term-document matrix
>> in the form of a dataframe.  When I pass the method a local dataframe, the
>> clustering algorithm works correctly, but when I pass it a Spark rdd, it
>> gives an error trying to coerce the data into a vector.  Here is the code
>> that I'm calling within SparkR:
>>
>> # get matrix from a file
>> file <-
>>
>> "/Applications/spark-1.5.0-bin-hadoop2.6/examples/src/main/resources/matrix.csv"
>>
>> #read it into variable
>>  raw_data <- read.csv(file,sep=',',header=FALSE)
>>
>> #convert to a local dataframe
>> localDF = data.frame(raw_data)
>>
>> # create the rdd
>> rdd  <- createDataFrame(sqlContext,localDF)
>>
>> # call the algorithm with the localDF - this works
>> result <- galileo(localDF, model='hclust', dist='euclidean', link='ward', K=5)
>>
>> # call with the rdd - this produces the error
>> result <- galileo(rdd, model='hclust', dist='euclidean', link='ward', K=5)
>>
>> Error in as.vector(data) :
>>   no method for coercing this S4 class to a vector
>>
>>
>> I get the same error if I try to directly call as.vector(rdd) as well.
>>
>> Is there a reason why this works for localDF and not rdd?  Should I be
>> doing something else to coerce the object into a vector?
>>
>> Thanks,
>> Ellen
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-calling-as-vector-with-rdd-dataframe-causes-error-tp24717.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
>> commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Re: Spark + Druid

2015-09-18 Thread Harish Butani
Hi,

I have just posted a Blog on this:
https://www.linkedin.com/pulse/combining-druid-spark-interactive-flexible-analytics-scale-butani

regards,
Harish Butani.

On Tue, Sep 1, 2015 at 11:46 PM, Paolo Platter 
wrote:

> Fantastic!!! I will look into that and I hope to contribute
>
> Paolo
>
> Sent from my Windows Phone
> --
> From: Harish Butani 
> Sent: 02/09/2015 06:04
> To: user 
> Subject: Spark + Druid
>
> Hi,
>
> I am working on the Spark Druid Package:
> https://github.com/SparklineData/spark-druid-olap.
> For scenarios where a 'raw event' dataset is being indexed in Druid it
> enables you to write your Logical Plans (queries/dataflows) against the 'raw
> event' dataset and it rewrites parts of the plan to execute as a Druid
> Query. In Spark the configuration of a Druid DataSource is somewhat like
> configuring an OLAP index in a traditional DB. Early results show
> significant speedup of pushing slice and dice queries to Druid.
>
> It comprises of a Druid DataSource that wraps the 'raw event' dataset and
> has knowledge of the Druid Index; and a DruidPlanner which is a set of plan
> rewrite strategies to convert Aggregation queries into a Plan having a
> DruidRDD.
>
> Here is a detailed design document, which also describes a benchmark of
> representative queries on the TPCH dataset.
>
> Looking for folks who would be willing to try this out and/or contribute.
>
> regards,
> Harish Butani.
>


Re: spark 1.5, ML Pipeline Decision Tree Dataframe Problem

2015-09-18 Thread Feynman Liang
What is the type of unlabeledTest?

SQL should be using the VectorUDT we've defined for Vectors, so you should
be able to just "import sqlContext.implicits._" and then call "rdd.toDF()"
on your RDD to convert it into a DataFrame.

On Fri, Sep 18, 2015 at 7:32 AM, Yasemin Kaya  wrote:

> Hi,
>
> I am using the Spark 1.5 ML Pipeline Decision Tree
> to get the tree's probabilities, but I have to convert my data to the
> DataFrame type. There is no problem while creating the model, but when I use
> the model on my data there is a problem converting to the DataFrame type. My
> data type is JavaPairRDD, and I am creating the dataframe with:
>
> DataFrame production = sqlContext.createDataFrame(
> unlabeledTest.values(), Vector.class);
>
> The error says:
> Exception in thread "main" java.lang.ClassCastException:
> org.apache.spark.mllib.linalg.VectorUDT cannot be cast to
> org.apache.spark.sql.types.StructType
>
> I know that if I give the LabeledPoint type there will be no problem. But the
> data have no labels; I want to predict the label, which is why I use the
> model on it.
>
> Is there a way to handle my problem?
> Thanks.
>
>
> Best,
> yasemin
> --
> hiç ender hiç
>


Re: parquet error

2015-09-18 Thread Cheng Lian
Not sure what's happening here, but I guess it's probably a dependency
version issue. Could you please give vanilla Apache Spark a try to see
whether it's a CDH-specific issue or not?


Cheng

On 9/17/15 11:44 PM, Chengi Liu wrote:

Hi,
  I did some digging.
I believe the error is caused by jets3t jar.
Essentially these lines

locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 
'java/net/URI', 'org/apache/hadoop/conf/Configuration', 
'org/apache/hadoop/fs/s3/S3Credentials', 
'org/jets3t/service/security/AWSCredentials' }


stack: { 
'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 
uninitialized 32, uninitialized 32, 
'org/jets3t/service/security/AWSCredentials' }



But I am not sure how to fix this. Is it a jar version issue?

I am using the Cloudera CDH 5.2 distro and have created a symlink for the
jets3t jar from hadoop/lib to spark/lib (which I believe is version 0.9.x).




On Wed, Sep 16, 2015 at 4:59 PM, Chengi Liu wrote:


Hi,
  I have a Spark cluster set up and I am trying to write the data
to S3, but in Parquet format.
Here is what I am doing

df = sqlContext.load('test', 'com.databricks.spark.avro')

df.saveAsParquetFile("s3n://test")

But I get a nasty error:

Py4JJavaError: An error occurred while calling o29.saveAsParquetFile.

: org.apache.spark.SparkException: Job aborted.

at

org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.insert(commands.scala:166)

at

org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.run(commands.scala:139)

at

org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)

at

org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)

at
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)

at

org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)

at

org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)

at

org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)

at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)

at

org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:950)

at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:950)

at
org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:336)

at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:144)

at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:135)

at
org.apache.spark.sql.DataFrame.saveAsParquetFile(DataFrame.scala:1508)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

at py4j.Gateway.invoke(Gateway.java:259)

at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)

at py4j.commands.CallCommand.execute(CallCommand.java:79)

at py4j.GatewayConnection.run(GatewayConnection.java:207)

at java.lang.Thread.run(Thread.java:744)

Caused by: org.apache.spark.SparkException: Job aborted due to
stage failure: Task 3 in stage 0.0 failed 4 times, most recent
failure: Lost task 3.3 in stage 0.0 (TID 12, srv-110-29.720.rdio):
org.apache.spark.SparkException: Task failed while writing rows.

at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org

$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:191)

at

org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)

at

org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)

at org.apache.spark.scheduler.Task.run(Task.scala:70)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

at

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:744)

Caused by: java.lang.VerifyError: Bad type on operand stack

Exception Details:

  Location:


org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.initialize(Ljava/net/URI;Lorg/apache/hadoop/conf/Configuration;)V
@38: invokespecial

  Reason:

 

Re: Checkpointing with Kinesis

2015-09-18 Thread Alan Dipert
Hello,

Thanks, all, for considering our problem.  We are doing transformations in
Spark Streaming.  We have also since learned that the WAL to S3 on 1.4 is "not
reliable" [1].

We are just going to wait for EMR to support 1.5 and hopefully this won't
be a problem anymore [2].

Alan

1.
https://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%3CCA+AHuKkH9r0BwQMgQjDG+j=qdcqzpow1rw1u4d0nrcgmq5x...@mail.gmail.com%3E
2. https://issues.apache.org/jira/browse/SPARK-9215

On Fri, Sep 18, 2015 at 4:23 AM, Nick Pentreath 
wrote:

> Are you doing actual transformations / aggregation in Spark Streaming? Or
> just using it to bulk write to S3?
>
> If the latter, then you could just use your AWS Lambda function to read
> directly from the Kinesis stream. If the former, then perhaps either look
> into the WAL option that Aniket mentioned, or perhaps you could write the
> processed RDD back to Kinesis, and have the Lambda function read the
> Kinesis stream and write to Redshift?
>
> On Thu, Sep 17, 2015 at 5:48 PM, Alan Dipert  wrote:
>
>> Hello,
>> We are using Spark Streaming 1.4.1 in AWS EMR to process records from
>> Kinesis.  Our Spark program saves RDDs to S3, after which the records are
>> picked up by a Lambda function that loads them into Redshift.  That no data
>> is lost during processing is important to us.
>>
>> We have set our Kinesis checkpoint interval to 15 minutes, which is also
>> our window size.
>>
>> Unfortunately, checkpointing happens after receiving data from Kinesis,
>> not after we have successfully written to S3.  If batches back up in Spark,
>> and the cluster is terminated, whatever data was in-memory will be lost
>> because it was checkpointed but not actually saved to S3.
>>
>> We are considering forking and modifying the kinesis-asl library with
>> changes that would allow us to perform the checkpoint manually and at the
>> right time.  We'd rather not do this.
>>
>> Are we overlooking an easier way to deal with this problem?  Thank you in
>> advance for your insight!
>>
>> Alan
>>
>
>


SparkContext declared as object variable

2015-09-18 Thread Priya Ch
Hello All,

  Instead of declaring the SparkContext in main, I declared it as an object
variable, as follows:

object sparkDemo {

  val conf = new SparkConf
  val sc = new SparkContext(conf)

  def main(args: Array[String]) {

    val baseRdd = sc.parallelize()
      .
      .
      .
  }

}

But this piece of code gives:
java.io.IOException: org.apache.spark.SparkException: Failed to get
broadcast_5_piece0 of broadcast_5
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get
broadcast_5_piece0 of broadcast_5
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.broadcast.TorrentBroadcast.org

$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)

Why shouldn't we declare sc as an object variable?

Regards,
Padma Ch
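For what it's worth, one likely explanation: vals on a Scala object are
initialized on first access in each JVM, so a closure that references the
object can end up constructing a second SparkContext on an executor. A
minimal sketch of the usual structure, with the context confined to main
(the app name and data are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object sparkDemo {
      def main(args: Array[String]): Unit = {
        // Construct the context inside main so it only ever exists on the driver.
        val conf = new SparkConf().setAppName("sparkDemo")
        val sc = new SparkContext(conf)

        val baseRdd = sc.parallelize(1 to 100)
        println(baseRdd.count())

        sc.stop()
      }
    }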


Re: spark 1.5, ML Pipeline Decision Tree Dataframe Problem

2015-09-18 Thread Yasemin Kaya
Thanks, I tried that but I can't make it work.
My type is JavaPairRDD and the vector is a DenseVector. I
added "import org.apache.spark.sql.SQLContext.implicits$" but there is no
toDF() method - I am using Java, not Scala.

2015-09-18 20:02 GMT+03:00 Feynman Liang :

> What is the type of unlabeledTest?
>
> SQL should be using the VectorUDT we've defined for Vectors, so you should
> be able to just "import sqlContext.implicits._" and then call "rdd.toDF()"
> on your RDD to convert it into a DataFrame.
>
> On Fri, Sep 18, 2015 at 7:32 AM, Yasemin Kaya  wrote:
>
>> Hi,
>>
>> I am using the Spark 1.5 ML Pipeline Decision Tree
>> to get the tree's probabilities, but I have to convert my data to the
>> DataFrame type. There is no problem while creating the model, but when I
>> use the model on my data there is a problem converting to the DataFrame
>> type. My data type is JavaPairRDD, and I am creating the dataframe with:
>>
>> DataFrame production = sqlContext.createDataFrame(
>> unlabeledTest.values(), Vector.class);
>>
>> The error says:
>> Exception in thread "main" java.lang.ClassCastException:
>> org.apache.spark.mllib.linalg.VectorUDT cannot be cast to
>> org.apache.spark.sql.types.StructType
>>
>> I know that if I give the LabeledPoint type there will be no problem. But
>> the data have no labels; I want to predict the label, which is why I use
>> the model on it.
>>
>> Is there a way to handle my problem?
>> Thanks.
>>
>>
>> Best,
>> yasemin
>> --
>> hiç ender hiç
>>
>
>


-- 
hiç ender hiç


unsubscribe

2015-09-18 Thread Nambi
unsubscribe


Does anyone use ShuffleDependency directly?

2015-09-18 Thread Josh Rosen
Does anyone use ShuffleDependency directly in their Spark code or libraries?
If so, how do you use it?

Similarly, does anyone use ShuffleHandle directly?


Re: unsubscribe

2015-09-18 Thread Richard Hillegas

To unsubscribe from the user list, please send a message to
user-unsubscr...@spark.apache.org as described here:
http://spark.apache.org/community.html#mailing-lists.

Thanks,
-Rick

Re: Python UDF and explode error

2015-09-18 Thread Pavel Burdanov
This is similar to SPARK-10685 and SPARK-9131, but in my case the error
reproduces even in local mode with one worker.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-UDF-and-explode-error-tp24736p24740.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkR pca?

2015-09-18 Thread Deborah Siegel
Hi,

Can PCA be implemented in a SparkR-MLLib integration?

Perhaps there are two separate issues:

1) Having the methods in SparkRWrapper and RFormula that will send the
right input types through the pipeline.
MLlib PCA operates either on a RowMatrix or on the feature vectors of an
RDD[LabeledPoint]. The labels aren't used, though in the second case it
may be useful to be able to keep the label.

2) Formula parsing from R.
In R syntax you can, for example in prcomp, have a formula which has no
label (response variable), e.g. prcomp(~ Col1 + Col2 + Col3, data =
myDataFrame).
Can RFormula currently parse this type of formula?


Thanks for listening / ideas.
Deb


Not able to group by Scala UDF

2015-09-18 Thread Jeff Jones
I'm trying to perform a Spark SQL (1.5) query containing a UDF in the select
and group by clauses. From what I've been able to find, this should be
supported.  A few examples include
https://github.com/spirom/LearningSpark/blob/master/src/main/scala/sql/UDF.scala,
https://issues.apache.org/jira/browse/SPARK-9338, and
https://issues.apache.org/jira/browse/SPARK-9435.  I just can't seem to get it
to work. I can use a nested query as a workaround, but this is just one of many
such queries that are generated by UI parameters, some of which use UDFs and
some of which don't. If I can simplify to not require the nested query, it
would make the code much easier to understand and maintain.  Is this possible
with the 1.5 release?


select cdr3_length, frame_type, sum(fraction_templates),
       TagFilter(sample_tags, 'Biological Sex', false)
from sequences_26aa4082_f714_4f53_9bf0_4cdf9d523f6a
group by cdr3_length, frame_type, TagFilter(sample_tags, 'Biological Sex', false)



[error] c.a.i.c.Analyzer - expression 'sample_tags' is neither present in the 
group by, nor is it an aggregate function. Add to group by or wrap in first() 
if you don't care which value you get.;

org.apache.spark.sql.AnalysisException: expression 'sample_tags' is neither 
present in the group by, nor is it an aggregate function. Add to group by or 
wrap in first() if you don't care which value you get.;

at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:37)
 ~[spark-catalyst_2.11-1.5.0.jar:1.5.0]

at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:44) 
~[spark-catalyst_2.11-1.5.0.jar:1.5.0]

at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:110)
 ~[spark-catalyst_2.11-1.5.0.jar:1.5.0]

at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1$3.apply(CheckAnalysis.scala:116)
 ~[spark-catalyst_2.11-1.5.0.jar:1.5.0]

at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1$3.apply(CheckAnalysis.scala:116)
 ~[spark-catalyst_2.11-1.5.0.jar:1.5.0]


Thanks,

Jeff


This message (and any attachments) is intended only for the designated 
recipient(s). It
may contain confidential or proprietary information, or have other limitations 
on use as
indicated by the sender. If you are not a designated recipient, you may not 
review, use,
copy or distribute this message. If you received this in error, please notify 
the sender by
reply e-mail and delete this message.
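For what it's worth, a sketch of the same nested-query workaround expressed
through the DataFrame API instead of SQL - assuming TagFilter is registered as
a UDF with this argument shape and df is the sequences table (Spark 1.5's
functions.callUDF):

    import org.apache.spark.sql.functions.{callUDF, col, lit, sum}

    // Materialize the UDF result as a named column first, then group on the alias.
    val tagged = df.withColumn("sex",
      callUDF("TagFilter", col("sample_tags"), lit("Biological Sex"), lit(false)))
    val result = tagged
      .groupBy("cdr3_length", "frame_type", "sex")
      .agg(sum("fraction_templates"))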


Re: Spark Streaming checkpoint recovery throws Stack Overflow Error

2015-09-18 Thread Saisai Shao
Hi Swetha,

The stack overflow happens because, when recovering from checkpoint data,
Java deserializes the object graph recursively; if you have a deep graph
(e.g. a long DStream lineage), this recursion can easily overflow the
stack. This is caused by the Java deserialization mechanism, so you need
to increase the thread stack size.


Thanks
Saisai
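
A sketch of the knobs involved (the 8m value is illustrative; note that in
client mode the driver JVM is already running when SparkConf is read, so its
stack size has to be passed on the command line, e.g. via
spark-submit --driver-java-options "-Xss8m"):

    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions", "-Xss8m")  // executor thread stacks
      .set("spark.driver.extraJavaOptions", "-Xss8m")    // driver, cluster mode only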


On Fri, Sep 18, 2015 at 9:19 AM, Ted Yu  wrote:

> Which version of Java are you using ?
>
> And release of Spark, please.
>
> Thanks
>
> On Fri, Sep 18, 2015 at 9:15 AM, swetha  wrote:
>
>> Hi,
>>
>> When I try to recover my Spark Streaming job from a checkpoint directory,
>> I
>> get a StackOverFlow Error as shown below. Any idea as to why this is
>> happening?
>>
>> 15/09/18 09:02:20 ERROR streaming.StreamingContext: Error starting the
>> context, marking it as stopped
>> java.lang.StackOverflowError
>> at java.util.Date.getTimeImpl(Date.java:887)
>> at java.util.Date.getTime(Date.java:883)
>> at java.util.Calendar.setTime(Calendar.java:1106)
>> at java.text.SimpleDateFormat.format(SimpleDateFormat.java:955)
>> at java.text.SimpleDateFormat.format(SimpleDateFormat.java:948)
>> at java.text.DateFormat.format(DateFormat.java:298)
>> at java.text.Format.format(Format.java:157)
>> at
>> org.apache.spark.streaming.ui.UIUtils$.formatBatchTime(UIUtils.scala:113)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$makeScope$1.apply(DStream.scala:137)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$makeScope$1.apply(DStream.scala:136)
>> at scala.Option.map(Option.scala:145)
>> at
>> org.apache.spark.streaming.dstream.DStream.makeScope(DStream.scala:136)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:394)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>> at
>>
>> org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:67)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>> at
>>
>> org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:67)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>> at scala.Option.orElse(Option.scala:257)
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-throws-Stack-Overflow-Error-tp24737.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


What's the best practice to parse JSON using spark

2015-09-18 Thread Cui Lin
Hello, all,

Parsing JSON's nested structure is easy using the Java or Python APIs. Where
can I find a similar way to parse JSON files using Spark?

Another question: using SparkSQL, how can I easily save the results
into a NoSQL DB? Any examples? Thanks a lot!



-- 
Best regards!

Lin,Cui
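
For the first question, a minimal sketch of the built-in JSON support
(assuming Spark 1.5's DataFrame reader; the file and field names are
illustrative - the input should contain one JSON object per line):

    val df = sqlContext.read.json("events.json")
    df.printSchema()                                    // nested structure is inferred
    df.select("user.name", "user.address.city").show()  // dotted paths reach nested fields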


Re: What's the best practice to parse JSON using spark

2015-09-18 Thread Ted Yu
For #2, please see:

examples/src/main/scala//org/apache/spark/examples/HBaseTest.scala
examples/src/main/scala//org/apache/spark/examples/pythonconverters/HBaseConverters.scala

In HBase, there is an hbase-spark module which is being polished. It should be
available in the HBase 1.3.0 release.

Cheers

On Fri, Sep 18, 2015 at 5:09 PM, Cui Lin  wrote:

> Hello, all,
>
> Parsing JSON's nested structure is easy using the Java or Python APIs. Where
> can I find a similar way to parse JSON files using Spark?
>
> Another question: using SparkSQL, how can I easily save the results
> into a NoSQL DB? Any examples? Thanks a lot!
>
>
>
> --
> Best regards!
>
> Lin,Cui
>


Re: What's the best practice to parse JSON using spark

2015-09-18 Thread Ted Yu
For #1, see this thread: http://search-hadoop.com/m/q3RTti0Thneenne2

For #2, also see:
examples//src/main/python/hbase_inputformat.py
examples//src/main/python/hbase_outputformat.py

Cheers

On Fri, Sep 18, 2015 at 5:12 PM, Ted Yu  wrote:

> For #2, please see:
>
> examples/src/main/scala//org/apache/spark/examples/HBaseTest.scala
>
> examples/src/main/scala//org/apache/spark/examples/pythonconverters/HBaseConverters.scala
>
> In hbase, there is hbase-spark module which is being polished. Should be
> available in hbase 1.3.0 release.
>
> Cheers
>
> On Fri, Sep 18, 2015 at 5:09 PM, Cui Lin  wrote:
>
>> Hello, all,
>>
>> Parsing JSON's nested structure is easy using the Java or Python APIs.
>> Where can I find a similar way to parse JSON files using Spark?
>>
>> Another question: using SparkSQL, how can I easily save the results
>> into a NoSQL DB? Any examples? Thanks a lot!
>>
>>
>>
>> --
>> Best regards!
>>
>> Lin,Cui
>>
>
>


SparkML pipelines and error recovery

2015-09-18 Thread Fatma Ozcan
Trying to understand how Spark ML pipelines work in case of failures. If I
have multiple transformers and one of them fails, will the lineage-based
recovery of RDDs automatically kick in?

Thanks,
Fatma


Re: Checkpointing with Kinesis

2015-09-18 Thread Michal Čizmazia
FYI re WAL on S3

http://search-hadoop.com/m/q3RTtFMpd41A7TnH/WAL+S3&subj=WAL+on+S3



On 18 September 2015 at 13:32, Alan Dipert  wrote:

> Hello,
>
> Thanks all for considering our problem.  We are doing transformations in
> Spark Streaming.  We have also since learned that WAL to S3 on 1.4 is "not
> reliable" [1]
>
> We are just going to wait for EMR to support 1.5 and hopefully this won't
> be a problem anymore [2].
>
> Alan
>
> 1.
> https://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%3CCA+AHuKkH9r0BwQMgQjDG+j=qdcqzpow1rw1u4d0nrcgmq5x...@mail.gmail.com%3E
> 2. https://issues.apache.org/jira/browse/SPARK-9215
>
> On Fri, Sep 18, 2015 at 4:23 AM, Nick Pentreath 
> wrote:
>
>> Are you doing actual transformations / aggregation in Spark Streaming? Or
>> just using it to bulk write to S3?
>>
>> If the latter, then you could just use your AWS Lambda function to read
>> directly from the Kinesis stream. If the former, then perhaps either look
>> into the WAL option that Aniket mentioned, or perhaps you could write the
>> processed RDD back to Kinesis, and have the Lambda function read the
>> Kinesis stream and write to Redshift?
>>
>> On Thu, Sep 17, 2015 at 5:48 PM, Alan Dipert  wrote:
>>
>>> Hello,
>>> We are using Spark Streaming 1.4.1 in AWS EMR to process records from
>>> Kinesis.  Our Spark program saves RDDs to S3, after which the records are
>>> picked up by a Lambda function that loads them into Redshift.  That no data
>>> is lost during processing is important to us.
>>>
>>> We have set our Kinesis checkpoint interval to 15 minutes, which is also
>>> our window size.
>>>
>>> Unfortunately, checkpointing happens after receiving data from Kinesis,
>>> not after we have successfully written to S3.  If batches back up in Spark,
>>> and the cluster is terminated, whatever data was in-memory will be lost
>>> because it was checkpointed but not actually saved to S3.
>>>
>>> We are considering forking and modifying the kinesis-asl library with
>>> changes that would allow us to perform the checkpoint manually and at the
>>> right time.  We'd rather not do this.
>>>
>>> Are we overlooking an easier way to deal with this problem?  Thank you
>>> in advance for your insight!
>>>
>>> Alan
>>>
>>
>>
>


Re: Spark streaming to database exception handling

2015-09-18 Thread chyeers
I have the same problem. I used Spark Streaming to save data to HBase using
the Phoenix JDBC driver. It seems to be OK sometimes, but data goes missing
when an exception happens while writing to the external storage:
org.apache.phoenix.exception.BatchUpdateExecution: ERROR 1106 (XCL06):
Exception while executing batch



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-to-database-exception-handling-tp24728p24741.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: KafkaDirectStream can't be recovered from checkpoint

2015-09-18 Thread Michal Čizmazia
Hi Petr, after Ctrl+C can you see the following message in the logs?

Invoking stop(stopGracefully=false)

Details:
https://github.com/apache/spark/pull/6307


On 18 September 2015 at 10:28, Petr Novak  wrote:

> It might be connected with my problems with gracefulShutdown in Spark
> 1.5.0 2.11
> https://mail.google.com/mail/#search/petr/14fb6bd5166f9395
>
> Maybe Ctrl+C corrupts checkpoints and breaks gracefulShutdown?
>
> Petr
>
> On Fri, Sep 18, 2015 at 4:10 PM, Petr Novak  wrote:
>
>> ...to ensure it is not something wrong on my cluster.
>>
>> On Fri, Sep 18, 2015 at 4:09 PM, Petr Novak  wrote:
>>
>>> I have tried it on Spark 1.3.0 2.10 and it works. The same code doesn't
>>> on Spark 1.5.0 2.11. It would be nice if anybody could try on another
>>> installation to ensure it is something wrong on my cluster.
>>>
>>> Many thanks,
>>> Petr
>>>
>>> On Fri, Sep 18, 2015 at 4:07 PM, Petr Novak 
>>> wrote:
>>>
 This one is generated, I suppose, after Ctrl+C

 15/09/18 14:38:25 INFO Worker: Asked to kill executor
 app-20150918143823-0001/0
 15/09/18 14:38:25 INFO Worker: Asked to kill executor
 app-20150918143823-0001/0
 15/09/18 14:38:25 DEBUG
 AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
 message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
 Actor[akka://sparkWorker/deadLetters]
 15/09/18 14:38:25 DEBUG
 AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
 message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
 Actor[akka://sparkWorker/deadLetters]
 15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
 app-20150918143823-0001/0 interrupted
 15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
 app-20150918143823-0001/0 interrupted
 15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
 15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
 15/09/18 14:38:25 ERROR FileAppender: Error writing stream to file
 /dfs/spark/work/app-20150918143823-0001/0/stderr
 java.io.IOException: Stream closed
 at
 java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
 at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
 at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
 at java.io.FilterInputStream.read(FilterInputStream.java:107)
 at
 org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
 at
 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
 15/09/18 14:38:25 ERROR FileAppender: Error writing stream to file
 /dfs/spark/work/app-20150918143823-0001/0/stderr
 java.io.IOException: Stream closed
 at
 java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
 at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
 at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
 at java.io.FilterInputStream.read(FilterInputStream.java:107)
 at
 org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
 at
 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
 15/09/18 14:38:25 DEBUG FileAppender: Closed file
 /dfs/spark/work/app-20150918143823-0001/0/stderr
 15/09/18 14:38:25 DEBUG FileAppender: Closed file
 /dfs/spark/work/app-20150918143823-0001/0/stderr

 Petr

>>>
>>>
>>
>


Re: in joins, does one side stream?

2015-09-18 Thread Reynold Xin
Yes for RDD -- both are materialized. No for DataFrame/SQL - one side
streams.
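
For the DataFrame case you can also hint which side gets buffered - a minimal
sketch, assuming Spark 1.5's broadcast function (the tables are illustrative):

    import org.apache.spark.sql.functions.broadcast

    val largeDF = sqlContext.range(0, 1000000).withColumnRenamed("id", "key")
    val smallDF = sqlContext.range(0, 100).withColumnRenamed("id", "key")

    // The broadcast hint replicates the small side to every executor,
    // so the large side streams through the join.
    val joined = largeDF.join(broadcast(smallDF), "key")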


On Thu, Sep 17, 2015 at 11:21 AM, Koert Kuipers  wrote:

> in scalding we join with the smaller side on the left, since the smaller
> side will get buffered while the bigger side streams through the join.
>
> looking at CoGroupedRDD I do not get the impression such a distinction is
> made. it seems both sides are put into a map that can spill to disk. is
> this correct?
>
> thanks
>


Using Spark for portfolio manager app

2015-09-18 Thread Thúy Hằng Lê
Hi all,

I am going to build a financial application for portfolio managers, where
each portfolio contains a list of stocks, the number of shares purchased,
and the purchase price.
Another source of information is stock prices from market data. The
application needs to calculate the real-time gain or loss of each stock in
each portfolio (compared to the purchase price).

I am new to Spark. I know that using Spark Streaming I can aggregate portfolio
positions in real-time, for example:
user A contains:
  - 100 IBM stock with transactionValue=$15000
  - 500 AAPL stock with transactionValue=$11400

Now, given that the stock prices change in real-time too, e.g. if IBM is
priced at $151, I want to update its gain or loss: gainOrLoss(IBM) = 151*100 -
15000 = $100

My questions are:

 * What is the best method to combine two real-time streams (
transactions made by the user and market pricing data) in Spark?
 * How can I run real-time ad-hoc SQL against the portfolio's positions?
Is there any way I can do SQL on the output of Spark Streaming?
 For example,
  select sum(gainOrLoss) from portfolio where user='A';
 * What are the preferred external storage options for Spark in this use case?
 * Is Spark the right choice for my use case?


Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Adrian Tanase
Reading through the docs it seems that with a combination of FAIR scheduler and 
maybe pools you can get pretty far.
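
For reference, a minimal sketch of tagging jobs with a pool from each request
thread (the pool name is illustrative; pools are declared in the allocation
file referenced by spark.scheduler.allocation.file, with
spark.scheduler.mode=FAIR, and they share resources by weight/minShare rather
than hard-capping cores):

    sc.setLocalProperty("spark.scheduler.pool", "requests")
    try {
      rdd.count()  // jobs started from this thread run in the "requests" pool
    } finally {
      sc.setLocalProperty("spark.scheduler.pool", null)  // clear the tag for this thread
    }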

However, the smallest unit of scheduled work is the task, so you probably need
to think about the parallelism of each transformation.

I'm guessing that by increasing the level of parallelism you get many smaller 
tasks that the scheduler can then run across the many jobs you might have - as 
opposed to fewer, longer tasks...

Lastly, 8 cores is not that much horsepower :)
You may consider running with beefier machines or a larger cluster, to get at 
least tens of cores.

Hope this helps,
-adrian

Sent from my iPhone

On 18 Sep 2015, at 18:37, Philip Weaver 
mailto:philip.wea...@gmail.com>> wrote:

Here's a specific example of what I want to do. My Spark application is running 
with total-executor-cores=8. A request comes in, it spawns a thread to handle 
that request, and starts a job. That job should use only 4 cores, not all 8 of 
the cores available to the cluster. When the first job is scheduled, it should 
take only 4 cores, not all 8 of the cores that are available to the driver.

Is there any way to accomplish this? This is on mesos.

In order to support the use cases described in 
https://spark.apache.org/docs/latest/job-scheduling.html, where a spark 
application runs for a long time and handles requests from multiple users, I 
believe what I'm asking about is a very important feature. One of the goals is 
to get lower latency for each request, but if the first request takes all 
resources and we can't guarantee any free resources for the second request, 
then that defeats the purpose. Does that make sense?

Thanks in advance for any advice you can provide!

- Philip

On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver 
mailto:philip.wea...@gmail.com>> wrote:
I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR 
scheduler, so I can define a long-running application capable of executing 
multiple simultaneous spark jobs.

The kind of jobs that I'm running do not benefit from more than 4 cores, but I 
want my application to be able to take several times that in order to run 
multiple jobs at the same time.

I suppose my question is more basic: How can I limit the number of cores used 
to load an RDD or DataFrame? I can immediately repartition or coalesce my RDD 
or DataFrame to 4 partitions after I load it, but that doesn't stop Spark from 
using more cores to load it.

Does it make sense what I am trying to accomplish, and is there any way to do 
it?

- Philip




Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Adrian Tanase
Forgot to mention that you could also restrict the parallelism to 4,
essentially using only 4 cores at any given time; however, if your job is 
complex, a stage might be broken into more than one task...

Sent from my iPhone

On 19 Sep 2015, at 08:30, Adrian Tanase 
mailto:atan...@adobe.com>> wrote:

Reading through the docs it seems that with a combination of FAIR scheduler and 
maybe pools you can get pretty far.

However the smallest unit of scheduled work is the task so probably you need to 
think about the parallelism of each transformation.

I'm guessing that by increasing the level of parallelism you get many smaller 
tasks that the scheduler can then run across the many jobs you might have - as 
opposed to fewer, longer tasks...

Lastly, 8 cores is not that much horsepower :)
You may consider running with beefier machines or a larger cluster, to get at 
least tens of cores.

Hope this helps,
-adrian

Sent from my iPhone

On 18 Sep 2015, at 18:37, Philip Weaver 
mailto:philip.wea...@gmail.com>> wrote:

Here's a specific example of what I want to do. My Spark application is running 
with total-executor-cores=8. A request comes in, it spawns a thread to handle 
that request, and starts a job. That job should use only 4 cores, not all 8 of 
the cores available to the cluster. When the first job is scheduled, it should 
take only 4 cores, not all 8 of the cores that are available to the driver.

Is there any way to accomplish this? This is on mesos.

In order to support the use cases described in 
https://spark.apache.org/docs/latest/job-scheduling.html, where a spark 
application runs for a long time and handles requests from multiple users, I 
believe what I'm asking about is a very important feature. One of the goals is 
to get lower latency for each request, but if the first request takes all 
resources and we can't guarantee any free resources for the second request, 
then that defeats the purpose. Does that make sense?

Thanks in advance for any advice you can provide!

- Philip

On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver 
<philip.wea...@gmail.com> wrote:
I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR 
scheduler, so I can define a long-running application capable of executing 
multiple simultaneous Spark jobs.

The kind of jobs that I'm running do not benefit from more than 4 cores, but I 
want my application to be able to take several times that in order to run 
multiple jobs at the same time.

I suppose my question is more basic: How can I limit the number of cores used 
to load an RDD or DataFrame? I can immediately repartition or coalesce my RDD 
or DataFrame to 4 partitions after I load it, but that doesn't stop Spark from 
using more cores to load it.

Does it make sense what I am trying to accomplish, and is there any way to do 
it?

- Philip




Re: Using Spark for portfolio manager app

2015-09-18 Thread Adrian Tanase
Cool use case! You should definitely be able to model it with Spark.

For the first question it's pretty easy: you probably need to keep the user 
portfolios as state using updateStateByKey.
You need to consume two event sources: user trades and stock price changes. You 
probably want to cogroup the stock changes with the users that have that stock 
in their portfolio, then union the two message streams.
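
As a rough sketch of that routing step, using a plain join as a stand-in for 
the cogroup (all names are invented, and I'm assuming the holdings can be 
produced as a per-batch pair stream):

    import org.apache.spark.streaming.dstream.DStream

    // Join each price change with the users holding that symbol,
    // producing one per-user event to union with the trades stream.
    // priceChanges: symbol -> new price; holders: symbol -> user
    def routePrices(priceChanges: DStream[(String, Double)],
                    holders: DStream[(String, String)]): DStream[(String, (String, Double))] =
      priceChanges.join(holders).map { case (symbol, (price, user)) =>
        (user, (symbol, price))
      }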

As messages come in, you consume the union of these two streams and update the 
state. Messages modeled as case classes and a pattern match should do the 
trick (assuming Scala). After the update, you need to emit a tuple of 
(newPortfolio, gainOrLost) so you can also push the deltas somewhere.
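
Putting the state update together, a minimal sketch; the message shapes and 
names are assumptions, not a reference implementation, and updateStateByKey 
also needs checkpointing enabled on the StreamingContext:

    import org.apache.spark.streaming.dstream.DStream

    sealed trait Event
    case class Trade(symbol: String, qty: Int, price: Double) extends Event
    case class PriceChange(symbol: String, price: Double) extends Event

    case class Position(qty: Int, costBasis: Double, lastPrice: Double) {
      def gainOrLost: Double = qty * lastPrice - costBasis
    }

    // State per user: symbol -> position. Trades adjust quantity and
    // cost basis; price changes only move the last mark.
    def updatePortfolio(events: Seq[Event],
                        state: Option[Map[String, Position]]): Option[Map[String, Position]] =
      Some(events.foldLeft(state.getOrElse(Map.empty[String, Position])) {
        case (p, Trade(sym, qty, price)) =>
          val pos = p.getOrElse(sym, Position(0, 0.0, price))
          p.updated(sym, Position(pos.qty + qty, pos.costBasis + qty * price, pos.lastPrice))
        case (p, PriceChange(sym, price)) =>
          p.get(sym).fold(p)(pos => p.updated(sym, pos.copy(lastPrice = price)))
      })

    // Both inputs keyed by user; routed prices come from the join above.
    def portfolios(trades: DStream[(String, Event)],
                   prices: DStream[(String, Event)]): DStream[(String, Map[String, Position])] =
      trades.union(prices).updateStateByKey(updatePortfolio _)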

For the SQL part, you need to create a DataFrame out of the user portfolio 
DStream, using foreachRDD. Look around for examples of SQL + Spark Streaming; I 
think Databricks had a sample app / tutorial. 
You can then query the resulting DataFrame using SQL.
If instead of a single update you want to provide a graph over time, then you 
need to use a window over the gainOrLost values.
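
A minimal sketch of that wiring with the Spark 1.5 APIs (the table name and row 
shape are invented):

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.dstream.DStream

    case class PortfolioRow(user: String, symbol: String, gainOrLost: Double)

    def exposeForSql(rows: DStream[PortfolioRow], sqlContext: SQLContext): Unit =
      rows.foreachRDD { rdd =>
        import sqlContext.implicits._
        // re-register the latest batch so ad-hoc queries see fresh data
        rdd.toDF().registerTempTable("portfolio")
        // the kind of ad-hoc query that then works:
        sqlContext.sql("select sum(gainOrLost) from portfolio where user = 'A'").show()
      }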

That being said, there are a lot of interesting questions you'll need to answer 
about state keeping, event sourcing, persistence, and durability, especially 
around getting data out of Spark, where you need to do more work to achieve 
exactly-once semantics. I only focused on the main dataflow.

Hope this helps; that's how I'd model it, anyway :)

-adrian

Sent from my iPhone

> On 19 Sep 2015, at 05:43, Thúy Hằng Lê  wrote:
> 
> Hi all,
> 
> I am going to build a financial application for Portfolio Manager, where each 
> portfolio contains a list of stocks, the number of shares purchased, and the 
> purchase price. 
> Another source of information is stock prices from market data. The 
> application needs to calculate the real-time gain or loss of each stock in 
> each portfolio (compared to the purchase price).
> 
> I am new to Spark. I know that using Spark Streaming I can aggregate portfolio 
> positions in real time, for example:
> user A contains: 
>   - 100 IBM shares with transactionValue=$15000
>   - 500 AAPL shares with transactionValue=$11400
> 
> Now, given that stock prices change in real time too, e.g. if the IBM price 
> is at 151, I want to update its gain or loss: gainOrLost(IBM) = 151*100 - 15000 = 
> $100
> 
> My questions are:
> 
>  * What is the best method to combine two real-time streams (transactions 
> made by users and market pricing data) in Spark?
>  * How can I run real-time ad-hoc SQL against portfolio positions? Is 
> there any way I can run SQL on the output of Spark Streaming?
>  For example:
>   select sum(gainOrLost) from portfolio where user='A';
>  * What are the preferred external storage options for Spark in this use case?
>  * Is Spark the right choice for my use case?
>