Re: Spark <--> S3 flakiness

2017-05-10 Thread Miguel Morales
Try using the DirectParquetOutputCommitter:
http://dev.sortable.com/spark-directparquetoutputcommitter/
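
For reference, a rough sketch of how such a committer gets wired in from PySpark.
The config key and the committer's package path vary by Spark version, and the
stock DirectParquetOutputCommitter was removed upstream in later releases, so
treat both as placeholders to verify against your build (or the blog post's own
implementation):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        # Placeholder class path: substitute the committer class that actually
        # exists in your Spark build or the one from the post above.
        .set("spark.sql.parquet.output.committer.class",
             "org.apache.spark.sql.parquet.DirectParquetOutputCommitter"))
sc = SparkContext(conf=conf)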

On Wed, May 10, 2017 at 10:07 PM, lucas.g...@gmail.com
 wrote:
> Hi users, we have a bunch of pyspark jobs that are using S3 for loading /
> intermediate steps and final output of parquet files.
>
> We're running into the following issues on a semi regular basis:
> * These are intermittent errors, IE we have about 300 jobs that run
> nightly... And a fairly random but small-ish percentage of them fail with
> the following classes of errors.
>
> S3 write errors
>
>> "ERROR Utils: Aborting task
>> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS
>> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS Error
>> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
>
>
>>
>> "Py4JJavaError: An error occurred while calling o43.parquet.
>> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status Code:
>> 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS Error
>> Message: One or more objects could not be deleted, S3 Extended Request ID:
>> null"
>
>
>
> S3 Read Errors:
>
>> [Stage 1:=>   (27 + 4)
>> / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage 1.0
>> (TID 11)
>> java.net.SocketException: Connection reset
>> at java.net.SocketInputStream.read(SocketInputStream.java:196)
>> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>> at
>> org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
>> at
>> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
>> at
>> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:200)
>> at
>> org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:103)
>> at
>> org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:168)
>> at
>> org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
>> at
>> org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:174)
>> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>> at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
>> at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:187)
>
>
>
> We have literally tons of logs we can add but it would make the email
> unwieldy big.  If it would be helpful I'll drop them in a pastebin or
> something.
>
> Our config is along the lines of:
>
> spark-2.1.0-bin-hadoop2.7
> '--packages
> com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
> pyspark-shell'
>
> Given the stack overflow / googling I've been doing I know we're not the
> only org with these issues but I haven't found a good set of solutions in
> those spaces yet.
>
> Thanks!
>
> Gary Lucas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark <--> S3 flakiness

2017-05-10 Thread lucas.g...@gmail.com
Hi users, we have a bunch of pyspark jobs that are using S3 for loading /
intermediate steps and final output of parquet files.

We're running into the following issues on a semi-regular basis:
* These are intermittent errors, i.e. we have about 300 jobs that run
nightly, and a fairly random but smallish percentage of them fail with
the following classes of errors.


*S3 write errors*

> "ERROR Utils: Aborting task
> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404, AWS
> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS Error
> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
>


> "Py4JJavaError: An error occurred while calling o43.parquet.
> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status Code:
> 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS Error
> Message: One or more objects could not be deleted, S3 Extended Request ID:
> null"




*S3 Read Errors:*

> [Stage 1:=>   (27 + 4)
> / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in stage 1.0
> (TID 11)
> java.net.SocketException: Connection reset
> at java.net.SocketInputStream.read(SocketInputStream.java:196)
> at java.net.SocketInputStream.read(SocketInputStream.java:122)
> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
> at
> org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
> at
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
> at
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:200)
> at
> org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:103)
> at
> org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:168)
> at
> org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
> at
> org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:174)
> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> at java.io.FilterInputStream.close(FilterInputStream.java:181)
> at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
> at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:187)



We have literally tons of logs we can add, but they would make the email
unwieldy.  If it would be helpful I'll drop them in a pastebin or
something.

Our config is along the lines of:

   - spark-2.1.0-bin-hadoop2.7
   - '--packages
   com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
   pyspark-shell'
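
A packages string like the one above is typically what ends up in
PYSPARK_SUBMIT_ARGS when pyspark is launched programmatically; a rough sketch,
with the coordinates copied from the list above and everything else a placeholder:

import os

# Must be set before pyspark starts its JVM gateway.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages com.amazonaws:aws-java-sdk:1.10.34,"
    "org.apache.hadoop:hadoop-aws:2.6.0 pyspark-shell"
)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s3a-job").getOrCreate()
df = spark.read.parquet("s3a://some-bucket/some-prefix/")  # placeholder path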

Given the stack overflow / googling I've been doing I know we're not the
only org with these issues but I haven't found a good set of solutions in
those spaces yet.

Thanks!

Gary Lucas


unsubscribe

2017-05-10 Thread williamtellme123
unsubscribe

 

From: Aaron Perrin [mailto:aper...@gravyanalytics.com] 
Sent: Tuesday, January 31, 2017 9:42 AM
To: user @spark 
Subject: Multiple quantile calculations

 

I want to calculate quantiles on two different columns.  I know that I can 
calculate them with two separate operations. However, for performance reasons, 
I'd like to calculate both with one operation. 

 

Is it possible to do this with the Dataset API? I'm assuming that it isn't. 
But, if it isn't, is it possible to calculate both in one pass, assuming that I 
made some code changes? I briefly looked at the approxQuantile code, but I 
haven't dug into the algorithm.
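
For reference, the "two separate operations" approach looks roughly like this with
the PySpark DataFrame API (column names and data are made up for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0, 10.0), (2.0, 20.0), (3.0, 30.0)], ["col_a", "col_b"])

# Two separate approxQuantile calls, i.e. one pass per column rather than
# a single combined operation.
quantiles_a = df.approxQuantile("col_a", [0.25, 0.5, 0.75], 0.01)
quantiles_b = df.approxQuantile("col_b", [0.25, 0.5, 0.75], 0.01)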

 



unsubscribe

2017-05-10 Thread williamtellme123
 

 

From: Aaron Jackson [mailto:ajack...@pobox.com] 
Sent: Tuesday, July 19, 2016 7:17 PM
To: user 
Subject: Heavy Stage Concentration - Ends With Failure

 

Hi,

 

I have a cluster with 15 nodes, of which 5 are HDFS nodes.  I kick off a job 
that creates some 120 stages.  Eventually, the active and pending stages reduce 
down to a small bottleneck, and it never fails: the 10 (or so) remaining running 
tasks are always allocated to the same executor on the same host.

 

Sooner or later, it runs out of memory ... or some other resource.  It falls 
over, and then the tasks are reallocated to another executor.

 

Why do we see such heavy concentration of tasks onto a single executor when 
other executors are free?  Were the tasks assigned to an executor when the job 
was decomposed into stages?



CSV output with JOBUUID

2017-05-10 Thread Swapnil Shinde
Hello
I am using Spark 2.0.1 and saw that the CSV file format stores output with a
JOBUUID in it:
https://github.com/apache/spark/blob/v2.0.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala#L191

I want to avoid the CSV writer putting the JOBUUID in the output. Is there any
property (maybe spark.sql.sources.writeJobUUID) that can skip this JOBUUID in
the output part filenames?

Furthermore, I couldn't find how this JOBUUID is used in Spark 2.1.0. We are
planning to upgrade very soon, so I wanted to know how the 2.1.0 code handles it.

Any help or pointers are very much appreciated!

Thanks
Swapnil


Re: [EXT] Re: [Spark Core]: Python and Scala generate different DAGs for identical code

2017-05-10 Thread Michael Mansour (CS)
Debugging PySpark is admittedly difficult, and I'll agree that the docs can 
be lacking at times.  PySpark docstrings are sometimes just missing or incomplete.  
While I don't write in Scala, I find that the Spark Scala source code and docs can 
fill in the PySpark gaps, and I can only point you there until I contribute 
improved documentation.

If you're using PyCharm, it's possible to connect a debugger and step 
through your applications.  It just requires that you set 
`os.environ['SPARK_HOME'] = `, and operate in local mode.  I've seen 
different setups for debugging with PyCharm, but this one worked for me.  
Additionally, I kept hitting problems with the JavaGateway in PySpark not being 
able to connect; finally, dumping the Homebrew version of Spark and downloading 
the Apache-hosted version fixed my problem.  They do not appear to be the same 
once opened up on your local machine.
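
For anyone following along, a minimal sketch of that setup (the path and master 
below are placeholders, not values from my environment):

import os

# Placeholder: point SPARK_HOME at a local Apache Spark download before creating
# the context.  pyspark itself must also be importable, e.g. via the project
# interpreter / PYTHONPATH in PyCharm.
os.environ["SPARK_HOME"] = "/path/to/spark-2.1.0-bin-hadoop2.7"

from pyspark import SparkContext

# Local mode, as described above.
sc = SparkContext(master="local[2]", appName="pycharm-debug")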

Not to oversell it, but you cannot actually see the contents of RDDs, and 
cannot easily step into certain code that gets executed inside a map function 
(e.g. through some lambdas).

However, on the upside you can still step through most of your code, AND pause 
to take a `collect()` or `first()` to sanity check things along the way.

Hint: you can use PyCharm to execute the functions that are called on the 
executor.  Use the PyCharm debugger, get the RDD in your variable scope, take 
the first couple of observations (via collect or take) through the "evaluate 
expression" tool, and pass them through the function in the expression evaluator.

Hope this helps --

Michael Mansour

--
Michael Mansour
Data Scientist
Symantec Cloud Security


From: Pavel Klemenkov 
Date: Wednesday, May 10, 2017 at 10:43 AM
To: "user@spark.apache.org" 
Subject: [EXT] Re: [Spark Core]: Python and Scala generate different DAGs for 
identical code

This video https://www.youtube.com/watch?v=LQHMMCf2ZWY I think.

On Wed, May 10, 2017 at 8:04 PM, 
lucas.g...@gmail.com 
> wrote:
Any chance of a link to that video :)

Thanks!

G

On 10 May 2017 at 09:49, Holden Karau 
> wrote:
So this Python side pipelining happens in a lot of places which can make 
debugging extra challenging. Some people work around this with persist which 
breaks the pipelining during debugging, but if your interested in more general 
Python debugging I've got a YouTube video on the topic which could be a good 
intro (of course I'm pretty biased about that).

On Wed, May 10, 2017 at 9:42 AM Pavel Klemenkov 
> wrote:
Thanks for the quick answer, Holden!

Are there any other tricks with PySpark which are hard to debug using UI or 
toDebugString?

On Wed, May 10, 2017 at 7:18 PM, Holden Karau 
> wrote:
In PySpark the filter and then map steps are combined into a single 
transformation from the JVM point of view. This allows us to avoid copying the 
data back to Scala in between the filter and the map steps. The debugging 
exeperience is certainly much harder in PySpark and I think is an interesting 
area for those interested in contributing :)

On Wed, May 10, 2017 at 7:33 AM pklemenkov 
> wrote:
This Scala code:
scala> val logs = sc.textFile("big_data_specialization/log.txt").
 | filter(x => !x.contains("INFO")).
 | map(x => (x.split("\t")(1), 1)).
 | reduceByKey((x, y) => x + y)

generated obvious lineage:

(2) ShuffledRDD[4] at reduceByKey at :27 []
 +-(2) MapPartitionsRDD[3] at map at :26 []
|  MapPartitionsRDD[2] at filter at :25 []
|  big_data_specialization/log.txt MapPartitionsRDD[1] at textFile at
:24 []
|  big_data_specialization/log.txt HadoopRDD[0] at textFile at
:24 []

But Python code:

logs = sc.textFile("../log.txt")\
 .filter(lambda x: 'INFO' not in x)\
 .map(lambda x: (x.split('\t')[1], 1))\
 .reduceByKey(lambda x, y: x + y)

generated something strange which is hard to follow:

(2) PythonRDD[13] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[12] at mapPartitions at PythonRDD.scala:422 []
 |  ShuffledRDD[11] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[10] at reduceByKey at :1 []
|  PythonRDD[9] at reduceByKey at :1 []
|  ../log.txt MapPartitionsRDD[8] at textFile at
NativeMethodAccessorImpl.java:0 []
|  ../log.txt HadoopRDD[7] at textFile at
NativeMethodAccessorImpl.java:0 []

Why is that? Does pyspark do some optimizations under the hood? This debug
string is really useless for debugging.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Core-Python-and-Scala-generate-different-DAGs-for-identical-code-tp28674.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: [Spark Core]: Python and Scala generate different DAGs for identical code

2017-05-10 Thread Pavel Klemenkov
This video https://www.youtube.com/watch?v=LQHMMCf2ZWY I think.

On Wed, May 10, 2017 at 8:04 PM, lucas.g...@gmail.com 
wrote:

> Any chance of a link to that video :)
>
> Thanks!
>
> G
>
> On 10 May 2017 at 09:49, Holden Karau  wrote:
>
>> So this Python side pipelining happens in a lot of places which can make
>> debugging extra challenging. Some people work around this with persist
>> which breaks the pipelining during debugging, but if your interested in
>> more general Python debugging I've got a YouTube video on the topic which
>> could be a good intro (of course I'm pretty biased about that).
>>
>> On Wed, May 10, 2017 at 9:42 AM Pavel Klemenkov 
>> wrote:
>>
>>> Thanks for the quick answer, Holden!
>>>
>>> Are there any other tricks with PySpark which are hard to debug using UI
>>> or toDebugString?
>>>
>>> On Wed, May 10, 2017 at 7:18 PM, Holden Karau 
>>> wrote:
>>>
 In PySpark the filter and then map steps are combined into a single
 transformation from the JVM point of view. This allows us to avoid copying
 the data back to Scala in between the filter and the map steps. The
 debugging exeperience is certainly much harder in PySpark and I think is an
 interesting area for those interested in contributing :)

 On Wed, May 10, 2017 at 7:33 AM pklemenkov 
 wrote:

> This Scala code:
> scala> val logs = sc.textFile("big_data_specialization/log.txt").
>  | filter(x => !x.contains("INFO")).
>  | map(x => (x.split("\t")(1), 1)).
>  | reduceByKey((x, y) => x + y)
>
> generated obvious lineage:
>
> (2) ShuffledRDD[4] at reduceByKey at :27 []
>  +-(2) MapPartitionsRDD[3] at map at :26 []
> |  MapPartitionsRDD[2] at filter at :25 []
> |  big_data_specialization/log.txt MapPartitionsRDD[1] at
> textFile at
> :24 []
> |  big_data_specialization/log.txt HadoopRDD[0] at textFile at
> :24 []
>
> But Python code:
>
> logs = sc.textFile("../log.txt")\
>  .filter(lambda x: 'INFO' not in x)\
>  .map(lambda x: (x.split('\t')[1], 1))\
>  .reduceByKey(lambda x, y: x + y)
>
> generated something strange which is hard to follow:
>
> (2) PythonRDD[13] at RDD at PythonRDD.scala:48 []
>  |  MapPartitionsRDD[12] at mapPartitions at PythonRDD.scala:422 []
>  |  ShuffledRDD[11] at partitionBy at NativeMethodAccessorImpl.java:0
> []
>  +-(2) PairwiseRDD[10] at reduceByKey at :1
> []
> |  PythonRDD[9] at reduceByKey at :1
> []
> |  ../log.txt MapPartitionsRDD[8] at textFile at
> NativeMethodAccessorImpl.java:0 []
> |  ../log.txt HadoopRDD[7] at textFile at
> NativeMethodAccessorImpl.java:0 []
>
> Why is that? Does pyspark do some optimizations under the hood? This
> debug
> string is really useless for debugging.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-Core-Python-and-Scala-generate-
> different-DAGs-for-identical-code-tp28674.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
 Cell : 425-233-8271
 Twitter: https://twitter.com/holdenkarau

>>>
>>>
>>>
>>> --
>>> Yours faithfully, Pavel Klemenkov.
>>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>
>


-- 
Yours faithfully, Pavel Klemenkov.


Re: Synonym handling replacement issue with UDF in Apache Spark

2017-05-10 Thread albertoandreotti
Hi,

in case you're still struggling with this, I wrote a blog post explaining
Spark Joins and UDFs,

http://alberto-computerengineering.blogspot.com.ar/2017/05/custom-joins-in-spark-sql-spark-has.html

  

Alberto.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Synonym-handling-replacement-issue-with-UDF-in-Apache-Spark-tp28638p28675.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Core]: Python and Scala generate different DAGs for identical code

2017-05-10 Thread lucas.g...@gmail.com
Any chance of a link to that video :)

Thanks!

G

On 10 May 2017 at 09:49, Holden Karau  wrote:

> So this Python side pipelining happens in a lot of places which can make
> debugging extra challenging. Some people work around this with persist
> which breaks the pipelining during debugging, but if your interested in
> more general Python debugging I've got a YouTube video on the topic which
> could be a good intro (of course I'm pretty biased about that).
>
> On Wed, May 10, 2017 at 9:42 AM Pavel Klemenkov 
> wrote:
>
>> Thanks for the quick answer, Holden!
>>
>> Are there any other tricks with PySpark which are hard to debug using UI
>> or toDebugString?
>>
>> On Wed, May 10, 2017 at 7:18 PM, Holden Karau 
>> wrote:
>>
>>> In PySpark the filter and then map steps are combined into a single
>>> transformation from the JVM point of view. This allows us to avoid copying
>>> the data back to Scala in between the filter and the map steps. The
>>> debugging exeperience is certainly much harder in PySpark and I think is an
>>> interesting area for those interested in contributing :)
>>>
>>> On Wed, May 10, 2017 at 7:33 AM pklemenkov  wrote:
>>>
 This Scala code:
 scala> val logs = sc.textFile("big_data_specialization/log.txt").
  | filter(x => !x.contains("INFO")).
  | map(x => (x.split("\t")(1), 1)).
  | reduceByKey((x, y) => x + y)

 generated obvious lineage:

 (2) ShuffledRDD[4] at reduceByKey at :27 []
  +-(2) MapPartitionsRDD[3] at map at :26 []
 |  MapPartitionsRDD[2] at filter at :25 []
 |  big_data_specialization/log.txt MapPartitionsRDD[1] at textFile
 at
 :24 []
 |  big_data_specialization/log.txt HadoopRDD[0] at textFile at
 :24 []

 But Python code:

 logs = sc.textFile("../log.txt")\
  .filter(lambda x: 'INFO' not in x)\
  .map(lambda x: (x.split('\t')[1], 1))\
  .reduceByKey(lambda x, y: x + y)

 generated something strange which is hard to follow:

 (2) PythonRDD[13] at RDD at PythonRDD.scala:48 []
  |  MapPartitionsRDD[12] at mapPartitions at PythonRDD.scala:422 []
  |  ShuffledRDD[11] at partitionBy at NativeMethodAccessorImpl.java:0
 []
  +-(2) PairwiseRDD[10] at reduceByKey at :1
 []
 |  PythonRDD[9] at reduceByKey at :1
 []
 |  ../log.txt MapPartitionsRDD[8] at textFile at
 NativeMethodAccessorImpl.java:0 []
 |  ../log.txt HadoopRDD[7] at textFile at
 NativeMethodAccessorImpl.java:0 []

 Why is that? Does pyspark do some optimizations under the hood? This
 debug
 string is really useless for debugging.



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/Spark-Core-Python-and-Scala-
 generate-different-DAGs-for-identical-code-tp28674.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org

 --
>>> Cell : 425-233-8271
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>
>>
>> --
>> Yours faithfully, Pavel Klemenkov.
>>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Re: [Spark Core]: Python and Scala generate different DAGs for identical code

2017-05-10 Thread Holden Karau
So this Python-side pipelining happens in a lot of places, which can make
debugging extra challenging. Some people work around this with persist, which
breaks the pipelining during debugging, but if you're interested in
more general Python debugging I've got a YouTube video on the topic which
could be a good intro (of course I'm pretty biased about that).
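
For illustration, a rough sketch of the persist() workaround mentioned above
(the path and lambdas just follow the example quoted further down; it's a
debugging aid, not something to keep in production code):

from pyspark import SparkContext

sc = SparkContext(master="local[2]", appName="persist-debug")

filtered = sc.textFile("log.txt").filter(lambda x: "INFO" not in x)  # placeholder path
filtered.persist()   # materializing here breaks the Python-side pipelining
filtered.count()     # force evaluation so the filter step can be inspected on its own
counts = (filtered.map(lambda x: (x.split("\t")[1], 1))
                  .reduceByKey(lambda a, b: a + b))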

On Wed, May 10, 2017 at 9:42 AM Pavel Klemenkov 
wrote:

> Thanks for the quick answer, Holden!
>
> Are there any other tricks with PySpark which are hard to debug using UI
> or toDebugString?
>
> On Wed, May 10, 2017 at 7:18 PM, Holden Karau 
> wrote:
>
>> In PySpark the filter and then map steps are combined into a single
>> transformation from the JVM point of view. This allows us to avoid copying
>> the data back to Scala in between the filter and the map steps. The
>> debugging exeperience is certainly much harder in PySpark and I think is an
>> interesting area for those interested in contributing :)
>>
>> On Wed, May 10, 2017 at 7:33 AM pklemenkov  wrote:
>>
>>> This Scala code:
>>> scala> val logs = sc.textFile("big_data_specialization/log.txt").
>>>  | filter(x => !x.contains("INFO")).
>>>  | map(x => (x.split("\t")(1), 1)).
>>>  | reduceByKey((x, y) => x + y)
>>>
>>> generated obvious lineage:
>>>
>>> (2) ShuffledRDD[4] at reduceByKey at :27 []
>>>  +-(2) MapPartitionsRDD[3] at map at :26 []
>>> |  MapPartitionsRDD[2] at filter at :25 []
>>> |  big_data_specialization/log.txt MapPartitionsRDD[1] at textFile at
>>> :24 []
>>> |  big_data_specialization/log.txt HadoopRDD[0] at textFile at
>>> :24 []
>>>
>>> But Python code:
>>>
>>> logs = sc.textFile("../log.txt")\
>>>  .filter(lambda x: 'INFO' not in x)\
>>>  .map(lambda x: (x.split('\t')[1], 1))\
>>>  .reduceByKey(lambda x, y: x + y)
>>>
>>> generated something strange which is hard to follow:
>>>
>>> (2) PythonRDD[13] at RDD at PythonRDD.scala:48 []
>>>  |  MapPartitionsRDD[12] at mapPartitions at PythonRDD.scala:422 []
>>>  |  ShuffledRDD[11] at partitionBy at NativeMethodAccessorImpl.java:0 []
>>>  +-(2) PairwiseRDD[10] at reduceByKey at
>>> :1 []
>>> |  PythonRDD[9] at reduceByKey at :1 []
>>> |  ../log.txt MapPartitionsRDD[8] at textFile at
>>> NativeMethodAccessorImpl.java:0 []
>>> |  ../log.txt HadoopRDD[7] at textFile at
>>> NativeMethodAccessorImpl.java:0 []
>>>
>>> Why is that? Does pyspark do some optimizations under the hood? This
>>> debug
>>> string is really useless for debugging.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Core-Python-and-Scala-generate-different-DAGs-for-identical-code-tp28674.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>
>
>
> --
> Yours faithfully, Pavel Klemenkov.
>
-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: [Spark Core]: Python and Scala generate different DAGs for identical code

2017-05-10 Thread Pavel Klemenkov
Thanks for the quick answer, Holden!

Are there any other tricks with PySpark which are hard to debug using UI or
toDebugString?

On Wed, May 10, 2017 at 7:18 PM, Holden Karau  wrote:

> In PySpark the filter and then map steps are combined into a single
> transformation from the JVM point of view. This allows us to avoid copying
> the data back to Scala in between the filter and the map steps. The
> debugging exeperience is certainly much harder in PySpark and I think is an
> interesting area for those interested in contributing :)
>
> On Wed, May 10, 2017 at 7:33 AM pklemenkov  wrote:
>
>> This Scala code:
>> scala> val logs = sc.textFile("big_data_specialization/log.txt").
>>  | filter(x => !x.contains("INFO")).
>>  | map(x => (x.split("\t")(1), 1)).
>>  | reduceByKey((x, y) => x + y)
>>
>> generated obvious lineage:
>>
>> (2) ShuffledRDD[4] at reduceByKey at :27 []
>>  +-(2) MapPartitionsRDD[3] at map at :26 []
>> |  MapPartitionsRDD[2] at filter at :25 []
>> |  big_data_specialization/log.txt MapPartitionsRDD[1] at textFile at
>> :24 []
>> |  big_data_specialization/log.txt HadoopRDD[0] at textFile at
>> :24 []
>>
>> But Python code:
>>
>> logs = sc.textFile("../log.txt")\
>>  .filter(lambda x: 'INFO' not in x)\
>>  .map(lambda x: (x.split('\t')[1], 1))\
>>  .reduceByKey(lambda x, y: x + y)
>>
>> generated something strange which is hard to follow:
>>
>> (2) PythonRDD[13] at RDD at PythonRDD.scala:48 []
>>  |  MapPartitionsRDD[12] at mapPartitions at PythonRDD.scala:422 []
>>  |  ShuffledRDD[11] at partitionBy at NativeMethodAccessorImpl.java:0 []
>>  +-(2) PairwiseRDD[10] at reduceByKey at :1
>> []
>> |  PythonRDD[9] at reduceByKey at :1 []
>> |  ../log.txt MapPartitionsRDD[8] at textFile at
>> NativeMethodAccessorImpl.java:0 []
>> |  ../log.txt HadoopRDD[7] at textFile at
>> NativeMethodAccessorImpl.java:0 []
>>
>> Why is that? Does pyspark do some optimizations under the hood? This debug
>> string is really useless for debugging.
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Spark-Core-Python-and-Scala-
>> generate-different-DAGs-for-identical-code-tp28674.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>



-- 
Yours faithfully, Pavel Klemenkov.


incremental broadcast join

2017-05-10 Thread Mendelson, Assaf
Hi,

It seems that when doing a broadcast join, the entire dataframe is resent even 
if part of it has already been broadcast.

Consider the following case:

val df1 = ???
val df2 = ???
val df3 = ???
val df4 = ???
val cond = ???

df3.join(broadcast(df1), cond, "left_outer")
followed by
df4.join(broadcast(df1.union(df2)), cond, "left_outer")

I would expect the second broadcast to only broadcast the difference. However, 
if I do explain(true) I see the entire union is broadcast.

My use case is that I have a series of dataframes on which I need to do some 
enrichment, joining them with a small dataframe. The small dataframe gets 
additional information (as the result of each aggregation).

Is there an efficient way of doing this?

Thanks,
  Assaf.



Re: [Spark Core]: Python and Scala generate different DAGs for identical code

2017-05-10 Thread Holden Karau
In PySpark the filter and then map steps are combined into a single
transformation from the JVM point of view. This allows us to avoid copying
the data back to Scala in between the filter and the map steps. The
debugging experience is certainly much harder in PySpark and I think it is an
interesting area for those interested in contributing :)
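
To make that concrete, a small sketch (the path and lambdas mirror the example
quoted below): because the filter and map are evaluated together in the Python
worker, toDebugString shows them folded into a single PythonRDD instead of the
two separate MapPartitionsRDDs you see in the Scala lineage.

from pyspark import SparkContext

sc = SparkContext(master="local[2]", appName="pipelining-demo")

logs = (sc.textFile("log.txt")                   # placeholder path
          .filter(lambda x: "INFO" not in x)     # these two steps are fused...
          .map(lambda x: (x.split("\t")[1], 1))) # ...into one PythonRDD
print(logs.toDebugString())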

On Wed, May 10, 2017 at 7:33 AM pklemenkov  wrote:

> This Scala code:
> scala> val logs = sc.textFile("big_data_specialization/log.txt").
>  | filter(x => !x.contains("INFO")).
>  | map(x => (x.split("\t")(1), 1)).
>  | reduceByKey((x, y) => x + y)
>
> generated obvious lineage:
>
> (2) ShuffledRDD[4] at reduceByKey at :27 []
>  +-(2) MapPartitionsRDD[3] at map at :26 []
> |  MapPartitionsRDD[2] at filter at :25 []
> |  big_data_specialization/log.txt MapPartitionsRDD[1] at textFile at
> :24 []
> |  big_data_specialization/log.txt HadoopRDD[0] at textFile at
> :24 []
>
> But Python code:
>
> logs = sc.textFile("../log.txt")\
>  .filter(lambda x: 'INFO' not in x)\
>  .map(lambda x: (x.split('\t')[1], 1))\
>  .reduceByKey(lambda x, y: x + y)
>
> generated something strange which is hard to follow:
>
> (2) PythonRDD[13] at RDD at PythonRDD.scala:48 []
>  |  MapPartitionsRDD[12] at mapPartitions at PythonRDD.scala:422 []
>  |  ShuffledRDD[11] at partitionBy at NativeMethodAccessorImpl.java:0 []
>  +-(2) PairwiseRDD[10] at reduceByKey at :1
> []
> |  PythonRDD[9] at reduceByKey at :1 []
> |  ../log.txt MapPartitionsRDD[8] at textFile at
> NativeMethodAccessorImpl.java:0 []
> |  ../log.txt HadoopRDD[7] at textFile at
> NativeMethodAccessorImpl.java:0 []
>
> Why is that? Does pyspark do some optimizations under the hood? This debug
> string is really useless for debugging.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Core-Python-and-Scala-generate-different-DAGs-for-identical-code-tp28674.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


[Spark Core]: Python and Scala generate different DAGs for identical code

2017-05-10 Thread pklemenkov
This Scala code:
scala> val logs = sc.textFile("big_data_specialization/log.txt").
 | filter(x => !x.contains("INFO")).
 | map(x => (x.split("\t")(1), 1)).
 | reduceByKey((x, y) => x + y)

generated obvious lineage:

(2) ShuffledRDD[4] at reduceByKey at :27 []
 +-(2) MapPartitionsRDD[3] at map at :26 []
|  MapPartitionsRDD[2] at filter at :25 []
|  big_data_specialization/log.txt MapPartitionsRDD[1] at textFile at
:24 []
|  big_data_specialization/log.txt HadoopRDD[0] at textFile at
:24 []

But Python code:

logs = sc.textFile("../log.txt")\
 .filter(lambda x: 'INFO' not in x)\
 .map(lambda x: (x.split('\t')[1], 1))\
 .reduceByKey(lambda x, y: x + y)

generated something strange which is hard to follow:

(2) PythonRDD[13] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[12] at mapPartitions at PythonRDD.scala:422 []
 |  ShuffledRDD[11] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[10] at reduceByKey at :1 []
|  PythonRDD[9] at reduceByKey at :1 []
|  ../log.txt MapPartitionsRDD[8] at textFile at
NativeMethodAccessorImpl.java:0 []
|  ../log.txt HadoopRDD[7] at textFile at
NativeMethodAccessorImpl.java:0 []

Why is that? Does pyspark do some optimizations under the hood? This debug
string is really useless for debugging.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Core-Python-and-Scala-generate-different-DAGs-for-identical-code-tp28674.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[Spark Core]: Python and Scala generate different DAGs for identical code

2017-05-10 Thread Pavel Klemenkov
This Scala code:
scala> val logs = sc.textFile("big_data_specialization/log.txt").
 | filter(x => !x.contains("INFO")).
 | map(x => (x.split("\t")(1), 1)).
 | reduceByKey((x, y) => x + y)

generated obvious lineage:

(2) ShuffledRDD[4] at reduceByKey at :27 []
 +-(2) MapPartitionsRDD[3] at map at :26 []
|  MapPartitionsRDD[2] at filter at :25 []
|  big_data_specialization/log.txt MapPartitionsRDD[1] at textFile at
:24 []
|  big_data_specialization/log.txt HadoopRDD[0] at textFile at
:24 []

But Python code:

logs = sc.textFile("../log.txt")\
 .filter(lambda x: 'INFO' not in x)\
 .map(lambda x: (x.split('\t')[1], 1))\
 .reduceByKey(lambda x, y: x + y)

generated something strange which is hard to follow:

(2) PythonRDD[13] at RDD at PythonRDD.scala:48 []
 |  MapPartitionsRDD[12] at mapPartitions at PythonRDD.scala:422 []
 |  ShuffledRDD[11] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[10] at reduceByKey at :1 []
|  PythonRDD[9] at reduceByKey at :1 []
|  ../log.txt MapPartitionsRDD[8] at textFile at
NativeMethodAccessorImpl.java:0 []
|  ../log.txt HadoopRDD[7] at textFile at
NativeMethodAccessorImpl.java:0 []

Why is that? Does pyspark do some optimizations under the hood? This debug
string is really useless for debugging.
-- 
Yours faithfully, Pavel Klemenkov.


[WARN] org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

2017-05-10 Thread Mendelson, Assaf
Hi all,
When running spark I get the following warning: [WARN] 
org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Now I know that in general it is possible to ignore this warning, however, it 
means that utilities that catch "WARN" in the log keep flagging this.
I saw many answers to handling this (e.g. 
http://stackoverflow.com/questions/30369380/hadoop-unable-to-load-native-hadoop-library-for-your-platform-error-on-docker,
 
http://stackoverflow.com/questions/19943766/hadoop-unable-to-load-native-hadoop-library-for-your-platform-warning,
 
http://stackoverflow.com/questions/40015416/spark-unable-to-load-native-hadoop-library-for-your-platform),
 however, I am unable to solve this on my local machine.
Specifically, I can't find any such solution for Windows (i.e. when running 
local developer builds) or on a CentOS 7 machine with no HDFS (basically it is 
a single-node machine which uses Spark standalone for testing).
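
For what it's worth, one workaround commonly suggested in threads like those
linked above (when the native library itself can't be installed) is a per-logger
override in Spark's conf/log4j.properties; a one-line sketch, assuming the stock
log4j 1.x setup that ships with Spark:

log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR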

Any help would be appreciated.

Thanks,
  Assaf.



Re: running spark program on intellij connecting to remote master for cluster

2017-05-10 Thread David Kaczynski
Do you have Spark installed locally on your laptop with IntelliJ?  Are you
using the SparkLauncher class or your local spark-submit script?  A while
back, I was trying to submit a spark job from my local workstation to a
remote cluster using the SparkLauncher class, but I didn't actually have
SPARK_HOME set or the spark-submit script on my local machine yet, so the
submit was failing.  I think the error I was getting was that the SPARK_HOME
environment variable was not set, though.

On Wed, May 10, 2017 at 5:51 AM s t  wrote:

> Hello,
>
> I am trying to run spark code from my laptop with intellij. I have cluster
> of 2 nodes and a master. When i start the program from intellij it gets
> error of some missing classes.
>
> I am aware that some jars need to be distributed to the workers but do not
> know if it is possible programatically. spark submit or jupyter notebook
> handles the issue but intellij does not.
>
> can any one give some advices to me ?
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[Spark Streaming] Unknown delay in event timeline

2017-05-10 Thread Zhiwen Sun
Env : Spark 1.6.2 + kafka 0.8.2 DirectStream.

Spark Streaming job with a 1s interval.
The processing time of a micro-batch suddenly jumped to 4s, while it is usually 0.4s.
When we checked where the time was spent, we found an unknown delay in the job.
There is no executor computation or shuffle read going on; there is about a 4s
blank gap in the event timeline.
An event timeline snapshot is attached.
It seems that the executors have different launch times.

Can anyone give some suggestions?

Thanks!




Zhiwen Sun


Why spark.sql.autoBroadcastJoinThreshold not available

2017-05-10 Thread Jone Zhang
I am now using Spark 1.6.0 in Java.
I want the following SQL to be executed as a broadcast join:
*select * from sample join feature*

These are my steps:
1. Set spark.sql.autoBroadcastJoinThreshold=100M.
2. HiveContext.sql("cache lazy table feature as select * from src where ..."),
whose result size is only 100K.
3. HiveContext.sql("select * from sample join feature").
Why is the join a SortMergeJoin?
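
For reference, a rough PySpark sketch of the same three steps (the original is in
Java; the WHERE filter is omitted here, and the threshold value is given in bytes):

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext()
sqlContext = HiveContext(sc)

# The threshold is interpreted in bytes; 100 MB written out numerically.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))

# WHERE clause omitted in this sketch; the real query filters src first.
sqlContext.sql("cache lazy table feature as select * from src")

joined = sqlContext.sql("select * from sample join feature")
joined.explain(True)  # check whether the physical plan contains a broadcast join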

Grateful for any idea!
Thanks.


running spark program on intellij connecting to remote master for cluster

2017-05-10 Thread s t
Hello,

I am trying to run Spark code from my laptop with IntelliJ. I have a cluster of 2 
nodes and a master. When I start the program from IntelliJ it fails with errors 
about some missing classes.

I am aware that some jars need to be distributed to the workers, but I do not know 
if that is possible programmatically. spark-submit or a Jupyter notebook handles 
the issue, but IntelliJ does not.

Can anyone give me some advice?
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



CrossValidator and stackoverflowError

2017-05-10 Thread issues solution
Hi,
When I try to run CrossValidator I get a StackOverflowError.

I have already performed all the necessary transformations (StringIndexer, vector assembly)

and saved the data frame to HDFS as Parquet.

After that I load everything into a new data frame and
split it into train and test sets.

When I call fit(train_set) I get the stack overflow. Why?

Shouldn't the DAG be independent of all the earlier transformations?

All iterations should be applied to the loaded data frame.
Why can't I cache the data frame?


URGENT :

2017-05-10 Thread issues solution
Hi,
I know you are busy with questions, but I don't understand:

   1- Why don't we have feature importances in PySpark's ML features?
   2- Why can't we use a cached data frame with cross-validation?
   3- Why is the documentation not clear when it comes to PySpark?

You can understand that when you work inside a company you don't have enough time
to explore everything.

Thanks


features IMportance

2017-05-10 Thread issues solution
Hi,

Can someone tell me if we have feature importances in PySpark 1.6.0?

Thanks