Re: Spark Streaming Application is Stuck Under Heavy Load Due to DeadLock

2016-01-04 Thread Shixiong Zhu
Hey Rachana, could you provide the full jstack output? Maybe it's the same as
https://issues.apache.org/jira/browse/SPARK-11104

Best Regards,
Shixiong Zhu

2016-01-04 12:56 GMT-08:00 Rachana Srivastava <
rachana.srivast...@markmonitor.com>:

> Hello All,
>
>
>
> I am running my application on a Spark cluster, but under heavy load the
> system hangs due to a deadlock.  I found a similar issue resolved here
> https://datastax-oss.atlassian.net/browse/JAVA-555 in Spark version
> 2.1.3.  But I am running on Spark 1.3 and am still getting the same issue.
>
>
>
> Here is the stack trace for reference:
>
>
>
> sun.misc.Unsafe.park(Native Method)
>
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>
>
> org.apache.spark.streaming.ContextWaiter.waitForStopOrError(ContextWaiter.scala:63)
>
>
> org.apache.spark.streaming.StreamingContext.awaitTermination(StreamingContext.scala:521)
>
>
> org.apache.spark.streaming.api.java.JavaStreamingContext.awaitTermination(JavaStreamingContext.scala:592)
>
>
>
> sun.misc.Unsafe.park(Native Method)
>
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
>
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
>
> java.util.concurrent.Semaphore.acquire(Semaphore.java:317)
>
>
> org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:62)
>
>
> org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
>
>
> org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
>
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
>
>
> org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
>
>
>
> sun.misc.Unsafe.park(Native Method)
>
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
>
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
>
>
> org.spark-project.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:342)
>
>
> org.spark-project.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:526)
>
>
> org.spark-project.jetty.util.thread.QueuedThreadPool.access$600(QueuedThreadPool.java:44)
>
>
> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
>
> java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> Thanks,
>
>
>
> Rachana
>
>
>


Re: val listRDD =ssc.socketTextStream(localhost,9999) on Yarn

2015-12-22 Thread Shixiong Zhu
Just replace `localhost` with a host name that can be accessed by Yarn
containers.

Best Regards,
Shixiong Zhu
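
For illustration, a minimal sketch of the driver-side code this implies (the
host name and port are hypothetical, not from the thread; the point is that the
receiver runs inside a YARN container, so the host must be reachable from the
node managers, not just from the submitting machine):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("socket-stream-on-yarn")
val ssc = new StreamingContext(conf, Seconds(5))
// "data-host.example.com" must resolve and be reachable from the YARN containers.
val lines = ssc.socketTextStream("data-host.example.com", 9999)
lines.print()
ssc.start()
ssc.awaitTermination()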

2015-12-22 0:11 GMT-08:00 prasadreddy <alle.re...@gmail.com>:

> How do we achieve this on yarn-cluster mode
>
> Please advice.
>
> Thanks
> Prasad
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/val-listRDD-ssc-socketTextStream-localhost--on-Yarn-tp25760.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark-submit for dependent jars

2015-12-21 Thread Shixiong Zhu
Looks like you need to add a "driver" option to your code, such as:

sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:@:1521:xxx",
      "driver" -> "oracle.jdbc.driver.OracleDriver",
      "dbtable" -> "your_table_name")).load()

Best Regards,
Shixiong Zhu
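
As a rough sketch (the connection URL, host/SID and table name below are
placeholders, not taken from the thread), the full read could look like this,
optionally loading the driver class explicitly so DriverManager can find it; the
ojdbc jar still has to be distributed, e.g. with --jars, and made visible to the
driver process as well:

// Register the Oracle driver on the driver JVM (assumes ojdbc is on the classpath).
Class.forName("oracle.jdbc.driver.OracleDriver")

val df = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:oracle:thin:@db-host:1521:ORCL",   // placeholder host and SID
  "driver" -> "oracle.jdbc.driver.OracleDriver",
  "dbtable" -> "your_table_name")).load()
df.show(5)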

2015-12-21 6:03 GMT-08:00 Jeff Zhang <zjf...@gmail.com>:

> Please make sure this is correct jdbc url,
> jdbc:oracle:thin:@:1521:xxx
>
>
>
> On Mon, Dec 21, 2015 at 9:54 PM, Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>
>> Hi Jeff and Satish,
>>
>> I have modified script and executed. Please find below command
>>
>> ./spark-submit --master local  --class test.Main --jars
>> /home/user/download/jar/ojdbc7.jar
>> /home//test/target/spark16-0.0.1-SNAPSHOT.jar
>>
>> Still I'm getting same exception.
>>
>>
>>
>> Exception in thread "main" java.sql.SQLException: No suitable driver
>> found for jdbc:oracle:thin:@:1521:xxx
>> at java.sql.DriverManager.getConnection(DriverManager.java:596)
>> at java.sql.DriverManager.getConnection(DriverManager.java:187)
>> at
>> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConnector$1.apply(JDBCRDD.scala:188)
>> at
>> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConnector$1.apply(JDBCRDD.scala:181)
>> at
>> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:121)
>> at
>> org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.(JDBCRelation.scala:91)
>> at
>> org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:60)
>> at
>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
>> at
>> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
>> at com.cisco.ss.etl.utils.ETLHelper$class.getData(ETLHelper.scala:22)
>> at com.cisco.ss.etl.Main$.getData(Main.scala:9)
>> at com.cisco.ss.etl.Main$delayedInit$body.apply(Main.scala:13)
>> at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>> at
>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>> at scala.App$$anonfun$main$1.apply(App.scala:71)
>> at scala.App$$anonfun$main$1.apply(App.scala:71)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>> at scala.App$class.main(App.scala:71)
>> at com.cisco.ss.etl.Main$.main(Main.scala:9)
>> at com.cisco.ss.etl.Main.main(Main.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>> Regards,
>> Rajesh
>>
>> On Mon, Dec 21, 2015 at 7:18 PM, satish chandra j <
>> jsatishchan...@gmail.com> wrote:
>>
>>> Hi Rajesh,
>>> Could you please try giving your cmd as mentioned below:
>>>
>>> ./spark-submit --master local  --class  --jars 
>>> 
>>>
>>> Regards,
>>> Satish Chandra
>>>
>>> On Mon, Dec 21, 2015 at 6:45 PM, Madabhattula Rajesh Kumar <
>>> mrajaf...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> How to add dependent jars in spark-submit command. For example: Oracle.
>>>> Could you please help me to resolve this issue
>>>>
>>>> I have a standalone cluster. One Master and One slave.
>>>>
>>>> I have used below command it is not working
>>>>
>>>> ./spark-submit --master local  --class test.Main
>>>> /test/target/spark16-0.0.1-SNAPSHOT.jar --jars
>>>> /home/user

Re: seriazable error in apache spark job

2015-12-18 Thread Shixiong Zhu
Looks like you have a reference to an Akka class. Could you post your code?

Best Regards,
Shixiong Zhu
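
For context, this error usually means a non-serializable object (here an Akka
actor) was captured by a closure that Spark ships to executors. A minimal sketch
of the common fix, creating the non-serializable resource on the executor side
instead of capturing it (CrawlClient is a hypothetical stand-in for the crawler
or actor in the original code):

import org.apache.spark.rdd.RDD

// Hypothetical non-serializable resource standing in for the actor.
class CrawlClient {
  def crawl(url: String): Unit = println(s"crawling $url")
  def close(): Unit = ()
}

def crawlAll(urls: RDD[String]): Unit =
  urls.foreachPartition { partition =>
    val client = new CrawlClient() // constructed on the executor, never serialized
    partition.foreach(client.crawl)
    client.close()
  }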

2015-12-17 23:43 GMT-08:00 Pankaj Narang <pankajnaran...@gmail.com>:

> I am encountering below error. Can somebody guide ?
>
> Something similar is one this link
> https://github.com/elastic/elasticsearch-hadoop/issues/298
>
>
> actor.MentionCrawlActor
> java.io.NotSerializableException: actor.MentionCrawlActor
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> ~[na:1.7.0_79]
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> ~[na:1.7.0_79]
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> ~[na:1.7.0_79]
> at
>
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> ~[na:1.7.0_79]
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> ~[na:1.7.0_79]
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> ~[na:1.7.0_79]
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> ~[na:1.7.0_79]
> at
>
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> ~[na:1.7.0_79]
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> ~[na:1.7.0_79]
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/seriazable-error-in-apache-spark-job-tp25732.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Question about Spark Streaming checkpoint interval

2015-12-18 Thread Shixiong Zhu
You are right. "checkpointInterval" is only for data checkpointing.
"metadata checkpoint" is done for each batch. Feel free to send a PR to add
the missing doc.

Best Regards,
Shixiong Zhu
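
A minimal sketch tying the two together (the paths, host and intervals are made
up for illustration): ssc.checkpoint enables checkpointing, including the
per-batch metadata checkpoints, while dstream.checkpoint only controls how often
the stream's RDD data is checkpointed.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("checkpoint-interval-sketch")
val ssc = new StreamingContext(conf, Seconds(10))        // 10s batch interval
ssc.checkpoint("hdfs:///tmp/checkpoints")                // hypothetical checkpoint directory

val lines = ssc.socketTextStream("data-host", 9999)      // hypothetical source
val update = (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))
val counts = lines.map((_, 1)).updateStateByKey(update)

counts.checkpoint(Seconds(50))   // data checkpoint interval: 5 batches
counts.print()
ssc.start()
ssc.awaitTermination()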

2015-12-18 8:26 GMT-08:00 Lan Jiang <ljia...@gmail.com>:

> Need some clarification about the documentation. According to Spark doc
>
> *"the default interval is a multiple of the batch interval that is at
> least 10 seconds. It can be set by
> using dstream.checkpoint(checkpointInterval). Typically, a checkpoint
> interval of 5 - 10 sliding intervals of a DStream is a good setting to
> try.”*
>
> My question is that does the *checkpointinterval* apply only for *data
> checkpointing* or it applies to *metadata checkpointing*? The API says
> dstream.checkpoint() is for "Enable periodic checkpointing of RDDs of this
> DStream”, implying it is only for data checkpointing. My understanding is
> that metadata checkpointing is for driver failure. For example, in Kafka
> direct API, driver keeps track of the offset range of each partition. So if
> metadata checkpoint is NOT done for each batch, in driver failure, some
> messages in Kafka is going to be replayed.
>
> I do not find the answer in the document saying *whether metadata
> checkpointing is done for each batch* and whether checkpointinterval
> setting applies to both types of checkpointing. Maybe I miss it. If anyone
> can point me to the right documentation, I would highly appreciate it.
>
> Best Regards,
>
> Lan
>


Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-17 Thread Shixiong Zhu
Streaming checkpoint doesn't support Accumulator or Broadcast. See
https://issues.apache.org/jira/browse/SPARK-5206

Here is a workaround:
https://issues.apache.org/jira/browse/SPARK-5206?focusedCommentId=14506806&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14506806

Best Regards,
Shixiong Zhu
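
The gist of that workaround is to hold the broadcast in a lazily initialized
singleton that can recreate it after a restart, rather than relying on the
instance captured in the checkpointed DStream graph. A rough sketch, with a
String payload standing in for the real reporter object:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object ReporterHolder {
  @volatile private var instance: Broadcast[String] = _

  def getInstance(sc: SparkContext): Broadcast[String] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast("reporter-config")   // hypothetical payload
        }
      }
    }
    instance
  }
}

// In the streaming job, fetch it per batch instead of capturing a broadcast directly:
// dstream.foreachRDD { rdd =>
//   val reporter = ReporterHolder.getInstance(rdd.sparkContext)
//   rdd.foreachPartition(_.foreach(record => /* use reporter.value here */ ()))
// }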

2015-12-17 4:39 GMT-08:00 Bartłomiej Alberski <albers...@gmail.com>:

> I prepared simple example helping in reproducing problem:
>
> https://github.com/alberskib/spark-streaming-broadcast-issue
>
> I think that in that way it will be easier for you to understand problem
> and find solution (if any exists)
>
> Thanks
> Bartek
>
> 2015-12-16 23:34 GMT+01:00 Bartłomiej Alberski <albers...@gmail.com>:
>
>> First of all , thanks @tdas for looking into my problem.
>>
>> Yes, I checked it seperately and it is working fine. For below piece of
>> code there is no single exception and values are sent correctly.
>>
>> val reporter = new MyClassReporter(...)
>> reporter.send(...)
>> val out = new FileOutputStream("out123.txt")
>> val outO = new ObjectOutputStream(out)
>> outO.writeObject(reporter)
>> outO.flush()
>> outO.close()
>>
>> val in = new FileInputStream("out123.txt")
>> val inO = new ObjectInputStream(in)
>> val reporterFromFile  =
>> inO.readObject().asInstanceOf[StreamingGraphiteReporter]
>> reporterFromFile.send(...)
>> in.close()
>>
>> Maybe I am wrong but I think that it will be strange if class
>> implementing Serializable and properly broadcasted to executors cannot be
>> serialized and deserialized?
>> I also prepared slightly different piece of code and I received slightly
>> different exception. Right now it looks like:
>> java.lang.ClassCastException: [B cannot be cast to com.example.sender.
>> MyClassReporter.
>>
>> Maybe I am wrong but, it looks like that when restarting from checkpoint
>> it does read proper block of memory to read bytes for MyClassReporter.
>>
>> 2015-12-16 2:38 GMT+01:00 Tathagata Das <t...@databricks.com>:
>>
>>> Could you test serializing and deserializing the MyClassReporter  class
>>> separately?
>>>
>>> On Mon, Dec 14, 2015 at 8:57 AM, Bartłomiej Alberski <
>>> albers...@gmail.com> wrote:
>>>
>>>> Below is the full stacktrace(real names of my classes were changed)
>>>> with short description of entries from my code:
>>>>
>>>> rdd.mapPartitions{ partition => //this is the line to which second
>>>> stacktrace entry is pointing
>>>>   val sender =  broadcastedValue.value // this is the maing place to
>>>> which first stacktrace entry is pointing
>>>> }
>>>>
>>>> java.lang.ClassCastException:
>>>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>>>> com.example.sender.MyClassReporter
>>>> at com.example.flow.Calculator
>>>> $$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
>>>> at com.example.flow.Calculator
>>>> $$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
>>>> at
>>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>>> at
>>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>>> at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>> at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>> at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>> at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>> at
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> 2015-12-14 17:10 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>>>>
>>>>> Can you show

Re: pyspark + kafka + streaming = NoSuchMethodError

2015-12-17 Thread Shixiong Zhu
What's the Scala version of your Spark? Is it 2.10?

Best Regards,
Shixiong Zhu

2015-12-17 10:10 GMT-08:00 Christos Mantas <cman...@cslab.ece.ntua.gr>:

> Hello,
>
> I am trying to set up a simple example with Spark Streaming (Python) and
> Kafka on a single machine deployment.
> My Kafka broker/server is also on the same machine (localhost:1281) and I
> am using Spark Version: spark-1.5.2-bin-hadoop2.6
>
> Python code
>
> ...
> ssc = StreamingContext(sc, 1)
> ...
> lines = KafkaUtils.createDirectStream(ssc, ["test"],
> {"metadata.broker.list":"localhost:1281"})
>
>
> So I try
>
> spark-submit --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar
> my_kafka_streaming_wordcount.py
>
> OR
>
> spark-submit --packages  org.apache.spark:spark-streaming-kafka_2.11:1.5.2
> my_kafka_streaming_wordcount.py
> (my kafka version is 2.11-0.9.0.0)
>
> OR
>
> pyspark  --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar  [import
> stuff and type those lines]
>
>
> and I end up with:
>
> 15/12/17 19:44:58 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/12/17 19:45:00 WARN MetricsSystem: Using default name DAGScheduler for
> source because spark.app.id is not set.
> Traceback (most recent call last):
>   File "/I/edited/the/path/here/my_kafka_streaming_wordcount.py", line 80,
> in 
> lines = KafkaUtils.createDirectStream(ssc, ["test"],
> {"metadata.broker.list":"localhost:1281"})
>   File
> "/opt/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py",
> line 130, in createDirectStream
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o29.createDirectStream.
> : java.lang.NoSuchMethodError:
> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
> at kafka.api.RequestKeys$.(RequestKeys.scala:39)
> at kafka.api.RequestKeys$.(RequestKeys.scala)
> at kafka.api.TopicMetadataRequest.(TopicMetadataRequest.scala:53)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
> at
> org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:259)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
>
> Am I missing something?
>
> Thanks in advance
> Chris M.
>
>
>
>
>
>


Re: Use of rdd.zipWithUniqueId() in DStream

2015-12-14 Thread Shixiong Zhu
It doesn't guarantee that. E.g.,

scala> sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0), 2).filter(_ >
2.0).zipWithUniqueId().collect().foreach(println)

(3.0,1)

(4.0,3)

It only guarantees "unique".

Best Regards,
Shixiong Zhu
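
If a contiguous 0, 1, 2, ... sequence is actually required, zipWithIndex does
guarantee that ordering, at the cost of triggering an extra Spark job to count
the elements per partition. Continuing the example above:

scala> sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0), 2).filter(_ > 2.0).zipWithIndex().collect().foreach(println)
(3.0,0)
(4.0,1)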

2015-12-13 10:18 GMT-08:00 Sourav Mazumder <sourav.mazumde...@gmail.com>:

> Hi All,
>
> I'm trying to use zipWithUniqieId() function of RDD using transform
> function of dStream. It does generate unique id always starting from 0 and
> in sequence.
>
> However, not sure whether this is a reliable behavior which is always
> guaranteed to generate sequence number starting form 0.
>
> Can anyone confirm ?
>
> Regards,
> Sourav
>


Re: Local Mode: Executor thread leak?

2015-12-08 Thread Shixiong Zhu
Could you send a PR to fix it? Thanks!

Best Regards,
Shixiong Zhu
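
Richard's message below mentions naming every thread in the user-code pools with
a custom ThreadFactory so the leaked threads can be traced in thread dumps. A
minimal sketch of that technique (pool name and size are arbitrary):

import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

class NamedThreadFactory(prefix: String) extends ThreadFactory {
  private val counter = new AtomicInteger(0)
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
    t.setDaemon(true)
    t
  }
}

val pool = Executors.newFixedThreadPool(4, new NamedThreadFactory("my-task-pool"))
// ... submit work ...
pool.shutdown()   // pools that are never shut down show up as stranded named threads in jstack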

2015-12-08 13:31 GMT-08:00 Richard Marscher <rmarsc...@localytics.com>:

> Alright I was able to work through the problem.
>
> So the owning thread was one from the executor task launch worker, which
> at least in local mode runs the task and the related user code of the task.
> After judiciously naming every thread in the pools in the user code (with a
> custom ThreadFactory) I was able to trace down the leak to a couple thread
> pools that were not shut down properly by noticing the named threads
> accumulating in thread dumps of the JVM process.
>
> On Mon, Dec 7, 2015 at 6:41 PM, Richard Marscher <rmarsc...@localytics.com
> > wrote:
>
>> Thanks for the response.
>>
>> The version is Spark 1.5.2.
>>
>> Some examples of the thread names:
>>
>> pool-1061-thread-1
>> pool-1059-thread-1
>> pool-1638-thread-1
>>
>> There become hundreds then thousands of these stranded in WAITING.
>>
>> I added logging to try to track the lifecycle of the thread pool in
>> Executor as mentioned before. Here is an excerpt, but every seems fine
>> there. Every executor that starts is also shut down and it seems like it
>> shuts down fine.
>>
>> 15/12/07 23:30:21 WARN o.a.s.e.Executor: Threads finished in executor
>> driver. pool shut down 
>> java.util.concurrent.ThreadPoolExecutor@e5d036b[Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
>> 15/12/07 23:30:28 WARN o.a.s.e.Executor: Executor driver created, thread
>> pool: java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Running, pool
>> size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
>> 15/12/07 23:31:06 WARN o.a.s.e.Executor: Threads finished in executor
>> driver. pool shut down 
>> java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 36]
>> 15/12/07 23:31:11 WARN o.a.s.e.Executor: Executor driver created, thread
>> pool: java.util.concurrent.ThreadPoolExecutor@6e85ece4[Running, pool
>> size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
>> 15/12/07 23:34:35 WARN o.a.s.e.Executor: Threads finished in executor
>> driver. pool shut down 
>> java.util.concurrent.ThreadPoolExecutor@6e85ece4[Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 288]
>>
>> Also here is an example thread dump of such a thread:
>>
>> "pool-493-thread-1" prio=10 tid=0x7f0e60612800 nid=0x18c4 waiting on
>> condition [0x7f0c33c3e000]
>>java.lang.Thread.State: WAITING (parking)
>> at sun.misc.Unsafe.park(Native Method)
>> - parking to wait for  <0x7f10b3e8fb60> (a
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>> at
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>> at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>> at
>> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>>     at
>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> On Mon, Dec 7, 2015 at 6:23 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:
>>
>>> Which version are you using? Could you post these thread names here?
>>>
>>> Best Regards,
>>> Shixiong Zhu
>>>
>>> 2015-12-07 14:30 GMT-08:00 Richard Marscher <rmarsc...@localytics.com>:
>>>
>>>> Hi,
>>>>
>>>> I've been running benchmarks against Spark in local mode in a long
>>>> running process. I'm seeing threads leaking each time it runs a job. It
>>>> doesn't matter if I recycle SparkContext constantly or have 1 context stay
>>>> alive for the entire application lifetime.
>>>>
>>>> I see a huge accumulation ongoing of "pool--thread-1" threads with
>>>> the creating thread "Executor task launch worker-xx" where x's are numbers.
>>>> The number of leaks per launch worker varies but usually 1 to a few.
>>>>
>>>> Searching the Spark code the pool is created in the Executor class. It
>>>>

Re: Local Mode: Executor thread leak?

2015-12-07 Thread Shixiong Zhu
Which version are you using? Could you post these thread names here?

Best Regards,
Shixiong Zhu

2015-12-07 14:30 GMT-08:00 Richard Marscher <rmarsc...@localytics.com>:

> Hi,
>
> I've been running benchmarks against Spark in local mode in a long running
> process. I'm seeing threads leaking each time it runs a job. It doesn't
> matter if I recycle SparkContext constantly or have 1 context stay alive
> for the entire application lifetime.
>
> I see a huge accumulation ongoing of "pool--thread-1" threads with the
> creating thread "Executor task launch worker-xx" where x's are numbers. The
> number of leaks per launch worker varies but usually 1 to a few.
>
> Searching the Spark code the pool is created in the Executor class. It is
> `.shutdown()` in the stop for the executor. I've wired up logging and also
> tried shutdownNow() and awaitForTermination on the pools. Every seems okay
> there for every Executor that is called with `stop()` but I'm still not
> sure yet if every Executor is called as such, which I am looking into now.
>
> What I'm curious to know is if anyone has seen a similar issue?
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>


Re: Help with Couchbase connector error

2015-11-26 Thread Shixiong Zhu
Hey Eyal, I just checked the Couchbase Spark connector jar. The target
version of some of the classes is Java 8 (52.0). You can create a ticket in
https://issues.couchbase.com/projects/SPARKC

Best Regards,
Shixiong Zhu

2015-11-26 9:03 GMT-08:00 Ted Yu <yuzhih...@gmail.com>:

> StoreMode is from Couchbase connector.
>
> Where did you obtain the connector ?
>
> See also
> http://stackoverflow.com/questions/1096148/how-to-check-the-jdk-version-used-to-compile-a-class-file
>
> On Thu, Nov 26, 2015 at 8:55 AM, Eyal Sharon <e...@scene53.com> wrote:
>
>> Hi ,
>> Great , that gave some directions. But can you elaborate more?  or share
>> some post
>> I am currently running JDK 7 , and  my Couchbase too
>>
>> Thanks !
>>
>> On Thu, Nov 26, 2015 at 6:02 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> This implies version mismatch between the JDK used to build your jar and
>>> the one at runtime.
>>>
>>> When building, target JDK 1.7
>>>
>>> There're plenty of posts on the web for dealing with such error.
>>>
>>> Cheers
>>>
>>> On Thu, Nov 26, 2015 at 7:31 AM, Eyal Sharon <e...@scene53.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to set a connection to Couchbase. I am at the very
>>>> beginning, and I got stuck on   this exception
>>>>
>>>> Exception in thread "main" java.lang.UnsupportedClassVersionError:
>>>> com/couchbase/spark/StoreMode : Unsupported major.minor version 52.0
>>>>
>>>>
>>>> Here is the simple code fragment
>>>>
>>>>   val sc = new SparkContext(cfg)
>>>>
>>>>   val doc1 = JsonDocument.create("doc1", JsonObject.create().put("some", "content"))
>>>>   val doc2 = JsonArrayDocument.create("doc2", JsonArray.from("more", "content", "in", "here"))
>>>>
>>>>
>>>>   val data = sc
>>>> .parallelize(Seq(doc1, doc2))
>>>> .saveToCouchbase()
>>>> }
>>>>
>>>>
>>>> Any help will be a bless
>>>>
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> *This email and any files transmitted with it are confidential and
>>>> intended solely for the use of the individual or entity to whom they are
>>>> addressed. Please note that any disclosure, copying or distribution of the
>>>> content of this information is strictly forbidden. If you have received
>>>> this email message in error, please destroy it immediately and notify its
>>>> sender.*
>>>>
>>>
>>>
>>
>> *This email and any files transmitted with it are confidential and
>> intended solely for the use of the individual or entity to whom they are
>> addressed. Please note that any disclosure, copying or distribution of the
>> content of this information is strictly forbidden. If you have received
>> this email message in error, please destroy it immediately and notify its
>> sender.*
>>
>
>


Re: Why there's no api for SparkContext#textFiles to support multiple inputs ?

2015-11-11 Thread Shixiong Zhu
In addition, if you have more than two text files, you can just put them
into a Seq and use "reduce(_ ++ _)".

Best Regards,
Shixiong Zhu
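
A quick sketch of that (the paths are hypothetical):

val paths = Seq("hdfs:///data/part1", "hdfs:///data/part2", "hdfs:///data/part3")
val lines = paths.map(sc.textFile(_)).reduce(_ ++ _)
// Equivalent: sc.union(paths.map(sc.textFile(_)))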

2015-11-11 10:21 GMT-08:00 Jakob Odersky <joder...@gmail.com>:

> Hey Jeff,
> Do you mean reading from multiple text files? In that case, as a
> workaround, you can use the RDD#union() (or ++) method to concatenate
> multiple rdds. For example:
>
> val lines1 = sc.textFile("file1")
> val lines2 = sc.textFile("file2")
>
> val rdd = lines1 union lines2
>
> regards,
> --Jakob
>
> On 11 November 2015 at 01:20, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> Although user can use the hdfs glob syntax to support multiple inputs.
>> But sometimes, it is not convenient to do that. Not sure why there's no api
>> of SparkContext#textFiles. It should be easy to implement that. I'd love to
>> create a ticket and contribute for that if there's no other consideration
>> that I don't know.
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>


Re: Anybody hit this issue in spark shell?

2015-11-10 Thread Shixiong Zhu
The Scala compiler stores some metadata in the ScalaSig attribute. See the
following link as an example:

http://stackoverflow.com/questions/10130106/how-does-scala-know-the-difference-between-def-foo-and-def-foo/10130403#10130403

As maven-shade-plugin doesn't recognize ScalaSig, it cannot fix the
reference in it. Not sure if there is a Scala version of
`maven-shade-plugin` to deal with it.

Generally, annotations that will be shaded should not be used in the Scala
code. I'm wondering if we can expose this issue in the PR build. Because the
SBT build doesn't do the shading, it's currently hard for us to find similar
issues in the PR build.

Best Regards,
Shixiong Zhu

2015-11-09 18:47 GMT-08:00 Ted Yu <yuzhih...@gmail.com>:

> Created https://github.com/apache/spark/pull/9585
>
> Cheers
>
> On Mon, Nov 9, 2015 at 6:39 PM, Josh Rosen <joshro...@databricks.com>
> wrote:
>
>> When we remove this, we should add a style-checker rule to ban the import
>> so that it doesn't get added back by accident.
>>
>> On Mon, Nov 9, 2015 at 6:13 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> Yeah, we should probably remove that.
>>>
>>> On Mon, Nov 9, 2015 at 5:54 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> If there is no option to let shell skip processing @VisibleForTesting
>>>> , should the annotation be dropped ?
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Nov 9, 2015 at 5:50 PM, Marcelo Vanzin <van...@cloudera.com>
>>>> wrote:
>>>>
>>>>> We've had this in the past when using "@VisibleForTesting" in classes
>>>>> that for some reason the shell tries to process. QueryExecution.scala
>>>>> seems to use that annotation and that was added recently, so that's
>>>>> probably the issue.
>>>>>
>>>>> BTW, if anyone knows how Scala can find a reference to the original
>>>>> Guava class even after shading, I'd really like to know. I've looked
>>>>> several times and never found where the original class name is stored.
>>>>>
>>>>> On Mon, Nov 9, 2015 at 10:37 AM, Zhan Zhang <zzh...@hortonworks.com>
>>>>> wrote:
>>>>> > Hi Folks,
>>>>> >
>>>>> > Does anybody meet the following issue? I use "mvn package -Phive
>>>>> > -DskipTests” to build the package.
>>>>> >
>>>>> > Thanks.
>>>>> >
>>>>> > Zhan Zhang
>>>>> >
>>>>> >
>>>>> >
>>>>> > bin/spark-shell
>>>>> > ...
>>>>> > Spark context available as sc.
>>>>> > error: error while loading QueryExecution, Missing dependency 'bad
>>>>> symbolic
>>>>> > reference. A signature in QueryExecution.class refers to term
>>>>> annotations
>>>>> > in package com.google.common which is not available.
>>>>> > It may be completely missing from the current classpath, or the
>>>>> version on
>>>>> > the classpath might be incompatible with the version used when
>>>>> compiling
>>>>> > QueryExecution.class.', required by
>>>>> >
>>>>> /Users/zzhang/repo/spark/assembly/target/scala-2.10/spark-assembly-1.6.0-SNAPSHOT-hadoop2.2.0.jar(org/apache/spark/sql/execution/QueryExecution.class)
>>>>> > :10: error: not found: value sqlContext
>>>>> >import sqlContext.implicits._
>>>>> >   ^
>>>>> > :10: error: not found: value sqlContext
>>>>> >import sqlContext.sql
>>>>> >   ^
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Marcelo
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Memory are not used according to setting

2015-11-04 Thread Shixiong Zhu
You should use `SparkConf.set` rather than `SparkConf.setExecutorEnv`. Driver
configurations need to be set before the application starts; you can pass them
with the `--conf` argument when running `spark-submit`.

Best Regards,
Shixiong Zhu
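
A minimal sketch of that, reusing the values from the snippet below (the app
name is made up). Note that spark.driver.memory in particular cannot take effect
when set from inside an already-running driver, so it belongs on the command
line, e.g. spark-submit --conf spark.driver.memory=6g (or --driver-memory 6g):

import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf()
  .setAppName("memory-settings-sketch")
  .set("spark.executor.memory", "6g")
  .set("spark.executor.cores", "8")
  .set("spark.dynamicAllocation.enabled", "true")
val sc = new SparkContext(sparkConf)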

2015-11-04 15:55 GMT-08:00 William Li <a-...@expedia.com>:

> Hi All – I have a four worker node cluster, each with 8GB memory. When I
> submit a job, the driver node takes 1gb memory, each worker node only
> allocates one executor, also just take 1gb memory. The setting of the job
> has:
>
> sparkConf
>   .setExecutorEnv("spark.driver.memory", "6g")
>   .setExecutorEnv("spark.dynamicAllocation.enabled", "true")
>   .setExecutorEnv("spark.executor.cores", "8")
>   .setExecutorEnv("spark.executor.memory", "6g")
>
>
> Any one knows how to make the worker or executor use more memory?
>
>
> Thanks,
>
>
> William.
>
>
>


Re: Spark Streaming data checkpoint performance

2015-11-02 Thread Shixiong Zhu
"trackStateByKey" is about to be added in 1.6 to resolve the performance
issue of "updateStateByKey". You can take a look at
https://issues.apache.org/jira/browse/SPARK-2629 and
https://github.com/apache/spark/pull/9256


Re: [SQL] Memory leak with spark streaming and spark sql in spark 1.5.1

2015-10-15 Thread Shixiong Zhu
Thanks for reporting it Terry. I submitted a PR to fix it:
https://github.com/apache/spark/pull/9132

Best Regards,
Shixiong Zhu

2015-10-15 2:39 GMT+08:00 Reynold Xin <r...@databricks.com>:

> +dev list
>
> On Wed, Oct 14, 2015 at 1:07 AM, Terry Hoo <hujie.ea...@gmail.com> wrote:
>
>> All,
>>
>> Does anyone meet memory leak issue with spark streaming and spark sql in
>> spark 1.5.1? I can see the memory is increasing all the time when running
>> this simple sample:
>>
>> val sc = new SparkContext(conf)
>> val sqlContext = new HiveContext(sc)
>> import sqlContext.implicits._
>> val ssc = new StreamingContext(sc, Seconds(1))
>> val s1 = ssc.socketTextStream("localhost", ).map(x =>
>> (x,1)).reduceByKey((x : Int, y : Int) => x + y)
>> s1.print
>> s1.foreachRDD(rdd => {
>>   rdd.foreach(_ => Unit)
>>   sqlContext.createDataFrame(rdd).registerTempTable("A")
>>   sqlContext.sql("""select * from A""").show(1)
>> })
>>
>> After dump the the java heap, I can see there is about 22K entries
>> in SQLListener._stageIdToStageMetrics after 2 hour running (other maps in
>> this SQLListener has about 1K entries), is this a leak in SQLListener?
>>
>> Thanks!
>> Terry
>>
>
>


Re: spark-shell :javap fails with complaint about JAVA_HOME, but it is set correctly

2015-10-15 Thread Shixiong Zhu
The Scala 2.10 REPL's :javap doesn't support Java 7 or Java 8. This was fixed
in Scala 2.11. See https://issues.scala-lang.org/browse/SI-4936

Best Regards,
Shixiong Zhu

2015-10-15 4:19 GMT+08:00 Robert Dodier <robert.dod...@gmail.com>:

> Hi,
>
> I am working with Spark 1.5.1 (official release), with Oracle Java8,
> on Ubuntu 14.04. echo $JAVA_HOME says "/usr/lib/jvm/java-8-oracle".
>
> I'd like to use :javap in spark-shell, but I get an error message:
>
> scala> :javap java.lang.Object
> Failed: Could not load javap tool. Check that JAVA_HOME is correct.
>
> However ls $JAVA_HOME/lib/tools.jar shows that it is there.
>
> I tried starting spark-shell with -toolcp $JAVA_HOME/lib/tools.jar but
> I get the same error.
>
> For comparison, if execute scala and enter :javap java.lang.Object, it
> works as expected.
>
> Not sure where to go from here. Thanks for any advice.
>
> best,
>
> Robert Dodier
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: What is the abstraction for a Worker process in Spark code

2015-10-12 Thread Shixiong Zhu
Which mode are you using? For standalone, it's
org.apache.spark.deploy.worker.Worker. For Yarn and Mesos, Spark just
submits its request to them and they will schedule processes for Spark.

Best Regards,
Shixiong Zhu

2015-10-12 20:12 GMT+08:00 Muhammad Haseeb Javed <11besemja...@seecs.edu.pk>
:

> I understand that each executor that is processing a Spark job is emulated
> in Spark code by the Executor class in Executor.scala and
> CoarseGrainedExecutorBackend is the abstraction which facilitates
> communication between an Executor and the Driver. But what is the
> abstraction for a Worker process in Spark code which would a reference to
> all the Executors running in it.
>


Re: Spark UI consuming lots of memory

2015-10-12 Thread Shixiong Zhu
In addition, you cannot turn off JobListener and SQLListener now...

Best Regards,
Shixiong Zhu

2015-10-13 11:59 GMT+08:00 Shixiong Zhu <zsxw...@gmail.com>:

> Is your query very complicated? Could you provide the output of `explain` for
> the query that consumes an excessive amount of memory? If this is a small
> query, there may be a bug that leaks memory in SQLListener.
>
> Best Regards,
> Shixiong Zhu
>
> 2015-10-13 11:44 GMT+08:00 Nicholas Pritchard <
> nicholas.pritch...@falkonry.com>:
>
>> As an update, I did try disabling the ui with "spark.ui.enabled=false",
>> but the JobListener and SQLListener still consume a lot of memory, leading
>> to OOM error. Has anyone encountered this before? Is the only solution just
>> to increase the driver heap size?
>>
>> Thanks,
>> Nick
>>
>> On Mon, Oct 12, 2015 at 8:42 PM, Nicholas Pritchard <
>> nicholas.pritch...@falkonry.com> wrote:
>>
>>> I set those configurations by passing to spark-submit script:
>>> "bin/spark-submit --conf spark.ui.retainedJobs=20 ...". I have verified
>>> that these configurations are being passed correctly because they are
>>> listed in the environments tab and also by counting the number of
>>> job/stages that are listed. The "spark.sql.ui.retainedExecutions=0"
>>> only applies to the number of "completed" executions; there will always be
>>> a "running" execution. For some reason, I have one execution that consumes
>>> an excessive amount of memory.
>>>
>>> Actually, I am not interested in the SQL UI, as I find the Job/Stages UI
>>> to have sufficient information. I am also using Spark Standalone cluster
>>> manager so have not had to use the history server.
>>>
>>>
>>> On Mon, Oct 12, 2015 at 8:17 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:
>>>
>>>> Could you show how did you set the configurations? You need to set
>>>> these configurations before creating SparkContext and SQLContext.
>>>>
>>>> Moreover, the history server doesn't support SQL UI. So
>>>> "spark.eventLog.enabled=true" doesn't work now.
>>>>
>>>> Best Regards,
>>>> Shixiong Zhu
>>>>
>>>> 2015-10-13 2:01 GMT+08:00 pnpritchard <nicholas.pritch...@falkonry.com>
>>>> :
>>>>
>>>>> Hi,
>>>>>
>>>>> In my application, the Spark UI is consuming a lot of memory,
>>>>> especially the
>>>>> SQL tab. I have set the following configurations to reduce the memory
>>>>> consumption:
>>>>> - spark.ui.retainedJobs=20
>>>>> - spark.ui.retainedStages=40
>>>>> - spark.sql.ui.retainedExecutions=0
>>>>>
>>>>> However, I still get OOM errors in the driver process with the default
>>>>> 1GB
>>>>> heap size. The following link is a screen shot of a heap dump report,
>>>>> showing the SQLListener instance having a retained size of 600MB.
>>>>>
>>>>> https://cloud.githubusercontent.com/assets/5124612/10404379/20fbdcfc-6e87-11e5-9415-27e25193a25c.png
>>>>>
>>>>> Rather than just increasing the allotted heap size, does anyone have
>>>>> any
>>>>> other ideas? Is it possible to disable the SQL tab specifically? I also
>>>>> thought about serving the UI from disk rather than memory with
>>>>> "spark.eventLog.enabled=true" and "spark.ui.enabled=false". Has anyone
>>>>> tried
>>>>> this before?
>>>>>
>>>>> Thanks,
>>>>> Nick
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-UI-consuming-lots-of-memory-tp25033.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> -
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Spark UI consuming lots of memory

2015-10-12 Thread Shixiong Zhu
Could you show how you set the configurations? You need to set these
configurations before creating SparkContext and SQLContext.

Moreover, the history server doesn't support SQL UI. So
"spark.eventLog.enabled=true" doesn't work now.

Best Regards,
Shixiong Zhu
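
For example, a minimal sketch that applies the retention settings from the
message below before any context is created (app name is made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setAppName("ui-retention-sketch")
  .set("spark.ui.retainedJobs", "20")
  .set("spark.ui.retainedStages", "40")
  .set("spark.sql.ui.retainedExecutions", "0")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)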

2015-10-13 2:01 GMT+08:00 pnpritchard <nicholas.pritch...@falkonry.com>:

> Hi,
>
> In my application, the Spark UI is consuming a lot of memory, especially
> the
> SQL tab. I have set the following configurations to reduce the memory
> consumption:
> - spark.ui.retainedJobs=20
> - spark.ui.retainedStages=40
> - spark.sql.ui.retainedExecutions=0
>
> However, I still get OOM errors in the driver process with the default 1GB
> heap size. The following link is a screen shot of a heap dump report,
> showing the SQLListener instance having a retained size of 600MB.
>
> https://cloud.githubusercontent.com/assets/5124612/10404379/20fbdcfc-6e87-11e5-9415-27e25193a25c.png
>
> Rather than just increasing the allotted heap size, does anyone have any
> other ideas? Is it possible to disable the SQL tab specifically? I also
> thought about serving the UI from disk rather than memory with
> "spark.eventLog.enabled=true" and "spark.ui.enabled=false". Has anyone
> tried
> this before?
>
> Thanks,
> Nick
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-UI-consuming-lots-of-memory-tp25033.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Unexplained sleep time

2015-10-12 Thread Shixiong Zhu
You don't need to worry about this sleep. It runs in a separate thread and
usually won't affect the performance of your application.

Best Regards,
Shixiong Zhu

2015-10-09 6:03 GMT+08:00 yael aharon <yael.aharo...@gmail.com>:

> Hello,
> I am working on improving the performance of our Spark on Yarn
> applications.
> Scanning through the logs I found the following lines:
>
>
> [2015-10-07T16:25:17.245-04:00] [DataProcessing] [INFO] [] 
> [org.apache.spark.Logging$class] [tid:main] [userID:yarn] Started progress 
> reporter thread - sleep time : 5000
> [2015-10-07T16:25:22.262-04:00] [DataProcessing] [INFO] [] 
> [org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl] [tid:Reporter] 
> [userID:yarn] Received new token for : hostname:8041
>
>
> As the log says, the main thread sleeps for 5 seconds. Is there a way to
> configure/eliminate this sleep?
> thanks, Yael
>


Re: Spark UI consuming lots of memory

2015-10-12 Thread Shixiong Zhu
Is your query very complicated? Could you provide the output of `explain` for
the query that consumes an excessive amount of memory? If this is a small
query, there may be a bug that leaks memory in SQLListener.

Best Regards,
Shixiong Zhu

2015-10-13 11:44 GMT+08:00 Nicholas Pritchard <
nicholas.pritch...@falkonry.com>:

> As an update, I did try disabling the ui with "spark.ui.enabled=false",
> but the JobListener and SQLListener still consume a lot of memory, leading
> to OOM error. Has anyone encountered this before? Is the only solution just
> to increase the driver heap size?
>
> Thanks,
> Nick
>
> On Mon, Oct 12, 2015 at 8:42 PM, Nicholas Pritchard <
> nicholas.pritch...@falkonry.com> wrote:
>
>> I set those configurations by passing to spark-submit script:
>> "bin/spark-submit --conf spark.ui.retainedJobs=20 ...". I have verified
>> that these configurations are being passed correctly because they are
>> listed in the environments tab and also by counting the number of
>> job/stages that are listed. The "spark.sql.ui.retainedExecutions=0" only
>> applies to the number of "completed" executions; there will always be a
>> "running" execution. For some reason, I have one execution that consumes an
>> excessive amount of memory.
>>
>> Actually, I am not interested in the SQL UI, as I find the Job/Stages UI
>> to have sufficient information. I am also using Spark Standalone cluster
>> manager so have not had to use the history server.
>>
>>
>> On Mon, Oct 12, 2015 at 8:17 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:
>>
>>> Could you show how did you set the configurations? You need to set these
>>> configurations before creating SparkContext and SQLContext.
>>>
>>> Moreover, the history server doesn't support SQL UI. So
>>> "spark.eventLog.enabled=true" doesn't work now.
>>>
>>> Best Regards,
>>> Shixiong Zhu
>>>
>>> 2015-10-13 2:01 GMT+08:00 pnpritchard <nicholas.pritch...@falkonry.com>:
>>>
>>>> Hi,
>>>>
>>>> In my application, the Spark UI is consuming a lot of memory,
>>>> especially the
>>>> SQL tab. I have set the following configurations to reduce the memory
>>>> consumption:
>>>> - spark.ui.retainedJobs=20
>>>> - spark.ui.retainedStages=40
>>>> - spark.sql.ui.retainedExecutions=0
>>>>
>>>> However, I still get OOM errors in the driver process with the default
>>>> 1GB
>>>> heap size. The following link is a screen shot of a heap dump report,
>>>> showing the SQLListener instance having a retained size of 600MB.
>>>>
>>>> https://cloud.githubusercontent.com/assets/5124612/10404379/20fbdcfc-6e87-11e5-9415-27e25193a25c.png
>>>>
>>>> Rather than just increasing the allotted heap size, does anyone have any
>>>> other ideas? Is it possible to disable the SQL tab specifically? I also
>>>> thought about serving the UI from disk rather than memory with
>>>> "spark.eventLog.enabled=true" and "spark.ui.enabled=false". Has anyone
>>>> tried
>>>> this before?
>>>>
>>>> Thanks,
>>>> Nick
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-UI-consuming-lots-of-memory-tp25033.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: Creating Custom Receiver for Spark Streaming

2015-10-12 Thread Shixiong Zhu
Each ReceiverInputDStream will create one Receiver. If you only use
one ReceiverInputDStream, there will be only one Receiver in the cluster.
But if you create multiple ReceiverInputDStreams, there will be multiple
Receivers.

Best Regards,
Shixiong Zhu
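
As an illustration, a sketch of running several receivers and unioning their
streams, assuming an existing StreamingContext named ssc; the receiver here is a
bare stub, only enough to show the shape:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyCustomReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart(): Unit = { /* start a thread that reads from the source and calls store(...) */ }
  def onStop(): Unit = ()
}

// One ReceiverInputDStream -> one Receiver; three streams -> three Receivers,
// potentially running on different worker nodes.
val streams = (1 to 3).map(_ => ssc.receiverStream(new MyCustomReceiver))
val unified = ssc.union(streams)
unified.print()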

2015-10-12 23:47 GMT+08:00 Something Something <mailinglist...@gmail.com>:

> Is it safe to assume that Spark will always create a single instance of
> Custom Receiver? Or would it create multiple instances on each node in a
> cluster? Wondering if I need to worry about receiving the same message on
> different nodes etc.
>
> Please help. Thanks.
>


Re: Data skipped while writing Spark Streaming output to HDFS

2015-10-12 Thread Shixiong Zhu
Could you print the content of the RDD to check whether there are multiple
values for a key in a batch?

Best Regards,
Shixiong Zhu

2015-10-12 18:25 GMT+08:00 Sathiskumar <sathish.palaniap...@gmail.com>:

> I'm running a Spark Streaming application for every 10 seconds, its job is
> to
> consume data from kafka, transform it and store it into HDFS based on the
> key. i.e, a file per unique key. I'm using the Hadoop's saveAsHadoopFile()
> API to store the output, I see that a file gets generated for every unique
> key, but the issue is that only one row gets stored for each of the unique
> key though the DStream has more rows for the same key.
>
> For example, consider the following DStream which has one unique key,
>
> *  key  value*
>  =   ===
>  Key_1   183.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
>  Key_1   184.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0
>  Key_1   181.33 70.0 2.12 1.0 1.0 1.0 11.0 4.0
>  Key_1   185.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0
>  Key_1   185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
>
> I see only one row (instead of 5 rows) gets stored in the HDFS file,
>
> 185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
>
> The following code is used to store the output into HDFS,
>
> dStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
> @Override
> public Void call(JavaPairRDD<String, String> pairRDD) throws Exception
> {
> long timestamp = System.currentTimeMillis();
> int randomInt = random.nextInt();
> pairRDD.saveAsHadoopFile("hdfs://localhost:9000/application-" +
> timestamp +"-"+ randomInt, String.class, String.class,
> RDDMultipleTextOutputFormat.class);
> }
> });
>
> where the implementation of RDDMultipleTextOutputFormat is as follows,
>
> public class RDDMultipleTextOutputFormat<K,V> extends
> MultipleTextOutputFormat<K,V> {
>
> public K generateActualKey(K key, V value) {
> return null;
> }
>
> public String generateFileNameForKeyValue(K key, V value, String name)
> {
> return key.toString();
> }
> }
>
> Please let me know if I'm missing anything? Thanks for your help.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Data-skipped-while-writing-Spark-Streaming-output-to-HDFS-tp25026.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: calling persist would cause java.util.NoSuchElementException: key not found:

2015-10-02 Thread Shixiong Zhu
Do you have the full stack trace? Could you check if it's the same as
https://issues.apache.org/jira/browse/SPARK-10422

Best Regards,
Shixiong Zhu

2015-10-01 17:05 GMT+08:00 Eyad Sibai <eyad.alsi...@gmail.com>:

> Hi
>
> I am trying to call .persist() on a dataframe but once I execute the next
> line I am getting
> java.util.NoSuchElementException: key not found: ….
>
> I tried to do persist on disk also the same thing.
>
> I am using:
> pyspark with python3
> spark 1.5
>
>
> Thanks!
>
>
> EYAD SIBAI
> Risk Engineer
>
> *iZettle ®*
> ––
>
> Mobile: +46 72 911 60 54 <+46%2072%20911%2060%2054>
> Web: www.izettle.com <http://izettle.com/>
>


Re: What is the best way to submit multiple tasks?

2015-10-01 Thread Shixiong Zhu
Right, you can use SparkContext and SQLContext in multiple threads. They
are thread safe.

Best Regards,
Shixiong Zhu
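
For example, a rough sketch of kicking off one Spark job per column from
separate threads against the shared context (df is a hypothetical DataFrame and
the per-column work here is just a distinct count):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val perColumn: Seq[Future[(String, Long)]] = df.columns.toSeq.map { col =>
  Future {
    // Each Future submits its own Spark job on the shared SparkContext/SQLContext.
    (col, df.select(col).distinct().count())
  }
}
val results = Await.result(Future.sequence(perColumn), 1.hour)
results.foreach { case (col, n) => println(s"$col: $n distinct values") }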

2015-10-01 4:57 GMT+08:00 <saif.a.ell...@wellsfargo.com>:

> Hi all,
>
> I have a process where I do some calculations on each one of the columns
> of a dataframe.
> Intrinsecally, I run across each column with a for loop. On the other
> hand, each process itself is non-entirely-distributable.
>
> To speed up the process, I would like to submit a spark program for each
> column, any suggestions? I was thinking on primitive threads sharing a
> spark context.
>
> Thank you,
> Saif
>
>


Re: Spark Streaming Standalone 1.5 - Stage cancelled because SparkContext was shut down

2015-10-01 Thread Shixiong Zhu
Do you have the log? It looks like some exception in your code caused the
SparkContext to stop.

Best Regards,
Shixiong Zhu

2015-09-30 17:30 GMT+08:00 tranan <tra...@gmail.com>:

> Hello All,
>
> I have several Spark Streaming applications running on Standalone mode in
> Spark 1.5.  Spark is currently set up for dynamic resource allocation.  The
> issue I am seeing is that I can have about 12 Spark Streaming Jobs running
> concurrently.  Occasionally I would see more than half where to fail due to
> Stage cancelled because SparkContext was shut down.  It would automatically
> restart as it runs on supervised mode.  Attached is the screenshot of one
> of
> the jobs that failed.  Anyone have any insight as to what is going on?
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24885/Screen_Shot_2015-09-29_at_8.png
> >
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Standalone-1-5-Stage-cancelled-because-SparkContext-was-shut-down-tp24885.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Worker node timeout exception

2015-10-01 Thread Shixiong Zhu
Do you have the log file? It may be because of wrong settings.

Best Regards,
Shixiong Zhu

2015-10-01 7:32 GMT+08:00 markluk <m...@juicero.com>:

> I setup a new Spark cluster. My worker node is dying with the following
> exception.
>
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [120 seconds]
> at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
> at
>
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:107)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:241)
> ... 11 more
>
>
> Any ideas what's wrong? This is happening both for a spark program and
> spark
> shell.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Worker-node-timeout-exception-tp24893.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming Log4j Inside Eclipse

2015-09-29 Thread Shixiong Zhu
I mean JavaSparkContext.setLogLevel. You can use it like this:

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.seconds(2));
jssc.sparkContext().setLogLevel(...);



Best Regards,
Shixiong Zhu

2015-09-29 22:07 GMT+08:00 Ashish Soni <asoni.le...@gmail.com>:

> I am using Java Streaming context and it doesnt have method setLogLevel
> and also i have tried by passing VM argument in eclipse and it doesnt work
>
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.seconds(2));
>
> Ashish
>
> On Tue, Sep 29, 2015 at 7:23 AM, Adrian Tanase <atan...@adobe.com> wrote:
>
>> You should set exta java options for your app via Eclipse project and
>> specify something like
>>
>>  -Dlog4j.configuration=file:/tmp/log4j.properties
>>
>> Sent from my iPhone
>>
>> On 28 Sep 2015, at 18:52, Shixiong Zhu <zsxw...@gmail.com> wrote:
>>
>> You can use JavaSparkContext.setLogLevel to set the log level in your
>> codes.
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-09-28 22:55 GMT+08:00 Ashish Soni <asoni.le...@gmail.com>:
>>
>>> I am not running it using spark submit , i am running locally inside
>>> Eclipse IDE , how i set this using JAVA Code
>>>
>>> Ashish
>>>
>>> On Mon, Sep 28, 2015 at 10:42 AM, Adrian Tanase <atan...@adobe.com>
>>> wrote:
>>>
>>>> You also need to provide it as parameter to spark submit
>>>>
>>>> http://stackoverflow.com/questions/28840438/how-to-override-sparks-log4j-properties-per-driver
>>>>
>>>> From: Ashish Soni
>>>> Date: Monday, September 28, 2015 at 5:18 PM
>>>> To: user
>>>> Subject: Spark Streaming Log4j Inside Eclipse
>>>>
>>>> I need to turn off the verbose logging of Spark Streaming Code when i
>>>> am running inside eclipse i tried creating a log4j.properties file and
>>>> placed inside /src/main/resources but i do not see it getting any effect ,
>>>> Please help as not sure what else needs to be done to change the log at
>>>> DEBUG or WARN
>>>>
>>>
>>>
>>
>


Re: Spark Streaming Log4j Inside Eclipse

2015-09-28 Thread Shixiong Zhu
You can use JavaSparkContext.setLogLevel to set the log level in your code.

Best Regards,
Shixiong Zhu

2015-09-28 22:55 GMT+08:00 Ashish Soni <asoni.le...@gmail.com>:

> I am not running it using spark submit , i am running locally inside
> Eclipse IDE , how i set this using JAVA Code
>
> Ashish
>
> On Mon, Sep 28, 2015 at 10:42 AM, Adrian Tanase <atan...@adobe.com> wrote:
>
>> You also need to provide it as parameter to spark submit
>>
>> http://stackoverflow.com/questions/28840438/how-to-override-sparks-log4j-properties-per-driver
>>
>> From: Ashish Soni
>> Date: Monday, September 28, 2015 at 5:18 PM
>> To: user
>> Subject: Spark Streaming Log4j Inside Eclipse
>>
>> I need to turn off the verbose logging of Spark Streaming Code when i am
>> running inside eclipse i tried creating a log4j.properties file and placed
>> inside /src/main/resources but i do not see it getting any effect , Please
>> help as not sure what else needs to be done to change the log at DEBUG or
>> WARN
>>
>
>


Re: Monitoring tools for spark streaming

2015-09-28 Thread Shixiong Zhu
Which version are you using? Could you take a look at the new Streaming UI
in 1.4.0?
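
For reference, here is a rough, untested sketch of the StreamingListener
approach mentioned below, logging each batch's scheduling and processing delay
(the 30-second alert threshold is only a placeholder):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BatchDelayListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val schedulingDelay = info.schedulingDelay.getOrElse(0L)
    val processingDelay = info.processingDelay.getOrElse(0L)
    // Log the per-batch delays; wire this into your own alerting if needed.
    println(s"Batch ${info.batchTime}: scheduling delay $schedulingDelay ms, " +
      s"processing delay $processingDelay ms")
    if (schedulingDelay + processingDelay > 30000L) {
      println(s"ALERT: batch ${info.batchTime} is falling behind")
    }
  }
}

// Register it on an existing StreamingContext:
// ssc.addStreamingListener(new BatchDelayListener)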

Best Regards,
Shixiong Zhu

2015-09-29 7:52 GMT+08:00 Siva <sbhavan...@gmail.com>:

> Hi,
>
> Could someone recommend the monitoring tools for spark streaming?
>
> By extending StreamingListener we can dump the delay in processing of
> batches and some alert messages.
>
> But are there any Web UI tools where we can monitor failures, see delays
> in processing, error messages and setup alerts etc.
>
> Thanks
>
>


Re: Spark streaming job filling a lot of data in local spark nodes

2015-09-28 Thread Shixiong Zhu
These files are created by shuffle and are just temp files. They are not
needed for checkpointing and are only stored in your local temp directory,
which is "/tmp" by default. You can use `spark.local.dir` to change the path
if you find that "/tmp" doesn't have enough space.
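
A rough sketch of pointing the shuffle temp files at a larger local disk (the
directory is just a placeholder); it must be set before the StreamingContext
is created, and on some cluster managers the worker's SPARK_LOCAL_DIRS / YARN
local dirs take precedence:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-job")
  .set("spark.local.dir", "/data/spark-tmp")  // assumed directory with enough free space
val ssc = new StreamingContext(conf, Seconds(30))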

Best Regards,
Shixiong Zhu

2015-09-29 1:04 GMT+08:00 swetha <swethakasire...@gmail.com>:

>
> Hi,
>
> I see a lot of data getting filled locally as shown below from my streaming
> job. I have my checkpoint set to hdfs. But, I still see the following data
> filling my local nodes. Any idea if I can make this stored in hdfs instead
> of storing the data locally?
>
> -rw-r--r--  1520 Sep 17 18:43 shuffle_23119_5_0.index
> -rw-r--r--  1 180564255 Sep 17 18:43 shuffle_23129_2_0.data
> -rw-r--r--  1 364850277 Sep 17 18:45 shuffle_23145_8_0.data
> -rw-r--r--  1  267583750 Sep 17 18:46 shuffle_23105_4_0.data
> -rw-r--r--  1  136178819 Sep 17 18:48 shuffle_23123_8_0.data
> -rw-r--r--  1  159931184 Sep 17 18:48 shuffle_23167_8_0.data
> -rw-r--r--  1520 Sep 17 18:49 shuffle_23315_7_0.index
> -rw-r--r--  1520 Sep 17 18:50 shuffle_23319_3_0.index
> -rw-r--r--  1   92240350 Sep 17 18:51 shuffle_23305_2_0.data
> -rw-r--r--  1   40380158 Sep 17 18:51 shuffle_23323_6_0.data
> -rw-r--r--  1  369653284 Sep 17 18:52 shuffle_23103_6_0.data
> -rw-r--r--  1  371932812 Sep 17 18:52 shuffle_23125_6_0.data
> -rw-r--r--  1   19857974 Sep 17 18:53 shuffle_23291_19_0.data
> -rw-r--r--  1  55342005 Sep 17 18:53 shuffle_23305_8_0.data
> -rw-r--r--  1   92920590 Sep 17 18:53 shuffle_23303_4_0.data
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-job-filling-a-lot-of-data-in-local-spark-nodes-tp24846.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark.streaming.concurrentJobs

2015-09-28 Thread Shixiong Zhu
"writing to HBase for each partition in the RDDs from a given DStream is an
independent output operation"

This is not correct. "Writing to HBase for each partition in the RDDs from
a given DStream" is just a task, and tasks already run in parallel.

The output operation is the DStream action, such as count, saveXXX, take.

For example, if "spark.streaming.concurrentJobs" is 1, and you call
DStream.count() twice. There will be two "count" Spark jobs and they will
run one by one. But if you set "spark.streaming.concurrentJobs" to 2, these
two "count" Spark jobs will run in parallel.

Moreover, "spark.streaming.concurrentJobs" is an internal configuration and
it may be changed in future.
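
For example, a rough sketch (host, port and output path are placeholders) with
two output operations per batch; with "spark.streaming.concurrentJobs" set to
2, the two jobs of a batch may run in parallel:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("concurrent-jobs-example")
  .set("spark.streaming.concurrentJobs", "2")  // internal setting, may change in future
val ssc = new StreamingContext(conf, Seconds(10))

val lines = ssc.socketTextStream("localhost", 9999)
lines.count().print()                       // output operation #1 -> one job per batch
lines.saveAsTextFiles("hdfs:///tmp/lines")  // output operation #2 -> another job per batch

ssc.start()
ssc.awaitTermination()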


Best Regards,
Shixiong Zhu

2015-09-26 3:34 GMT+08:00 Atul Kulkarni <atulskulka...@gmail.com>:

> Can someone please help either by explaining or pointing to documentation
> the relationship between #executors needed and How to let the concurrent
> jobs that are created by the above parameter run in parallel?
>
> On Thu, Sep 24, 2015 at 11:56 PM, Atul Kulkarni <atulskulka...@gmail.com>
> wrote:
>
>> Hi Folks,
>>
>> I am trying to speed up my spark streaming job, I found a presentation by
>> Tathagata Das that mentions to increase value of
>> "spark.streaming.concurrentJobs" if I have more than one output.
>>
>> In my spark streaming job I am reading from Kafka using Receiver Bases
>> approach and transforming each line of data from Kafka and storing to
>> HBase. I do not intend to do any kind of collation at this stage. I believe
>> this can be parallelized by creating a separate job to write a different
>> set of lines from Kafka to HBase and hence, I set the above parameter to a
>> value > 1. Is my above assumption that writing to HBase for each partition
>> in the RDDs from a given DStream is an independent output operation and can
>> be parallelized?
>>
>> If the assumption is correct, and I run the job - this job creates
>> multiple (smaller) jobs but they are executed one after another, not in
>> parallel - I am curious if there is a requirement that #Executors be >= a
>> particular number (a calculation based on how many repartitions after unio
>> od DSreams etc. - I don't know I am grasping at Straws here.)
>>
>> I would appreciate some help in this regard. Thanks in advance.
>>
>> --
>> Regards,
>> Atul Kulkarni
>>
>
>
>
> --
> Regards,
> Atul Kulkarni
>


Re: Join two dataframe - Timeout after 5 minutes

2015-09-24 Thread Shixiong Zhu
You can change "spark.sql.broadcastTimeout" to increase the timeout. The
default value is 300 seconds.
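
For example, a rough sketch in Scala of raising it to 20 minutes; the same
property can also be passed to spark-submit as
--conf spark.sql.broadcastTimeout=1200 (value in seconds), which should work
for PySpark as well:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("broadcast-timeout-example"))
val sqlContext = new SQLContext(sc)
// Give broadcast joins up to 1200 seconds before timing out.
sqlContext.setConf("spark.sql.broadcastTimeout", "1200")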

Best Regards,
Shixiong Zhu

2015-09-24 15:16 GMT+08:00 Eyad Sibai <eyad.alsi...@gmail.com>:

> I am trying to join two tables using dataframes using python 3.4 and I am
> getting the following error
>
>
> I ran it on my localhost machine with 2 workers, spark 1.5
>
>
> I always get timeout if the job takes more than 5 minutes.
>
>
>
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
>
>  at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>
>  at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>
>  at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142)
>
>  at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
>
>  at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
>
>  ... 33 more
>
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [300 seconds]
>
>  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>
>  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
>  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>
>  at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>
>  at scala.concurrent.Await$.result(package.scala:107)
>
>  at
> org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>
>  at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>
>  at
> org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>
>  at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>
>  at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:119)
>
>  at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:69)
>
>  at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
>
>  ... 41 more
>
>
> 2015-09-23 15:44:09,536 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/static/sql,null}
>
> 2015-09-23 15:44:09,537 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL1/execution/json,null}
>
> 2015-09-23 15:44:09,538 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL1/execution,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL1/json,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL1,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/static/sql,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL/execution,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL/json,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/metrics/json,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/api,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHan

Re: Hbase Spark streaming issue.

2015-09-24 Thread Shixiong Zhu
Looks like you have an incompatible hbase-default.xml somewhere on the
classpath. You can use the following code to find the location of
"hbase-default.xml":

println(Thread.currentThread().getContextClassLoader().getResource("hbase-default.xml"))

Best Regards,
Shixiong Zhu

2015-09-21 15:46 GMT+08:00 Siva <sbhavan...@gmail.com>:

> Hi,
>
> I m seeing some strange error while inserting data from spark streaming to
> hbase.
>
> I can able to write the data from spark (without streaming) to hbase
> successfully, but when i use the same code to write dstream I m seeing the
> below error.
>
> I tried setting the below parameters, still didnt help. Did any face the
> similar issue?
>
> conf.set("hbase.defaults.for.version.skip", "true")
> conf.set("hbase.defaults.for.version", "0.98.4.2.2.4.2-2-hadoop2")
>
> 15/09/20 22:39:10 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
> 16)
> java.lang.RuntimeException: hbase-default.xml file seems to be for and old
> version of HBase (null), this version is 0.98.4.2.2.4.2-2-hadoop2
> at
> org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:73)
> at
> org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:105)
> at
> org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:116)
> at
> org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:125)
> at
> $line51.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$HBaseConn$.hbaseConnection(:49)
> at
> $line52.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$TestHbaseSpark$$anonfun$run$1$$anonfun$apply$1.apply(:73)
> at
> $line52.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$TestHbaseSpark$$anonfun$run$1$$anonfun$apply$1.apply(:73)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 15/09/20 22:39:10 WARN TaskSetManager: Lost task 0.0 in stage 14.0 (TID
> 16, localhost): java.lang.RuntimeException: hbase-default.xml file seems to
> be for and old version of HBase (null), this version is
> 0.98.4.2.2.4.2-2-hadoop2
>
>
> Thanks,
> Siva.
>


Fwd: Spark streaming DStream state on worker

2015-09-24 Thread Shixiong Zhu
+user, -dev

It's not clear which `compute` you mean in your question. There are two
`compute` methods here.

1. DStream.compute: it always runs in the driver, and all RDDs are created
in the driver. E.g.,

DStream.foreachRDD(rdd => rdd.count())

"rdd.count()" is called in the driver.

2. RDD.compute: this will run in the executor and the location is not
guaranteed. E.g.,

DStream.foreachRDD(rdd => rdd.foreach { v =>
println(v)
})

"println(v)" is called in the executor.


Best Regards,
Shixiong Zhu

2015-09-17 3:47 GMT+08:00 Renyi Xiong <renyixio...@gmail.com>:

> Hi,
>
> I want to do temporal join operation on DStream across RDDs, my question
> is: Are RDDs from same DStream always computed on same worker (except
> failover) ?
>
> thanks,
> Renyi.
>


Re: JobScheduler: Error generating jobs for time for custom InputDStream

2015-09-24 Thread Shixiong Zhu
Looks like you return a "Some(null)" in "compute". If you don't want to
create an RDD, it should return None. If you want to return an empty RDD, it
should return "Some(sc.emptyRDD)".

Best Regards,
Shixiong Zhu

2015-09-15 2:51 GMT+08:00 Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com>:

> Hi,
>
> I sent this message to the user list a few weeks ago with no luck, so I'm
> forwarding it to the dev list in case someone could give a hand with this.
> Thanks a lot in advance
>
>
> I've developed a ScalaCheck property for testing Spark Streaming
> transformations. To do that I had to develop a custom InputDStream, which
> is very similar to QueueInputDStream but has a method for adding new test
> cases for dstreams, which are objects of type Seq[Seq[A]], to the DStream.
> You can see the code at
> https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/main/scala/org/apache/spark/streaming/dstream/DynSeqQueueInputDStream.scala.
> I have developed a few properties that run in local mode
> https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/test/scala/es/ucm/fdi/sscheck/spark/streaming/ScalaCheckStreamingTest.scala.
> The problem is that when the batch interval is too small, and the machine
> cannot complete the batches fast enough, I get the following exceptions in
> the Spark log
>
> 15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time
> 1440580922500 ms
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
> at
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
> org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStr

Re: SparkContext initialization error- java.io.IOException: No space left on device

2015-09-06 Thread Shixiong Zhu
The folder is in "/tmp" by default. Could you use "df -h" to check the free
space of /tmp?

Best Regards,
Shixiong Zhu

2015-09-05 9:50 GMT+08:00 shenyan zhen <shenya...@gmail.com>:

> Has anyone seen this error? Not sure which dir the program was trying to
> write to.
>
> I am running Spark 1.4.1, submitting Spark job to Yarn, in yarn-client
> mode.
>
> 15/09/04 21:36:06 ERROR SparkContext: Error adding jar
> (java.io.IOException: No space left on device), was the --addJars option
> used?
>
> 15/09/04 21:36:08 ERROR SparkContext: Error initializing SparkContext.
>
> java.io.IOException: No space left on device
>
> at java.io.FileOutputStream.writeBytes(Native Method)
>
> at java.io.FileOutputStream.write(FileOutputStream.java:300)
>
> at
> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:178)
>
> at java.util.zip.ZipOutputStream.closeEntry(ZipOutputStream.java:213)
>
> at java.util.zip.ZipOutputStream.finish(ZipOutputStream.java:318)
>
> at java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:163)
>
> at java.util.zip.ZipOutputStream.close(ZipOutputStream.java:338)
>
> at org.apache.spark.deploy.yarn.Client.createConfArchive(Client.scala:432)
>
> at
> org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:338)
>
> at
> org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:561)
>
> at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:115)
>
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:57)
>
> at
> org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
>
> at org.apache.spark.SparkContext.(SparkContext.scala:497)
>
> Thanks,
> Shenyan
>


Re: ClassCastException in driver program

2015-09-06 Thread Shixiong Zhu
Looks like there are some circular references in SQL making the immutable
List serialization fail in 2.11.

In 2.11, Scala immutable List uses writeReplace()/readResolve() which don't
play nicely with circular references. Here is an example to reproduce this
issue in 2.11.6:

  class Foo extends Serializable {
var l: Seq[Any] = null
  }

  import java.io._

  val o = new ByteArrayOutputStream()
  val o1 = new ObjectOutputStream(o)
  val m = new Foo
  val n = List(1, m)
  m.l = n
  o1.writeObject(n)
  o1.close()
  val i = new ByteArrayInputStream(o.toByteArray)
  val i1 = new ObjectInputStream(i)
  i1.readObject()

Could you provide the "explain" output? It would be helpful to find the
circular references.



Best Regards,
Shixiong Zhu

2015-09-05 0:26 GMT+08:00 Jeff Jones <jjo...@adaptivebiotech.com>:

> We are using Scala 2.11 for a driver program that is running Spark SQL
> queries in a standalone cluster. I’ve rebuilt Spark for Scala 2.11 using
> the instructions at
> http://spark.apache.org/docs/latest/building-spark.html.  I’ve had to
> work through a few dependency conflict but all-in-all it seems to work for
> some simple Spark examples. I integrated the Spark SQL code into my
> application and I’m able to run using a local client, but when I switch
> over to the standalone cluster I get the following error.  Any help
> tracking this down would be appreciated.
>
> This exception occurs during a DataFrame.collect() call. I’ve tried to use
> –Dsun.io.serialization.extendedDebugInfo=true to get more information but
> it didn’t provide anything more.
>
> [error] o.a.s.s.TaskSetManager - Task 0 in stage 1.0 failed 4 times;
> aborting job
>
> [error] c.a.i.c.Analyzer - Job aborted due to stage failure: Task 0 in
> stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0
> (TID 4, 10.248.0.242): java.lang.ClassCastException: cannot assign instance
> of scala.collection.immutable.List$SerializationProxy to field
> org.apache.spark.sql.execution.Project.projectList of type
> scala.collection.Seq in instance of org.apache.spark.sql.execution.Project
>
> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(Unknown
> Source)
>
> at java.io.ObjectStreamClass.setObjFieldValues(Unknown Source)
>
> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>
> at java.io.ObjectInputStream.readSerialData(Unknown Source)
>
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>
> at java.io.ObjectInputStream.readObject0(Unknown Source)
>
> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>
> at java.io.ObjectInputStream.readSerialData(Unknown Source)
>
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>
> at java.io.ObjectInputStream.readObject0(Unknown Source)
>
> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>
> at java.io.ObjectInputStream.readSerialData(Unknown Source)
>
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>
> at java.io.ObjectInputStream.readObject0(Unknown Source)
>
> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>
> at java.io.ObjectInputStream.readSerialData(Unknown Source)
>
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>
> at java.io.ObjectInputStream.readObject0(Unknown Source)
>
> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>
> at java.io.ObjectInputStream.readSerialData(Unknown Source)
>
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>
> at java.io.ObjectInputStream.readObject0(Unknown Source)
>
> at java.io.ObjectInputStream.readObject(Unknown Source)
>
> at
> scala.collection.immutable.List$SerializationProxy.readObject(List.scala:477)
>
> at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>
> at java.lang.reflect.Method.invoke(Unknown Source)
>
> at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
>
> at java.io.ObjectInputStream.readSerialData(Unknown Source)
>
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>
> at java.io.ObjectInputStream.readObject0(Unknown Source)
>
> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>
> at java.io.ObjectInputStream.readSerialData(Unknown Source)
>
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>
> at java.io.ObjectInputStream.readObject0(Unknown Source)
>
> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>
> at java.io.ObjectInputStream.readSerialData(Unknown Source)
>
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>
> at java.io.ObjectInputStream.readObject0(Unknown Source)
>

Re: DataFrame#show cost 2 Spark Jobs ?

2015-08-25 Thread Shixiong Zhu
That's two jobs. `SparkPlan.executeTake` will call `runJob` twice in this
case.
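
As an aside, a rough sketch of supplying the schema up front (the fields are
taken from the people.json example discussed below: age bigint, name string),
so that the read does not need a separate schema-inference job:

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val peopleSchema = StructType(Seq(
  StructField("age", LongType, nullable = true),
  StructField("name", StringType, nullable = true)))

// Assumes the pre-built sqlContext from spark-shell and the example data file.
val df = sqlContext.read
  .schema(peopleSchema)
  .json("examples/src/main/resources/people.json")
df.show()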

Best Regards,
Shixiong Zhu

2015-08-25 14:01 GMT+08:00 Cheng, Hao hao.ch...@intel.com:

 O, Sorry, I miss reading your reply!



 I know the minimum tasks will be 2 for scanning, but Jeff is talking about
 2 jobs, not 2 tasks.



 *From:* Shixiong Zhu [mailto:zsxw...@gmail.com]
 *Sent:* Tuesday, August 25, 2015 1:29 PM
 *To:* Cheng, Hao
 *Cc:* Jeff Zhang; user@spark.apache.org

 *Subject:* Re: DataFrame#show cost 2 Spark Jobs ?



 Hao,



 I can reproduce it using the master branch. I'm curious why you cannot
 reproduce it. Did you check if the input HadoopRDD did have two partitions?
 My test code is



 val df = sqlContext.read.json(examples/src/main/resources/people.json)

 df.show()




 Best Regards,

 Shixiong Zhu



 2015-08-25 13:01 GMT+08:00 Cheng, Hao hao.ch...@intel.com:

 Hi Jeff, which version are you using? I couldn’t reproduce the 2 spark
 jobs in the `df.show()` with latest code, we did refactor the code for json
 data source recently, not sure you’re running an earlier version of it.



 And a known issue is Spark SQL will try to re-list the files every time
 when loading the data for JSON, it’s probably causes longer time for ramp
 up with large number of files/partitions.



 *From:* Jeff Zhang [mailto:zjf...@gmail.com]
 *Sent:* Tuesday, August 25, 2015 8:11 AM
 *To:* Cheng, Hao
 *Cc:* user@spark.apache.org
 *Subject:* Re: DataFrame#show cost 2 Spark Jobs ?



 Hi Cheng,



 I know that sqlContext.read will trigger one spark job to infer the
 schema. What I mean is DataFrame#show cost 2 spark jobs. So overall it
 would cost 3 jobs.



 Here's the command I use:



  val df = sqlContext.read.json(
 file:///Users/hadoop/github/spark/examples/src/main/resources/people.json)
// trigger one spark job to infer schema

  df.show()// trigger 2 spark jobs which is weird









 On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.com wrote:

 The first job is to infer the json schema, and the second one is what you
 mean of the query.

 You can provide the schema while loading the json file, like below:



 sqlContext.read.schema(xxx).json(“…”)?



 Hao

 *From:* Jeff Zhang [mailto:zjf...@gmail.com]
 *Sent:* Monday, August 24, 2015 6:20 PM
 *To:* user@spark.apache.org
 *Subject:* DataFrame#show cost 2 Spark Jobs ?



 It's weird to me that the simple show function will cost 2 spark jobs.
 DataFrame#explain shows it is a very simple operation, not sure why need 2
 jobs.



 == Parsed Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Analyzed Logical Plan ==

 age: bigint, name: string

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Optimized Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Physical Plan ==

 Scan
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]







 --

 Best Regards

 Jeff Zhang





 --

 Best Regards

 Jeff Zhang





Re: DataFrame#show cost 2 Spark Jobs ?

2015-08-24 Thread Shixiong Zhu
Hao,

I can reproduce it using the master branch. I'm curious why you cannot
reproduce it. Did you check if the input HadoopRDD did have two partitions?
My test code is

val df = sqlContext.read.json("examples/src/main/resources/people.json")
df.show()


Best Regards,
Shixiong Zhu

2015-08-25 13:01 GMT+08:00 Cheng, Hao hao.ch...@intel.com:

 Hi Jeff, which version are you using? I couldn’t reproduce the 2 spark
 jobs in the `df.show()` with latest code, we did refactor the code for json
 data source recently, not sure you’re running an earlier version of it.



 And a known issue is Spark SQL will try to re-list the files every time
 when loading the data for JSON, it’s probably causes longer time for ramp
 up with large number of files/partitions.



 *From:* Jeff Zhang [mailto:zjf...@gmail.com]
 *Sent:* Tuesday, August 25, 2015 8:11 AM
 *To:* Cheng, Hao
 *Cc:* user@spark.apache.org
 *Subject:* Re: DataFrame#show cost 2 Spark Jobs ?



 Hi Cheng,



 I know that sqlContext.read will trigger one spark job to infer the
 schema. What I mean is DataFrame#show cost 2 spark jobs. So overall it
 would cost 3 jobs.



 Here's the command I use:



  val df = sqlContext.read.json(
 file:///Users/hadoop/github/spark/examples/src/main/resources/people.json)
// trigger one spark job to infer schema

  df.show()// trigger 2 spark jobs which is weird









 On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.com wrote:

 The first job is to infer the json schema, and the second one is what you
 mean of the query.

 You can provide the schema while loading the json file, like below:



 sqlContext.read.schema(xxx).json(“…”)?



 Hao

 *From:* Jeff Zhang [mailto:zjf...@gmail.com]
 *Sent:* Monday, August 24, 2015 6:20 PM
 *To:* user@spark.apache.org
 *Subject:* DataFrame#show cost 2 Spark Jobs ?



 It's weird to me that the simple show function will cost 2 spark jobs.
 DataFrame#explain shows it is a very simple operation, not sure why need 2
 jobs.



 == Parsed Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Analyzed Logical Plan ==

 age: bigint, name: string

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Optimized Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Physical Plan ==

 Scan
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]







 --

 Best Regards

 Jeff Zhang





 --

 Best Regards

 Jeff Zhang



Re: DataFrame#show cost 2 Spark Jobs ?

2015-08-24 Thread Shixiong Zhu
Because defaultMinPartitions is 2 (See
https://github.com/apache/spark/blob/642c43c81c835139e3f35dfd6a215d668a474203/core/src/main/scala/org/apache/spark/SparkContext.scala#L2057
), your input people.json will be split into 2 partitions.

At first, `take` will start a job for only the first partition. However, the
limit is 21 and the first partition only has 2 records, so it will continue
with a new job for the second partition.

You can check implementation details in SparkPlan.executeTake:
https://github.com/apache/spark/blob/642c43c81c835139e3f35dfd6a215d668a474203/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L185

Best Regards,
Shixiong Zhu

2015-08-25 8:11 GMT+08:00 Jeff Zhang zjf...@gmail.com:

 Hi Cheng,

 I know that sqlContext.read will trigger one spark job to infer the
 schema. What I mean is DataFrame#show cost 2 spark jobs. So overall it
 would cost 3 jobs.

 Here's the command I use:

  val df =
 sqlContext.read.json(file:///Users/hadoop/github/spark/examples/src/main/resources/people.json)
// trigger one spark job to infer schema
  df.show()// trigger 2 spark jobs which is weird




 On Mon, Aug 24, 2015 at 10:56 PM, Cheng, Hao hao.ch...@intel.com wrote:

 The first job is to infer the json schema, and the second one is what you
 mean of the query.

 You can provide the schema while loading the json file, like below:



 sqlContext.read.schema(xxx).json(“…”)?



 Hao

 *From:* Jeff Zhang [mailto:zjf...@gmail.com]
 *Sent:* Monday, August 24, 2015 6:20 PM
 *To:* user@spark.apache.org
 *Subject:* DataFrame#show cost 2 Spark Jobs ?



 It's weird to me that the simple show function will cost 2 spark jobs.
 DataFrame#explain shows it is a very simple operation, not sure why need 2
 jobs.



 == Parsed Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Analyzed Logical Plan ==

 age: bigint, name: string

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Optimized Logical Plan ==

 Relation[age#0L,name#1]
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json]



 == Physical Plan ==

 Scan
 JSONRelation[file:/Users/hadoop/github/spark/examples/src/main/resources/people.json][age#0L,name#1]







 --

 Best Regards

 Jeff Zhang




 --
 Best Regards

 Jeff Zhang



Re: spark streaming map use external variable occur a problem

2015-08-14 Thread Shixiong Zhu
Looks like you compiled the code with one Scala version but ran your app
with a different, incompatible version.

BTW, you should not use PrintWriter like this to save your results. There
may be multiple tasks running on the same host, and your job will fail
because they all try to write to the same file. Could you convert your
data to Strings using map and use saveAsTextFile or other save methods?
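
For example, a rough sketch (host, port and output path are placeholders) of
letting Spark write the output instead of a shared PrintWriter:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("save-dstream-example")
val ssc = new StreamingContext(conf, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999)

lines
  .map(record => record.toUpperCase)             // any per-record transformation
  .saveAsTextFiles("hdfs:///tmp/stream-output")  // one output directory per batch

ssc.start()
ssc.awaitTermination()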

Best Regards,
Shixiong Zhu

2015-08-14 11:02 GMT+08:00 kale 805654...@qq.com:




 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark is much slower than direct access MySQL

2015-07-26 Thread Shixiong Zhu
Oh, I see. That's the total time of executing a query in Spark. Then the
difference is reasonable, considering Spark has much more work to do, e.g.,
launching tasks in executors.

Best Regards,
Shixiong Zhu

2015-07-26 16:16 GMT+08:00 Louis Hust louis.h...@gmail.com:

 Look at the given url:

 Code can be found at:


 https://github.com/louishust/sparkDemo/blob/master/src/main/java/DirectQueryTest.java

 2015-07-26 16:14 GMT+08:00 Shixiong Zhu zsxw...@gmail.com:

 Could you clarify how you measure the Spark time cost? Is it the total
 time of running the query? If so, it's possible because the overhead of
 Spark dominates for small queries.

 Best Regards,
 Shixiong Zhu

 2015-07-26 15:56 GMT+08:00 Jerrick Hoang jerrickho...@gmail.com:

 how big is the dataset? how complicated is the query?

 On Sun, Jul 26, 2015 at 12:47 AM Louis Hust louis.h...@gmail.com
 wrote:

 Hi, all,

 I am using spark DataFrame to fetch small table from MySQL,
 and i found it cost so much than directly access MySQL Using JDBC.

 Time cost for Spark is about 2033ms, and direct access at about 16ms.

 Code can be found at:


 https://github.com/louishust/sparkDemo/blob/master/src/main/java/DirectQueryTest.java

 So If my configuration for spark is wrong? How to optimise Spark to
 achieve the similar performance like direct access?

 Any idea will be appreciated!






Re: Spark is much slower than direct access MySQL

2015-07-26 Thread Shixiong Zhu
Could you clarify how you measure the Spark time cost? Is it the total time
of running the query? If so, it's possible because the overhead of
Spark dominates for small queries.

Best Regards,
Shixiong Zhu

2015-07-26 15:56 GMT+08:00 Jerrick Hoang jerrickho...@gmail.com:

 how big is the dataset? how complicated is the query?

 On Sun, Jul 26, 2015 at 12:47 AM Louis Hust louis.h...@gmail.com wrote:

 Hi, all,

 I am using spark DataFrame to fetch small table from MySQL,
 and i found it cost so much than directly access MySQL Using JDBC.

 Time cost for Spark is about 2033ms, and direct access at about 16ms.

 Code can be found at:


 https://github.com/louishust/sparkDemo/blob/master/src/main/java/DirectQueryTest.java

 So If my configuration for spark is wrong? How to optimise Spark to
 achieve the similar performance like direct access?

 Any idea will be appreciated!




Re: Some BlockManager Doubts

2015-07-09 Thread Shixiong Zhu
MEMORY_AND_DISK will use the disk if there is not enough memory. If there is
not enough memory when putting a MEMORY_AND_DISK block, BlockManager will
store it to disk. And if a MEMORY_AND_DISK block is dropped from memory,
MemoryStore will call BlockManager.dropFromMemory to store it to disk; see
MemoryStore.ensureFreeSpace for details.

Best Regards,
Shixiong Zhu

2015-07-09 19:17 GMT+08:00 Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com:

 Hi ,

 Just would like to clarify few doubts I have how BlockManager behaves .
 This is mostly in regards to Spark Streaming Context .

 There are two possible cases Blocks may get dropped / not stored in memory

 Case 1. While writing the Block for MEMORY_ONLY settings , if Node's
 BlockManager does not have enough memory to unroll the block , Block wont
 be stored to memory and Receiver will throw error while writing the Block..
 If StorageLevel is using Disk ( as in case MEMORY_AND_DISK) , blocks will
 be stored to Disk ONLY IF BlockManager not able to unroll to Memory... This
 is fine in the case while receiving the blocks , but this logic has a issue
 when old Blocks are chosen to be dropped from memory as Case 2

 Case 2 : Now let say either for MEMORY_ONLY or MEMORY_AND_DISK settings ,
 blocks are successfully stored to Memory in Case 1 . Now what would happen
 if memory limit goes beyond a certain threshold, BlockManager start
 dropping LRU blocks from memory which was successfully stored while
 receiving.

 Primary issue here what I see , while dropping the blocks in Case 2 ,
 Spark does not check if storage level is using Disk (MEMORY_AND_DISK ) ,
 and even with DISK storage levels  blocks is drooped from memory without
 writing it to Disk.
 Or I believe the issue is at the first place that blocks are NOT written
 to Disk simultaneously in Case 1 , I understand this will impact throughput
 , but it design may throw BlockNotFound error if Blocks are chosen to be
 dropped even in case of StorageLevel is using Disk.

 Any thoughts ?

 Regards,
 Dibyendu



Re: change default storage level

2015-07-09 Thread Shixiong Zhu
Spark won't store RDDs to memory unless you use a memory StorageLevel. By
default, your input and intermediate results won't be put into memory. You
can call persist if you want to avoid duplicate computation or reading.
E.g.,

val r1 = context.wholeTextFiles(...)
val r2 = r1.flatMap(s => ...)
val r3 = r2.filter(...)...
r3.saveAsTextFile(...)
val r4 = r2.map(...)...
r4.saveAsTextFile(...)

In the above example, r2 will be used twice. To speed up the computation,
you can call r2.persist(StorageLevel.MEMORY_ONLY) to store r2 in memory. Then
r4 will use the data of r2 in memory directly. E.g.,

val r1 = context.wholeTextFiles(...)
val r2 = r1.flatMap(s => ...)
r2.persist(StorageLevel.MEMORY_ONLY)
val r3 = r2.filter(...)...
r3.saveAsTextFile(...)
val r4 = r2.map(...)...
r4.saveAsTextFile(...)

See
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence


Best Regards,
Shixiong Zhu

2015-07-09 22:09 GMT+08:00 Michal Čizmazia mici...@gmail.com:

 Is there a way how to change the default storage level?

 If not, how can I properly change the storage level wherever necessary, if
 my input and intermediate results do not fit into memory?

 In this example:

 context.wholeTextFiles(...)
 .flatMap(s - ...)
 .flatMap(s - ...)

 Does persist() need to be called after every transformation?

  context.wholeTextFiles(...)
 .persist(StorageLevel.MEMORY_AND_DISK)
 .flatMap(s - ...)
 .persist(StorageLevel.MEMORY_AND_DISK)
 .flatMap(s - ...)
 .persist(StorageLevel.MEMORY_AND_DISK)

  Thanks!




Re: (de)serialize DStream

2015-07-08 Thread Shixiong Zhu
DStream must be Serializable because of metadata checkpointing. But you can
use KryoSerializer for data checkpointing: data checkpointing goes through
RDD.checkpoint, and its serializer can be set via spark.serializer.
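
A rough sketch (the checkpoint directory is a placeholder) of enabling Kryo
for the checkpointed RDD data, while the DStream graph metadata still goes
through Java serialization:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kryo-checkpoint-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/checkpoints")  // required for updateStateByKey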

Best Regards,
Shixiong Zhu

2015-07-08 3:43 GMT+08:00 Chen Song chen.song...@gmail.com:

 In Spark Streaming, when using updateStateByKey, it requires the generated
 DStream to be checkpointed.

 It seems that it always use JavaSerializer, no matter what I set for
 spark.serializer. Can I use KryoSerializer for checkpointing? If not, I
 assume the key and value types have to be Serializable?

 Chen



Re: Application jar file not found exception when submitting application

2015-07-06 Thread Shixiong Zhu
Before running your script, could you confirm that 
/data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar
exists? You might have forgotten to build this jar.

Best Regards,
Shixiong Zhu

2015-07-06 18:14 GMT+08:00 bit1...@163.com bit1...@163.com:

 Hi,
 I have following shell script that will submit the application to the
 cluster. But whenever I start the application, I encounter
 FileNotFoundException, after retrying for serveral times, I can
 successfully submit it!


 SPARK=/data/software/spark-1.3.1-bin-2.4.0
 APP_HOME=/data/software/spark-1.3.1-bin-2.4.0/applications
 $SPARK/bin/spark-submit --deploy-mode cluster --name
 PssAmStreamingApplication --master spark://com-app1:7077 --driver-memory 1G
 --executor-memory 4G --total-executor-cores 10 --class
 com.app.PssAmStreamingApplicationDriver
 $APP_HOME/pss.am.core-1.0-SNAPSHOT-shaded.jar




 [root@com-app2 applications]# ./submitApplicationStreaming.sh
 Running Spark using the REST application submission protocol.
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/07/06 18:05:35 INFO StandaloneRestClient: Submitting a request to
 launch an application in spark://com-app1:7077.
 Warning: Master endpoint spark://com-app1:7077 was not a REST server.
 Falling back to legacy submission gateway instead.
 15/07/06 18:05:36 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 Sending launch command to spark://com-app1:7077
 Driver successfully submitted as driver-20150706180538-0008
 ... waiting before polling master for driver state
 ... polling master for driver state
 State of driver-20150706180538-0008 is ERROR
 Exception from cluster was: java.io.FileNotFoundException: File
 file:/data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar
 does not exist
 java.io.FileNotFoundException: File
 file:/data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar
 does not exist
 at
 org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:511)

 at
 org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)

 at
 org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)

 at
 org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)

 at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:464)
 at 
 org.apache.spark.deploy.worker.DriverRunner.org$apache$spark$deploy$worker$DriverRunner$$downloadUserJar(DriverRunner.scala:146)

 at
 org.apache.spark.deploy.worker.DriverRunner$$anon$1.run(DriverRunner.scala:72)

 --
 bit1...@163.com



Re: How to shut down spark web UI?

2015-07-06 Thread Shixiong Zhu
You can set spark.ui.enabled to false to disable the Web UI.
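
For example, a rough sketch of doing it programmatically; the same property
can also be passed as --conf spark.ui.enabled=false to spark-submit:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("no-ui-example")
  .set("spark.ui.enabled", "false")  // do not start the driver's web UI
val sc = new SparkContext(conf)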

Best Regards,
Shixiong Zhu

2015-07-06 17:05 GMT+08:00 luohui20...@sina.com:

 Hello there,

I heard that there is some way to shutdown Spark WEB UI, is there a
 configuration to support this?

   Thank you.

 

 Thanksamp;Best regards!
 San.Luo



Re: learning rpc about spark core source code

2015-06-10 Thread Shixiong Zhu
The new RPC interface is an internal module added in 1.4. It should not
exist in 1.3. Where did you find it?

For the communication between the driver, workers, and master, Spark still
uses Akka. There is a pending PR to update them:
https://github.com/apache/spark/pull/5392 Do you mean the communication
between the driver and executors? Because this is ongoing work, there are no
blog posts yet, but you can find more details in this umbrella JIRA:
https://issues.apache.org/jira/browse/SPARK-5293



Best Regards,
Shixiong Zhu

2015-06-10 20:33 GMT+08:00 huangzheng 1106944...@qq.com:

 Hi all

Recently I have learned about  1.3 spark core source code ,  can’t
 understand rpc,  How to communicate between client driver, worker  and
 master?

   There are some scala files such as RpcCallContextRpcEndPointRef
 RpcEndpoint  RpcEnv. On spark core rpc module

   Have any blogs ?



 Thank you very much!



Re: StreamingListener, anyone?

2015-06-04 Thread Shixiong Zhu
You should not call `jssc.stop(true);` inside a StreamingListener. It will
cause a dead-lock: `jssc.stop` won't return until `listenerBus` exits, but
because `jssc.stop` is blocking the `StreamingListener` callback, `listenerBus`
cannot exit.
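
One way around it, as a rough, untested sketch in Scala: have the listener
only signal that the first batch is done, and call stop from the main thread
instead of from the listener bus:

import java.util.concurrent.CountDownLatch

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

val ssc = new StreamingContext(new SparkConf().setAppName("first-batch-only"), Seconds(5))
val firstBatchDone = new CountDownLatch(1)

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    firstBatchDone.countDown()  // only signal here; never call ssc.stop() in the listener
  }
})

// ... set up input DStreams and output operations here ...

ssc.start()
firstBatchDone.await()               // wait for the first completed batch
ssc.stop(stopSparkContext = true)    // safe: called outside the listener bus thread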

Best Regards,
Shixiong Zhu

2015-06-04 0:39 GMT+08:00 dgoldenberg dgoldenberg...@gmail.com:

 Hi,

 I've got a Spark Streaming driver job implemented and in it, I register a
 streaming listener, like so:

 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(params.getBatchDurationMillis()));
 jssc.addStreamingListener(new JobListener(jssc));

 where JobListener is defined like so
 private static class JobListener implements StreamingListener {

 private JavaStreamingContext jssc;

 JobListener(JavaStreamingContext jssc) {
 this.jssc = jssc;
 }

 @Override
 public void
 onBatchCompleted(StreamingListenerBatchCompleted
 batchCompleted) {
 System.out.println( Batch completed.);
 jssc.stop(true);
 System.out.println( The job has been stopped.);
 }
 

 I do not seem to be seeing onBatchCompleted being triggered.  Am I doing
 something wrong?

 In this particular case, I was trying to implement a bulk ingest type of
 logic where the first batch is all we're interested in (reading out of a
 Kafka topic with offset reset set to smallest).




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/StreamingListener-anyone-tp23140.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Re: spark 1.3.1 jars in repo1.maven.org

2015-06-02 Thread Shixiong Zhu
Ryan - I sent a PR to fix your issue:
https://github.com/apache/spark/pull/6599

Edward - I have no idea why the following error happened. ContextCleaner
doesn't use any Hadoop API. Could you try Spark 1.3.0? It's supposed to
support both hadoop 1 and hadoop 2.

* Exception in thread Spark Context Cleaner
java.lang.NoClassDefFoundError: 0
at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149)


Best Regards,
Shixiong Zhu

2015-06-03 0:08 GMT+08:00 Ryan Williams ryan.blake.willi...@gmail.com:

 I think this is causing issues upgrading ADAM
 https://github.com/bigdatagenomics/adam to Spark 1.3.1 (cf. adam#690
 https://github.com/bigdatagenomics/adam/pull/690#issuecomment-107769383);
 attempting to build against Hadoop 1.0.4 yields errors like:

 2015-06-02 15:57:44 ERROR Executor:96 - Exception in task 0.0 in stage 0.0
 (TID 0)
 *java.lang.IncompatibleClassChangeError: Found class
 org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected*
 at
 org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
 at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 2015-06-02 15:57:44 WARN  TaskSetManager:71 - Lost task 0.0 in stage 0.0
 (TID 0, localhost): java.lang.IncompatibleClassChangeError: Found class
 org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
 at
 org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
 at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 TaskAttemptContext is a class in Hadoop 1.0.4, but an interface in Hadoop
 2; Spark 1.3.1 expects the interface but is getting the class.

 It sounds like, while I *can* depend on Spark 1.3.1 and Hadoop 1.0.4, I
 then need to hope that I don't exercise certain Spark code paths that run
 afoul of differences between Hadoop 1 and 2; does that seem correct?

 Thanks!

 On Wed, May 20, 2015 at 1:52 PM Sean Owen so...@cloudera.com wrote:

 I don't think any of those problems are related to Hadoop. Have you
 looked at userClassPathFirst settings?

 On Wed, May 20, 2015 at 6:46 PM, Edward Sargisson ejsa...@gmail.com
 wrote:

 Hi Sean and Ted,
 Thanks for your replies.

 I don't have our current problems nicely written up as good questions
 yet. I'm still sorting out classpath issues, etc.
 In case it is of help, I'm seeing:
 * Exception in thread Spark Context Cleaner
 java.lang.NoClassDefFoundError: 0
 at
 org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149)
 * We've been having clashing dependencies between a colleague and I
 because of the aforementioned classpath issue
 * The clashing dependencies are also causing issues with what jetty
 libraries are available in the classloader from Spark and don't clash with
 existing libraries we have.

 More anon,

 Cheers,
 Edward



  Original Message 
  Subject: Re: spark 1.3.1 jars in repo1.maven.org Date: 2015-05-20 00:38
 From: Sean Owen so...@cloudera.com To: Edward Sargisson 
 esa...@pobox.com Cc: user user@spark.apache.org


 Yes, the published artifacts can only refer to one version of anything
 (OK, modulo publishing a large number of variants under classifiers).

 You aren't intended to rely on Spark's transitive dependencies for
 anything. Compiling against the Spark API has no relation to what
 version of Hadoop it binds against because it's not part of any API.
 You mark the Spark dependency even as provided in your build and get
 all the Spark/Hadoop bindings at runtime from our cluster.

 What problem are you experiencing?


 On Wed, May 20, 2015 at 2:17 AM, Edward Sargisson esa...@pobox.com
 wrote:

 Hi

Re: Spark 1.4.0-rc3: Actor not found

2015-06-02 Thread Shixiong Zhu
How about other jobs? Is it an executor log, or a driver log? Could you
post other logs near this error, please? Thank you.

Best Regards,
Shixiong Zhu

2015-06-02 17:11 GMT+08:00 Anders Arpteg arp...@spotify.com:

 Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that
 worked fine for Spark 1.3. The job starts on the cluster (yarn-cluster
 mode), initial stage starts, but the job fails before any task succeeds
 with the following error. Any hints?

 [ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0]
 [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler]
 swallowing exception during message send
 (akka.remote.RemoteTransportExceptionNoStackTrace)
 Exception in thread main akka.actor.ActorNotFound: Actor not found for:
 ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/),
 Path(/user/OutputCommitCoordinator)]
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
 at
 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 at
 scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 at
 akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 at
 akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 at
 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 at
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
 at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
 at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
 at
 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
 at
 akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)




Re: Spark Streaming graceful shutdown in Spark 1.4

2015-05-21 Thread Shixiong Zhu
My 2 cents:

As per javadoc:
https://docs.oracle.com/javase/7/docs/api/java/lang/Runtime.html#addShutdownHook(java.lang.Thread)

Shutdown hooks should also finish their work quickly. When a program
invokes exit the expectation is that the virtual machine will promptly shut
down and exit. When the virtual machine is terminated due to user logoff or
system shutdown the underlying operating system may only allow a fixed
amount of time in which to shut down and exit. It is therefore inadvisable
to attempt any user interaction or to perform a long-running computation in
a shutdown hook.

The shutdown hook should not do any long-running work, and the JVM may exit
before stop returns. It means we cannot implement the stopGracefully = true
semantics correctly, since the user will expect the context to stop gracefully
by waiting for the processing of all received data to be completed. So I agree
that we can add `ssc.stop` as the shutdown hook, but stopGracefully should be
false.
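
As a rough sketch of the kind of hook being discussed (keeping in mind the
ordering caveat relative to Spark's own shutdown hooks raised below):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("shutdown-hook-example"), Seconds(10))

// Keep the hook quick: stopGracefully stays false, as argued above.
sys.addShutdownHook {
  ssc.stop(stopSparkContext = true, stopGracefully = false)
}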



Best Regards,
Shixiong Zhu

2015-05-20 21:59 GMT-07:00 Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com:

 Thanks Tathagata for making this change..

 Dibyendu

 On Thu, May 21, 2015 at 8:24 AM, Tathagata Das t...@databricks.com
 wrote:

 If you are talking about handling driver crash failures, then all bets
 are off anyways! Adding a shutdown hook in the hope of handling driver
 process failure, handles only a some cases (Ctrl-C), but does not handle
 cases like SIGKILL (does not run JVM shutdown hooks) or driver machine
 crash. So its not a good idea to rely on that.

 Nonetheless I have opened a PR to handle the shutdown of the
 StreamigntContext in the same way as SparkContext.
 https://github.com/apache/spark/pull/6307


 On Tue, May 19, 2015 at 12:51 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Thanks Sean, you are right. If the driver program is running then I can
 handle shutdown in the main exit path. But if the driver is brought down (if
 you just stop the application, for example by killing the driver process),
 then a shutdown hook is the only option, isn't it? What I am trying to say is
 that just calling ssc.stop in sys.ShutdownHookThread or
 Runtime.getRuntime().addShutdownHook (in Java) won't work anymore; I need
 to use Utils.addShutdownHook with a priority. So I am just checking whether
 Spark Streaming can make graceful shutdown the default shutdown mechanism.

 Dibyendu

 On Tue, May 19, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote:

 I don't think you should rely on a shutdown hook. Ideally you try to
 stop it in the main exit path of your program, even in case of an
 exception.

 On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya
 dibyendu.bhattach...@gmail.com wrote:
  You mean to say within Runtime.getRuntime().addShutdownHook I call
  ssc.stop(stopSparkContext  = true, stopGracefully  = true) ?
 
  This won't work anymore in 1.4.
 
  The SparkContext got stopped before Receiver processed all received
 blocks
  and I see below exception in logs. But if I add the
 Utils.addShutdownHook
  with the priority as I mentioned , then only graceful shutdown works
 . In
  that case shutdown-hook run in priority order.
 







Re: Problem with current spark

2015-05-15 Thread Shixiong Zhu
Could you provide the full driver log? It looks like a bug. Thank you!

Best Regards,
Shixiong Zhu

2015-05-13 14:02 GMT-07:00 Giovanni Paolo Gibilisco gibb...@gmail.com:

 Hi,
 I'm trying to run an application that uses a Hive context to perform some
 queries over JSON files.
 The code of the application is here:
 https://github.com/GiovanniPaoloGibilisco/spark-log-processor/tree/fca93d95a227172baca58d51a4d799594a0429a1

 I can run it on Spark 1.3.1 after rebuilding it with hive support
 using: mvn -Phive -Phive-thriftserver -DskipTests clean package
 but when I try to run the same application on a build from the
 current master branch (at this commit of today
 https://github.com/apache/spark/tree/bec938f777a2e18757c7d04504d86a5342e2b49e),
 again built with hive support, I get an error at Stage 2, which is never
 submitted, and after a while the application is killed.
 The logs look like this:

 15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0
 15/05/13 16:54:37 INFO DAGScheduler: Got job 2 (run at unknown:0) with 2
 output partitions (allowLocal=false)
 15/05/13 16:54:37 INFO DAGScheduler: Final stage: ResultStage 4(run at
 unknown:0)
 15/05/13 16:54:37 INFO DAGScheduler: Parents of final stage: List()
 15/05/13 16:54:37 INFO Exchange: Using SparkSqlSerializer2.
 15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0
 15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0
 15/05/13 16:54:37 INFO SparkContext: Starting job: run at unknown:0
 ^C15/05/13 16:54:42 INFO SparkContext: Invoking stop() from shutdown hook
 15/05/13 16:54:42 INFO SparkUI: Stopped Spark web UI at
 http://192.168.230.130:4040
 15/05/13 16:54:42 INFO DAGScheduler: Stopping DAGScheduler
 15/05/13 16:54:42 INFO SparkDeploySchedulerBackend: Shutting down all
 executors
 15/05/13 16:54:42 INFO SparkDeploySchedulerBackend: Asking each executor
 to shut down
 15/05/13 16:54:52 INFO
 OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
 OutputCommitCoordinator stopped!
 15/05/13 16:54:52 ERROR TaskSchedulerImpl: Lost executor 0 on
 192.168.230.130: remote Rpc client disassociated
 15/05/13 16:54:53 INFO AppClient$ClientActor: Executor updated:
 app-20150513165402-/0 is now EXITED (Command exited with code 0)
 15/05/13 16:54:53 INFO SparkDeploySchedulerBackend: Executor
 app-20150513165402-/0 removed: Command exited with code 0
 15/05/13 16:54:53 ERROR SparkDeploySchedulerBackend: Asked to remove
 non-existent executor 0
 15/05/13 16:56:42 WARN AkkaRpcEndpointRef: Error sending message [message
 = StopExecutors] in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [120
 seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at
 org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
 at
 org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
 at
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257)
 at
 org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266)
 at
 org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.stop(SparkDeploySchedulerBackend.scala:95)
 at
 org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416)
 at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1404)
 at org.apache.spark.SparkContext.stop(SparkContext.scala:1562)
 at
 org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:551)
 at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2252)
 at
 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:)
 at
 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:)
 at
 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1764)
 at
 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:)
 at
 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:)
 at
 org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:)
 at scala.util.Try$.apply(Try.scala:161)
 at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:)
 at
 org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2204)
 at
 org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)

 Should I submit an Issue for this?
 What is the best way to do it?
 Best





Re: history server

2015-05-07 Thread Shixiong Zhu
The history server may need several hours to start if you have a lot of
event logs. Is it stuck, or still replaying logs?

Best Regards,
Shixiong Zhu

2015-05-07 11:03 GMT-07:00 Marcelo Vanzin van...@cloudera.com:

 Can you get a jstack for the process? Maybe it's stuck somewhere.

 On Thu, May 7, 2015 at 11:00 AM, Koert Kuipers ko...@tresata.com wrote:

 i am trying to launch the spark 1.3.1 history server on a secure cluster.

 i can see in the logs that it successfully logs into kerberos, and it is
 replaying all the logs, but i never see the log message that indicate the
 web server is started (i should see something like Successfully started
 service on port 18080. or Started HistoryServer at
 http://somehost:18080;). yet the daemon stays alive...

 any idea why the history server would never start the web service?

 thanks!




 --
 Marcelo



Re: history server

2015-05-07 Thread Shixiong Zhu
SPARK-5522 is really cool. Didn't notice it.

Best Regards,
Shixiong Zhu

2015-05-07 11:36 GMT-07:00 Marcelo Vanzin van...@cloudera.com:

 That shouldn't be true in 1.3 (see SPARK-5522).

 On Thu, May 7, 2015 at 11:33 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 The history server may need several hours to start if you have a lot of
 event logs. Is it stuck, or still replaying logs?

 Best Regards,
 Shixiong Zhu

 2015-05-07 11:03 GMT-07:00 Marcelo Vanzin van...@cloudera.com:

 Can you get a jstack for the process? Maybe it's stuck somewhere.

 On Thu, May 7, 2015 at 11:00 AM, Koert Kuipers ko...@tresata.com
 wrote:

 i am trying to launch the spark 1.3.1 history server on a secure
 cluster.

 i can see in the logs that it successfully logs into kerberos, and it
 is replaying all the logs, but i never see the log message that indicate
 the web server is started (i should see something like Successfully
 started service on port 18080. or Started HistoryServer at
 http://somehost:18080;). yet the daemon stays alive...

 any idea why the history server would never start the web service?

 thanks!




 --
 Marcelo





 --
 Marcelo



Re:

2015-05-06 Thread Shixiong Zhu
You are using Scala 2.11 with 2.10 libraries. You can change

"org.apache.spark" % "spark-streaming_2.10" % "1.3.1"

to

"org.apache.spark" %% "spark-streaming" % "1.3.1"

And sbt will use the corresponding libraries according to your Scala
version.
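
Concretely, the build.sbt from the question would then look something like this
(a sketch using only the versions already mentioned in this thread):

    name := "scala-test-workspace"

    version := "1.0"

    scalaVersion := "2.11.6"

    // %% appends the Scala binary version (_2.11 here) to the artifact name,
    // so the resolved Spark artifact always matches scalaVersion
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.1"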


Best Regards,
Shixiong Zhu

2015-05-06 16:21 GMT-07:00 anshu shukla anshushuk...@gmail.com:

 Exception with sample testing in Intellij IDE:

 Exception in thread main java.lang.NoClassDefFoundError:
 scala/collection/GenTraversableOnce$class
 at akka.util.Collections$EmptyImmutableSeq$.init(Collections.scala:15)
 at akka.util.Collections$EmptyImmutableSeq$.clinit(Collections.scala)
 at akka.japi.Util$.immutableSeq(JavaAPI.scala:229)
 at akka.remote.RemoteSettings.init(RemoteSettings.scala:30)
 at
 akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:114)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
 at scala.util.Try$.apply(Try.scala:191)
 at
 akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
 at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
 at
 akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
 at scala.util.Success.flatMap(Try.scala:230)
 at
 akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
 at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:584)
 at akka.actor.ActorSystemImpl.init(ActorSystem.scala:577)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
 at
 org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
 at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
 at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
 at
 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1837)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166)
 at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1828)
 at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:223)
 at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163)
 at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:269)
 at org.apache.spark.SparkContext.init(SparkContext.scala:272)
 *at Testspark$.main(Testspark.scala:17)*
 at Testspark.main(Testspark.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
 Caused by: java.lang.ClassNotFoundException:
 scala.collection.GenTraversableOnce$class
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 38 more



 *Code is Testspark.scala-*

 /**
  * Created by anshushukla on 07/05/15.
  */
 import org.apache.spark.{SparkConf, SparkContext}


 object Testspark {


   def main (args: Array[String]) {
 val conf=new SparkConf()
 .setMaster("local[2]")
 .setAppName("TestSpark")

 val sc=new SparkContext(conf)//line number 17 showing exception

 val data=sc.parallelize(1 to 10).collect().filter(_ < 1000)
 data.foreach(println)

   }

 }


 *build.sbt is -*

 name := "scala-test-workspace"

 version := "1.0"

 scalaVersion := "2.11.6"

 libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.3.1"


 --
 Thanks  Regards,
 Anshu Shukla
 Indian Institute of Science



Re: How to install spark in spark on yarn mode

2015-04-30 Thread Shixiong Zhu
You don't need to install Spark. Just download or build a Spark package
that matches your Yarn version. And ensure that HADOOP_CONF_DIR or
YARN_CONF_DIR points to the directory which contains the (client side)
configuration files for the Hadoop cluster.

See instructions here:
http://spark.apache.org/docs/latest/running-on-yarn.html


Best Regards,
Shixiong Zhu

2015-04-30 1:00 GMT-07:00 xiaohe lan zombiexco...@gmail.com:

 Hi Madhvi,

 If I only install Spark on one node and use spark-submit to run an
 application, which are the worker nodes? And where are the executors?

 Thanks,
 Xiaohe

 On Thu, Apr 30, 2015 at 12:52 PM, madhvi madhvi.gu...@orkash.com wrote:

 Hi,
 Follow the instructions to install on the following link:
 http://mbonaci.github.io/mbo-spark/
 You don't need to install Spark on every node. Just install it on one node,
 or you can also install it on a remote system, and build a Spark cluster.
 Thanks
 Madhvi

 On Thursday 30 April 2015 09:31 AM, xiaohe lan wrote:

 Hi experts,

 I see Spark on YARN has yarn-client and yarn-cluster modes. I also have a
 5-node Hadoop cluster (Hadoop 2.4). How do I install Spark if I want to try
 the Spark on YARN mode?

 Do I need to install Spark on each node of the Hadoop cluster?

 Thanks,
 Xiaohe



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Enabling Event Log

2015-04-30 Thread Shixiong Zhu
spark.history.fs.logDirectory is for the history server. Spark applications
should use spark.eventLog.dir instead. Since you commented out
spark.eventLog.dir, it defaults to /tmp/spark-events, and this folder does
not exist.
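
A minimal sketch of setting it programmatically (the path is just the one from
this thread; the same keys can go into spark-defaults.conf instead):

    import org.apache.spark.SparkConf

    // spark.eventLog.dir is what the running application writes to;
    // spark.history.fs.logDirectory only matters for the history server.
    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "file:/opt/cb/tmp/spark-events")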

Best Regards,
Shixiong Zhu

2015-04-29 23:22 GMT-07:00 James King jakwebin...@gmail.com:

 I'm unclear why I'm getting this exception.

 It seems to have picked up that I want to enable event logging, but it is
 ignoring where I want it to log to, i.e. file:/opt/cb/tmp/spark-events, which
 does exist.

 spark-default.conf

 # Example:
 spark.master spark://master1:7077,master2:7077
 spark.eventLog.enabled   true
 spark.history.fs.logDirectoryfile:/opt/cb/tmp/spark-events
 # spark.eventLog.dir   hdfs://namenode:8021/directory
 # spark.serializer
 org.apache.spark.serializer.KryoSerializer
 # spark.driver.memory  5g
 # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value
 -Dnumbers=one two three

 Exception following job submission:

 spark.eventLog.enabled=true
 spark.history.fs.logDirectory=file:/opt/cb/tmp/spark-events

 spark.jars=file:/opt/cb/scripts/spark-streamer/cb-spark-streamer-1.0-SNAPSHOT.jar
 spark.master=spark://master1:7077,master2:7077
 Exception in thread main java.lang.IllegalArgumentException: Log
 directory /tmp/spark-events does not exist.
 at
 org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:99)
 at org.apache.spark.SparkContext.init(SparkContext.scala:399)
 at
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
 at
 org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:75)
 at
 org.apache.spark.streaming.api.java.JavaStreamingContext.init(JavaStreamingContext.scala:132)


 Many Thanks
 jk



Re: Timeout Error

2015-04-26 Thread Shixiong Zhu
The configuration key for this timeout is spark.akka.askTimeout. The time
unit is seconds.
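
For example (a sketch; the value is only for illustration):

    import org.apache.spark.SparkConf

    // Both keys take a number of seconds in Spark 1.3.
    val conf = new SparkConf()
      .set("spark.akka.timeout", "600")
      .set("spark.akka.askTimeout", "600")  // used for ask/reply calls such as heartbeats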

Best Regards,
Shixiong(Ryan) Zhu

2015-04-26 15:15 GMT-07:00 Deepak Gopalakrishnan dgk...@gmail.com:

 Hello,


 Just to add a bit more context :

 I have done that in the code, but I cannot see it change from 30 seconds
 in the log.

 .set("spark.executor.memory", "10g")

 .set("spark.driver.memory", "20g")

 .set("spark.akka.timeout", "6000")

 PS : I understand that 6000 is quite large, but I'm just trying to see if
 it actually changes


 Here is the command that I'm running

  sudo MASTER=spark://master.spark.com:7077
 /opt/spark/spark-1.3.0-bin-hadoop2.4/bin/spark-submit --class
 class-name   --executor-memory 20G --driver-memory 10G  --deploy-mode
 client --conf spark.akka.timeout=6000 --conf spark.akka.askTimeout=6000
 jar file path


 and here is how I load the file JavaPairRDDString, String
 learningRdd=sc.wholeTextFiles(filePath,10);
 Thanks

 On Mon, Apr 27, 2015 at 3:36 AM, Bryan Cutler cutl...@gmail.com wrote:

 I'm not sure what the expected performance should be for this amount of
 data, but you could try to increase the timeout with the property
 spark.akka.timeout to see if that helps.

 Bryan

 On Sun, Apr 26, 2015 at 6:57 AM, Deepak Gopalakrishnan dgk...@gmail.com
 wrote:

 Hello All,

 I'm trying to process a 3.5GB file in standalone mode using Spark. I
 could run my Spark job successfully on a 100MB file and it works as
 expected. But when I try to run it on the 3.5GB file, I run into the error
 below:


 15/04/26 12:45:50 INFO BlockManagerMaster: Updated info of block 
 taskresult_83
 15/04/26 12:46:46 WARN AkkaUtils: Error sending message [message = 
 Heartbeat(2,[Lscala.Tuple2;@790223d3,BlockManagerId(2, master.spark.com, 
 39143))] in 1 attempts
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at 
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at 
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
 at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
 15/04/26 12:47:15 INFO MemoryStore: ensureFreeSpace(26227673) called with 
 curMem=265897, maxMem=5556991426
 15/04/26 12:47:15 INFO MemoryStore: Block taskresult_92 stored as bytes in 
 memory (estimated size 25.0 MB, free 5.2 GB)
 15/04/26 12:47:16 INFO MemoryStore: ensureFreeSpace(26272879) called with 
 curMem=26493570, maxMem=5556991426
 15/04/26 12:47:16 INFO MemoryStore: Block taskresult_94 stored as bytes in 
 memory (estimated size 25.1 MB, free 5.1 GB)
 15/04/26 12:47:18 INFO MemoryStore: ensureFreeSpace(26285327) called with 
 curMem=52766449, maxMem=5556991426


 and the job fails.


 I'm on AWS and have opened all ports. Also, since the 100MB file works,
 it should not be a connection issue.  I've a r3 xlarge and 2 m3 large.

 Can anyone suggest a way to fix this?

 --
 Regards,
 *Deepak Gopalakrishnan*
 *Mobile*:+918891509774
 *Skype* : deepakgk87
 http://myexps.blogspot.com





 --
 Regards,
 *Deepak Gopalakrishnan*
 *Mobile*:+918891509774
 *Skype* : deepakgk87
 http://myexps.blogspot.com




Re: Can't get SparkListener to work

2015-04-21 Thread Shixiong Zhu
You need to call sc.stop() so that the pending notifications are processed before the application exits.
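
A minimal sketch of that pattern (illustrative only; the app name and master
are made up):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    val sc = new SparkContext(new SparkConf().setAppName("listener-test").setMaster("local[*]"))
    try {
      sc.addSparkListener(new SparkListener() {
        override def onTaskEnd(e: SparkListenerTaskEnd): Unit = println("onTaskEnd")
      })
      sc.parallelize(1 to 3).map(_ * 2).collect()
    } finally {
      // stop() shuts down the listener bus, which delivers any queued events
      // before the JVM exits -- no Thread.sleep needed
      sc.stop()
    }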

Best Regards,
Shixiong(Ryan) Zhu

2015-04-21 4:18 GMT+08:00 Praveen Balaji secondorderpolynom...@gmail.com:

 Thanks Shixiong. I tried it out and it works.

 If you're looking at this post, here a few points you may be interested in:

 Turns out this has to do with the difference between methods and
 functions in Scala - something I didn't pay attention to before. If you're
 looking at this thread, this may be an interesting post:

 http://jim-mcbeath.blogspot.com/2009/05/scala-functions-vs-methods.html

 Below is some test code. I added the Thread.sleep because it looks like
 Spark notifications happen asynchronously and the main/driver thread won't
 wait for the notifications to complete. I'll look into that further later,
 but for now that's my inference, so don't take my word for it yet. Here's
 the code:

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart, SparkListenerTaskEnd}

 object TestME {
   def main(args: Array[String]): Unit = {
 val conf = new SparkConf().setAppName("testme");
 val sc = new SparkContext(conf);
 try {
   foo(sc);
 } finally {
   Thread.sleep(2000);
 }
   }

   def foo(sc: SparkContext) = {
 sc.addSparkListener(new SparkListener() {
   override def onTaskStart(e: SparkListenerTaskStart) = println(
 "onTaskStart");
   override def onTaskEnd(e: SparkListenerTaskEnd) = println(
 "onTaskEnd");
 });

  sc.parallelize(List(1, 2, 3)).map(i => throw new
 SparkException("test")).collect();
   }
 }

 I'm running it from Eclipse on local[*].



 On Sun, Apr 19, 2015 at 7:57 PM, Praveen Balaji 
 secondorderpolynom...@gmail.com wrote:

 Thanks Shixiong. I'll try this.

 On Sun, Apr 19, 2015, 7:36 PM Shixiong Zhu zsxw...@gmail.com wrote:

 The problem is the code you use to test:


 sc.parallelize(List(1, 2, 3)).map(throw new
 SparkException("test")).collect();

 is like the following example:

 def foo: Int => Nothing = {
   throw new SparkException("test")
 }
 sc.parallelize(List(1, 2, 3)).map(foo).collect();

 So actually the Spark job is never submitted, since it fails in `foo`,
 which is used to create the map function.

 Change it to

 sc.parallelize(List(1, 2, 3)).map(i => throw new
 SparkException("test")).collect();

 And you will see the correct messages from your listener.



 Best Regards,
 Shixiong(Ryan) Zhu

 2015-04-19 1:06 GMT+08:00 Praveen Balaji 
 secondorderpolynom...@gmail.com:

 Thanks for the response, Archit. I get callbacks when I do not throw an
 exception from map.
 My use case, however, is to get callbacks for exceptions in
 transformations on executors. Do you think I'm going down the right route?

 Cheers
 -p

 On Sat, Apr 18, 2015 at 1:49 AM, Archit Thakur 
 archit279tha...@gmail.com wrote:

 Hi Praveen,
 Can you try once removing throw exception in map. Do you still not get
 it.?
 On Apr 18, 2015 8:14 AM, Praveen Balaji 
 secondorderpolynom...@gmail.com wrote:

 Thanks for the response, Imran. I probably chose the wrong methods
 for this email. I implemented all methods of SparkListener and the only
 callback I get is onExecutorMetricsUpdate.

 Here's the complete code:

 ==

 import org.apache.spark.scheduler._

 sc.addSparkListener(new SparkListener() {
   override def onStageCompleted(e: SparkListenerStageCompleted) =
 println( onStageCompleted);
   override def onStageSubmitted(e: SparkListenerStageSubmitted) =
 println( onStageSubmitted);
   override def onTaskStart(e: SparkListenerTaskStart) =
 println( onTaskStart);
   override def onTaskGettingResult(e:
 SparkListenerTaskGettingResult) = println( onTaskGettingResult);
   override def onTaskEnd(e: SparkListenerTaskEnd) = println(
 onTaskEnd);
   override def onJobStart(e: SparkListenerJobStart) =
 println( onJobStart);
   override def onJobEnd(e: SparkListenerJobEnd) = println(
 onJobEnd);
   override def onEnvironmentUpdate(e:
 SparkListenerEnvironmentUpdate) = println( onEnvironmentUpdate);
   override def onBlockManagerAdded(e:
 SparkListenerBlockManagerAdded) = println( onBlockManagerAdded);
   override def onBlockManagerRemoved(e:
 SparkListenerBlockManagerRemoved) = println( 
 onBlockManagerRemoved);
   override def onUnpersistRDD(e: SparkListenerUnpersistRDD) =
 println( onUnpersistRDD);
   override def onApplicationStart(e:
 SparkListenerApplicationStart) = println( onApplicationStart);
   override def onApplicationEnd(e: SparkListenerApplicationEnd) =
 println( onApplicationEnd);
   override def onExecutorMetricsUpdate(e:
 SparkListenerExecutorMetricsUpdate) = println(
 onExecutorMetricsUpdate);
 });

 sc.parallelize(List(1, 2, 3)).map(throw new
 SparkException(test)).collect();

 =

 On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid iras...@cloudera.com
 wrote:

 when you start the spark-shell, its already too late to get the
 ApplicationStart event.  Try listening for StageCompleted or JobEnd 
 instead.

 On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji 
 secondorderpolynom...@gmail.com

Re: Can't get SparkListener to work

2015-04-19 Thread Shixiong Zhu
The problem is the code you use to test:

sc.parallelize(List(1, 2, 3)).map(throw new
SparkException("test")).collect();

is like the following example:

def foo: Int => Nothing = {
  throw new SparkException("test")
}
sc.parallelize(List(1, 2, 3)).map(foo).collect();

So actually the Spark job is never submitted, since it fails in `foo`, which
is used to create the map function.

Change it to

sc.parallelize(List(1, 2, 3)).map(i => throw new
SparkException("test")).collect();

And you will see the correct messages from your listener.
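
The same evaluation-order point can be seen without Spark at all (a tiny
illustrative sketch, not from the original thread; `accept` is a made-up name):

    def accept(f: Int => Int): Unit = println("got a function")

    // accept(throw new RuntimeException("boom"))
    // -> the argument expression is evaluated before accept() runs,
    //    so this throws immediately at the call site (commented out here)

    accept((i: Int) => throw new RuntimeException("boom"))
    // -> fine: a function value is passed, and nothing is thrown until it is applied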



Best Regards,
Shixiong(Ryan) Zhu

2015-04-19 1:06 GMT+08:00 Praveen Balaji secondorderpolynom...@gmail.com:

 Thanks for the response, Archit. I get callbacks when I do not throw an
 exception from map.
 My use case, however, is to get callbacks for exceptions in
 transformations on executors. Do you think I'm going down the right route?

 Cheers
 -p

 On Sat, Apr 18, 2015 at 1:49 AM, Archit Thakur archit279tha...@gmail.com
 wrote:

 Hi Praveen,
 Can you try once removing throw exception in map. Do you still not get
 it.?
 On Apr 18, 2015 8:14 AM, Praveen Balaji 
 secondorderpolynom...@gmail.com wrote:

 Thanks for the response, Imran. I probably chose the wrong methods for
 this email. I implemented all methods of SparkListener and the only
 callback I get is onExecutorMetricsUpdate.

 Here's the complete code:

 ==

 import org.apache.spark.scheduler._

 sc.addSparkListener(new SparkListener() {
   override def onStageCompleted(e: SparkListenerStageCompleted) =
 println( onStageCompleted);
   override def onStageSubmitted(e: SparkListenerStageSubmitted) =
 println( onStageSubmitted);
   override def onTaskStart(e: SparkListenerTaskStart) =
 println( onTaskStart);
   override def onTaskGettingResult(e:
 SparkListenerTaskGettingResult) = println( onTaskGettingResult);
   override def onTaskEnd(e: SparkListenerTaskEnd) = println(
 onTaskEnd);
   override def onJobStart(e: SparkListenerJobStart) = println(
 onJobStart);
   override def onJobEnd(e: SparkListenerJobEnd) = println(
 onJobEnd);
   override def onEnvironmentUpdate(e:
 SparkListenerEnvironmentUpdate) = println( onEnvironmentUpdate);
   override def onBlockManagerAdded(e:
 SparkListenerBlockManagerAdded) = println( onBlockManagerAdded);
   override def onBlockManagerRemoved(e:
 SparkListenerBlockManagerRemoved) = println( onBlockManagerRemoved);
   override def onUnpersistRDD(e: SparkListenerUnpersistRDD) =
 println( onUnpersistRDD);
   override def onApplicationStart(e: SparkListenerApplicationStart)
 = println( onApplicationStart);
   override def onApplicationEnd(e: SparkListenerApplicationEnd) =
 println( onApplicationEnd);
   override def onExecutorMetricsUpdate(e:
 SparkListenerExecutorMetricsUpdate) = println(
 onExecutorMetricsUpdate);
 });

 sc.parallelize(List(1, 2, 3)).map(throw new
 SparkException(test)).collect();

 =

 On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid iras...@cloudera.com
 wrote:

 when you start the spark-shell, its already too late to get the
 ApplicationStart event.  Try listening for StageCompleted or JobEnd 
 instead.

 On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji 
 secondorderpolynom...@gmail.com wrote:

 I'm trying to create a simple SparkListener to get notified of errors
 on executors. I do not get any callbacks on my SparkListener. Here is some
 simple code I'm executing in spark-shell, but I still don't get any
 callbacks on my listener. Am I doing something wrong?

 Thanks for any clue you can send my way.

 Cheers
 Praveen

 ==
 import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.scheduler.SparkListenerApplicationStart
 import org.apache.spark.scheduler.SparkListenerApplicationEnd
 import org.apache.spark.SparkException

 sc.addSparkListener(new SparkListener() {
   override def onApplicationStart(applicationStart:
 SparkListenerApplicationStart) {
 println( onApplicationStart:  +
 applicationStart.appName);
   }

   override def onApplicationEnd(applicationEnd:
 SparkListenerApplicationEnd) {
 println( onApplicationEnd:  + applicationEnd.time);
   }
 });

 sc.parallelize(List(1, 2, 3)).map(throw new
 SparkException(test)).collect();
 ===

 output:

 scala org.apache.spark.SparkException: hshsh
 at $iwC$$iwC$$iwC$$iwC.init(console:29)
 at $iwC$$iwC$$iwC.init(console:34)
 at $iwC$$iwC.init(console:36)
 at $iwC.init(console:38)







Re: Actor not found

2015-04-17 Thread Shixiong Zhu
I just checked the code that creates OutputCommitCoordinator. Could you
reproduce this issue? If so, could you provide details about how to
reproduce it?

Best Regards,
Shixiong(Ryan) Zhu

2015-04-16 13:27 GMT+08:00 Canoe canoe...@gmail.com:

 13119 Exception in thread main akka.actor.ActorNotFound: Actor not found
 for: ActorSelection[Anchor(akka.tcp://sparkdri...@dmslave13.et2.tbsi
 te.net:5908/), Path(/user/OutputCommitCoordinator)]
 13120 at

 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
 13121 at

 akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
 13122 at
 scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
 13123 at

 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
 13124 at

 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
 13125 at

 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 13126 at

 akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
 13127 at
 scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 13128 at
 akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
 13129 at

 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
 13130 at
 akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
 13131 at

 akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
 13132 at
 scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
 13133 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
 13134 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
 13135 at
 akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
 13136 at
 akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
 13137 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
 13138 at

 akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
 13139 at
 akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
 13140 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 13141 at
 akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
 13142 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 13143 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 13144 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 13145 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 13146 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 13147 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 13148 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 13149 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 13150 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 I met the same problem when I run spark on yarn. Is this a bug or what ?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Actor-not-found-tp22265p22508.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark streaming printing no output

2015-04-15 Thread Shixiong Zhu
So the time interval is much less than the 1000 ms you set in the code?
That's weird. Could you check the whole output to confirm that the content
isn't simply being flushed away by the logs?

Best Regards,
Shixiong(Ryan) Zhu

2015-04-15 15:04 GMT+08:00 Shushant Arora shushantaror...@gmail.com:

 Yes, only the Time: 142905487 ms strings get printed on the console.
 No other output is printed.
 And the time interval between two such (Time: ... ms) strings is much less
 than the streaming duration set in the program.

 On Wed, Apr 15, 2015 at 5:11 AM, Shixiong Zhu zsxw...@gmail.com wrote:

 Could you see something like this in the console?

 ---
 Time: 142905487 ms
 ---


 Best Regards,
 Shixiong(Ryan) Zhu

 2015-04-15 2:11 GMT+08:00 Shushant Arora shushantaror...@gmail.com:

 Hi

 I am running a spark streaming application but on console nothing is
 getting printed.

 I am doing
 1.bin/spark-shell --master clusterMgrUrl
 2.import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.Seconds
 val ssc = new StreamingContext( sc, Seconds(1))
 val lines = ssc.socketTextStream(hostname,)
 lines.print()
 ssc.start()
 ssc.awaitTermination()

 Jobs are getting created when I see webUI but nothing gets printed on
 console.

 I have started a nc script on hostname  port  and can see messages
 typed on this port from another console.



 Please let me know If I am doing something wrong.








Re: spark streaming printing no output

2015-04-14 Thread Shixiong Zhu
Could you see something like this in the console?

---
Time: 142905487 ms
---


Best Regards,
Shixiong(Ryan) Zhu

2015-04-15 2:11 GMT+08:00 Shushant Arora shushantaror...@gmail.com:

 Hi

 I am running a spark streaming application but on console nothing is
 getting printed.

 I am doing
 1.bin/spark-shell --master clusterMgrUrl
 2.import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.Seconds
 val ssc = new StreamingContext( sc, Seconds(1))
 val lines = ssc.socketTextStream(hostname,)
 lines.print()
 ssc.start()
 ssc.awaitTermination()

 Jobs are getting created when I see webUI but nothing gets printed on
 console.

 I have started a nc script on hostname  port  and can see messages
 typed on this port from another console.



 Please let me know If I am doing something wrong.






Re: Actor not found

2015-03-31 Thread Shixiong Zhu
Thanks for the log. It's really helpful. I created a JIRA to explain why it
can happen: https://issues.apache.org/jira/browse/SPARK-6640

However, does this error always happen in your environment?

Best Regards,
Shixiong Zhu

2015-03-31 22:36 GMT+08:00 sparkdi shopaddr1...@dubna.us:

 This is the whole output from the shell:

 ~/spark-1.3.0-bin-hadoop2.4$ sudo bin/spark-shell
 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 log4j:WARN No appenders could be found for logger
 (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
 log4j:WARN Please initialize the log4j system properly.
 log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
 more info.
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/03/30 19:00:40 INFO SecurityManager: Changing view acls to: root
 15/03/30 19:00:40 INFO SecurityManager: Changing modify acls to: root
 15/03/30 19:00:40 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view pe
 rmissions: Set(root); users with modify permissions: Set(root)
 15/03/30 19:00:40 INFO HttpServer: Starting HTTP Server
 15/03/30 19:00:40 INFO Server: jetty-8.y.z-SNAPSHOT
 15/03/30 19:00:40 INFO AbstractConnector: Started
 SocketConnector@0.0.0.0:47797
 15/03/30 19:00:40 INFO Utils: Successfully started service 'HTTP class
 server' on port 47797.
 Welcome to
     __
  / __/__  ___ _/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 1.3.0
   /_/

 Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
 Type in expressions to have them evaluated.
 Type :help for more information.
 15/03/30 19:00:42 INFO SparkContext: Running Spark version 1.3.0
 15/03/30 19:00:42 INFO SecurityManager: Changing view acls to: root
 15/03/30 19:00:42 INFO SecurityManager: Changing modify acls to: root
 15/03/30 19:00:42 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view pe
 rmissions: Set(root); users with modify permissions: Set(root)
 15/03/30 19:00:42 INFO Slf4jLogger: Slf4jLogger started
 15/03/30 19:00:42 INFO Remoting: Starting remoting
 15/03/30 19:00:43 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@vm:52574]
 15/03/30 19:00:43 INFO Utils: Successfully started service 'sparkDriver' on
 port 52574.
 15/03/30 19:00:43 INFO SparkEnv: Registering MapOutputTracker
 15/03/30 19:00:43 INFO SparkEnv: Registering BlockManagerMaster
 15/03/30 19:00:43 INFO DiskBlockManager: Created local directory at
 /tmp/spark-f71a8d86-6e49-4dfe-bb98-8e8581015acc/bl
 ockmgr-57532f5a-38db-4ba3-86d8-edef84f592e5
 15/03/30 19:00:43 INFO MemoryStore: MemoryStore started with capacity 265.4
 MB
 15/03/30 19:00:43 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-95e0a143-0de3-4c96-861c-968c9fae2746/h
 ttpd-cb029cd6-4943-479d-9b56-e7397489d9ea
 15/03/30 19:00:43 INFO HttpServer: Starting HTTP Server
 15/03/30 19:00:43 INFO Server: jetty-8.y.z-SNAPSHOT
 15/03/30 19:00:43 INFO AbstractConnector: Started
 SocketConnector@0.0.0.0:48500
 15/03/30 19:00:43 INFO Utils: Successfully started service 'HTTP file
 server' on port 48500.
 15/03/30 19:00:43 INFO SparkEnv: Registering OutputCommitCoordinator
 15/03/30 19:00:43 INFO Server: jetty-8.y.z-SNAPSHOT
 15/03/30 19:00:43 INFO AbstractConnector: Started
 SelectChannelConnector@0.0.0.0:4040
 15/03/30 19:00:43 INFO Utils: Successfully started service 'SparkUI' on
 port
 4040.
 15/03/30 19:00:43 INFO SparkUI: Started SparkUI at http://vm:4040
 15/03/30 19:00:43 INFO Executor: Starting executor ID driver on host
 localhost
 15/03/30 19:00:43 INFO Executor: Using REPL class URI:
 http://10.11.204.80:47797
 15/03/30 19:00:43 INFO AkkaUtils: Connecting to HeartbeatReceiver:
 akka.tcp://sparkDriver@vm:5
 2574/user/HeartbeatReceiver
 15/03/30 19:00:43 ERROR OneForOneStrategy: Actor not found for:
 ActorSelection[Anchor(akka://sparkDriver/deadLetters),
 Path(/)]
 akka.actor.ActorInitializationException: exception during creation
 at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
 at akka.actor.ActorCell.create(ActorCell.scala:596)
 at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
 at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
 at
 akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused

Re: Actor not found

2015-03-30 Thread Shixiong Zhu
Could you paste the whole stack trace here?

Best Regards,
Shixiong Zhu

2015-03-31 2:26 GMT+08:00 sparkdi shopaddr1...@dubna.us:

 I have the same problem, i.e. exception with the same call stack when I
 start
 either pyspark or spark-shell. I use spark-1.3.0-bin-hadoop2.4 on ubuntu
 14.10.
 bin/pyspark

 bunch of INFO messages, then ActorInitializationException exception.
 Shell starts, I can do this:
  rd = sc.parallelize([1,2])
  rd.first()
 This call does not return.
 Also, if I start a master and then try to connect the shell to it, it
 fails to connect, complaining about the master URL.

 The same tarball works fine on Windows.

 Maybe some Linux versions are not supported?
 Thank you
 Dima



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Actor-not-found-tp22265p22300.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to specify the port for AM Actor ...

2015-03-29 Thread Shixiong Zhu
LGTM. Could you open a JIRA and send a PR? Thanks.

Best Regards,
Shixiong Zhu

2015-03-28 7:14 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

 I looked @ the 1.3.0 code and figured where this can be added

 In org.apache.spark.deploy.yarn ApplicationMaster.scala:282 is

 actorSystem = AkkaUtils.createActorSystem("sparkYarnAM",
 Utils.localHostName, 0,
   conf = sparkConf, securityManager = securityMgr)._1


 If I change it to below, then I can start it on the port I want.

 val port = sparkConf.getInt("spark.am.actor.port", 0) // New property
 ...
 actorSystem = AkkaUtils.createActorSystem("sparkYarnAM",
 Utils.localHostName, port,
   conf = sparkConf, securityManager = securityMgr)._1

 Thoughts? Any other place where any change is needed?



 On Wed, Mar 25, 2015 at 4:44 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 There is no configuration for it now.

 Best Regards,
 Shixiong Zhu

 2015-03-26 7:13 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

 There may be firewall rules limiting the ports between host running
 spark and the hadoop cluster. In that case, not all ports are allowed.

 Can it be a range of ports that can be specified ?

 On Wed, Mar 25, 2015 at 4:06 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 It's a random port to avoid port conflicts, since multiple AMs can run
 in the same machine. Why do you need a fixed port?

 Best Regards,
 Shixiong Zhu

 2015-03-26 6:49 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

 Spark 1.3, Hadoop 2.5, Kerbeors

 When running spark-shell in yarn client mode, it shows following
 message with a random port every time (44071 in example below). Is there a
 way to specify that port to a specific port ? It does not seem to be part
 of ports specified in
 http://spark.apache.org/docs/latest/configuration.html spark.xxx.port
 ...

 Thanks,

 15/03/25 22:27:10 INFO Client: Application report for
 application_1427316153428_0014 (state: ACCEPTED)
 15/03/25 22:27:10 INFO YarnClientSchedulerBackend: ApplicationMaster
 registered as Actor[akka.tcp://sparkYarnAM@xyz
 :44071/user/YarnAM#-1989273896]








Re: How to specify the port for AM Actor ...

2015-03-25 Thread Shixiong Zhu
There is no configuration for it now.

Best Regards,
Shixiong Zhu

2015-03-26 7:13 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

 There may be firewall rules limiting the ports between host running spark
 and the hadoop cluster. In that case, not all ports are allowed.

 Can it be a range of ports that can be specified ?

 On Wed, Mar 25, 2015 at 4:06 PM, Shixiong Zhu zsxw...@gmail.com wrote:

 It's a random port to avoid port conflicts, since multiple AMs can run in
 the same machine. Why do you need a fixed port?

 Best Regards,
 Shixiong Zhu

 2015-03-26 6:49 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

 Spark 1.3, Hadoop 2.5, Kerbeors

 When running spark-shell in yarn client mode, it shows following message
 with a random port every time (44071 in example below). Is there a way to
 specify that port to a specific port ? It does not seem to be part of ports
 specified in http://spark.apache.org/docs/latest/configuration.html
 spark.xxx.port ...

 Thanks,

 15/03/25 22:27:10 INFO Client: Application report for
 application_1427316153428_0014 (state: ACCEPTED)
 15/03/25 22:27:10 INFO YarnClientSchedulerBackend: ApplicationMaster
 registered as Actor[akka.tcp://sparkYarnAM@xyz
 :44071/user/YarnAM#-1989273896]






Re: How to specify the port for AM Actor ...

2015-03-25 Thread Shixiong Zhu
It's a random port to avoid port conflicts, since multiple AMs can run in
the same machine. Why do you need a fixed port?

Best Regards,
Shixiong Zhu

2015-03-26 6:49 GMT+08:00 Manoj Samel manojsamelt...@gmail.com:

 Spark 1.3, Hadoop 2.5, Kerbeors

 When running spark-shell in yarn client mode, it shows following message
 with a random port every time (44071 in example below). Is there a way to
 specify that port to a specific port ? It does not seem to be part of ports
 specified in http://spark.apache.org/docs/latest/configuration.html
 spark.xxx.port ...

 Thanks,

 15/03/25 22:27:10 INFO Client: Application report for
 application_1427316153428_0014 (state: ACCEPTED)
 15/03/25 22:27:10 INFO YarnClientSchedulerBackend: ApplicationMaster
 registered as Actor[akka.tcp://sparkYarnAM@xyz
 :44071/user/YarnAM#-1989273896]



Re: Handling fatal errors of executors and decommission datanodes

2015-03-16 Thread Shixiong Zhu
There are 2 cases for "No space left on device":

1. Some tasks which use a lot of temp space cannot run on any node.
2. The free space of the datanodes is not balanced. Some tasks which use a lot
of temp space cannot run on several nodes, but they can run on other nodes
successfully.

Because most of our cases are the second one, we set
spark.scheduler.executorTaskBlacklistTime to 30000 to solve such "No
space left on device" errors. So if a task fails on some executor, it won't be
scheduled to the same executor again for 30 seconds.
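
For reference, a sketch of how that is set (spark.scheduler.executorTaskBlacklistTime
was an undocumented/internal setting at the time; the value is in milliseconds
and mirrors the 30-second behaviour described above):

    import org.apache.spark.SparkConf

    // After a task fails on an executor, do not reschedule the same task on
    // that executor for the next 30 seconds.
    val conf = new SparkConf()
      .set("spark.scheduler.executorTaskBlacklistTime", "30000")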


Best Regards,
Shixiong Zhu

2015-03-16 17:40 GMT+08:00 Jianshi Huang jianshi.hu...@gmail.com:

 I created a JIRA: https://issues.apache.org/jira/browse/SPARK-6353


 On Mon, Mar 16, 2015 at 5:36 PM, Jianshi Huang jianshi.hu...@gmail.com
 wrote:

 Hi,

 We're facing "No space left on device" errors from time to time lately.
 The job fails after retries; obviously in such cases, retrying won't
 help.

 Sure, the problem is in the datanodes, but I'm wondering whether the Spark
 driver can handle it and decommission the problematic datanode before retrying.
 And maybe dynamically allocate another datanode if dynamic allocation is
 enabled.

 I think there needs to be a class of fatal errors that can't be recovered
 by retries, and it's best if Spark can handle them nicely.

 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/




 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/



Re: Support for skewed joins in Spark

2015-03-12 Thread Shixiong Zhu
I sent a PR to add skewed join last year:
https://github.com/apache/spark/pull/3505
However, it does not split a key across multiple partitions. Instead, if a key
has too many values to fit in memory, it temporarily spills the values to disk
and uses the disk files to do the join.

Best Regards,
Shixiong Zhu

2015-03-13 9:37 GMT+08:00 Soila Pertet Kavulya skavu...@gmail.com:

 Does Spark support skewed joins similar to Pig which distributes large
 keys over multiple partitions? I tried using the RangePartitioner but
 I am still experiencing failures because some keys are too large to
 fit in a single partition. I cannot use broadcast variables to
 work-around this because both RDDs are too large to fit in driver
 memory.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: java.util.NoSuchElementException: key not found:

2015-02-27 Thread Shixiong Zhu
RDD is not thread-safe. You should not use it in multiple threads.

Best Regards,
Shixiong Zhu

2015-02-27 23:14 GMT+08:00 rok rokros...@gmail.com:

 I'm seeing this java.util.NoSuchElementException: key not found: exception
 pop up sometimes when I run operations on an RDD from multiple threads in a
 python application. It ends up shutting down the SparkContext so I'm
 assuming this is a bug -- from what I understand, I should be able to run
 operations on the same RDD from multiple threads or is this not
 recommended?

 I can't reproduce it all the time and I've tried eliminating caching
 wherever possible to see if that would have an effect, but it doesn't seem
 to. Each thread first splits the base RDD and then runs the
 LogisticRegressionWithSGD on the subset.

 Is there a workaround to this exception?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-util-NoSuchElementException-key-not-found-tp21848.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Not able to update collections

2015-02-24 Thread Shixiong Zhu
RDD.foreach runs on the executors. You should use `collect` to fetch the data
to the driver. E.g.,

myRdd.collect().foreach {
  node => {
    mp(node) = 1
  }
}


Best Regards,
Shixiong Zhu

2015-02-25 4:00 GMT+08:00 Vijayasarathy Kannan kvi...@vt.edu:

 Thanks, but it still doesn't seem to work.

 Below is my entire code.

   var mp = scala.collection.mutable.Map[VertexId, Int]()

   var myRdd = graph.edges.groupBy[VertexId](f).flatMap {
  edgesBySrc => func(edgesBySrc, a, b)
   }

   myRdd.foreach {
 node => {
 mp(node) = 1
 }
   }

 Values in mp do not get updated for any element in myRdd.

 On Tue, Feb 24, 2015 at 2:39 PM, Sean Owen so...@cloudera.com wrote:

 Instead of

 ...foreach {
   edgesBySrc => {
   lst ++= func(edgesBySrc)
   }
 }

 try

 ...flatMap { edgesBySrc => func(edgesBySrc) }

 or even more succinctly

 ...flatMap(func)

 This returns an RDD that basically has the list you are trying to
 build, I believe.

 You can collect() to the driver but beware if it is a huge data set.

 If you really just mean to count the results, you can count() instead

 On Tue, Feb 24, 2015 at 7:35 PM, Vijayasarathy Kannan kvi...@vt.edu
 wrote:
  I am a beginner to Scala/Spark. Could you please elaborate on how to
 make
  RDD of results of func() and collect?
 
 
  On Tue, Feb 24, 2015 at 2:27 PM, Sean Owen so...@cloudera.com wrote:
 
  They aren't the same 'lst'. One is on your driver. It gets copied to
  executors when the tasks are executed. Those copies are updated. But
  the updates will never reflect in the local copy back in the driver.
 
  You may just wish to make an RDD of the results of func() and
  collect() them back to the driver.
 
  On Tue, Feb 24, 2015 at 7:20 PM, kvvt kvi...@vt.edu wrote:
   I am working on the below piece of code.
  
   var lst = scala.collection.mutable.MutableList[VertexId]()
   graph.edges.groupBy[VertexId](f).foreach {
  edgesBySrc => {
 lst ++= func(edgesBySrc)
 }
   }
  
   println(lst.length)
  
   Here, the final println() always says that the length of the list is
 0.
   The
   list is non-empty (correctly prints the length of the returned list
   inside
   func()).
  
   I am not sure if I am doing the append correctly. Can someone point
 out
   what
   I am doing wrong?
  
  
  
  
  
   --
   View this message in context:
  
 http://apache-spark-user-list.1001560.n3.nabble.com/Not-able-to-update-collections-tp21790.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 





Re: Problem with changing the akka.framesize parameter

2015-02-04 Thread Shixiong Zhu
The unit of spark.akka.frameSize is MB. The max value is 2047.
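
For example (a sketch; pick a value that is actually needed and stays below
the cap):

    import org.apache.spark.SparkConf

    // spark.akka.frameSize is specified in MB and must not exceed 2047.
    val conf = new SparkConf()
      .set("spark.akka.frameSize", "512")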

Best Regards,
Shixiong Zhu

2015-02-05 1:16 GMT+08:00 sahanbull sa...@skimlinks.com:

 I am trying to run a spark application with

 -Dspark.executor.memory=30g -Dspark.kryoserializer.buffer.max.mb=2000
 -Dspark.akka.frameSize=1

 and the job fails because one or more of the akka frames are larger than
 1mb (12000 ish).

 When I change the Dspark.akka.frameSize=1 to 12000,15000 and 2 and
 RUN:

 ./spark/bin/spark-submit  --driver-memory 30g --executor-memory 30g
 mySparkCode.py

 I get an error in the startup as :


 ERROR OneForOneStrategy: Cannot instantiate transport
 [akka.remote.transport.netty.NettyTransport]. Make sure it extends
 [akka.remote.transport.Transport] and ha
 s constructor with [akka.actor.ExtendedActorSystem] and
 [com.typesafe.config.Config] parameters
 java.lang.IllegalArgumentException: Cannot instantiate transport
 [akka.remote.transport.netty.NettyTransport]. Make sure it extends
 [akka.remote.transport.Transport] and has const
 ructor with [akka.actor.ExtendedActorSystem] and
 [com.typesafe.config.Config] parameters
 at

 akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:620)
 at

 akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:618)
 at

 scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
 at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
 at scala.util.Try$.apply(Try.scala:161)
 at scala.util.Failure.recover(Try.scala:185)
 at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)
 at akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)
 at

 scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at
 scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
 at

 akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)
 at

 akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.IllegalArgumentException: requirement failed: Setting
 'maximum-frame-size' must be at least 32000 bytes
 at scala.Predef$.require(Predef.scala:233)
 at
 akka.util.Helpers$Requiring$.requiring$extension1(Helpers.scala:104)
 at

 org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
 at
 org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
 at
 org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
 at

 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at
 org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442)
 at
 org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:153)
 at org.apache.spark.SparkContext.init(SparkContext.scala:203)
 at

 org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
 at

 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at

 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
 at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at py4j.Gateway.invoke(Gateway.java:214)
 at

 py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79

Re: Problem with changing the akka.framesize parameter

2015-02-04 Thread Shixiong Zhu
Could you clarify why you need a 10G akka frame size?

Best Regards,
Shixiong Zhu

2015-02-05 9:20 GMT+08:00 Shixiong Zhu zsxw...@gmail.com:

 The unit of spark.akka.frameSize is MB. The max value is 2047.

 Best Regards,
 Shixiong Zhu

 2015-02-05 1:16 GMT+08:00 sahanbull sa...@skimlinks.com:

 I am trying to run a spark application with

 -Dspark.executor.memory=30g -Dspark.kryoserializer.buffer.max.mb=2000
 -Dspark.akka.frameSize=1

 and the job fails because one or more of the akka frames are larger than
 1mb (12000 ish).

 When I change the Dspark.akka.frameSize=1 to 12000,15000 and 2 and
 RUN:

 ./spark/bin/spark-submit  --driver-memory 30g --executor-memory 30g
 mySparkCode.py

 I get an error in the startup as :


 ERROR OneForOneStrategy: Cannot instantiate transport
 [akka.remote.transport.netty.NettyTransport]. Make sure it extends
 [akka.remote.transport.Transport] and ha
 s constructor with [akka.actor.ExtendedActorSystem] and
 [com.typesafe.config.Config] parameters
 java.lang.IllegalArgumentException: Cannot instantiate transport
 [akka.remote.transport.netty.NettyTransport]. Make sure it extends
 [akka.remote.transport.Transport] and has const
 ructor with [akka.actor.ExtendedActorSystem] and
 [com.typesafe.config.Config] parameters
 at

 akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:620)
 at

 akka.remote.EndpointManager$$anonfun$8$$anonfun$3.applyOrElse(Remoting.scala:618)
 at

 scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
 at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
 at scala.util.Try$.apply(Try.scala:161)
 at scala.util.Failure.recover(Try.scala:185)
 at
 akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:618)
 at
 akka.remote.EndpointManager$$anonfun$8.apply(Remoting.scala:610)
 at

 scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at
 scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
 at

 akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:610)
 at

 akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:450)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.lang.IllegalArgumentException: requirement failed: Setting
 'maximum-frame-size' must be at least 32000 bytes
 at scala.Predef$.require(Predef.scala:233)
 at
 akka.util.Helpers$Requiring$.requiring$extension1(Helpers.scala:104)
 at

 org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
 at
 org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
 at
 org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
 at

 org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446)
 at
 scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at
 org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442)
 at
 org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:153)
 at org.apache.spark.SparkContext.init(SparkContext.scala:203)
 at

 org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
 at

 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at

 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
 at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
 at py4j.Gateway.invoke

Re: ClassNotFoundException when registering classes with Kryo

2015-02-01 Thread Shixiong Zhu
It's a bug that has been fixed in https://github.com/apache/spark/pull/4258
but not yet been merged.

Best Regards,
Shixiong Zhu

2015-02-02 10:08 GMT+08:00 Arun Lists lists.a...@gmail.com:

 Here is the relevant snippet of code in my main program:

 ===

 sparkConf.set("spark.serializer",
   "org.apache.spark.serializer.KryoSerializer")
 sparkConf.set("spark.kryo.registrationRequired", "true")
 val summaryDataClass = classOf[SummaryData]
 val summaryViewClass = classOf[SummaryView]
 sparkConf.registerKryoClasses(Array(

   summaryDataClass, summaryViewClass))

 ===

 I get the following error:

 Exception in thread main java.lang.reflect.InvocationTargetException
 ...

 Caused by: org.apache.spark.SparkException: Failed to load class to
 register with Kryo
 ...

 Caused by: java.lang.ClassNotFoundException:
 com.dtex.analysis.transform.SummaryData


 Note that the class in question SummaryData is in the same package as the
 main program and hence in the same jar.

 What do I need to do to make this work?

 Thanks,
 arun





Re: Issue with SparkContext in cluster

2015-01-28 Thread Shixiong Zhu
It's because you submitted the job from Windows to a Hadoop cluster running
in Linux. Spark does not support that yet. See
https://issues.apache.org/jira/browse/SPARK-1825

Best Regards,
Shixiong Zhu

2015-01-28 17:35 GMT+08:00 Marco marco@gmail.com:

 I've created a spark app, which runs fine if I copy the corresponding
 jar to the hadoop-server (where yarn is running) and submit it there.

 If it try it to submit it from my local machine, I get the error which
 I've attached below.
 Submit cmd: spark-submit.cmd --class
 ExamplesHadoop.SparkHbase.TruckEvents  --master yarn-cluster
 .\SparkHbase-1.0-SNAPSHOT-jar-with-dependencies.jar

 Even after raising the time for the yarn log, I have the issue that
 there is no log when I try to get it via yarn logs -applicationId
 myApplicationId

 Any hints how I could find the root cause of this issue?

 Thanks,
 Marco


 
 15/01/28 10:25:06 INFO spark.SecurityManager: Changing modify acls to: user
 15/01/28 10:25:06 INFO spark.SecurityManager: SecurityManager:
 authentication disabled; ui acls disabled; users with view
 permissions: Set(user); users with modify permissions:
 Set((user))
 15/01/28 10:25:06 INFO yarn.Client: Submitting application 9 to
 ResourceManager
 15/01/28 10:25:06 INFO impl.YarnClientImpl: Submitted application
 application_1422368366192_0009
 15/01/28 10:25:07 INFO yarn.Client: Application report for
 application_1422368366192_0009 (state: ACCEPTED)
 15/01/28 10:25:07 INFO yarn.Client:
  client token: N/A
  diagnostics: N/A
  ApplicationMaster host: N/A
  ApplicationMaster RPC port: -1
  queue: default
  start time: 1422437106550
  final status: UNDEFINED
  tracking URL:
 http://server:8088/proxy/application_1422368366192_0009/
  user: root
 15/01/28 10:25:08 INFO yarn.Client: Application report for
 application_1422368366192_0009 (state: ACCEPTED)
 15/01/28 10:25:09 INFO yarn.Client: Application report for
 application_1422368366192_0009 (state: ACCEPTED)
 15/01/28 10:25:10 INFO yarn.Client: Application report for
 application_1422368366192_0009 (state: ACCEPTED)
 15/01/28 10:25:11 INFO yarn.Client: Application report for
 application_1422368366192_0009 (state: FAILED)
 15/01/28 10:25:11 INFO yarn.Client:
  client token: N/A
  diagnostics: Application application_1422368366192_0009 failed 2
 times due to AM Container for appattempt_1422368366192_0009_02
 exited with  exitCode: 1
 For more detailed output, check application tracking
 page:http://server:8088/proxy/application_1422368366192_0009/Then,
 click on links to logs of each attempt.
 Diagnostics: Exception from container-launch.
 Container id: container_1422368366192_0009_02_01
 Exit code: 1
 Exception message:

 /hadoop/yarn/local/usercache/root/appcache/application_1422368366192_0009/container_1422368366192_0009_02_01/launch_container.sh:
 line 27: %PWD%;%PWD%/__spark__.jar;$H

 ADOOP_CONF_DIR;/usr/hdp/current/hadoop-client/*;/usr/hdp/current/hadoop-client/lib/*;/usr/hdp/current/hadoop-hdfs-client/*;/usr/hdp/current/hadoop-hdfs-client/lib/*;/usr/hdp/current/hadoop-y

 arn-client/*;/usr/hdp/current/hadoop-yarn-client/lib/*;$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/

 hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/shar

 e/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure;%PWD%/__app__.jar;%PWD%/*:
 bad substitution
 /bin/bash: line 0: fg: no job control
 Stack trace: ExitCodeException exitCode=1:

 /hadoop/yarn/local/usercache/root/appcache/application_1422368366192_0009/container_1422368366192_0009_02_01/launch_container.sh:
 line 27: %PWD

 %;%PWD%/__spark__.jar;$HADOOP_CONF_DIR;/usr/hdp/current/hadoop-client/*;/usr/hdp/current/hadoop-client/lib/*;/usr/hdp/current/hadoop-hdfs-client/*;/usr/hdp/current/hadoop-hdfs-client/lib/*;/

 usr/hdp/current/hadoop-yarn-client/*;/usr/hdp/current/hadoop-yarn-client/lib/*;$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr

 -framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/

 mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure;%PWD%/
 __app__.jar;%PWD%/*: bad substitution
 /bin/bash: line 0: fg: no job control
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
 at org.apache.hadoop.util.Shell.run(Shell.java:455)
 at
 org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715

Re: Trying to execute Spark in Yarn

2015-01-08 Thread Shixiong Zhu
`--jars` accepts a comma-separated list of jars. See the usage about
`--jars`

--jars JARS Comma-separated list of local jars to include on the driver and
executor classpaths.



Best Regards,
Shixiong Zhu

2015-01-08 19:23 GMT+08:00 Guillermo Ortiz konstt2...@gmail.com:

 I'm trying to execute Spark from a Hadoop Cluster, I have created this
 script to try it:

 #!/bin/bash

 export HADOOP_CONF_DIR=/etc/hadoop/conf
 SPARK_CLASSPATH=
 for lib in `ls /user/local/etc/lib/*.jar`
 do
 SPARK_CLASSPATH=$SPARK_CLASSPATH:$lib
 done
 /home/spark-1.1.1-bin-hadoop2.4/bin/spark-submit --name Streaming
 --master yarn-cluster --class com.sparkstreaming.Executor --jars
 $SPARK_CLASSPATH --executor-memory 10g
 /user/local/etc/lib/my-spark-streaming-scala.jar

 When I execute the script I get this error:

 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 Exception in thread main java.net.URISyntaxException: Expected
 scheme name at index 0:

 :/user/local/etc/lib/akka-actor_2.10-2.2.3-shaded-protobuf.jar:/user/local/etc/lib/akka-remote_2.10-..
 
 

 -maths-1.2.2a.jar:/user/local/etc/lib/xmlenc-0.52.jar:/user/local/etc/lib/zkclient-0.3.jar:/user/local/etc/lib/zookeeper-3.4.5.jar
 at java.net.URI$Parser.fail(URI.java:2829)
 at java.net.URI$Parser.failExpecting(URI.java:2835)
 at java.net.URI$Parser.parse(URI.java:3027)
 at java.net.URI.init(URI.java:595)
 at org.apache.spark.util.Utils$.resolveURI(Utils.scala:1396)
 at
 org.apache.spark.util.Utils$$anonfun$resolveURIs$1.apply(Utils.scala:1419)
 at
 org.apache.spark.util.Utils$$anonfun$resolveURIs$1.apply(Utils.scala:1419)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at org.apache.spark.util.Utils$.resolveURIs(Utils.scala:1419)
 at
 org.apache.spark.deploy.SparkSubmitArguments.parse$1(SparkSubmitArguments.scala:308)
 at
 org.apache.spark.deploy.SparkSubmitArguments.parseOpts(SparkSubmitArguments.scala:221)
 at
 org.apache.spark.deploy.SparkSubmitArguments.init(SparkSubmitArguments.scala:65)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:70)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)



 Why do I get this error? I have no idea. Any clue?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark 1.2 defaults to MR1 class when calling newAPIHadoopRDD

2015-01-07 Thread Shixiong Zhu
I have not used CDH 5.3.0, but it looks like
spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar contains some
hadoop1 jars (coming from a wrong HBase version).

I don't know the recommended way to build the spark-examples jar because the
official Spark docs do not mention how to build it. For my part, I would add
-Dhbase.profile=hadoop2 to the build command so that the examples project
uses a hadoop2-compatible HBase.

Best Regards,
Shixiong Zhu

2015-01-08 0:30 GMT+08:00 Antony Mayi antonym...@yahoo.com.invalid:

 thanks, I found the issue, I was including 
 /usr/lib/spark/lib/spark-examples-1.2.0-cdh5.3.0-hadoop2.5.0-cdh5.3.0.jar into
 the classpath - this was breaking it. now using custom jar with just the
 python convertors and all works as a charm.
 thanks,
 antony.


   On Wednesday, 7 January 2015, 23:57, Sean Owen so...@cloudera.com
 wrote:



 Yes, the distribution is certainly fine and built for Hadoop 2. It sounds
 like you are inadvertently including Spark code compiled for Hadoop 1 when
 you run your app. The general idea is to use the cluster's copy at runtime.
 Those with more pyspark experience might be able to give more useful
 directions about how to fix that.

 On Wed, Jan 7, 2015 at 1:46 PM, Antony Mayi antonym...@yahoo.com wrote:

 this is the official Cloudera-compiled stack CDH 5.3.0 - nothing has been done
 by me, and I presume they are pretty good at building it, so I still suspect
 it now gets the classpath resolved in a different way?

 thx,
 Antony.


   On Wednesday, 7 January 2015, 18:55, Sean Owen so...@cloudera.com
 wrote:



 Problems like this are always due to having code compiled for Hadoop 1.x
 run against Hadoop 2.x, or vice versa. Here, you compiled for 1.x but at
 runtime Hadoop 2.x is used.

 A common cause is actually bundling Spark / Hadoop classes with your app,
 when the app should just use the Spark / Hadoop provided by the cluster. It
 could also be that you're pairing Spark compiled for Hadoop 1.x with a 2.x
 cluster.

 On Wed, Jan 7, 2015 at 9:38 AM, Antony Mayi antonym...@yahoo.com.invalid
 wrote:

 Hi,

 I am using newAPIHadoopRDD to load an RDD from HBase (using pyspark running
 as yarn-client) - pretty much the standard case demonstrated in
 hbase_inputformat.py from the examples... the thing is that when trying the very
 same code on Spark 1.2 I am getting the error below, which based on similar
 cases on other forums suggests an incompatibility between MR1 and MR2.

 why would this now start happening? is that due to some changes in
 resolving the classpath which now picks up MR2 jars first while before it
 was MR1?

 is there any workaround for this?

 thanks,
 Antony.

 the error:

 py4j.protocol.Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. :
 java.lang.IncompatibleClassChangeError: Found interface
 org.apache.hadoop.mapreduce.JobContext, but class was expected at
 org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:158)
 at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at
 scala.Option.getOrElse(Option.scala:120) at
 org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at
 org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at
 org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) at
 scala.Option.getOrElse(Option.scala:120) at
 org.apache.spark.rdd.RDD.partitions(RDD.scala:203) at
 org.apache.spark.rdd.RDD.take(RDD.scala:1060) at
 org.apache.spark.rdd.RDD.first(RDD.scala:1093) at
 org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:202)
 at
 org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:500)
 at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606) at
 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at
 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at
 py4j.Gateway.invoke(Gateway.java:259) at
 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at
 py4j.commands.CallCommand.execute(CallCommand.java:79) at
 py4j.GatewayConnection.run(GatewayConnection.java:207) at
 java.lang.Thread.run(Thread.java:745)












Re: Joining by values

2015-01-03 Thread Shixiong Zhu
call `map(_.toList)` to convert `CompactBuffer` to `List`
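
For example, applied to the code quoted below (a minimal sketch; rdd1InvIndex and
rdd2Pair are the names from that snippet):

rdd1InvIndex.join(rdd2Pair)
  .map(_._2)
  .groupByKey()
  .mapValues(_.toList)   // the grouped values are a CompactBuffer; toList prints as List(...)
  .collect()
  .foreach(println)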

Best Regards,
Shixiong Zhu

2015-01-04 12:08 GMT+08:00 Sanjay Subramanian 
sanjaysubraman...@yahoo.com.invalid:

 hi
 Take a look at the code here I wrote

 https://raw.githubusercontent.com/sanjaysubramanian/msfx_scala/master/src/main/scala/org/medicalsidefx/common/utils/PairRddJoin.scala

 /*rdd1.txt

 1~4,5,6,7
 2~4,5
 3~6,7

 rdd2.txt

 4~1001,1000,1002,1003
 5~1004,1001,1006,1007
 6~1007,1009,1005,1008
 7~1011,1012,1013,1010

 */
 val sconf = new SparkConf().setMaster("local").setAppName("MedicalSideFx-PairRddJoin")
 val sc = new SparkContext(sconf)

 val rdd1 = "/path/to/rdd1.txt"
 val rdd2 = "/path/to/rdd2.txt"

 val rdd1InvIndex = sc.textFile(rdd1).map(x => (x.split('~')(0), x.split('~')(1)))
   .flatMapValues(str => str.split(',')).map(str => (str._2, str._1))
 val rdd2Pair = sc.textFile(rdd2).map(str => (str.split('~')(0), str.split('~')(1)))
 rdd1InvIndex.join(rdd2Pair).map(str => str._2).groupByKey().collect().foreach(println)


 This outputs the following. I think this may be essentially what you are looking for.

 (I have to understand how to NOT print as CompactBuffer)

 (2,CompactBuffer(1001,1000,1002,1003, 1004,1001,1006,1007))
 (3,CompactBuffer(1011,1012,1013,1010, 1007,1009,1005,1008))
 (1,CompactBuffer(1001,1000,1002,1003, 1011,1012,1013,1010, 
 1004,1001,1006,1007, 1007,1009,1005,1008))


   --
  *From:* Sanjay Subramanian sanjaysubraman...@yahoo.com.INVALID
 *To:* dcmovva dilip.mo...@gmail.com; user@spark.apache.org 
 user@spark.apache.org
 *Sent:* Saturday, January 3, 2015 12:19 PM
 *Subject:* Re: Joining by values

 This is my design. Now let me try and code it in Spark.

 rdd1.txt
 =
 1~4,5,6,7
 2~4,5
 3~6,7

 rdd2.txt
 
 4~1001,1000,1002,1003
 5~1004,1001,1006,1007
 6~1007,1009,1005,1008
 7~1011,1012,1013,1010

 TRANSFORM 1
 ===
 map each value to key (like an inverted index)
 4~1
 5~1
 6~1
 7~1
 5~2
 4~2
 6~3
 7~3

 TRANSFORM 2
 ===
 Join keys in transform 1 and rdd2
 4~1,1001,1000,1002,1003
 4~2,1001,1000,1002,1003
 5~1,1004,1001,1006,1007
 5~2,1004,1001,1006,1007
 6~1,1007,1009,1005,1008
 6~3,1007,1009,1005,1008
 7~1,1011,1012,1013,1010
 7~3,1011,1012,1013,1010

 TRANSFORM 3
 ===
 Split key in transform 2 with ~ and keep key(1) i.e. 1,2,3
 1~1001,1000,1002,1003
 2~1001,1000,1002,1003
 1~1004,1001,1006,1007
 2~1004,1001,1006,1007
 1~1007,1009,1005,1008
 3~1007,1009,1005,1008
 1~1011,1012,1013,1010
 3~1011,1012,1013,1010

 TRANSFORM 4
 ===
 join by key

 1~1001,1000,1002,1003,1004,1001,1006,1007,1007,1009,1005,1008,1011,1012,1013,1010
 2~1001,1000,1002,1003,1004,1001,1006,1007
 3~1007,1009,1005,1008,1011,1012,1013,1010




  --
  *From:* dcmovva dilip.mo...@gmail.com
 *To:* user@spark.apache.org
 *Sent:* Saturday, January 3, 2015 10:10 AM
 *Subject:* Joining by values

 I have a two pair RDDs in spark like this

 rdd1 = (1 -> [4,5,6,7])
   (2 -> [4,5])
   (3 -> [6,7])


 rdd2 = (4 -> [1001,1000,1002,1003])
   (5 -> [1004,1001,1006,1007])
   (6 -> [1007,1009,1005,1008])
   (7 -> [1011,1012,1013,1010])
 I would like to combine them to look like this.

 joinedRdd = (1 ->
 [1000,1001,1002,1003,1004,1005,1006,1007,1008,1009,1010,1011,1012,1013])
 (2 -> [1000,1001,1002,1003,1004,1006,1007])
 (3 -> [1005,1007,1008,1009,1010,1011,1012,1013])


 Can someone suggest me how to do this.

 Thanks Dilip



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Joining-by-values-tp20954.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Re: recent join/iterator fix

2014-12-29 Thread Shixiong Zhu
The Iterable from cogroup is a CompactBuffer, which is already materialized.
It's not a lazy Iterable. So for now Spark cannot handle skewed data where a
single key has too many values to fit into memory.


Re: Dynamic Allocation in Spark 1.2.0

2014-12-27 Thread Shixiong Zhu
I encountered the following issue when enabling dynamicAllocation. You may
want to take a look at it.

https://issues.apache.org/jira/browse/SPARK-4951

Best Regards,
Shixiong Zhu

2014-12-28 2:07 GMT+08:00 Tsuyoshi OZAWA ozawa.tsuyo...@gmail.com:

 Hi Anders,

 I faced the same issue as you mentioned. Yes, you need to install
 spark shuffle plugin for YARN. Please check following PRs which add
 doc to enable dynamicAllocation:

 https://github.com/apache/spark/pull/3731
 https://github.com/apache/spark/pull/3757

 I could run Spark on YARN with dynamicAllocation by following the
 instructions described in the docs.

 Thanks,
 - Tsuyoshi

 On Sat, Dec 27, 2014 at 11:06 PM, Anders Arpteg arp...@spotify.com
 wrote:
  Hey,
 
  Tried to get the new spark.dynamicAllocation.enabled feature working on
 Yarn
  (Hadoop 2.2), but am unsuccessful so far. I've tested with the following
  settings:
 
  conf
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "10")
    .set("spark.dynamicAllocation.maxExecutors", "700")
 
  The app works fine on Spark 1.2 if dynamicAllocation is not enabled, but
  with the settings above, it will start the app and the first job is
 listed
  in the web ui. However, no tasks are started and it seems to be stuck
  waiting for a container to be allocated forever.
 
  Any help would be appreciated. Need to do something specific to get the
  external yarn shuffle service running in the node manager?
 
  TIA,
  Anders



 --
 - Tsuyoshi

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Announcing Spark 1.2!

2014-12-19 Thread Shixiong Zhu
Congrats!

A little question about this release: Which commit is this release based
on? v1.2.0 and v1.2.0-rc2 are pointed to different commits in
https://github.com/apache/spark/releases

Best Regards,
Shixiong Zhu

2014-12-19 16:52 GMT+08:00 Patrick Wendell pwend...@gmail.com:

 I'm happy to announce the availability of Spark 1.2.0! Spark 1.2.0 is
 the third release on the API-compatible 1.X line. It is Spark's
 largest release ever, with contributions from 172 developers and more
 than 1,000 commits!

 This release brings operational and performance improvements in Spark
 core including a new network transport subsytem designed for very
 large shuffles. Spark SQL introduces an API for external data sources
 along with Hive 13 support, dynamic partitioning, and the
 fixed-precision decimal type. MLlib adds a new pipeline-oriented
 package (spark.ml) for composing multiple algorithms. Spark Streaming
 adds a Python API and a write ahead log for fault tolerance. Finally,
 GraphX has graduated from alpha and introduces a stable API along with
 performance improvements.

 Visit the release notes [1] to read about the new features, or
 download [2] the release today.

 For errata in the contributions or release notes, please e-mail me
 *directly* (not on-list).

 Thanks to everyone involved in creating, testing, and documenting this
 release!

 [1] http://spark.apache.org/releases/spark-release-1-2-0.html
 [2] http://spark.apache.org/downloads.html

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: weird bytecode incompatability issue between spark-core jar from mvn repo and official spark prebuilt binary

2014-12-18 Thread Shixiong Zhu
@Rui do you mean the spark-core jar in the Maven central repo
is incompatible with the same version of the official pre-built Spark
binary? That's really weird. I thought they should have been built from the same code.

Best Regards,
Shixiong Zhu

2014-12-18 17:22 GMT+08:00 Sean Owen so...@cloudera.com:

 Well, it's always a good idea to used matched binary versions. Here it
 is more acutely necessary. You can use a pre built binary -- if you
 use it to compile and also run. Why does it not make sense to publish
 artifacts?

 Not sure what you mean about core vs assembly, as the assembly
 contains all of the modules. You don't literally need the same jar
 file.

 On Thu, Dec 18, 2014 at 3:20 AM, Sun, Rui rui@intel.com wrote:
  Not using spark-submit. The App directly communicates with the Spark
 cluster
  in standalone mode.
 
 
 
  If I mark the Spark dependency as 'provided', then a spark-core jar
  elsewhere must be pointed to in the CLASSPATH. However, the pre-built Spark
  binary only ships an assembly jar, not individual module jars, so you
  don't have a chance to point to a module jar that is the same binary as
  the one in the pre-built Spark distribution.
 
 
 
  Maybe the Spark distribution should contain not only the assembly jar but
  also individual module jars. Any opinion?
 
 
 
  From: Shivaram Venkataraman [mailto:shiva...@eecs.berkeley.edu]
  Sent: Thursday, December 18, 2014 2:20 AM
  To: Sean Owen
  Cc: Sun, Rui; user@spark.apache.org
  Subject: Re: weird bytecode incompatability issue between spark-core jar
  from mvn repo and official spark prebuilt binary
 
 
 
  Just to clarify, are you running the application using spark-submit after
  packaging with sbt package ? One thing that might help is to mark the
 Spark
  dependency as 'provided' as then you shouldn't have the Spark classes in
  your jar.
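
  A minimal build.sbt sketch of that suggestion (assuming sbt and the Spark 1.1.1
  dependency discussed in this thread; the project name is hypothetical):

  name := "my-app"   // hypothetical project name

  scalaVersion := "2.10.4"

  libraryDependencies ++= Seq(
    // "provided": compile against spark-core but do not bundle it, so only the
    // cluster's own pre-built Spark binaries are used at runtime
    "org.apache.spark" %% "spark-core" % "1.1.1" % "provided"
  )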
 
 
 
  Thanks
 
  Shivaram
 
 
 
  On Wed, Dec 17, 2014 at 4:39 AM, Sean Owen so...@cloudera.com wrote:
 
   You should use the same binaries everywhere. The problem here is that
   anonymous functions can get compiled to different names when you build
   differently, so you actually have one function being called
   when another function is meant.
 
 
  On Wed, Dec 17, 2014 at 12:07 PM, Sun, Rui rui@intel.com wrote:
  Hi,
 
 
 
  I encountered a weird bytecode incompatability issue between spark-core
  jar
  from mvn repo and official spark prebuilt binary.
 
 
 
  Steps to reproduce:
 
  1. Download the official pre-built Spark binary 1.1.1 at
  http://d3kbcqa49mib13.cloudfront.net/spark-1.1.1-bin-hadoop1.tgz
 
  2. Launch the Spark cluster in pseudo cluster mode
 
  3. A small scala APP which calls RDD.saveAsObjectFile()
 
   scalaVersion := "2.10.4"

   libraryDependencies ++= Seq(
     "org.apache.spark" %% "spark-core" % "1.1.1"
   )

   val sc = new SparkContext(args(0), "test") // args(0) is the Spark master URI
   val rdd = sc.parallelize(List(1, 2, 3))
   rdd.saveAsObjectFile("/tmp/mysaoftmp")
   sc.stop
 
 
 
  throws an exception as follows:
 
  [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to
  stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure:
  Lost
  task 1.3 in stage 0.0 (TID 6, ray-desktop.sh.intel.com):
  java.lang.ClassCastException: scala.Tuple2 cannot be cast to
  scala.collection.Iterator
 
  [error]
  org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
 
  [error]
  org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
 
  [error]
  org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 
  [error]
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 
  [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 
  [error]
  org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 
  [error]
  org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 
  [error] org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 
  [error]
  org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 
  [error] org.apache.spark.scheduler.Task.run(Task.scala:54)
 
  [error]
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
 
  [error]
 
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
 
  [error]
 
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 
  [error] java.lang.Thread.run(Thread.java:701)
 
 
 
   After investigation, I found that this is caused by a bytecode
   incompatibility between RDD.class in spark-core_2.10-1.1.1.jar and the
   RDD.class in the pre-built Spark assembly.
 
 
 
  This issue also happens with spark 1.1.0.
 
 
 
  Is there anything wrong in my usage of Spark? Or anything wrong in the
  process of deploying Spark module jars to maven repo?
 
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail

Re: NullPointerException on cluster mode when using foreachPartition

2014-12-16 Thread Shixiong Zhu
Could you post the stack trace?


Best Regards,
Shixiong Zhu

2014-12-16 23:21 GMT+08:00 richiesgr richie...@gmail.com:

 Hi

 This time I need expert.
 On 1.1.1 and only in cluster (standalone or EC2)
 when I use this code :

 countersPublishers.foreachRDD(rdd => {
   rdd.foreachPartition(partitionRecords => {
     partitionRecords.foreach(record => {
       // dbActorUpdater ! updateDBMessage(record)
       println(record)
     })
   })
 })

 Get NPP (When I run this locally all is OK)

 If I use this
   countersPublishers.foreachRDD(rdd => rdd.collect().foreach(r =>
     dbActorUpdater ! updateDBMessage(r)))

 There is no problem. I think something is misconfigured
 Thanks for help




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-on-cluster-mode-when-using-foreachPartition-tp20719.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Serialization issue when using HBase with Spark

2014-12-15 Thread Shixiong Zhu
Just pointing out a bug in your code: you should not use `mapPartitions` like
that. For details, I recommend the section on setup() and cleanup() in Sean
Owen's post:
http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/
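
A minimal sketch of that pattern applied to Option 1 below (HTable, getUserActions,
timeStart and timeStop are the names from the quoted code; materializing the
partition before closing the table is one simple way to keep the connection open
exactly as long as it is needed):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable

val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(x => x)
  .mapPartitions { userIds =>
    // setup: create the non-serializable objects on the executor, once per partition
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "actions")
    // materialize the results before cleanup, so the table is not closed
    // while a lazy iterator still needs it
    val results = userIds.map { userId =>
      (userId, getUserActions(table, userId, timeStart, timeStop))
    }.toList
    // cleanup
    table.close()
    results.iterator
  }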

Best Regards,
Shixiong Zhu

2014-12-14 16:35 GMT+08:00 Yanbo yanboha...@gmail.com:

  In #1, the HTable class is not serializable.
  You also need to check your self-defined function getUserActions and make
  sure it is a member of a class that implements the Serializable
  interface.

 发自我的 iPad

  在 2014年12月12日,下午4:35,yangliuyu yangli...@163.com 写道:
 
  The scenario is using HTable instance to scan multiple rowkey range in
 Spark
  tasks look likes below:
  Option 1:
   val users = input
     .map { case (deviceId, uid) => uid }
     .distinct().sortBy(x => x).mapPartitions(iterator => {
       val conf = HBaseConfiguration.create()
       val table = new HTable(conf, "actions")
       val result = iterator.map { userId =>
         (userId, getUserActions(table, userId, timeStart, timeStop))
       }
       table.close()
       result
     })
 
  But got the exception:
  org.apache.spark.SparkException: Task not serializable
 at
 
 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
 at
  org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
 at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
 at
 
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)...
  ...
  Caused by: java.io.NotSerializableException:
  org.apache.hadoop.conf.Configuration
 at
  java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at
 
 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at
  java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 
  The reason not using sc.newAPIHadoopRDD is it only support one scan each
  time.
  val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result])
 
  And if using MultiTableInputFormat, driver is not possible put all
 rowkeys
  into HBaseConfiguration
  Option 2:
  sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result])
 
  It may divide all rowkey ranges into several parts then use option 2,
 but I
  prefer option 1. So is there any solution for option 1?
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Serialization-issue-when-using-HBase-with-Spark-tp20655.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: flatMap and spilling of output to disk

2014-12-10 Thread Shixiong Zhu
for (v1 <- values; v2 <- values) yield ((v1, v2), 1) will generate all data
at once and return all of them to flatMap.

To solve your problem, you should use for (v1 <- values.iterator; v2 <-
values.iterator) yield ((v1, v2), 1) which will generate the data when it's
necessary.
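
Put together with the flatMap from the question below (rdd is the
(String, Iterable[String]) RDD produced by groupByKey), a minimal sketch is:

rdd.flatMap { case (key, values) =>
  // values.iterator is lazy, so the O(n^2) cross product is produced
  // element by element instead of being built up in memory first
  for (v1 <- values.iterator; v2 <- values.iterator) yield ((v1, v2), 1)
}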
​

Best Regards,
Shixiong Zhu

2014-12-10 20:13 GMT+08:00 Johannes Simon johannes.si...@mail.de:

 Hi!

 I have been using spark a lot recently and it's been running really well
 and fast, but now when I increase the data size, it's starting to run into
 problems:
 I have an RDD in the form of (String, Iterable[String]) - the
 Iterable[String] was produced by a groupByKey() - and I perform a flatMap
 on it that outputs some form of cartesian product of the values per key:


 rdd.flatMap({ case (key, values) => for (v1 <- values; v2 <- values) yield
 ((v1, v2), 1) })


 So the runtime cost per RDD entry is O(n^2) where n is the number of
 values. This n can sometimes be 10,000 or even 100,000. That produces a lot
 of data, I am aware of that, but otherwise I wouldn't need a cluster, would
 I? :) For n=1000 this operation works quite well. But as soon as I allow n
 to be = 10,000 or higher, I start to get GC overhead limit exceeded
 exceptions.

 Configuration:
 - 7g executor memory
 - spark.shuffle.memoryFraction=0.5
 - spark.storage.memoryFraction=0.1
 I am not sure how the remaining memory for the actual JVM instance
 performing the flatMap is computed, but I would assume it to be something
 like (1-0.5-0.1)*7g = 2.8g

 Now I am wondering: Why should these 2.8g (or say even a few hundred MB)
 not suffice for spark to process this flatMap without too much GC overhead?
 If I assume a string to be 10 characters on average, therefore consuming
 about 60 bytes with overhead taken into account, then 10,000 of these
 values sum up to no more than ~600kb, and apart from that spark never has
 to keep anything else in memory.

 My question: When does spark start to spill RDD entries to disk, assuming
 that no RDD is to be persisted? Does it keep all output of the flatMap
 operation in memory until the entire flatMap is done? Or does it already
 spill every single yielded ((v1, v2), 1) entry out to disk if necessary?

 Thanks a lot!
 Johannes
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: flatMap and spilling of output to disk

2014-12-10 Thread Shixiong Zhu
Good catch. `Join` should use `Iterator`, too. I opened a JIRA here:
https://issues.apache.org/jira/browse/SPARK-4824
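
The fix is essentially the same iterator trick applied to the join implementation
quoted below. As a standalone sketch of the idea (not the actual patch; the helper
name is hypothetical):

import scala.reflect.ClassTag
import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Build the per-key cross product lazily from iterators instead of
// materializing all (v, w) pairs at once.
def joinWithIterators[K: ClassTag, V: ClassTag, W: ClassTag](
    left: RDD[(K, V)], right: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  left.cogroup(right, partitioner).flatMapValues { pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  }
}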

Best Regards,
Shixiong Zhu

2014-12-10 21:35 GMT+08:00 Johannes Simon johannes.si...@mail.de:

 Hi!

 Using an iterator solved the problem! I've been chewing on this for days,
 so thanks a lot to both of you!! :)

 Since in an earlier version of my code, I used a self-join to perform the
 same thing, and ran into the same problems, I just looked at the
 implementation of PairRDDFunction.join (Spark v1.1.1):

  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1; w <- pair._2) yield (v, w)
    )
  }

 Is there a reason to not use an iterator here if possible? Pardon my lack
 of Scala knowledge.. This should in any case cause the same problems I had
 when the size of vs/ws gets too large. (Though that question is more of a
 dev ml question)

 Thanks!
 Johannes

 Am 10.12.2014 um 13:44 schrieb Shixiong Zhu zsxw...@gmail.com:

  for (v1 <- values; v2 <- values) yield ((v1, v2), 1) will generate all
  data at once and return all of them to flatMap.

  To solve your problem, you should use for (v1 <- values.iterator; v2 <-
  values.iterator) yield ((v1, v2), 1) which will generate the data when
  it's necessary.

 Best Regards,
 Shixiong Zhu

 2014-12-10 20:13 GMT+08:00 Johannes Simon johannes.si...@mail.de:

 Hi!

 I have been using spark a lot recently and it's been running really well
 and fast, but now when I increase the data size, it's starting to run into
 problems:
 I have an RDD in the form of (String, Iterable[String]) - the
 Iterable[String] was produced by a groupByKey() - and I perform a flatMap
 on it that outputs some form of cartesian product of the values per key:


  rdd.flatMap({ case (key, values) => for (v1 <- values; v2 <- values) yield
  ((v1, v2), 1) })


 So the runtime cost per RDD entry is O(n^2) where n is the number of
 values. This n can sometimes be 10,000 or even 100,000. That produces a lot
 of data, I am aware of that, but otherwise I wouldn't need a cluster, would
 I? :) For n=1000 this operation works quite well. But as soon as I allow n
 to be = 10,000 or higher, I start to get GC overhead limit exceeded
 exceptions.

 Configuration:
 - 7g executor memory
 - spark.shuffle.memoryFraction=0.5
 - spark.storage.memoryFraction=0.1
 I am not sure how the remaining memory for the actual JVM instance
 performing the flatMap is computed, but I would assume it to be something
 like (1-0.5-0.1)*7g = 2.8g

 Now I am wondering: Why should these 2.8g (or say even a few hundred MB)
 not suffice for spark to process this flatMap without too much GC overhead?
 If I assume a string to be 10 characters on average, therefore consuming
 about 60 bytes with overhead taken into account, then 10,000 of these
 values sum up to no more than ~600kb, and apart from that spark never has
 to keep anything else in memory.

 My question: When does spark start to spill RDD entries to disk, assuming
 that no RDD is to be persisted? Does it keep all output of the flatMap
 operation in memory until the entire flatMap is done? Or does it already
 spill every single yielded ((v1, v2), 1) entry out to disk if necessary?

 Thanks a lot!
 Johannes
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Issue on [SPARK-3877][YARN]: Return code of the spark-submit in yarn-cluster mode

2014-12-05 Thread Shixiong Zhu
What's the status of this application in the yarn web UI?

Best Regards,
Shixiong Zhu

2014-12-05 17:22 GMT+08:00 LinQili lin_q...@outlook.com:

 I tried another test code:
  def main(args: Array[String]) {
    if (args.length != 1) {
      Util.printLog(ERROR, "Args error - arg1: BASE_DIR")
      exit(101)
    }
    val currentFile = args(0).toString
    val DB = "test_spark"
    val tableName = "src"

    val sparkConf = new SparkConf().setAppName(s"HiveFromSpark")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)

    // Before exit
    Util.printLog(INFO, "Exit")
    exit(100)
  }

 There were two `exit` in this code. If the args was wrong, the
 spark-submit will get the return code 101, but, if the args is correct,
 spark-submit cannot get the second return code 100.  What's the difference
 between these two `exit`? I was so confused.

 --
 From: lin_q...@outlook.com
 To: u...@spark.incubator.apache.org
 Subject: RE: Issue on [SPARK-3877][YARN]: Return code of the spark-submit
 in yarn-cluster mode
 Date: Fri, 5 Dec 2014 17:11:39 +0800


 I tried in spark client mode, spark-submit can get the correct return code
 from spark job. But in yarn-cluster mode, It failed.

 --
 From: lin_q...@outlook.com
 To: u...@spark.incubator.apache.org
 Subject: Issue on [SPARK-3877][YARN]: Return code of the spark-submit in
 yarn-cluster mode
 Date: Fri, 5 Dec 2014 16:55:37 +0800

 Hi, all:

 According to https://github.com/apache/spark/pull/2732, When a spark job
 fails or exits nonzero in yarn-cluster mode, the spark-submit will get the
 corresponding return code of the spark job. But I tried in spark-1.1.1 yarn
 cluster, spark-submit return zero anyway.

 Here is my spark code:

 try {
   val dropTable = s"drop table $DB.$tableName"
   hiveContext.hql(dropTable)
   val createTbl = "do some thing..."
   hiveContext.hql(createTbl)
 } catch {
   case ex: Exception => {
     Util.printLog(ERROR, s"create db error.")
     exit(-1)
   }
 }

 Maybe I did something wrong. Is there any hint? Thanks.



Re: Issue on [SPARK-3877][YARN]: Return code of the spark-submit in yarn-cluster mode

2014-12-05 Thread Shixiong Zhu
There were two exit in this code. If the args was wrong, the spark-submit
will get the return code 101, but, if the args is correct, spark-submit
cannot get the second return code 100. What’s the difference between these
two exit? I was so confused.

I'm also confused. When I tried your code, spark-submit returned 1 in
both cases. That's expected. In yarn-cluster mode, the driver runs
in the ApplicationMaster. The exit code of driver is also the exit code of
ApplicationMaster. However, for now, Spark cannot get the exit code of
ApplicationMaster from Yarn, because Yarn does not send it back to the
client.
spark-submit will return 1 when Yarn reports the ApplicationMaster failed.
​

Best Regards,
Shixiong Zhu

2014-12-06 1:59 GMT+08:00 LinQili lin_q...@outlook.com:

 You mean the localhost:4040 or the application master web ui?

 Sent from my iPhone

 On Dec 5, 2014, at 17:26, Shixiong Zhu zsxw...@gmail.com wrote:

 What's the status of this application in the yarn web UI?

 Best Regards,
 Shixiong Zhu

 2014-12-05 17:22 GMT+08:00 LinQili lin_q...@outlook.com:

 I tried another test code:
  def main(args: Array[String]) {
    if (args.length != 1) {
      Util.printLog(ERROR, "Args error - arg1: BASE_DIR")
      exit(101)
    }
    val currentFile = args(0).toString
    val DB = "test_spark"
    val tableName = "src"

    val sparkConf = new SparkConf().setAppName(s"HiveFromSpark")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)

    // Before exit
    Util.printLog(INFO, "Exit")
    exit(100)
  }

 There were two `exit` in this code. If the args was wrong, the
 spark-submit will get the return code 101, but, if the args is correct,
 spark-submit cannot get the second return code 100.  What's the difference
 between these two `exit`? I was so confused.

 --
 From: lin_q...@outlook.com
 To: u...@spark.incubator.apache.org
 Subject: RE: Issue on [SPARK-3877][YARN]: Return code of the spark-submit
 in yarn-cluster mode
 Date: Fri, 5 Dec 2014 17:11:39 +0800


 I tried in spark client mode, spark-submit can get the correct return
 code from spark job. But in yarn-cluster mode, It failed.

 --
 From: lin_q...@outlook.com
 To: u...@spark.incubator.apache.org
 Subject: Issue on [SPARK-3877][YARN]: Return code of the spark-submit in
 yarn-cluster mode
 Date: Fri, 5 Dec 2014 16:55:37 +0800

 Hi, all:

 According to https://github.com/apache/spark/pull/2732, When a spark job
 fails or exits nonzero in yarn-cluster mode, the spark-submit will get the
 corresponding return code of the spark job. But I tried in spark-1.1.1 yarn
 cluster, spark-submit return zero anyway.

 Here is my spark code:

 try {
   val dropTable = s"drop table $DB.$tableName"
   hiveContext.hql(dropTable)
   val createTbl = "do some thing..."
   hiveContext.hql(createTbl)
 } catch {
   case ex: Exception => {
     Util.printLog(ERROR, s"create db error.")
     exit(-1)
   }
 }

 Maybe I did something wrong. Is there any hint? Thanks.





Re: Setting network variables in spark-shell

2014-12-01 Thread Shixiong Zhu
Don't set `spark.akka.frameSize` to 1. The max value of
`spark.akka.frameSize` is 2047. The unit is MB.
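
If you build the SparkConf yourself rather than passing --conf to spark-shell,
a minimal sketch (the 128 MB value and the app name are only illustrations) is:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("frame-size-example")     // hypothetical app name
  .set("spark.akka.frameSize", "128")   // in MB; must be at most 2047
val sc = new SparkContext(conf)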

Best Regards,
Shixiong Zhu

2014-12-01 0:51 GMT+08:00 Yanbo yanboha...@gmail.com:


 Try to use spark-shell --conf spark.akka.frameSize=1

 在 2014年12月1日,上午12:25,Brian Dolan buddha_...@yahoo.com.INVALID 写道:

 Howdy Folks,

 What is the correct syntax in 1.0.0 to set networking variables in spark
 shell?  Specifically, I'd like to set the spark.akka.frameSize

 I'm attempting this:

 spark-shell -Dspark.akka.frameSize=1 --executor-memory 4g


 Only to get this within the session:

 System.getProperty("spark.executor.memory")
 res0: String = 4g
 System.getProperty("spark.akka.frameSize")
 res1: String = null


 I don't believe I am violating protocol, but I have also posted this to
 SO:
 http://stackoverflow.com/questions/27215288/how-do-i-set-spark-akka-framesize-in-spark-shell

 ~~
 May All Your Sequences Converge






Re: spark.akka.frameSize setting problem

2014-11-30 Thread Shixiong Zhu
4096 MB is greater than Int.MaxValue bytes and it will overflow in Spark.
Please set it to less than 4096.

Best Regards,
Shixiong Zhu

2014-12-01 13:14 GMT+08:00 Ke Wang jkx...@gmail.com:

 I meet the same problem, did you solve it ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-akka-frameSize-setting-problem-tp3416p20063.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: spark.akka.frameSize setting problem

2014-11-30 Thread Shixiong Zhu
Sorry, it should be less than 2048; 2047 is the greatest value.

Best Regards,
Shixiong Zhu

2014-12-01 13:20 GMT+08:00 Shixiong Zhu zsxw...@gmail.com:

  4096 MB is greater than Int.MaxValue bytes and it will overflow in Spark.
  Please set it to less than 4096.

 Best Regards,
 Shixiong Zhu

 2014-12-01 13:14 GMT+08:00 Ke Wang jkx...@gmail.com:

 I meet the same problem, did you solve it ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-akka-frameSize-setting-problem-tp3416p20063.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: spark.akka.frameSize setting problem

2014-11-30 Thread Shixiong Zhu
Created a JIRA to track it: https://issues.apache.org/jira/browse/SPARK-4664

Best Regards,
Shixiong Zhu

2014-12-01 13:22 GMT+08:00 Shixiong Zhu zsxw...@gmail.com:

  Sorry, it should be less than 2048; 2047 is the greatest value.

 Best Regards,
 Shixiong Zhu

 2014-12-01 13:20 GMT+08:00 Shixiong Zhu zsxw...@gmail.com:

  4096 MB is greater than Int.MaxValue bytes and it will overflow in Spark.
  Please set it to less than 4096.

 Best Regards,
 Shixiong Zhu

 2014-12-01 13:14 GMT+08:00 Ke Wang jkx...@gmail.com:

 I meet the same problem, did you solve it ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-akka-frameSize-setting-problem-tp3416p20063.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Negative Accumulators

2014-11-24 Thread Shixiong Zhu
int overflow? If so, you can use BigInt like this:

scala> import org.apache.spark.AccumulatorParam
import org.apache.spark.AccumulatorParam

scala> :paste
// Entering paste mode (ctrl-D to finish)

implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
  def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2
  def zero(initialValue: BigInt) = BigInt(0)
}

// Exiting paste mode, now interpreting.

defined module BigIntAccumulatorParam

scala> val accu = sc.accumulator(BigInt(0))
accu: org.apache.spark.Accumulator[scala.math.BigInt] = 0

scala> accu += 100

scala> accu.value
res1: scala.math.BigInt = 100
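
If the totals fit in a Long, a simpler sketch that avoids Int overflow without a
custom AccumulatorParam should also work (this relies on the Long accumulator
param that Spark ships for sc.accumulator):

val accuLong = sc.accumulator(0L)   // Accumulator[Long] instead of Accumulator[Int]
accuLong += 100L
accuLong.value                      // 100, as a Long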

​

Best Regards,
Shixiong Zhu

2014-11-25 10:31 GMT+08:00 Peter Thai thai.pe...@gmail.com:

 Hello!

 Does anyone know why I may be receiving negative final accumulator values?

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Negative-Accumulators-tp19706.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Lots of small input files

2014-11-23 Thread Shixiong Zhu
We encountered a similar problem. If all partitions are located on the same
node and each task runs in less than 3 seconds (set by
spark.locality.wait; the default value is 3000 ms), all of the tasks will run on
that single node. Our solution is to
use org.apache.hadoop.mapred.lib.CombineTextInputFormat to create
big enough tasks. Of course, you can reduce `spark.locality.wait`, but it
may not be efficient because it still creates many tiny tasks.
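
A minimal sketch of that approach (the path and the 64 MB split size are only
illustrations, and the split-size config key is my assumption for Hadoop 2):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.lib.CombineTextInputFormat

// assumption: this Hadoop 2 key caps how much data a single combined split may hold
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.maxsize",
  64L * 1024 * 1024)

val lines = sc
  .hadoopFile("/path/to/part-files", classOf[CombineTextInputFormat],
    classOf[LongWritable], classOf[Text])
  .map { case (_, text) => text.toString }   // one task now covers many small part files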

Best Regards,
Shixiong Zhu

2014-11-22 17:17 GMT+08:00 Akhil Das ak...@sigmoidanalytics.com:

 What is your cluster setup? are you running a worker on the master node
 also?

 1. Spark usually assigns the task to the worker who has the data locally
 available, If one worker has enough tasks, then i believe it will start
 assigning to others as well. You could control it with the level of
 parallelism and all.

 2. If you coalesce it into one partition, then i believe only one of the
 worker will execute the single task.

 Thanks
 Best Regards

 On Fri, Nov 21, 2014 at 9:49 PM, Pat Ferrel p...@occamsmachete.com wrote:

 I have a job that searches for input recursively and creates a string of
 pathnames to treat as one input.

 The files are part-x files and they are fairly small. The job seems
 to take a long time to complete considering the size of the total data
 (150m) and only runs on the master machine. The job only does rdd.map type
 operations.

 1) Why doesn’t it use the other workers in the cluster?
 2) Is there a downside to using a lot of small part files? Should I
 coalesce them into one input file?
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: How did the RDD.union work

2014-11-11 Thread Shixiong Zhu
Could you provide the code of hbaseQuery? Maybe it doesn't support
being executed in parallel.

Best Regards,
Shixiong Zhu

2014-11-12 14:32 GMT+08:00 qiaou qiaou8...@gmail.com:

  Hi:
 I got a problem with using the union method of RDD.
 Things like this:
 I have a function like
 def hbaseQuery(area: String): RDD[Result] = ???
 When I use hbaseQuery("aa").union(hbaseQuery("bb")).count() it
 returns 0.
 However, when I use sc.parallelize(hbaseQuery("aa").collect.toList
 ::: hbaseQuery("bb").collect.toList).count() it returns the right value.
 Obviously I have got an action after my transformation, but why did it
 not work?
 fyi

 --
 qiaou
 已使用 Sparrow http://www.sparrowmailapp.com/?sig




  1   2   >