RE: Fwd: DF creation

2016-03-18 Thread Diwakar Dhanuskodi
Import sqlContext.implicits._ before using toDF().


Sent from Samsung Mobile.

 Original message From: satyajit vegesna 
 Date:19/03/2016  06:00  (GMT+05:30) 
To: user@spark.apache.org, d...@spark.apache.org Cc:  
Subject: Fwd: DF creation 

Hi ,

I am trying to create a separate val reference to an object DATA (as shown below):

case class data(name:String,age:String)

Creation of this object is done separately and the reference to the object is 
stored into val data.

I use val samplerdd = sc.parallelize(Seq(data)) to create the RDD:
org.apache.spark.rdd.RDD[data] = ParallelCollectionRDD[10] at parallelize at 
<console>:24

Is there a way to create a DataFrame out of this without using createDataFrame, 
by using toDF()? I was unable to convert it that way (I would like to avoid 
providing the StructType).

Regards,
Satyajit.
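A minimal sketch of the suggestion in the reply above, assuming sc and sqlContext
already exist (as in spark-shell); the sample values are made up:

import sqlContext.implicits._   // must be in scope before calling toDF()

case class data(name: String, age: String)

val samplerdd = sc.parallelize(Seq(data("alice", "30"), data("bob", "25")))
val df = samplerdd.toDF()       // schema inferred from the case class, no StructType needed
df.printSchema()
df.show()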





RE: sparkR issues ?

2016-03-18 Thread Sun, Rui
Sorry, I was wrong. The issue is not related to as.data.frame(). It seems to be 
related to a DataFrame naming conflict between S4Vectors and SparkR.
Refer to https://issues.apache.org/jira/browse/SPARK-12148


From: Sun, Rui [mailto:rui@intel.com]
Sent: Wednesday, March 16, 2016 9:33 AM
To: Alex Kozlov ; roni 
Cc: user@spark.apache.org
Subject: RE: sparkR issues ?

I have submitted https://issues.apache.org/jira/browse/SPARK-13905 and a PR for 
it.

From: Alex Kozlov [mailto:ale...@gmail.com]
Sent: Wednesday, March 16, 2016 12:52 AM
To: roni <roni.epi...@gmail.com>
Cc: Sun, Rui <rui@intel.com>; 
user@spark.apache.org
Subject: Re: sparkR issues ?

Hi Roni, you can probably rename the as.data.frame in 
$SPARK_HOME/R/pkg/R/DataFrame.R and re-install SparkR by running install-dev.sh

On Tue, Mar 15, 2016 at 8:46 AM, roni <roni.epi...@gmail.com> wrote:
Hi ,
 Is there a work around for this?
 Do i need to file a bug for this?
Thanks
-R

On Tue, Mar 15, 2016 at 12:28 AM, Sun, Rui <rui@intel.com> wrote:
It seems as.data.frame() defined in SparkR masks the version in the R base 
package.
We can try to see if we can change the implementation of as.data.frame() in 
SparkR to avoid such masking.

From: Alex Kozlov [mailto:ale...@gmail.com]
Sent: Tuesday, March 15, 2016 2:59 PM
To: roni <roni.epi...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: sparkR issues ?

This seems to be a very unfortunate name collision.  SparkR defines its own 
DataFrame class, which shadows what seems to be your own definition.

Is DataFrame something you define?  Can you rename it?

On Mon, Mar 14, 2016 at 10:44 PM, roni <roni.epi...@gmail.com> wrote:
Hi,
 I am working in bioinformatics and trying to convert some scripts to SparkR 
to fit into other Spark jobs.

I tried a simple example from a bioinformatics library, and as soon as I start the SparkR 
environment it no longer works.

The code is as follows:
countData <- matrix(1:100,ncol=4)
condition <- factor(c("A","A","B","B"))
dds <- DESeqDataSetFromMatrix(countData, DataFrame(condition), ~ condition)

It works if I don't initialize the SparkR environment.
If I do library(SparkR) and sqlContext <- sparkRSQL.init(sc), it gives the 
following error:

> dds <- DESeqDataSetFromMatrix(countData, as.data.frame(condition), ~ 
> condition)
Error in DataFrame(colData, row.names = rownames(colData)) :
  cannot coerce class "data.frame" to a DataFrame

I am really stumped. I am not using any Spark function, so I would expect it 
to work as plain R code.
Why does it not work?

Appreciate  the help
-R




--
Alex Kozlov
(408) 507-4987
(650) 887-2135 efax
ale...@gmail.com




--
Alex Kozlov
(408) 507-4987
(650) 887-2135 efax
ale...@gmail.com


Re: Limit pyspark.daemon threads

2016-03-18 Thread Carlile, Ken



Thanks! I found that part just after I sent the email… whoops. I’m guessing that’s not an issue for my users, since it’s been set that way for a couple of years now. 


The thread count is definitely an issue, though, since if enough nodes go down, they can’t schedule their spark clusters. 


—Ken



On Mar 17, 2016, at 10:50 AM, Ted Yu  wrote:



I took a look at docs/configuration.md.
Though I didn't find an answer to your first question, I think the following pertains to your second question:




  spark.python.worker.memory (default: 512m)
    Amount of memory to use per python worker process during aggregation, in the same
    format as JVM memory strings (e.g. 512m, 2g). If the memory
    used during aggregation goes above this amount, it will spill the data into disks.




On Thu, Mar 17, 2016 at 7:43 AM, Carlile, Ken 
 wrote:

Hello,

We have an HPC cluster that we run Spark jobs on using standalone mode and a number of scripts I’ve built up to dynamically schedule and start spark clusters within the Grid Engine framework. Nodes in the cluster have 16 cores and 128GB of RAM.

My users use pyspark heavily. We’ve been having a number of problems with nodes going offline with extraordinarily high load. I was able to look at one of those nodes today before it went truly sideways, and I discovered that the user was running 50 pyspark.daemon
 threads (remember, this is a 16 core box), and the load was somewhere around 25 or so, with all CPUs maxed out at 100%.

So while the spark worker is aware it’s only got 16 cores and behaves accordingly, pyspark seems to be happy to overrun everything like crazy. Is there a global parameter I can use to limit pyspark threads to a sane number, say 15 or 16? It would also be interesting
 to set a memory limit, which leads to another question.

How is memory managed when pyspark is used? I have the spark worker memory set to 90GB, and there is 8GB of system overhead (GPFS caching), so if pyspark operates outside of the JVM memory pool, that leaves it at most 30GB to play with, assuming there is no
 overhead outside the JVM’s 90GB heap (ha ha.)

Thanks,
Ken Carlile
Sr. Unix Engineer
HHMI/Janelia Research Campus
571-209-4363



Handling Missing Values in MLLIB Decision Tree

2016-03-18 Thread Abir Chakraborty
Hello,

Can the MLlib Decision Tree (DT) handle missing values by having surrogate splits 
(as is currently done in the "rpart" library in R)?

Thanks,
Abir

Principal Data Scientist, Data Science Group, Innovation Labs
[24]7 Inc. - The Intuitive Consumer Experience Company(tm) | We make life 
simple for consumers to connect with companies to get things done
Mobile: +91-9880755850 | e-mail: abi...@247-inc.com
Prestige Tech Platina, Kadubeesanahalli, Marathahalli Outer Ring Road | 
Bangalore 560087 | India | www.247-inc.com
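To my knowledge, MLlib's DecisionTree (as of 1.x) does not support surrogate splits,
so a common workaround is to impute missing values before training. A rough sketch
under that assumption, with rows of (label, features) where missing entries are
encoded as Double.NaN; the tree parameters are only illustrative:

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree

def trainWithMeanImputation(rows: RDD[(Double, Array[Double])]) = {
  val n = rows.first()._2.length

  // Per-feature sums and counts over the non-missing values.
  val (sums, counts) = rows.map(_._2).aggregate((Array.fill(n)(0.0), Array.fill(n)(0L)))(
    (acc, v) => (acc._1.zip(v).map { case (s, x) => if (x.isNaN) s else s + x },
                 acc._2.zip(v).map { case (c, x) => if (x.isNaN) c else c + 1 }),
    (a, b) => (a._1.zip(b._1).map(t => t._1 + t._2), a._2.zip(b._2).map(t => t._1 + t._2)))
  val means = sums.zip(counts).map { case (s, c) => if (c > 0) s / c else 0.0 }

  // Replace NaNs with the column mean and train a plain MLlib decision tree.
  val points = rows.map { case (label, fs) =>
    LabeledPoint(label, Vectors.dense(fs.zipWithIndex.map { case (x, i) => if (x.isNaN) means(i) else x }))
  }
  DecisionTree.trainClassifier(points, numClasses = 2, categoricalFeaturesInfo = Map[Int, Int](),
    impurity = "gini", maxDepth = 5, maxBins = 32)
}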



RE: best way to do deep learning on spark ?

2016-03-18 Thread Ulanov, Alexander
Hi Charles,

There is an implementation of multilayer perceptron in Spark (since 1.5):
https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier
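A minimal usage sketch of that classifier, assuming DataFrames train and test with
the usual "features" (Vector) and "label" (Double) columns; the layer sizes are
illustrative only:

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier

val layers = Array(4, 8, 8, 3)            // input size, two hidden layers, number of classes
val trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100)

val model = trainer.fit(train)            // train: DataFrame with "features"/"label"
val predictions = model.transform(test)   // test: DataFrame with the same schema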

Other features such as autoencoder, convolutional layers, etc. are currently 
under development. Please refer to 
https://issues.apache.org/jira/browse/SPARK-5575

Best regards, Alexander

From: charles li [mailto:charles.up...@gmail.com]
Sent: Wednesday, March 16, 2016 7:01 PM
To: user 
Subject: best way to do deep learning on spark ?


Hi, guys, I'm new to MLlib on Spark. After reading the documentation, it seems that 
MLlib does not support deep learning. I want to know: is there any way to 
implement deep learning on Spark?

Must I use a third-party package like Caffe or TensorFlow?

or

Is a deep learning module listed in the MLlib development plan?


great thanks

--
--
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: The error to read HDFS custom file in spark.

2016-03-18 Thread Mich Talebzadeh
Hi Tony,

Is

com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord

one of your own packages?

It sounds like it is the one throwing the error.

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 17 March 2016 at 15:21, Tony Liu  wrote:

> Hi,
>My HDFS file is store with custom data structures. I want to read it
> with SparkContext object.So I define a formatting object:
>
> *1. code of RawDataInputFormat.scala*
>
> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.mapred._
>
> /**
>   * Created by Tony on 3/16/16.
>   */
> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends 
> FileInputFormat {
>
>   override def getRecordReader(split: InputSplit, job: JobConf, reporter: 
> Reporter): RecordReader[LW, RD] = {
> new RawReader(split, job, reporter)
>   }
>
> }
>
> *2. code of RawReader.scala*
>
> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
> import org.apache.hadoop.io.{LongWritable, SequenceFile}
> import org.apache.hadoop.mapred._
>
> /**
>   * Created by Tony on 3/17/16.
>   */
> class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends 
> RecordReader[LW, RD] {
>
>   var reader: SequenceFile.Reader = null
>   var currentPos: Long = 0L
>   var length: Long = 0L
>
>   def this(split: InputSplit, job: JobConf, reporter: Reporter) {
> this()
> val p = (split.asInstanceOf[FileSplit]).getPath
> reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
>   }
>
>   override def next(key: LW, value: RD): Boolean = {
> val flag = reader.next(key, value)
> currentPos = reader.getPosition()
> flag
>   }
>
>   override def getProgress: Float = Math.min(1.0f, currentPos / 
> length.toFloat)
>
>   override def getPos: Long = currentPos
>
>   override def createKey(): LongWritable = {
> new LongWritable()
>   }
>
>   override def close(): Unit = {
> reader.close()
>   }
>
>   override def createValue(): RDRawDataRecord = {
> new RDRawDataRecord()
>   }
> }
>
> *3. code of RDRawDataRecord.scala*
>
> import com.kiisoo.aegis.common.rawdata.RawDataRecord;
> import java.io.DataInput;
> import java.io.DataOutput;
> import java.io.IOException;
> import org.apache.commons.lang.StringUtils;
> import org.apache.hadoop.io.Writable;
>
> public class RDRawDataRecord implements Writable {
> private String smac;
> private String dmac;
> private int hrssi;
> private int lrssi;
> private long fstamp;
> private long lstamp;
> private long maxstamp;
> private long minstamp;
> private long stamp;
>
> public void readFields(DataInput in) throws IOException {
> this.smac = in.readUTF();
> this.dmac = in.readUTF();
> this.hrssi = in.readInt();
> this.lrssi = in.readInt();
> this.fstamp = in.readLong();
> this.lstamp = in.readLong();
> this.maxstamp = in.readLong();
> this.minstamp = in.readLong();
> this.stamp = in.readLong();
> }
>
> public void write(DataOutput out) throws IOException {
> out.writeUTF(StringUtils.isNotBlank(this.smac)?this.smac:"");
> out.writeUTF(StringUtils.isNotBlank(this.dmac)?this.dmac:"");
> out.writeInt(this.hrssi);
> out.writeInt(this.lrssi);
> out.writeLong(this.fstamp);
> out.writeLong(this.lstamp);
> out.writeLong(this.maxstamp);
> out.writeLong(this.minstamp);
> out.writeLong(this.stamp);
> }
>
> */** *
>
> *ignore getter setter*
>
> ***/*
>
> }
>
> *At last, I use this code to run*:
>
> val filePath = 
> "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
> val conf = new SparkConf()
> conf.setMaster("local")
> conf.setAppName("demo")
> val sc = new SparkContext(conf)
> val file = sc.hadoopFile[LongWritable, RDRawDataRecord, 
> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
> file.foreach(v => {
>   println(v._2.getDmac) // Attribute of custom objects
> })
>
> *I get an error, it says:*
>
> Error:(41, 19) type arguments 
> [org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
>  conform to the bounds of none of the overloaded alternatives of
>  value hadoopFile: [K, V, F <: 
> org.apache.hadoop.mapred.InputFormat[K,V]](path: String)(implicit km: 
> scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V], implicit 
> fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]  [K, V, F 
> <: org.apache.hadoop.mapred.InputFormat[K,V]](path: String, minPartitions: 
> Int)(implicit km: scala.reflect.ClassTag[K], implicit vm: 
> scala.reflect.ClassTag[V], implicit fm: 
>

Re: The error to read HDFS custom file in spark.

2016-03-18 Thread Tony Liu
I also tried that before, but in the RawReader.next(key, value) method, invoking
the reader.next method gives an error. It says: Type Mismatch.
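For what it's worth, one way to satisfy hadoopFile's bound F <: InputFormat[K, V] is to
drop the type parameters altogether and extend FileInputFormat and RecordReader with the
concrete key and value types. Keeping LW/RD as unconstrained type parameters would also
explain the reader.next type mismatch, since the compiler no longer knows they are
Writable. A rough sketch along those lines (details are assumptions, not the original
poster's code):

import org.apache.hadoop.io.{LongWritable, SequenceFile}
import org.apache.hadoop.mapred._

class RawDataInputFormat extends FileInputFormat[LongWritable, RDRawDataRecord] {
  override def getRecordReader(split: InputSplit, job: JobConf,
                               reporter: Reporter): RecordReader[LongWritable, RDRawDataRecord] =
    new RawReader(split.asInstanceOf[FileSplit], job)
}

class RawReader(split: FileSplit, job: JobConf)
    extends RecordReader[LongWritable, RDRawDataRecord] {
  private val reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(split.getPath))

  override def next(key: LongWritable, value: RDRawDataRecord): Boolean = reader.next(key, value)
  override def createKey(): LongWritable = new LongWritable()
  override def createValue(): RDRawDataRecord = new RDRawDataRecord()
  override def getPos: Long = reader.getPosition
  override def getProgress: Float = 0.0f   // progress is not tracked in this sketch
  override def close(): Unit = reader.close()
}

// With the non-generic format, the original call site should type-check:
// val rdd = sc.hadoopFile[LongWritable, RDRawDataRecord, RawDataInputFormat](filePath)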

On Fri, Mar 18, 2016 at 12:53 AM, Benyi Wang  wrote:

> I would say change
>
> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends 
> FileInputFormat
>
> to
>
> class RawDataInputFormat[LongWritable, RDRawDataRecord] extends 
> FileInputFormat
>
> ​
>
> On Thu, Mar 17, 2016 at 9:48 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi Tony,
>>
>> Is
>>
>> com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>
>> One of your own packages?
>>
>> Sounds like it is one throwing the error
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 17 March 2016 at 15:21, Tony Liu  wrote:
>>
>>> Hi,
>>>My HDFS file is store with custom data structures. I want to read it
>>> with SparkContext object.So I define a formatting object:
>>>
>>> *1. code of RawDataInputFormat.scala*
>>>
>>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>> import org.apache.hadoop.io.LongWritable
>>> import org.apache.hadoop.mapred._
>>>
>>> /**
>>>   * Created by Tony on 3/16/16.
>>>   */
>>> class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends 
>>> FileInputFormat {
>>>
>>>   override def getRecordReader(split: InputSplit, job: JobConf, reporter: 
>>> Reporter): RecordReader[LW, RD] = {
>>> new RawReader(split, job, reporter)
>>>   }
>>>
>>> }
>>>
>>> *2. code of RawReader.scala*
>>>
>>> import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
>>> import org.apache.hadoop.io.{LongWritable, SequenceFile}
>>> import org.apache.hadoop.mapred._
>>>
>>> /**
>>>   * Created by Tony on 3/17/16.
>>>   */
>>> class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends 
>>> RecordReader[LW, RD] {
>>>
>>>   var reader: SequenceFile.Reader = null
>>>   var currentPos: Long = 0L
>>>   var length: Long = 0L
>>>
>>>   def this(split: InputSplit, job: JobConf, reporter: Reporter) {
>>> this()
>>> val p = (split.asInstanceOf[FileSplit]).getPath
>>> reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
>>>   }
>>>
>>>   override def next(key: LW, value: RD): Boolean = {
>>> val flag = reader.next(key, value)
>>> currentPos = reader.getPosition()
>>> flag
>>>   }
>>>
>>>   override def getProgress: Float = Math.min(1.0f, currentPos / 
>>> length.toFloat)
>>>
>>>   override def getPos: Long = currentPos
>>>
>>>   override def createKey(): LongWritable = {
>>> new LongWritable()
>>>   }
>>>
>>>   override def close(): Unit = {
>>> reader.close()
>>>   }
>>>
>>>   override def createValue(): RDRawDataRecord = {
>>> new RDRawDataRecord()
>>>   }
>>> }
>>>
>>> *3. code of RDRawDataRecord.scala*
>>>
>>> import com.kiisoo.aegis.common.rawdata.RawDataRecord;
>>> import java.io.DataInput;
>>> import java.io.DataOutput;
>>> import java.io.IOException;
>>> import org.apache.commons.lang.StringUtils;
>>> import org.apache.hadoop.io.Writable;
>>>
>>> public class RDRawDataRecord implements Writable {
>>> private String smac;
>>> private String dmac;
>>> private int hrssi;
>>> private int lrssi;
>>> private long fstamp;
>>> private long lstamp;
>>> private long maxstamp;
>>> private long minstamp;
>>> private long stamp;
>>>
>>> public void readFields(DataInput in) throws IOException {
>>> this.smac = in.readUTF();
>>> this.dmac = in.readUTF();
>>> this.hrssi = in.readInt();
>>> this.lrssi = in.readInt();
>>> this.fstamp = in.readLong();
>>> this.lstamp = in.readLong();
>>> this.maxstamp = in.readLong();
>>> this.minstamp = in.readLong();
>>> this.stamp = in.readLong();
>>> }
>>>
>>> public void write(DataOutput out) throws IOException {
>>> out.writeUTF(StringUtils.isNotBlank(this.smac)?this.smac:"");
>>> out.writeUTF(StringUtils.isNotBlank(this.dmac)?this.dmac:"");
>>> out.writeInt(this.hrssi);
>>> out.writeInt(this.lrssi);
>>> out.writeLong(this.fstamp);
>>> out.writeLong(this.lstamp);
>>> out.writeLong(this.maxstamp);
>>> out.writeLong(this.minstamp);
>>> out.writeLong(this.stamp);
>>> }
>>>
>>> */** *
>>>
>>> *ignore getter setter*
>>>
>>> ***/*
>>>
>>> }
>>>
>>> *At last, I use this code to run*:
>>>
>>> val filePath = 
>>> "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
>>> val conf = new SparkConf()
>>> conf.setMaster("local")
>>> conf.setAppName("demo")
>>> val sc = new SparkContext(conf)
>>> val file = sc.hadoopFile[LongWritable, RDRawDataRecord, 
>>> RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
>>> file.foreach(v => {
>>>   println(v.

Saving intermediate results in mapPartitions

2016-03-18 Thread Krishna
Hi,

I have a situation where the elements output by each partition from
mapPartitions don't fit into RAM, even with the lowest number of rows in
the partition (there is a hard lower limit on this value). What's the best
way to address this problem? During the mapPartitions phase, is there a way
to convert intermediate results to a DataFrame and save them to a database? Rows saved
to the database don't need to be part of the output results from mapPartitions.
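One possible pattern, sketched below under the assumption that the intermediate rows
go to a JDBC-reachable table and that the JDBC driver is on the executor classpath;
inputRdd, side_table, and the process() helper are hypothetical. The idea is to open
a connection per partition, flush intermediate rows to the database in batches as the
iterator is consumed, and return only the small set of rows needed downstream:

import java.sql.DriverManager

// Hypothetical per-row computation: a row to persist plus an optional row to emit downstream.
def process(row: String): (String, Double, Option[String]) = (row, row.length.toDouble, None)

val jdbcUrl = "jdbc:postgresql://dbhost:5432/mydb"          // assumption: point at your own DB

val result = inputRdd.mapPartitions { iter =>
  val conn = DriverManager.getConnection(jdbcUrl, "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO side_table (col_a, col_b) VALUES (?, ?)")
  val kept = scala.collection.mutable.ArrayBuffer[String]()

  iter.grouped(1000).foreach { batch =>                     // bounded memory per batch
    batch.foreach { row =>
      val (a, b, keep) = process(row)
      stmt.setString(1, a); stmt.setDouble(2, b); stmt.addBatch()
      keep.foreach(kept += _)                               // only the small results stay in memory
    }
    stmt.executeBatch()                                     // intermediate rows go to the DB, not RAM
  }
  stmt.close(); conn.close()
  kept.iterator                                             // output of mapPartitions
}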


Re: [discuss] making SparkEnv private in Spark 2.0

2016-03-18 Thread Mridul Muralidharan
We have custom joins that leverage it.
It is used to get at the shuffled iterator directly, without needing
sort/aggregate/etc.

IIRC the only way to get to it from ShuffleHandle is via shuffle manager.


Regards,
Mridul

On Wed, Mar 16, 2016 at 3:36 PM, Reynold Xin  wrote:
>
> On Wed, Mar 16, 2016 at 3:29 PM, Mridul Muralidharan 
> wrote:
>>
>> b) Shuffle manager (to get shuffle reader)
>
>
> What's the use case for shuffle manager/reader? This seems like using super
> internal APIs in applications.
>
>




Limit pyspark.daemon threads

2016-03-18 Thread Carlile, Ken
Hello, 

We have an HPC cluster that we run Spark jobs on using standalone mode and a 
number of scripts I’ve built up to dynamically schedule and start spark 
clusters within the Grid Engine framework. Nodes in the cluster have 16 cores 
and 128GB of RAM. 

My users use pyspark heavily. We’ve been having a number of problems with nodes 
going offline with extraordinarily high load. I was able to look at one of 
those nodes today before it went truly sideways, and I discovered that the user 
was running 50 pyspark.daemon threads (remember, this is a 16 core box), and 
the load was somewhere around 25 or so, with all CPUs maxed out at 100%. 

So while the spark worker is aware it’s only got 16 cores and behaves 
accordingly, pyspark seems to be happy to overrun everything like crazy. Is 
there a global parameter I can use to limit pyspark threads to a sane number, 
say 15 or 16? It would also be interesting to set a memory limit, which leads 
to another question. 

How is memory managed when pyspark is used? I have the spark worker memory set 
to 90GB, and there is 8GB of system overhead (GPFS caching), so if pyspark 
operates outside of the JVM memory pool, that leaves it at most 30GB to play 
with, assuming there is no overhead outside the JVM’s 90GB heap (ha ha.)

Thanks, 
Ken Carlile
Sr. Unix Engineer
HHMI/Janelia Research Campus
571-209-4363



Fwd: DF creation

2016-03-18 Thread satyajit vegesna
Hi ,

I am trying to create a separate val reference to an object DATA (as shown
below):

case class data(name:String,age:String)

Creation of this object is done separately and the reference to the object
is stored into val data.

I use val samplerdd = sc.parallelize(Seq(data)) to create the RDD:
org.apache.spark.rdd.RDD[data] = ParallelCollectionRDD[10] at parallelize
at <console>:24

Is there a way to create a DataFrame out of this without using createDataFrame,
by using toDF()? I was unable to convert it that way (I would like to avoid
providing the StructType).

Regards,
Satyajit.


Request for comments: Tensorframes, an integration library between TensorFlow and Spark DataFrames

2016-03-18 Thread Tim Hunter
Hello all,

I would like to bring your attention to a small project to integrate
TensorFlow with Apache Spark, called TensorFrames. With this library, you
can map, reduce or aggregate numerical data stored in Spark dataframes
using TensorFlow computation graphs. It is published as a Spark package and
available in this github repository:

https://github.com/tjhunter/tensorframes

More detailed examples can be found in the user guide:

https://github.com/tjhunter/tensorframes/wiki/TensorFrames-user-guide

This is a technical preview at this point. I am looking forward to some
feedback about the current python API if some adventurous users want to try
it out. Of course, contributions are most welcome, for example to fix bugs
or to add support for platforms other than linux-x86_64. It should support
all the most common inputs in dataframes (dense tensors of rank 0, 1, 2 of
ints, longs, floats and doubles).

Please note that this is not an endorsement by Databricks of TensorFlow, or
any other deep learning framework for that matter. If users want to use
deep learning in production, some other more robust solutions are
available: SparkNet, CaffeOnSpark, DeepLearning4J.

Best regards


Tim Hunter


Re: SparkContext.stop() takes too long to complete

2016-03-18 Thread Nezih Yigitbasi
Hadoop 2.4.0. Here are the relevant logs from executor 1136:

16/03/18 21:26:58 INFO mapred.SparkHadoopMapRedUtil: attempt_201603182126_0276_m_000484_0: Committed
16/03/18 21:26:58 INFO executor.Executor: Finished task 484.0 in stage 276.0 (TID 59663). 1080 bytes result sent to driver
16/03/18 21:38:18 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
16/03/18 21:38:18 INFO storage.DiskBlockManager: Shutdown hook called
16/03/18 21:38:18 INFO util.ShutdownHookManager: Shutdown hook called

On Fri, Mar 18, 2016 at 4:21 PM Ted Yu  wrote:

Which version of hadoop do you use ?
>
> bq. Requesting to kill executor(s) 1136
>
> Can you find more information on executor 1136 ?
>
> Thanks
>
> On Fri, Mar 18, 2016 at 4:16 PM, Nezih Yigitbasi <
> nyigitb...@netflix.com.invalid> wrote:
>
>> Hi Spark experts,
>> I am using Spark 1.5.2 on YARN with dynamic allocation enabled. I see in
>> the driver/application master logs that the app is marked as SUCCEEDED and
>> then SparkContext stop is called. However, this stop sequence takes > 10
>> minutes to complete, and YARN resource manager kills the application master
>> as it didn’t receive a heartbeat within the last 10 minutes. The resource
>> manager then kills the application master. Any ideas about what may be
>> going on?
>>
>> Here are the relevant logs:
>>
>> *6/03/18 21:26:58 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, 
>> exitCode: 0
>> 16/03/18 21:26:58 INFO spark.SparkContext: Invoking stop() from shutdown 
>> hook*16/03/18 21:26:58 INFO handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/static/sql,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/SQL/execution/json,null}16/03/18 21:26:58 
>> INFO handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/SQL/execution,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/SQL/json,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/SQL,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/static/sql,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/SQL/execution/json,null}16/03/18 21:26:58 
>> INFO handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/SQL/execution,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/SQL/json,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/SQL,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/metrics/json,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/stages/stage/kill,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/api,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/static,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}16/03/18 
>> 21:26:58 INFO handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/executors/threadDump,null}16/03/18 21:26:58 
>> INFO handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/executors/json,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/executors,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/environment/json,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/environment,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/storage/rdd/json,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/storage/rdd,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/storage/json,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/storage,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/stages/pool/json,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/stages/pool,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/stages/stage/json,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/stages/stage,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/stages/json,null}16/03/18 21:26:58 INFO 
>> handler.ContextHandler: stopped 
>> o.s.j.s.ServletContextHandler{/stages,null}16/03/18 21:26:58 INFO 
>> handle

Re: SparkContext.stop() takes too long to complete

2016-03-18 Thread Ted Yu
Which version of hadoop do you use ?

bq. Requesting to kill executor(s) 1136

Can you find more information on executor 1136 ?

Thanks

On Fri, Mar 18, 2016 at 4:16 PM, Nezih Yigitbasi <
nyigitb...@netflix.com.invalid> wrote:

> Hi Spark experts,
> I am using Spark 1.5.2 on YARN with dynamic allocation enabled. I see in
> the driver/application master logs that the app is marked as SUCCEEDED and
> then SparkContext stop is called. However, this stop sequence takes > 10
> minutes to complete, and YARN resource manager kills the application master
> as it didn’t receive a heartbeat within the last 10 minutes. The resource
> manager then kills the application master. Any ideas about what may be
> going on?
>
> Here are the relevant logs:
>
> *6/03/18 21:26:58 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, 
> exitCode: 0
> 16/03/18 21:26:58 INFO spark.SparkContext: Invoking stop() from shutdown 
> hook*16/03/18 21:26:58 INFO handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/static/sql,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/SQL/execution/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/SQL/execution,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/SQL/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/SQL,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/static/sql,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/SQL/execution/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/SQL/execution,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/SQL/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/SQL,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/metrics/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/stages/stage/kill,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/api,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}16/03/18 
> 21:26:58 INFO handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/static,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}16/03/18 
> 21:26:58 INFO handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/executors/threadDump,null}16/03/18 21:26:58 
> INFO handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/executors/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/executors,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/environment/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/environment,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/storage/rdd/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/storage/rdd,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/storage/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/storage,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/stages/pool/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/stages/pool,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/stages/stage/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/stages/stage,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/stages/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/stages,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/jobs/job/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/jobs/job,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/jobs/json,null}16/03/18 21:26:58 INFO 
> handler.ContextHandler: stopped 
> o.s.j.s.ServletContextHandler{/jobs,null}16/03/18 21:26:58 INFO ui.SparkUI: 
> Stopped Spark web UI at http://10.143.240.240:5270616/03/18 21:27:58 INFO 
> cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 
> 113516/03/18 21:27:58 INFO yarn.YarnAllocator: Driver requested a total 
> 

Saving the DataFrame based RandomForestClassificationModels

2016-03-18 Thread James Hammerton
Hi,

If you train a
org.apache.spark.ml.classification.RandomForestClassificationModel, you
can't save it - attempts to do so yield the following error:

16/03/18 14:12:44 INFO SparkContext: Successfully stopped SparkContext
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Pipeline write will fail on this Pipeline because it contains a stage
> which does not implement Writable. Non-Writable stage: rfc_704981ba3f48
> of type class org.apache.spark.ml.classification.RandomForestClassifier
> at org.apache.spark.ml.
> Pipeline$SharedReadWrite$$anonfun$validateStages$1.apply(Pipeline.scala:
> 218)
> at org.apache.spark.ml.
> Pipeline$SharedReadWrite$$anonfun$validateStages$1.apply(Pipeline.scala:
> 215)


This appears to be a known bug:
https://issues.apache.org/jira/browse/SPARK-13784 related to
https://issues.apache.org/jira/browse/SPARK-11888

My question is whether there's a workaround, given that these bugs are
unresolved at least until 2.0.0.

Regards,

James
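One stopgap, not a fix for the JIRAs above: in 1.5/1.6 the fitted model object is,
in my experience, Java-serializable (it gets shipped to executors), so it can be
written out with plain Java serialization until ML persistence for random forests
lands. A rough sketch; the paths are illustrative, and the saved file is tied to
the exact Spark and Scala versions used to write it:

import java.io._
import org.apache.spark.ml.classification.RandomForestClassificationModel

def saveModel(model: RandomForestClassificationModel, path: String): Unit = {
  // Assumes the model and everything it references are Java-serializable in your build.
  val oos = new ObjectOutputStream(new FileOutputStream(path))
  try oos.writeObject(model) finally oos.close()
}

def loadModel(path: String): RandomForestClassificationModel = {
  val ois = new ObjectInputStream(new FileInputStream(path))
  try ois.readObject().asInstanceOf[RandomForestClassificationModel] finally ois.close()
}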


SparkContext.stop() takes too long to complete

2016-03-18 Thread Nezih Yigitbasi
Hi Spark experts,
I am using Spark 1.5.2 on YARN with dynamic allocation enabled. I see in
the driver/application master logs that the app is marked as SUCCEEDED and
then SparkContext stop is called. However, this stop sequence takes > 10
minutes to complete, and the YARN resource manager then kills the application
master because it didn't receive a heartbeat within the last 10 minutes. Any
ideas about what may be going on?

Here are the relevant logs:

*6/03/18 21:26:58 INFO yarn.ApplicationMaster: Final app status:
SUCCEEDED, exitCode: 0
16/03/18 21:26:58 INFO spark.SparkContext: Invoking stop() from
shutdown hook*16/03/18 21:26:58 INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/static/sql,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/SQL/execution/json,null}16/03/18
21:26:58 INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/SQL/execution,null}16/03/18 21:26:58
INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/SQL/json,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/SQL,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/static/sql,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/SQL/execution/json,null}16/03/18
21:26:58 INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/SQL/execution,null}16/03/18 21:26:58
INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/SQL/json,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/SQL,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/metrics/json,null}16/03/18 21:26:58
INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/stages/stage/kill,null}16/03/18
21:26:58 INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/api,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/static,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}16/03/18
21:26:58 INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/executors/threadDump,null}16/03/18
21:26:58 INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/executors/json,null}16/03/18 21:26:58
INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/executors,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/environment/json,null}16/03/18 21:26:58
INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/environment,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/storage/rdd/json,null}16/03/18 21:26:58
INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/storage/rdd,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/storage/json,null}16/03/18 21:26:58
INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/storage,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/stages/pool/json,null}16/03/18 21:26:58
INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/stages/pool,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/stages/stage/json,null}16/03/18
21:26:58 INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/stages/stage,null}16/03/18 21:26:58
INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/stages/json,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/stages,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/jobs/job/json,null}16/03/18 21:26:58
INFO handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/jobs/job,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/jobs/json,null}16/03/18 21:26:58 INFO
handler.ContextHandler: stopped
o.s.j.s.ServletContextHandler{/jobs,null}16/03/18 21:26:58 INFO
ui.SparkUI: Stopped Spark web UI at
http://10.143.240.240:5270616/03/18 21:27:58 INFO
cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s)
113516/03/18 21:27:58 INFO yarn.YarnAllocator: Driver requested a
total number of 208 executor(s).16/03/18 21:27:58 INFO
yarn.ApplicationMaster$AMEndpoint: Driver requested to kill
executor(s) 1135.16/03/18 21:27:58 INFO
spark.ExecutorAllocationManager: Removing executor 1135 because it has
been idle for 60 seconds (new desired total will be 208)16/03/18
21:27:58 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill
executor(s) 112316/03/18 21:27:58 INFO yarn.YarnAllocator: Driver
requested a total number of 207 executor(s).16/03/18 

Re: Enabling spark_shuffle service without restarting YARN Node Manager

2016-03-18 Thread Vinay Kashyap
Thanks for your reply guys.

@Alex: Hopefully in a future release we will get a way to do this.

@Saisai: The concern regarding the Node Manager restart is that, in a
shared YARN cluster running other applications apart from Spark,
enabling the Spark shuffle service should not disturb those other
running applications. That was the only concern.


Thanks and regards
Vinay Kashyap



On Wed, Mar 16, 2016 at 4:05 PM, Saisai Shao  wrote:

> If you want to avoid existing job failure while restarting NM, you could
> enable work preserving for NM, in this case, the restart of NM will not
> affect the running containers (containers can still run). That could
> alleviate NM restart problem.
>
> Thanks
> Saisai
>
> On Wed, Mar 16, 2016 at 6:30 PM, Alex Dzhagriev  wrote:
>
>> Hi Vinay,
>>
>> I believe it's not possible as the spark-shuffle code should run in the
>> same JVM process as the Node Manager. I haven't heard anything about on the
>> fly bytecode loading in the Node Manger.
>>
>> Thanks, Alex.
>>
>> On Wed, Mar 16, 2016 at 10:12 AM, Vinay Kashyap 
>> wrote:
>>
>>> Hi all,
>>>
>>> I am using *Spark 1.5.1* in *yarn-client* mode along with *CDH 5.5*
>>>
>>> As per the documentation to enable Dynamic Allocation of Executors in
>>> Spark,
>>> it is required to add the shuffle service jar to YARN Node Manager's
>>> classpath and restart the YARN Node Manager.
>>>
>>> Is there any way to dynamically supply the shuffle service jar
>>> information from the application itself and avoid disturbing the running
>>> YARN service.
>>>
>>> Tried couple of options by uploading the jar to hdfs and set
>>> *yarn.application.classpath* but did not work. On container launch for
>>> the executor it fails to recognize the shuffle service.
>>>
>>> Any help would be greatly appreciated.
>>>
>>> --
>>> *Thanks and regards*
>>> *Vinay Kashyap*
>>>
>>
>>
>


-- 
*Thanks and regards*
*Vinay Kashyap*
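For reference, the application-side half of dynamic allocation is just a pair of
SparkConf properties, sketched below with illustrative executor bounds; the
NodeManager-side spark_shuffle aux-service is the part that, as discussed above,
still has to be installed on the cluster itself:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("dynamic-allocation-example")
  .set("spark.shuffle.service.enabled", "true")        // requires the NM-side aux-service
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")    // illustrative bounds
  .set("spark.dynamicAllocation.maxExecutors", "50")

val sc = new SparkContext(conf)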


The error to read HDFS custom file in spark.

2016-03-18 Thread Tony Liu
Hi,
   My HDFS file is stored with custom data structures. I want to read it
with a SparkContext object, so I define a formatting object:

*1. code of RawDataInputFormat.scala*

import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapred._

/**
  * Created by Tony on 3/16/16.
  */
class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord]
extends FileInputFormat {

  override def getRecordReader(split: InputSplit, job: JobConf,
reporter: Reporter): RecordReader[LW, RD] = {
new RawReader(split, job, reporter)
  }

}

*2. code of RawReader.scala*

import com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord
import org.apache.hadoop.io.{LongWritable, SequenceFile}
import org.apache.hadoop.mapred._

/**
  * Created by Tony on 3/17/16.
  */
class RawReader[LW <: LongWritable, RD <: RDRawDataRecord] extends
RecordReader[LW, RD] {

  var reader: SequenceFile.Reader = null
  var currentPos: Long = 0L
  var length: Long = 0L

  def this(split: InputSplit, job: JobConf, reporter: Reporter) {
this()
val p = (split.asInstanceOf[FileSplit]).getPath
reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(p))
  }

  override def next(key: LW, value: RD): Boolean = {
val flag = reader.next(key, value)
currentPos = reader.getPosition()
flag
  }

  override def getProgress: Float = Math.min(1.0f, currentPos / length.toFloat)

  override def getPos: Long = currentPos

  override def createKey(): LongWritable = {
new LongWritable()
  }

  override def close(): Unit = {
reader.close()
  }

  override def createValue(): RDRawDataRecord = {
new RDRawDataRecord()
  }
}

*3. code of RDRawDataRecord.scala*

import com.kiisoo.aegis.common.rawdata.RawDataRecord;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Writable;

public class RDRawDataRecord implements Writable {
private String smac;
private String dmac;
private int hrssi;
private int lrssi;
private long fstamp;
private long lstamp;
private long maxstamp;
private long minstamp;
private long stamp;

public void readFields(DataInput in) throws IOException {
this.smac = in.readUTF();
this.dmac = in.readUTF();
this.hrssi = in.readInt();
this.lrssi = in.readInt();
this.fstamp = in.readLong();
this.lstamp = in.readLong();
this.maxstamp = in.readLong();
this.minstamp = in.readLong();
this.stamp = in.readLong();
}

public void write(DataOutput out) throws IOException {
out.writeUTF(StringUtils.isNotBlank(this.smac)?this.smac:"");
out.writeUTF(StringUtils.isNotBlank(this.dmac)?this.dmac:"");
out.writeInt(this.hrssi);
out.writeInt(this.lrssi);
out.writeLong(this.fstamp);
out.writeLong(this.lstamp);
out.writeLong(this.maxstamp);
out.writeLong(this.minstamp);
out.writeLong(this.stamp);
}

*/** *

*ignore getter setter*

***/*

}

*At last, I use this code to run*:

val filePath = "hdfs://tony.Liu:9000/wifi-raw-data/wifi-raw-data.1455206402044"
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("demo")
val sc = new SparkContext(conf)
val file = sc.hadoopFile[LongWritable, RDRawDataRecord,
RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
file.foreach(v => {
  println(v._2.getDmac) // Attribute of custom objects
})

*I get an error, it says:*

Error:(41, 19) type arguments
[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord,com.kiisoo.spark.RawDataInputFormat[org.apache.hadoop.io.LongWritable,com.kiisoo.aegis.bd.common.hdfs.RDRawDataRecord]]
conform to the bounds of none of the overloaded alternatives of
 value hadoopFile: [K, V, F <:
org.apache.hadoop.mapred.InputFormat[K,V]](path: String)(implicit km:
scala.reflect.ClassTag[K], implicit vm: scala.reflect.ClassTag[V],
implicit fm: scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K,
V)]  [K, V, F <: org.apache.hadoop.mapred.InputFormat[K,V]](path:
String, minPartitions: Int)(implicit km: scala.reflect.ClassTag[K],
implicit vm: scala.reflect.ClassTag[V], implicit fm:
scala.reflect.ClassTag[F])org.apache.spark.rdd.RDD[(K, V)]
val file = sc.hadoopFile[LongWritable, RDRawDataRecord,
RawDataInputFormat[LongWritable, RDRawDataRecord]](filePath)
  ^



*I also tried reading a text file with the SparkContext API
'sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://xxx..")',
and it works.*
*What does this error mean? How do I fix it?*

Thank you for helping me.

--
Tony
:)


Re: DistributedLDAModel missing APIs in org.apache.spark.ml

2016-03-18 Thread Ted Yu
Can you utilize this function of DistributedLDAModel ?

  override protected def getModel: OldLDAModel = oldDistributedModel

cheers

On Fri, Mar 18, 2016 at 7:34 AM, cindymc  wrote:

> I like using the new DataFrame APIs on Spark ML, compared to using RDDs in
> the older SparkMLlib.  But it seems some of the older APIs are missing.  In
> particular, '*.mllib.clustering.DistributedLDAModel' had two APIs that I
> need now:
>
> topDocumentsPerTopic
> topTopicsPerDocument
>
> How can I get at the same results using the APIs on
> '*.ml.clustering.DistributedLDAModel'?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/DistributedLDAModel-missing-APIs-in-org-apache-spark-ml-tp26535.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: The build-in indexes in ORC file does not work.

2016-03-18 Thread Jörn Franke
Not sure it should work. How many rows are affected? Is the data sorted?
Have you tried with Tez? Tez has some summary statistics that tell you whether
push-down is used. Maybe you need to use HiveContext.
Perhaps a bloom filter could make sense for you as well.

> On 16 Mar 2016, at 12:45, Joseph  wrote:
> 
> Hi,
> 
> I have only one table, named "gprs"; it has 560,000,000 rows and 57
> columns. The block size is 256M, the total number of ORC files is 800, and each of them
> is about 51M.
> 
> my query statement is :
> select count(*) from gprs  where  terminal_type = 25080;
> select * from gprs  where  terminal_type = 25080;
> 
> In the gprs table, the "terminal_type"  column's  value is in [0, 25066]
> 
> Joseph
>  
> From: Jörn Franke
> Date: 2016-03-16 19:26
> To: Joseph
> CC: user; user
> Subject: Re: The build-in indexes in ORC file does not work.
> How much data are you querying? What is the query? How selective it is 
> supposed to be? What is the block size?
> 
>> On 16 Mar 2016, at 11:23, Joseph  wrote:
>> 
>> Hi all,
>> 
>> I know that ORC provides three levels of indexes within each file: file 
>> level, stripe level, and row level. 
>> The file- and stripe-level statistics are in the file footer so that they are 
>> easy to access to determine if the rest of the file needs to be read at all. 
>> Row-level indexes include both column statistics for each row group and 
>> the position for seeking to the start of the row group. 
>> 
>> The following is my understanding:
>> 1. The file- and stripe-level indexes are always generated; we cannot 
>> control them.
>> 2. The row-level indexes can be configured by "orc.create.index" (whether to 
>> create row indexes) and "orc.row.index.stride" (number of rows between index 
>> entries).
>> 3. Each index has min/max statistics for each column, so sorting data by 
>> the filter column will bring better performance.
>> 4. To use any of the three levels of indexes, we should enable predicate 
>> push-down by setting spark.sql.orc.filterPushdown=true (in Spark SQL) or 
>> hive.optimize.ppd=true (in Hive).
>> 
>> But I found the built-in indexes in ORC files did not work, both in Spark 
>> 1.5.2 and Hive 1.2.1:
>> First, when the where clause of the query did not match any record 
>> (the filter column had a value beyond the range of the data), the performance 
>> with predicate push-down enabled was almost the same as with predicate 
>> push-down disabled. I think that when the filter column has a value beyond the 
>> range of the data, none of the ORC files should be scanned if the file-level 
>> indexes are used, so the performance should improve noticeably.
>> 
>> Second, when "orc.create.index" was enabled, the data was sorted by the filter 
>> column, and the where clause matched only a few records, the performance with 
>> predicate push-down enabled was almost the same as with predicate push-down 
>> disabled. 
>> 
>> Third, with predicate push-down and "orc.create.index" enabled, the 
>> performance when the filter column had a value beyond the range of the data was 
>> almost the same as when the filter column had a value covering almost the 
>> whole data set. 
>> 
>> So, has anyone used ORC's built-in indexes before (especially in Spark 
>> SQL)? What is my issue?
>> 
>> Thanks!
>> 
>> Joseph
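For anyone reproducing this, a minimal sketch of the kind of query being discussed,
with push-down enabled and using HiveContext as suggested above; the load path is an
assumption, and the table and column names follow Joseph's description:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")

// Reading the ORC files directly...
val gprs = hiveContext.read.format("orc").load("/path/to/gprs")   // path is an assumption
gprs.filter(gprs("terminal_type") === 25080).count()

// ...or querying the Hive table itself:
hiveContext.sql("SELECT COUNT(*) FROM gprs WHERE terminal_type = 25080").show()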


Can't zip RDDs with unequal numbers of partitions

2016-03-18 Thread Jiří Syrový
Hi,

any idea what could be causing this issue? It started appearing after
changing the parameter *spark.sql.autoBroadcastJoinThreshold* to 10.

Caused by: java.lang.IllegalArgumentException: Can't zip RDDs with unequal
numbers of partitions
at
org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.PartitionCoalescer.<init>(CoalescedRDD.scala:172)
at
org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:85)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
at
org.apache.spark.sql.execution.Exchange.prepareShuffleDependency(Exchange.scala:220)
at
org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:254)
at
org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:248)
at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
... 28 more


Incomplete data when reading from S3

2016-03-18 Thread Blaž Šnuderl
Hi.

We have json data stored in S3 (json record per line). When reading the
data from s3 using the following code we started noticing json decode
errors.

sc.textFile(paths).map(json.loads)


After a bit more investigation we noticed an incomplete line, basically the
line was

> {"key": "value", "key2":  <- notice the line abruptly ends with no json
> close tag etc


It is not an issue with our data and it doesn't happen very often, but it
makes us very scared since it means spark could be dropping data.

We are using spark 1.5.1. Any ideas why this happens and possible fixes?

Regards,
Blaž Šnuderl


Re: Limit pyspark.daemon threads

2016-03-18 Thread Sea
It's useless...  The python worker will go above 1.5g in my production 
environment




------ Original message ------
From: "Ted Yu"
Date: 2016-03-17 (Thu) 10:50
To: "Carlile, Ken"
Cc: "user"
Subject: Re: Limit pyspark.daemon threads



I took a look at docs/configuration.md
Though I didn't find answer for your first question, I think the following 
pertains to your second question:



  spark.python.worker.memory
  512m
  
Amount of memory to use per python worker process during aggregation, in 
the same
format as JVM memory strings (e.g. 512m, 2g). If 
the memory
used during aggregation goes above this amount, it will spill the data into 
disks.
  




On Thu, Mar 17, 2016 at 7:43 AM, Carlile, Ken  wrote:
Hello,
 
 We have an HPC cluster that we run Spark jobs on using standalone mode and a 
number of scripts I've built up to dynamically schedule and start spark 
clusters within the Grid Engine framework. Nodes in the cluster have 16 cores 
and 128GB of RAM.
 
 My users use pyspark heavily. We've been having a number of problems with 
nodes going offline with extraordinarily high load. I was able to look at one 
of those nodes today before it went truly sideways, and I discovered that the 
user was running 50 pyspark.daemon threads (remember, this is a 16 core box), 
and the load was somewhere around 25 or so, with all CPUs maxed out at 100%.
 
 So while the spark worker is aware it's only got 16 cores and behaves 
accordingly, pyspark seems to be happy to overrun everything like crazy. Is 
there a global parameter I can use to limit pyspark threads to a sane number, 
say 15 or 16? It would also be interesting to set a memory limit, which leads 
to another question.
 
 How is memory managed when pyspark is used? I have the spark worker memory set 
to 90GB, and there is 8GB of system overhead (GPFS caching), so if pyspark 
operates outside of the JVM memory pool, that leaves it at most 30GB to play 
with, assuming there is no overhead outside the JVM's 90GB heap (ha ha.)
 
 Thanks,
 Ken Carlile
 Sr. Unix Engineer
 HHMI/Janelia Research Campus
 571-209-4363

Re: Saving intermediate results in mapPartitions

2016-03-18 Thread Enrico Rotundo
Try to set MEMORY_AND_DISK as RDD’s storage persistence level.
http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence 

> On 19 Mar 2016, at 00:55, Krishna  wrote:
> 
> Hi,
> 
> I've a situation where the number of elements output by each partition from 
> mapPartitions don't fit into the RAM even with the lowest number of rows in 
> the partition (there is a hard lower limit on this value). What's the best 
> way to address this problem? During the mapPartition phase, is there a way to 
> convert intermediate results to a DF and save to a database? Rows saved to 
> database don't need to be part of the output results from mapPartitions.
> 
> 



spark shuffle service on yarn

2016-03-18 Thread Koert Kuipers
Spark on YARN is nice because I can bring my own Spark. I am worried that
the shuffle service forces me to use some "sanctioned" Spark version that
is officially "installed" on the cluster.

So... can I safely install the Spark 1.3 shuffle service on YARN and use it
with other 1.x versions of Spark?

thanks


Potential conflict with org.iq80.snappy in Spark 1.6.0 environment?

2016-03-18 Thread vasu20
Hi,

I have some code that parses a snappy thrift file for objects.  This code
works fine when run standalone (outside of the Spark environment).  However,
when running from within Spark, I get an IllegalAccessError exception from
the org.iq80.snappy package.  Has anyone else seen this error and/or do you
have any suggestions?  Any pointers appreciated.  Thanks!

Vasu

-- 
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IllegalAccessError:
tried to access class org.iq80.snappy.BufferRecycler from class
org.iq80.snappy.AbstractSnappyInputStream
at
org.iq80.snappy.AbstractSnappyInputStream.<init>(AbstractSnappyInputStream.java:91)
at
org.iq80.snappy.SnappyFramedInputStream.<init>(SnappyFramedInputStream.java:38)
at DistMatchMetric$1.call(DistMatchMetric.java:131)
at DistMatchMetric$1.call(DistMatchMetric.java:123)
at
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
at
org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$14.apply(RDD.scala:1011)
at
org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$14.apply(RDD.scala:1009)
at 
org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1951)
at 
org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
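An IllegalAccessError on a package-private class like BufferRecycler typically means the
two org.iq80.snappy classes were loaded from different jars or classloaders, so the
package-private access check fails. A hedged diagnostic sketch (not a fix) that prints
which jar the class is actually loaded from, on the driver and on one executor:

import org.apache.spark.{SparkConf, SparkContext}

object WhichSnappy {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("which-snappy"))
    val className = "org.iq80.snappy.AbstractSnappyInputStream"

    // Driver-side classpath.
    val driverSrc = Option(Class.forName(className).getProtectionDomain.getCodeSource)
    println("driver:   " + driverSrc.map(_.getLocation.toString).getOrElse("<unknown>"))

    // Executor-side classpath (one partition is enough).
    val executorSrc = sc.parallelize(Seq(className), 1).map { name =>
      val src = Option(Class.forName(name).getProtectionDomain.getCodeSource)
      src.map(_.getLocation.toString).getOrElse("<unknown>")
    }.first()
    println("executor: " + executorSrc)

    sc.stop()
  }
}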



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Potential-conflict-with-org-iq80-snappy-in-Spark-1-6-0-environment-tp26539.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
