Re: Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-07-13 Thread Dibyendu Bhattacharya
You can get some good pointers in this JIRA

https://issues.apache.org/jira/browse/SPARK-15796

Dibyendu


On Thu, Jul 14, 2016 at 12:53 AM, Sunita  wrote:

> I am facing the same issue. Upgrading to Spark 1.6 is causing a huge
> performance loss. Were you able to solve this issue? I am also attempting the
> memory settings mentioned at
> http://spark.apache.org/docs/latest/configuration.html#memory-management
>
> But it's not making much of a difference. I'd appreciate your inputs on this.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-tp27056p27330.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Issue in spark job: Remote RPC client disassociated

2016-07-13 Thread Balachandar R.A.
Hello Ted,

Thanks for the response. Here is the additional information.

I am using Spark 1.6.1 (spark-1.6.1-bin-hadoop2.6).

Here is the code snippet:

JavaRDD<File> add = jsc.parallelize(listFolders, listFolders.size());
JavaRDD<Integer> test = add.map(new Function<File, Integer>() {
    @Override
    public Integer call(File file) throws Exception {
        String folder = file.getName();
        System.out.println("[x] Processing dataset from the directory " + folder);
        int status = 0;
        // Full path of the input folder. The input folder is on a shared file
        // system that every worker node can access, e.g. "/home/user/software/data/",
        // and the folder name looks like "20161307".
        argsList[3] = argsList[3] + "/" + folder;
        // Full path of the output CSV.
        argsList[7] = argsList[7] + "/" + folder + ".csv";
        try {
            // Launcher is a black box: it processes the input folder and creates
            // a CSV file at the output location (argsList[7]), also on a shared
            // file system.
            Launcher.main(argsList);
            status = 0;
        } catch (Exception e) {
            System.out.println("[x] Execution of import tool for the directory "
                    + folder + " failed");
            status = 1;  // non-zero signals that the import failed for this folder
        }
        accum.add(1);
        return status;
    }
});

Here is spark-env.sh:

export SPARK_WORKER_INSTANCES=1
export JAVA_HOME=/home/work_IW1/opt/jdk1.8.0_77/
export HADOOP_CONF_DIR=/home/work_IW1/opt/hadoop-2.7.2/etc/hadoop

Here is spark-defaults.conf:

spark.master             spark://master:7077
spark.eventLog.enabled   true
spark.eventLog.dir       hdfs://master:9000/sparkEvent
spark.serializer         org.apache.spark.serializer.KryoSerializer
spark.driver.memory      4g

Hope it helps.


Re: Memory issue java.lang.OutOfMemoryError: Java heap space

2016-07-13 Thread Chanh Le
Can you show us the Spark UI -> Executors tab and Storage tab?
They will show how many executors were running and how much memory was used for
caching.



Re: Memory issue java.lang.OutOfMemoryError: Java heap space

2016-07-13 Thread Jean Georges Perrin
I use it as a standalone cluster.

I run it through start-master, then start-slave. I only have one slave now, but 
I will probably have a few soon.

The "application" is run on a separate box.

When everything was running on my Mac, I was in local mode, but I never set up
anything in local mode. Going "production" was a little more complex than I
thought.


Re: Memory issue java.lang.OutOfMemoryError: Java heap space

2016-07-13 Thread Chanh Le
Hi Jean,
How do you run your Spark Application? Local Mode, Cluster Mode? 
If you run in local mode did you use —driver-memory and —executor-memory 
because in local mode your setting about executor and driver didn’t work that 
you expected.







Re: Memory issue java.lang.OutOfMemoryError: Java heap space

2016-07-13 Thread Jean Georges Perrin
Looks like replacing setExecutorEnv() with set() did the trick... let's see
how fast it'll process my 50 x 10^15 data points...




Re: Memory issue java.lang.OutOfMemoryError: Java heap space

2016-07-13 Thread Jean Georges Perrin
I have added:

    SparkConf conf = new SparkConf()
        .setAppName("app")
        .setExecutorEnv("spark.executor.memory", "8g")
        .setMaster("spark://10.0.100.120:7077");

but it did not change a thing.




Memory issue java.lang.OutOfMemoryError: Java heap space

2016-07-13 Thread Jean Georges Perrin
Hi,

I have a Java memory issue with Spark. The same application that works on my
8GB Mac crashes on my 72GB Ubuntu server...

I have changed things in the conf file, but it looks like Spark does not care, 
so I wonder if my issues are with the driver or executor.

I set:

spark.driver.memory     20g
spark.executor.memory   20g

And, whatever I do, the crash is always at the same spot in the app, which 
makes me think that it is a driver problem.

The exception I get is:

16/07/13 20:36:30 WARN TaskSetManager: Lost task 0.0 in stage 7.0 (TID 208, 
micha.nc.rr.com): java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapCharBuffer.<init>(HeapCharBuffer.java:57)
at java.nio.CharBuffer.allocate(CharBuffer.java:335)
at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:810)
at org.apache.hadoop.io.Text.decode(Text.java:412)
at org.apache.hadoop.io.Text.decode(Text.java:389)
at org.apache.hadoop.io.Text.toString(Text.java:280)
at 
org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$org$apache$spark$sql$execution$datasources$json$JSONRelation$$createBaseRdd$1.apply(JSONRelation.scala:105)
at 
org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$org$apache$spark$sql$execution$datasources$json$JSONRelation$$createBaseRdd$1.apply(JSONRelation.scala:105)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
at 
org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1135)
at 
org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1135)
at 
org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1136)
at 
org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1136)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I have set a small memory "dumper" in my app. At the beginning, it says:

**  Free . 1,413,566
**  Allocated  1,705,984
**  Max .. 16,495,104
**> Total free ... 16,202,686

Just before the crash, it says:

**  Free . 1,461,633
**  Allocated  1,786,880
**  Max .. 16,495,104
**> Total free ... 16,169,857






Doing record linkage using string comparators in Spark

2016-07-13 Thread Linh Tran
Hi guys,

I'm hoping that someone can help me make my setup more efficient. I'm trying
to do record linkage across 2.5 billion records and have set myself up in Spark
to handle the data. Right now, I'm relying on R (with the stringdist and
RecordLinkage packages) to do the actual linkage. I transfer small batches
at a time from Spark to R, do the linkage, and send the resulting IDs linking
the records back. What I'd like to do is set up Spark so that the string
distance measures (that I'm using in R) can be computed directly in Spark,
thereby avoiding the data transfers. How can I go about doing this? An example
of my data is provided below, along with example R code that I'm using below
that (nb. "myMatchingFunction" calls functions from the stringdist and
RecordLinkage packages). I'm open to switching to Python or Scala if I need
to, or to incorporating the C++ code for the string comparators into Spark.
Thanks.
--Linh

FIRSTNAME  LASTNAME  EMAIL                 PHONE    ADDRESS           DESIRED_ID
John       Smith     johnsm...@domain.com           1234 Main St.     1
John       Smith                           1234567                    1
J          Smith     johnsm...@domain.com  1234567  1234 Main Street  1
Jane       Smith                           2345678                    2
Jane       Smith     janesm...@domain.com  2345678  5678 1st Street   2
Jane       Smith                                    5678 First St.    2
Jane       Smith                                                      3


uk_breakcodes = read.df(sqlContext, "LT_IDMRQ_BREAK_CODES_UK.parquet", 
"parquet")
cache(uk_breakcodes)
registerTempTable(uk_breakcodes, "LT_IDMRQ_BREAK_CODES_UK")
tmp <- collect(sql(sqlContext, "select * from LT_IDMRQ_BREAK_CODES_UK where
ADDRBRKCD = 'XXX' or BADDRBRKCD = 'XXX' or OPBRKCD = 'XX'"))
matched = myMatchingFunction(tmp)
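One of the stringdist measures is already available cluster-side: Spark SQL
exposes a built-in levenshtein(l, r) expression, so that distance can be
computed in Spark without shipping rows to R. The metric itself is the classic
dynamic program below; this is a plain-Python sketch, shown only to illustrate
what such a built-in (or a hand-written UDF) computes, not Spark code:

```python
def levenshtein(a: str, b: str) -> int:
    """Classic edit-distance DP: minimum number of insertions, deletions,
    and substitutions turning a into b (stringdist's method='lv')."""
    if len(a) < len(b):
        a, b = b, a  # iterate over the longer string, keep rows short
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        curr = [i]
        for j, cb in enumerate(b, start=1):
            curr.append(min(prev[j] + 1,                  # deletion
                            curr[j - 1] + 1,              # insertion
                            prev[j - 1] + (ca != cb)))    # substitution
        prev = curr
    return prev[-1]

# Two of the addresses from the sample data above:
print(levenshtein("1234 Main St.", "1234 Main Street"))  # 4
```

With the distance in Spark, the collect()/R round trip is only needed for the
final clustering step, if at all.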

Spark HBase bulk load using hfile format

2016-07-13 Thread yeshwanth kumar
Hi, I am doing a bulk load into HBase in HFile format, using
saveAsNewAPIHadoopFile.

When I try to write, I am getting this exception:

java.io.IOException: Added a key not lexically larger than previous.

The following is the code snippet:

case class HBaseRow(rowKey: ImmutableBytesWritable, kv: KeyValue)

val kAvroDF = sqlContext.read.format("com.databricks.spark.avro").load(args(0))
val kRDD = kAvroDF.select("seqid", "mi", "moc", "FID", "WID").rdd
val trRDD = kRDD.map(a => preparePUT(a(1).asInstanceOf[String],
a(3).asInstanceOf[String]))
val kvRDD = trRDD.flatMap(a => a).map(a => (a.rowKey, a.kv))
saveAsHFile(kvRDD, args(1))


preparePUT returns a list of HBaseRow(ImmutableBytesWritable, KeyValue)
sorted on KeyValue. I then do a flatMap on the RDD to prepare an
RDD[(ImmutableBytesWritable, KeyValue)] and pass it to saveAsHFile.

Does the flatMap operation on an RDD change the sorted order?
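What flatMap does to ordering can be checked with plain collections, no Spark
needed: it preserves the order of the values produced by each input record, but
it only concatenates the per-record lists; it never sorts across records. So
lists that are each sorted internally do not come out globally sorted, which is
exactly what the HFile writer rejects. A hedged sketch (sample row keys are
made up):

```python
# Each "record" yields its own internally sorted list of (rowkey, value)
# pairs, mimicking preparePUT returning a sorted list per input row.
records = [
    [("row3", "a"), ("row5", "b")],  # sorted within this record
    [("row1", "c"), ("row4", "d")],  # sorted within this record
]

# flatMap semantics: concatenate per-record outputs in encounter order.
flat = [kv for rec in records for kv in rec]
keys = [k for k, _ in flat]

print(keys)                  # ['row3', 'row5', 'row1', 'row4']
print(keys == sorted(keys))  # False: 'row1' is not larger than 'row5'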


Can anyone tell me how to resolve this issue?

Thanks,
-Yeshwanth


Re: Dense Vectors outputs in feature engineering

2016-07-13 Thread disha_dp
Hi Ian,
You can create a dense vector of your features as follows:

- String-index your features
- Invoke one-hot encoding on them, which generates a sparse vector
- Optionally, if you wish to merge these features, use VectorAssembler
- After transforming the dataframe to return the sparse vector(s) (which you
may or may not assemble), you can use Vectors.dense(vector.toArray()) on
either the individual one-hot features or the assembled sparse vector.
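That last toArray() step just expands the sparse (size, indices, values)
triplet into a full array. A quick illustration in plain Python (no Spark; the
function name here is made up for illustration, not MLlib API):

```python
def sparse_to_dense(size, indices, values):
    """Expand a sparse vector, stored as (size, indices, values),
    into the dense array that Vectors.dense(v.toArray()) would yield."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# A one-hot encoded category: 5 categories, category index 2 active.
print(sparse_to_dense(5, [2], [1.0]))  # [0.0, 0.0, 1.0, 0.0, 0.0]
```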

Hope this helps.

Cheers,
Disha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dense-Vectors-outputs-in-feature-engineering-tp27331p27332.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Dense Vectors outputs in feature engineering

2016-07-13 Thread rachmaninovquartet
Hi,

I'm trying to use the StringIndexer and OneHotEncoder, in order to vectorize
some of my features. Unfortunately, OneHotEncoder only returns sparse
vectors. I can't find a way, much less an efficient one, to convert the
columns generated by OneHotEncoder into dense vectors. I need this because I
will eventually be doing some deep learning on my data, not something internal
to Spark.

If I were to update OneHotEncoder to have a setDense option, is there much
of a chance it would be accepted as a PR?

Since the first option seems unlikely, is there a way to turn a dataframe
with sparse vector and index columns into dummy columns, like the pandas
get_dummies method:
http://queirozf.com/entries/one-hot-encoding-a-feature-on-a-pandas-dataframe-an-example

or is there a better way to replicate the get_dummies functionality?
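For reference, get_dummies boils down to: collect the distinct categories,
then emit one 0/1 indicator per category and input value. A minimal sketch of
that semantics in plain Python (pandas orders the dummy columns by sorted
category for string inputs; note that OneHotEncoder additionally drops the
last category by default, which this sketch does not):

```python
def get_dummies(column):
    """Return (category order, one row of 0/1 indicators per input value),
    mirroring pandas.get_dummies applied to a single column."""
    cats = sorted(set(column))  # dummy columns in sorted category order
    rows = [[1 if v == c else 0 for c in cats] for v in column]
    return cats, rows

cats, rows = get_dummies(["red", "blue", "red", "green"])
print(cats)  # ['blue', 'green', 'red']
print(rows)  # [[0, 0, 1], [1, 0, 0], [0, 0, 1], [0, 1, 0]]
```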

Thanks,

Ian
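For reference, the get_dummies semantics being asked about — one dense 0/1 indicator column per category — can be sketched without pandas or Spark (illustrative only):

```python
def get_dummies(column):
    # For each distinct category, emit a dense 0/1 indicator column,
    # mirroring what pandas.get_dummies does for a single column.
    categories = sorted(set(column))
    return {c: [1 if x == c else 0 for x in column] for c in categories}

dummies = get_dummies(["red", "blue", "red", "green"])
print(dummies)
# {'blue': [0, 1, 0, 0], 'green': [0, 0, 0, 1], 'red': [1, 0, 1, 0]}
```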





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dense-Vectors-outputs-in-feature-engineering-tp27331.html



Re: Online evaluation of MLLIB model

2016-07-13 Thread Jacek Laskowski
No real time in Spark unless near real time is your real time :-)

Use Spark ML Pipeline API inside Spark Streaming workflow.

Jacek
On 13 Jul 2016 5:57 p.m., "Danilo Rizzo"  wrote:

> Hi All, I'm trying to create an ML pipeline that is in charge of the model
> training.
> In my use case I need to evaluate the model in real time from an
> external application; googling, I saw that I can submit a Spark job using
> the submit API.
>
> Not sure if this is the best way to achieve that; any thoughts? I'm
> wondering whether it can handle a lot of model-evaluation requests while
> keeping the response time fast enough for use in a web application.
>
> --
> Danilo Rizzo
>


Re: Structured Streaming and Microbatches

2016-07-13 Thread Jacek Laskowski
Hi,

It's still microbatching architecture with triggers as batchIntervals. It's
just faster by default and the API is more pleasant, i.e. Dataset-driven.

Jacek
On 13 Jul 2016 10:35 p.m., "Matthias Niehoff" <
matthias.nieh...@codecentric.de> wrote:

Hi everybody,

as far as I understand, with the new Structured Streaming API the output
will not get processed every x seconds anymore. Instead, the data will be
processed as soon as it arrives. But there might be a delay due to the
processing time for the data.

A small example:
Data comes in and the processing takes 1 second (quite long).
In this 1 second a lot of new data comes in, which will be processed after
the processing of the first data has finished.

My questions are:
Is the data for each processing run, i.e. all the data collected in that 1
second, still processed as a microbatch (including reprocessing in case of
failure on another worker, etc.)? Or is the bulk of data processed one by
one?

With regard to the processing time: is the behavior the same for high
processing times as in Spark 1.x? Meaning, do we get a scheduling delay, is
data stored by a receiver, ... (is there even a concept of a receiver in
Spark 2? Is a source in streaming basically a receiver?)

Hope those questions aren't too confusing :-)

Thank you!
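As a toy illustration of that micro-batch behaviour (plain Python, no Spark; purely conceptual): records that arrive while one batch is being processed are buffered and then handled together as the next batch — as a unit that could be retried on failure — rather than one by one.

```python
from collections import deque

arrivals = deque([["a"], ["b", "c", "d"], ["e"]])  # data arriving per trigger interval
buffer, batches = [], []

# Toy micro-batch loop: each trigger drains everything buffered so far
# and processes it as one batch.
while arrivals or buffer:
    if arrivals:
        buffer.extend(arrivals.popleft())  # records that arrived during the last batch
    batch, buffer = buffer, []
    batches.append(batch)

print(batches)  # [['a'], ['b', 'c', 'd'], ['e']]
```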


Re: Issue in spark job. Remote rpc client dissociated

2016-07-13 Thread Ted Yu
Which Spark release are you using ?

Can you disclose what the folder processing does (code snippet is better) ?

Thanks

On Wed, Jul 13, 2016 at 9:44 AM, Balachandar R.A. 
wrote:

> Hello
>
> In one of my use cases, I need to process a list of folders in parallel. I
> used
> sc.parallelize(list, list.size).map("logic to process the folder").
> I have a six-node cluster and there are six folders to process. Ideally I
> expect each of my nodes to process one folder. But I see that a node
> processes multiple folders while one or two of the nodes do not get any job.
> In the end, the spark-submit crashes with an exception saying "remote RPC
> client dissociated". Can someone give me a hint on what's going wrong here?
> Please note that this issue does not arise if I comment out my logic that
> processes the folder and simply print the folder name. In that case, every
> node gets one folder to process. I inserted a sleep of 40 seconds inside
> the map. No issue. But when I uncomment my logic I see this issue. Also,
> before crashing it does process some of the folders successfully.
> "Successfully" means the business logic generates a file in a shared file
> system.
>
> Regards
> Bala
>


Structured Streaming and Microbatches

2016-07-13 Thread Matthias Niehoff
Hi everybody,

as far as I understand, with the new Structured Streaming API the output
will not get processed every x seconds anymore. Instead, the data will be
processed as soon as it arrives. But there might be a delay due to the
processing time for the data.

A small example:
Data comes in and the processing takes 1 second (quite long).
In this 1 second a lot of new data comes in, which will be processed after
the processing of the first data has finished.

My questions are:
Is the data for each processing run, i.e. all the data collected in that 1
second, still processed as a microbatch (including reprocessing in case of
failure on another worker, etc.)? Or is the bulk of data processed one by
one?

With regard to the processing time: is the behavior the same for high
processing times as in Spark 1.x? Meaning, do we get a scheduling delay, is
data stored by a receiver, ... (is there even a concept of a receiver in
Spark 2? Is a source in streaming basically a receiver?)

Hope those questions aren't too confusing :-)

Thank you!


Re: Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-07-13 Thread Sunita
I am facing the same issue. Upgrading to Spark 1.6 is causing a huge
performance loss. Could you solve this issue? I am also attempting the
memory settings mentioned at
http://spark.apache.org/docs/latest/configuration.html#memory-management

but they are not making a lot of difference. Appreciate your inputs on this.
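For anyone hitting the same regression: Spark 1.6 switched to the unified memory manager, and the knobs on that configuration page can be set per job. A hedged illustration (values here are examples, not recommendations; see also SPARK-15796, linked elsewhere in this thread):

```
# Tune the unified memory manager (Spark 1.6+); values are examples only.
spark-submit \
  --conf spark.memory.fraction=0.6 \
  --conf spark.memory.storageFraction=0.5 \
  ...

# Or temporarily fall back to the pre-1.6 memory manager to confirm the
# regression is memory-management related:
spark-submit --conf spark.memory.useLegacyMode=true ...
```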



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-tp27056p27330.html



Re: Spark Website

2016-07-13 Thread Reynold Xin
Thanks for reporting. This is due to
https://issues.apache.org/jira/servicedesk/agent/INFRA/issue/INFRA-12055



On Wed, Jul 13, 2016 at 11:52 AM, Pradeep Gollakota 
wrote:

> Worked for me if I go to https://spark.apache.org/site/ but not
> https://spark.apache.org
>
> On Wed, Jul 13, 2016 at 11:48 AM, Maurin Lenglart 
> wrote:
>
>> Same here
>>
>>
>>
>> *From: *Benjamin Kim 
>> *Date: *Wednesday, July 13, 2016 at 11:47 AM
>> *To: *manish ranjan 
>> *Cc: *user 
>> *Subject: *Re: Spark Website
>>
>>
>>
>> It takes me to the directories instead of the webpage.
>>
>>
>>
>> On Jul 13, 2016, at 11:45 AM, manish ranjan 
>> wrote:
>>
>>
>>
>> working for me. What do you mean 'as supposed to'?
>>
>>
>> ~Manish
>>
>>
>>
>> On Wed, Jul 13, 2016 at 11:45 AM, Benjamin Kim 
>> wrote:
>>
>> Has anyone noticed that the spark.apache.org is not working as supposed
>> to?
>>
>>
>>
>>
>>
>>
>>
>
>


Re: Spark Website

2016-07-13 Thread Pradeep Gollakota
Worked for me if I go to https://spark.apache.org/site/ but not
https://spark.apache.org

On Wed, Jul 13, 2016 at 11:48 AM, Maurin Lenglart 
wrote:

> Same here
>
>
>
> *From: *Benjamin Kim 
> *Date: *Wednesday, July 13, 2016 at 11:47 AM
> *To: *manish ranjan 
> *Cc: *user 
> *Subject: *Re: Spark Website
>
>
>
> It takes me to the directories instead of the webpage.
>
>
>
> On Jul 13, 2016, at 11:45 AM, manish ranjan  wrote:
>
>
>
> working for me. What do you mean 'as supposed to'?
>
>
> ~Manish
>
>
>
> On Wed, Jul 13, 2016 at 11:45 AM, Benjamin Kim  wrote:
>
> Has anyone noticed that the spark.apache.org is not working as supposed
> to?
>
>
>
>
>
>
>


Re: Spark Website

2016-07-13 Thread Maurin Lenglart
Same here

From: Benjamin Kim 
Date: Wednesday, July 13, 2016 at 11:47 AM
To: manish ranjan 
Cc: user 
Subject: Re: Spark Website

It takes me to the directories instead of the webpage.

On Jul 13, 2016, at 11:45 AM, manish ranjan 
> wrote:

working for me. What do you mean 'as supposed to'?

~Manish


On Wed, Jul 13, 2016 at 11:45 AM, Benjamin Kim 
> wrote:
Has anyone noticed that the spark.apache.org is not 
working as supposed to?






Re: Spark Website

2016-07-13 Thread Benjamin Kim
It takes me to the directories instead of the webpage.

> On Jul 13, 2016, at 11:45 AM, manish ranjan  wrote:
> 
> working for me. What do you mean 'as supposed to'?
> 
> ~Manish
> 
> 
> 
> On Wed, Jul 13, 2016 at 11:45 AM, Benjamin Kim  > wrote:
> Has anyone noticed that the spark.apache.org  is 
> not working as supposed to?
> 
> 
> 
> 



Re: Spark Website

2016-07-13 Thread manish ranjan
working for me. What do you mean 'as supposed to'?

~Manish



On Wed, Jul 13, 2016 at 11:45 AM, Benjamin Kim  wrote:

> Has anyone noticed that the spark.apache.org is not working as supposed
> to?
>
>
>
>


Spark Website

2016-07-13 Thread Benjamin Kim
Has anyone noticed that the spark.apache.org is not working as supposed to?





Re: Tools for Balancing Partitions by Size

2016-07-13 Thread Pedro Rodriguez
Hi Gourav,

In our case, we process raw logs into parquet tables that downstream
applications can use for other jobs. The desired outcome is that we only
need to worry about unbalanced input data at the preprocess step so that
downstream jobs can assume balanced input data.

In our specific case, this works because although the raw log rows are of
variable size, the rows in the Spark SQL table are of fixed size by parsing
primitives or chopping arrays. Due to this, in our use case it makes sense
to think in terms of balanced file size because it directly correlates to
having a balanced number of rows/partition and thus balanced partitions.

Given this setting, are there any specific issues you foresee? I agree that
file size isn't a general solution, but in the setting I don't see a reason
it should not work.

Our overall goal is to avoid two problems when we write data to S3:
- Large number of small files (Kbs) since this makes S3 listing take a long
time
- Small number of large files (GBs) since this makes reads not as efficient

Thus far, we have done this on a per-application basis with repartition and
a manually tuned number of partitions, but this is inconvenient. We are
interested in seeing if there is a way to automatically infer the number of
partitions we should use so that our files in S3 have a particular average
size (without incurring too high an overhead cost).

The solution that seems most promising right now is:

   - Define a custom write function which does two steps:
   - Write one partition to S3 and get files size and number of records
   - Use that to determine the number of partitions to repartition to, then
   write everything to S3

What seems unclear is how to compute the parent RDD (suppose it's an RDD
with wide dependencies, like a join), get one partition for step (1), and
then not recompute anything for step (2) without an explicit cache. This
would make the additional overhead on the job just the cost of writing one
partition to S3, which seems acceptable. Perhaps this could be accomplished
as follows: RDD A computes the size of one partition; RDD B holds all
partitions except the one from A; the parent of both A and B is the
original parent RDD; and RDD C, with parents A and B, performs the overall
balanced write.

Thanks,
Pedro
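The two-step approach above boils down to simple arithmetic once the sample partition is written: measure bytes per record from the sample, extrapolate the total size, and solve for the partition count that hits a target file size. A sketch (the function name and numbers are illustrative, not a Spark API):

```python
import math

def estimate_partitions(sample_bytes, sample_records, total_records,
                        target_bytes=256 * 1024 * 1024):
    # Step 1 measured: one partition's on-disk size and record count.
    bytes_per_record = sample_bytes / sample_records
    # Step 2: choose a partition count so each file lands near target_bytes.
    total_bytes = bytes_per_record * total_records
    return max(1, math.ceil(total_bytes / target_bytes))

# Sample partition: 10 MB for 100k records; 50M records total -> ~5 GB,
# so about 20 files of ~256 MB each.
print(estimate_partitions(10 * 1024 * 1024, 100_000, 50_000_000))  # 20
```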

On Wed, Jul 13, 2016 at 9:10 AM, Gourav Sengupta 
wrote:

> Hi,
>
> Using file size is a very bad way of managing data, provided you think that
> volume, variety and veracity do not hold true. Actually, it's a very bad
> way of thinking about and designing data solutions; you are bound to hit
> bottlenecks, optimization issues, and manual interventions.
>
> I have found that thinking about data in logical partitions helps overcome
> most of the design problems mentioned above.
>
> You can either use repartition with shuffling or coalesce with shuffle
> turned off to manage loads.
>
> If you are using HIVE just let me know.
>
>
> Regards,
> Gourav Sengupta
>
> On Wed, Jul 13, 2016 at 5:39 AM, Pedro Rodriguez 
> wrote:
>
>> The primary goal for balancing partitions would be for the write to S3.
>> We would like to prevent unbalanced partitions (can do with repartition),
>> but also avoid partitions that are too small or too large.
>>
>> So for that case, getting the cache size would work, Maropu, if it's roughly
>> accurate, but for data ingest we aren’t caching, just writing straight
>> through to S3.
>>
>> The idea for writing to disk and checking for the size is interesting
>> Hatim. For certain jobs, it seems very doable to write a small percentage
>> of the data to S3, check the file size through the AWS API, and use that to
>> estimate the total size. Thanks for the idea.
>>
>> —
>> Pedro Rodriguez
>> PhD Student in Large-Scale Machine Learning | CU Boulder
>> Systems Oriented Data Scientist
>> UC Berkeley AMPLab Alumni
>>
>> pedrorodriguez.io | 909-353-4423
>> github.com/EntilZha | LinkedIn
>> 
>>
>> On July 12, 2016 at 7:26:17 PM, Hatim Diab (timd...@gmail.com) wrote:
>>
>> Hi,
>>
>> Since the final size depends on data types and compression. I've had to
>> first get a rough estimate of data, written to disk, then compute the
>> number of partitions.
>>
>> partitions = int(ceil(size_data * conversion_ratio / block_size))
>>
>> In my case block size 256mb, source txt & dest is snappy parquet,
>> compression_ratio .6
>>
>> df.repartition(partitions).write.parquet(output)
>>
>> Which yields files in the range of 230mb.
>>
>> Another way was to count and come up with an empirical formula.
>>
>> Cheers,
>> Hatim
>>
>>
>> On Jul 12, 2016, at 9:07 PM, Takeshi Yamamuro 
>> wrote:
>>
>> Hi,
>>
> There is no simple way to access the size on the driver side.
>> Since the partitions of primitive typed data (e.g., int) are compressed
>> by `DataFrame#cache`,
>> the actual size is possibly a little bit different from 

Re: When worker is killed driver continues to run causing issues in supervise mode

2016-07-13 Thread Noorul Islam Kamal Malmiyoda
Adding dev list
On Jul 13, 2016 5:38 PM, "Noorul Islam K M"  wrote:

>
> Spark version: 1.6.1
> Cluster Manager: Standalone
>
> I am experimenting with cluster mode deployment along with supervise for
> high availability of streaming applications.
>
> 1. Submit a streaming job in cluster mode with supervise
> 2. Say that driver is scheduled on worker1. The app started
>successfully.
> 3. Kill the worker1 Java process. This does not kill the driver process,
>and hence the application (context) is still alive.
> 4. Because of the supervise flag, the driver gets scheduled to a new worker,
>worker2, and hence a new context is created, making it a duplicate.
>
> I think this seems to be a bug.
>
> Regards,
> Noorul
>


Issue in spark job. Remote rpc client dissociated

2016-07-13 Thread Balachandar R.A.
Hello

In one of my use cases, I need to process a list of folders in parallel. I
used
sc.parallelize(list, list.size).map("logic to process the folder").
I have a six-node cluster and there are six folders to process. Ideally I
expect each of my nodes to process one folder. But I see that a node
processes multiple folders while one or two of the nodes do not get any job.
In the end, the spark-submit crashes with an exception saying "remote RPC
client dissociated". Can someone give me a hint on what's going wrong here?
Please note that this issue does not arise if I comment out my logic that
processes the folder and simply print the folder name. In that case, every
node gets one folder to process. I inserted a sleep of 40 seconds inside
the map. No issue. But when I uncomment my logic I see this issue. Also,
before crashing it does process some of the folders successfully.
"Successfully" means the business logic generates a file in a shared file
system.

Regards
Bala
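A side note on the partitioning itself: parallelize with numSlices equal to the list length should indeed hand each partition exactly one folder — the slicing is plain even range-partitioning, mimicked below in plain Python (an illustration, not Spark code). So one node receiving multiple folders most likely means executors died and their tasks were retried elsewhere (consistent with the "remote RPC client dissociated" error), rather than a partitioning problem.

```python
def slice_list(seq, num_slices):
    # Mimics how Spark's parallelize splits a collection into partitions:
    # even integer ranges, one per slice.
    n = len(seq)
    return [seq[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]

folders = ["f1", "f2", "f3", "f4", "f5", "f6"]
print(slice_list(folders, 6))
# Each of the 6 partitions gets exactly one folder:
# [['f1'], ['f2'], ['f3'], ['f4'], ['f5'], ['f6']]
```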


Re: Spark Thrift Server performance

2016-07-13 Thread Mich Talebzadeh
Thanks guys

Any ideas on this?

What is the limit on the number of users accessing the Thrift server
concurrently? Say we use YARN: will YARN control the apps accessing the
Thrift server, or do users, each armed with beeline, connect to the Thrift
server directly? Say my STS has the conf below:

--driver-memory 8G \
--num-executors 2 \
--executor-memory 8G \
--executor-cores 4 \

I may not be making sense, but if the sum of the resources for the number of
"concurrent" connections exceeds the STS limits above, I gather the new
connections will hang? This assumes that we are just running a single STS
on a single node.

Cheers






Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
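On the concurrency question, my (hedged) understanding is that two separate limits apply: the Thrift transport caps concurrent client connections via the HiveServer2 worker-thread settings, while queries beyond the executor capacity queue in the FAIR/FIFO scheduler rather than receiving dedicated resources. For example:

```
${SPARK_HOME}/sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=10055 \
  --hiveconf hive.server2.thrift.min.worker.threads=5 \
  --hiveconf hive.server2.thrift.max.worker.threads=500 \
  --conf "spark.scheduler.mode=FAIR" \
  ...
```

With a single STS, all sessions share the one SparkContext and its executors, so an oversubscribed server should show up as queued, slow queries rather than outright hangs — again, an assumption, not verified against 1.6.1.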



On 13 July 2016 at 17:09, ayan guha  wrote:

> Not really, that is not the primary intention. Our main goal is poor man's
> high availability (as STS does not provide HA mechanism like HS2) :).
> Additionally, we have made STS part of Ambari AUTO_START group, so Ambari
> brings up STS if it goes down for some intermittent reason.
>
>
>
> On Thu, Jul 14, 2016 at 1:38 AM, Michael Segel 
> wrote:
>
>> Hey, silly question?
>>
>> If you’re running a load balancer, are you trying to reuse the RDDs
>> between jobs?
>>
>> TIA
>> -Mike
>>
>> On Jul 13, 2016, at 9:08 AM, ayan guha  wrote:
>>
>> My 2 cents:
>>
>> Yes, we are running multiple STS (we are running on different nodes, but
>> you can run on same node, different ports). Using Ambari, it is really
>> convenient to manage.
>>
>> We have set up a nginx load balancer as well pointing to both services
>> and all our external BI tools connect to the load balancer.
>>
>> STS works as a YARN client application, where STS is the driver.
>>
>>
>>
>> On Wed, Jul 13, 2016 at 5:33 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I need some feedback on the performance of the Spark Thrift Server (STS)
>>>
>>> As far I can ascertain one can start STS passing the usual spark
>>> parameters
>>>
>>> ${SPARK_HOME}/sbin/start-thriftserver.sh \
>>> --master spark://50.140.197.217:7077 \
>>> --hiveconf hive.server2.thrift.port=10055 \
>>> --packages  \
>>> --driver-memory 2G \
>>> --num-executors 2 \
>>> --executor-memory 2G \
>>> --conf "spark.scheduler.mode=FAIR" \
>>> --conf
>>> "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
>>> -XX:+PrintGCTimeStamps" \
>>> --jars  \
>>> --conf "spark.ui.port=12345"
>>>
>>>
>>>   And accessing it via beeline JDBC client
>>>
>>> beeline -u jdbc:hive2://rhes564:10055 -n hduser -p
>>>
>>> Now the questions I have
>>>
>>>
>>>1. What is the limit on the number of users accessing the thrift
>>>server.
>>>2. Clearly the thrift server can start with resource configuration.
>>>In a simple way does STS act as a gateway to Spark (meaning Spark apps 
>>> can
>>>use their own resources) or one is limited to resource that STS offers?
>>>3. Can one start multiple thrift servers
>>>
>>> As far as I can see STS is equivalent to Spark SQL accessing Hive DW.
>>> Indeed this is what it says:
>>>
>>> Connecting to jdbc:hive2://rhes564:10055
>>> Connected to: Spark SQL (version 1.6.1)
>>> Driver: Spark Project Core (version 1.6.1)
>>> Transaction isolation: TRANSACTION_REPEATABLE_READ
>>> Beeline version 1.6.1 by Apache Hive
>>> 0: jdbc:hive2://rhes564:10055>
>>>
>>> Thanks
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Spark Thrift Server performance

2016-07-13 Thread ayan guha
Not really, that is not the primary intention. Our main goal is poor man's
high availability (as STS does not provide HA mechanism like HS2) :).
Additionally, we have made STS part of Ambari AUTO_START group, so Ambari
brings up STS if it goes down for some intermittent reason.



On Thu, Jul 14, 2016 at 1:38 AM, Michael Segel 
wrote:

> Hey, silly question?
>
> If you’re running a load balancer, are you trying to reuse the RDDs
> between jobs?
>
> TIA
> -Mike
>
> On Jul 13, 2016, at 9:08 AM, ayan guha  wrote:
>
> My 2 cents:
>
> Yes, we are running multiple STS (we are running on different nodes, but
> you can run on same node, different ports). Using Ambari, it is really
> convenient to manage.
>
> We have set up a nginx load balancer as well pointing to both services and
> all our external BI tools connect to the load balancer.
>
> STS works as a YARN client application, where STS is the driver.
>
>
>
> On Wed, Jul 13, 2016 at 5:33 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> I need some feedback on the performance of the Spark Thrift Server (STS)
>>
>> As far I can ascertain one can start STS passing the usual spark
>> parameters
>>
>> ${SPARK_HOME}/sbin/start-thriftserver.sh \
>> --master spark://50.140.197.217:7077 \
>> --hiveconf hive.server2.thrift.port=10055 \
>> --packages  \
>> --driver-memory 2G \
>> --num-executors 2 \
>> --executor-memory 2G \
>> --conf "spark.scheduler.mode=FAIR" \
>> --conf
>> "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
>> -XX:+PrintGCTimeStamps" \
>> --jars  \
>> --conf "spark.ui.port=12345"
>>
>>
>>   And accessing it via beeline JDBC client
>>
>> beeline -u jdbc:hive2://rhes564:10055 -n hduser -p
>>
>> Now the questions I have
>>
>>
>>1. What is the limit on the number of users accessing the thrift
>>server.
>>2. Clearly the thrift server can start with resource configuration.
>>In a simple way does STS act as a gateway to Spark (meaning Spark apps can
>>use their own resources) or one is limited to resource that STS offers?
>>3. Can one start multiple thrift servers
>>
>> As far as I can see STS is equivalent to Spark SQL accessing Hive DW.
>> Indeed this is what it says:
>>
>> Connecting to jdbc:hive2://rhes564:10055
>> Connected to: Spark SQL (version 1.6.1)
>> Driver: Spark Project Core (version 1.6.1)
>> Transaction isolation: TRANSACTION_REPEATABLE_READ
>> Beeline version 1.6.1 by Apache Hive
>> 0: jdbc:hive2://rhes564:10055>
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>


-- 
Best Regards,
Ayan Guha


Online evaluation of MLLIB model

2016-07-13 Thread Danilo Rizzo
Hi All, I'm trying to create an ML pipeline that is in charge of the model
training.
In my use case I need to evaluate the model in real time from an
external application; googling, I saw that I can submit a Spark job using
the submit API.

Not sure if this is the best way to achieve that; any thoughts? I'm
wondering whether it can handle a lot of model-evaluation requests while
keeping the response time fast enough for use in a web application.

-- 
Danilo Rizzo


Re: Dependencies with runing Spark Streaming on Mesos cluster using Python

2016-07-13 Thread Shuai Lin
I think there are two options for you:

First, you can set `--conf spark.mesos.executor.docker.image=
adolphlwq/mesos-for-spark-exector-image:1.6.0.beta2` in your spark-submit
args, so Mesos would launch the executor with your custom image.

Or you can remove the `local:` prefix in the --jars flag; this way the
executors would download the jars from your spark driver.
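Concretely, the two fixes would look something like this (image tag and paths copied from the thread; treat this as a sketch, not a tested command):

```
# Option 1: point the executors at the custom image explicitly
dcos spark run \
  --submit-args='--conf spark.mesos.executor.docker.image=adolphlwq/mesos-for-spark-exector-image:1.6.0.beta2 \
    --jars local:/linker/jars/spark-streaming-kafka_2.10-1.6.0.jar \
    spark2cassandra.py 10.140.0.14:2181 wlu_spark2cassandra'

# Option 2: drop the local: prefix so executors fetch the jar from the driver
dcos spark run \
  --submit-args='--jars /linker/jars/spark-streaming-kafka_2.10-1.6.0.jar \
    spark2cassandra.py 10.140.0.14:2181 wlu_spark2cassandra'
```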



On Wed, Jul 13, 2016 at 9:08 PM, Luke Adolph  wrote:

> Update:
> I rebuilt my mesos-exector-image and downloaded
> *spark-streaming-kafka_2.10-1.6.0.jar* into *`/linker/jars`*.
>
> I changed my submit command:
>
> dcos spark run \ --submit-args='--jars
>> local:/linker/jars/spark-streaming-kafka_2.10-1.6.0.jar  spark2cassandra.py
>> 10.140.0.14:2181 wlu_spark2cassandra' --docker-image
>> adolphlwq/mesos-for-spark-exector-image:1.6.0.beta2
>
>
> This gives new stderr output on Mesos (screenshot omitted).
>
> The only remaining problem is shipping the dependency
> spark-streaming-kafka_2.10-1.6.0.jar to the workers.
>
> Thanks.
>
>
> 2016-07-13 18:57 GMT+08:00 Luke Adolph :
>
>> Hi all:
>> My Spark runs on Mesos. I wrote a Spark Streaming app using Python; the
>> code is on GitHub .
>>
>> The app has the dependency "
>> *org.apache.spark:spark-streaming-kafka_2.10:1.6.1*".
>>
>> Spark on Mesos has two important concepts: the Spark framework and the
>> Spark executor.
>>
>> I set my executor to run in a Docker image. The Dockerfile
>> 
>>  is
>> below:
>>
>> # refer '
>>> http://spark.apache.org/docs/latest/running-on-mesos.html#spark-properties'
>>> on 'spark.mesos.executor.docker.image' section
>>
>> FROM ubuntu:14.04
>>> WORKDIR /linker
>>> RUN ln -f -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
>>> #download mesos
>>> RUN echo "deb http://repos.mesosphere.io/ubuntu/ trusty main" >
>>> /etc/apt/sources.list.d/mesosphere.list && \
>>> apt-key adv --keyserver keyserver.ubuntu.com --recv E56151BF && \
>>> apt-get update && \
>>> apt-get -y install mesos=0.28.1-2.0.20.ubuntu1404 openjdk-7-jre
>>> python-pip git vim curl
>>> RUN git clone https://github.com/adolphlwq/linkerProcessorSample.git &&
>>> \
>>> pip install -r linkerProcessorSample/docker/requirements.txt
>>> RUN curl -fL
>>> http://archive.apache.org/dist/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz
>>> | tar xzf - -C /usr/local && \
>>> apt-get clean
>>> ENV MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so \
>>> JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 \
>>> SPARK_HOME=/usr/local/spark-1.6.0-bin-hadoop2.6
>>> ENV PATH=$JAVA_HOME/bin:$PATH
>>> WORKDIR $SPARK_HOME
>>
>>
>> When I use the command below to submit my app:
>>
>> dcos spark run --submit-args='--packages
>>> org.apache.spark:spark-streaming-kafka_2.10:1.6.1 \
>>>spark2cassandra.py zk topic' \
>>> -docker-image=adolphlwq/mesos-for-spark-exector-image:1.6.0.beta
>>
>>
>> The executor Docker container runs successfully, but it has no package for
>> *org.apache.spark:spark-streaming-kafka_2.10:1.6.1*.
>>
>> The *stderr* om mesos is:
>>
>> I0713 09:34:52.715551 18124 logging.cpp:188] INFO level logging started!
>>> I0713 09:34:52.717797 18124 fetcher.cpp:424] Fetcher Info:
>>> {"cache_directory":"\/tmp\/mesos\/fetch\/slaves\/6097419e-c2d0-4e5f-9a91-e5815de640c4-S4","items":[{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/home\/ubuntu\/spark2cassandra.py"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.apache.spark_spark-streaming-kafka_2.10-1.6.0.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/com.101tec_zkclient-0.3.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.apache.kafka_kafka_2.10-0.8.2.1.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.slf4j_slf4j-api-1.7.10.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.spark-project.spark_unused-1.0.0.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/net.jpountz.lz4_lz4-1.3.0.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/log4j_log4j-1.2.17.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/com.yammer.metrics_metrics-core-2.2.0.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.apache.kafka_kafka-clients-0.8.2.1.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.xerial.snappy_snappy-java-1.1.2.jar"}}],"sandbox_directory":"\/tmp\/mesos\/slaves\/6097419e-c2d0-4e5f-9a91-e5815de640c4-S4\/frameworks\/7399b6f7-5dcd-4a9b-9846-e7948d5ffd11-0024\/executors\/driver-20160713093451-0015\/runs\/84419372-9482-4c58-8f87-4ba528b6885c"}
>>> I0713 09:34:52.719846 18124 fetcher.cpp:379] Fetching URI
>>> '/home/ubuntu/spark2cassandra.py'
>>> I0713 

Re: Spark Thrift Server performance

2016-07-13 Thread Michael Segel
Hey, silly question? 

If you’re running a load balancer, are you trying to reuse the RDDs between 
jobs? 

TIA
-Mike

> On Jul 13, 2016, at 9:08 AM, ayan guha  > wrote:
> 
> My 2 cents:
> 
> Yes, we are running multiple STS (we are running on different nodes, but you 
> can run on same node, different ports). Using Ambari, it is really convenient 
> to manage. 
> 
> We have set up a nginx load balancer as well pointing to both services and 
> all our external BI tools connect to the load balancer. 
> 
> STS works as a YARN client application, where STS is the driver.
> 
> 
> 
> On Wed, Jul 13, 2016 at 5:33 PM, Mich Talebzadeh  > wrote:
> Hi,
> 
> I need some feedback on the performance of the Spark Thrift Server (STS) 
> 
> As far I can ascertain one can start STS passing the usual spark parameters
> 
> ${SPARK_HOME}/sbin/start-thriftserver.sh \
> --master spark://50.140.197.217:7077 
>  \
> --hiveconf hive.server2.thrift.port=10055 \
> --packages  \
> --driver-memory 2G \
> --num-executors 2 \
> --executor-memory 2G \
> --conf "spark.scheduler.mode=FAIR" \
> --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails 
> -XX:+PrintGCTimeStamps" \
> --jars  \
> --conf "spark.ui.port=12345" 
> 
> 
>   And accessing it via beeline JDBC client
> 
> beeline -u jdbc:hive2://rhes564:10055 -n hduser -p
> 
> Now the questions I have
> 
> 1. What is the limit on the number of users accessing the thrift server.
> 2. Clearly the thrift server can start with resource configuration. In a
> simple way does STS act as a gateway to Spark (meaning Spark apps can use
> their own resources) or one is limited to resource that STS offers?
> 3. Can one start multiple thrift servers
> As far as I can see STS is equivalent to Spark SQL accessing Hive DW. Indeed 
> this is what it says:
> 
> Connecting to jdbc:hive2://rhes564:10055
> Connected to: Spark SQL (version 1.6.1)
> Driver: Spark Project Core (version 1.6.1)
> Transaction isolation: TRANSACTION_REPEATABLE_READ
> Beeline version 1.6.1 by Apache Hive
> 0: jdbc:hive2://rhes564:10055>
> 
> Thanks
> 
>  
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha



Re: Tools for Balancing Partitions by Size

2016-07-13 Thread Gourav Sengupta
Hi,

Using file size is a very bad way of managing data, provided you think that
volume, variety and veracity do not hold true. Actually, it's a very bad
way of thinking about and designing data solutions; you are bound to hit
bottlenecks, optimization issues, and manual interventions.

I have found that thinking about data in logical partitions helps overcome
most of the design problems mentioned above.

You can either use repartition with shuffling or coalesce with shuffle
turned off to manage loads.

If you are using HIVE just let me know.


Regards,
Gourav Sengupta

On Wed, Jul 13, 2016 at 5:39 AM, Pedro Rodriguez 
wrote:

> The primary goal for balancing partitions would be for the write to S3. We
> would like to prevent unbalanced partitions (can do with repartition), but
> also avoid partitions that are too small or too large.
>
> So for that case, getting the cache size would work Maropu if its roughly
> accurate, but for data ingest we aren’t caching, just writing straight
> through to S3.
>
> The idea for writing to disk and checking for the size is interesting
> Hatim. For certain jobs, it seems very doable to write a small percentage
> of the data to S3, check the file size through the AWS API, and use that to
> estimate the total size. Thanks for the idea.
>
> —
> Pedro Rodriguez
> PhD Student in Large-Scale Machine Learning | CU Boulder
> Systems Oriented Data Scientist
> UC Berkeley AMPLab Alumni
>
> pedrorodriguez.io | 909-353-4423
> github.com/EntilZha | LinkedIn
> 
>
> On July 12, 2016 at 7:26:17 PM, Hatim Diab (timd...@gmail.com) wrote:
>
> Hi,
>
> Since the final size depends on data types and compression. I've had to
> first get a rough estimate of data, written to disk, then compute the
> number of partitions.
>
> partitions = int(ceil(size_data * conversion_ratio / block_size))
>
> In my case block size 256mb, source txt & dest is snappy parquet,
> compression_ratio .6
>
> df.repartition(partitions).write.parquet(output)
>
> Which yields files in the range of 230mb.
>
> Another way was to count and come up with an empirical formula.
>
> Cheers,
> Hatim
>
>
> On Jul 12, 2016, at 9:07 PM, Takeshi Yamamuro 
> wrote:
>
> Hi,
>
> There is no simple way to access the size in a driver side.
> Since the partitions of primitive typed data (e.g., int) are compressed by
> `DataFrame#cache`,
> the actual size is possibly a little bit different from processing
> partitions size.
>
> // maropu
>
> On Wed, Jul 13, 2016 at 4:53 AM, Pedro Rodriguez 
> wrote:
>
>> Hi,
>>
>> Are there any tools for partitioning RDD/DataFrames by size at runtime?
>> The idea would be to specify that I would like for each partition to be
>> roughly X number of megabytes then write that through to S3. I haven't
>> found anything off the shelf, and looking through stack overflow posts
>> doesn't seem to yield anything concrete.
>>
>> Is there a way to programmatically get the size or a size estimate for an
>> RDD/DataFrame at runtime (eg size of one record would be sufficient)? I
>> gave SizeEstimator a try, but it seems like the results varied quite a bit
>> (tried on whole RDD and a sample). It would also be useful to get
>> programmatic access to the size of the RDD in memory if it is cached.
>>
>> Thanks,
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>
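Hatim's sizing heuristic above can be sketched in a few lines of plain Python (the 256 MB block size and 0.6 compression ratio are just the illustrative values from his message, not universal constants):

```python
import math

def estimate_partitions(input_bytes,
                        block_size=256 * 1024 * 1024,
                        compression_ratio=0.6):
    """Estimate how many partitions to repartition() into so that each
    written file lands near the target block size."""
    projected_output = input_bytes * compression_ratio
    return max(1, int(math.ceil(projected_output / block_size)))

# e.g. ~100 GB of raw text projected down to snappy parquet
print(estimate_partitions(100 * 1024**3))
```

The result feeds straight into `df.repartition(partitions).write.parquet(output)`, as in Hatim's example; the conversion ratio itself still has to be measured empirically per data set, as he notes.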
>


Re: Spark Thrift Server performance

2016-07-13 Thread ayan guha
My 2 cents:

Yes, we are running multiple STS instances (we run them on different nodes,
but you can run them on the same node on different ports). Using Ambari, it
is really convenient to manage.

We have set up an nginx load balancer as well, pointing to both services,
and all our external BI tools connect to the load balancer.

STS runs as a YARN client application, where STS is the driver.



On Wed, Jul 13, 2016 at 5:33 PM, Mich Talebzadeh 
wrote:

> Hi,
>
> I need some feedback on the performance of the Spark Thrift Server (STS)
>
> As far as I can ascertain, one can start STS passing the usual Spark parameters:
>
> ${SPARK_HOME}/sbin/start-thriftserver.sh \
> --master spark://50.140.197.217:7077 \
> --hiveconf hive.server2.thrift.port=10055 \
> --packages  \
> --driver-memory 2G \
> --num-executors 2 \
> --executor-memory 2G \
> --conf "spark.scheduler.mode=FAIR" \
> --conf
> "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
> -XX:+PrintGCTimeStamps" \
> --jars  \
> --conf "spark.ui.port=12345"
>
>
>   And accessing it via beeline JDBC client
>
> beeline -u jdbc:hive2://rhes564:10055 -n hduser -p
>
> Now the questions I have
>
>
>1. What is the limit on the number of users accessing the thrift
>server?
>2. Clearly the thrift server can start with resource configuration. In
>a simple way, does STS act as a gateway to Spark (meaning Spark apps can use
>their own resources), or is one limited to the resources that STS offers?
>3. Can one start multiple thrift servers?
>
> As far as I can see STS is equivalent to Spark SQL accessing Hive DW.
> Indeed this is what it says:
>
> Connecting to jdbc:hive2://rhes564:10055
> Connected to: Spark SQL (version 1.6.1)
> Driver: Spark Project Core (version 1.6.1)
> Transaction isolation: TRANSACTION_REPEATABLE_READ
> Beeline version 1.6.1 by Apache Hive
> 0: jdbc:hive2://rhes564:10055>
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>



-- 
Best Regards,
Ayan Guha


Re: How to run Zeppelin and Spark Thrift Server Together

2016-07-13 Thread Michael Segel
I believe that there is one JVM for the Thrift Service and that there is only 
one context for the service. 

This would allow you to share RDDs across multiple jobs, however… not so great 
for security.

HTH… 


> On Jul 10, 2016, at 10:05 PM, Takeshi Yamamuro  > wrote:
> 
> Hi,
> 
> ISTM multiple sparkcontexts are not recommended in spark.
> See: https://issues.apache.org/jira/browse/SPARK-2243 
> 
> 
> // maropu
> 
> 
> On Mon, Jul 11, 2016 at 12:01 PM, ayan guha  > wrote:
> Hi
> 
> Can you try using JDBC interpreter with STS? We are using Zeppelin+STS on 
> YARN for few months now without much issue. 
> 
> On Mon, Jul 11, 2016 at 12:48 PM, Chanh Le  > wrote:
> Hi everybody,
> We are using Spark to query big data and currently we’re using Zeppelin to 
> provide a UI for technical users.
> Now we also need to provide a UI for business users so we use Oracle BI tools 
> and set up a Spark Thrift Server (STS) for it.
> 
> When I run both Zeppelin and STS throw error:
> 
> INFO [2016-07-11 09:40:21,905] ({pool-2-thread-4} 
> SchedulerFactory.java[jobStarted]:131) - Job remoteInterpretJob_1468204821905 
> started by scheduler org.apache.zeppelin.spark.SparkInterpreter835015739
>  INFO [2016-07-11 09:40:21,911] ({pool-2-thread-4} Logging.scala[logInfo]:58) 
> - Changing view acls to: giaosudau
>  INFO [2016-07-11 09:40:21,912] ({pool-2-thread-4} Logging.scala[logInfo]:58) 
> - Changing modify acls to: giaosudau
>  INFO [2016-07-11 09:40:21,912] ({pool-2-thread-4} Logging.scala[logInfo]:58) 
> - SecurityManager: authentication disabled; ui acls disabled; users with view 
> permissions: Set(giaosudau); users with modify permissions: Set(giaosudau)
>  INFO [2016-07-11 09:40:21,918] ({pool-2-thread-4} Logging.scala[logInfo]:58) 
> - Starting HTTP Server
>  INFO [2016-07-11 09:40:21,919] ({pool-2-thread-4} Server.java[doStart]:272) 
> - jetty-8.y.z-SNAPSHOT
>  INFO [2016-07-11 09:40:21,920] ({pool-2-thread-4} 
> AbstractConnector.java[doStart]:338) - Started SocketConnector@0.0.0.0:54818 
> 
>  INFO [2016-07-11 09:40:21,922] ({pool-2-thread-4} Logging.scala[logInfo]:58) 
> - Successfully started service 'HTTP class server' on port 54818.
>  INFO [2016-07-11 09:40:22,408] ({pool-2-thread-4} 
> SparkInterpreter.java[createSparkContext]:233) - -- Create new 
> SparkContext local[*] ---
>  WARN [2016-07-11 09:40:22,411] ({pool-2-thread-4} 
> Logging.scala[logWarning]:70) - Another SparkContext is being constructed (or 
> threw an exception in its constructor).  This may indicate an error, since 
> only one SparkContext may be running in this JVM (see SPARK-2243). The other 
> SparkContext was created at:
> 
> Does that mean I need to allow multiple contexts? Because this is only a test 
> in local mode; if I deploy on a Mesos cluster, what would happen?
> 
> Need you guys suggests some solutions for that. Thanks.
> 
> Chanh
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha
> 
> 
> 
> -- 
> ---
> Takeshi Yamamuro



Re: Dependencies with running Spark Streaming on Mesos cluster using Python

2016-07-13 Thread Luke Adolph
Update:
I rebuilt my mesos-executor image and downloaded
*spark-streaming-kafka_2.10-1.6.0.jar* into *`/linker/jars`*.

I changed my submit command:

dcos spark run \ --submit-args='--jars
> local:/linker/jars/spark-streaming-kafka_2.10-1.6.0.jar  spark2cassandra.py
> 10.140.0.14:2181 wlu_spark2cassandra' --docker-image
> adolphlwq/mesos-for-spark-exector-image:1.6.0.beta2


Now I get new stderr output on Mesos (screenshot omitted).

The only remaining problem is shipping the dependency
spark-streaming-kafka_2.10-1.6.0.jar to the workers.

Thanks.


2016-07-13 18:57 GMT+08:00 Luke Adolph :

> Hi all:
> My Spark runs on Mesos. I wrote a Spark Streaming app using Python; the code
> is on GitHub.
>
> The app has the dependency
> *org.apache.spark:spark-streaming-kafka_2.10:1.6.1*.
>
> Spark on Mesos has two important concepts: the Spark framework and the Spark
> executor.
>
> I set my executor to run in a Docker image. The Dockerfile
> is below:
>
> # refer '
>> http://spark.apache.org/docs/latest/running-on-mesos.html#spark-properties'
>> on 'spark.mesos.executor.docker.image' section
>
> FROM ubuntu:14.04
>> WORKDIR /linker
>> RUN ln -f -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
>> #download mesos
>> RUN echo "deb http://repos.mesosphere.io/ubuntu/ trusty main" >
>> /etc/apt/sources.list.d/mesosphere.list && \
>> apt-key adv --keyserver keyserver.ubuntu.com --recv E56151BF && \
>> apt-get update && \
>> apt-get -y install mesos=0.28.1-2.0.20.ubuntu1404 openjdk-7-jre
>> python-pip git vim curl
>> RUN git clone https://github.com/adolphlwq/linkerProcessorSample.git && \
>> pip install -r linkerProcessorSample/docker/requirements.txt
>> RUN curl -fL
>> http://archive.apache.org/dist/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz
>> | tar xzf - -C /usr/local && \
>> apt-get clean
>> ENV MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so \
>> JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 \
>> SPARK_HOME=/usr/local/spark-1.6.0-bin-hadoop2.6
>> ENV PATH=$JAVA_HOME/bin:$PATH
>> WORKDIR $SPARK_HOME
>
>
> When I use below command to submit my app program:
>
> dcos spark run --submit-args='--packages
>> org.apache.spark:spark-streaming-kafka_2.10:1.6.1 \
>>spark2cassandra.py zk topic' \
> --docker-image=adolphlwq/mesos-for-spark-exector-image:1.6.0.beta
>
>
> The executor Docker container runs successfully, but it has no package for
> *org.apache.spark:spark-streaming-kafka_2.10:1.6.1*.
>
> The *stderr* on Mesos is:
>
> I0713 09:34:52.715551 18124 logging.cpp:188] INFO level logging started!
>> I0713 09:34:52.717797 18124 fetcher.cpp:424] Fetcher Info:
>> {"cache_directory":"\/tmp\/mesos\/fetch\/slaves\/6097419e-c2d0-4e5f-9a91-e5815de640c4-S4","items":[{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/home\/ubuntu\/spark2cassandra.py"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.apache.spark_spark-streaming-kafka_2.10-1.6.0.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/com.101tec_zkclient-0.3.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.apache.kafka_kafka_2.10-0.8.2.1.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.slf4j_slf4j-api-1.7.10.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.spark-project.spark_unused-1.0.0.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/net.jpountz.lz4_lz4-1.3.0.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/log4j_log4j-1.2.17.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/com.yammer.metrics_metrics-core-2.2.0.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.apache.kafka_kafka-clients-0.8.2.1.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.xerial.snappy_snappy-java-1.1.2.jar"}}],"sandbox_directory":"\/tmp\/mesos\/slaves\/6097419e-c2d0-4e5f-9a91-e5815de640c4-S4\/frameworks\/7399b6f7-5dcd-4a9b-9846-e7948d5ffd11-0024\/executors\/driver-20160713093451-0015\/runs\/84419372-9482-4c58-8f87-4ba528b6885c"}
>> I0713 09:34:52.719846 18124 fetcher.cpp:379] Fetching URI
>> '/home/ubuntu/spark2cassandra.py'
>> I0713 09:34:52.719866 18124 fetcher.cpp:250] Fetching directly into the
>> sandbox directory
>> I0713 09:34:52.719925 18124 fetcher.cpp:187] Fetching URI
>> '/home/ubuntu/spark2cassandra.py'
>> I0713 09:34:52.719945 18124 fetcher.cpp:167] Copying resource with
>> command:cp '/home/ubuntu/spark2cassandra.py'
>> '/tmp/mesos/slaves/6097419e-c2d0-4e5f-9a91-e5815de640c4-S4/frameworks/7399b6f7-5dcd-4a9b-9846-e7948d5ffd11-0024/executors/driver-20160713093451-0015/runs/84419372-9482-4c58-8f87-4ba528b6885c/spark2cassandra.py'
>> W0713 09:34:52.722587 18124 

When a worker is killed, the driver continues to run, causing issues in supervise mode

2016-07-13 Thread Noorul Islam K M

Spark version: 1.6.1
Cluster Manager: Standalone

I am experimenting with cluster mode deployment along with supervise for
high availability of streaming applications.

1. Submit a streaming job in cluster mode with supervise
2. Say that driver is scheduled on worker1. The app started
   successfully.
3. Kill worker1 java process. This does not kill driver process and
   hence the application (context) is still alive.
4. Because of supervise flag, driver gets scheduled to new worker
   worker2 and hence a new context is created, making it a duplicate.

I think this seems to be a bug.

Regards,
Noorul

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark 7736

2016-07-13 Thread ayan guha
Hi

I am facing the same issue reported in SPARK-7736 on Spark 1.6.0. Is there
any way to reopen the JIRA?

Reproduction steps attached.



-- 
Best Regards,
Ayan Guha


Spark 7736.docx
Description: MS-Word 2007 document

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Dependencies with running Spark Streaming on Mesos cluster using Python

2016-07-13 Thread Luke Adolph
Hi all:
My Spark runs on Mesos. I wrote a Spark Streaming app using Python; the code
is on GitHub.

The app has the dependency *org.apache.spark:spark-streaming-kafka_2.10:1.6.1*.

Spark on Mesos has two important concepts: the Spark framework and the Spark
executor.

I set my executor to run in a Docker image. The Dockerfile
is below:

# refer '
> http://spark.apache.org/docs/latest/running-on-mesos.html#spark-properties'
> on 'spark.mesos.executor.docker.image' section

FROM ubuntu:14.04
> WORKDIR /linker
> RUN ln -f -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
> #download mesos
> RUN echo "deb http://repos.mesosphere.io/ubuntu/ trusty main" >
> /etc/apt/sources.list.d/mesosphere.list && \
> apt-key adv --keyserver keyserver.ubuntu.com --recv E56151BF && \
> apt-get update && \
> apt-get -y install mesos=0.28.1-2.0.20.ubuntu1404 openjdk-7-jre
> python-pip git vim curl
> RUN git clone https://github.com/adolphlwq/linkerProcessorSample.git && \
> pip install -r linkerProcessorSample/docker/requirements.txt
> RUN curl -fL
> http://archive.apache.org/dist/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz
> | tar xzf - -C /usr/local && \
> apt-get clean
> ENV MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so \
> JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 \
> SPARK_HOME=/usr/local/spark-1.6.0-bin-hadoop2.6
> ENV PATH=$JAVA_HOME/bin:$PATH
> WORKDIR $SPARK_HOME


When I use below command to submit my app program:

dcos spark run --submit-args='--packages
> org.apache.spark:spark-streaming-kafka_2.10:1.6.1 \
>spark2cassandra.py zk topic' \
> --docker-image=adolphlwq/mesos-for-spark-exector-image:1.6.0.beta


The executor Docker container runs successfully, but it has no package for
*org.apache.spark:spark-streaming-kafka_2.10:1.6.1*.

The *stderr* on Mesos is:

I0713 09:34:52.715551 18124 logging.cpp:188] INFO level logging started!
> I0713 09:34:52.717797 18124 fetcher.cpp:424] Fetcher Info:
> {"cache_directory":"\/tmp\/mesos\/fetch\/slaves\/6097419e-c2d0-4e5f-9a91-e5815de640c4-S4","items":[{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/home\/ubuntu\/spark2cassandra.py"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.apache.spark_spark-streaming-kafka_2.10-1.6.0.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/com.101tec_zkclient-0.3.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.apache.kafka_kafka_2.10-0.8.2.1.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.slf4j_slf4j-api-1.7.10.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.spark-project.spark_unused-1.0.0.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/net.jpountz.lz4_lz4-1.3.0.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/log4j_log4j-1.2.17.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/com.yammer.metrics_metrics-core-2.2.0.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.apache.kafka_kafka-clients-0.8.2.1.jar"}},{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"\/root\/.ivy2\/jars\/org.xerial.snappy_snappy-java-1.1.2.jar"}}],"sandbox_directory":"\/tmp\/mesos\/slaves\/6097419e-c2d0-4e5f-9a91-e5815de640c4-S4\/frameworks\/7399b6f7-5dcd-4a9b-9846-e7948d5ffd11-0024\/executors\/driver-20160713093451-0015\/runs\/84419372-9482-4c58-8f87-4ba528b6885c"}
> I0713 09:34:52.719846 18124 fetcher.cpp:379] Fetching URI
> '/home/ubuntu/spark2cassandra.py'
> I0713 09:34:52.719866 18124 fetcher.cpp:250] Fetching directly into the
> sandbox directory
> I0713 09:34:52.719925 18124 fetcher.cpp:187] Fetching URI
> '/home/ubuntu/spark2cassandra.py'
> I0713 09:34:52.719945 18124 fetcher.cpp:167] Copying resource with
> command:cp '/home/ubuntu/spark2cassandra.py'
> '/tmp/mesos/slaves/6097419e-c2d0-4e5f-9a91-e5815de640c4-S4/frameworks/7399b6f7-5dcd-4a9b-9846-e7948d5ffd11-0024/executors/driver-20160713093451-0015/runs/84419372-9482-4c58-8f87-4ba528b6885c/spark2cassandra.py'
> W0713 09:34:52.722587 18124 fetcher.cpp:272] Copying instead of extracting
> resource from URI with 'extract' flag, because it does not seem to be an
> archive: /home/ubuntu/spark2cassandra.py
> I0713 09:34:52.724138 18124 fetcher.cpp:456] Fetched
> '/home/ubuntu/spark2cassandra.py' to
> '/tmp/mesos/slaves/6097419e-c2d0-4e5f-9a91-e5815de640c4-S4/frameworks/7399b6f7-5dcd-4a9b-9846-e7948d5ffd11-0024/executors/driver-20160713093451-0015/runs/84419372-9482-4c58-8f87-4ba528b6885c/spark2cassandra.py'
> I0713 09:34:52.724148 18124 fetcher.cpp:379] Fetching URI
> '/root/.ivy2/jars/org.apache.spark_spark-streaming-kafka_2.10-1.6.0.jar'
> I0713 09:34:52.724153 18124 fetcher.cpp:250] Fetching directly 

Flume integration

2016-07-13 Thread Ian Brooks
Hi,

I'm currently trying to implement a prototype Spark application that gets data 
from Flume and processes it. I'm using the pull based method mentioned in 
https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html 

This initially works fine for getting data from Flume; however, the Spark 
client doesn't appear to be letting Flume know that the data has been received, 
so Flume doesn't remove it from the batch. 

After 100 requests Flume stops allowing any new data and logs

08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5] 
(org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error 
while processing transaction. 


My code to pull the data from Flume is

SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
Duration batchInterval = new Duration(1);

final String checkpointDir = "/tmp/";

final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
batchInterval);
ssc.checkpoint(checkpointDir);

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
FlumeUtils.createPollingStream(ssc, host, port);


// Transform each Flume Avro event into a processable format
JavaDStream<String> transformedEvents = flumeStream.map(new 
Function<SparkFlumeEvent, String>() {

@Override
public String call(SparkFlumeEvent flumeEvent) throws Exception 
{
String flumeEventStr = flumeEvent.event().toString();
avroData avroData = new avroData();

Gson gson = new GsonBuilder().create();
avroData = gson.fromJson(flumeEventStr, avroData.class);
 
HashMap<String, String> body = avroData.getBody();
String data = body.get("bytes");
 
return data;
}
});


...

ssc.start();
ssc.awaitTermination();
ssc.close();
}


Is there something specific I should be doing to let the Flume server know the 
batch has been received and processed?
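Conceptually, the pull-based sink keeps each Flume transaction open until Spark confirms the batch was stored, and only commits it then; otherwise the batch stays in the channel for redelivery. The pattern (sketched abstractly in Python with a toy queue, not Flume's actual API) looks like:

```python
class TransactionalQueue:
    """Toy stand-in for a Flume channel: taken items stay queued until
    the consumer explicitly commits, so a failure mid-batch loses nothing."""

    def __init__(self, items):
        self._items = list(items)
        self._in_flight = 0

    def take(self, n):
        self._in_flight = min(n, len(self._items))
        return self._items[:self._in_flight]

    def commit(self):
        del self._items[:self._in_flight]
        self._in_flight = 0

    def rollback(self):
        self._in_flight = 0


def consume(queue, process, batch_size=2):
    delivered = []
    while True:
        batch = queue.take(batch_size)
        if not batch:
            break
        try:
            delivered.extend(process(batch))
            queue.commit()      # ack only after successful processing
        except Exception:
            queue.rollback()    # batch stays available for redelivery
            raise
    return delivered
```

If the commit never happens — for instance because no output operation actually consumes the stream, or a batch keeps failing — the channel fills up exactly as described above, so it is worth checking that the job has a real output action and that batches complete without errors.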


*Ian Brooks*



Re: Inode for STS

2016-07-13 Thread ayan guha
Thanks Christophe. Any comment on the JIRA from a Spark dev community member
would be really helpful.

What I saw today is that shutting down the thrift server process led to a
cleanup. Also, we started removing any empty folders from /tmp. Is there any
other or better method?
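As a stopgap, that sweep of stale empty directories can be scripted; a minimal sketch in plain Python (the /tmp/hive path and the one-hour age threshold are assumptions, not anything STS itself prescribes):

```python
import os
import time

def sweep_empty_dirs(root, min_age_seconds=3600):
    """Remove directories under root that are empty and untouched for
    min_age_seconds, walking bottom-up; skip anything that races with
    a live process."""
    removed = []
    now = time.time()
    for dirpath, _dirnames, _filenames in os.walk(root, topdown=False):
        if dirpath == root:
            continue
        try:
            if not os.listdir(dirpath) and \
                    now - os.path.getmtime(dirpath) > min_age_seconds:
                os.rmdir(dirpath)
                removed.append(dirpath)
        except OSError:
            pass  # directory vanished or gained entries mid-sweep; skip it
    return removed
```

Run it from cron against the scratch directory (e.g. `sweep_empty_dirs("/tmp/hive")`); the age threshold keeps the thrift server's freshly created session directories out of reach.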

On Wed, Jul 13, 2016 at 5:25 PM, Christophe Préaud <
christophe.pre...@kelkoo.com> wrote:

> Hi Ayan,
>
> I have opened a JIRA about this issues, but there are no answer so far:
> SPARK-15401 
>
> Regards,
> Christophe.
>
>
> On 13/07/16 05:54, ayan guha wrote:
>
> Hi
>
> We are running the Spark Thrift Server as a long-running application. However,
> it looks like it is filling up the /tmp/hive folder with lots of small files
> and directories with no files in them, blowing out the inode limit and
> preventing any connection with a "No Space Left on Device" error.
>
> What is the best way to clean up those small files periodically?
>
> --
> Best Regards,
> Ayan Guha
>
>
>
> --
> Kelkoo SAS
> Société par Actions Simplifiée
> Au capital de € 4.168.964,30
> Siège social : 158 Ter Rue du Temple 75003 Paris
> 425 093 069 RCS Paris
>
> This message and its attachments are confidential and intended exclusively
> for their addressees. If you are not the intended recipient of this
> message, please delete it and notify the sender.
>



-- 
Best Regards,
Ayan Guha


Re: Issue with Spark on 25 nodes cluster

2016-07-13 Thread ANDREA SPINA
Hi,
I solved it by increasing the Akka timeout.
All the best,
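For reference, the knobs involved are the Akka/network timeouts, set for example in spark-defaults.conf (the 300 s values below are illustrative, not the ones Andrea actually used):

```
spark.akka.timeout      300s
spark.network.timeout   300s
```

Raising these trades faster failure detection for tolerance of long GC pauses and slow shuffles, so they are worth tuning rather than simply maximizing.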

2016-06-28 15:04 GMT+02:00 ANDREA SPINA <74...@studenti.unimore.it>:

> Hello everyone,
>
> I am running some experiments with Spark 1.4.0 on a ~80GiB dataset located
> on hdfs-2.7.1. The environment is a 25 nodes cluster, 16 cores per node. I
> set the following params:
>
> spark.master = "spark://"${runtime.hostname}":7077"
>
> # 28 GiB of memory
> spark.executor.memory = "28672m"
> spark.worker.memory = "28672m"
> spark.driver.memory = "2048m"
>
> spark.driver.maxResultSize = "0"
>
> I ran some scaling experiments, varying the number of machines.
> I can successfully run experiments with the full set of 25 nodes and
> also with 20 nodes. Experiments on environments of 5 and 10 nodes
> relentlessly fail. During the run, Spark executors begin to accumulate
> failing tasks from different stages and end with the following trace:
>
> 16/06/28 03:11:09 INFO DAGScheduler: Job 14 failed: reduce at
> sGradientDescent.scala:229, took 1778.508309 s
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 212 in stage 14.0 failed 4 times, most recent
> failure: Lost task 212.3 in stage 14.0 (TID 12278, 130.149.21.19):
> java.io.IOException: Connection from /130.149.21.16:35997 closed
> at
> org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
> at
> io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:738)
> at
> io.netty.channel.AbstractChannel$AbstractUnsafe$6.run(AbstractChannel.java:606)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> Here is the full master log.
> As well, each worker receives signal SIGTERM: 15.
>
> I can't figure out a solution either.
> Thank you, Regards,
>
> Andrea
>
>
> --
> *Andrea Spina*
> N.Tessera: *74598*
> MAT: *89369*
> *Ingegneria Informatica* *[LM] *(D.M. 270)
>



-- 
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*

Re: How to run Zeppelin and Spark Thrift Server Together

2016-07-13 Thread Chanh Le
Hi Ayan,

I don’t know whether I did something wrong, but I still couldn’t set the 
hive.metastore.warehouse.dir property.

I placed hive-site.xml in all three locations (Spark, Zeppelin and Hive), but it 
still didn’t work.

zeppelin/conf/hive-site.xml 
spark/conf/hive-site.xml
hive/conf/hive-site.xml



My hive-site.xml



<configuration>
  <property>
    <name>hive.metastore.metadb.dir</name>
    <value>alluxio://master1:19998/metadb</value>
    <description>
      Required by metastore server or if the uris argument below is not supplied
    </description>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>alluxio://master1:19998/warehouse</value>
    <description>
      Required by metastore server or if the uris argument below is not supplied
    </description>
  </property>
</configuration>


Is there anything I can do?

Regards,
Chanh



> On Jul 13, 2016, at 12:43 PM, ayan guha  wrote:
> 
> Hi
> 
> Create table always happens through Hive. In Hive, when you create a 
> database, the default metadata storage location is driven by 
> hive.metastore.metadb.dir and data storage is driven by 
> hive.metastore.warehouse.dir property (set in hive site xml). So, you do not 
> need to set this property in Zeppelin. 
> 
> What you can do:
>a. Modify  hive-site.xml to include those properties, if they are 
> not already set.  use the same hive site.xml to run STS. Then connect through 
> JDBC, create table and you should find metadata & data in your desired 
> location. 
> b. I think you can set these properties (same way you'd do in hive cli)
> c. You can create tables/databases with a LOCATION clause,  in case you need 
> to use non-standard path. 
> 
> Best
> Ayan
> 
> On Wed, Jul 13, 2016 at 3:20 PM, Chanh Le  > wrote:
> Hi Ayan,
> Thank you for replying. 
> But I want to create a table in Zeppelin and store the metadata in Alluxio; I 
> tried to do set hive.metastore.warehouse.dir=alluxio://master1:19998/metadb 
> so I can share data with STS.
> 
> The JDBC way you’ve mentioned I already did, and it works, but I can’t easily 
> create tables the Spark way.
> 
> Regards,
> Chanh
> 
> 
>> On Jul 13, 2016, at 12:06 PM, ayan guha > > wrote:
>> 
>> HI
>> 
>> I quickly tried with the available Hive interpreter (screenshot omitted).
>> 
>> Please try similarly. 
>> 
>> I will try with jdbc interpreter but I need to add it to zeppelin :)
>> 
>> Best
>> Ayan
>> 
>> On Wed, Jul 13, 2016 at 1:53 PM, Chanh Le > > wrote:
>> Hi Ayan,
>> How do I set the Hive metastore in Zeppelin? I tried but without success.
>> The way I do it: I add the settings into the Spark interpreter configuration (screenshot omitted).
>> 
>> 
>> 
>> And I also tried in a notebook:
>> %sql
>> set hive.metastore.metadb.dir=alluxio://master1:19998/metadb
>> 
>> %sql 
>> set hive.metastore.warehouse.dir=alluxio://master1:19998/metadb
>> 
>> %spark
>> sqlContext.setConf("hive.metastore.warehouse.dir", 
>> "alluxio://master1:19998/metadb")
>> sqlContext.setConf("hive.metastore.metadb.dir", 
>> "alluxio://master1:19998/metadb")
>> sqlContext.read.parquet("alluxio://master1:19998/etl_info/WEBSITE")
>>   .saveAsTable("tests_5")
>> 
>> But it still didn’t work.
>> 
>>> On Jul 11, 2016, at 1:26 PM, ayan guha >> > wrote:
>>> 
>>> Hi
>>> 
>>> When you say "Zeppelin and STS", I am assuming you mean "Spark Interpreter" 
>>> and "JDBC interpreter" respectively. 
>>> 
>>> Through Zeppelin, you can either run your own spark application (by using 
>>> Zeppelin's own spark context) using spark interpreter OR you can access 
>>> STS, which  is a spark application ie separate Spark Context using JDBC 
>>> interpreter. There should not be any need for these 2 contexts to coexist. 
>>> 
>>> If you want to share data, save it to hive from either context, and you 
>>> should be able to see the data from other context. 
>>> 
>>> Best
>>> Ayan
>>> 
>>> 
>>> 
>>> On Mon, Jul 11, 2016 at 3:00 PM, Chanh Le >> > wrote:
>>> Hi Ayan,
>>> I tested it and it works fine, but one more point of confusion: what if my 
>>> (technical) users want to write some code in Zeppelin to apply things to a 
>>> Hive table? 
>>> Zeppelin and STS can’t share a Spark context, which means we need separate 
>>> processes? Is there any way to use the same Spark context as STS?
>>> 
>>> Regards,
>>> Chanh
>>> 
>>> 
 On Jul 11, 2016, at 10:05 AM, Takeshi Yamamuro > wrote:
 
 Hi,
 
 ISTM multiple sparkcontexts are not recommended in spark.
 See: https://issues.apache.org/jira/browse/SPARK-2243 
 
 
 // maropu
 
 
 On Mon, Jul 11, 2016 at 12:01 PM, ayan guha > wrote:
 Hi
 
 Can you try using JDBC interpreter with STS? We are using Zeppelin+STS on 
 YARN for few months now without much issue. 
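
The sharing pattern suggested above (save to Hive from either context) could be
sketched as follows — the table name is hypothetical, and it assumes both the
Zeppelin Spark interpreter and STS point at the same Hive metastore:

```scala
// In the Zeppelin Spark interpreter (Spark 1.6 API): persist the result as a
// Hive table that any other client of the same metastore can read.
val df = sqlContext.read.parquet("alluxio://master1:19998/etl_info/WEBSITE")
df.write.mode("overwrite").saveAsTable("shared_website")

// From an STS/beeline session against the same metastore, the data is then
// visible without sharing a Spark context:
//   SELECT COUNT(*) FROM shared_website;
```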

Spark, Kryo Serialization Issue with ProtoBuf field

2016-07-13 Thread Nkechi Achara
Hi,

I am seeing an error when running my spark job relating to Serialization of
a protobuf field when transforming an RDD.

com.esotericsoftware.kryo.KryoException:
java.lang.UnsupportedOperationException Serialization trace: otherAuthors_
(com.thomsonreuters.kraken.medusa.dbor.proto.Book$DBBooks)

The error seems to be created at this point:

val booksPerTier: Iterable[(TimeTier, RDD[DBBooks])] = allTiers.map { tier =>
  (tier, books.filter(b => isInTier(endOfInterval, tier, b) && !isBookPublished(b))
    .mapPartitions(it => it.map(ord =>
      (ord.getAuthor, ord.getPublisherName, getGenre(ord.getSourceCountry)))))
}


val averagesPerAuthor = booksPerTier.flatMap { case (tier, opt) =>
  opt.map(o => (tier, o._1, PublisherCompanyComparison, o._3)).countByValue()
}

val averagesPerPublisher = booksPerTier.flatMap { case (tier, opt) =>
  opt.map(o => (tier, o._1, PublisherComparison(o._2), o._3)).countByValue()
}

The field is a list specified in the protobuf as the below:

otherAuthors_ = java.util.Collections.emptyList()

As you can see, the code does not actually use that field from the Book
protobuf, although it is still being transmitted over the network.
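
For reference, one commonly suggested workaround for this class of error
(unverified against this exact job; it assumes the com.twitter:chill-protobuf
dependency is on the classpath) is to register the generated message class
with Kryo via a custom registrator, so Kryo delegates to the protobuf codec
instead of trying to serialize the immutable field lists itself:

```scala
import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.spark.serializer.KryoRegistrator

// Hedged sketch: register the generated protobuf class from the stack trace
// so Kryo serializes it via its own wire format.
class BookKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(
      classOf[com.thomsonreuters.kraken.medusa.dbor.proto.Book.DBBooks],
      new ProtobufSerializer())
  }
}

// Then point Spark at the registrator, e.g.:
//   sparkConf.set("spark.kryo.registrator", "BookKryoRegistrator")
```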

Has anyone got any advice on this?

Thanks,

K


Re: Handling categorical variables in StreamingLogisticRegressionwithSGD

2016-07-13 Thread kundan kumar
Hi Sean ,

Thanks for the reply !!

Is there anything already available in Spark that can fix the number of
levels of a categorical variable? The OneHotEncoder changes the length of
the vector it creates depending on the number of distinct values coming in
the stream.

Is there any parameter available with the StringIndexer so that I can fix
the levels of a categorical variable, or will I need to write an
implementation of my own?

Thanks,
Kundan

On Tue, Jul 12, 2016 at 5:43 PM, Sean Owen  wrote:

> Yeah, for this to work, you need to know the number of distinct values
> a categorical feature will take on, ever. Sometimes that's known,
> sometimes it's not.
>
> One option is to use an algorithm that can use categorical features
> directly, like decision trees.
>
> You could consider hashing your features if so. So, you'd have maybe
> 10 indicator columns and you hash the feature into one of those 10
> columns to figure out which one it corresponds to. Of course, when you
> have an 11th value it collides with one of them and they get
> conflated, but, at least you can sort of proceed.
>
> This is more usually done with a large number of feature values, but
> maybe that's what you have. It's more problematic the smaller your
> hash space is.
>
> On Tue, Jul 12, 2016 at 10:21 AM, kundan kumar 
> wrote:
> > Hi ,
> >
> > I am trying to use StreamingLogisticRegressionwithSGD to build a CTR
> > prediction model.
> >
> > The document :
> >
> >
> http://spark.apache.org/docs/latest/mllib-linear-methods.html#streaming-linear-regression
> >
> > mentions that the numFeatures should be constant.
> >
> > The problem that I am facing is :
> > Since most of my variables are categorical, the numFeatures variable
> should
> > be the final set of variables after encoding and parsing the categorical
> > variables in labeled point format.
> >
> > Suppose, for a categorical variable x1 I have 10 distinct values in
> current
> > window.
> >
> > But in the next window some new values/items gets added to x1 and the
> number
> > of distinct values increases. How should I handle the numFeatures
> variable
> > in this case, because it will change now ?
> >
> > Basically, my question is how should I handle the new values of the
> > categorical variables in streaming model.
> >
> > Thanks,
> > Kundan
> >
> >
>
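
The hashing approach Sean describes can be sketched in plain Scala — the
bucket count and the example values are illustrative assumptions, not part of
the original discussion:

```scala
// Hedged sketch of the hashing trick: map any categorical value into a fixed
// number of indicator slots, so the feature vector length never changes even
// when new distinct values appear in a later window. Distinct values may
// collide in a bucket, but the model can still proceed.
object FeatureHashing {
  val numBuckets = 10 // fixed vector length, chosen up front

  // Deterministically assign a value to one of numBuckets slots.
  def hashIndex(value: String): Int =
    math.abs(value.hashCode % numBuckets)

  // Build a fixed-length indicator (one-hot-by-hash) vector.
  def indicatorVector(value: String): Array[Double] = {
    val v = Array.fill(numBuckets)(0.0)
    v(hashIndex(value)) = 1.0
    v
  }

  def main(args: Array[String]): Unit = {
    // An 11th distinct value may collide with an earlier one, but the
    // vector length stays fixed at numBuckets.
    println(indicatorVector("mobile").mkString(","))
  }
}
```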


Problem saving Hive table with Overwrite mode

2016-07-13 Thread nimrodo
Hi,

I'm trying to write a partitioned Parquet table and save it as a Hive table
at a specific path.
The code I'm using is in Java (column and table names are a bit different
in my real code), and it is executed by Airflow, which calls spark-submit:

aggregatedData.write().format("parquet").mode(SaveMode.Overwrite).partitionBy("schema_partition",
"colC").option("path","hdfs://sandbox.hortonworks.com:8020/BatchZone/table.parquet").saveAsTable("table_info");

However I'm getting the following exception:
[2016-07-13 10:18:53,490] {bash_operator.py:77} INFO - Exception in thread
"main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
execute, tree:
[2016-07-13 10:18:53,490] {bash_operator.py:77} INFO -
TungstenAggregate(key=[colA#43,colB#44,colC#46],
functions=[(min(colD#37L),mode=Final,isDistinct=false),(max(colE#42L),mode=Final,isDistinct=false),(max(colF#41L),mode=Final,isDistinct=false),(max(colG#38L),mode=Final,isDistinct=false),(max(colH#39L),mode=Final,isDistinct=false),(max(colI#40L),mode=Final,isDistinct=false)],
output=[colA#43,colB#44,colD#51L,colE#52L,colC#46,colF#53L,colG#54L,colH#55L,colI#56L])
[2016-07-13 10:18:53,490] {bash_operator.py:77} INFO - +- TungstenExchange
hashpartitioning(colA#43,colB#44,colC#46,200), None
[2016-07-13 10:18:53,491] {bash_operator.py:77} INFO - +-
TungstenAggregate(key=[colA#43,colB#44,colC#46],
functions=[(min(colD#37L),mode=Partial,isDistinct=false),(max(colE#42L),mode=Partial,isDistinct=false),(max(colF#41L),mode=Partial,isDistinct=false),(max(colG#38L),mode=Partial,isDistinct=false),(max(colH#39L),mode=Partial,isDistinct=false),(max(colI#40L),mode=Partial,isDistinct=false)],
output=[colA#43,colB#44,colC#46,min#73L,max#74L,max#75L,max#76L,max#77L,max#78L])
[2016-07-13 10:18:53,491] {bash_operator.py:77} INFO - +- Scan
ParquetRelation[colE#42L,colA#43,colG#38L,colD#37L,colH#39L,colF#41L,colI#40L,colC#46,colB#44]
InputPaths: hdfs://sandbox.hortonworks.com:8020/BatchZone/table.parquet
[2016-07-13 10:18:53,491] {bash_operator.py:77} INFO -
[2016-07-13 10:18:53,491] {bash_operator.py:77} INFO - at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
[2016-07-13 10:18:53,492] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.aggregate.TungstenAggregate.doExecute(TungstenAggregate.scala:80)
[2016-07-13 10:18:53,492] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
[2016-07-13 10:18:53,492] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
[2016-07-13 10:18:53,492] {bash_operator.py:77} INFO - at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
[2016-07-13 10:18:53,493] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
[2016-07-13 10:18:53,493] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
[2016-07-13 10:18:53,493] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
[2016-07-13 10:18:53,493] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
[2016-07-13 10:18:53,493] {bash_operator.py:77} INFO - at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
[2016-07-13 10:18:53,493] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
[2016-07-13 10:18:53,494] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
[2016-07-13 10:18:53,494] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
[2016-07-13 10:18:53,494] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:109)
[2016-07-13 10:18:53,494] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
[2016-07-13 10:18:53,494] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
[2016-07-13 10:18:53,494] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
[2016-07-13 10:18:53,495] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
[2016-07-13 10:18:53,495] {bash_operator.py:77} INFO - at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
[2016-07-13 10:18:53,495] {bash_operator.py:77} INFO - at

Spark Thrift Server performance

2016-07-13 Thread Mich Talebzadeh
Hi,

I need some feedback on the performance of the Spark Thrift Server (STS)

As far as I can ascertain, one can start STS passing the usual Spark parameters:

${SPARK_HOME}/sbin/start-thriftserver.sh \
--master spark://50.140.197.217:7077 \
--hiveconf hive.server2.thrift.port=10055 \
--packages  \
--driver-memory 2G \
--num-executors 2 \
--executor-memory 2G \
--conf "spark.scheduler.mode=FAIR" \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps" \
--jars  \
--conf "spark.ui.port=12345"


  And accessing it via beeline JDBC client

beeline -u jdbc:hive2://rhes564:10055 -n hduser -p

Now, the questions I have:


   1. What is the limit on the number of users accessing the thrift server?
   2. Clearly the thrift server can be started with its own resource
   configuration. Does STS simply act as a gateway to Spark (meaning Spark
   apps can use their own resources), or is one limited to the resources
   that STS offers?
   3. Can one start multiple thrift servers?
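
On question 3, each start-thriftserver.sh invocation submits its own Spark
application, so a second instance would at least need its own thrift port and
Spark UI port — a hypothetical sketch (untested here; the port numbers 10056
and 12346 are assumptions):

```shell
${SPARK_HOME}/sbin/start-thriftserver.sh \
--master spark://50.140.197.217:7077 \
--hiveconf hive.server2.thrift.port=10056 \
--conf "spark.ui.port=12346"
```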

As far as I can see STS is equivalent to Spark SQL accessing Hive DW.
Indeed this is what it says:

Connecting to jdbc:hive2://rhes564:10055
Connected to: Spark SQL (version 1.6.1)
Driver: Spark Project Core (version 1.6.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.6.1 by Apache Hive
0: jdbc:hive2://rhes564:10055>

Thanks



Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Inode for STS

2016-07-13 Thread Christophe Préaud
Hi Ayan,

I have opened a JIRA about this issues, but there are no answer so far: 
SPARK-15401

Regards,
Christophe.

On 13/07/16 05:54, ayan guha wrote:
Hi

We are running Spark Thrift Server as a long-running application. However, it 
looks like it is filling up the /tmp/hive folder with lots of small files and 
directories with no files in them, blowing out the inode limit and preventing 
any connection with a "No space left on device" error.

What is the best way to clean up those small files periodically?

--
Best Regards,
Ayan Guha
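
For interim relief while the JIRA is open, a hedged cleanup sketch (the
default scratch directory /tmp/hive and the one-day age threshold are
assumptions; take care that no active session still owns the entries):

```shell
# Periodically (e.g. from cron) delete empty session directories older than
# a day, depth-first, without touching the /tmp/hive root itself.
find /tmp/hive -mindepth 1 -depth -type d -empty -mtime +1 -delete
```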



Kelkoo SAS
Société par Actions Simplifiée
Share capital: €4,168,964.30
Registered office: 158 Ter Rue du Temple, 75003 Paris
425 093 069 RCS Paris

This message and its attachments are confidential and intended solely for 
their addressees. If you are not the intended recipient of this message, 
please delete it and notify the sender.


Any Idea about this error : IllegalArgumentException: File segment length cannot be negative ?

2016-07-13 Thread Dibyendu Bhattacharya
In a Spark Streaming job, I see a batch failed with the following error. I
haven't seen anything like this before.

This happened during the shuffle for one batch (it hasn't recurred since).
Just curious what can cause this error. I am running Spark 1.5.1.

Regards,
Dibyendu


Job aborted due to stage failure: Task 2801 in stage 9421.0 failed 4
times, most recent failure: Lost task 2801.3 in stage 9421.0:
java.lang.IllegalArgumentException: requirement failed: File segment
length cannot be negative (got -68321)
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28)
at 
org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:216)
at 
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:684)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)