Re: calling persist would cause java.util.NoSuchElementException: key not found:

2015-10-02 Thread Shixiong Zhu
Do you have the full stack trace? Could you check if it's the same as
https://issues.apache.org/jira/browse/SPARK-10422

Best Regards,
Shixiong Zhu

2015-10-01 17:05 GMT+08:00 Eyad Sibai <eyad.alsi...@gmail.com>:

> Hi
>
> I am trying to call .persist() on a dataframe but once I execute the next
> line I am getting
> java.util.NoSuchElementException: key not found: ….
>
> I tried persisting to disk as well, and got the same thing.
>
> I am using:
> pyspark with python3
> spark 1.5
>
>
> Thanks!
>
>
> EYAD SIBAI
> Risk Engineer
>
> *iZettle ®*
> ––
>
> Mobile: +46 72 911 60 54
> Web: www.izettle.com <http://izettle.com/>
>


Re: Monitoring tools for spark streaming

2015-09-28 Thread Shixiong Zhu
Which version are you using? Could you take a look at the new Streaming UI
in 1.4.0?

Best Regards,
Shixiong Zhu

2015-09-29 7:52 GMT+08:00 Siva <sbhavan...@gmail.com>:

> Hi,
>
> Could someone recommend the monitoring tools for spark streaming?
>
> By extending StreamingListener we can dump the delay in processing of
> batches and some alert messages.
>
> But are there any Web UI tools where we can monitor failures, see delays
> in processing and error messages, set up alerts, etc.?
>
> Thanks
>
>


Re: Spark streaming job filling a lot of data in local spark nodes

2015-09-28 Thread Shixiong Zhu
These files are created by the shuffle and are just temporary files. They are
not needed for checkpointing and are only stored in your local temp directory,
which is "/tmp" by default. You can use `spark.local.dir` to change that path
if you find "/tmp" doesn't have enough space.
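
For example, a minimal sketch (the paths and app name are only illustrative) that
points the shuffle files at bigger local disks when creating the context:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("my-streaming-app")                              // illustrative app name
  .set("spark.local.dir", "/data1/spark-tmp,/data2/spark-tmp") // comma-separated list of larger local dirs
val ssc = new StreamingContext(conf, Seconds(10))

Note that in YARN mode the node managers' local directories take precedence over
this setting.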

Best Regards,
Shixiong Zhu

2015-09-29 1:04 GMT+08:00 swetha <swethakasire...@gmail.com>:

>
> Hi,
>
> I see a lot of data getting filled locally as shown below from my streaming
> job. I have my checkpoint set to hdfs. But, I still see the following data
> filling my local nodes. Any idea if I can have this stored in HDFS instead
> of storing the data locally?
>
> -rw-r--r--  1520 Sep 17 18:43 shuffle_23119_5_0.index
> -rw-r--r--  1 180564255 Sep 17 18:43 shuffle_23129_2_0.data
> -rw-r--r--  1 364850277 Sep 17 18:45 shuffle_23145_8_0.data
> -rw-r--r--  1  267583750 Sep 17 18:46 shuffle_23105_4_0.data
> -rw-r--r--  1  136178819 Sep 17 18:48 shuffle_23123_8_0.data
> -rw-r--r--  1  159931184 Sep 17 18:48 shuffle_23167_8_0.data
> -rw-r--r--  1520 Sep 17 18:49 shuffle_23315_7_0.index
> -rw-r--r--  1520 Sep 17 18:50 shuffle_23319_3_0.index
> -rw-r--r--  1   92240350 Sep 17 18:51 shuffle_23305_2_0.data
> -rw-r--r--  1   40380158 Sep 17 18:51 shuffle_23323_6_0.data
> -rw-r--r--  1  369653284 Sep 17 18:52 shuffle_23103_6_0.data
> -rw-r--r--  1  371932812 Sep 17 18:52 shuffle_23125_6_0.data
> -rw-r--r--  1   19857974 Sep 17 18:53 shuffle_23291_19_0.data
> -rw-r--r--  1  55342005 Sep 17 18:53 shuffle_23305_8_0.data
> -rw-r--r--  1   92920590 Sep 17 18:53 shuffle_23303_4_0.data
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-job-filling-a-lot-of-data-in-local-spark-nodes-tp24846.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark.streaming.concurrentJobs

2015-09-28 Thread Shixiong Zhu
"writing to HBase for each partition in the RDDs from a given DStream is an
independent output operation"

This is not correct. "Writing to HBase for each partition in the RDDs from
a given DStream" is just a set of tasks, and tasks already run in parallel.

The output operation is the DStream action, such as count, saveXXX, or take.

For example, if "spark.streaming.concurrentJobs" is 1 and you call
DStream.count() twice, there will be two "count" Spark jobs and they will
run one after another. But if you set "spark.streaming.concurrentJobs" to 2,
these two "count" Spark jobs will run in parallel.

Moreover, "spark.streaming.concurrentJobs" is an internal configuration and
may change in the future.
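
A rough sketch of what that looks like (the host and app name are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("concurrent-jobs-example")              // placeholder app name
  .set("spark.streaming.concurrentJobs", "2")         // allow 2 streaming jobs per batch to run at once
val ssc = new StreamingContext(conf, Seconds(10))
val lines = ssc.socketTextStream("some-host", 9999)   // placeholder source

lines.print()                                   // output operation 1 -> one Spark job per batch
lines.foreachRDD(rdd => println(rdd.count()))   // output operation 2 -> a second Spark job per batch

ssc.start()
ssc.awaitTermination()

With the setting at 1, the two jobs of each batch run one after another; with 2,
they can run at the same time.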


Best Regards,
Shixiong Zhu

2015-09-26 3:34 GMT+08:00 Atul Kulkarni <atulskulka...@gmail.com>:

> Can someone please help, either by explaining or pointing to documentation,
> the relationship between the number of executors needed and how to let the
> concurrent jobs that are created by the above parameter run in parallel?
>
> On Thu, Sep 24, 2015 at 11:56 PM, Atul Kulkarni <atulskulka...@gmail.com>
> wrote:
>
>> Hi Folks,
>>
>> I am trying to speed up my spark streaming job, I found a presentation by
>> Tathagata Das that mentions to increase value of
>> "spark.streaming.concurrentJobs" if I have more than one output.
>>
>> In my spark streaming job I am reading from Kafka using the Receiver Based
>> approach and transforming each line of data from Kafka and storing to
>> HBase. I do not intend to do any kind of collation at this stage. I believe
>> this can be parallelized by creating a separate job to write a different
>> set of lines from Kafka to HBase and hence I set the above parameter to a
>> value > 1. Is my assumption correct that writing to HBase for each partition
>> in the RDDs from a given DStream is an independent output operation and can
>> be parallelized?
>>
>> If the assumption is correct, and I run the job - this job creates
>> multiple (smaller) jobs but they are executed one after another, not in
>> parallel - I am curious if there is a requirement that #Executors be >= a
>> particular number (a calculation based on how many repartitions after union
>> of DStreams etc. - I don't know, I am grasping at straws here.)
>>
>> I would appreciate some help in this regard. Thanks in advance.
>>
>> --
>> Regards,
>> Atul Kulkarni
>>
>
>
>
> --
> Regards,
> Atul Kulkarni
>


Re: Join two dataframe - Timeout after 5 minutes

2015-09-24 Thread Shixiong Zhu
You can change "spark.sql.broadcastTimeout" to increase the timeout. The
default value is 300 seconds.
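
One way to set it (the value here is only an illustration) is before running the join:

sqlContext.setConf("spark.sql.broadcastTimeout", "1200")   // timeout in seconds; the default is 300

The same setConf call is also available on the pyspark SQLContext, since you are on Python.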

Best Regards,
Shixiong Zhu

2015-09-24 15:16 GMT+08:00 Eyad Sibai <eyad.alsi...@gmail.com>:

> I am trying to join two tables using DataFrames with Python 3.4 and I am
> getting the following error
>
>
> I ran it on my localhost machine with 2 workers, spark 1.5
>
>
> I always get timeout if the job takes more than 5 minutes.
>
>
>
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
>
>  at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:69)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>
>  at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>
>  at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142)
>
>  at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
>
>  at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
>
>  ... 33 more
>
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [300 seconds]
>
>  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>
>  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
>  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>
>  at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>
>  at scala.concurrent.Await$.result(package.scala:107)
>
>  at
> org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>
>  at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>
>  at
> org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>
>  at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>
>  at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>
>  at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:119)
>
>  at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.apply(TungstenAggregate.scala:69)
>
>  at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
>
>  ... 41 more
>
>
> 2015-09-23 15:44:09,536 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/static/sql,null}
>
> 2015-09-23 15:44:09,537 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL1/execution/json,null}
>
> 2015-09-23 15:44:09,538 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL1/execution,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL1/json,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL1,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/static/sql,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL/execution,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL/json,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/SQL,null}
>
> 2015-09-23 15:44:09,539 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/metrics/json,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/api,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHandler: stopped
> o.s.j.s.ServletContextHandler{/,null}
>
> 2015-09-23 15:44:09,540 INFO ContextHan

Re: Hbase Spark streaming issue.

2015-09-24 Thread Shixiong Zhu
Looks like you have an incompatible hbase-default.xml somewhere on the classpath.
You can use the following code to find the location of "hbase-default.xml":

println(Thread.currentThread().getContextClassLoader().getResource("hbase-default.xml"))

Best Regards,
Shixiong Zhu

2015-09-21 15:46 GMT+08:00 Siva <sbhavan...@gmail.com>:

> Hi,
>
> I'm seeing a strange error while inserting data from Spark Streaming into
> HBase.
>
> I am able to write the data from Spark (without streaming) to HBase
> successfully, but when I use the same code to write a DStream I'm seeing the
> error below.
>
> I tried setting the parameters below, but it still didn't help. Did anyone
> face a similar issue?
>
> conf.set("hbase.defaults.for.version.skip", "true")
> conf.set("hbase.defaults.for.version", "0.98.4.2.2.4.2-2-hadoop2")
>
> 15/09/20 22:39:10 ERROR Executor: Exception in task 0.0 in stage 14.0 (TID
> 16)
> java.lang.RuntimeException: hbase-default.xml file seems to be for and old
> version of HBase (null), this version is 0.98.4.2.2.4.2-2-hadoop2
> at
> org.apache.hadoop.hbase.HBaseConfiguration.checkDefaultsVersion(HBaseConfiguration.java:73)
> at
> org.apache.hadoop.hbase.HBaseConfiguration.addHbaseResources(HBaseConfiguration.java:105)
> at
> org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:116)
> at
> org.apache.hadoop.hbase.HBaseConfiguration.create(HBaseConfiguration.java:125)
> at
> $line51.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$HBaseConn$.hbaseConnection(:49)
> at
> $line52.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$TestHbaseSpark$$anonfun$run$1$$anonfun$apply$1.apply(:73)
> at
> $line52.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$TestHbaseSpark$$anonfun$run$1$$anonfun$apply$1.apply(:73)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
> at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:782)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 15/09/20 22:39:10 WARN TaskSetManager: Lost task 0.0 in stage 14.0 (TID
> 16, localhost): java.lang.RuntimeException: hbase-default.xml file seems to
> be for and old version of HBase (null), this version is
> 0.98.4.2.2.4.2-2-hadoop2
>
>
> Thanks,
> Siva.
>


Fwd: Spark streaming DStream state on worker

2015-09-24 Thread Shixiong Zhu
+user, -dev

It's not clear which `compute` you mean in your question. There are two `compute`
methods here:

1. DStream.compute: it always runs in the driver, and all RDDs are created
in the driver. E.g.,

DStream.foreachRDD(rdd => rdd.count())

"rdd.count()" is called in the driver.

2. RDD.compute: this will run in the executor and the location is not
guaranteed. E.g.,

DStream.foreachRDD(rdd => rdd.foreach { v =>
  println(v)
})

"println(v)" is called in the executor.


Best Regards,
Shixiong Zhu

2015-09-17 3:47 GMT+08:00 Renyi Xiong <renyixio...@gmail.com>:

> Hi,
>
> I want to do a temporal join operation on a DStream across RDDs. My question
> is: are RDDs from the same DStream always computed on the same worker (except
> on failover)?
>
> thanks,
> Renyi.
>


Re: JobScheduler: Error generating jobs for time for custom InputDStream

2015-09-24 Thread Shixiong Zhu
Looks like you return "Some(null)" in "compute". If you don't want to
create an RDD, it should return None. If you want to return an empty RDD, it
should return "Some(sc.emptyRDD)".

Best Regards,
Shixiong Zhu

2015-09-15 2:51 GMT+08:00 Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com>:

> Hi,
>
> I sent this message to the user list a few weeks ago with no luck, so I'm
> forwarding it to the dev list in case someone could give a hand with this.
> Thanks a lot in advance
>
>
> I've developed a ScalaCheck property for testing Spark Streaming
> transformations. To do that I had to develop a custom InputDStream, which
> is very similar to QueueInputDStream but has a method for adding new test
> cases for dstreams, which are objects of type Seq[Seq[A]], to the DStream.
> You can see the code at
> https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/main/scala/org/apache/spark/streaming/dstream/DynSeqQueueInputDStream.scala.
> I have developed a few properties that run in local mode
> https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/test/scala/es/ucm/fdi/sscheck/spark/streaming/ScalaCheckStreamingTest.scala.
> The problem is that when the batch interval is too small, and the machine
> cannot complete the batches fast enough, I get the following exceptions in
> the Spark log
>
> 15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time
> 1440580922500 ms
> java.lang.NullPointerException
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
> at
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
> org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStr

Re: Local Mode: Executor thread leak?

2015-12-08 Thread Shixiong Zhu
Could you send a PR to fix it? Thanks!

Best Regards,
Shixiong Zhu

2015-12-08 13:31 GMT-08:00 Richard Marscher <rmarsc...@localytics.com>:

> Alright I was able to work through the problem.
>
> So the owning thread was one from the executor task launch worker, which
> at least in local mode runs the task and the related user code of the task.
> After judiciously naming every thread in the pools in the user code (with a
> custom ThreadFactory) I was able to trace down the leak to a couple thread
> pools that were not shut down properly by noticing the named threads
> accumulating in thread dumps of the JVM process.
>
> On Mon, Dec 7, 2015 at 6:41 PM, Richard Marscher <rmarsc...@localytics.com
> > wrote:
>
>> Thanks for the response.
>>
>> The version is Spark 1.5.2.
>>
>> Some examples of the thread names:
>>
>> pool-1061-thread-1
>> pool-1059-thread-1
>> pool-1638-thread-1
>>
>> There become hundreds then thousands of these stranded in WAITING.
>>
>> I added logging to try to track the lifecycle of the thread pool in
>> Executor as mentioned before. Here is an excerpt, but everything seems fine
>> there. Every executor that starts is also shut down and it seems like it
>> shuts down fine.
>>
>> 15/12/07 23:30:21 WARN o.a.s.e.Executor: Threads finished in executor
>> driver. pool shut down 
>> java.util.concurrent.ThreadPoolExecutor@e5d036b[Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
>> 15/12/07 23:30:28 WARN o.a.s.e.Executor: Executor driver created, thread
>> pool: java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Running, pool
>> size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
>> 15/12/07 23:31:06 WARN o.a.s.e.Executor: Threads finished in executor
>> driver. pool shut down 
>> java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 36]
>> 15/12/07 23:31:11 WARN o.a.s.e.Executor: Executor driver created, thread
>> pool: java.util.concurrent.ThreadPoolExecutor@6e85ece4[Running, pool
>> size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
>> 15/12/07 23:34:35 WARN o.a.s.e.Executor: Threads finished in executor
>> driver. pool shut down 
>> java.util.concurrent.ThreadPoolExecutor@6e85ece4[Terminated,
>> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 288]
>>
>> Also here is an example thread dump of such a thread:
>>
>> "pool-493-thread-1" prio=10 tid=0x7f0e60612800 nid=0x18c4 waiting on
>> condition [0x7f0c33c3e000]
>>java.lang.Thread.State: WAITING (parking)
>> at sun.misc.Unsafe.park(Native Method)
>> - parking to wait for  <0x7f10b3e8fb60> (a
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>> at
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>> at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>> at
>> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>>     at
>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> On Mon, Dec 7, 2015 at 6:23 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:
>>
>>> Which version are you using? Could you post these thread names here?
>>>
>>> Best Regards,
>>> Shixiong Zhu
>>>
>>> 2015-12-07 14:30 GMT-08:00 Richard Marscher <rmarsc...@localytics.com>:
>>>
>>>> Hi,
>>>>
>>>> I've been running benchmarks against Spark in local mode in a long
>>>> running process. I'm seeing threads leaking each time it runs a job. It
>>>> doesn't matter if I recycle SparkContext constantly or have 1 context stay
>>>> alive for the entire application lifetime.
>>>>
>>>> I see a huge accumulation ongoing of "pool--thread-1" threads with
>>>> the creating thread "Executor task launch worker-xx" where x's are numbers.
>>>> The number of leaks per launch worker varies but usually 1 to a few.
>>>>
>>>> Searching the Spark code the pool is created in the Executor class. It
>>>>

Re: Local Mode: Executor thread leak?

2015-12-07 Thread Shixiong Zhu
Which version are you using? Could you post these thread names here?

Best Regards,
Shixiong Zhu

2015-12-07 14:30 GMT-08:00 Richard Marscher <rmarsc...@localytics.com>:

> Hi,
>
> I've been running benchmarks against Spark in local mode in a long running
> process. I'm seeing threads leaking each time it runs a job. It doesn't
> matter if I recycle SparkContext constantly or have 1 context stay alive
> for the entire application lifetime.
>
> I see a huge accumulation ongoing of "pool--thread-1" threads with the
> creating thread "Executor task launch worker-xx" where x's are numbers. The
> number of leaks per launch worker varies but usually 1 to a few.
>
> Searching the Spark code the pool is created in the Executor class. It is
> `.shutdown()` in the stop for the executor. I've wired up logging and also
> tried shutdownNow() and awaitForTermination on the pools. Everything seems okay
> there for every Executor that is called with `stop()` but I'm still not
> sure yet if every Executor is called as such, which I am looking into now.
>
> What I'm curious to know is if anyone has seen a similar issue?
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>


Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-17 Thread Shixiong Zhu
Streaming checkpointing doesn't support Accumulators or Broadcast variables. See
https://issues.apache.org/jira/browse/SPARK-5206

Here is a workaround:
https://issues.apache.org/jira/browse/SPARK-5206?focusedCommentId=14506806=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14506806
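
The gist of the workaround is a lazily instantiated singleton that re-creates the
broadcast variable after restarting from the checkpoint, roughly like this (a sketch
only; the names, the broadcast contents, and `dstream` are placeholders):

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object ReporterHolder {
  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(Seq("some", "config"))  // whatever your reporter needs; illustrative
        }
      }
    }
    instance
  }
}

// inside the output operation, look the broadcast up through the RDD's SparkContext
dstream.foreachRDD { rdd =>
  val config = ReporterHolder.getInstance(rdd.sparkContext)
  rdd.foreachPartition { iter =>
    // build and use your reporter here with config.value
  }
}

Because the singleton is looked up inside foreachRDD rather than captured in the
checkpointed DAG, it is rebuilt on the new SparkContext after recovery.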

Best Regards,
Shixiong Zhu

2015-12-17 4:39 GMT-08:00 Bartłomiej Alberski <albers...@gmail.com>:

> I prepared a simple example that helps reproduce the problem:
>
> https://github.com/alberskib/spark-streaming-broadcast-issue
>
> I think that this way it will be easier for you to understand the problem
> and find a solution (if one exists).
>
> Thanks
> Bartek
>
> 2015-12-16 23:34 GMT+01:00 Bartłomiej Alberski <albers...@gmail.com>:
>
>> First of all , thanks @tdas for looking into my problem.
>>
>> Yes, I checked it separately and it is working fine. For the piece of
>> code below there is not a single exception and values are sent correctly.
>>
>> val reporter = new MyClassReporter(...)
>> reporter.send(...)
>> val out = new FileOutputStream("out123.txt")
>> val outO = new ObjectOutputStream(out)
>> outO.writeObject(reporter)
>> outO.flush()
>> outO.close()
>>
>> val in = new FileInputStream("out123.txt")
>> val inO = new ObjectInputStream(in)
>> val reporterFromFile  =
>> inO.readObject().asInstanceOf[StreamingGraphiteReporter]
>> reporterFromFile.send(...)
>> in.close()
>>
>> Maybe I am wrong, but I think it would be strange if a class
>> implementing Serializable and properly broadcast to executors could not be
>> serialized and deserialized?
>> I also prepared a slightly different piece of code and received a slightly
>> different exception. Right now it looks like:
>> java.lang.ClassCastException: [B cannot be cast to com.example.sender.
>> MyClassReporter.
>>
>> Maybe I am wrong but, it looks like that when restarting from checkpoint
>> it does read proper block of memory to read bytes for MyClassReporter.
>>
>> 2015-12-16 2:38 GMT+01:00 Tathagata Das <t...@databricks.com>:
>>
>>> Could you test serializing and deserializing the MyClassReporter  class
>>> separately?
>>>
>>> On Mon, Dec 14, 2015 at 8:57 AM, Bartłomiej Alberski <
>>> albers...@gmail.com> wrote:
>>>
>>>> Below is the full stacktrace(real names of my classes were changed)
>>>> with short description of entries from my code:
>>>>
>>>> rdd.mapPartitions{ partition => //this is the line to which second
>>>> stacktrace entry is pointing
>>>>   val sender =  broadcastedValue.value // this is the maing place to
>>>> which first stacktrace entry is pointing
>>>> }
>>>>
>>>> java.lang.ClassCastException:
>>>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>>>> com.example.sender.MyClassReporter
>>>> at com.example.flow.Calculator
>>>> $$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
>>>> at com.example.flow.Calculator
>>>> $$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
>>>> at
>>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>>> at
>>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
>>>> at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>> at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>> at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>> at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>> at
>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> 2015-12-14 17:10 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>>>>
>>>>> Can you show

Re: Use of rdd.zipWithUniqueId() in DStream

2015-12-14 Thread Shixiong Zhu
It doesn't guarantee that. E.g.,

scala> sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0), 2).filter(_ > 2.0).zipWithUniqueId().collect().foreach(println)
(3.0,1)
(4.0,3)

It only guarantees "unique".
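
If you need consecutive ids starting from 0, `zipWithIndex` does guarantee that
(at the cost of an extra Spark job to compute the per-partition offsets):

scala> sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0), 2).filter(_ > 2.0).zipWithIndex().collect().foreach(println)
(3.0,0)
(4.0,1)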

Best Regards,
Shixiong Zhu

2015-12-13 10:18 GMT-08:00 Sourav Mazumder <sourav.mazumde...@gmail.com>:

> Hi All,
>
> I'm trying to use the zipWithUniqueId() function of RDD via the transform
> function of DStream. It does generate unique ids, always starting from 0 and
> in sequence.
>
> However, I'm not sure whether this is reliable behavior which is always
> guaranteed to generate sequence numbers starting from 0.
>
> Can anyone confirm ?
>
> Regards,
> Sourav
>


Re: Spark Streaming Application is Stuck Under Heavy Load Due to DeadLock

2016-01-04 Thread Shixiong Zhu
Hi Rachana, could you provide the full jstack output? Maybe it's the same as
https://issues.apache.org/jira/browse/SPARK-11104

Best Regards,
Shixiong Zhu

2016-01-04 12:56 GMT-08:00 Rachana Srivastava <
rachana.srivast...@markmonitor.com>:

> Hello All,
>
>
>
> I am running my application on a Spark cluster but under heavy load the
> system hangs due to a deadlock.  I found similar issues resolved here
> https://datastax-oss.atlassian.net/browse/JAVA-555 in  Spark version
> 2.1.3.  But I am running on Spark 1.3 and still getting the same issue.
>
>
>
> Here is the stack trace for reference:
>
>
>
> sun.misc.Unsafe.park(Native Method)
>
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
>
>
> org.apache.spark.streaming.ContextWaiter.waitForStopOrError(ContextWaiter.scala:63)
>
>
> org.apache.spark.streaming.StreamingContext.awaitTermination(StreamingContext.scala:521)
>
>
> org.apache.spark.streaming.api.java.JavaStreamingContext.awaitTermination(JavaStreamingContext.scala:592)
>
>
>
> sun.misc.Unsafe.park(Native Method)
>
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
>
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
>
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
>
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
>
> java.util.concurrent.Semaphore.acquire(Semaphore.java:317)
>
>
> org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:62)
>
>
> org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
>
>
> org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply(AsynchronousListenerBus.scala:61)
>
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
>
>
> org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:60)
>
>
>
> sun.misc.Unsafe.park(Native Method)
>
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
>
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
>
>
> org.spark-project.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:342)
>
>
> org.spark-project.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:526)
>
>
> org.spark-project.jetty.util.thread.QueuedThreadPool.access$600(QueuedThreadPool.java:44)
>
>
> org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
>
> java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> Thanks,
>
>
>
> Rachana
>
>
>


Re: spark-submit for dependent jars

2015-12-21 Thread Shixiong Zhu
Looks like you need to add a "driver" option to your code, such as:

sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:oracle:thin:@:1521:xxx",
    "driver" -> "oracle.jdbc.driver.OracleDriver",
    "dbtable" -> "your_table_name")).load()

Best Regards,
Shixiong Zhu

2015-12-21 6:03 GMT-08:00 Jeff Zhang <zjf...@gmail.com>:

> Please make sure this is a correct JDBC URL:
> jdbc:oracle:thin:@:1521:xxx
>
>
>
> On Mon, Dec 21, 2015 at 9:54 PM, Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>
>> Hi Jeff and Satish,
>>
>> I have modified the script and executed it. Please find the command below:
>>
>> ./spark-submit --master local  --class test.Main --jars
>> /home/user/download/jar/ojdbc7.jar
>> /home//test/target/spark16-0.0.1-SNAPSHOT.jar
>>
>> I'm still getting the same exception.
>>
>>
>>
>> Exception in thread "main" java.sql.SQLException: No suitable driver
>> found for jdbc:oracle:thin:@:1521:xxx
>> at java.sql.DriverManager.getConnection(DriverManager.java:596)
>> at java.sql.DriverManager.getConnection(DriverManager.java:187)
>> at
>> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConnector$1.apply(JDBCRDD.scala:188)
>> at
>> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConnector$1.apply(JDBCRDD.scala:181)
>> at
>> org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:121)
>> at
>> org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.(JDBCRelation.scala:91)
>> at
>> org.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:60)
>> at
>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
>> at
>> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
>> at com.cisco.ss.etl.utils.ETLHelper$class.getData(ETLHelper.scala:22)
>> at com.cisco.ss.etl.Main$.getData(Main.scala:9)
>> at com.cisco.ss.etl.Main$delayedInit$body.apply(Main.scala:13)
>> at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>> at
>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>> at scala.App$$anonfun$main$1.apply(App.scala:71)
>> at scala.App$$anonfun$main$1.apply(App.scala:71)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>> at scala.App$class.main(App.scala:71)
>> at com.cisco.ss.etl.Main$.main(Main.scala:9)
>> at com.cisco.ss.etl.Main.main(Main.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>> Regards,
>> Rajesh
>>
>> On Mon, Dec 21, 2015 at 7:18 PM, satish chandra j <
>> jsatishchan...@gmail.com> wrote:
>>
>>> Hi Rajesh,
>>> Could you please try giving your cmd as mentioned below:
>>>
>>> ./spark-submit --master local  --class  --jars 
>>> 
>>>
>>> Regards,
>>> Satish Chandra
>>>
>>> On Mon, Dec 21, 2015 at 6:45 PM, Madabhattula Rajesh Kumar <
>>> mrajaf...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> How to add dependent jars in spark-submit command. For example: Oracle.
>>>> Could you please help me to resolve this issue
>>>>
>>>> I have a standalone cluster. One Master and One slave.
>>>>
>>>> I have used the below command but it is not working:
>>>>
>>>> ./spark-submit --master local  --class test.Main
>>>> /test/target/spark16-0.0.1-SNAPSHOT.jar --jars
>>>> /home/user

Re: val listRDD =ssc.socketTextStream(localhost,9999) on Yarn

2015-12-22 Thread Shixiong Zhu
Just replace `localhost` with a host name that can be accessed by Yarn
containers.
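
For example (the host name is only a placeholder):

val lines = ssc.socketTextStream("gateway-node.example.com", 9999)

The receiver runs inside a YARN container, so it is that container, not your local
machine, that opens the connection to the given host and port.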

Best Regards,
Shixiong Zhu

2015-12-22 0:11 GMT-08:00 prasadreddy <alle.re...@gmail.com>:

> How do we achieve this in yarn-cluster mode?
>
> Please advice.
>
> Thanks
> Prasad
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/val-listRDD-ssc-socketTextStream-localhost--on-Yarn-tp25760.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: seriazable error in apache spark job

2015-12-18 Thread Shixiong Zhu
Looks like you have a reference to some Akka class. Could you post your code?

Best Regards,
Shixiong Zhu

2015-12-17 23:43 GMT-08:00 Pankaj Narang <pankajnaran...@gmail.com>:

> I am encountering the error below. Can somebody guide me?
>
> Something similar is on this link:
> https://github.com/elastic/elasticsearch-hadoop/issues/298
>
>
> actor.MentionCrawlActor
> java.io.NotSerializableException: actor.MentionCrawlActor
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> ~[na:1.7.0_79]
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> ~[na:1.7.0_79]
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> ~[na:1.7.0_79]
> at
>
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> ~[na:1.7.0_79]
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> ~[na:1.7.0_79]
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> ~[na:1.7.0_79]
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> ~[na:1.7.0_79]
> at
>
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> ~[na:1.7.0_79]
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> ~[na:1.7.0_79]
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/seriazable-error-in-apache-spark-job-tp25732.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Question about Spark Streaming checkpoint interval

2015-12-18 Thread Shixiong Zhu
You are right. "checkpointInterval" is only for data checkpointing;
metadata checkpointing is done for each batch. Feel free to send a PR to add
the missing doc.
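
For example (the path, interval, and update function are only illustrative):

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///checkpoints/my-app")      // enables checkpointing; metadata is checkpointed every batch
val state = pairs.updateStateByKey(updateFunc)    // pairs and updateFunc are placeholders
state.checkpoint(Seconds(50))                     // data checkpointing every 5 batches instead of the default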

Best Regards,
Shixiong Zhu

2015-12-18 8:26 GMT-08:00 Lan Jiang <ljia...@gmail.com>:

> Need some clarification about the documentation. According to Spark doc
>
> *"the default interval is a multiple of the batch interval that is at
> least 10 seconds. It can be set by
> using dstream.checkpoint(checkpointInterval). Typically, a checkpoint
> interval of 5 - 10 sliding intervals of a DStream is a good setting to
> try.”*
>
> My question is: does the *checkpointInterval* apply only to *data
> checkpointing*, or does it also apply to *metadata checkpointing*? The API says
> dstream.checkpoint() is for "Enable periodic checkpointing of RDDs of this
> DStream”, implying it is only for data checkpointing. My understanding is
> that metadata checkpointing is for driver failure. For example, in Kafka
> direct API, the driver keeps track of the offset range of each partition. So if
> metadata checkpointing is NOT done for each batch, on driver failure some
> messages in Kafka are going to be replayed.
>
> I do not find the answer in the document saying *whether metadata
> checkpointing is done for each batch* and whether checkpointinterval
> setting applies to both types of checkpointing. Maybe I missed it. If anyone
> can point me to the right documentation, I would highly appreciate it.
>
> Best Regards,
>
> Lan
>


Re: pyspark + kafka + streaming = NoSuchMethodError

2015-12-17 Thread Shixiong Zhu
What's the Scala version of your Spark? Is it 2.10?
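
If it is Scala 2.10 (the standard pre-built spark-1.5.2-bin-hadoop2.6 package is built
against 2.10), the Kafka artifact has to match that Scala version, e.g. something like:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.5.2.jar my_kafka_streaming_wordcount.py

Mixing a _2.11 assembly with a 2.10 Spark build produces exactly this kind of
NoSuchMethodError on scala.Predef.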

Best Regards,
Shixiong Zhu

2015-12-17 10:10 GMT-08:00 Christos Mantas <cman...@cslab.ece.ntua.gr>:

> Hello,
>
> I am trying to set up a simple example with Spark Streaming (Python) and
> Kafka on a single machine deployment.
> My Kafka broker/server is also on the same machine (localhost:1281) and I
> am using Spark Version: spark-1.5.2-bin-hadoop2.6
>
> Python code
>
> ...
> ssc = StreamingContext(sc, 1)
> ...
> lines = KafkaUtils.createDirectStream(ssc, ["test"],
> {"metadata.broker.list":"localhost:1281"})
>
>
> So I try
>
> spark-submit --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar
> my_kafka_streaming_wordcount.py
>
> OR
>
> spark-submit --packages  org.apache.spark:spark-streaming-kafka_2.11:1.5.2
> my_kafka_streaming_wordcount.py
> (my kafka version is 2.11-0.9.0.0)
>
> OR
>
> pyspark  --jars spark-streaming-kafka-assembly_2.11-1.5.2.jar  [import
> stuff and type those lines]
>
>
> and I end up with:
>
> 15/12/17 19:44:58 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/12/17 19:45:00 WARN MetricsSystem: Using default name DAGScheduler for
> source because spark.app.id is not set.
> Traceback (most recent call last):
>   File "/I/edited/the/path/here/my_kafka_streaming_wordcount.py", line 80,
> in 
> lines = KafkaUtils.createDirectStream(ssc, ["test"],
> {"metadata.broker.list":"localhost:1281"})
>   File
> "/opt/spark-1.5.2-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/kafka.py",
> line 130, in createDirectStream
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o29.createDirectStream.
> : java.lang.NoSuchMethodError:
> scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
> at kafka.api.RequestKeys$.(RequestKeys.scala:39)
> at kafka.api.RequestKeys$.(RequestKeys.scala)
> at kafka.api.TopicMetadataRequest.(TopicMetadataRequest.scala:53)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:122)
> at
> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:409)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
> at
> org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:614)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:259)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
>
> Am I missing something?
>
> Thanks in advance
> Chris M.
>
>
>
>
>
>


Re: Help with Couchbase connector error

2015-11-26 Thread Shixiong Zhu
Hey Eyal, I just checked the Couchbase Spark connector jar. The target version of
some of the classes is Java 8 (class file version 52.0). You can create a ticket in
https://issues.couchbase.com/projects/SPARKC
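
One way to confirm is to inspect one of the connector's classes with javap (the jar
name below is just a placeholder for the connector jar you are using):

javap -cp spark-connector_2.10-1.0.0.jar -verbose com.couchbase.spark.StoreMode | grep "major version"

"major version: 52" means the class was compiled for Java 8 and will not load on a
JDK 7 runtime.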

Best Regards,
Shixiong Zhu

2015-11-26 9:03 GMT-08:00 Ted Yu <yuzhih...@gmail.com>:

> StoreMode is from Couchbase connector.
>
> Where did you obtain the connector ?
>
> See also
> http://stackoverflow.com/questions/1096148/how-to-check-the-jdk-version-used-to-compile-a-class-file
>
> On Thu, Nov 26, 2015 at 8:55 AM, Eyal Sharon <e...@scene53.com> wrote:
>
>> Hi,
>> Great, that gave some direction. But can you elaborate more, or share
>> some post?
>> I am currently running JDK 7, and my Couchbase is too.
>>
>> Thanks !
>>
>> On Thu, Nov 26, 2015 at 6:02 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> This implies version mismatch between the JDK used to build your jar and
>>> the one at runtime.
>>>
>>> When building, target JDK 1.7
>>>
>>> There're plenty of posts on the web for dealing with such error.
>>>
>>> Cheers
>>>
>>> On Thu, Nov 26, 2015 at 7:31 AM, Eyal Sharon <e...@scene53.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to set up a connection to Couchbase. I am at the very
>>>> beginning, and I got stuck on this exception:
>>>>
>>>> Exception in thread "main" java.lang.UnsupportedClassVersionError:
>>>> com/couchbase/spark/StoreMode : Unsupported major.minor version 52.0
>>>>
>>>>
>>>> Here is the simple code fragment
>>>>
>>>>   val sc = new SparkContext(cfg)
>>>>
>>>>   val doc1 = JsonDocument.create("doc1", JsonObject.create().put("some", "content"))
>>>>   val doc2 = JsonArrayDocument.create("doc2", JsonArray.from("more", "content", "in", "here"))
>>>>
>>>>   val data = sc
>>>>     .parallelize(Seq(doc1, doc2))
>>>>     .saveToCouchbase()
>>>> }
>>>>
>>>>
>>>> Any help will be a blessing.
>>>>
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>

