BinaryClassificationMetrics only supports AreaUnderPR and AreaUnderROC?

2017-05-11 Thread Lan Jiang
I realized that in Spark ML, BinaryClassificationMetrics only supports
AreaUnderPR and AreaUnderROC. Why is that?

What if I need other metrics such as F-score, accuracy? I tried to use
MulticlassClassificationEvaluator to evaluate other metrics such as
Accuracy for a binary classification problem, and it seems to work. But I am
not sure if there is any issue using MulticlassClassificationEvaluator for
a binary classification. According to the Spark ML documentation "The
Evaluator can be a RegressionEvaluator for regression problems, *a
BinaryClassificationEvaluator for binary data, or a
MulticlassClassificationEvaluator for multiclass problems*. "

https://spark.apache.org/docs/2.1.0/ml-tuning.html
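
For reference, here is a minimal sketch of what I tried (the DataFrame and
column names are only placeholders):

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// "predictions" is assumed to be the output of a fitted binary classifier,
// with "label" and "prediction" columns (placeholder names).
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")   // "f1", "weightedPrecision", "weightedRecall" also seem to work

val accuracy = evaluator.evaluate(predictions)
println(s"accuracy = $accuracy")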

Can someone shed some light on this issue?

Lan


Does monotonically_increasing_id generate the same id even when an executor fails or is evicted out of memory

2017-02-28 Thread Lan Jiang
Hi, there

I am trying to generate a unique ID for each record in a dataframe, so that I
can save the dataframe to a relational database table. My question is: when
the dataframe is regenerated due to an executor failure or being evicted out
of the cache, does the ID stay the same as before?

According to the document:

*The generated ID is guaranteed to be monotonically increasing and unique,
but not consecutive. The current implementation puts the partition ID in
the upper 31 bits, and the lower 33 bits represent the record number within
each partition. *
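
Based on that description, I picture the id being decomposable like this (just
my own sketch to illustrate the documented bit layout, not an official API):

// Sketch only: decode a generated id per the documented layout
// (upper 31 bits = partition id, lower 33 bits = record number within the partition).
def decode(id: Long): (Int, Long) = {
  val partitionId  = (id >> 33).toInt
  val recordNumber = id & ((1L << 33) - 1)
  (partitionId, recordNumber)
}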

I assume the partition id stays the same after the regeneration. But what
about the record number within each partition?

My code is like below:

import org.apache.spark.sql.functions._
val df1=(1 to 1000).toDF.repartition(4)
val df2 = df1.withColumn("id", monotonically_increasing_id).cache
df2.show
df2.show

I executed it several times and it seems to generate the same ID for each
specific record, but I am not sure that proves that it will generate the
same ID for every scenario.

BTW, I am aware of the shortcoming of monotonically_increasing_id in Spark
1.6, explained in https://issues.apache.org/jira/browse/SPARK-14241, which
is fixed in 2.0.

Lan


Spark Streaming proactive monitoring

2017-01-23 Thread Lan Jiang
Hi, there

From the Spark UI, we can monitor the following two metrics:

• Processing Time - The time to process each batch of data.
• Scheduling Delay - The time a batch waits in a queue for the
processing of previous batches to finish.

However, what is the best way to monitor them proactively? For example, if
processing time/scheduling delay exceed a certain threshold, send an alert to the
admin/developer?
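
For example, something along these lines with a StreamingListener is what I
have in mind (a sketch only; the threshold and the alerting hook are placeholders):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Sketch: alert when a completed batch exceeds a delay threshold (in ms).
class ThresholdAlertListener(maxDelayMs: Long) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    val processing = info.processingDelay.getOrElse(0L)
    val scheduling = info.schedulingDelay.getOrElse(0L)
    if (processing > maxDelayMs || scheduling > maxDelayMs) {
      // replace println with a real alerting hook (email, pager, etc.)
      println(s"ALERT: batch ${info.batchTime} processing=${processing}ms scheduling=${scheduling}ms")
    }
  }
}

// ssc.addStreamingListener(new ThresholdAlertListener(30000))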

Lan
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Yarn executor container memory

2016-08-15 Thread Lan Jiang
Hello,

My understanding is that the YARN executor container memory is based on
"spark.executor.memory" + "spark.yarn.executor.memoryOverhead". The first one
is for heap memory and the second one is for off-heap memory. The
spark.executor.memory value is used by -Xmx to set the max heap size. Now my
question is why it does not count the permgen size and the memory used by
thread stacks, which are not part of the max heap size. IMHO, the YARN executor
container memory should be set to: spark.executor.memory + [-XX:MaxPermSize] +
number_of_threads * [-Xss] + spark.yarn.executor.memoryOverhead. What did I miss?
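
To make my assumption concrete, here is the accounting I have in mind (the
numbers are only an example):

// Example numbers only, to illustrate the accounting I am assuming:
//   spark.executor.memory              = 8g    (becomes -Xmx8g, i.e. the max heap)
//   spark.yarn.executor.memoryOverhead = 819m  (default in recent versions: max(384m, 0.10 * 8g))
//   YARN container request             = 8g + 819m, rounded up to the YARN allocation increment
// Neither -XX:MaxPermSize nor number_of_threads * -Xss appears explicitly in that sum,
// which is what prompted my question.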

Lan
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Processing json document

2016-07-07 Thread Lan Jiang
Hi, there,

Thank you all for your input. @Hyukjin, as a matter of fact, I had read
the blog link you posted before asking the question on the forum. As you
pointed out, the link uses wholeTextFiles(), which is bad in my case
because my json file can be as large as 20G+ and an OOM might occur. I am not
sure how to extract the values using a textFile call, as it will create an
RDD of strings and treat each line independently, without ordering, which
destroys the json context.

Large multiline json files with a parent node are very common in the real
world. Take the common employees json example below: assuming we have
millions of employees and it is a super large json document, how can spark
handle this? This should be a common pattern, shouldn't it? In the real world,
json documents do not always come as cleanly formatted as the spark
examples require.

{
  "employees":[
    {
      "firstName":"John",
      "lastName":"Doe"
    },
    {
      "firstName":"Anna",
      "lastName":"Smith"
    },
    {
      "firstName":"Peter",
      "lastName":"Jones"
    }
  ]
}
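
For what it's worth, the only way I can think of for a file shaped like the
one above is something like this (a sketch only; it still pulls the whole file
into a single task via wholeTextFiles, which is exactly what I want to avoid
for a 20G+ file):

import org.apache.spark.sql.functions.{col, explode}

// Sketch: read the whole multiline file as one string record, then explode the array.
val raw = sc.wholeTextFiles("hdfs:///path/to/employees.json").values   // RDD[String] with one element
val df  = sqlContext.read.json(raw)                                    // schema: employees: array<struct<firstName,lastName>>
val employees = df.select(explode(col("employees")).as("employee"))
                  .select("employee.firstName", "employee.lastName")
employees.show()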



On Thu, Jul 7, 2016 at 1:47 AM, Hyukjin Kwon <gurwls...@gmail.com> wrote:

> The link uses wholeTextFiles() API which treats each file as each record.
>
>
> 2016-07-07 15:42 GMT+09:00 Jörn Franke <jornfra...@gmail.com>:
>
>> This is not necessarily the case: if you look at the Hadoop
>> FileInputFormat architecture then you can even split large multi line Jsons
>> without issues. I would need to have a look at it, but one large file does
>> not mean one Executor independent of the underlying format.
>>
>> On 07 Jul 2016, at 08:12, Hyukjin Kwon <gurwls...@gmail.com> wrote:
>>
>> There is a good link for this here,
>> http://searchdatascience.com/spark-adventures-1-processing-multi-line-json-files
>>
>> If there are a lot of small files, then it would work pretty okay in a
>> distributed manner, but I am worried if it is single large file.
>>
>> In this case, this would only work in single executor which I think will
>> end up with OutOfMemoryException.
>>
>> Spark JSON data source does not support multi-line JSON as input due to
>> the limitation of TextInputFormat and LineRecordReader.
>>
>> You may have to just extract the values after reading it by textFile..
>> ​
>>
>>
>> 2016-07-07 14:48 GMT+09:00 Lan Jiang <ljia...@gmail.com>:
>>
>>> Hi, there
>>>
>>> Spark has provided json document processing feature for a long time. In
>>> most examples I see, each line is a json object in the sample file. That is
>>> the easiest case. But how can we process a json document, which does not
>>> conform to this standard format (one line per json object)? Here is the
>>> document I am working on.
>>>
>>> First of all, it is multiple lines for one single big json object. The
>>> real file can be as long as 20+ G. Within that one single json object, it
>>> contains many name/value pairs. The name is some kind of id values. The
>>> value is the actual json object that I would like to be part of dataframe.
>>> Is there any way to do that? Appreciate any input.
>>>
>>>
>>> {
>>> "id1": {
>>> "Title":"title1",
>>> "Author":"Tom",
>>> "Source":{
>>> "Date":"20160506",
>>> "Type":"URL"
>>> },
>>> "Data":" blah blah"},
>>>
>>> "id2": {
>>> "Title":"title2",
>>> "Author":"John",
>>> "Source":{
>>> "Date":"20150923",
>>> "Type":"URL"
>>> },
>>> "Data":" blah blah "},
>>>
>>> "id3: {
>>> "Title":"title3",
>>> "Author":"John",
>>> "Source":{
>>> "Date":"20150902",
>>> "Type":"URL"
>>> },
>>> "Data":" blah blah "}
>>> }
>>>
>>>
>>
>


Processing json document

2016-07-06 Thread Lan Jiang
Hi, there

Spark has provided json document processing features for a long time. In
most examples I see, each line is a json object in the sample file. That is
the easiest case. But how can we process a json document that does not
conform to this standard format (one line per json object)? Here is the
document I am working on.

First of all, it is multiple lines for one single big json object. The real
file can be as large as 20+ G. Within that one single json object, it
contains many name/value pairs. The names are some kind of id values. The
values are the actual json objects that I would like to be part of a dataframe.
Is there any way to do that? Appreciate any input.


{
"id1": {
"Title":"title1",
"Author":"Tom",
"Source":{
"Date":"20160506",
"Type":"URL"
},
"Data":" blah blah"},

"id2": {
"Title":"title2",
"Author":"John",
"Source":{
"Date":"20150923",
"Type":"URL"
},
"Data":" blah blah "},

"id3: {
"Title":"title3",
"Author":"John",
"Source":{
"Date":"20150902",
"Type":"URL"
},
"Data":" blah blah "}
}


Re: MLLib + Streaming

2016-03-06 Thread Lan Jiang
tion 
> can have towards wild predictions in the beginning. Offline training is 
> simple and cheap where as online training can be hard and needs to be 
> constantly monitored to see how it is performing.
> 
> Hope this helps in understanding offline learning vs. online learning and 
> which algorithms you can choose for online learning in MLlib.
> 
> Guru Medasani
> gdm...@gmail.com <mailto:gdm...@gmail.com>
> 
> 
> 
> > On Mar 5, 2016, at 7:37 PM, Lan Jiang <ljia...@gmail.com 
> > <mailto:ljia...@gmail.com>> wrote:
> >
> > Hi, there
> >
> > I hope someone can clarify this for me.  It seems that some of the MLlib 
> > algorithms such as KMean, Linear Regression and Logistics Regression have a 
> > Streaming version, which can do online machine learning. But does that mean 
> > other MLLib algorithm cannot be used in Spark streaming applications, such 
> > as random forest, SVM, collaborate filtering, etc??
> >
> > DStreams are essentially a sequence of RDDs. We can use DStream.transform() 
> > and DStream.foreachRDD() operations, which allows you access RDDs in a 
> > DStream and apply MLLib functions on them. So it looks like all MLLib 
> > algorithms should be able to run in the streaming application. Am I wrong?
> >
> > Lan
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> > <mailto:user-unsubscr...@spark.apache.org>
> > For additional commands, e-mail: user-h...@spark.apache.org 
> > <mailto:user-h...@spark.apache.org>
> >
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> <mailto:user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org 
> <mailto:user-h...@spark.apache.org>
> 
> 



Re: Spark ML and Streaming

2016-03-06 Thread Lan Jiang
Sorry, accidentally sent this again. My apologies.

> On Mar 6, 2016, at 1:22 PM, Lan Jiang <ljia...@gmail.com> wrote:
> 
> Hi, there
> 
> I hope someone can clarify this for me.  It seems that some of the MLlib 
> algorithms such as KMean, Linear Regression and Logistics Regression have a 
> Streaming version, which can do online machine learning. But does that mean 
> other MLLib algorithm cannot be used in Spark streaming applications, such as 
> random forest, SVM, collaborate filtering, etc??
> 
> DStreams are essentially a sequence of RDDs. We can use DStream.transform() 
> and DStream.foreachRDD() operations, which allows you access RDDs in a 
> DStream and apply MLLib functions on them. So it looks like all MLLib 
> algorithms should be able to run in the streaming application. Am I wrong? 
> 
> Thanks in advance.
> 
> Lan


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark ML and Streaming

2016-03-06 Thread Lan Jiang
Hi, there

I hope someone can clarify this for me. It seems that some of the MLlib
algorithms such as KMeans, Linear Regression and Logistic Regression have a
Streaming version, which can do online machine learning. But does that mean the
other MLlib algorithms cannot be used in Spark streaming applications, such as
random forest, SVM, collaborative filtering, etc.?

DStreams are essentially a sequence of RDDs. We can use the DStream.transform() and
DStream.foreachRDD() operations, which allow you to access the RDDs in a DStream and
apply MLlib functions on them. So it looks like all MLlib algorithms should be
able to run in a streaming application. Am I wrong?

Thanks in advance.

Lan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



MLLib + Streaming

2016-03-05 Thread Lan Jiang
Hi, there

I hope someone can clarify this for me. It seems that some of the MLlib
algorithms such as KMeans, Linear Regression and Logistic Regression have a
Streaming version, which can do online machine learning. But does that mean the
other MLlib algorithms cannot be used in Spark streaming applications, such as
random forest, SVM, collaborative filtering, etc.?

DStreams are essentially a sequence of RDDs. We can use the DStream.transform() and
DStream.foreachRDD() operations, which allow you to access the RDDs in a DStream and
apply MLlib functions on them. So it looks like all MLlib algorithms should be
able to run in a streaming application. Am I wrong?
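
For instance, something like this is what I have in mind (a sketch only; it
assumes a RandomForestModel trained offline and a DStream of feature vectors
named featureStream):

import org.apache.spark.mllib.tree.model.RandomForestModel

// Sketch: load a model trained in a separate batch job and apply it per micro-batch.
val model = RandomForestModel.load(sc, "hdfs:///path/to/model")   // placeholder path

// featureStream: DStream[org.apache.spark.mllib.linalg.Vector] (assumed to exist)
val predictions = featureStream.transform(rdd => model.predict(rdd))
predictions.print()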

Lan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



broadcast join in SparkSQL requires analyze table noscan

2016-02-10 Thread Lan Jiang
Hi, there

I am looking at the SparkSQL setting spark.sql.autoBroadcastJoinThreshold.
According to the programming guide

*Note that currently statistics are only supported for Hive Metastore
tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS
noscan has been run.*

My question is: is the "NOSCAN" option a must? If I execute the "ANALYZE TABLE
<tableName> COMPUTE STATISTICS" command in the Hive shell, are the statistics
going to be used by SparkSQL to decide on a broadcast join?
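
To be concrete, this is the kind of setup I mean (a sketch; the table name is
a placeholder):

// Sketch only -- "mytable" is a placeholder.
// The variant the Spark docs describe (computes sizeInBytes only):
sqlContext.sql("ANALYZE TABLE mytable COMPUTE STATISTICS noscan")

// The variant I ran in the Hive shell (full scan):
//   ANALYZE TABLE mytable COMPUTE STATISTICS;

// The broadcast threshold I am tuning (in bytes):
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)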

Thanks.


Re: broadcast join in SparkSQL requires analyze table noscan

2016-02-10 Thread Lan Jiang
Michael,

Thanks for the reply.

On Wed, Feb 10, 2016 at 11:44 AM, Michael Armbrust 
wrote:

> My question is that is "NOSCAN" option a must? If I execute "ANALYZE TABLE
>>  compute statistics" command in Hive shell, is the statistics
>> going to be used by SparkSQL to decide broadcast join?
>
>
> Yes, spark SQL will only accept the simple no scan version.  However, as
> long as the sizeInBytes statistic is present, we will use it.
>
>


Question about Spark Streaming checkpoint interval

2015-12-18 Thread Lan Jiang
Need some clarification about the documentation. According to Spark doc

"the default interval is a multiple of the batch interval that is at least 10 
seconds. It can be set by using dstream.checkpoint(checkpointInterval). 
Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a 
good setting to try.”
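
For context, this is how I read the API (a sketch; the batch interval, source
and paths are placeholders):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch: 10-second batches, data checkpoint every 5 batches (50s),
// per the "5 - 10 sliding intervals" guidance quoted above.
val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("hdfs:///path/to/checkpoint")          // checkpoint directory (metadata and data)
val stream = ssc.socketTextStream("somehost", 9999)   // placeholder source
stream.checkpoint(Seconds(50))                        // RDD (data) checkpoint interval for this DStream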

My question is: does the checkpointInterval apply only to data
checkpointing, or does it also apply to metadata checkpointing? The API says
dstream.checkpoint() is for "Enable periodic checkpointing of RDDs of this
DStream", implying it is only for data checkpointing. My understanding is that
metadata checkpointing is for driver failure. For example, in the Kafka direct API,
the driver keeps track of the offset range of each partition. So if the metadata
checkpoint is NOT done for each batch, then on a driver failure some messages in
Kafka are going to be replayed.

I cannot find the answer in the documentation on whether metadata checkpointing
is done for each batch and whether the checkpointInterval setting applies to both
types of checkpointing. Maybe I missed it. If anyone can point me to the right
documentation, I would highly appreciate it.

Best Regards,

Lan

Re: Scala VS Java VS Python

2015-12-16 Thread Lan Jiang
For a Spark data science project, Python might be a good choice. However, for
Spark Streaming, the Python API is still lagging. For example, for the Kafka
no-receiver (direct) connector, according to the Spark 1.5.2 doc: "Spark 1.4 added
a Python API, but it is not yet at full feature parity".

Java does not have a REPL shell, which is a major drawback from my perspective.

Lan



> On Dec 16, 2015, at 3:46 PM, Stephen Boesch  wrote:
> 
> There are solid reasons to have built spark on the jvm vs python. The 
> question for Daniel appear to be at this point scala vs java8. For that there 
> are many comparisons already available: but in the case of working with spark 
> there is the additional benefit for the scala side that the core libraries 
> are in that language.  
> 
> 2015-12-16 13:41 GMT-08:00 Darren Govoni  >:
> I use python too. I'm actually surprises it's not the primary language since 
> it is by far more used in data science than java snd Scala combined.
> 
> If I had a second choice of script language for general apps I'd want groovy 
> over scala.
> 
> 
> 
> Sent from my Verizon Wireless 4G LTE smartphone
> 
> 
>  Original message 
> From: Daniel Lopes > 
> Date: 12/16/2015 4:16 PM (GMT-05:00) 
> To: Daniel Valdivia  > 
> Cc: user > 
> Subject: Re: Scala VS Java VS Python 
> 
> For me Scala is better like Spark is written in Scala, and I like python cuz 
> I always used python for data science. :)
> 
> On Wed, Dec 16, 2015 at 5:54 PM, Daniel Valdivia  > wrote:
> Hello,
> 
> This is more of a "survey" question for the community, you can reply to me 
> directly so we don't flood the mailing list.
> 
> I'm having a hard time learning Spark using Python since the API seems to be 
> slightly incomplete, so I'm looking at my options to start doing all my apps 
> in either Scala or Java, being a Java Developer, java 1.8 looks like the 
> logical way, however I'd like to ask here what's the most common (Scala Or 
> Java) since I'm observing mixed results in the social documentation, however 
> Scala seems to be the predominant language for spark examples.
> 
> Thank for the advice
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> Daniel Lopes, B.Eng
> Data Scientist - BankFacil
> CREA/SP 5069410560 
> 
> Mob +55 (18) 99764-2733 
> Ph +55 (11) 3522-8009
> http://about.me/dannyeuu 
> 
> Av. Nova Independência, 956, São Paulo, SP
> Bairro Brooklin Paulista
> CEP 04570-001
> https://www.bankfacil.com.br 
> 
> 



Re: Protobuff 3.0 for Spark

2015-11-09 Thread Lan Jiang
I have not run into any linkage problem, but maybe I was lucky. :-). The
reason I wanted to use protobuf 3 is mainly for Map type support.

On Thu, Nov 5, 2015 at 4:43 AM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> > On 5 Nov 2015, at 00:12, Lan Jiang <ljia...@gmail.com> wrote:
> >
> > I have used protobuf 3 successfully with Spark on CDH 5.4, even though
> Hadoop itself comes with protobuf 2.5. I think the steps apply to HDP too.
> You need to do the following
>
>
> Protobuf.jar has been so brittle in the past that the entire hadoop stack
> has effectively frozen @ 2.5, at least until another mass-coordinated
> update across as many projects as possible. If you do update it locally,
> you are highly likely to find linkage problems. I would strongly advise
> staying with 2.5 unless there is some pressing need and you are happy to
> take on all the pain yourself
>


Re: Protobuff 3.0 for Spark

2015-11-04 Thread Lan Jiang
I have used protobuf 3 successfully with Spark on CDH 5.4, even though Hadoop 
itself comes with protobuf 2.5. I think the steps apply to HDP too. You need to 
do the following

1. Set the below parameter

spark.executor.userClassPathFirst=true
spark.driver.userClassPathFirst=true

2. Include the protobuf 3 jar file either through --jars during spark-submit or
package it into an uber jar file with your own classes (see the example below).
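
As a concrete example, this is roughly how I launch it (jar paths and the main
class are placeholders):

# Sketch only -- paths and the main class are placeholders.
spark-submit \
  --conf spark.executor.userClassPathFirst=true \
  --conf spark.driver.userClassPathFirst=true \
  --jars /path/to/protobuf-java-3.x.x.jar \
  --class com.example.MyApp \
  my-app-uber.jar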

Lan


> On Nov 4, 2015, at 4:07 PM, Cassa L  wrote:
> 
> Hi,
>  Does spark support protobuff 3.0? I used protobuff 2.5 with spark-1.4 built 
> for HDP 2.3. Given that protobuff has compatibility issues , want to know if 
> spark supports protbuff 3.0
> 
> LCassa


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Ahhhh... Spark creates >30000 partitions... What can I do?

2015-10-20 Thread Lan Jiang
As Francois pointed out, you are encountering a classic small file 
anti-pattern. One solution I used in the past is to wrap all these small binary 
files into a sequence file or avro file. For example, the avro schema can have 
two fields: filename: string and binaryname:byte[]. Thus your file is 
splittable and will not create so many partitions.
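
A sketch of what I mean (paths are placeholders; I believe the built-in Writable
conversions cover String keys and Array[Byte] values, but it is worth
double-checking on your Spark version):

// Sketch only: pack many small binary files into one splittable SequenceFile.
// Key = original file name, value = raw bytes.
val packed = sc.binaryFiles("hdfs:///path/to/small-files")
  .mapValues(stream => stream.toArray())          // PortableDataStream -> Array[Byte]
packed.saveAsSequenceFile("hdfs:///path/to/packed-seqfile")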

Lan


> On Oct 20, 2015, at 8:03 AM, François Pelletier 
>  wrote:
> 
> You should aggregate your files in larger chunks before doing anything else. 
> HDFS is not fit for small files. It will bloat it and cause you a lot of 
> performance issues. Target a few hundred MB chunks partition size and then 
> save those files back to hdfs and then delete the original ones. You can 
> read, use coalesce and the saveAsXXX on the result.
> 
> I had the same kind of problem once and solved it in bunching 100's of files 
> together in larger ones. I used text files with bzip2 compression.
> 
> 
> 
> Le 2015-10-20 08:42, Sean Owen a écrit :
>> coalesce without a shuffle? it shouldn't be an action. It just treats many 
>> partitions as one.
>> 
>> On Tue, Oct 20, 2015 at 1:00 PM, t3l > > wrote:
>> 
>> I have dataset consisting of 5 binary files (each between 500kb and
>> 2MB). They are stored in HDFS on a Hadoop cluster. The datanodes of the
>> cluster are also the workers for Spark. I open the files as a RDD using
>> sc.binaryFiles("hdfs:///path_to_directory").When I run the first action that
>> involves this RDD, Spark spawns a RDD with more than 30000 partitions. And
>> this takes ages to process these partitions even if you simply run "count".
>> Performing a "repartition" directly after loading does not help, because
>> Spark seems to insist on materializing the RDD created by binaryFiles first.
>> 
>> How I can get around this?
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/A-Spark-creates-3-partitions-What-can-I-do-tp25140.html
>>  
>> 
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>> 
>> For additional commands, e-mail: user-h...@spark.apache.org 
>> 
>> 
>> 
> 



Re: Ahhhh... Spark creates >30000 partitions... What can I do?

2015-10-20 Thread Lan Jiang
I think the data files are binary per the original post. So in this case,
sc.binaryFiles should be used. However, I still recommend against using so many
small binary files because:

1. They are not good for batch I/O.
2. They put too much memory pressure on the namenode.

Lan


> On Oct 20, 2015, at 11:20 AM, Deenar Toraskar <deenar.toras...@gmail.com> 
> wrote:
> 
> also check out wholeTextFiles
> 
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html#wholeTextFiles(java.lang.String,%20int)
>  
> <https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html#wholeTextFiles(java.lang.String,%20int)>
> 
> On 20 October 2015 at 15:04, Lan Jiang <ljia...@gmail.com 
> <mailto:ljia...@gmail.com>> wrote:
> As Francois pointed out, you are encountering a classic small file 
> anti-pattern. One solution I used in the past is to wrap all these small 
> binary files into a sequence file or avro file. For example, the avro schema 
> can have two fields: filename: string and binaryname:byte[]. Thus your file 
> is splittable and will not create so many partitions.
> 
> Lan
> 
> 
>> On Oct 20, 2015, at 8:03 AM, François Pelletier 
>> <newslett...@francoispelletier.org 
>> <mailto:newslett...@francoispelletier.org>> wrote:
>> 
>> You should aggregate your files in larger chunks before doing anything else. 
>> HDFS is not fit for small files. It will bloat it and cause you a lot of 
>> performance issues. Target a few hundred MB chunks partition size and then 
>> save those files back to hdfs and then delete the original ones. You can 
>> read, use coalesce and the saveAsXXX on the result.
>> 
>> I had the same kind of problem once and solved it in bunching 100's of files 
>> together in larger ones. I used text files with bzip2 compression.
>> 
>> 
>> 
>> Le 2015-10-20 08:42, Sean Owen a écrit :
>>> coalesce without a shuffle? it shouldn't be an action. It just treats many 
>>> partitions as one.
>>> 
>>> On Tue, Oct 20, 2015 at 1:00 PM, t3l <t...@threelights.de 
>>> <mailto:t...@threelights.de>> wrote:
>>> 
>>> I have dataset consisting of 5 binary files (each between 500kb and
>>> 2MB). They are stored in HDFS on a Hadoop cluster. The datanodes of the
>>> cluster are also the workers for Spark. I open the files as a RDD using
>>> sc.binaryFiles("hdfs:///path_to_directory <>").When I run the first action 
>>> that
>>> involves this RDD, Spark spawns a RDD with more than 30000 partitions. And
>>> this takes ages to process these partitions even if you simply run "count".
>>> Performing a "repartition" directly after loading does not help, because
>>> Spark seems to insist on materializing the RDD created by binaryFiles first.
>>> 
>>> How I can get around this?
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-spark-user-list.1001560.n3.nabble.com/A-Spark-creates-3-partitions-What-can-I-do-tp25140.html
>>>  
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/A-Spark-creates-3-partitions-What-can-I-do-tp25140.html>
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com 
>>> <http://nabble.com/>.
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
>>> <mailto:user-unsubscr...@spark.apache.org>
>>> For additional commands, e-mail: user-h...@spark.apache.org 
>>> <mailto:user-h...@spark.apache.org>
>>> 
>>> 
>> 
> 
> 



Re: "java.io.IOException: Filesystem closed" on executors

2015-10-14 Thread Lan Jiang
Thank you, Akhil. Actually the problem was solved last week and I did not
have time to report back. The error was caused by YARN killing the
containers because the executors used more off-heap memory than they were
assigned. There was nothing in the executor log, but the AM log clearly
states this was the problem.

After I increased spark.yarn.executor.memoryOverhead, it was working
fine. I was using Spark 1.3, which has the default value of executorMemory *
0.07, with a minimum of 384. In Spark 1.4 and later, the default value was
changed to executorMemory * 0.10, with a minimum of 384.
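
For reference, the fix on my side was just bumping the overhead at submit time,
along these lines (the exact value is only an example):

# Sketch only -- the overhead value (in MB) is just an example.
spark-submit \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  <the rest of the original submit options>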

Lan

On Mon, Oct 12, 2015 at 8:34 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Can you look a bit deeper in the executor logs? It could be filling up the
> memory and getting killed.
>
> Thanks
> Best Regards
>
> On Mon, Oct 5, 2015 at 8:55 PM, Lan Jiang <ljia...@gmail.com> wrote:
>
>> I am still facing this issue. Executor dies due to
>>
>> org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem
>> closed
>> at
>> org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
>> at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
>> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
>> ...
>> Caused by: java.io.IOException: Filesystem closed
>> at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
>> at
>> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
>> at java.io.DataInputStream.read(DataInputStream.java:149)
>>
>> Spark automatically launched new executors and the whole job completed
>> fine. Anyone has a clue what's going on?
>>
>> The spark job reads avro files from a directory, do some basic map/filter
>> and then repartition to 1, write the result to HDFS. I use spark 1.3 with
>> spark-avro (1.0.0). The error only happens when running on the whole
>> dataset. When running on 1/3 of the files, the same job completes without
>> error.
>>
>>
>> On Thu, Oct 1, 2015 at 2:41 PM, Lan Jiang <ljia...@gmail.com> wrote:
>>
>>> Hi, there
>>>
>>> Here is the problem I ran into when executing a Spark Job (Spark 1.3).
>>> The spark job is loading a bunch of avro files using Spark SQL spark-avro
>>> 1.0.0 library. Then it does some filter/map transformation, repartition to
>>> 1 partition and then write to HDFS. It creates 2 stages. The total HDFS
>>> block number is around 12000, thus it creates 12000 partitions, thus 12000
>>> tasks for the first stage. I have total 9 executors launched with 5 thread
>>> for each. The job has run fine until the very end.  When it reaches
>>> 19980/2 tasks succeeded, it suddenly failed the last 20 tasks and I
>>> lost 2 executors. The spark did launched 2 new executors and finishes the
>>> job eventually by reprocessing the 20 tasks.
>>>
>>> I only ran into this issue when I run the spark application on the full
>>> dataset. When I run the 1/3 of the dataset, everything finishes fine
>>> without error.
>>>
>>> Question 1: What is the root cause of this issue? It is simiar to
>>> http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed
>>> and https://issues.apache.org/jira/browse/SPARK-3052, but it says the
>>> issue has been fixed since 1.2
>>> Quesiton 2: I am a little surprised that after the 2 new executors were
>>> launched,  replacing the two failed executors, they simply reprocessed the
>>> failed 20 tasks/partitions.  What about the results for other parititons
>>> processed by the 2 failed executors before? I assumed the results of these
>>> parititons are stored to the local disk and thus do not need to be computed
>>> by the new exectuors?  When are the data stored locally? Is it
>>> configuration? This question is for my own understanding about the spark
>>> framework.
>>>
>>> The exception causing the exectuor failure is below
>>>
>>> org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem
>>> closed
>>> at
>>> org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
>>> at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
>>> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
>>> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
>>> at org.apache.spa

Re: How to increase Spark partitions for the DataFrame?

2015-10-08 Thread Lan Jiang
Hmm, that's odd.

You can always use repartition(n) to increase the partition number, but then
there will be a shuffle. How large is your ORC file? Have you used the NameNode
UI to check how many HDFS blocks each ORC file has?
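
If it really is only 12 partitions, a sketch of what I would try (the partition
count and column name are arbitrary placeholders):

// Sketch only: force a shuffle into more partitions right after the ORC load.
val df = hiveContext.read.format("orc")
  .load("/hdfs/path/to/orc/files/")
  .repartition(48)                             // 48 is arbitrary; pick based on cores/data size
val result = df.groupBy("someCol").count()     // "someCol" is a placeholder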

Lan


> On Oct 8, 2015, at 2:08 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:
> 
> Hi Lan, thanks for the response yes I know and I have confirmed in UI that it 
> has only 12 partitions because of 12 HDFS blocks and hive orc file strip size 
> is 33554432.
> 
> On Thu, Oct 8, 2015 at 11:55 PM, Lan Jiang <ljia...@gmail.com 
> <mailto:ljia...@gmail.com>> wrote:
> The partition number should be the same as the HDFS block number instead of 
> file number. Did you confirmed from the spark UI that only 12 partitions were 
> created? What is your ORC orc.stripe.size?
> 
> Lan
> 
> 
> > On Oct 8, 2015, at 1:13 PM, unk1102 <umesh.ka...@gmail.com 
> > <mailto:umesh.ka...@gmail.com>> wrote:
> >
> > Hi I have the following code where I read ORC files from HDFS and it loads
> > directory which contains 12 ORC files. Now since HDFS directory contains 12
> > files it will create 12 partitions by default. These directory is huge and
> > when ORC files gets decompressed it becomes around 10 GB how do I increase
> > partitions for the below code so that my Spark job runs faster and does not
> > hang for long time because of reading 10 GB files through shuffle in 12
> > partitions. Please guide.
> >
> > DataFrame df =
> > hiveContext.read().format("orc").load("/hdfs/path/to/orc/files/");
> > df.select().groupby(..)
> >
> >
> >
> >
> > --
> > View this message in context: 
> > http://apache-spark-user-list.1001560.n3.nabble.com/How-to-increase-Spark-partitions-for-the-DataFrame-tp24980.html
> >  
> > <http://apache-spark-user-list.1001560.n3.nabble.com/How-to-increase-Spark-partitions-for-the-DataFrame-tp24980.html>
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> > <mailto:user-unsubscr...@spark.apache.org>
> > For additional commands, e-mail: user-h...@spark.apache.org 
> > <mailto:user-h...@spark.apache.org>
> >
> 
> 



failed spark job reports on YARN as successful

2015-10-08 Thread Lan Jiang
Hi, there

I have a spark batch job running on CDH 5.4 + Spark 1.3.0. The job is submitted in
"yarn-client" mode. The job itself failed because YARN killed several executor
containers that exceeded the memory limit imposed by YARN.
However, when I went to the YARN resource manager site, it displayed the job as
successful. I found there was an issue reported in JIRA
https://issues.apache.org/jira/browse/SPARK-3627, but it says it was fixed in
Spark 1.2. On the Spark history server, it shows the job as "Incomplete".

Is this still a bug, or is there something I need to do in the spark application to
report the correct job status to YARN?

Lan



Re: How to increase Spark partitions for the DataFrame?

2015-10-08 Thread Lan Jiang
The partition number should be the same as the HDFS block number instead of
the file number. Did you confirm from the Spark UI that only 12 partitions were
created? What is your ORC orc.stripe.size?

Lan


> On Oct 8, 2015, at 1:13 PM, unk1102  wrote:
> 
> Hi I have the following code where I read ORC files from HDFS and it loads
> directory which contains 12 ORC files. Now since HDFS directory contains 12
> files it will create 12 partitions by default. These directory is huge and
> when ORC files gets decompressed it becomes around 10 GB how do I increase
> partitions for the below code so that my Spark job runs faster and does not
> hang for long time because of reading 10 GB files through shuffle in 12
> partitions. Please guide. 
> 
> DataFrame df =
> hiveContext.read().format("orc").load("/hdfs/path/to/orc/files/");
> df.select().groupby(..)
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-increase-Spark-partitions-for-the-DataFrame-tp24980.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark cache memory storage

2015-10-06 Thread Lan Jiang
Hi, there

My understanding is that the cache storage is calculated as follows:

executor heap size * spark.storage.safetyFraction * spark.storage.memoryFraction

The default value for safetyFraction is 0.9 and for memoryFraction is 0.6. When
I started a spark job on YARN, I set executor-memory to 6g, thus I
expect the cache memory to be 6 * 0.9 * 0.6 = 3.24g. However, on the Spark
history server, it shows the reserved cache size for each executor is
3.1g. So it does not add up. What did I miss?

Lan


Re: "java.io.IOException: Filesystem closed" on executors

2015-10-05 Thread Lan Jiang
I am still facing this issue. Executor dies due to

org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem closed
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
...
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
at java.io.DataInputStream.read(DataInputStream.java:149)

Spark automatically launched new executors and the whole job completed
fine. Anyone has a clue what's going on?

The spark job reads avro files from a directory, does some basic map/filter,
then repartitions to 1 and writes the result to HDFS. I use spark 1.3 with
spark-avro (1.0.0). The error only happens when running on the whole
dataset. When running on 1/3 of the files, the same job completes without
error.


On Thu, Oct 1, 2015 at 2:41 PM, Lan Jiang <ljia...@gmail.com> wrote:

> Hi, there
>
> Here is the problem I ran into when executing a Spark Job (Spark 1.3). The
> spark job is loading a bunch of avro files using Spark SQL spark-avro 1.0.0
> library. Then it does some filter/map transformation, repartition to 1
> partition and then write to HDFS. It creates 2 stages. The total HDFS block
> number is around 12000, thus it creates 12000 partitions, thus 12000 tasks
> for the first stage. I have total 9 executors launched with 5 thread for
> each. The job has run fine until the very end.  When it reaches 19980/2
> tasks succeeded, it suddenly failed the last 20 tasks and I lost 2
> executors. The spark did launched 2 new executors and finishes the job
> eventually by reprocessing the 20 tasks.
>
> I only ran into this issue when I run the spark application on the full
> dataset. When I run the 1/3 of the dataset, everything finishes fine
> without error.
>
> Question 1: What is the root cause of this issue? It is simiar to
> http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed
> and https://issues.apache.org/jira/browse/SPARK-3052, but it says the
> issue has been fixed since 1.2
> Quesiton 2: I am a little surprised that after the 2 new executors were
> launched,  replacing the two failed executors, they simply reprocessed the
> failed 20 tasks/partitions.  What about the results for other parititons
> processed by the 2 failed executors before? I assumed the results of these
> parititons are stored to the local disk and thus do not need to be computed
> by the new exectuors?  When are the data stored locally? Is it
> configuration? This question is for my own understanding about the spark
> framework.
>
> The exception causing the exectuor failure is below
>
> org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem
> closed
> at
> org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
> at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
> at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolEx

How to access lost executor log file

2015-10-01 Thread Lan Jiang
Hi, there

When running a Spark job on YARN, 2 executors somehow got lost during the 
execution. The message on the history server GUI is “CANNOT find address”.  Two 
extra executors were launched by YARN and eventually finished the job. Usually 
I go to the “Executors” tab on the UI to check the executor stdout/stderr for 
troubleshoot. Now if I go to the “Executors” tab,  I do not see the 2 executors 
that were lost. I can only see the rest executors and the 2 new executors. Thus 
I cannot check the stdout/stderr of the lost executors. How can I access the 
log files of these lost executors to find out why they were lost?

Thanks

Lan







Re: How to access lost executor log file

2015-10-01 Thread Lan Jiang
Ted,

Thanks for your reply.

First of all, after sending the email to the mailing list, I used the
"yarn logs -applicationId <application id>" command to retrieve the aggregated
log successfully. I found the exceptions I was looking for.

Now as to your suggestion, when I go to the YARN RM UI, I can only see the
"Tracking URL" in the application overview section. When I click it, it
brings me to the Spark history server UI, where I cannot find the lost
executors. The only logs link I can find on the YARN RM site is the
ApplicationMaster log, which is not what I need. Did I miss something?

Lan

On Thu, Oct 1, 2015 at 1:30 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you go to YARN RM UI to find all the attempts for this Spark Job ?
>
> The two lost executors should be found there.
>
> On Thu, Oct 1, 2015 at 10:30 AM, Lan Jiang <ljia...@gmail.com> wrote:
>
>> Hi, there
>>
>> When running a Spark job on YARN, 2 executors somehow got lost during the
>> execution. The message on the history server GUI is “CANNOT find address”.
>> Two extra executors were launched by YARN and eventually finished the job.
>> Usually I go to the “Executors” tab on the UI to check the executor
>> stdout/stderr for troubleshoot. Now if I go to the “Executors” tab,  I do
>> not see the 2 executors that were lost. I can only see the rest executors
>> and the 2 new executors. Thus I cannot check the stdout/stderr of the lost
>> executors. How can I access the log files of these lost executors to find
>> out why they were lost?
>>
>> Thanks
>>
>> Lan
>>
>>
>>
>>
>>
>>
>


"java.io.IOException: Filesystem closed" on executors

2015-10-01 Thread Lan Jiang
Hi, there

Here is the problem I ran into when executing a Spark job (Spark 1.3). The
spark job loads a bunch of avro files using the Spark SQL spark-avro 1.0.0
library. Then it does some filter/map transformations, repartitions to 1
partition and writes to HDFS. It creates 2 stages. The total HDFS block
number is around 12000, thus it creates 12000 partitions, and thus 12000 tasks
for the first stage. I have a total of 9 executors launched with 5 threads
each. The job ran fine until the very end. When it reached 19980/20000
tasks succeeded, it suddenly failed the last 20 tasks and I lost 2
executors. Spark did launch 2 new executors and finished the job
eventually by reprocessing the 20 tasks.

I only ran into this issue when I run the spark application on the full
dataset. When I run the 1/3 of the dataset, everything finishes fine
without error.

Question 1: What is the root cause of this issue? It is similar to
http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed
and https://issues.apache.org/jira/browse/SPARK-3052, but the latter says the issue
has been fixed since 1.2.
Question 2: I am a little surprised that after the 2 new executors were
launched, replacing the two failed executors, they simply reprocessed the
failed 20 tasks/partitions. What about the results for the other partitions
processed by the 2 failed executors before? I assumed the results of these
partitions are stored on local disk and thus do not need to be recomputed
by the new executors? When are the data stored locally? Is it
configurable? This question is for my own understanding of the Spark
framework.

The exception causing the executor failure is below

org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem closed
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.avro.mapred.FsInput.read(FsInput.java:54)
at
org.apache.avro.file.DataFileReader$SeekableInputStream.read(DataFileReader.java:210)
at
org.apache.avro.io.BinaryDecoder$InputStreamByteSource.tryReadRaw(BinaryDecoder.java:839)
at org.apache.avro.io.BinaryDecoder.isEnd(BinaryDecoder.java:444)
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:264)


unintended consequence of using coalesce operation

2015-09-29 Thread Lan Jiang
Hi, there

I ran into an issue when using Spark (v 1.3) to load an avro file through Spark
SQL. The code sample is below:

val df = sqlContext.load("path-to-avro-file", "com.databricks.spark.avro")
val myrdd = df.select("Key", "Name", "binaryfield").rdd
val results = myrdd.map(...)
val finalResults = results.filter(...)
finalResults.coalesce(1).toDF().saveAsParquetFile("path-to-parquet")

The avro file is 645M. The HDFS block size is 128M. Thus the total is 5 HDFS
blocks, which means there should be 5 partitions. Please note that I use
coalesce because I expect the previous filter transformation to filter out
almost all the data, and I would like to write to a single parquet file.

YARN cluster has 3 datanodes. I use the below configuration for spark submit

spark-submit --class  --num-executors 3 --executor-cores 2
--executor-memory 8g --master yarn-client mytest.jar

I do see 3 executors being created, one on each data/worker node. However,
there is only one task running within the cluster. After I remove the
coalesce(1) call from the code, I can see 5 tasks generated, spread across
3 executors. I was surprised by the result. coalesce is usually thought to be
a better choice than the repartition operation when reducing the partition number.
However, in this case it causes a performance issue: Spark creates only
one task because the final partition number was coalesced to 1. Thus there is
only one thread reading the HDFS files instead of 5.

Is my understanding correct? In this case, I think repartition is a better
choice than coalesce.
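
In other words, either of these should keep the upstream stage at 5 tasks (a
sketch, with the same placeholder paths as above):

// Option 1: repartition(1) inserts a shuffle, so the map/filter still runs with 5 tasks.
finalResults.repartition(1).toDF().saveAsParquetFile("path-to-parquet")

// Option 2: coalesce with shuffle = true, which is equivalent.
finalResults.coalesce(1, shuffle = true).toDF().saveAsParquetFile("path-to-parquet")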

Lan





Re: Change protobuf version or any other third party library version in Spark application

2015-09-15 Thread Lan Jiang
I am happy to report that after setting spark.driver.userClassPathFirst, I can use
protobuf 3 with spark-shell. It looks like the classloading issue was in the driver,
not the executor.

Marcelo, thank you very much for the tip!

Lan


> On Sep 15, 2015, at 1:40 PM, Marcelo Vanzin <van...@cloudera.com> wrote:
> 
> Hi,
> 
> Just "spark.executor.userClassPathFirst" is not enough. You should
> also set "spark.driver.userClassPathFirst". Also not that I don't
> think this was really tested with the shell, but that should work with
> regular apps started using spark-submit.
> 
> If that doesn't work, I'd recommend shading, as others already have.
> 
> On Tue, Sep 15, 2015 at 9:19 AM, Lan Jiang <ljia...@gmail.com> wrote:
>> I used the --conf spark.files.userClassPathFirst=true  in the spark-shell
>> option, it still gave me the eror: java.lang.NoSuchFieldError: unknownFields
>> if I use protobuf 3.
>> 
>> The output says spark.files.userClassPathFirst is deprecated and suggest
>> using spark.executor.userClassPathFirst. I tried that and it did not work
>> either.
> 
> -- 
> Marcelo


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Change protobuf version or any other third party library version in Spark application

2015-09-15 Thread Lan Jiang
I used the --conf spark.files.userClassPathFirst=true option in the spark-shell,
but it still gave me the error java.lang.NoSuchFieldError: unknownFields when
I use protobuf 3.

The output says spark.files.userClassPathFirst is deprecated and suggests using
spark.executor.userClassPathFirst. I tried that and it did not work either.

Lan



> On Sep 15, 2015, at 10:31 AM, java8964 <java8...@hotmail.com> wrote:
> 
> If you use Standalone mode, just start spark-shell like following:
> 
> spark-shell --jars your_uber_jar --conf spark.files.userClassPathFirst=true 
> 
> Yong
> 
> Date: Tue, 15 Sep 2015 09:33:40 -0500
> Subject: Re: Change protobuf version or any other third party library version 
> in Spark application
> From: ljia...@gmail.com
> To: java8...@hotmail.com
> CC: ste...@hortonworks.com; user@spark.apache.org
> 
> Steve,
> 
> Thanks for the input. You are absolutely right. When I use protobuf 2.6.1, I 
> also ran into method not defined errors. You suggest using Maven sharding 
> strategy, but I have already built the uber jar to package all my custom 
> classes and its dependencies including protobuf 3. The problem is how to 
> configure spark shell to use my uber jar first. 
> 
> java8964 -- appreciate the link and I will try the configuration. Looks 
> promising. However, the "user classpath first" attribute does not apply to 
> spark-shell, am I correct? 
> 
> Lan
> 
> On Tue, Sep 15, 2015 at 8:24 AM, java8964 <java8...@hotmail.com 
> <mailto:java8...@hotmail.com>> wrote:
> It is a bad idea to use the major version change of protobuf, as it most 
> likely won't work.
> 
> But you really want to give it a try, set the "user classpath first", so the 
> protobuf 3 coming with your jar will be used.
> 
> The setting depends on your deployment mode, check this for the parameter:
> 
> https://issues.apache.org/jira/browse/SPARK-2996 
> <https://issues.apache.org/jira/browse/SPARK-2996>
> 
> Yong
> 
> Subject: Re: Change protobuf version or any other third party library version 
> in Spark application
> From: ste...@hortonworks.com <mailto:ste...@hortonworks.com>
> To: ljia...@gmail.com <mailto:ljia...@gmail.com>
> CC: user@spark.apache.org <mailto:user@spark.apache.org>
> Date: Tue, 15 Sep 2015 09:19:28 +
> 
> 
> 
> 
> On 15 Sep 2015, at 05:47, Lan Jiang <ljia...@gmail.com 
> <mailto:ljia...@gmail.com>> wrote:
> 
> Hi, there,
> 
> I am using Spark 1.4.1. The protobuf 2.5 is included by Spark 1.4.1 by 
> default. However, I would like to use Protobuf 3 in my spark application so 
> that I can use some new features such as Map support.  Is there anyway to do 
> that? 
> 
> Right now if I build a uber.jar with dependencies including protobuf 3 
> classes and pass to spark-shell through --jars option, during the execution, 
> I got the error java.lang.NoSuchFieldError: unknownFields. 
> 
> 
> protobuf is an absolute nightmare version-wise, as protoc generates 
> incompatible java classes even across point versions. Hadoop 2.2+ is and will 
> always be protobuf 2.5 only; that applies transitively to downstream projects 
>  (the great protobuf upgrade of 2013 was actually pushed by the HBase team, 
> and required a co-ordinated change across multiple projects)
> 
> 
> Is there anyway to use a different version of Protobuf other than the default 
> one included in the Spark distribution? I guess I can generalize and extend 
> the question to any third party libraries. How to deal with version conflict 
> for any third party libraries included in the Spark distribution? 
> 
> maven shading is the strategy. Generally it is less needed, though the 
> troublesome binaries are,  across the entire apache big data stack:
> 
> google protobuf
> google guava
> kryo
> jackson
> 
> you can generally bump up the other versions, at least by point releases.



Re: Change protobuf version or any other third party library version in Spark application

2015-09-15 Thread Lan Jiang
Steve,

Thanks for the input. You are absolutely right. When I use protobuf 2.6.1,
I also ran into method-not-defined errors. You suggest using the Maven shading
strategy, but I have already built the uber jar to package all my custom
classes and their dependencies, including protobuf 3. The problem is how to
configure the spark shell to use my uber jar first.

java8964 -- appreciate the link and I will try the configuration. Looks
promising. However, the "user classpath first" attribute does not apply to
spark-shell, am I correct?

Lan

On Tue, Sep 15, 2015 at 8:24 AM, java8964 <java8...@hotmail.com> wrote:

> It is a bad idea to use the major version change of protobuf, as it most
> likely won't work.
>
> But you really want to give it a try, set the "user classpath first", so
> the protobuf 3 coming with your jar will be used.
>
> The setting depends on your deployment mode, check this for the parameter:
>
> https://issues.apache.org/jira/browse/SPARK-2996
>
> Yong
>
> --
> Subject: Re: Change protobuf version or any other third party library
> version in Spark application
> From: ste...@hortonworks.com
> To: ljia...@gmail.com
> CC: user@spark.apache.org
> Date: Tue, 15 Sep 2015 09:19:28 +
>
>
>
>
> On 15 Sep 2015, at 05:47, Lan Jiang <ljia...@gmail.com> wrote:
>
> Hi, there,
>
> I am using Spark 1.4.1. The protobuf 2.5 is included by Spark 1.4.1 by
> default. However, I would like to use Protobuf 3 in my spark application so
> that I can use some new features such as Map support.  Is there anyway to
> do that?
>
> Right now if I build a uber.jar with dependencies including protobuf 3
> classes and pass to spark-shell through --jars option, during the
> execution, I got the error *java.lang.NoSuchFieldError: unknownFields. *
>
>
>
> protobuf is an absolute nightmare version-wise, as protoc generates
> incompatible java classes even across point versions. Hadoop 2.2+ is and
> will always be protobuf 2.5 only; that applies transitively to downstream
> projects  (the great protobuf upgrade of 2013 was actually pushed by the
> HBase team, and required a co-ordinated change across multiple projects)
>
>
> Is there anyway to use a different version of Protobuf other than the
> default one included in the Spark distribution? I guess I can generalize
> and extend the question to any third party libraries. How to deal with
> version conflict for any third party libraries included in the Spark
> distribution?
>
>
> maven shading is the strategy. Generally it is less needed, though the
> troublesome binaries are,  across the entire apache big data stack:
>
> google protobuf
> google guava
> kryo
> jackson
>
> you can generally bump up the other versions, at least by point releases.
>


add external jar file to Spark shell vs. Scala Shell

2015-09-14 Thread Lan Jiang

Hi, there

I ran into a problem when I try to pass external jar file to spark-shell. 

I have an uber jar file that contains all the java code I created for protobuf
and all its dependencies.

If I simply execute my code using the Scala shell, it works fine without error. I
use -cp to pass the external uber jar file here:

./scala -cp 
~/workspace/protobuf/my-app/target/my-app-1.0-SNAPSHOT-jar-with-dependencies.jar
scala> import com.test.proto.Tr.MyProto
import com.test.proto.Tr.MyProto

scala> import java.nio.file.{Files, Paths}
import java.nio.file.{Files, Paths}

scala> val byteArray = 
Files.readAllBytes(Paths.get("/Users/ljiang/workspace/protobuf/my-app/myproto.pro"))
byteArray: Array[Byte] = Array(10, -62, -91, 2, -86, 6, -108, 65, 8, 10, 18, 
-113, 65, -54, 12, -85, 46, -22, 18, -30, 10, 10, 2, 73, 78, 18, -37, 10, -118, 
25, -52, 10, -22, 43, 15, 10, 1, 49, 18, ...
scala> val myProto = MyProto.parseFrom(byteArray)


Now the weird thing is that if I launch spark-shell instead and execute the
same code (please note that I am not even using any SparkContext or RDD), it
does not work. I use the --jars option to pass the external jar file to
spark-shell:

spark-shell --jars 
~/workspace/protobuf/my-app/target/my-app-1.0-SNAPSHOT-jar-with-dependencies.jar

scala> import com.test.proto.Tr.MyProto
import com.test.proto.Tr.MyProto

scala> import java.nio.file.{Files, Paths}
import java.nio.file.{Files, Paths}

scala> val byteArray = 
Files.readAllBytes(Paths.get("/Users/ljiang/workspace/protobuf/my-app/myproto.pro"))
byteArray: Array[Byte] = Array(10, -62, -91, 2, -86, 6, -108, 65, 8, 10, 18, 
-113, 65, -54, 12, -85, 46, -22, 18, -30, 10, 10, 2, 73, 78, 18, -37, 10, -118, 
25, -52, 10, -22, 43, 15, 10, 1, 49, 18, ...
scala> val myProto = MyProto.parseFrom(byteArray)
java.lang.NoSuchFieldError: unknownFields
at com.test.proto.Tr$MyProto.<init>(Tr.java)
at com.test.proto.Tr$MyProto.<init>(Tr.java)
at com.test.proto.Tr$MyProto$1.parsePartialFrom(Tr.java)
at com.test.proto.Tr$MyProto$1.parsePartialFrom(Tr.java)
at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
at com.test.proto.Tr$MyProto.parseFrom(Tr.java)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:23)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
at $iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC.<init>(<console>:38)
at $iwC.<init>(<console>:40)
at <init>(<console>:42)
at .<init>(<console>:46)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at 
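
One quick way to narrow this down is to ask the REPL where the protobuf classes
on its classpath actually come from (a diagnostic sketch; if the location it
prints is a Spark or Hadoop jar rather than the uber jar, spark-shell is
resolving protobuf 2.5 first):

scala> classOf[com.google.protobuf.Message].getProtectionDomain.getCodeSource.getLocation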

Change protobuf version or any other third party library version in Spark application

2015-09-14 Thread Lan Jiang
Hi, there,

I am using Spark 1.4.1. Protobuf 2.5 is included by Spark 1.4.1 by default.
However, I would like to use Protobuf 3 in my Spark application so that I can
use some new features such as Map support. Is there any way to do that?

Right now, if I build an uber jar with dependencies including the protobuf 3
classes and pass it to spark-shell through the --jars option, I get the error
java.lang.NoSuchFieldError: unknownFields during execution.

Is there any way to use a different version of Protobuf other than the default
one included in the Spark distribution? I guess I can generalize and extend the
question to any third-party library: how do I deal with version conflicts for
any third-party library included in the Spark distribution?

Thanks!

Lan


Re: Scheduling across applications - Need suggestion

2015-04-22 Thread Lan Jiang
The YARN capacity scheduler supports hierarchical queues, to which you can
assign cluster resources as percentages. Your Spark applications/shells can be
submitted to different queues. Mesos supports a fine-grained mode, which
allows the number of cores used by each executor to ramp up and down.
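
For example, routing an application to a particular capacity-scheduler queue at
submit time looks roughly like this (the queue, class, and jar names here are
hypothetical):

spark-submit --master yarn-cluster --queue analytics \
  --num-executors 10 --executor-memory 4G \
  --class com.example.MyApp my-app.jar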

Lan

On Wed, Apr 22, 2015 at 2:32 PM, yana yana.kadiy...@gmail.com wrote:

 Yes. The fair scheduler only helps concurrency within an application. With
 multiple shells, you'd either need something like YARN/Mesos or careful math
 on resources, as you said.




  Original message 
 From: Arun Patel
 Date:04/22/2015 6:28 AM (GMT-05:00)
 To: user
 Subject: Scheduling across applications - Need suggestion

 I believe we can use properties like --executor-memory and
 --total-executor-cores to configure the resources allocated for each
 application. But in a multi-user environment, shells and applications are
 being submitted by multiple users at the same time. All users are
 requesting resources with different properties. At times, some users are
 not getting any resources from the cluster.


 How can resource usage be controlled in this case? Please share any best
 practices you follow.


 As per my understanding, the fair scheduler can be used for scheduling tasks
 within an application but not across multiple applications. Is this
 correct?


 Regards,

 Arun



Re: Configuring logging properties for executor

2015-04-20 Thread Lan Jiang
Rename your log4j_special.properties file to log4j.properties and place it at
the root of your jar file, and you should be fine.

If you are using Maven to build your jar, place the log4j.properties file in
the src/main/resources folder.

However, please note that if another dependency jar on the classpath also
contains a log4j.properties file at its root, this might not work, since the
first log4j.properties file found on the classpath is the one that gets loaded.
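
A quick way to check which copy wins is to ask the classloader directly (a
small sketch you can run from the driver or paste into spark-shell):

scala> getClass.getClassLoader.getResource("log4j.properties")

This prints the URL of the first log4j.properties found on the classpath, or
null if none is visible to that classloader.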

You can also do spark-submit --files log4j_special.properties ..., which should
ship your log4j properties file to the worker nodes automatically without you
copying it to each node manually.

Lan


 On Apr 20, 2015, at 9:26 AM, Michael Ryabtsev michael...@gmail.com wrote:
 
 Hi all,
 
 I need to configure the Spark executor log4j.properties on a standalone cluster.
 It looks like placing the relevant properties file in the Spark
 configuration folder and setting spark.executor.extraJavaOptions from
 my application code:
 sparkConf.set("spark.executor.extraJavaOptions",
   "-Dlog4j.configuration=log4j_special.properties");
 does the job, and the executor logs are written in the required place and at
 the required level. As far as I understand, it works because the Spark
 configuration folder is on the classpath, so passing the file name without a
 path works here. However, I would like to avoid deploying these properties to
 each worker's Spark configuration folder.
 I wonder: if I put the properties in my application jar, is there any way of
 telling the executor to load them?
 
 Thanks,
 
 
 
 





Re: Configuring logging properties for executor

2015-04-20 Thread Lan Jiang
Each application gets its own executor processes,  so there should be no 
problem running them in parallel. 
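
For instance, two applications submitted at the same time can each point their
executors at a different properties file through their own
spark.executor.extraJavaOptions (the file names here are hypothetical):

sparkConf.set("spark.executor.extraJavaOptions",
  "-Dlog4j.configuration=log4j_debug.properties")   // application A

sparkConf.set("spark.executor.extraJavaOptions",
  "-Dlog4j.configuration=log4j_prod.properties")    // application B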

Lan


 On Apr 20, 2015, at 10:25 AM, Michael Ryabtsev michael...@gmail.com wrote:
 
 Hi Lan, 
 
 Thanks for the fast response. It could be a solution if it works. I have more
 than one log4j properties file, for different run modes like debug/production,
 for the executor and for the application core. I think I would like to keep
 them separate. Then, I suppose I should give all the other properties files
 special names and keep the executor configuration under the default name? Can
 I conclude that, going this way, I will not be able to run several
 applications on the same cluster in parallel?
 
 Regarding spark-submit, I am not using it now (I submit from code), but I
 think I should consider this option.
 
 Thanks.
 
 On Mon, Apr 20, 2015 at 5:59 PM, Lan Jiang ljia...@gmail.com wrote:
 Rename your log4j_special.properties file as log4j.properties and place it 
 under the root of your jar file, you should be fine.
 
 If you are using Maven to build your jar, place the log4j.properties file in
 the src/main/resources folder.
 
 However, please note that if you have other dependency jar file in the 
 classpath that contains another log4j.properties file this way, it might not 
 work since the first log4j.properties file that is loaded will be used.
 
 You can also do spark-submit --files log4j_special.properties ..., which should
 ship your log4j properties file to the worker nodes automatically without
 you copying it to each node manually.
 
 Lan
 
 
  On Apr 20, 2015, at 9:26 AM, Michael Ryabtsev michael...@gmail.com wrote:
 
  Hi all,
 
  I need to configure spark executor log4j.properties on a standalone cluster.
  It looks like placing the relevant properties file in the spark
  configuration folder and  setting the spark.executor.extraJavaOptions from
  my application code:
  sparkConf.set("spark.executor.extraJavaOptions",
    "-Dlog4j.configuration=log4j_special.properties");
  does the work, and the executor logs are written in the required place and
  level. As far as I understand, it works, because the spark configuration
  folder is on the class path, and passing parameter without path works here.
  However, I would like to avoid deploying these properties to each worker
  spark configuration folder.
  I wonder, if I put the properties in my application jar, is there any way of
  telling executor to load them?
 
  Thanks,
 
 
 
 