[Release Question]: Estimate on 3.5.2 release?

2024-04-26 Thread Paul Gerver
Hello,

I'm curious if there is an estimate for when Spark Core 3.5.2 will be released.
There are several bug and security vulnerability fixes in its dependencies that we
are excited to receive!

If anyone has any insights, that would be greatly appreciated. Thanks!
- Paul








Paul Gerver

Streams Software Engineer





CFP for the 2nd Performance Engineering track at Community over Code NA 2023

2023-07-03 Thread Brebner, Paul
Hi Apache Spark people - There are only 10 days left to submit a talk proposal
(title and abstract only) for Community over Code NA 2023. The 2nd Performance
Engineering track is on this year, so any Apache project-related performance and
scalability talks are welcome. Here's the CFP with more ideas and links,
including the submission page:
https://www.linkedin.com/pulse/call-papers-2nd-performance-engineering-track-over-code-brebner/
 - Paul Brebner and Roger Abelenda


Rename columns without manually setting them all

2023-06-21 Thread John Paul Jayme
Hi,

This is currently my column definition :
Employee ID | Name    | Client  | Project | Team   | 01/01/2022 | 02/01/2022 | 03/01/2022 | 04/01/2022 | 05/01/2022
12345       | Dummy x | Dummy a | abc     | team a | OFF        | WO         | WH         | WH         | WH

As you can see, the trailing columns are just daily attendance dates. My goal is
to count the employees who were OFF / WO / WH on each of those dates. I need to
transpose them so it would look like this:

[image: desired output - one row per date with counts of OFF / WO / WH]

I am still new to pandas. Can you guide me on how to produce this? I am reading 
about melt() and set_index() but I am not sure if they are the correct 
functions to use.
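For what it's worth, here is a rough sketch (untested against your data; the file
path and column names are assumed from the sample above) using plain pandas --
the pandas API on Spark (pyspark.pandas) exposes the same melt/groupby methods:

```python
import pandas as pd

df = pd.read_csv("attendance.csv")  # hypothetical source file
id_cols = ["Employee ID", "Name", "Client", "Project", "Team"]

# melt() turns the date columns into long (date, status) rows,
# then a groupby counts employees per date and status.
long_df = df.melt(id_vars=id_cols, var_name="date", value_name="status")
counts = (long_df.groupby(["date", "status"])["Employee ID"]
                 .count()
                 .unstack(fill_value=0))
```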



How to read excel file in PySpark

2023-06-20 Thread John Paul Jayme
Good day,

I have a task to read Excel files in Databricks but I cannot seem to proceed. I
am referencing the API documentation -
read_excel<https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.read_excel.html>
 , but I get the error that the SparkSession object has no attribute 'read_excel'. Can
you advise?
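A minimal sketch of what usually resolves that error: read_excel lives in the
pandas API on Spark (pyspark.pandas), not on the SparkSession itself. The path
and sheet name below are placeholders, and an Excel engine such as openpyxl must
be installed on the cluster:

```python
import pyspark.pandas as ps

psdf = ps.read_excel("/path/to/workbook.xlsx", sheet_name="Sheet1")
sdf = psdf.to_spark()  # convert to a regular Spark DataFrame if needed
```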

JOHN PAUL JAYME
Data Engineer




Re: NoClassDefError and SparkSession should only be created and accessed on the driver.

2022-09-20 Thread Paul Rogalinski
Hi Rajat,


I have been facing a similar problem recently and could solve it by moving the
UDF implementation into a dedicated class instead of having it implemented in the
driver class/object.


Regards,
Paul.

On Tuesday 20 September 2022 10:11:31 (+02:00), rajat kumar wrote:


Hi Alton, it's in the same Scala class. Is there any change in Spark 3 that
serializes it separately?


Regards
Rajat


On Tue, Sep 20, 2022, 13:35 Xiao, Alton  wrote:


Can you show us your code?

your udf wasn’t  serialized by spark, In my opinion,  were they out of the 
spark running code?

 

From: rajat kumar 
Date: Tuesday, 20 September 2022 15:58
To: user @spark 
Subject: NoClassDefError and SparkSession should only be created and accessed on the
driver.

Hello,

I am using Spark 3 with some UDFs. I am using the DataFrame API to write Parquet
with Spark, and I am getting a NoClassDefError along with the error below.

If I comment out all the UDFs, it works fine.

Could someone suggest what could be wrong? It was working fine in Spark 2.4.

22/09/20 06:33:17 WARN TaskSetManager: Lost task 9.0 in stage 1.0 (TID 10) (vm-36408481 executor 2): java.lang.ExceptionInInitializerError
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431

Re: pyspark - memory leak leading to OOM after submitting 100 jobs?

2019-10-31 Thread Paul Wais
Well, dumb question:

Given the workflow outlined above, should Local Mode keep running?  Or
is the leak a known issue?  I just wanted to check because I can't
recall seeing this issue with a non-local master, though it's possible
there were task failures that hid the issue.

If this issue looks new, what's the easiest way to record memory dumps
or do profiling?  Can I put something in my spark-defaults.conf ?

The code is open source and the run is reproducible, although the
specific test currently requires a rather large (but public) dataset.
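For reference, a hedged sketch of spark-defaults.conf entries that dump the heap
on OOM (paths are placeholders; in local mode the driver and executors share one
JVM, so the driver options are the ones that matter):

```
spark.driver.extraJavaOptions    -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/driver.hprof
spark.executor.extraJavaOptions  -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/executor.hprof
```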

On Sun, Oct 20, 2019 at 6:24 PM Jungtaek Lim
 wrote:
>
> Honestly, I'd recommend you spend your time looking into the issue, e.g. by 
> taking a memory dump at some interval and comparing the differences (at least share 
> these dump files with the community, redacted if necessary). Otherwise someone 
> has to try to reproduce it without a reproducer and may fail to reproduce it even 
> after spending their time. A memory leak issue is not really easy to reproduce, 
> unless it leaks objects unconditionally.
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Sun, Oct 20, 2019 at 7:18 PM Paul Wais  wrote:
>>
>> Dear List,
>>
>> I've observed some sort of memory leak when using pyspark to run ~100
>> jobs in local mode.  Each job is essentially a create RDD -> create DF
>> -> write DF sort of flow.  The RDD and DFs go out of scope after each
>> job completes, hence I call this issue a "memory leak."  Here's
>> pseudocode:
>>
>> ```
>> row_rdds = []
>> for i in range(100):
>>   row_rdd = spark.sparkContext.parallelize([{'a': i} for i in range(1000)])
>>   row_rdds.append(row_rdd)
>>
>> for row_rdd in row_rdds:
>>   df = spark.createDataFrame(row_rdd)
>>   df.persist()
>>   print(df.count())
>>   df.write.save(...) # Save parquet
>>   df.unpersist()
>>
>>   # Does not help:
>>   # del df
>>   # del row_rdd
>> ```
>>
>> In my real application:
>>  * rows are much larger, perhaps 1MB each
>>  * row_rdds are sized to fit available RAM
>>
>> I observe that after 100 or so iterations of the second loop (each of
>> which creates a "job" in the Spark WebUI), the following happens:
>>  * pyspark workers have fairly stable resident and virtual RAM usage
>>  * java process eventually approaches resident RAM cap (8GB standard)
>> but virtual RAM usage keeps ballooning.
>>
>> Eventually the machine runs out of RAM and the linux OOM killer kills
>> the java process, resulting in an "IndexError: pop from an empty
>> deque" error from py4j/java_gateway.py .
>>
>>
>> Does anybody have any ideas about what's going on?  Note that this is
>> local mode.  I have personally run standalone masters and submitted a
>> ton of jobs and never seen something like this over time.  Those were
>> very different jobs, but perhaps this issue is bespoke to local mode?
>>
>> Emphasis: I did try to del the pyspark objects and run python GC.
>> That didn't help at all.
>>
>> pyspark 2.4.4 on java 1.8 on ubuntu bionic (tensorflow docker image)
>>
>> 12-core i7 with 16GB of ram and 22GB swap file (swap is *on*).
>>
>> Cheers,
>> -Paul
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



pyspark - memory leak leading to OOM after submitting 100 jobs?

2019-10-20 Thread Paul Wais
Dear List,

I've observed some sort of memory leak when using pyspark to run ~100
jobs in local mode.  Each job is essentially a create RDD -> create DF
-> write DF sort of flow.  The RDD and DFs go out of scope after each
job completes, hence I call this issue a "memory leak."  Here's
pseudocode:

```
row_rdds = []
for i in range(100):
  row_rdd = spark.sparkContext.parallelize([{'a': i} for i in range(1000)])
  row_rdds.append(row_rdd)

for row_rdd in row_rdds:
  df = spark.createDataFrame(row_rdd)
  df.persist()
  print(df.count())
  df.write.save(...) # Save parquet
  df.unpersist()

  # Does not help:
  # del df
  # del row_rdd
```

In my real application:
 * rows are much larger, perhaps 1MB each
 * row_rdds are sized to fit available RAM

I observe that after 100 or so iterations of the second loop (each of
which creates a "job" in the Spark WebUI), the following happens:
 * pyspark workers have fairly stable resident and virtual RAM usage
 * java process eventually approaches resident RAM cap (8GB standard)
but virtual RAM usage keeps ballooning.

Eventually the machine runs out of RAM and the linux OOM killer kills
the java process, resulting in an "IndexError: pop from an empty
deque" error from py4j/java_gateway.py .


Does anybody have any ideas about what's going on?  Note that this is
local mode.  I have personally run standalone masters and submitted a
ton of jobs and never seen something like this over time.  Those were
very different jobs, but perhaps this issue is bespoke to local mode?

Emphasis: I did try to del the pyspark objects and run python GC.
That didn't help at all.

pyspark 2.4.4 on java 1.8 on ubuntu bionic (tensorflow docker image)

12-core i7 with 16GB of ram and 22GB swap file (swap is *on*).

Cheers,
-Paul

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Avro support broken?

2019-07-04 Thread Paul Wais
Dear List,

Has anybody gotten avro support to work in pyspark?  I see multiple
reports of it being broken on Stackoverflow and added my own repro to
this ticket: 
https://issues.apache.org/jira/browse/SPARK-27623?focusedCommentId=16878896&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16878896
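For reference, a hedged example of what a working Avro read looks like in
PySpark 2.4 once the external spark-avro package is on the classpath (the package
version and paths below are illustrative only):

```python
# spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.3 my_job.py
df = spark.read.format("avro").load("/path/to/data.avro")
df.write.format("avro").save("/path/to/out")
```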

Cheers,
-Paul

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



dropping unused data from a stream

2019-01-22 Thread Paul Tremblay
I will be streaming data and am trying to understand how to get rid of old
data from a stream so it does not become too large. I will stream in one
large table of buying data and join that to another table of different
data. I need the last 14 days from the second table; I will not need data
that is older than 14 days.

Here is my practice code:


streaming1 = spark.readStream.schema(dataSchema)\
    .option("maxFilesPerTrigger", 1)\
    .csv("input_stream_csv1")
streaming1_with_impressions = streaming1.withWatermark("creation_time", "2 minutes")
streaming2 = spark.readStream.schema(dataSchema)\
    .option("maxFilesPerTrigger", 1)\
    .csv("input_stream_csv2")
streaming1.registerTempTable("my_table1")
streaming2.registerTempTable("my_table2")
spark.sql("""select t1.* from my_table1 t1
    inner join my_table2 t2 on t1.key = t2.key
    where t1.creation_time < current_timestamp() - interval 15 minutes""")\
    .writeStream.trigger(processingTime='10 seconds')\
    .format("parquet")\
    .option("checkpointLocation", "checkpoint_dir")\
    .outputMode("append")\
    .option("path", "stream_dir5")\
    .start()

The important part of the code is the where clause in the SQL statement: "where
t1.creation_time < current_timestamp() - interval 15 minutes".

For this example, I am hoping that the stream will not contain any rows older
than 15 minutes. Is this assumption correct? I am not sure how to
test this. In addition, I have set a watermark of 2 minutes on the first
stream. I am thinking that this watermark will make Spark wait an
additional 2 minutes for any data that comes in late.
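For comparison, here is a rough sketch (column names and durations assumed from
the code above, not tested) of the stream-stream join pattern from the Structured
Streaming guide, where both inputs carry watermarks and the join condition bounds
event time so Spark can drop state older than the watermark:

```python
from pyspark.sql.functions import expr

stream1 = (spark.readStream.schema(dataSchema)
           .option("maxFilesPerTrigger", 1).csv("input_stream_csv1")
           .withWatermark("creation_time", "2 minutes")
           .alias("t1"))
stream2 = (spark.readStream.schema(dataSchema)
           .option("maxFilesPerTrigger", 1).csv("input_stream_csv2")
           .withWatermark("creation_time", "14 days")
           .alias("t2"))

joined = stream1.join(
    stream2,
    expr("""t1.key = t2.key AND
            t2.creation_time >= t1.creation_time - interval 14 days AND
            t2.creation_time <= t1.creation_time"""))
```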

Thanks!
-- 
Henry Tremblay
Data Engineer, Best Buy


Re: Problem in persisting file in S3 using Spark: xxx file does not exist Exception

2018-05-02 Thread Paul Tremblay
I would like to see the full error. However, S3 can give misleading
messages if you don't have the correct permissions.

On Tue, Apr 24, 2018, 2:28 PM Marco Mistroni  wrote:

> HI all
>  i am using the following code for persisting data into S3 (aws keys are
> already stored in the environment variables)
>
> dataFrame.coalesce(1).write.format("com.databricks.spark.csv").save(fileName)
>
>
> However, i keep on receiving an exception that the file does not exist
>
> here's what comes from logs
>
> 18/04/24 22:15:32 INFO Persiste: Persisting data to text file:
> s3://ec2-bucket-mm-spark/form4-results-2404.results
> Exception in thread "main" java.io.IOException:
> /form4-results-2404.results doesn't exist
>
> It seems that Spark expects the file to be there before writing, which
> seems bizarre?
>
> I have even tried to remove the coalesce, but still got the same exception.
> Could anyone help please?
> kind regards
>  marco
>


[Spark scheduling] Spark schedules single task although rdd has 48 partitions?

2018-05-02 Thread Paul Borgmans
(please notice this question was previously posted to 
https://stackoverflow.com/questions/49943655/spark-schedules-single-task-although-rdd-has-48-partitions)
We are running Spark 2.3 / Python 3.5.2. For a job we run the following code 
(please note that the input txt files are just a simplified example; in fact 
these are large binary files, and sc.binaryFiles(...) runs out of memory loading 
the content, therefore only the filenames are parallelized and the executors 
open/read the content):
files = [u'foo.txt', u'bar.txt', u'baz.txt', etc]  # len(files) == 155

def func(filename):
    from app import generate_rows
    return list(generate_rows(filename))

rdd = sc.parallelize(files, numSlices=48)
rdd2 = rdd.flatMap(func)
rdd3 = rdd2.map(lambda d: Row(**d))
df = spark.createDataFrame(rdd3)
df.write.mode(u'append').partitionBy(u'foo').parquet(output_path)

Where app is a Python module (added to Spark using --py-files app.egg); the 
simplified code is like this:
def generate_rows(filename):
    yield OrderedDict([
        (u'filename', filename),
        (u'item1', u'item1'),
        etc
    ])

We notice that the cluster is not fully utilized during the first stages, which we 
don't understand, and we are looking for ways to control this behavior.

Job0  Stage0  1 task    1 min  parallelize
Job1  Stage1  1 task    2 min  parallelize
Job2  Stage2  1 task    1 min  parallelize
Job3  Stage3  48 tasks  5 min  parallelize|mappartitions|map|mappartitions|existingRDD|sort

What are the first 3 jobs? And why isn't there one job/stage with the 48 tasks 
(as expected, given the second parameter of parallelize set to 48)?
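If those single-task jobs come from createDataFrame() inferring the schema by
evaluating the first record(s) of the RDD, passing an explicit schema may avoid
them. A speculative sketch (field names and types are assumptions):

```python
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("filename", StringType(), True),
    StructField("item1", StringType(), True),
    # ... remaining fields
])

df = spark.createDataFrame(rdd3, schema=schema)
```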

Excerpt from DEBUG logging:

18/05/02 10:09:07 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, 
runningTasks: 0
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, 
runningTasks: 1
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, 
runningTasks: 1
...
18/05/02 10:09:58 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, 
runningTasks: 1
18/05/02 10:09:59 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, 
runningTasks: 1
18/05/02 10:10:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, 
runningTasks: 0
18/05/02 10:10:00 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have 
all completed, from pool
18/05/02 10:10:00 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
18/05/02 10:10:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, 
runningTasks: 0
18/05/02 10:10:01 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, 
runningTasks: 1
18/05/02 10:10:02 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, 
runningTasks: 1
...
18/05/02 10:12:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, 
runningTasks: 1
18/05/02 10:12:04 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, 
runningTasks: 1
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, 
runningTasks: 0
18/05/02 10:12:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have 
all completed, from pool
18/05/02 10:12:05 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, 
runningTasks: 0
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, 
runningTasks: 1
18/05/02 10:12:06 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, 
runningTasks: 1
...
18/05/02 10:12:59 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, 
runningTasks: 1
18/05/02 10:13:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, 
runningTasks: 1
18/05/02 10:13:01 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, 
runningTasks: 0
18/05/02 10:13:01 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have 
all completed, from pool
18/05/02 10:13:03 INFO TaskSchedulerImpl: Adding task set 3.0 with 48 tasks
18/05/02 10:13:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, 
runningTasks: 0
18/05/02 10:13:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, 
runningTasks: 48
18/05/02 10:13:04 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, 
runningTasks: 48
...
18/05/02 10:17:16 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, 
runningTasks: 1
18/05/02 10:17:17 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, 
runningTasks: 1
18/05/02 10:17:18 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, 
runningTasks: 0
18/05/02 10:17:18 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have 
all completed, from pool


History server and non-HDFS filesystems

2017-11-17 Thread Paul Mackles
Hi - I had originally posted this as a bug (SPARK-22528) but given my
uncertainty, it was suggested that I send it to the mailing list instead...

We are using Azure Data Lake (ADL) to store our event logs. This worked
fine in 2.1.x, but in 2.2.0 the underlying files are no longer visible to
the history server - even though we are using the same service principal
that was used to write the logs. I tracked it down to this call in
"FSHistoryProvider" (which was added for v2.2.0):


SparkHadoopUtil.checkAccessPermission()


From what I can tell, it is preemptively checking the permissions on the
files and skipping the ones which it thinks are not readable. The problem
is that its using a check that appears to be specific to HDFS and so even
though the files are definitely readable, it skips over them. Also,
"FSHistoryProvider"
is the only place this code is used.

I was able to workaround it by either:

* setting the permissions for the files on ADL to world readable

* or setting HADOOP_PROXY to the objectId of the Azure service principal
which owns file

Neither of these workarounds are acceptable for our environment. That said,
I am not sure how this should be addressed:

* Is this an issue with the Azure/Hadoop not complying with how the Hadoop
FileSystem interface/contract in some way?

* Is this an issue with "checkAccessPermission()" not really accounting for
all of the possible FileSystem implementations?

My gut tells me it's the latter, because
SparkHadoopUtil.checkAccessPermission()
gets its "currentUser" info from outside of the FileSystem class, and it
doesn't make sense to me that an instance of FileSystem would affect a
global context, since there could be many FileSystem instances in a given
app.

That said, I know ADL is not heavily used at this time so I wonder if
anyone is seeing this with S3 as well? Maybe not since S3 permissions are
always reported as world-readable (I think) which causes
checkAccessPermission()
to succeed.

Any thoughts or feedback appreciated.

-- 
Thanks,
Paul


Spark REST API

2017-11-07 Thread Paul Corley
Is there a way to flush the API?

I execute http://localhost:18080/api/v1/applications?status=running

In the results I will get a list of applications but not all are still running. 
 This is causing an issue with monitoring what is actually running.

To compound the problem these are currently streaming apps running on EMR.


Paul Corley | Principle Data Engineer
IgnitionOne | Marketing Technology. Simplified.
Office:  1545 Peachtree St NE | Suite 500 | Atlanta, GA | 30309
Direct:  702.336.0094
Email:   paul.cor...@ignitionone.com<mailto:paul.cor...@ignitionone.com>


Re: Running spark examples in Intellij

2017-10-11 Thread Paul
You say you did the Maven package, but did you do a Maven install and define 
your local Maven repo in SBT?
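A hedged sketch of what that means in practice:

```
# publish the freshly built artifacts into the local Maven repository
mvn -DskipTests install
```

In sbt, the local Maven repository can then be added as a resolver
(Resolver.mavenLocal) so the project picks up those artifacts.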

-Paul

Sent from my iPhone

> On Oct 11, 2017, at 5:48 PM, Stephen Boesch <java...@gmail.com> wrote:
> 
> When attempting to run any example program w/ Intellij I am running into 
> guava versioning issues:
> 
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> com/google/common/cache/CacheLoader
>   at 
> org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:73)
>   at org.apache.spark.SparkConf.<init>(SparkConf.scala:68)
>   at org.apache.spark.SparkConf.<init>(SparkConf.scala:55)
>   at 
> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:919)
>   at 
> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:918)
>   at scala.Option.getOrElse(Option.scala:121)
>   at 
> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:918)
>   at 
> org.apache.spark.examples.ml.KMeansExample$.main(KMeansExample.scala:40)
>   at org.apache.spark.examples.ml.KMeansExample.main(KMeansExample.scala)
> Caused by: java.lang.ClassNotFoundException: 
> com.google.common.cache.CacheLoader
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 9 more
> 
> The *scope*s for the spark dependencies were already changed from *provided* 
> to *compile* .  Both `sbt assembly` and `mvn package` had already been run 
> (successfully) from command line - and the (mvn) project completely rebuilt 
> inside intellij.
> 
> The spark testcases run fine: this is a problem only in the examples module.  
> Anyone running these successfully in IJ?  I have tried for 2.1.0-SNAPSHOT and 
> 2.3.0-SNAPSHOT - with the same outcome.
> 
> 


Re: is it ok to have multiple sparksession's in one spark structured streaming app?

2017-09-08 Thread Paul
You would set the Kafka topic as your data source and write a custom output to 
Cassandra; everything would, or could, be contained within your stream.
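A speculative sketch of that shape (not from this thread): one SparkSession reads
from Kafka and writes each micro-batch to Cassandra via the DataStax
spark-cassandra-connector. Broker, topic, keyspace, and table names are
placeholders, and foreachBatch requires Spark 2.4+:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-to-cassandra").getOrCreate()

source = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("subscribe", "events")
          .load())

def write_to_cassandra(batch_df, batch_id):
    # write one micro-batch to a Cassandra table
    (batch_df.selectExpr("CAST(value AS STRING) AS value")
             .write.format("org.apache.spark.sql.cassandra")
             .options(keyspace="ks", table="events")
             .mode("append")
             .save())

query = source.writeStream.foreachBatch(write_to_cassandra).start()
query.awaitTermination()
```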

-Paul

Sent from my iPhone

> On Sep 8, 2017, at 2:52 PM, kant kodali <kanth...@gmail.com> wrote:
> 
> How can I use one SparkSession to talk to both Kafka and Cassandra let's say?
> 
> 
>> On Fri, Sep 8, 2017 at 3:46 AM, Arkadiusz Bicz <arkadiusz.b...@gmail.com> 
>> wrote:
>> You don't need multiple spark sessions to have more than one stream working, 
>> but from maintenance and reliability perspective it is not good idea. 
>> 
>>> On Thu, Sep 7, 2017 at 2:40 AM, kant kodali <kanth...@gmail.com> wrote:
>>> Hi All,
>>> 
>>> I am wondering if it is ok to have multiple sparksession's in one spark 
>>> structured streaming app? Basically, I want to create 1) Spark session for 
>>> reading from Kafka and 2) Another Spark session for storing the mutations 
>>> of a dataframe/dataset to a persistent table as I get the mutations from 
>>> #1? 
>>> 
>>> Finally, is this a common practice?
>>> 
>>> Thanks,
>>> kant
>> 
> 


Structured Streaming from Parquet

2017-05-25 Thread Paul Corley
I have a Spark Structured Streaming process that is implemented in 2 separate 
streaming apps.

The first app reads .gz files (ranging in size from 1GB to 9GB compressed) from S3, 
filters out invalid records, repartitions the data, and outputs Parquet to S3, 
partitioned the same way the stream is partitioned. This process produces thousands 
of files which other processes consume.  The thought behind this approach was to:

1)   Break the file down to smaller more easily consumed sizes

2)   Allow more parallelism in the processes that consume the data.

3)   Allow multiple downstream processes to:

a.   Consume data that has already had bad records filtered out

b.   Not have to fully read in such large files

Second application reads in the files produced by the first app.  This process 
then reformats the data from a row that is:

12NDSIN|20170101:123313, 5467;20170115:987

into:
12NDSIN, 20170101, 123313
12NDSIN, 20170101, 5467
12NDSIN, 20170115, 987
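For illustration only (this is not the app's code), that reshaping could be
expressed with split and explode; the input is assumed to be a single string
column named "value":

```python
from pyspark.sql import functions as F

parsed = (raw_df
    .withColumn("key",   F.split("value", r"\|").getItem(0))
    .withColumn("body",  F.split("value", r"\|").getItem(1))
    .withColumn("group", F.explode(F.split("body", ";")))      # one row per date group
    .withColumn("date",  F.split("group", ":").getItem(0))
    .withColumn("val",   F.explode(F.split(F.split("group", ":").getItem(1), r",\s*")))
    .select("key", "date", "val"))
```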

App 1 runs with no problems and churns through files in its source directory on S3.  
Total process time for a file is < 10 min.  App 2 is the one having issues.

The source is defined as
val rawReader = sparkSession
  .readStream
  .option("latestFirst", "true")
  .option("maxFilesPerTrigger", batchSize)
  .schema(rawSchema)
  .parquet(config.getString("aws.s3.sourcepath"))   <=Line85

output is defined as
val query = output
  .writeStream
  .queryName("bk")
  .format("parquet")
  .partitionBy("expireDate")
  .trigger(ProcessingTime("10 seconds"))
  .option("checkpointLocation",config.getString("spark.app.checkpoint_dir") + 
"/bk")
  .option("path", config.getString("spark.app.s3.output"))
  .start()
  .awaitTermination()

If files exist from app 1, app 2 enters a cycle of just cycling through "parquet at 
ProcessFromSource.scala:85"   3999/3999

If there are only a few files output from app 1, it will eventually enter the stage 
where it actually processes the data and begins to output, but the more files 
produced by app 1, the longer it takes, if it ever completes these steps.  With an 
extremely large number of files the app eventually throws a Java OOM error. 
Additionally, each cycle through this step takes successively longer.

Hopefully someone can lend some insight as to what is actually taking place in 
this step and how to alleviate it



Thanks,

Paul Corley | Principle Data Engineer


splitting a huge file

2017-04-21 Thread Paul Tremblay
We are tasked with loading a big file (possibly 2TB) into a data warehouse.
In order to do this efficiently, we need to split the file into smaller
files.

I don't believe there is a way to do this with Spark, because in order for
Spark to distribute the file to the worker nodes, it first has to be split
up, right?

We ended up using a single machine with a single thread to do the
splitting. I just want to make sure I am not missing something obvious.

Thanks!

-- 
Paul Henry Tremblay
Attunix


small job runs out of memory using wholeTextFiles

2017-04-07 Thread Paul Tremblay
As part of my processing, I have the following code:

rdd = sc.wholeTextFiles("s3://paulhtremblay/noaa_tmp/", 10)
rdd.count()

The s3 directory has about 8GB of data and 61,878 files. I am using Spark
2.1, and running it with 15 m3.xlarge nodes on EMR.

The job fails with this error:

: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 35532 in stage 0.0 failed 4 times, most recent failure: Lost task
35532.3 in stage 0.0 (TID 35543,
ip-172-31-36-192.us-west-2.compute.internal, executor 6):
ExecutorLostFailure (executor 6 exited caused by one of the running
tasks) Reason: Container killed by YARN for exceeding memory limits.
7.4 GB of 5.5 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
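For what it's worth, the setting the error message points at is passed like this
(the value here is just a placeholder; in Spark 2.1 it is in MB):

```
spark-submit --conf spark.yarn.executor.memoryOverhead=2048 ...
```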


I have run it dozens of times, increasing the number of partitions and reducing the
size of my data set (the original is 60GB), but I get the same error each time.

In contrast, if I run a simple:

rdd = sc.textFile("s3://paulhtremblay/noaa_tmp/")
rdd.count()

The job finishes in 15 minutes, even with just 3 nodes.

Thanks

-- 
Paul Henry Tremblay
Robert Half Technology


Re: bug with PYTHONHASHSEED

2017-04-05 Thread Paul Tremblay
I saw the bug fix. I am using the latest Spark available on AWS EMR, which I
think is 2.0.1. I am at work and can't check my home config. I don't think
AWS merged in this fix.

Henry

On Tue, Apr 4, 2017 at 4:42 PM, Jeff Zhang <zjf...@gmail.com> wrote:

>
> It is fixed in https://issues.apache.org/jira/browse/SPARK-13330
>
>
>
> Holden Karau <hol...@pigscanfly.ca> wrote on Wednesday, 5 April 2017 at 00:03:
>
>> Which version of Spark is this (or is it a dev build)? We've recently
>> made some improvements with PYTHONHASHSEED propagation.
>>
>> On Tue, Apr 4, 2017 at 7:49 AM Eike von Seggern <eike.seggern@sevenval.com> wrote:
>>
>> 2017-04-01 21:54 GMT+02:00 Paul Tremblay <paulhtremb...@gmail.com>:
>>
>> When I try to to do a groupByKey() in my spark environment, I get the
>> error described here:
>>
>> http://stackoverflow.com/questions/36798833/what-does-
>> exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh
>>
>> In order to attempt to fix the problem, I set up my ipython environment
>> with the additional line:
>>
>> PYTHONHASHSEED=1
>>
>> When I fire up my ipython shell, and do:
>>
>> In [7]: hash("foo")
>> Out[7]: -2457967226571033580
>>
>> In [8]: hash("foo")
>> Out[8]: -2457967226571033580
>>
>> So my hash function is now seeded so it returns consistent values. But
>> when I do a groupByKey(), I get the same error:
>>
>>
>> Exception: Randomness of hash of string should be disabled via
>> PYTHONHASHSEED
>>
>> Anyone know how to fix this problem in python 3.4?
>>
>>
>> Independent of the python version, you have to ensure that Python on
>> spark-master and -workers is started with PYTHONHASHSEED set, e.g. by
>> adding it to the environment of the spark processes.
>>
>> Best
>>
>> Eike
>>
>> --
>> Cell : 425-233-8271 <(425)%20233-8271>
>> Twitter: https://twitter.com/holdenkarau
>>
>


-- 
Paul Henry Tremblay
Robert Half Technology


Re: bug with PYTHONHASHSEED

2017-04-04 Thread Paul Tremblay
So that means I have to pass that bash variable to the EMR clusters when I
spin them up, not afterwards. I'll give that a go.
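A hedged illustration of how that can also be pushed through Spark conf instead
of the shell environment (values are placeholders; spark.executorEnv.* sets
executor environments and spark.yarn.appMasterEnv.* covers the YARN application
master):

```
spark-submit \
  --conf spark.executorEnv.PYTHONHASHSEED=0 \
  --conf spark.yarn.appMasterEnv.PYTHONHASHSEED=0 \
  my_job.py
```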

Thanks!

Henry

On Tue, Apr 4, 2017 at 7:49 AM, Eike von Seggern <eike.segg...@sevenval.com>
wrote:

> 2017-04-01 21:54 GMT+02:00 Paul Tremblay <paulhtremb...@gmail.com>:
>
>> When I try to to do a groupByKey() in my spark environment, I get the
>> error described here:
>>
>> http://stackoverflow.com/questions/36798833/what-does-except
>> ion-randomness-of-hash-of-string-should-be-disabled-via-pythonh
>>
>> In order to attempt to fix the problem, I set up my ipython environment
>> with the additional line:
>>
>> PYTHONHASHSEED=1
>>
>> When I fire up my ipython shell, and do:
>>
>> In [7]: hash("foo")
>> Out[7]: -2457967226571033580
>>
>> In [8]: hash("foo")
>> Out[8]: -2457967226571033580
>>
>> So my hash function is now seeded so it returns consistent values. But
>> when I do a groupByKey(), I get the same error:
>>
>>
>> Exception: Randomness of hash of string should be disabled via
>> PYTHONHASHSEED
>>
>> Anyone know how to fix this problem in python 3.4?
>>
>
> Independent of the python version, you have to ensure that Python on
> spark-master and -workers is started with PYTHONHASHSEED set, e.g. by
> adding it to the environment of the spark processes.
>
> Best
>
> Eike
>



-- 
Paul Henry Tremblay
Robert Half Technology


Re: Alternatives for dataframe collectAsList()

2017-04-03 Thread Paul Tremblay
What do you want to do with the results of the query?

Henry

On Wed, Mar 29, 2017 at 12:00 PM, szep.laszlo.it <szep.laszlo...@gmail.com>
wrote:

> Hi,
>
> after I created a dataset
>
> Dataset df = sqlContext.sql("query");
>
> I need to have a result values and I call a method: collectAsList()
>
> List list = df.collectAsList();
>
> But it's very slow, if I work with large datasets (20-30 million records).
> I
> know, that the result isn't presented in driver app, that's why it takes
> long time, because collectAsList() collect all data from worker nodes.
>
> But then what is the right way to get result values? Is there an other
> solution to iterate over a result dataset rows, or get values? Can anyone
> post a small & working example?
>
> Thanks & Regards,
> Laszlo Szep
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Alternatives-for-dataframe-
> collectAsList-tp28547.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -----
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Paul Henry Tremblay
Robert Half Technology


Re: Read file and represent rows as Vectors

2017-04-03 Thread Paul Tremblay
So if I am understanding your problem, you have the data in CSV files, but
the CSV files are gzipped? If so, Spark can read a gzipped file directly.
Sorry if I didn't understand your question.
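For example (hedged; the path is a placeholder), gzip-compressed text is
decompressed transparently by the same call:

```python
rdd = sc.textFile("s3://bucket/path/*.csv.gz")  # each .gz becomes one (non-splittable) partition
```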

Henry

On Mon, Apr 3, 2017 at 5:05 AM, Old-School <giorgos_myrianth...@outlook.com>
wrote:

> I have a dataset that contains DocID, WordID and frequency (count) as shown
> below. Note that the first three numbers represent 1. the number of
> documents, 2. the number of words in the vocabulary and 3. the total number
> of words in the collection.
>
> 189
> 1430
> 12300
> 1 2 1
> 1 39 1
> 1 42 3
> 1 77 1
> 1 95 1
> 1 96 1
> 2 105 1
> 2 108 1
> 3 133 3
>
>
> What I want to do is to read the data (ignore the first three lines),
> combine the words per document and finally represent each document as a
> vector that contains the frequency of the wordID.
>
> Based on the above dataset the representation of documents 1, 2 and 3 will
> be (note that vocab_size can be extracted by the second line of the data):
>
> val data = Array(
> Vectors.sparse(vocab_size, Seq((2, 1.0), (39, 1.0), (42, 3.0), (77, 1.0),
> (95, 1.0), (96, 1.0))),
> Vectors.sparse(vocab_size, Seq((105, 1.0), (108, 1.0))),
> Vectors.sparse(vocab_size, Seq((133, 3.0
>
>
> The problem is that I am not quite sure how to read the .txt.gz file as RDD
> and create an Array of sparse vectors as described above. Please note that
> I
> actually want to pass the data array in the PCA transformer.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Read-file-and-represent-rows-as-Vectors-tp28562.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Paul Henry Tremblay
Robert Half Technology


Re: Looking at EMR Logs

2017-04-02 Thread Paul Tremblay
Thanks. That seems to work great, except EMR doesn't always copy the logs
to S3. The behavior  seems inconsistent and I am debugging it now.

On Fri, Mar 31, 2017 at 7:46 AM, Vadim Semenov <vadim.seme...@datadoghq.com>
wrote:

> You can provide your own log directory, where Spark log will be saved, and
> that you could replay afterwards.
>
> Set in your job this: `spark.eventLog.dir=s3://bucket/some/directory` and
> run it.
> Note! The path `s3://bucket/some/directory` must exist before you run your
> job, it'll not be created automatically.
>
> The Spark HistoryServer on EMR won't show you anything because it's
> looking for logs in `hdfs:///var/log/spark/apps` by default.
>
> After that you can either copy the log files from s3 to the hdfs path
> above, or you can copy them locally to `/tmp/spark-events` (the default
> directory for spark logs) and run the history server like:
> ```
> cd /usr/local/src/spark-1.6.1-bin-hadoop2.6
> sbin/start-history-server.sh
> ```
> and then open http://localhost:18080
>
>
>
>
> On Thu, Mar 30, 2017 at 8:45 PM, Paul Tremblay <paulhtremb...@gmail.com>
> wrote:
>
>> I am looking for tips on evaluating my Spark job after it has run.
>>
>> I know that right now I can look at the history of jobs through the web
>> ui. I also know how to look at the current resources being used by a
>> similar web ui.
>>
>> However, I would like to look at the logs after the job is finished to
>> evaluate such things as how many tasks were completed, how many executors
>> were used, etc. I currently save my logs to S3.
>>
>> Thanks!
>>
>> Henry
>>
>> --
>> Paul Henry Tremblay
>> Robert Half Technology
>>
>
>


-- 
Paul Henry Tremblay
Robert Half Technology


bug with PYTHONHASHSEED

2017-04-01 Thread Paul Tremblay
When I try to to do a groupByKey() in my spark environment, I get the error
described here:

http://stackoverflow.com/questions/36798833/what-does-
exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh

In order to attempt to fix the problem, I set up my ipython environment
with the additional line:

PYTHONHASHSEED=1

When I fire up my ipython shell, and do:

In [7]: hash("foo")
Out[7]: -2457967226571033580

In [8]: hash("foo")
Out[8]: -2457967226571033580

So my hash function is now seeded so it returns consistent values. But when
I do a groupByKey(), I get the same error:


Exception: Randomness of hash of string should be disabled via
PYTHONHASHSEED

Anyone know how to fix this problem in python 3.4?

Thanks

Henry

-- 
Paul Henry Tremblay
Robert Half Technology


pyspark bug with PYTHONHASHSEED

2017-04-01 Thread Paul Tremblay
When I try to to do a groupByKey() in my spark environment, I get the error
described here:

http://stackoverflow.com/questions/36798833/what-does-exception-randomness-of-hash-of-string-should-be-disabled-via-pythonh

In order to attempt to fix the problem, I set up my ipython environment
with the additional line:

PYTHONHASHSEED=1

When I fire up my ipython shell, and do:

In [7]: hash("foo")
Out[7]: -2457967226571033580

In [8]: hash("foo")
Out[8]: -2457967226571033580

So my hash function is now seeded so it returns consistent values. But when
I do a groupByKey(), I get the same error:


Exception: Randomness of hash of string should be disabled via
PYTHONHASHSEED

Anyone know how to fix this problem in python 3.4?

Thanks

Henry


-- 
Paul Henry Tremblay
Robert Half Technology


Looking at EMR Logs

2017-03-30 Thread Paul Tremblay
I am looking for tips on evaluating my Spark job after it has run.

I know that right now I can look at the history of jobs through the web ui.
I also know how to look at the current resources being used by a similar
web ui.

However, I would like to look at the logs after the job is finished to
evaluate such things as how many tasks were completed, how many executors
were used, etc. I currently save my logs to S3.

Thanks!

Henry

-- 
Paul Henry Tremblay
Robert Half Technology


Re: wholeTextFiles fails, but textFile succeeds for same path

2017-02-11 Thread Paul Tremblay
I've been working on this problem for several days (I am doing more to 
increase my knowledge of Spark). The code you linked to hangs because 
after reading in the file, I have to gunzip it.


Another way that seems to be working is reading each file in using 
sc.textFile, and then writing it the HDFS, and then using wholeTextFiles 
for the HDFS result.


But the bigger issue is that both methods are not executed in parallel. 
When I open my yarn manager, it shows that only one node is being used.



Henry


On 02/06/2017 03:39 PM, Jon Gregg wrote:
Strange that it's working for some directories but not others.  Looks 
like wholeTextFiles maybe doesn't work with S3? 
https://issues.apache.org/jira/browse/SPARK-4414 .


If it's possible to load the data into EMR and run Spark from there 
that may be a workaround.  This blogspot shows a python workaround 
that might work as well: 
http://michaelryanbell.com/processing-whole-files-spark-s3.html


Jon


On Mon, Feb 6, 2017 at 6:38 PM, Paul Tremblay <paulhtremb...@gmail.com> wrote:


I've actually been able to trace the problem to the files being
read in. If I change to a different directory, then I don't get
the error. Is one of the executors running out of memory?





On 02/06/2017 02:35 PM, Paul Tremblay wrote:

When I try to create an rdd using wholeTextFiles, I get an
incomprehensible error. But when I use the same path with
sc.textFile, I get no error.

I am using pyspark with spark 2.1.

in_path =

's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/

rdd = sc.wholeTextFiles(in_path)

rdd.take(1)


/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
   1341
   1342 p = range(partsScanned, min(partsScanned +
numPartsToTry, totalParts))
-> 1343 res = self.context.runJob(self,
takeUpToNumLeft, p)
   1344
   1345 items += res

/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd,
partitionFunc, partitions, allowLocal)
963 # SparkContext#runJob.
964 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
966 return list(_load_from_socket(port,
mappedRDD._jrdd_deserializer))
967

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py
in __call__(self, *args)
   1131 answer = self.gateway_client.send_command(command)
   1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
   1134
   1135 for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 61 def deco(*a, **kw):
 62 try:
---> 63 return f(*a, **kw)
 64 except py4j.protocol.Py4JJavaError as e:
 65 s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling
{0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 1.0 failed 4 times, most recent
failure: Lost task 0.3 in stage 1.0 (TID 7,
ip-172-31-45-114.us-west-2.compute.internal, executor
8): ExecutorLostFailure (executor 8 exited caused by one of
the running tasks) Reason: Container marked as failed:
container_1486415078210_0005_01_16 on host:
ip-172-31-45-114.us-west-2.compute.internal. Exit
status: 52. Diagnostics: Exception from container-launch.
Container id: container_1486415078210_0005_01_16
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at

org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at

org.apache.hadoop.yarn.server.nodemanager.Def

Re: Turning rows into columns

2017-02-11 Thread Paul Tremblay

Yes, that's what I need. Thanks.


P.


On 02/05/2017 12:17 PM, Koert Kuipers wrote:
since there is no key to group by and assemble records i would suggest 
to write this in RDD land and then convert to data frame. you can use 
sc.wholeTextFiles to process text files and create a state machine


On Feb 4, 2017 16:25, "Paul Tremblay" <paulhtremb...@gmail.com> wrote:


I am using pyspark 2.1 and am wondering how to convert a flat
file, with one record per row, into a columnar format.

Here is an example of the data:

u'WARC/1.0',
 u'WARC-Type: warcinfo',
 u'WARC-Date: 2016-12-08T13:00:23Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 344',
 u'Content-Type: application/warc-fields',
 u'WARC-Filename:
CC-MAIN-20161202170900-0-ip-10-31-129-80.ec2.internal.warc.gz',
 u'',
 u'robots: classic',
 u'hostname: ip-10-31-129-80.ec2.internal',
 u'software: Nutch 1.6 (CC)/CC WarcExport 1.0',
 u'isPartOf: CC-MAIN-2016-50',
 u'operator: CommonCrawl Admin',
 u'description: Wide crawl of the web for November 2016',
 u'publisher: CommonCrawl',
 u'format: WARC File Format 1.0',
 u'conformsTo:
http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: request',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 220',
 u'Content-Type: application/http; msgtype=request',
 u'WARC-Warcinfo-ID: ',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
 u'',
 u'GET /blog/ HTTP/1.0',
 u'Host: 1018201.vkrugudruzei.ru',
 u'Accept-Encoding: x-gzip, gzip, deflate',
 u'User-Agent: CCBot/2.0 (http://commoncrawl.org/faq/)',
 u'Accept:
text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
 u'',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: response',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 577',
 u'Content-Type: application/http; msgtype=response',
 u'WARC-Warcinfo-ID: ',
 u'WARC-Concurrent-To:
',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
 u'WARC-Payload-Digest: sha1:Y4TZFLB6UTXHU4HUVONBXC5NZQW2LYMM',
 u'WARC-Block-Digest: sha1:3J7HHBMWTSC7W53DDB7BHTUVPM26QS4B',
 u'']

I want to convert it to something like:
{warc-type='request',warc-date='2016-12-02'.
ward-record-id='<urn:uuid:cc7ddf8b-4646-4440-a70a-e253818cf10b}

In Python I would simply set a flag, and read line by line (create
a state machine). You can't do this in spark, though.

Thanks

Henry

-- 
Paul Henry Tremblay

Robert Half Technology






Re: wholeTextFiles fails, but textFile succeeds for same path

2017-02-06 Thread Paul Tremblay
I've actually been able to trace the problem to the files being read in. 
If I change to a different directory, then I don't get the error. Is one 
of the executors running out of memory?





On 02/06/2017 02:35 PM, Paul Tremblay wrote:
When I try to create an rdd using wholeTextFiles, I get an 
incomprehensible error. But when I use the same path with sc.textFile, 
I get no error.


I am using pyspark with spark 2.1.

in_path = 
's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/


rdd = sc.wholeTextFiles(in_path)

rdd.take(1)


/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
   1341
   1342 p = range(partsScanned, min(partsScanned + 
numPartsToTry, totalParts))

-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
   1344
   1345 items += res

/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, 
partitionFunc, partitions, allowLocal)

963 # SparkContext#runJob.
964 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), 
mappedRDD._jrdd, partitions)
966 return list(_load_from_socket(port, 
mappedRDD._jrdd_deserializer))

967

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in 
__call__(self, *args)

   1131 answer = self.gateway_client.send_command(command)
   1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, 
self.name)

   1134
   1135 for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 61 def deco(*a, **kw):
 62 try:
---> 63 return f(*a, **kw)
 64 except py4j.protocol.Py4JJavaError as e:
 65 s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)

317 raise Py4JJavaError(
318 "An error occurred while calling 
{0}{1}{2}.\n".

--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 
in stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, 
executor 8): ExecutorLostFailure (executor 8 exited caused by one of 
the running tasks) Reason: Container marked as failed: 
container_1486415078210_0005_01_16 on host: 
ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52. 
Diagnostics: Exception from container-launch.

Container id: container_1486415078210_0005_01_16
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

rdd = sc.textFile(in_path)

In [8]: rdd.take(1)
Out[8]: [u'WARC/1.0']




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



wholeTextFiles fails, but textFile succeeds for same path

2017-02-06 Thread Paul Tremblay
When I try to create an rdd using wholeTextFiles, I get an 
incomprehensible error. But when I use the same path with sc.textFile, I 
get no error.


I am using pyspark with spark 2.1.

in_path = 
's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/


rdd = sc.wholeTextFiles(in_path)

rdd.take(1)


/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
   1341
   1342 p = range(partsScanned, min(partsScanned + 
numPartsToTry, totalParts))

-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
   1344
   1345 items += res

/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, 
partitionFunc, partitions, allowLocal)

963 # SparkContext#runJob.
964 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), 
mappedRDD._jrdd, partitions)
966 return list(_load_from_socket(port, 
mappedRDD._jrdd_deserializer))

967

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in 
__call__(self, *args)

   1131 answer = self.gateway_client.send_command(command)
   1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
   1134
   1135 for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
 61 def deco(*a, **kw):
 62 try:
---> 63 return f(*a, **kw)
 64 except py4j.protocol.Py4JJavaError as e:
 65 s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)

317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 
in stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, 
executor 8): ExecutorLostFailure (executor 8 exited caused by one of the 
running tasks) Reason: Container marked as failed: 
container_1486415078210_0005_01_16 on host: 
ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52. 
Diagnostics: Exception from container-launch.

Container id: container_1486415078210_0005_01_16
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

rdd = sc.textFile(in_path)

In [8]: rdd.take(1)
Out[8]: [u'WARC/1.0']


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Turning rows into columns

2017-02-04 Thread Paul Tremblay
I am using pyspark 2.1 and am wondering how to convert a flat file, with
one record per row, into a columnar format.

Here is an example of the data:

u'WARC/1.0',
 u'WARC-Type: warcinfo',
 u'WARC-Date: 2016-12-08T13:00:23Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 344',
 u'Content-Type: application/warc-fields',
 u'WARC-Filename:
CC-MAIN-20161202170900-0-ip-10-31-129-80.ec2.internal.warc.gz',
 u'',
 u'robots: classic',
 u'hostname: ip-10-31-129-80.ec2.internal',
 u'software: Nutch 1.6 (CC)/CC WarcExport 1.0',
 u'isPartOf: CC-MAIN-2016-50',
 u'operator: CommonCrawl Admin',
 u'description: Wide crawl of the web for November 2016',
 u'publisher: CommonCrawl',
 u'format: WARC File Format 1.0',
 u'conformsTo:
http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: request',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 220',
 u'Content-Type: application/http; msgtype=request',
 u'WARC-Warcinfo-ID: ',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
 u'',
 u'GET /blog/ HTTP/1.0',
 u'Host: 1018201.vkrugudruzei.ru',
 u'Accept-Encoding: x-gzip, gzip, deflate',
 u'User-Agent: CCBot/2.0 (http://commoncrawl.org/faq/)',
 u'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
 u'',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: response',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 577',
 u'Content-Type: application/http; msgtype=response',
 u'WARC-Warcinfo-ID: ',
 u'WARC-Concurrent-To: ',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
 u'WARC-Payload-Digest: sha1:Y4TZFLB6UTXHU4HUVONBXC5NZQW2LYMM',
 u'WARC-Block-Digest: sha1:3J7HHBMWTSC7W53DDB7BHTUVPM26QS4B',
 u'']

I want to convert it to something like:
{warc-type='request', warc-date='2016-12-02', warc-record-id='urn:uuid:cc7ddf8b-4646-4440-a70a-e253818cf10b'}

In Python I would simply set a flag, and read line by line (create a state
machine). You can't do this in spark, though.
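One way to sketch this in pyspark without a per-line state machine (an illustrative sketch only, reusing sc and in_path from above; it collects the positions of the 'WARC/1.0' delimiter lines to the driver, which is only reasonable while that list fits in memory):

import bisect

lines = sc.textFile(in_path)
# pair every line with its global position in the file
indexed = lines.zipWithIndex().map(lambda lp: (lp[1], lp[0]))

# positions of the record delimiters, broadcast so each line can look up
# the record it belongs to (the closest delimiter at or before it)
starts = sorted(indexed.filter(lambda kv: kv[1] == u'WARC/1.0').keys().collect())
bstarts = sc.broadcast(starts)

def record_id(pos):
    i = bisect.bisect_right(bstarts.value, pos) - 1
    return bstarts.value[i] if i >= 0 else -1

def headers_to_dict(record_lines):
    d = {}
    for line in record_lines:
        if ':' in line:
            k, _, v = line.partition(':')
            d[k.strip().lower()] = v.strip()
    return d

records = (indexed
           .map(lambda kv: (record_id(kv[0]), kv[1]))
           .filter(lambda kv: kv[0] >= 0)
           .groupByKey()
           .mapValues(headers_to_dict))

Each value in records is then a dict such as {'warc-type': 'request', 'warc-date': '2016-12-02T17:54:09Z', ...}, which can be turned into a DataFrame once the columns of interest are known.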

Thanks

Henry

-- 
Paul Henry Tremblay
Robert Half Technology


RE: spark 2.02 error when writing to s3

2017-01-27 Thread VND Tremblay, Paul
Not sure what you mean by "a consistency layer on top." Any explanation would 
be greatly appreciated!

Paul

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_

From: Steve Loughran [mailto:ste...@hortonworks.com]
Sent: Friday, January 27, 2017 3:20 AM
To: VND Tremblay, Paul
Cc: Neil Jonkers; Takeshi Yamamuro; user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

OK

Nobody should be committing output directly to S3 without having something add 
a consistency layer on top, not if you want reliabie (as in "doesn't 
lose/corrupt data" reliable) work

On 26 Jan 2017, at 19:09, VND Tremblay, Paul 
<tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com>> wrote:

This seems to have done the trick, although I am not positive. If I have time, 
I'll test spinning up a cluster with and without consistent view to pin point 
the error.

_____

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_


From: Neil Jonkers [mailto:neilod...@gmail.com]
Sent: Friday, January 20, 2017 11:39 AM
To: Steve Loughran; VND Tremblay, Paul
Cc: Takeshi Yamamuro; user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: spark 2.02 error when writing to s3

Can you test by enabling emrfs consistent view and use s3:// uri.

http://docs.aws.amazon.com/emr/latest/ManagementGuide/enable-consistent-view.html

 Original message 
From: Steve Loughran
Date:20/01/2017 21:17 (GMT+02:00)
To: "VND Tremblay, Paul"
Cc: Takeshi Yamamuro ,user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: spark 2.02 error when writing to s3

AWS S3 is eventually consistent: even after something is deleted, a LIST/GET 
call may show it. You may be seeing that effect; even after the DELETE has got 
rid of the files, a listing sees something there, And I suspect the time it 
takes for the listing to "go away" will depend on the total number of entries 
underneath, as there are more deletion markers "tombstones" to propagate around 
s3

Try deleting the path and then waiting a short period


On 20 Jan 2017, at 18:54, VND Tremblay, Paul 
<tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com>> wrote:

I am using an EMR cluster, and the latest version offered is 2.02. The link 
below indicates that that user had the same problem, which seems unresolved.

Thanks

Paul

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_



From: Takeshi Yamamuro [mailto:linguin@gmail.com]
Sent: Thursday, January 19, 2017 9:27 PM
To: VND Tremblay, Paul
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: spark 2.02 error when writing to s3

Hi,

Do you get the same exception also in v2.1.0?
Anyway, I saw another guy reporting the same error, I think.
https://www.mail-archive.com/user@spark.apache.org/msg60882.html

// maropu


On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul 
<tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com>> wrote:
I have come across a problem when writing CSV files to S3 in Spark 2.02. The 
problem does not exist in Spark 1.6.


19:09:20 Caused by: java.io.IOException: File already 
exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv


My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks = args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep = "|",
        header = True,
        nullValue = '',
        quote = None,
        path = path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.

Notes:

* If I use a small set of data, then I don't get the error

* If I use Spark 1.6, I don't get the error

RE: spark 2.02 error when writing to s3

2017-01-26 Thread VND Tremblay, Paul
This seems to have done the trick, although I am not positive. If I have time, 
I'll test spinning up a cluster with and without consistent view to pin point 
the error.

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_

From: Neil Jonkers [mailto:neilod...@gmail.com]
Sent: Friday, January 20, 2017 11:39 AM
To: Steve Loughran; VND Tremblay, Paul
Cc: Takeshi Yamamuro; user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Can you test by enabling emrfs consistent view and use s3:// uri.

http://docs.aws.amazon.com/emr/latest/ManagementGuide/enable-consistent-view.html

 Original message 
From: Steve Loughran
Date:20/01/2017 21:17 (GMT+02:00)
To: "VND Tremblay, Paul"
Cc: Takeshi Yamamuro ,user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

AWS S3 is eventually consistent: even after something is deleted, a LIST/GET 
call may show it. You may be seeing that effect; even after the DELETE has got 
rid of the files, a listing sees something there, And I suspect the time it 
takes for the listing to "go away" will depend on the total number of entries 
underneath, as there are more deletion markers "tombstones" to propagate around 
s3

Try deleting the path and then waiting a short period


On 20 Jan 2017, at 18:54, VND Tremblay, Paul 
<tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com>> wrote:

I am using an EMR cluster, and the latest version offered is 2.02. The link 
below indicates that that user had the same problem, which seems unresolved.

Thanks

Paul

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_


From: Takeshi Yamamuro [mailto:linguin@gmail.com]
Sent: Thursday, January 19, 2017 9:27 PM
To: VND Tremblay, Paul
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: spark 2.02 error when writing to s3

Hi,

Do you get the same exception also in v2.1.0?
Anyway, I saw another guy reporting the same error, I think.
https://www.mail-archive.com/user@spark.apache.org/msg60882.html

// maropu


On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul 
<tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com>> wrote:
I have come across a problem when writing CSV files to S3 in Spark 2.02. The 
problem does not exist in Spark 1.6.


19:09:20 Caused by: java.io.IOException: File already 
exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv


My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks = args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep = "|",
        header = True,
        nullValue = '',
        quote = None,
        path = path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.

Notes:

* If I use a small set of data, then I don't get the error

* If I use Spark 1.6, I don't get the error

* If I read in a simple dataframe and then write to S3, I still get the error 
(without doing any transformations)

* If I do the previous step with a smaller set of data, I don't get the error.

* I am using pyspark, with python 2.7

* The thread at this link: https://forums.aws.amazon.com/thread.jspa?threadID=152470 indicates the problem is caused by a sync problem. With large datasets, Spark tries to write multiple times and causes the error. The suggestion is to turn off speculation, but I believe speculation is turned off by default in pyspark.

Thanks!

Paul


_

Paul Tremblay
Analytics Specialist

THE BOSTON CONSULTING GROUP
STL ▪

Tel. + ▪ Mobile +
tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com>
_

Read BCG's latest insights, ana

RE: Ingesting Large csv File to relational database

2017-01-26 Thread VND Tremblay, Paul
What relational db are you using? We do this at work, and the way we handle it 
is to unload the db into Spark (actually, we unload it to S3 and then into 
Spark). Redshift is very efficient at dumping tables this way.
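Once both sides are in Spark, the comparison itself can be sketched roughly like this (pyspark; jdbc_url, props, csv_path and the table name are placeholders, and this loads the snapshot over JDBC for brevity rather than via the S3 unload described above):

current = spark.read.jdbc(url=jdbc_url, table="items", properties=props)
incoming = spark.read.csv(csv_path, header=True, inferSchema=True)

# rows from the nightly file that are not already present verbatim in the db;
# only these need to be written back (assumes both sides share a schema)
changed_or_new = incoming.subtract(current)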



_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_

From: Eric Dain [mailto:ericdai...@gmail.com]
Sent: Wednesday, January 25, 2017 11:14 PM
To: user@spark.apache.org
Subject: Ingesting Large csv File to relational database

Hi,

I need to write nightly job that ingest large csv files (~15GB each) and 
add/update/delete the changed rows to relational database.

If a row is identical to what in the database, I don't want to re-write the row 
to the database. Also, if same item comes from multiple sources (files) I need 
to implement a logic to choose if the new source is preferred or the current 
one in the database should be kept unchanged.

Obviously, I don't want to query the database for each item to check if the 
item has changed or no. I prefer to maintain the state inside Spark.

Is there a preferred and performant way to do that using Apache Spark ?

Best,
Eric

__
The Boston Consulting Group, Inc.
 
This e-mail message may contain confidential and/or privileged information.
If you are not an addressee or otherwise authorized to receive this message,
you should not use, copy, disclose or take any action based on this e-mail or
any information contained in the message. If you have received this material
in error, please advise the sender immediately by reply e-mail and delete this
message. Thank you.


RE: spark 2.02 error when writing to s3

2017-01-20 Thread VND Tremblay, Paul
I am using an EMR cluster, and the latest version offered is 2.02. The link 
below indicates that that user had the same problem, which seems unresolved.

Thanks

Paul

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_

From: Takeshi Yamamuro [mailto:linguin@gmail.com]
Sent: Thursday, January 19, 2017 9:27 PM
To: VND Tremblay, Paul
Cc: user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Hi,

Do you get the same exception also in v2.1.0?
Anyway, I saw another guy reporting the same error, I think.
https://www.mail-archive.com/user@spark.apache.org/msg60882.html

// maropu


On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul 
<tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com>> wrote:
I have come across a problem when writing CSV files to S3 in Spark 2.02. The 
problem does not exist in Spark 1.6.


19:09:20 Caused by: java.io.IOException: File already 
exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv


My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks = args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep = "|",
        header = True,
        nullValue = '',
        quote = None,
        path = path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.

Notes:

* If I use a small set of data, then I don't get the error

* If I use Spark 1.6, I don't get the error

* If I read in a simple dataframe and then write to S3, I still get the error 
(without doing any transformations)

* If I do the previous step with a smaller set of data, I don't get the error.

* I am using pyspark, with python 2.7

* The thread at this link: https://forums.aws.amazon.com/thread.jspa?threadID=152470 indicates the problem is caused by a sync problem. With large datasets, Spark tries to write multiple times and causes the error. The suggestion is to turn off speculation, but I believe speculation is turned off by default in pyspark.

Thanks!

Paul


_

Paul Tremblay
Analytics Specialist

THE BOSTON CONSULTING GROUP
STL ▪

Tel. + ▪ Mobile +
tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com>
_

Read BCG's latest insights, analysis, and viewpoints at 
bcgperspectives.com<http://www.bcgperspectives.com>



The Boston Consulting Group, Inc.

This e-mail message may contain confidential and/or privileged information. If 
you are not an addressee or otherwise authorized to receive this message, you 
should not use, copy, disclose or take any action based on this e-mail or any 
information contained in the message. If you have received this material in 
error, please advise the sender immediately by reply e-mail and delete this 
message. Thank you.



--
---
Takeshi Yamamuro


spark 2.02 error when writing to s3

2017-01-19 Thread VND Tremblay, Paul
I have come across a problem when writing CSV files to S3 in Spark 2.02. The 
problem does not exist in Spark 1.6.


19:09:20 Caused by: java.io.IOException: File already 
exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv


My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks = args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep = "|",
        header = True,
        nullValue = '',
        quote = None,
        path = path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.

Notes:

* If I use a small set of data, then I don't get the error

* If I use Spark 1.6, I don't get the error

* If I read in a simple dataframe and then write to S3, I still get the error 
(without doing any transformations)

* If I do the previous step with a smaller set of data, I don't get the error.

* I am using pyspark, with python 2.7

* The thread at this link: https://forums.aws.amazon.com/thread.jspa?threadID=152470 indicates the problem is caused by a sync problem. With large datasets, Spark tries to write multiple times and causes the error. The suggestion is to turn off speculation, but I believe speculation is turned off by default in pyspark (see the sketch below).
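A sketch of two workarounds related to the notes above (df stands for the DataFrame produced by the .toDF() call and path is as before; neither is a confirmed fix for the underlying issue):

from pyspark import SparkConf

# 1) let Spark overwrite the output prefix itself instead of deleting it
#    beforehand, so the write does not race S3's eventually consistent listing
df.write.mode("overwrite").csv(path=path, sep="|", header=True,
                               nullValue="", quote=None)

# 2) make the speculation default explicit when building the context
conf = SparkConf().set("spark.speculation", "false")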

Thanks!

Paul


_

Paul Tremblay
Analytics Specialist

THE BOSTON CONSULTING GROUP
STL ▪

Tel. + ▪ Mobile +
tremblay.p...@bcg.com<mailto:tremblay.p...@bcg.com>
_

Read BCG's latest insights, analysis, and viewpoints at 
bcgperspectives.com<http://www.bcgperspectives.com>

__
The Boston Consulting Group, Inc.
 
This e-mail message may contain confidential and/or privileged information.
If you are not an addressee or otherwise authorized to receive this message,
you should not use, copy, disclose or take any action based on this e-mail or
any information contained in the message. If you have received this material
in error, please advise the sender immediately by reply e-mail and delete this
message. Thank you.


Spark 2.0 Encoder().schema() is sorting StructFields

2016-10-12 Thread Paul Stewart
Hi all,

I am using Spark 2.0 to read a CSV file into a Dataset in Java.  This works 
fine if i define the StructType with the StructField array ordered by hand.  
What I would like to do is use a bean class for both the schema and Dataset row 
type.  For example,

Dataset<Bean> beanDS = spark.read().schema(Encoders.bean(Bean.class).schema()).as(Encoders.bean(Bean.class));

When using the Encoder(Bean.class).schema() method to generate the StructType 
array
of StructFields the class attributes are returned as a sorted list and not
in the defined order within the Bean.class.  This makes the schema unusable
for reading from a CSV file where the ordering of the attributes is
significant.

Is there anyway to cause the Encoder().schema() method to return the array
of StructFields in the original bean class definition?  (Aside from prefix 
every attribute name to maintain order)

Would this be considered a bug/enhancement?

Regards,
Paul


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: AVRO vs Parquet

2016-03-04 Thread Paul Leclercq
Nice article about Parquet *with* Avro :

   - https://dzone.com/articles/understanding-how-parquet
   - http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/

Nice video from the good folks of Cloudera for the *differences* between
"Avrow" and Parquet

   - https://www.youtube.com/watch?v=AY1dEfyFeHc


2016-03-04 7:12 GMT+01:00 Koert Kuipers <ko...@tresata.com>:

> well can you use orc without bringing in the kitchen sink of dependencies
> also known as hive?
>
> On Thu, Mar 3, 2016 at 11:48 PM, Jong Wook Kim <ilike...@gmail.com> wrote:
>
>> How about ORC? I have experimented briefly with Parquet and ORC, and I
>> liked the fact that ORC has its schema within the file, which makes it
>> handy to work with any other tools.
>>
>> Jong Wook
>>
>> On 3 March 2016 at 23:29, Don Drake <dondr...@gmail.com> wrote:
>>
>>> My tests show Parquet has better performance than Avro in just about
>>> every test.  It really shines when you are querying a subset of columns in
>>> a wide table.
>>>
>>> -Don
>>>
>>> On Wed, Mar 2, 2016 at 3:49 PM, Timothy Spann <tim.sp...@airisdata.com>
>>> wrote:
>>>
>>>> Which format is the best format for SparkSQL adhoc queries and general
>>>> data storage?
>>>>
>>>> There are lots of specialized cases, but generally accessing some but
>>>> not all the available columns with a reasonable subset of the data.
>>>>
>>>> I am leaning towards Parquet as it has great support in Spark.
>>>>
>>>> I also have to consider any file on HDFS may be accessed from other
>>>> tools like Hive, Impala, HAWQ.
>>>>
>>>> Suggestions?
>>>> —
>>>> airis.DATA
>>>> Timothy Spann, Senior Solutions Architect
>>>> C: 609-250-5894
>>>> http://airisdata.com/
>>>> http://meetup.com/nj-datascience
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Donald Drake
>>> Drake Consulting
>>> http://www.drakeconsulting.com/
>>> https://twitter.com/dondrake <http://www.MailLaunder.com/>
>>> 800-733-2143
>>>
>>
>>
>


-- 

Paul Leclercq | Data engineer


 paul.lecle...@tabmo.io  |  http://www.tabmo.fr/


Re: Kafka streaming receiver approach - new topic not read from beginning

2016-02-23 Thread Paul Leclercq
I successfully processed my data by manually resetting my topic offsets in ZK.

In case it helps someone, here are my steps:

Make sure you stop all your consumers before doing that, otherwise they
overwrite the new offsets you wrote

set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId}
{newOffset}


Source : https://metabroadcast.com/blog/resetting-kafka-offsets

2016-02-22 11:55 GMT+01:00 Paul Leclercq <paul.lecle...@tabmo.io>:

> Thanks for your quick answer.
>
> If I set "auto.offset.reset" to "smallest" as for KafkaParams like this
>
> val kafkaParams = Map[String, String](
>  "metadata.broker.list" -> brokers,
>  "group.id" -> groupId,
>  "auto.offset.reset" -> "smallest"
> )
>
> And then use :
>
> val streams = KafkaUtils.createStream(ssc, kafkaParams, kafkaTopics, 
> StorageLevel.MEMORY_AND_DISK_SER_2)
>
> My fear is that, every time I deploy a new version, the all consumer's topics 
> are going to be read from the beginning, but as said in Kafka's documentation
>
> auto.offset.reset default : largest
>
> What to do when there* is no initial offset in ZooKeeper* or if an offset is 
> out of range:
> * smallest : automatically reset the offset to the smallest offset
>
> So I will go for this option the next time I need to process a new topic 
>
> To fix my problem, as the topic as already been processed and registred in 
> ZK, I will use a directStream from smallest and remove all DB inserts of this 
> topic, and restart a "normal" stream when the lag will be caught up.
>
>
> 2016-02-22 10:57 GMT+01:00 Saisai Shao <sai.sai.s...@gmail.com>:
>
>> You could set this configuration "auto.offset.reset" through parameter
>> "kafkaParams" which is provided in some other overloaded APIs of
>> createStream.
>>
>> By default Kafka will pick data from latest offset unless you explicitly
>> set it, this is the behavior Kafka, not Spark.
>>
>> Thanks
>> Saisai
>>
>> On Mon, Feb 22, 2016 at 5:52 PM, Paul Leclercq <paul.lecle...@tabmo.io>
>> wrote:
>>
>>> Hi,
>>>
>>> Do you know why, with the receiver approach
>>> <http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-1-receiver-based-approach>
>>> and a *consumer group*, a new topic is not read from the beginning but
>>> from the lastest ?
>>>
>>> Code example :
>>>
>>>  val kafkaStream = KafkaUtils.createStream(streamingContext,
>>>  [ZK quorum], [consumer group id], [per-topic number of Kafka 
>>> partitions to consume])
>>>
>>>
>>> Is there a way to tell *only for new topic *to read from the beginning ?
>>>
>>> From Confluence FAQ
>>>
>>>> Alternatively, you can configure the consumer by setting
>>>> auto.offset.reset to "earliest" for the new consumer in 0.9 and "smallest"
>>>> for the old consumer.
>>>
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
>>>
>>> Thanks
>>> --
>>>
>>> Paul Leclercq
>>>
>>
>>
>
>
> --
>
> Paul Leclercq | Data engineer
>
>
>  paul.lecle...@tabmo.io  |  http://www.tabmo.fr/
>



-- 

Paul Leclercq | Data engineer


 paul.lecle...@tabmo.io  |  http://www.tabmo.fr/


Re: Kafka streaming receiver approach - new topic not read from beginning

2016-02-22 Thread Paul Leclercq
Thanks for your quick answer.

If I set "auto.offset.reset" to "smallest" as for KafkaParams like this

val kafkaParams = Map[String, String](
 "metadata.broker.list" -> brokers,
 "group.id" -> groupId,
 "auto.offset.reset" -> "smallest"
)

And then use :

val streams = KafkaUtils.createStream(ssc, kafkaParams, kafkaTopics,
StorageLevel.MEMORY_AND_DISK_SER_2)

My fear is that, every time I deploy a new version, all the consumer's
topics are going to be read from the beginning, but as said in Kafka's
documentation

auto.offset.reset default : largest

What to do when there* is no initial offset in ZooKeeper* or if an
offset is out of range:
* smallest : automatically reset the offset to the smallest offset

So I will go for this option the next time I need to process a new topic 

To fix my problem, as the topic has already been processed and
registered in ZK, I will use a directStream from smallest and remove
all DB inserts of this topic, and restart a "normal" stream once the
lag has been caught up.


2016-02-22 10:57 GMT+01:00 Saisai Shao <sai.sai.s...@gmail.com>:

> You could set this configuration "auto.offset.reset" through parameter
> "kafkaParams" which is provided in some other overloaded APIs of
> createStream.
>
> By default Kafka will pick data from latest offset unless you explicitly
> set it, this is the behavior Kafka, not Spark.
>
> Thanks
> Saisai
>
> On Mon, Feb 22, 2016 at 5:52 PM, Paul Leclercq <paul.lecle...@tabmo.io>
> wrote:
>
>> Hi,
>>
>> Do you know why, with the receiver approach
>> <http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-1-receiver-based-approach>
>> and a *consumer group*, a new topic is not read from the beginning but
>> from the lastest ?
>>
>> Code example :
>>
>>  val kafkaStream = KafkaUtils.createStream(streamingContext,
>>  [ZK quorum], [consumer group id], [per-topic number of Kafka partitions 
>> to consume])
>>
>>
>> Is there a way to tell *only for new topic *to read from the beginning ?
>>
>> From Confluence FAQ
>>
>>> Alternatively, you can configure the consumer by setting
>>> auto.offset.reset to "earliest" for the new consumer in 0.9 and "smallest"
>>> for the old consumer.
>>
>>
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
>>
>> Thanks
>> --
>>
>> Paul Leclercq
>>
>
>


-- 

Paul Leclercq | Data engineer


 paul.lecle...@tabmo.io  |  http://www.tabmo.fr/


Kafka streaming receiver approach - new topic not read from beginning

2016-02-22 Thread Paul Leclercq
Hi,

Do you know why, with the receiver approach
<http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-1-receiver-based-approach>
and a *consumer group*, a new topic is not read from the beginning but from
the latest?

Code example :

 val kafkaStream = KafkaUtils.createStream(streamingContext,
 [ZK quorum], [consumer group id], [per-topic number of Kafka
partitions to consume])


Is there a way to tell *only new topics* to read from the beginning?

From Confluence FAQ

> Alternatively, you can configure the consumer by setting auto.offset.reset
> to "earliest" for the new consumer in 0.9 and "smallest" for the old
> consumer.


https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
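For reference, the same override can also be passed from pyspark (a sketch only; the ZooKeeper quorum, consumer group id and topic name below are placeholders, and ssc is an existing StreamingContext):

from pyspark.streaming.kafka import KafkaUtils

kafka_params = {"auto.offset.reset": "smallest"}  # old consumer: "smallest", not "earliest"
stream = KafkaUtils.createStream(
    ssc,
    "zk-host:2181",          # ZooKeeper quorum
    "my-consumer-group",     # consumer group id
    {"my-new-topic": 1},     # topic -> number of receiver threads
    kafkaParams=kafka_params)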

Thanks
-- 

Paul Leclercq


Re: spark-1.2.0--standalone-ha-zookeeper

2016-01-20 Thread Paul Leclercq
Hi Raghvendra and Spark users,

I also have trouble activating my standby master when my first master is
shut down (via ./sbin/stop-master.sh or via an instance shutdown), and just
want to share my thoughts.

To answer your question Raghvendra, in *spark-env.sh*, if 2 IPs are set
for SPARK_MASTER_IP(SPARK_MASTER_IP='W.X.Y.Z,A.B.C.D'), the standalone
cluster cannot be launched.

So I only use one IP there, as the Spark context can learn about the other
masters another way, as written in the Standalone Zookeeper HA
<http://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper>
doc, "you might start your SparkContext pointing to
spark://host1:port1,host2:port2"
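In pyspark that multi-master URL looks roughly like this (host names and port are placeholders for the two standalone masters):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("spark://master1:7077,master2:7077")
        .setAppName("standalone-ha-test"))
sc = SparkContext(conf=conf)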

In my opinion, we should not have to set a SPARK_MASTER_IP as this is
stored in ZooKeeper :

you can launch multiple Masters in your cluster connected to the same
> ZooKeeper instance. One will be elected “leader” and the others will remain
> in standby mode.

When starting up, an application or Worker needs to be able to find and
> register with the current lead Master. Once it successfully registers,
> though, it is “in the system” (i.e., stored in ZooKeeper).

 -
http://spark.apache.org/docs/latest/spark-standalone.html#standby-masters-with-zookeeper

As I understand it, after a ./sbin/start-master.sh on both masters, one master
will be elected leader and the other will be standby.
To launch the workers, we can use ./sbin/start-slave.sh
spark://MASTER_ELECTED_IP:7077
I don't think we can use ./sbin/start-all.sh, which uses the slaves file
to launch workers and masters, as we cannot set 2 master IPs inside
spark-env.sh.

My SPARK_DAEMON_JAVA_OPTS content :

SPARK_DAEMON_JAVA_OPTS='-Dspark.deploy.recoveryMode="ZOOKEEPER"
> -Dspark.deploy.zookeeper.url="ZOOKEEPER_IP:2181"
> -Dspark.deploy.zookeeper.dir="/spark"'


A good thing to check if everything went OK is the folder /spark on the
ZooKeeper server. I could not find it on my server.

Thanks for reading,

Paul


2016-01-19 22:12 GMT+01:00 Raghvendra Singh <raghvendra.ii...@gmail.com>:

> Hi, there is one question. In spark-env.sh should i specify all masters
> for parameter SPARK_MASTER_IP. I've set SPARK_DAEMON_JAVA_OPTS already
> with zookeeper configuration as specified in spark documentation.
>
> Thanks & Regards
> Raghvendra
>
> On Wed, Jan 20, 2016 at 1:46 AM, Raghvendra Singh <
> raghvendra.ii...@gmail.com> wrote:
>
>> Here's the complete master log on reproducing the error
>> http://pastebin.com/2YJpyBiF
>>
>> Regards
>> Raghvendra
>>
>> On Wed, Jan 20, 2016 at 12:38 AM, Raghvendra Singh <
>> raghvendra.ii...@gmail.com> wrote:
>>
>>> Ok I Will try to reproduce the problem. Also I don't think this is an
>>> uncommon problem I am searching for this problem on Google for many days
>>> and found lots of questions but no answers.
>>>
>>> Do you know what kinds of settings spark and zookeeper allow for
>>> handling time outs during leader election etc. When one is down.
>>>
>>> Regards
>>> Raghvendra
>>> On 20-Jan-2016 12:28 am, "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>
>>>> Perhaps I don't have enough information to make further progress.
>>>>
>>>> On Tue, Jan 19, 2016 at 10:55 AM, Raghvendra Singh <
>>>> raghvendra.ii...@gmail.com> wrote:
>>>>
>>>>> I currently do not have access to those logs but there were only about
>>>>> five lines before this error. They were the same which are present usually
>>>>> when everything works fine.
>>>>>
>>>>> Can you still help?
>>>>>
>>>>> Regards
>>>>> Raghvendra
>>>>> On 18-Jan-2016 8:50 pm, "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> Can you pastebin master log before the error showed up ?
>>>>>>
>>>>>> The initial message was posted for Spark 1.2.0
>>>>>> Which release of Spark / zookeeper do you use ?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Mon, Jan 18, 2016 at 6:47 AM, doctorx <raghvendra.ii...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I am facing the same issue, with the given error
>>>>>>>
>>>>>>> ERROR Master:75 - Leadership has been revoked -- master shutting
>>>>>>> down.
>>>>>>>
>>>>>>> Can anybody help. Any clue will be useful. Should i change something
>>>>>>> in
>>>>>>> spark cluster or zookeeper. Is there any setting in spark which can
>>>>>>> help me?
>>>>>>>
>>>>>>> Thanks & Regards
>>>>>>> Raghvendra
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-1-2-0-standalone-ha-zookeeper-tp21308p25994.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>> -
>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>
>


Re: Spark streaming job hangs

2015-12-01 Thread Paul Leclercq
You might not have enough cores to process data from Kafka


> When running a Spark Streaming program locally, do not use “local” or
> “local[1]” as the master URL. Either of these means that only one thread
> will be used for running tasks locally. If you are using a input DStream
> based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single
> thread will be used to run the receiver, leaving no thread for processing
> the received data. *Hence, when running locally, always use “local[n]” as
> the master URL, ​*where n > number of receivers to run (see Spark
> Properties for information on how to set the master).*


 
https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers
<https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers>
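A minimal sketch of that advice in pyspark (the app name and batch interval are arbitrary placeholders):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# at least one core for the Kafka receiver and one for processing
sc = SparkContext(master="local[2]", appName="kafka-consumer")
ssc = StreamingContext(sc, batchDuration=5)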

2015-12-01 7:13 GMT+01:00 Cassa L <lcas...@gmail.com>:

> Hi,
>  I am reading data from Kafka into spark. It runs fine for sometime but
> then hangs forever with following output. I don't see and errors in logs.
> How do I debug this?
>
> 2015-12-01 06:04:30,697 [dag-scheduler-event-loop] INFO
> (Logging.scala:59) - Adding task set 19.0 with 4 tasks
> 2015-12-01 06:04:30,872 [pool-13-thread-1] INFO  (Logging.scala:59) -
> Disconnected from Cassandra cluster: APG DEV Cluster
> 2015-12-01 06:04:35,060 [JobGenerator] INFO  (Logging.scala:59) - Added
> jobs for time 1448949875000 ms
> 2015-12-01 06:04:40,054 [JobGenerator] INFO  (Logging.scala:59) - Added
> jobs for time 144894988 ms
> 2015-12-01 06:04:45,034 [JobGenerator] INFO  (Logging.scala:59) - Added
> jobs for time 1448949885000 ms
> 2015-12-01 06:04:50,100 [JobGenerator] INFO  (Logging.scala:59) - Added
> jobs for time 144894989 ms
> 2015-12-01 06:04:55,064 [JobGenerator] INFO  (Logging.scala:59) - Added
> jobs for time 1448949895000 ms
> 2015-12-01 06:05:00,125 [JobGenerator] INFO  (Logging.scala:59) - Added
> jobs for time 144894990 ms
>
>
> Thanks
> LCassa
>



-- 

Paul Leclercq | Data engineer


 paul.lecle...@tabmo.io  |  http://www.tabmo.fr/


unpersist RDD from another thread

2015-09-16 Thread Paul Weiss
Hi,

What is the behavior when calling rdd.unpersist() from a different thread
while another thread is using that rdd.  Below is a simple case for this:

1) create rdd and load data
2) call rdd.cache() to bring data into memory
3) create another thread and pass rdd for a long computation
4) call rdd.unpersist while 3. is still running

Questions:

* Will the computation in 3) finish properly even if unpersist was called
on the rdd while running?
* What happens if a part of the computation fails and the rdd needs to
reconstruct based on DAG lineage, will this still work even though
unpersist has been called?

thanks,
-paul
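A small pyspark sketch of the conservative pattern discussed in the reply below, i.e. wait for the consuming thread to finish before unpersisting (names such as in_path are placeholders):

import threading

rdd = sc.textFile(in_path).cache()      # steps 1) and 2): load and cache

result = {}
def long_computation():
    # step 3): a stand-in for the long-running job that uses the cached RDD
    result["total"] = rdd.map(lambda line: len(line)).sum()

worker = threading.Thread(target=long_computation)
worker.start()
# ... other driver-side work ...
worker.join()       # wait until every job using the cached RDD has finished
rdd.unpersist()     # step 4): only now release the cached blocks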


Re: unpersist RDD from another thread

2015-09-16 Thread Paul Weiss
So in order to not incur any performance issues I should really wait for
all usage of the rdd to complete before calling unpersist, correct?

On Wed, Sep 16, 2015 at 4:08 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> unpredictable. I think it will be safe (as in nothing should fail), but
> the performance will be unpredictable (some partition may use cache, some
> may not be able to use the cache).
>
> On Wed, Sep 16, 2015 at 1:06 PM, Paul Weiss <paulweiss@gmail.com>
> wrote:
>
>> Hi,
>>
>> What is the behavior when calling rdd.unpersist() from a different thread
>> while another thread is using that rdd.  Below is a simple case for this:
>>
>> 1) create rdd and load data
>> 2) call rdd.cache() to bring data into memory
>> 3) create another thread and pass rdd for a long computation
>> 4) call rdd.unpersist while 3. is still running
>>
>> Questions:
>>
>> * Will the computation in 3) finish properly even if unpersist was called
>> on the rdd while running?
>> * What happens if a part of the computation fails and the rdd needs to
>> reconstruct based on DAG lineage, will this still work even though
>> unpersist has been called?
>>
>> thanks,
>> -paul
>>
>
>


RE: Too many open files

2015-07-29 Thread Paul Röwer
Maybe you forgot to close a reader or writer object.

On 29 July 2015 18:04:59 CEST, saif.a.ell...@wellsfargo.com wrote:
Thank you both, I will take a look, but


1.   For high-shuffle tasks, is this right for the system to have
the size and thresholds high? I hope there is no bad consequences.

2.   I will try to overlook admin access and see if I can get
anything with only user rights

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Wednesday, July 29, 2015 12:59 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org
Subject: Re: Too many open files

Please increase limit for open files:

http://stackoverflow.com/questions/34588/how-do-i-change-the-number-of-open-files-limit-in-linux


On Jul 29, 2015, at 8:39 AM, saif.a.ell...@wellsfargo.com wrote:
Hello,

I’ve seen a couple emails on this issue but could not find anything to
solve my situation.

Tried to reduce the partitioning level, enable consolidateFiles and
increase the sizeInFlight limit, but still no help. Spill manager is
sort, which is the default, any advice?

15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0
(TID 331, localhost): FetchFailed(BlockManagerId(driver, localhost,
43437), shuffleId=3, mapId=0, reduceId=34, message=
org.apache.spark.shuffle.FetchFailedException:
/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index
(Too many open files)
..
..
15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in
stage 11.0 (TID 306)
org.apache.spark.SparkException: Job aborted due to stage failure: Task
20 in stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in
stage 11.0 (TID 317, localhost): java.io.FileNotFoundException:
/tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc
(Too many open files)

my fs is ext4 and currently ulimit -n is 1024

Thanks
Saif

-- 
This message was sent from my Android mobile phone with K-9 Mail.

Jobs with unknown origin.

2015-07-08 Thread Jan-Paul Bultmann
Hey,

I have quite a few jobs appearing in the web UI with the description "run at 
ThreadPoolExecutor.java:1142".
Are these generated by SparkSQL internally?

There are so many that they cause a RejectedExecutionException when the 
thread-pool runs out of space for them.

RejectedExecutionException Task 
scala.concurrent.impl.Future$PromiseCompletingRunnable@30ec07a4 rejected from 
java.util.concurrent.ThreadPoolExecutor@9110d1[Running, pool size = 128, active 
threads = 128, queued tasks = 0, completed tasks = 392]  
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution 
(ThreadPoolExecutor.java:2047)

Any ideas where they come from? I'm pretty sure that they don't originate from 
my code.

Cheers Jan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Benchmark results between Flink and Spark

2015-07-06 Thread Jan-Paul Bultmann
I would guess the opposite is true for highly iterative benchmarks (common in 
graph processing and data-science).

Spark has a pretty large overhead per iteration, and more optimisation and 
planning only make this worse.

Sure, people have implemented things like Dijkstra's algorithm in Spark
(a problem where the number of iterations is bounded by the circumference of 
the input graph),
but all the datasets I've seen it running on had a very small circumference 
(which is common for e.g. social networks).

Take sparkSQL for example. Catalyst is a really good query optimiser, but it 
introduces significant overhead.
Since spark has no iterative semantics on its own (unlike flink),
one has to materialise the intermediary dataframe at each iteration boundary to 
determine if a termination criterion is reached.
This causes a huge amount of planning, especially since it looks like catalyst 
will try to optimise the dependency graph
regardless of caching. A dependency graph that grows in the number of 
iterations and thus the size of the input dataset.
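A minimal sketch of that pattern (initial_df and one_step are illustrative names, not from the original post; the point is only that the termination test is an action, so every pass forces a full plan-and-execute cycle):

frontier = initial_df.cache()
while True:
    expanded = one_step(frontier).distinct().cache()
    if expanded.subtract(frontier).count() == 0:   # fixpoint check = an action
        break
    frontier = expanded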

In Flink, on the other hand, you can describe your entire iterative program 
through transformations without ever calling an action.
This means that the optimiser will only have to do planning once.

Just my 2 cents :)
Cheers, Jan

 On 06 Jul 2015, at 06:10, n...@reactor8.com wrote:
 
 Maybe some flink benefits from some pts they outline here:
  
 http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html 
 http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
  
 Probably if re-ran the benchmarks with 1.5/tungsten line would close the gap 
 a bit(or a lot) with spark moving towards similar style off-heap memory mgmt, 
 more planning optimizations
  
  
 From: Jerry Lam [mailto:chiling...@gmail.com] 
 Sent: Sunday, July 5, 2015 6:28 PM
 To: Ted Yu
 Cc: Slim Baltagi; user
 Subject: Re: Benchmark results between Flink and Spark
  
 Hi guys,
  
 I just read the paper too. There is no much information regarding why Flink 
 is faster than Spark for data science type of workloads in the benchmark. It 
 is very difficult to generalize the conclusion of a benchmark from my point 
 of view. How much experience the author has with Spark is in comparisons to 
 Flink is one of the immediate questions I have. It would be great if they 
 have the benchmark software available somewhere for other people to 
 experiment.
  
 just my 2 cents,
  
 Jerry
  
 On Sun, Jul 5, 2015 at 4:35 PM, Ted Yu yuzhih...@gmail.com 
 mailto:yuzhih...@gmail.com wrote:
 There was no mentioning of the versions of Flink and Spark used in 
 benchmarking.
  
 The size of cluster is quite small.
  
 Cheers
  
 On Sun, Jul 5, 2015 at 10:24 AM, Slim Baltagi sbalt...@gmail.com 
 mailto:sbalt...@gmail.com wrote:
 Hi
 
 Apache Flink outperforms Apache Spark in processing machine learning  graph
 algorithms and relational queries but not in batch processing!
 
 The results were published in the proceedings of the 18th International
 Conference, Business Information Systems 2015, Poznań, Poland, June 24-26,
 2015.
 
 Thanks to our friend Google, Chapter 3: 'Evaluating New Approaches of Big
 Data Analytics Frameworks' by Norman Spangenberg, Martin Roth and Bogdan
 Franczyk is available for preview at http://goo.gl/WocQci 
 http://goo.gl/WocQci on pages 28-37.
 
 Enjoy!
 
 Slim Baltagi
 http://www.SparkBigData.com http://www.sparkbigdata.com/
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Benchmark-results-between-Flink-and-Spark-tp23626.html
  
 http://apache-spark-user-list.1001560.n3.nabble.com/Benchmark-results-between-Flink-and-Spark-tp23626.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 mailto:user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org 
 mailto:user-h...@spark.apache.org


Re: Benchmark results between Flink and Spark

2015-07-06 Thread Jan-Paul Bultmann
Sorry, that should be shortest path, and diameter of the graph.
I shouldn't write emails before I get my morning coffee...

 On 06 Jul 2015, at 09:09, Jan-Paul Bultmann janpaulbultm...@me.com wrote:
 
 I would guess the opposite is true for highly iterative benchmarks (common in 
 graph processing and data-science).
 
 Spark has a pretty large overhead per iteration, more optimisations and 
 planning only makes this worse.
 
 Sure people implemented things like dijkstra's algorithm in spark
 (a problem where the number of iterations is bounded by the circumference of 
 the input graph),
 but all the datasets I've seen it running on had a very small circumference 
 (which is common for e.g. social networks).
 
 Take sparkSQL for example. Catalyst is a really good query optimiser, but it 
 introduces significant overhead.
 Since spark has no iterative semantics on its own (unlike flink),
 one has to materialise the intermediary dataframe at each iteration boundary 
 to determine if a termination criterion is reached.
 This causes a huge amount of planning, especially since it looks like 
 catalyst will try to optimise the dependency graph
 regardless of caching. A dependency graph that grows in the number of 
 iterations and thus the size of the input dataset.
 
 In flink on the other hand, you can describe you entire iterative program 
 through transformations without ever calling an action.
 This means that the optimiser will only have to do planing once.
 
 Just my 2 cents :)
 Cheers, Jan
 
 On 06 Jul 2015, at 06:10, n...@reactor8.com mailto:n...@reactor8.com wrote:
 
 Maybe some flink benefits from some pts they outline here:
  
 http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html 
 http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
  
 Probably if re-ran the benchmarks with 1.5/tungsten line would close the gap 
 a bit(or a lot) with spark moving towards similar style off-heap memory 
 mgmt, more planning optimizations
  
  
 From: Jerry Lam [mailto:chiling...@gmail.com mailto:chiling...@gmail.com] 
 Sent: Sunday, July 5, 2015 6:28 PM
 To: Ted Yu
 Cc: Slim Baltagi; user
 Subject: Re: Benchmark results between Flink and Spark
  
 Hi guys,
  
 I just read the paper too. There is no much information regarding why Flink 
 is faster than Spark for data science type of workloads in the benchmark. It 
 is very difficult to generalize the conclusion of a benchmark from my point 
 of view. How much experience the author has with Spark is in comparisons to 
 Flink is one of the immediate questions I have. It would be great if they 
 have the benchmark software available somewhere for other people to 
 experiment.
  
 just my 2 cents,
  
 Jerry
  
 On Sun, Jul 5, 2015 at 4:35 PM, Ted Yu yuzhih...@gmail.com 
 mailto:yuzhih...@gmail.com wrote:
 There was no mentioning of the versions of Flink and Spark used in 
 benchmarking.
  
 The size of cluster is quite small.
  
 Cheers
  
 On Sun, Jul 5, 2015 at 10:24 AM, Slim Baltagi sbalt...@gmail.com 
 mailto:sbalt...@gmail.com wrote:
 Hi
 
 Apache Flink outperforms Apache Spark in processing machine learning  
 graph
 algorithms and relational queries but not in batch processing!
 
 The results were published in the proceedings of the 18th International
 Conference, Business Information Systems 2015, Poznań, Poland, June 24-26,
 2015.
 
 Thanks to our friend Google, Chapter 3: 'Evaluating New Approaches of Big
 Data Analytics Frameworks' by Norman Spangenberg, Martin Roth and Bogdan
 Franczyk is available for preview at http://goo.gl/WocQci 
 http://goo.gl/WocQci on pages 28-37.
 
 Enjoy!
 
 Slim Baltagi
 http://www.SparkBigData.com http://www.sparkbigdata.com/
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Benchmark-results-between-Flink-and-Spark-tp23626.html
  
 http://apache-spark-user-list.1001560.n3.nabble.com/Benchmark-results-between-Flink-and-Spark-tp23626.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com 
 http://nabble.com/.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 mailto:user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org 
 mailto:user-h...@spark.apache.org



generateTreeString causes huge performance problems on dataframe persistence

2015-06-17 Thread Jan-Paul Bultmann
Hey,
I noticed that my code spends hours with `generateTreeString` even though the 
actual dag/dataframe execution takes seconds.

I’m running a query that grows exponentially in the number of iterations when 
evaluated without caching,
but should be linear when caching previous results.

E.g.

result_i+1 = distinct(join(result_i, result_i))

Which evaluates exponentially like this without caching.

Iteration | Dataframe Plan Tree
0|/\
1| /\/\
2|/\/\  /\/\
n|……….

But should be linear with caching.

Iteration | Dataframe Plan Tree
0| /\
  | \/
1| /\
  | \/
2| /\
  | \/
n| ……….


It seems that even though the DAG will have the latter form, 
`generateTreeString` will walk the entire plan naively as if no caching was 
done.

The spark webui also shows no active jobs even though my CPU uses one core 
fully, calculating that string.

Below is the piece of stacktrace that starts the entire walk.

^
|
Thousands of calls to  `generateTreeString`.
|
org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(int, 
StringBuilder) TreeNode.scala:431
org.apache.spark.sql.catalyst.trees.TreeNode.treeString() TreeNode.scala:400
org.apache.spark.sql.catalyst.trees.TreeNode.toString() TreeNode.scala:397
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$buildBuffers$2.apply() 
InMemoryColumnarTableScan.scala:164
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$buildBuffers$2.apply() 
InMemoryColumnarTableScan.scala:164
scala.Option.getOrElse(Function0) Option.scala:120
org.apache.spark.sql.columnar.InMemoryRelation.buildBuffers() 
InMemoryColumnarTableScan.scala:164
org.apache.spark.sql.columnar.InMemoryRelation.init(Seq, boolean, int, 
StorageLevel, SparkPlan, Option, RDD, Statistics, Accumulable) 
InMemoryColumnarTableScan.scala:112
org.apache.spark.sql.columnar.InMemoryRelation$.apply(boolean, int, 
StorageLevel, SparkPlan, Option) InMemoryColumnarTableScan.scala:45
org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply() 
CacheManager.scala:102
org.apache.spark.sql.execution.CacheManager.writeLock(Function0) 
CacheManager.scala:70
org.apache.spark.sql.execution.CacheManager.cacheQuery(DataFrame, Option, 
StorageLevel) CacheManager.scala:94
org.apache.spark.sql.DataFrame.persist(StorageLevel) DataFrame.scala:1320
^
|
Application logic.
|

Could someone confirm my suspicion?
And does somebody know why it’s called while caching, and why it walks the 
entire tree including cached results?

Cheers, Jan-Paul
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: generateTreeString causes huge performance problems on dataframe persistence

2015-06-17 Thread Jan-Paul Bultmann

 Seems you're hitting the self-join, currently Spark SQL won't cache any 
 result/logical tree for further analyzing or computing for self-join.

Other joins don’t suffer from this problem?

 Since the logical tree is huge, it's reasonable to take long time in 
 generating its tree string recursively.

The internal structure is basically a graph though, right?
Where equal cached subtrees are structurally shared by reference instead of 
copying them by value.

Is the `generateTreeString` result needed for anything other than giving the 
RDD a nice name?
It seems rather wasteful to compute a graphs unfolding into a tree for this.

 And I also doubt the computing can finish within a reasonable time, as there 
 probably be lots of partitions (grows exponentially) of the intermediate 
 result.
 

Possibly, so far the number of partitions stayed the same though.
But I didn’t run that many iterations due to the problem :).

 As a workaround, you can break the iterations into smaller ones and trigger 
 them manually in sequence.

You mean `write`-ing them to disk after each iteration?
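For what it's worth, a hedged sketch of that idea: materialise at every iteration boundary so the plan handed to Catalyst stays flat instead of growing with each self-join. All names (seed_df, num_iterations, one_iteration, checkpoint_dir, sqlContext) are placeholders, and the write/read API assumes Spark 1.4+.

result = seed_df
for i in range(num_iterations):
    step = one_iteration(result)                 # e.g. the self-join + distinct
    tmp = "%s/iter_%d" % (checkpoint_dir, i)
    step.write.mode("overwrite").parquet(tmp)    # cut the lineage on disk...
    result = sqlContext.read.parquet(tmp)        # ...and restart from a fresh, flat plan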

Thanks :), Jan

 -Original Message-
 From: Jan-Paul Bultmann [mailto:janpaulbultm...@me.com] 
 Sent: Wednesday, June 17, 2015 6:17 PM
 To: User
 Subject: generateTreeString causes huge performance problems on dataframe 
 persistence
 
 Hey,
 I noticed that my code spends hours with `generateTreeString` even though the 
 actual dag/dataframe execution takes seconds.
 
 I’m running a query that grows exponential in the number of iterations when 
 evaluated without caching, but should be linear when caching previous results.
 
 E.g.
 
result_i+1 = distinct(join(result_i, result_i))
 
 Which evaluates exponentially like this this without caching.
 
 Iteration | Dataframe Plan Tree
 0|/\
 1| /\/\
 2|/\/\  /\/\
 n|……….
 
 But should be linear with caching.
 
 Iteration | Dataframe Plan Tree
 0| /\
  | \/
 1| /\
  | \/
 2| /\
  | \/
 n| ……….
 
 
 It seems that even though the DAG will have the later form, 
 `generateTreeString` will walk the entire plan naively as if no caching was 
 done.
 
 The spark webui also shows no active jobs even though my CPU uses one core 
 fully, calculating that string.
 
 Below is the piece of stacktrace that starts the entire walk.
 
 ^
 |
 Thousands of calls to  `generateTreeString`.
 |
 org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(int, 
 StringBuilder) TreeNode.scala:431
 org.apache.spark.sql.catalyst.trees.TreeNode.treeString() TreeNode.scala:400
 org.apache.spark.sql.catalyst.trees.TreeNode.toString() TreeNode.scala:397
 org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$buildBuffers$2.apply()
  InMemoryColumnarTableScan.scala:164
 org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$buildBuffers$2.apply()
  InMemoryColumnarTableScan.scala:164
 scala.Option.getOrElse(Function0) Option.scala:120
 org.apache.spark.sql.columnar.InMemoryRelation.buildBuffers() 
 InMemoryColumnarTableScan.scala:164
 org.apache.spark.sql.columnar.InMemoryRelation.init(Seq, boolean, int, 
 StorageLevel, SparkPlan, Option, RDD, Statistics, Accumulable) 
 InMemoryColumnarTableScan.scala:112
 org.apache.spark.sql.columnar.InMemoryRelation$.apply(boolean, int, 
 StorageLevel, SparkPlan, Option) InMemoryColumnarTableScan.scala:45
 org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply() 
 CacheManager.scala:102
 org.apache.spark.sql.execution.CacheManager.writeLock(Function0) 
 CacheManager.scala:70 
 org.apache.spark.sql.execution.CacheManager.cacheQuery(DataFrame, Option, 
 StorageLevel) CacheManager.scala:94
 org.apache.spark.sql.DataFrame.persist(StorageLevel) DataFrame.scala:1320 ^
 |
 Application logic.
 |
 
 Could someone confirm my suspicion?
 And does somebody know why it’s called while caching, and why it walks the 
 entire tree including cached results?
 
 Cheers, Jan-Paul
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
 commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: build jar with all dependencies

2015-06-02 Thread Paul Röwer
]
org.apache.spark.SecurityManager
 INFO  - SecurityManager: authentication disabled; ui acls
disabled; users with view permissions: Set(proewer); users
with modify permissions: Set(proewer)
Exception in thread main
com.typesafe.config.ConfigException$Missing: No
configuration setting found for key 'akka.version'
at
com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:115)
at
com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:136)
at
com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:142)
at
com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:150)
at
com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:155)
at

com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:197)
at
akka.actor.ActorSystem$Settings.init(ActorSystem.scala:136)
at
akka.actor.ActorSystemImpl.init(ActorSystem.scala:470)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
at

org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
at
org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at
org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at

org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
at
scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at
org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
at

org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:156)
at
org.apache.spark.SparkContext.init(SparkContext.scala:203)
at

org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:53)
at
mgm.tp.bigdata.ma_spark.SparkMain.main(SparkMain.java:38)

What am I doing wrong?

best regards,
paul








Soft distinct on data frames.

2015-05-28 Thread Jan-Paul Bultmann
Hey,
Is there a way to do a distinct operation on each partition only?
My program generates quite a few duplicate tuples and it would be nice to 
remove some of these as an optimisation
without having to reshuffle the data.
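
A minimal sketch of the kind of per-partition dedup meant here, using mapPartitions so 
nothing is shuffled (`tuples` stands in for the RDD in question; only duplicates that 
happen to sit in the same partition are removed):

val softDistinct = tuples.mapPartitions(iter => iter.toSet.iterator, preservesPartitioning = true)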

I’ve also noticed that plans generated with a unique transformation have this 
peculiar form:

== Physical Plan ==
Distinct false
 Exchange (HashPartitioning [_0#347L,_1#348L], 200)
  Distinct true
   PhysicalRDD [_0#347L,_1#348L], MapPartitionsRDD[247] at map at 
SQLContext.scala:394

Does this mean that set semantics are just a flag that can be turned off and on 
for each shuffling operation?
If so, is it possible to do so in general, so that one always uses set 
semantics instead of bag?
Or will the optimiser try to propagate the set semantics?

Cheers Jan



spark sql, creating literal columns in java.

2015-05-05 Thread Jan-Paul Bultmann
Hey,
What is the recommended way to create literal columns in Java?
Scala has the `lit` function from `org.apache.spark.sql.functions`.
Should it be called from Java as well?

Cheers jan




Re: Jackson-core-asl conflict with Spark

2015-03-12 Thread Paul Brown
So... one solution would be to use a non-Jurassic version of Jackson.  2.6
will drop before too long, and 3.0 is in longer-term planning.  The 1.x
series is long deprecated.

If you're genuinely stuck with something ancient, then you need to include
the JAR that contains the class, and 1.9.13 does not.  Why do you think you
need that particular version?




—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Thu, Mar 12, 2015 at 9:58 AM, Uthayan Suthakar 
uthayan.sutha...@gmail.com wrote:

 Hello Guys,

 I'm running into below error:

 Exception in thread main java.lang.NoClassDefFoundError:
 org/codehaus/jackson/annotate/JsonClass

 I have created a uber jar with Jackson-core-asl.1.9.13 and passed it with
 --jars configuration, but still getting errors. I searched on the net and
 found a few suggestions, such as disabling USE_ANNOTATIONS, still no joy.

 I tried disabling SQL module and recompiled Spark and installed the custom
 library, yet no joy.

 I'm running out of ideas, could you please assist me with this issue?

 Many thanks.


 Uthay.





Perf impact of BlockManager byte[] copies

2015-02-27 Thread Paul Wais
Dear List,

I'm investigating some problems related to native code integration
with Spark, and while picking through BlockManager I noticed that data
(de)serialization currently issues lots of array copies.
Specifically:

- Deserialization: BlockManager marshals all deserialized bytes
through a spark.util.ByteBufferInputStream, which necessitates
copying data into an intermediate temporary byte[] .  The temporary
byte[] might be reused between deserialization of T instances, but
nevertheless the bytes must be copied (and likely in a Java loop).

- Serialization: BlockManager buffers all serialized bytes into a
java.io.ByteArrayOutputStream, which maintains an internal byte[]
buffer and grows/re-copies the buffer like a vector as the buffer
fills.  BlockManager then retrieves the internal byte[] buffer, wraps
it in a ByteBuffer, and sends it off to be stored (e.g. in
MemoryStore, DiskStore, Tachyon, etc).

When an individual T is somewhat large (e.g. a feature vector, an
image, etc), or blocks are megabytes in size, these copies become
expensive (especially for large written blocks), right?  Does anybody
have any measurements of /how/ expensive they are?  If not, is there
serialization benchmark code (e.g. for KryoSerializer) that might be
helpful here?
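
Lacking an official benchmark, a rough spark-shell sketch along these lines gives a 
first-order number for the SerDe cost of a byte-array-heavy record (record shape and 
sizes are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

case class Record(id: Long, payload: Array[Byte])

val ser = new KryoSerializer(new SparkConf()).newInstance()
val rec = Record(1L, new Array[Byte](4 * 1024 * 1024))   // ~4 MB payload

val t0 = System.nanoTime()
val buf  = ser.serialize(rec)                 // returns a ByteBuffer
val back = ser.deserialize[Record](buf)
println(s"round trip: ${(System.nanoTime() - t0) / 1e6} ms for ${buf.limit()} bytes")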


As part of my investigation, I've found that one might be able to
sidestep these issues by extending Spark's SerializerInstance API to
offer I/O on ByteBuffers (in addition to {Input,Output}Streams).  An
extension including a ByteBuffer API would furthermore have many
benefits for native code.  A major downside of this API addition is
that it wouldn't interoperate (nontrivially) with compression, so
shuffles wouldn't benefit.  Nevertheless, BlockManager could probably
deduce when use of this ByteBuffer API is possible and leverage it.

Cheers,
-Paul




Re: Support for SQL on unions of tables (merge tables?)

2015-01-21 Thread Paul Wais
Thanks Cheng!

For the list, I talked with Michael Armbrust at a recent Spark meetup
and his comments were:
 * For a union of tables, use a view and the Hive metastore
 * SQLContext might have the directory-traversing logic I need in it already
 * The union() of sequence files I saw was slow because Spark was
probably trying to shuffle the whole union.  A similar Spark SQL join
will also be slow (or break) unless one runs statistics so that the
smaller table can be broadcasted (e.g. see
https://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
)

I have never used Hive, so I'll have to investigate further.


On Tue, Jan 20, 2015 at 1:15 PM, Cheng Lian lian.cs@gmail.com wrote:
 I think you can resort to a Hive table partitioned by date
 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-PartitionedTables


 On 1/11/15 9:51 PM, Paul Wais wrote:


 Dear List,

 What are common approaches for addressing over a union of tables / RDDs?
 E.g. suppose I have a collection of log files in HDFS, one log file per day,
 and I want to compute the sum of some field over a date range in SQL.  Using
 log schema, I can read each as a distinct SchemaRDD, but I want to union
 them all and query against one 'table'.

 If this data were in MySQL, I could have a table for each day of data and
 use a MyISAM merge table to union these tables together and just query
 against the merge table.  What's nice here is that MySQL persists the merge
 table, and the merge table is r/w, so one can just update the merge table
 once per day.  (What's not nice is that merge tables scale poorly, backup
 admin is a pain, and oh hey I'd like to use Spark not MySQL).

 One naive and untested idea (that achieves implicit persistence): scan an
 HDFS directory for log files, create one RDD per file, union() the RDDs,
 then create a Schema RDD from that union().

 A few specific questions:
  * Any good approaches to a merge / union table? (Other than the naive
 idea above).  Preferably with some way to persist that table / RDD between
 Spark runs.  (How does Impala approach this problem?)

  * Has anybody tried joining against such a union of tables / RDDs on a
 very large amount of data?  When I've tried (non-spark-sql) union()ing
 Sequence Files, and then join()ing them against another RDD, Spark seems to
 try to compute the full union before doing any join() computation (and
 eventually OOMs the cluster because the union of Sequence Files is so big).
 I haven't tried something similar with Spark SQL.

  * Are there any plans related to this in the Spark roadmap?  (This
 feature would be a nice compliment to, say, persistent RDD indices for
 interactive querying).

  * Related question: are there plans to use Parquet Index Pages to make
 Spark SQL faster?  E.g. log indices over date ranges would be relevant here.

 All the best,
 -Paul







Re: spark 1.2 three times slower than spark 1.1, why?

2015-01-21 Thread Paul Wais
To force one instance per executor, you could explicitly subclass
FlatMapFunction and have it lazy-create your parser in the subclass
constructor.  You might also want to try RDD#mapPartitions() (instead of
RDD#flatMap()) if you want one instance per partition.  This approach worked
well for me when I had a flat map function that used non-serializable
native code / objects.
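
A minimal Scala sketch of that mapPartitions variant, with the parser constructed once 
per partition on the executor (LogParser stands in for the non-serializable parser 
class; inputPath is assumed):

val parsed = sc.textFile(inputPath).mapPartitions { lines =>
  val parser = new LogParser()            // built on the executor, never serialized
  lines.flatMap(line => parser.parseLine(line))
}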

FWIW RDD#flatMap() does not appear to have changed 1.1 - 1.2 (tho master
has a slight refactor).  Agree it's worth checking the number of partitions
in your 1.1 vs 1.2 test.



On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO raofeng...@gmail.com wrote:

 the LogParser instance is not serializable, and thus cannot be a
 broadcast,

 what’s worse, it contains an LRU cache, which is essential to the
 performance, and we would like to share among all the tasks on the same
 node.

 If it is the case, what’s the recommended way to share a variable among
 all the tasks within the same executor.
 ​

 2015-01-21 15:04 GMT+08:00 Davies Liu dav...@databricks.com:

 Maybe some change related to serialize the closure cause LogParser is
 not a singleton any more, then it is initialized for every task.

 Could you change it to a Broadcast?

 On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO raofeng...@gmail.com
 wrote:
  Currently we are migrating from spark 1.1 to spark 1.2, but found the
  program 3x slower, with nothing else changed.
  note: our program in spark 1.1 has successfully processed a whole year
 data,
  quite stable.
 
  the main script is as below
 
  sc.textFile(inputPath)
  .flatMap(line => LogParser.parseLine(line))
  .groupByKey(new HashPartitioner(numPartitions))
  .mapPartitionsWithIndex(...)
  .foreach(_ => {})
 
  where LogParser is a singleton which may take some time to initialize and
  is shared across the executor.
 
  the flatMap stage is 3x slower.
 
  We tried to change spark.shuffle.manager back to hash, and
  spark.shuffle.blockTransferService back to nio, but didn’t help.
 
  Could somebody explain possible causes, or what we should test or change
  to find it out?





Support for SQL on unions of tables (merge tables?)

2015-01-11 Thread Paul Wais
Dear List,

What are common approaches for addressing over a union of tables / RDDs?
E.g. suppose I have a collection of log files in HDFS, one log file per
day, and I want to compute the sum of some field over a date range in SQL.
Using log schema, I can read each as a distinct SchemaRDD, but I want to
union them all and query against one 'table'.

If this data were in MySQL, I could have a table for each day of data and
use a MyISAM merge table to union these tables together and just query
against the merge table.  What's nice here is that MySQL persists the merge
table, and the merge table is r/w, so one can just update the merge table
once per day.  (What's not nice is that merge tables scale poorly, backup
admin is a pain, and oh hey I'd like to use Spark not MySQL).

One naive and untested idea (that achieves implicit persistence): scan an
HDFS directory for log files, create one RDD per file, union() the RDDs,
then create a Schema RDD from that union().
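
A minimal spark-shell sketch of that naive idea (paths, file format, and schema are 
illustrative; `sc` and `sqlContext` as in the shell):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val dayDirs = fs.listStatus(new Path("/logs")).map(_.getPath.toString)
val perDay  = dayDirs.map(dir => sqlContext.jsonFile(dir))   // one SchemaRDD per day
val allDays = perDay.reduce(_ unionAll _)
allDays.registerTempTable("logs")
sqlContext.sql("SELECT SUM(bytes) FROM logs WHERE day >= '2014-12-01' AND day <= '2014-12-31'")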

A few specific questions:
 * Any good approaches to a merge / union table? (Other than the naive idea
above).  Preferably with some way to persist that table / RDD between Spark
runs.  (How does Impala approach this problem?)

 * Has anybody tried joining against such a union of tables / RDDs on a
very large amount of data?  When I've tried (non-spark-sql) union()ing
Sequence Files, and then join()ing them against another RDD, Spark seems to
try to compute the full union before doing any join() computation (and
eventually OOMs the cluster because the union of Sequence Files is so big).
I haven't tried something similar with Spark SQL.

 * Are there any plans related to this in the Spark roadmap?  (This feature
would be a nice compliment to, say, persistent RDD indices for interactive
querying).

 * Related question: are there plans to use Parquet Index Pages to make
Spark SQL faster?  E.g. log indices over date ranges would be relevant here.

All the best,
-Paul


Re: Downloads from S3 exceedingly slow when running on spark-ec2

2014-12-20 Thread Paul Brown
I would suggest checking out disk IO on the nodes in your cluster and then
reading up on the limiting behaviors that accompany different kinds of EC2
storage.  Depending on how things are configured for your nodes, you may
have a local storage configuration that provides bursty IOPS where you
get apparently good performance at first and then limiting kicks in and
slows down the rate at which you can write data to local storage.


—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Thu, Dec 18, 2014 at 5:56 AM, Jon Chase jon.ch...@gmail.com wrote:

 I'm running a very simple Spark application that downloads files from S3,
 does a bit of mapping, then uploads new files.  Each file is roughly 2MB
 and is gzip'd.  I was running the same code on Amazon's EMR w/Spark and not
 having any download speed issues (Amazon's EMR provides a custom
 implementation of the s3n:// file system, FWIW).

 When I say exceedingly slow, I mean that it takes about 2 minutes to
 download and process a 2MB file (this was taking ~2 seconds on the same
 instance types in Amazon's EMR).  When I download the same file from the
 EC2 machine with wget or curl, it downloads in ~ 1 second.  I've also done
 other bandwidth checks for downloads from other external hosts - no speed
 problems there.

 Tried this w/Spark 1.1.0 and 1.1.1.

 When I do a thread dump on a worker, I typically see this a lot:



 Executor task launch worker-7 daemon prio=10 tid=0x7fd174039000
 nid=0x59e9 runnable [0x7fd1f7dfb000]
java.lang.Thread.State: RUNNABLE
 at java.net.SocketInputStream.socketRead0(Native Method)
 at java.net.SocketInputStream.read(SocketInputStream.java:152)
 at java.net.SocketInputStream.read(SocketInputStream.java:122)
 at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
 at sun.security.ssl.InputRecord.read(InputRecord.java:480)
 at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
 - locked 0x0007e44dd140 (a java.lang.Object)
 at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
 at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
 - locked 0x0007e44e1350 (a sun.security.ssl.AppInputStream)
 at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
 at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
 - locked 0x0007e44ea800 (a java.io.BufferedInputStream)
 at org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
 at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
 at
 org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116)
 at
 org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.readLine(MultiThreadedHttpConnectionManager.java:1413)
 at
 org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973)
 at
 org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735)
 at
 org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098)
 at
 org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398)
 at
 org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
 at
 org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
 at
 org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
 at
 org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
 at
 org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
 at
 org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
 at
 org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
 at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
 at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
 at
 org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
 at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
 at org.apache.hadoop.fs.s3native.$Proxy6.retrieveMetadata(Unknown Source)
 at
 org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:330)
 at
 org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdir(NativeS3FileSystem.java:432)
 at
 org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdirs(NativeS3FileSystem.java:425)
 at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1126)
 at
 org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:256)
 at
 

Using S3 block file system

2014-12-09 Thread Paul Colomiets
Hi,

I'm  trying to use S3 Block file system in spark, i.e. s3:// urls
(*not* s3n://). And I always get the following error:

Py4JJavaError: An error occurred while calling o3188.saveAsParquetFile.
: org.apache.hadoop.fs.s3.S3FileSystemException: Not a Hadoop S3 file.
at 
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.checkMetadata(Jets3tFileSystemStore.java:206)
at 
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:165)
at 
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at com.sun.proxy.$Proxy24.retrieveINode(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.mkdir(S3FileSystem.java:158)
at org.apache.hadoop.fs.s3.S3FileSystem.mkdirs(S3FileSystem.java:151)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1815)
at org.apache.hadoop.fs.s3.S3FileSystem.create(S3FileSystem.java:234)
[.. snip ..]

I believe that I must somehow initialize file system (in particular
the metadata), but I can't find out how to do it.

I use spark 1.2.0rc1 with hadoop 2.4 and Riak CS (instead of S3) if
that matters. The s3n:// protocol with same settings work.


Thanks.
-- 
Paul




Re: Parsing a large XML file using Spark

2014-11-21 Thread Paul Brown
Unfortunately, unless you impose restrictions on the XML file (e.g., where
namespaces are declared, whether entity replacement is used, etc.), you
really can't parse only a piece of it even if you have start/end elements
grouped together.  If you want to deal effectively (and scalably) with
large XML files consisting of many records, the right thing to do is to
write them as one XML document per line just like the one JSON document per
line, at which point the data can be split effectively.  Something like
Woodstox and a little custom code should make an effective pre-processor.

Once you have the line-delimited XML, you can shred it however you want:
 JAXB, Jackson XML, etc.
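
A minimal sketch of the parallel shredding step once the dump is one XML record per 
line (path and element names are illustrative; scala.xml is used purely for 
illustration):

import scala.xml.XML

val pages = sc.textFile("hdfs:///wiki/pages-one-per-line.xml")
  .map(line => XML.loadString(line))
  .map(page => ((page \ "title").text, (page \ "revision" \ "text").text))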

—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Fri, Nov 21, 2014 at 3:38 AM, Prannoy pran...@sigmoidanalytics.com
wrote:

 Hi,

 Parallel processing of xml files may be an issue due to the tags in the
 xml file. The xml file has to be intact as while parsing it matches the
 start and end entity and if its distributed in parts to workers possibly it
 may or may not find start and end tags within the same worker which will
 give an exception.

 Thanks.

 On Wed, Nov 19, 2014 at 6:26 AM, ssimanta [via Apache Spark User List] wrote:

 If there a one big XML file (e.g., Wikipedia dump 44GB or the larger dump
 that all revision information also) that is stored in HDFS, is it possible
 to parse it in parallel/faster using Spark? Or do we have to use something
 like a PullParser or Iteratee?

 My current solution is to read the single XML file in the first pass -
 write it to HDFS and then read the small files in parallel on the Spark
 workers.

 Thanks
 -Soumya











Re: Native / C/C++ code integration

2014-11-11 Thread Paul Wais
More thoughts.  I took a deeper look at BlockManager, RDD, and friends. 
Suppose one wanted to get native code access to un-deserialized blocks. 
This task looks very hard.  An RDD behaves much like a Scala iterator of
deserialized values, and interop with BlockManager is all on deserialized
data.  One would probably need to rewrite much of RDD, CacheManager, etc in
native code; an RDD subclass (e.g. PythonRDD) probably wouldn't work.

So exposing raw blocks to native code looks intractable.  I wonder how fast
Java/Kryo can SerDe byte arrays.  E.g. suppose we have an RDD<T> where T
is immutable and most of the memory for a single T is a byte array.  What is
the overhead of SerDe-ing T?  (Does Java/Kryo copy the underlying memory?) 
If the overhead is small, then native access to raw blocks wouldn't really
yield any advantage.





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Native / C/C++ code integration

2014-11-07 Thread Paul Wais
Dear List,

Has anybody had experience integrating C/C++ code into Spark jobs?  

I have done some work on this topic using JNA.  I wrote a FlatMapFunction
that processes all partition entries using a C++ library.  This approach
works well, but there are some tradeoffs:
 * Shipping the native dylib with the app jar and loading it at runtime
requires a bit of work (on top of normal JNA usage)
 * Native code doesn't respect the executor heap limits.  Under heavy memory
pressure, the native code can sometimes ENOMEM sporadically.
 * While JNA can map Strings, structs, and Java primitive types, the user
still needs to deal with more complex objects.  E.g. re-serialize
protobuf/thrift objects, or provide some other encoding for moving data
between Java and C/C++.
 * C++ static initialization is not thread-safe before C++11, so the user sometimes needs
to take care when running inside multi-threaded executors
 * Avoiding memory copies can be a little tricky
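
For concreteness, a minimal sketch of that JNA-in-a-partition-function pattern, written 
as a Scala mapPartitions for brevity (the library, interface, and function names are 
purely illustrative, and `rdd` stands in for an RDD[String]):

import com.sun.jna.{Library, Native}

trait MyNativeLib extends Library {
  def process(input: String): String      // binds to a native "process" symbol
}

val processed = rdd.mapPartitions { iter =>
  // Load and bind the dylib lazily on the executor, once per partition.
  val lib = Native.loadLibrary("mynative", classOf[MyNativeLib]).asInstanceOf[MyNativeLib]
  iter.map(lib.process)
}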

One other alternative approach comes to mind is pipe().  However, PipedRDD
requires copying data over pipes, does not support binary data (?), and
native code errors that crash the subprocess don't bubble up to the Spark
job as nicely as with JNA.

Is there a way to expose raw, in-memory partition/block data to native code?

Has anybody else attacked this problem a different way?

All the best,
-Paul 







[SQL] PERCENTILE is not working

2014-11-05 Thread Kevin Paul
Hi all, I encounter this error when execute the query

sqlContext.sql("select percentile(age, array(0, 0.5, 1)) from people").collect()

java.lang.ClassCastException: scala.collection.mutable.ArrayBuffer
cannot be cast to [Ljava.lang.Object;

at 
org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector.getListLength(StandardListObjectInspector.java:83)

at 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters$ListConverter.convert(ObjectInspectorConverters.java:259)

at 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils$ConversionHelper.convertIfNecessary(GenericUDFUtils.java:349)

at 
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge$GenericUDAFBridgeEvaluator.iterate(GenericUDAFBridge.java:170)

at org.apache.spark.sql.hive.HiveUdafFunction.update(hiveUdfs.scala:342)

at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:135)

at 
org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:128)

at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599)

at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:599)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

Thanks,
Kevin Paul




Do Spark executors restrict native heap vs JVM heap?

2014-11-02 Thread Paul Wais
Thanks Sean! My novice understanding is that the 'native heap' is the
address space not allocated to the JVM heap, but I wanted to check to see
if I'm missing something.  I found out my issue appeared to be actual
memory pressure on the executor machine.  There was space for the JVM heap
but not much more.

On Thu, Oct 30, 2014 at 12:49 PM, Sean Owen so...@cloudera.com wrote:
 No, but, the JVM also does not allocate memory for native code on the heap.
 I don't think the heap has any bearing on whether your native code can't
 allocate more memory, except that of course the heap is also taking memory.

 On Oct 30, 2014 6:43 PM, Paul Wais pw...@yelp.com wrote:

 Dear Spark List,

 I have a Spark app that runs native code inside map functions.  I've
 noticed that the native code sometimes sets errno to ENOMEM indicating
 a lack of available memory.  However, I've verified that the /JVM/ has
 plenty of heap space available-- Runtime.getRuntime().freeMemory()
 shows gigabytes free and the native code needs only megabytes.  Does
 spark limit the /native/ heap size somehow?  Am poking through the
 executor code now but don't see anything obvious.

 Best Regards,
 -Paul Wais





SchemaRDD.where clause error

2014-10-21 Thread Kevin Paul
Hi all, I tried to use the function SchemaRDD.where() but got some error:

  val people = sqlCtx.sql("select * from people")
  people.where('age === 10)

console:27: error: value === is not a member of Symbol

where did I go wrong?
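
For context, the `'age === 10` DSL comes from implicit conversions that, in the 1.1-era 
API, live on the SQLContext itself; a minimal sketch with those implicits in scope 
(assuming that is indeed the missing piece here):

val sqlCtx = new org.apache.spark.sql.SQLContext(sc)
import sqlCtx._   // brings the Symbol/expression DSL implicits into scope

val people = sqlCtx.sql("select * from people")
people.where('age === 10)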

Thanks,
Kevin Paul


Re: SparkSQL on Hive error

2014-10-13 Thread Kevin Paul
Thanks Michael, your patch works for me :)
Regards,
Kelvin Paul

On Fri, Oct 3, 2014 at 3:52 PM, Michael Armbrust mich...@databricks.com
wrote:

 Are you running master?  There was briefly a regression here that is
 hopefully fixed by spark#2635 https://github.com/apache/spark/pull/2635.

 On Fri, Oct 3, 2014 at 1:43 AM, Kevin Paul kevinpaulap...@gmail.com
 wrote:

 Hi all, I tried to launch my application with spark-submit, the command I
 use is:

 bin/spark-submit --class ${MY_CLASS} --jars ${MY_JARS} --master local
 myApplicationJar.jar

 I've built Spark with SPARK_HIVE=true, and was able to start
 HiveContext and run commands like
 hiveContext.sql("select * from myTable")
  or hiveContext.sql("select count(*) from myTable")
 myTable is a table in my Hive database. However, when I run the command
 hiveContext.sql("show tables"), I got the following error:

 java.lang.NullPointerException

 at
 org.apache.hadoop.hive.ql.Driver.validateConfVariables(Driver.java:1057)

 at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:948)

 at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)

 at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:298)

 at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:272)

 at
 org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)

 at
 org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)

 at
 org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:38)

 at
 org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360)

 at
 org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360)


 Thanks,
 Kelvin Paul





Setting SparkSQL configuration

2014-10-13 Thread Kevin Paul
Hi all, I tried to set the configurations
spark.sql.inMemoryColumnarStorage.compressed and
spark.sql.inMemoryColumnarStorage.batchSize in
spark.executor.extraJavaOptions, but it does not work. My
spark.executor.extraJavaOptions contains
-Dspark.sql.inMemoryColumnarStorage.compressed=true
-Dspark.sql.inMemoryColumnarStorage.batchSize=100, and the Spark UI does
show those settings, but somehow the memory footprint of my cacheTable
RDDs is the same as without them. Only when I use HiveContext.setConf does
it reduce my RDDs' memory usage. Is this a bug, or are users required to
set these configs through HiveContext's setConf function?
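
A minimal sketch of the setConf route mentioned above (option names as given; "myTable" 
is illustrative):

hiveContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
hiveContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "100")
hiveContext.cacheTable("myTable")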

Regards,
Kelvin Paul


Re: Any issues with repartition?

2014-10-08 Thread Paul Wais
Looks like an OOM issue?  Have you tried persisting your RDDs to allow
disk writes?
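
A minimal sketch of what "persisting to allow disk writes" looks like (`rdd` stands in 
for whatever RDD is being repartitioned or joined):

import org.apache.spark.storage.StorageLevel

val persisted = rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)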

I've seen a lot of similar crashes in a Spark app that reads from HDFS
and does joins.  I.e. I've seen java.io.IOException: Filesystem
closed, Executor lost, FetchFailed, etc etc with
non-deterministic crashes.  I've tried persisting RDDs, tuning other
params, and verifying that the Executor JVMs don't come close to their
max allocated memory during operation.

Looking through user@ tonight, there are a ton of email threads with
similar crashes and no answers.  It looks like a lot of people are
struggling with OOMs.

Could one of the Spark committers please comment on this thread, or
one of the other unanswered threads with similar crashes?  Is this
simply how Spark behaves if Executors OOM?  What can the user do other
than increase memory or reduce RDD size?  (And how can one deduce how
much of either is needed?)

One general workaround for OOMs could be to programmatically break the
job input (i.e. from HDFS, input from #parallelize() ) into chunks,
and only create/process RDDs related to one chunk at a time.  However,
this approach has the limitations of Spark Streaming and no formal
library support.  What might be nice is that if tasks fail, Spark
could try to re-partition in order to avoid OOMs.



On Fri, Oct 3, 2014 at 2:55 AM, jamborta jambo...@gmail.com wrote:
 I have two nodes with 96G ram 16 cores, my setup is as follows:

 conf = (SparkConf()
 .setMaster("yarn-cluster")
 .set("spark.executor.memory", "30G")
 .set("spark.cores.max", 32)
 .set("spark.executor.instances", 2)
 .set("spark.executor.cores", 8)
 .set("spark.akka.timeout", 1)
 .set("spark.akka.askTimeout", 100)
 .set("spark.akka.frameSize", 500)
 .set("spark.cleaner.ttl", 86400)
 .set("spark.tast.maxFailures", 16)
 .set("spark.worker.timeout", 150)

 thanks a lot,








SparkSQL on Hive error

2014-10-03 Thread Kevin Paul
Hi all, I tried to launch my application with spark-submit, the command I
use is:

bin/spark-submit --class ${MY_CLASS} --jars ${MY_JARS} --master local
myApplicationJar.jar

I've built Spark with SPARK_HIVE=true, and was able to start HiveContext
and run commands like
hiveContext.sql("select * from myTable")
 or hiveContext.sql("select count(*) from myTable")
myTable is a table in my Hive database. However, when I run the command
hiveContext.sql("show tables"), I got the following error:

java.lang.NullPointerException

at org.apache.hadoop.hive.ql.Driver.validateConfVariables(Driver.java:1057)

at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:948)

at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)

at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:298)

at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:272)

at
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)

at
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)

at
org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:38)

at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360)

at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360)


Thanks,
Kelvin Paul


Worker Random Port

2014-09-23 Thread Paul Magid
I am trying to get an edge server up and running connecting to our Spark 1.1 
cluster.  The edge server is in a different DMZ than the rest of the cluster 
and we have to specifically open firewall ports between the edge server and the 
rest of the cluster.   I can log on to any node in the cluster (other than the 
edge) and submit code through spark-shell just fine.   I have port 7077 from 
the edge to the master open (verified), and I have port 7078 open from the edge 
to all the workers (also verified).  I have tried setting the worker port to 
not be dynamic by using SPARK_WORKER_PORT in the spark-env.sh but it does not 
seem to stop the dynamic port behavior.   I have included the startup output 
when running spark-shell from the edge server in a different dmz and then from 
a node in the cluster.  Any help greatly appreciated.
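
As an aside: SPARK_WORKER_PORT only pins the worker daemon's own port; in 1.1 the other 
channels (driver, block manager, file server, etc.) pick random ports unless set 
explicitly. A sketch of the relevant properties, per the 1.1 "Networking" configuration 
section (values are illustrative and worth double-checking for your exact version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.port",          "40000")
  .set("spark.fileserver.port",      "40001")
  .set("spark.broadcast.port",       "40002")
  .set("spark.replClassServer.port", "40003")
  .set("spark.blockManager.port",    "40004")
  .set("spark.executor.port",        "40005")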

Paul Magid
Toyota Motor Sales IS Enterprise Architecture (EA)
Architect I RD
Ph: 310-468-9091 (X69091)
PCN 1C2970, Mail Drop PN12


Running spark-shell from the edge server

14/09/23 14:20:38 INFO SecurityManager: Changing view acls to: root,
14/09/23 14:20:38 INFO SecurityManager: Changing modify acls to: root,
14/09/23 14:20:38 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root, ); users 
with modify permissions: Set(root, )
14/09/23 14:20:38 INFO HttpServer: Starting HTTP Server
14/09/23 14:20:39 INFO Utils: Successfully started service 'HTTP class server' 
on port 22788.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
  /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_55)
Type in expressions to have them evaluated.
Type :help for more information.
14/09/23 14:20:42 INFO SecurityManager: Changing view acls to: root,
14/09/23 14:20:42 INFO SecurityManager: Changing modify acls to: root,
14/09/23 14:20:42 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root, ); users 
with modify permissions: Set(root, )
14/09/23 14:20:43 INFO Slf4jLogger: Slf4jLogger started
14/09/23 14:20:43 INFO Remoting: Starting remoting
14/09/23 14:20:43 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkdri...@votlbdcd09.tms.toyota.com:32356]
14/09/23 14:20:43 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://sparkdri...@votlbdcd09.tms.toyota.com:32356]
14/09/23 14:20:43 INFO Utils: Successfully started service 'sparkDriver' on 
port 32356.
14/09/23 14:20:43 INFO SparkEnv: Registering MapOutputTracker
14/09/23 14:20:43 INFO SparkEnv: Registering BlockManagerMaster
14/09/23 14:20:43 INFO DiskBlockManager: Created local directory at 
/tmp/spark-local-20140923142043-4454
14/09/23 14:20:43 INFO Utils: Successfully started service 'Connection manager 
for block manager' on port 48469.
14/09/23 14:20:43 INFO ConnectionManager: Bound socket to port 48469 with id = 
ConnectionManagerId(votlbdcd09.tms.toyota.com,48469)
14/09/23 14:20:43 INFO MemoryStore: MemoryStore started with capacity 265.9 MB
14/09/23 14:20:43 INFO BlockManagerMaster: Trying to register BlockManager
14/09/23 14:20:43 INFO BlockManagerMasterActor: Registering block manager 
votlbdcd09.tms.toyota.com:48469 with 265.9 MB RAM
14/09/23 14:20:43 INFO BlockManagerMaster: Registered BlockManager
14/09/23 14:20:43 INFO HttpFileServer: HTTP File server directory is 
/tmp/spark-888c359a-5a2a-4aaa-80e3-8009cdfa25c8
14/09/23 14:20:43 INFO HttpServer: Starting HTTP Server
14/09/23 14:20:43 INFO Utils: Successfully started service 'HTTP file server' 
on port 12470.
14/09/23 14:20:43 INFO Utils: Successfully started service 'SparkUI' on port 
4040.
14/09/23 14:20:43 INFO SparkUI: Started SparkUI at 
http://votlbdcd09.tms.toyota.com:4040
14/09/23 14:20:43 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
14/09/23 14:20:44 INFO EventLoggingListener: Logging events to 
file:/user/spark/applicationHistory//spark-shell-1411507243973
14/09/23 14:20:44 INFO AppClient$ClientActor: Connecting to master 
spark://votlbdcd01.tms.toyota.com:7077...
14/09/23 14:20:44 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready 
for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
14/09/23 14:20:44 INFO SparkILoop: Created spark context..
14/09/23 14:20:44 INFO SparkDeploySchedulerBackend: Connected to Spark cluster 
with app ID app-20140923142044-0006
Spark context available as sc.

scala 14/09/23 14:21:26 INFO AppClient$ClientActor: Executor added: 
app-20140923142044-0006/0 on 
worker-20140923084845-votlbdcd03.tms.toyota.com-7078 
(votlbdcd03.tms.toyota.com:7078) with 8 cores
14/09/23 14:21:26 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20140923142044-0006/0 on hostPort votlbdcd03

Re: Unable to find proto buffer class error with RDDprotobuf

2014-09-19 Thread Paul Wais
Well it looks like this is indeed a protobuf issue.  Poked a little more
with Kryo.  Since protobuf messages are serializable, I tried just making
Kryo use the JavaSerializer for my messages.  The resulting stack trace
made it look like protobuf GeneratedMessageLite is actually using the
classloader that loaded it, which I believe would be the root loader?

 *
https://code.google.com/p/protobuf/source/browse/trunk/java/src/main/java/com/google/protobuf/GeneratedMessageLite.java?r=425#775
 *
http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l186
 *
http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/ClassLoader.java#l1529
 * See note:
http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l220

So I guess protobuf java serialization is sensitive to the class loader.  I
wonder if Kenton ever saw this one coming :)  I do have a solution, though
(see way below)


Here's the code and stack trace:

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("myapp");
sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "MyKryoRegistrator");

...

public class MyKryoRegistrator implements KryoRegistrator {
public void registerClasses(Kryo kryo) {
kryo.register(MyProtoMessage.class, new JavaSerializer());
}
}

...

14/09/19 05:39:12 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 2)
com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
at
com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:34)
at
com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
at
org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
at
org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
at
org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123)
at
org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Unable to find proto buffer class
at
com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at

Re: Unable to find proto buffer class error with RDDprotobuf

2014-09-19 Thread Paul Wais
Derp, one caveat to my solution:  I guess Spark doesn't use Kryo for
Function serde :(

On Fri, Sep 19, 2014 at 12:44 AM, Paul Wais pw...@yelp.com wrote:
 Well it looks like this is indeed a protobuf issue.  Poked a little more
 with Kryo.  Since protobuf messages are serializable, I tried just making
 Kryo use the JavaSerializer for my messages.  The resulting stack trace made
 it look like protobuf GeneratedMessageLite is actually using the classloader
 that loaded it, which I believe would be the root loader?

  *
 https://code.google.com/p/protobuf/source/browse/trunk/java/src/main/java/com/google/protobuf/GeneratedMessageLite.java?r=425#775
  *
 http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l186
  *
 http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/ClassLoader.java#l1529
  * See note:
 http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l220

 So I guess protobuf java serialization is sensitive to the class loader.  I
 wonder if Kenton ever saw this one coming :)  I do have a solution, though
 (see way below)


 Here's the code and stack trace:

 SparkConf sparkConf = new SparkConf();
 sparkConf.setAppName("myapp");
 sparkConf.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer");
 sparkConf.set("spark.kryo.registrator", "MyKryoRegistrator");

 ...

 public class MyKryoRegistrator implements KryoRegistrator {
 public void registerClasses(Kryo kryo) {
 kryo.register(MyProtoMessage.class, new JavaSerializer());
 }
 }

 ...

 14/09/19 05:39:12 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 2)
 com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
 at
 com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
 at
 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:34)
 at
 com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
 at
 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
 at
 org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
 at
 org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
 at
 org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123)
 at
 org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 at
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.RuntimeException: Unable to find proto buffer class
 at
 com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method

Unable to find proto buffer class error with RDDprotobuf

2014-09-18 Thread Paul Wais
Dear List,

I'm writing an application where I have RDDs of protobuf messages.
When I run the app via bin/spar-submit with --master local
--driver-class-path path/to/my/uber.jar, Spark is able to
ser/deserialize the messages correctly.

However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I
try --master spark://my.master:7077 , then I run into errors that make
it look like my protobuf message classes are not on the classpath:

Exception in thread main org.apache.spark.SparkException: Job
aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost):
java.lang.RuntimeException: Unable to find proto buffer class

com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
...

Why do I need --driver-class-path in the local scenario?  And how can
I ensure my classes are on the classpath no matter how my app is
submitted via bin/spark-submit (e.g. --master spark://my.master:7077 )
?  I've tried poking through the shell scripts and SparkSubmit.scala
and unfortunately I haven't been able to grok exactly what Spark is
doing with the remote/local JVMs.

Cheers,
-Paul




Spark SQL Exception

2014-09-18 Thread Paul Magid
All:

I am putting Spark SQL 1.1 through its paces (in a POC) and have been 
pleasantly surprised with what can be done with such a young technology. I 
have run into an exception (listed below) that I suspect relates to the number 
of columns in the table I am querying.   There are 336 columns in the table.   
I have included the Scala / Spark SQL I am running.  This Spark SQL code runs 
just fine when run against narrower tables.   Also, we have purpose built 
this POC cluster with lots of memory and we have set up Impala and Spark SQL 
with roughly the same amounts of memory.   There are 7 worker nodes with 20GB 
memory for Impala and Spark SQL each.  We are using Impala as a comparative 
benchmark and sanity check.  The equivalent SQL runs just fine in Impala (see 
below).   I am a bit of a noob and any help (even with the code below) is 
greatly appreciated.  Also, is there a document that lists current Spark SQL 
limitations/issues?

Paul Magid
Toyota Motor Sales IS Enterprise Architecture (EA)
Architect I RD
Ph: 310-468-9091 (X69091)
PCN 1C2970, Mail Drop PN12


Successful Result In Impala
+
+----------------+
| marital_status |
+----------------+
| M              |
| S              |
| U              |
| null           |
+----------------+
Returned 4 row(s) in 0.91s

Code
+
//Timer code
def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block    // call-by-name
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0).toFloat / 1e9 + " s")
  result
}

//Declare and import SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

//Load Parquet file into a table
val parquetFile_db2 = 
sqlContext.parquetFile("hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/")
parquetFile_db2.registerAsTable("customer_demographic_pq")

//Run SQL code with timer
val records = time {sql("select marital_status from customer_demographic_pq group by marital_status order by marital_status").collect().foreach(println)}


Exception
+
14/09/18 08:50:39 INFO SparkContext: Job finished: RangePartitioner at 
Exchange.scala:79, took 21.885859255 s
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: sort, tree:
Sort [marital_status#9 ASC], true
Exchange (RangePartitioning [marital_status#9 ASC], 200)
  Aggregate false, [marital_status#9], [marital_status#9]
   Exchange (HashPartitioning [marital_status#9], 200)
Aggregate true, [marital_status#9], [marital_status#9]
 ParquetTableScan [marital_status#9], (ParquetRelation 
hdfs://x.x.x.x:8020/user/hive/warehouse/c360poc.db/customer_demographic_pq/, 
Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, 
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, 
hdfs-site.xml), org.apache.spark.sql.SQLContext@4d79d3de, []), []

at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:47)
at org.apache.spark.sql.execution.Sort.execute(basicOperators.scala:191)
at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:85)
at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:438)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcV$sp(console:19)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:19)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:19)
at $iwC$$iwC$$iwC$$iwC.time(console:12)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:19)
at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:24)
at $iwC$$iwC$$iwC$$iwC.init(console:26)
at $iwC$$iwC$$iwC.init(console:28)
at $iwC$$iwC.init(console:30)
at $iwC.init(console:32)
at init(console:34)
at .init(console:38)
at .clinit(console)
at .init(console:7)
at .clinit(console)
at $print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala

RE: Spark SQL Exception

2014-09-18 Thread Paul Magid
Michael:

Thanks for the quick response.  I can confirm that once I removed the “order 
by” clause the exception below went away.  So, I believe this confirms what 
you were saying, and I will be opening a new feature request in JIRA.

However, that exception was replaced by a java.lang.OutOfMemoryError: Java heap 
space error.  I am guessing this relates to one of the following issues:
SPARK-2902 Change default options to be more agressive (In memory columnar 
compression)
SPARK-3056 Sort-based Aggregation (SparkSQL only support the hash-based 
aggregation, which may cause OOM if too many identical keys in the input 
tuples.)
SPARK-2926 Add MR-style (merge-sort) SortShuffleReader for sort-based shuffle

The Exception is included below.

Paul Magid
Toyota Motor Sales IS Enterprise Architecture (EA)
Architect I RD
Ph: 310-468-9091 (X69091)
PCN 1C2970, Mail Drop PN12

Exception
+
14/09/18 10:11:03 INFO TaskSetManager: Finished task 36.0 in stage 0.0 (TID 57) 
in 18681 ms on votlbdcd04.tms.toyota.com (5/200)
14/09/18 10:11:09 ERROR Utils: Uncaught exception in thread Result resolver 
thread-0
java.lang.OutOfMemoryError: Java heap space
Exception in thread Result resolver thread-0 14/09/18 10:11:09 INFO 
RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
java.lang.OutOfMemoryError: Java heap space
14/09/18 10:11:09 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon 
shut down; proceeding with flushing remote transports.
14/09/18 10:11:09 INFO Remoting: Remoting shut down
14/09/18 10:11:09 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut 
down.
org.apache.spark.SparkException: Job cancelled because SparkContext was shut 
down
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at 
org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
at 
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
at 
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
at akka.actor.ActorCell.terminate(ActorCell.scala:338)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
at akka.dispatch.Mailbox.run(Mailbox.scala:218)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


scala 14/09/18 10:11:09 INFO TaskSetManager: Finished task 50.0 in stage 0.0 
(TID 71) in 27100 ms on votlbdcd04.tms.toyota.com (6/200)
14/09/18 10:11:10 INFO TaskSetManager: Finished task 22.0 in stage 0.0 (TID 43) 
in 27520 ms on votlbdcd04.tms.toyota.com (7/200)
14/09/18 10:11:10 INFO ConnectionManager: Removing ReceivingConnection to 
ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Key not valid ? 
sun.nio.ch.SelectionKeyImpl@7a542d34
14/09/18 10:11:10 INFO ConnectionManager: key already cancelled ? 
sun.nio.ch.SelectionKeyImpl@7a542d34
java.nio.channels.CancelledKeyException
at 
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
at 
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Handling connection error on 
connection to ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(votlbdcd05.tms.toyota.com,24438)
14/09/18 10:11:10 INFO ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(votlbdcd06.tms.toyota.com,19998)
14/09/18 10:11:10 INFO ConnectionManager: Removing ReceivingConnection to 
ConnectionManagerId

Re: Unable to find proto buffer class error with RDDprotobuf

2014-09-18 Thread Paul Wais
Well, it looks like Spark is just not loading my code into the
driver/executors E.g.:

// bars is a JavaRDD<MyMessage>; the instance-initializer block below just prints
// classpath diagnostics when the anonymous Function is constructed.
List<String> foo = bars.map(
new Function<MyMessage, String>() {

{
System.err.println("classpath: " +
System.getProperty("java.class.path"));

CodeSource src =
com.google.protobuf.GeneratedMessageLite.class.getProtectionDomain().getCodeSource();
if (src != null) {
   URL jar = src.getLocation();
   System.err.println("aaa com.google.protobuf.GeneratedMessageLite"
       + " from jar: " + jar.toString());
}
}

@Override
public String call(MyMessage v1) throws Exception {
return v1.getString();
}
}).collect();

prints:
classpath: 
::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar
com.google.protobuf.GeneratedMessageLite from jar:
file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar

I do see after those lines:
14/09/18 23:28:09 INFO Executor: Adding
file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class
loader


This is with:

spark-submit --master local --class MyClass --jars uber.jar uber.jar


My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would
come from there.  I'm using spark 1.1 and hadoop 2.3; hadoop 2.3
should use protobuf 2.5[1] and even shade it properly.  I read claims
in this list that Spark shades protobuf correctly since 0.9.? and
looking thru the pom.xml on github it looks like Spark includes
protobuf 2.5 in the hadoop 2.3 profile.


I guess I'm still at What's the deal with getting Spark to distribute
and load code from my jar correctly?


[1] 
http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml

On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais pw...@yelp.com wrote:
 Dear List,

 I'm writing an application where I have RDDs of protobuf messages.
 When I run the app via bin/spar-submit with --master local
 --driver-class-path path/to/my/uber.jar, Spark is able to
 ser/deserialize the messages correctly.

 However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I
 try --master spark://my.master:7077 , then I run into errors that make
 it look like my protobuf message classes are not on the classpath:

 Exception in thread main org.apache.spark.SparkException: Job
 aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
 recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost):
 java.lang.RuntimeException: Unable to find proto buffer class
 
 com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.lang.reflect.Method.invoke(Method.java:606)
 
 java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
 ...

 Why do I need --driver-class-path in the local scenario?  And how can
 I ensure my classes are on the classpath no matter how my app is
 submitted via bin/spark-submit (e.g. --master spark://my.master:7077 )
 ?  I've tried poking through the shell scripts and SparkSubmit.scala
 and unfortunately I haven't been able to grok exactly what Spark is
 doing with the remote/local JVMs.

 Cheers,
 -Paul

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to find proto buffer class error with RDDprotobuf

2014-09-18 Thread Paul Wais
Ah, can one NOT create an RDD of any arbitrary Serializable type?  It
looks like I might be getting bitten by the same
java.io.ObjectInputStream uses root class loader only bugs mentioned
in:

* 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-td3259.html
* https://github.com/apache/spark/pull/181

* 
http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E
* https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I




On Thu, Sep 18, 2014 at 4:51 PM, Paul Wais pw...@yelp.com wrote:
 Well, it looks like Spark is just not loading my code into the
 driver/executors E.g.:

 ListString foo = JavaRDDMyMessage bars.map(
 new Function MyMessage, String() {

 {
 System.err.println(classpath:  +
 System.getProperty(java.class.path));

 CodeSource src =
 com.google.protobuf.GeneratedMessageLite.class.getProtectionDomain().getCodeSource();
 if (src2 != null) {
URL jar = src2.getLocation();
System.err.println(aaacom.google.protobuf.GeneratedMessageLite
 from jar:  + jar.toString());
 }

 @Override
 public String call(MyMessage v1) throws Exception {
 return v1.getString();
 }
 }).collect();

 prints:
 classpath: 
 ::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar
 com.google.protobuf.GeneratedMessageLite from jar:
 file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar

 I do see after those lines:
 14/09/18 23:28:09 INFO Executor: Adding
 file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class
 loader


 This is with:

 spart-submit --master local --class MyClass --jars uber.jar  uber.jar


 My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would
 come from there.  I'm using spark 1.1 and hadoop 2.3; hadoop 2.3
 should use protobuf 2.5[1] and even shade it properly.  I read claims
 in this list that Spark shades protobuf correctly since 0.9.? and
 looking thru the pom.xml on github it looks like Spark includes
 protobuf 2.5 in the hadoop 2.3 profile.


 I guess I'm still at What's the deal with getting Spark to distribute
 and load code from my jar correctly?


 [1] 
 http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml

 On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais pw...@yelp.com wrote:
 Dear List,

 I'm writing an application where I have RDDs of protobuf messages.
 When I run the app via bin/spar-submit with --master local
 --driver-class-path path/to/my/uber.jar, Spark is able to
 ser/deserialize the messages correctly.

 However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I
 try --master spark://my.master:7077 , then I run into errors that make
 it look like my protobuf message classes are not on the classpath:

 Exception in thread main org.apache.spark.SparkException: Job
 aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
 recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost):
 java.lang.RuntimeException: Unable to find proto buffer class
 
 com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.lang.reflect.Method.invoke(Method.java:606)
 
 java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
 ...

 Why do I need --driver-class-path in the local scenario?  And how can
 I ensure my classes are on the classpath no matter how my app is
 submitted via bin/spark-submit (e.g. --master spark://my.master:7077 )
 ?  I've tried poking through the shell scripts and SparkSubmit.scala
 and unfortunately I haven't been able to grok exactly what Spark is
 doing with the remote/local JVMs.

 Cheers,
 -Paul

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to find proto buffer class error with RDDprotobuf

2014-09-18 Thread Paul Wais
Hmm, would using Kryo help me here?

On Thursday, September 18, 2014, Paul Wais pw...@yelp.com wrote:

 Ah, can one NOT create an RDD of any arbitrary Serializable type?  It
 looks like I might be getting bitten by the same
 java.io.ObjectInputStream uses root class loader only bugs mentioned
 in:

 *
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-td3259.html
 * https://github.com/apache/spark/pull/181

 *
 http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E
 * https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I




 On Thu, Sep 18, 2014 at 4:51 PM, Paul Wais pw...@yelp.com javascript:;
 wrote:
  Well, it looks like Spark is just not loading my code into the
  driver/executors E.g.:
 
  ListString foo = JavaRDDMyMessage bars.map(
  new Function MyMessage, String() {
 
  {
  System.err.println(classpath:  +
  System.getProperty(java.class.path));
 
  CodeSource src =
 
 com.google.protobuf.GeneratedMessageLite.class.getProtectionDomain().getCodeSource();
  if (src2 != null) {
 URL jar = src2.getLocation();
 
 System.err.println(aaacom.google.protobuf.GeneratedMessageLite
  from jar:  + jar.toString());
  }
 
  @Override
  public String call(MyMessage v1) throws Exception {
  return v1.getString();
  }
  }).collect();
 
  prints:
  classpath:
 ::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar
  com.google.protobuf.GeneratedMessageLite from jar:
  file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar
 
  I do see after those lines:
  14/09/18 23:28:09 INFO Executor: Adding
  file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class
  loader
 
 
  This is with:
 
  spart-submit --master local --class MyClass --jars uber.jar  uber.jar
 
 
  My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would
  come from there.  I'm using spark 1.1 and hadoop 2.3; hadoop 2.3
  should use protobuf 2.5[1] and even shade it properly.  I read claims
  in this list that Spark shades protobuf correctly since 0.9.? and
  looking thru the pom.xml on github it looks like Spark includes
  protobuf 2.5 in the hadoop 2.3 profile.
 
 
  I guess I'm still at What's the deal with getting Spark to distribute
  and load code from my jar correctly?
 
 
  [1]
 http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml
 
  On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais pw...@yelp.com
 javascript:; wrote:
  Dear List,
 
  I'm writing an application where I have RDDs of protobuf messages.
  When I run the app via bin/spar-submit with --master local
  --driver-class-path path/to/my/uber.jar, Spark is able to
  ser/deserialize the messages correctly.
 
  However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I
  try --master spark://my.master:7077 , then I run into errors that make
  it look like my protobuf message classes are not on the classpath:
 
  Exception in thread main org.apache.spark.SparkException: Job
  aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
  recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost):
  java.lang.RuntimeException: Unable to find proto buffer class
 
  
 com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
  sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 
  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 
  
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  java.lang.reflect.Method.invoke(Method.java:606)
 
  java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
  ...
 
  Why do I need --driver-class-path in the local scenario?  And how can
  I ensure my classes are on the classpath no matter how my app is
  submitted via bin/spark-submit (e.g. --master spark://my.master:7077 )
  ?  I've tried poking through the shell scripts and SparkSubmit.scala
  and unfortunately I haven't been able to grok exactly what Spark is
  doing with the remote/local JVMs.
 
  Cheers,
  -Paul



Re: Unable to find proto buffer class error with RDDprotobuf

2014-09-18 Thread Paul Wais
).


I think the root problem might be related to this change:
https://github.com/apache/spark/commit/cc3648774e9a744850107bb187f2828d447e0a48#diff-7b43397a89d8249663cbd13374a48db0R42

That change did not appear to touch ParallelCollectionRDD, which I
believe is using the root classloader (would explain why
--driver-class-path fixes the problem):
https://github.com/apache/spark/blob/2f9b2bd7844ee8393dc9c319f4fefedf95f5e460/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L74

If uber.jar is on the classpath, then the root classloader would have
the code, hence why --driver-class-path fixes the bug.
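
In Scala/spark-shell terms, the workaround consistent with that reading would be
roughly the following -- a sketch only, with the jar path and app name as
placeholders (the master URL is the one from my earlier mail):

import org.apache.spark.{SparkConf, SparkContext}

// Ship the uber jar to the executors explicitly (same effect as spark-submit --jars).
// Note this does NOT put it on the driver's root classloader; on the driver side the
// jar still has to be on the launch classpath, e.g. via --driver-class-path.
val conf = new SparkConf()
  .setAppName("proto-classpath-check")
  .setMaster("spark://my.master:7077")
  .setJars(Seq("/path/to/uber.jar"))
val sc = new SparkContext(conf)
// sc.addJar("/path/to/uber.jar") is the equivalent call once the context exists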




On Thu, Sep 18, 2014 at 5:42 PM, Paul Wais pw...@yelp.com wrote:
 hmm would using kyro help me here?


 On Thursday, September 18, 2014, Paul Wais pw...@yelp.com wrote:

 Ah, can one NOT create an RDD of any arbitrary Serializable type?  It
 looks like I might be getting bitten by the same
 java.io.ObjectInputStream uses root class loader only bugs mentioned
 in:

 *
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-td3259.html
 * https://github.com/apache/spark/pull/181

 *
 http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E
 * https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I




 On Thu, Sep 18, 2014 at 4:51 PM, Paul Wais pw...@yelp.com wrote:
  Well, it looks like Spark is just not loading my code into the
  driver/executors E.g.:
 
  ListString foo = JavaRDDMyMessage bars.map(
  new Function MyMessage, String() {
 
  {
  System.err.println(classpath:  +
  System.getProperty(java.class.path));
 
  CodeSource src =
 
  com.google.protobuf.GeneratedMessageLite.class.getProtectionDomain().getCodeSource();
  if (src2 != null) {
 URL jar = src2.getLocation();
 
  System.err.println(aaacom.google.protobuf.GeneratedMessageLite
  from jar:  + jar.toString());
  }
 
  @Override
  public String call(MyMessage v1) throws Exception {
  return v1.getString();
  }
  }).collect();
 
  prints:
  classpath:
  ::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar
  com.google.protobuf.GeneratedMessageLite from jar:
  file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar
 
  I do see after those lines:
  14/09/18 23:28:09 INFO Executor: Adding
  file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class
  loader
 
 
  This is with:
 
  spart-submit --master local --class MyClass --jars uber.jar  uber.jar
 
 
  My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would
  come from there.  I'm using spark 1.1 and hadoop 2.3; hadoop 2.3
  should use protobuf 2.5[1] and even shade it properly.  I read claims
  in this list that Spark shades protobuf correctly since 0.9.? and
  looking thru the pom.xml on github it looks like Spark includes
  protobuf 2.5 in the hadoop 2.3 profile.
 
 
  I guess I'm still at What's the deal with getting Spark to distribute
  and load code from my jar correctly?
 
 
  [1]
  http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml
 
  On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais pw...@yelp.com wrote:
  Dear List,
 
  I'm writing an application where I have RDDs of protobuf messages.
  When I run the app via bin/spar-submit with --master local
  --driver-class-path path/to/my/uber.jar, Spark is able to
  ser/deserialize the messages correctly.
 
  However, if I run WITHOUT --driver-class-path path/to/my/uber.jar or I
  try --master spark://my.master:7077 , then I run into errors that make
  it look like my protobuf message classes are not on the classpath:
 
  Exception in thread main org.apache.spark.SparkException: Job
  aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
  recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost):
  java.lang.RuntimeException: Unable to find proto buffer class
 
  com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
  sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 
  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 
  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  java.lang.reflect.Method.invoke(Method.java:606)
 
  java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
  ...
 
  Why do I need --driver-class-path in the local scenario?  And how can
  I ensure my classes are on the classpath no matter how my app is
  submitted via bin/spark-submit (e.g. --master spark://my.master:7077 )
  ?  I've tried poking through the shell scripts and SparkSubmit.scala

Re: Spark 1.1 / cdh4 stuck using old hadoop client?

2014-09-16 Thread Paul Wais
Thanks Christian!  I tried compiling from source but am still getting the
same hadoop client version error when reading from HDFS.  Will have to poke
deeper... perhaps I've got some classpath issues.  FWIW I compiled using:

$ MAVEN_OPTS=-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m
mvn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package

and hadoop 2.3 / cdh5 from
http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.3.0-cdh5.0.0.tar.gz





On Mon, Sep 15, 2014 at 6:49 PM, Christian Chua cc8...@icloud.com wrote:

 Hi Paul.

 I would recommend building your own 1.1.0 distribution.

 ./make-distribution.sh --name hadoop-personal-build-2.4 --tgz -Pyarn
 -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests



 I downloaded the Pre-build for Hadoop 2.4 binary, and it had this
 strange behavior where

 spark-submit --master yarn-cluster ...

 will work, but

 spark-submit --master yarn-client ...

 will fail.


 But on the personal build obtained from the command above, both will then
 work.


 -Christian




 On Sep 15, 2014, at 6:28 PM, Paul Wais pw...@yelp.com wrote:

 Dear List,

 I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for
 reading SequenceFiles.  In particular, I'm seeing:

 Exception in thread main org.apache.hadoop.ipc.RemoteException:
 Server IPC version 7 cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
...

 When invoking JavaSparkContext#newAPIHadoopFile().  (With args
 validSequenceFileURI, SequenceFileInputFormat.class, Text.class,
 BytesWritable.class, new Job().getConfiguration() -- Pretty close to
 the unit test here:

 https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916
 )


 This error indicates to me that Spark is using an old hadoop client to
 do reads.  Oddly I'm able to do /writes/ ok, i.e. I'm able to write
 via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster.


 Do I need to explicitly build spark for modern hadoop??  I previously
 had an hdfs cluster running hadoop 2.3.0 and I was getting a similar
 error (server is using version 9, client is using version 4).


 I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted
 on spark's site:
 * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz
 * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz


 What distro of hadoop is used at Data Bricks?  Are there distros of
 Spark 1.1 and hadoop that should work together out-of-the-box?
 (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..)

 Thanks for any help anybody can give me here!
 -Paul

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Spark 1.1 / cdh4 stuck using old hadoop client?

2014-09-16 Thread Paul Wais
Hi Sean,

Great catch! Yes I was including Spark as a dependency and it was
making its way into my uber jar.  Following the advice I just found at
Stackoverflow[1],  I marked Spark as a provided dependency and that
appeared to fix my Hadoop client issue.  Thanks for your help!!!
Perhaps the maintainers might consider setting this in the Quickstart
guide pom.xml ( http://spark.apache.org/docs/latest/quick-start.html ).

In summary, here's what worked:
 * Hadoop 2.3 cdh5
http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.3.0-cdh5.0.0.tar.gz
 * Spark 1.1 for Hadoop 2.3
http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-hadoop2.3.tgz

pom.xml snippets: https://gist.github.com/ypwais/ff188611d4806aa05ed9

[1] 
http://stackoverflow.com/questions/24747037/how-to-define-a-dependency-scope-in-maven-to-include-a-library-in-compile-run
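
For sbt users, a rough equivalent of those pom.xml snippets (version numbers are the
ones from this thread; CDH users would point hadoop-client at the matching CDH
artifact and add Cloudera's repository):

libraryDependencies ++= Seq(
  // Spark comes from the cluster at runtime, so it must not end up inside the uber jar
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
  // only needed if the app calls Hadoop APIs directly; keep it provided as well
  "org.apache.hadoop" % "hadoop-client" % "2.3.0" % "provided"
)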

Thanks everybody!!
-Paul


On Tue, Sep 16, 2014 at 3:55 AM, Sean Owen so...@cloudera.com wrote:
 From the caller / application perspective, you don't care what version
 of Hadoop Spark is running on on the cluster. The Spark API you
 compile against is the same. When you spark-submit the app, at
 runtime, Spark is using the Hadoop libraries from the cluster, which
 are the right version.

 So when you build your app, you mark Spark as a 'provided' dependency.
 Therefore in general, no, you do not build Spark for yourself if you
 are a Spark app creator.

 (Of course, your app would care if it were also using Hadoop libraries
 directly. In that case, you will want to depend on hadoop-client, and
 the right version for your cluster, but still mark it as provided.)

 The version Spark is built against only matters when you are deploying
 Spark's artifacts on the cluster to set it up.

 Your error suggests there is still a version mismatch. Either you
 deployed a build that was not compatible, or, maybe you are packaging
 a version of Spark with your app which is incompatible and
 interfering.

 For example, the artifacts you get via Maven depend on Hadoop 1.0.4. I
 suspect that's what you're doing -- packaging Spark(+Hadoop1.0.4) with
 your app, when it shouldn't be packaged.

 Spark works out of the box with just about any modern combo of HDFS and YARN.

 On Tue, Sep 16, 2014 at 2:28 AM, Paul Wais pw...@yelp.com wrote:
 Dear List,

 I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for
 reading SequenceFiles.  In particular, I'm seeing:

 Exception in thread main org.apache.hadoop.ipc.RemoteException:
 Server IPC version 7 cannot communicate with client version 4
 at org.apache.hadoop.ipc.Client.call(Client.java:1070)
 at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
 at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
 ...

 When invoking JavaSparkContext#newAPIHadoopFile().  (With args
 validSequenceFileURI, SequenceFileInputFormat.class, Text.class,
 BytesWritable.class, new Job().getConfiguration() -- Pretty close to
 the unit test here:
 https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916
 )


 This error indicates to me that Spark is using an old hadoop client to
 do reads.  Oddly I'm able to do /writes/ ok, i.e. I'm able to write
 via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster.


 Do I need to explicitly build spark for modern hadoop??  I previously
 had an hdfs cluster running hadoop 2.3.0 and I was getting a similar
 error (server is using version 9, client is using version 4).


 I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted
 on spark's site:
  * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz
  * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz


 What distro of hadoop is used at Data Bricks?  Are there distros of
 Spark 1.1 and hadoop that should work together out-of-the-box?
 (Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..)

 Thanks for any help anybody can give me here!
 -Paul

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.1 / cdh4 stuck using old hadoop client?

2014-09-15 Thread Paul Wais
Dear List,

I'm having trouble getting Spark 1.1 to use the Hadoop 2 API for
reading SequenceFiles.  In particular, I'm seeing:

Exception in thread main org.apache.hadoop.ipc.RemoteException:
Server IPC version 7 cannot communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy7.getProtocolVersion(Unknown Source)
...

When invoking JavaSparkContext#newAPIHadoopFile().  (With args
validSequenceFileURI, SequenceFileInputFormat.class, Text.class,
BytesWritable.class, new Job().getConfiguration() -- Pretty close to
the unit test here:
https://github.com/apache/spark/blob/f0f1ba09b195f23f0c89af6fa040c9e01dfa8951/core/src/test/java/org/apache/spark/JavaAPISuite.java#L916
)


This error indicates to me that Spark is using an old hadoop client to
do reads.  Oddly I'm able to do /writes/ ok, i.e. I'm able to write
via JavaPairRdd#saveAsNewAPIHadoopFile() to my hdfs cluster.


Do I need to explicitly build spark for modern hadoop??  I previously
had an hdfs cluster running hadoop 2.3.0 and I was getting a similar
error (server is using version 9, client is using version 4).


I'm using Spark 1.1 cdh4 as well as hadoop cdh4 from the links posted
on spark's site:
 * http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-cdh4.tgz
 * http://d3kbcqa49mib13.cloudfront.net/hadoop-2.0.0-cdh4.2.0.tar.gz


What distro of hadoop is used at Data Bricks?  Are there distros of
Spark 1.1 and hadoop that should work together out-of-the-box?
(Previously I had Spark 1.0.0 and Hadoop 2.3 working fine..)

Thanks for any help anybody can give me here!
-Paul

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: increase parallelism of reading from hdfs

2014-08-11 Thread Paul Hamilton
Hi Chen,

You need to set the max input split size so that the underlying hadoop
libraries will calculate the splits appropriately.  I have done the
following successfully:

val job = new Job()
FileInputFormat.setMaxInputSplitSize(job, 128000000L)

And then use job.getConfiguration when creating a NewHadoopRDD.

I am sure there is some way to use it with convenience methods like
SparkContext.textFile, you could probably set the system property
mapreduce.input.fileinputformat.split.maxsize.
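
In code, that second route would look roughly like this (Spark 1.x; the 128 MB figure
and the path are just examples, and I haven't verified whether plain sc.textFile --
which goes through the old mapred API -- honors the same key, so the sketch sticks to
the new-API reader):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// push the limit into the context's Hadoop configuration so new-API reads pick it up
sc.hadoopConfiguration.setLong("mapreduce.input.fileinputformat.split.maxsize", 128000000L)

val lines = sc
  .newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///path/to/big/files")
  .map(_._2.toString)   // with 512M blocks this yields ~4 map tasks per block instead of 1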

Regards,
Paul Hamilton

From:  Chen Song chen.song...@gmail.com
Date:  Friday, August 8, 2014 at 9:13 PM
To:  user@spark.apache.org user@spark.apache.org
Subject:  increase parallelism of reading from hdfs


In Spark Streaming, StreamingContext.fileStream gives a FileInputDStream.
Within each batch interval, it would launch map tasks for the new files
detected during that interval. It appears that the way Spark computes the
number of map tasks is based on the block size of the files.

Below is the quote from Spark documentation.

 Spark automatically sets the number of “map” tasks to run on each file
 according to its size (though you can control it through optional
 parameters to SparkContext.textFile, etc.)

In my testing, if files are loaded as 512M blocks, each map task seems to
process a 512M chunk of data, no matter what value I set dfs.blocksize to on the
driver/executor. I am wondering if there is a way to increase parallelism,
say let each map read 128M of data and increase the number of map tasks?


-- 
Chen Song


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to read a multipart s3 file?

2014-08-07 Thread paul
darkjh wrote
 But in my experience, when reading directly from
 s3n, spark create only 1 input partition per file, regardless of the file
 size. This may lead to some performance problem if you have big files.

This is actually not true: Spark uses the underlying Hadoop input formats to
read the files, so if the input format you are using supports splittable
files (text, Avro, etc.) then it can use multiple splits per file (leading to
multiple map tasks per file).  You do have to set the max input split size;
as an example:

FileInputFormat.setMaxInputSplitSize(job, 256000000L)

In this case any file larger than 256,000,000 bytes is split.  If you don't
explicitly set it, the limit is infinite, which leads to the behavior you are
seeing where it is 1 split per file.
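
Concretely, for the s3n case that might look like the following (Spark 1.x era;
bucket and prefix are placeholders):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

val job = Job.getInstance(sc.hadoopConfiguration)
FileInputFormat.setMaxInputSplitSize(job, 256000000L)   // split anything over ~256 MB

val logs = sc.newAPIHadoopFile(
  "s3n://my-bucket/logs/",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
  job.getConfiguration)            // multiple map tasks per large S3 object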

Regards,
Paul Hamilton



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-a-multipart-s3-file-tp5463p11673.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Release date for new pyspark

2014-07-17 Thread Paul Wais
Thanks all!  (And thanks Matei for the developer link!)  I was able to
build using maven[1] but `./sbt/sbt assembly` results in build errors.
(Not familiar enough with the build to know why; in the past sbt
worked for me and maven did not).

I was able to run the master version of pyspark, which was what I
wanted, though I discovered a bug when trying to read spark-pickled
data from HDFS.  (Looks similar to
https://spark-project.atlassian.net/browse/SPARK-1034 from my naive
point of view).  For the curious:

Code:

conf = SparkConf()
conf.set('spark.local.dir', '/nail/tmp')
conf.set('spark.executor.memory', '28g')
conf.set('spark.app.name', 'test')

sc = SparkContext(conf=conf)

sc.parallelize(range(10)).saveAsPickleFile("hdfs://host:9000/test_pickle")
unpickled_rdd = sc.pickleFile("hdfs://host:9000/test_pickle")
print unpickled_rdd.takeSample(False, 3)

Traceback (most recent call last):
  File /path/to/my/home/spark-master/tast.py, line 33, in module
print unpickled_rdd.takeSample(False, 3)
  File /path/to/my/home/spark-master/python/pyspark/rdd.py, line
391, in takeSample
initialCount = self.count()
  File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 791, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File /path/to/my/home/spark-master/python/pyspark/rdd.py, line 782, in sum
return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
  File /path/to/my/home/spark-master/python/pyspark/rdd.py, line
703, in reduce
vals = self.mapPartitions(func).collect()
  File /path/to/my/home/spark-master/python/pyspark/rdd.py, line
667, in collect
bytesInJava = self._jrdd.collect().iterator()
  File /path/to/my/home/spark-master/python/pyspark/rdd.py, line
1600, in _jrdd
class_tag)
  File 
/path/to/my/home/spark-master/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
line 669, in __call__
  File 
/path/to/my/home/spark-master/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py,
line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling
None.org.apache.spark.api.python.PythonRDD. Trace:
py4j.Py4JException: Constructor
org.apache.spark.api.python.PythonRDD([class
org.apache.spark.rdd.FlatMappedRDD, class [B, class java.util.HashMap,
class java.util.ArrayList, class java.lang.Boolean, class
java.lang.String, class java.util.ArrayList, class
org.apache.spark.Accumulator, class
scala.reflect.ManifestFactory$$anon$2]) does not exist
at 
py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:184)
at 
py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:202)
at py4j.Gateway.invoke(Gateway.java:213)
at 
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:662)


[1] mvn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package

On Wed, Jul 16, 2014 at 8:39 PM, Michael Armbrust
mich...@databricks.com wrote:
 You should try cleaning and then building.  We have recently hit a bug in
 the scala compiler that sometimes causes non-clean builds to fail.


 On Wed, Jul 16, 2014 at 7:56 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Yeah, we try to have a regular 3 month release cycle; see
 https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage for the
 current window.

 Matei

 On Jul 16, 2014, at 4:21 PM, Mark Hamstra m...@clearstorydata.com wrote:

 You should expect master to compile and run: patches aren't merged unless
 they build and pass tests on Jenkins.

 You shouldn't expect new features to be added to stable code in
 maintenance releases (e.g. 1.0.1).

 AFAIK, we're still on track with Spark 1.1.0 development, which means that
 it should be released sometime in the second half of next month (or shortly
 thereafter).


 On Wed, Jul 16, 2014 at 4:03 PM, Paul Wais pw...@yelp.com wrote:

 Dear List,

 The version of pyspark on master has a lot of nice new features, e.g.
 SequenceFile reading, pickle i/o, etc:
 https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353

 I downloaded the recent 1.0.1 release and was surprised to see the
 distribution did not include these changes in master.  (I've tried pulling
 master [ 9c249743ea ] and compiling from source, but I get a build failure
 in TestSQLContext.scala FWIW).

 Is an updated pyspark scheduled for the next release?  (Also, am I wrong
 in expecting HEAD on master should probably compile and run?)

 Best Regards,
 -Paul Wais






Release date for new pyspark

2014-07-16 Thread Paul Wais
Dear List,

The version of pyspark on master has a lot of nice new features, e.g.
SequenceFile reading, pickle i/o, etc:
https://github.com/apache/spark/blob/master/python/pyspark/context.py#L353

I downloaded the recent 1.0.1 release and was surprised to see the
distribution did not include these changes in master.  (I've tried pulling
master [ 9c249743ea ] and compiling from source, but I get a build failure
in TestSQLContext.scala FWIW).

Is an updated pyspark scheduled for the next release?  (Also, am I wrong in
expecting HEAD on master should probably compile and run?)

Best Regards,
-Paul Wais


Re: Recommended pipeline automation tool? Oozie?

2014-07-10 Thread Paul Brown
We use Luigi for this purpose.  (Our pipelines are typically on AWS (no
EMR) backed by S3 and using combinations of Python jobs, non-Spark
Java/Scala, and Spark.  We run Spark jobs by connecting drivers/clients to
the master, and those are what Luigi invokes.)

—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/


On Thu, Jul 10, 2014 at 10:20 AM, k.tham kevins...@gmail.com wrote:

 I'm just wondering what's the general recommendation for data pipeline
 automation.

 Say, I want to run Spark Job A, then B, then invoke script C, then do D,
 and
 if D fails, do E, and if Job A fails, send email F, etc...

 It looks like Oozie might be the best choice. But I'd like some
 advice/suggestions.

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Recommended-pipeline-automation-tool-Oozie-tp9319.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: jackson-core-asl jar (1.8.8 vs 1.9.x) conflict with the spark-sql (version 1.x)

2014-06-28 Thread Paul Brown
Hi, Mans --

Both of those versions of Jackson are pretty ancient.  Do you know which of
the Spark dependencies is pulling them in?  It would be good for us (the
Jackson, Woodstox, etc., folks) to see if we can get people to upgrade to
more recent versions of Jackson.
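
If it is spark-sql dragging them in, one build-side experiment (just a sketch --
whether it actually helps depends on which jar wins on the executor classpath, which
is the open question here) is to exclude the transitive Codehaus artifacts and pin
the 1.8.8 versions the job needs, e.g. in sbt:

libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-sql" % "1.0.0")     // or whichever 1.x release you are on
    .exclude("org.codehaus.jackson", "jackson-core-asl")
    .exclude("org.codehaus.jackson", "jackson-mapper-asl"),
  "org.codehaus.jackson" % "jackson-core-asl"   % "1.8.8",
  "org.codehaus.jackson" % "jackson-mapper-asl" % "1.8.8"
)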

-- Paul

—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/


On Fri, Jun 27, 2014 at 12:58 PM, M Singh mans6si...@yahoo.com wrote:

  Hi:

 I am using spark to stream data to cassandra and it works fine in local
 mode. But when I execute the application in a standalone clustered env I
 got exception included below (java.lang.NoClassDefFoundError:
 org/codehaus/jackson/annotate/JsonClass).

 I think this is due to the jackson-core-asl dependency conflict
 (jackson-core-asl 1.8.8 has the JsonClass but 1.9.x does not).  The 1.9.x
 version is being pulled in by spark-sql project.  I tried adding
 jackson-core-asl 1.8.8 with --jars argument while submitting the
 application for execution but it did not work.  So I created a custom spark
 build excluding sql project.  With this custom spark install I was able to
 resolve the issue at least on a single node cluster (separate master and
 worker).

 If there is an alternate way to resolve this conflicting jar issue without
 a custom build (eg: configuration to use the user defined jars in the
 executor class path first), please let me know.

 Also, is there a comprehensive list of configuration properties available
 for spark ?

 Thanks

 Mans

 Exception trace

  TaskSetManager: Loss was due to java.lang.NoClassDefFoundError
 java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass
 at
 org.codehaus.jackson.map.introspect.JacksonAnnotationIntrospector.findDeserializationType(JacksonAnnotationIntrospector.java:524)
 at
 org.codehaus.jackson.map.deser.BasicDeserializerFactory.modifyTypeByAnnotation(BasicDeserializerFactory.java:732)
 at
 org.codehaus.jackson.map.deser.BasicDeserializerFactory.createCollectionDeserializer(BasicDeserializerFactory.java:229)
 at
 org.codehaus.jackson.map.deser.StdDeserializerProvider._createDeserializer(StdDeserializerProvider.java:386)
 at
 org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCache2(StdDeserializerProvider.java:307)
 at
 org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCacheValueDeserializer(StdDeserializerProvider.java:287)
 at
 org.codehaus.jackson.map.deser.StdDeserializerProvider.findValueDeserializer(StdDeserializerProvider.java:136)
 at
 org.codehaus.jackson.map.deser.StdDeserializerProvider.findTypedValueDeserializer(StdDeserializerProvider.java:157)
 at
 org.codehaus.jackson.map.ObjectMapper._findRootDeserializer(ObjectMapper.java:2468)
 at
 org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2402)
 at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1602)




Re: Upgrading to Spark 1.0.0 causes NoSuchMethodError

2014-06-25 Thread Paul Brown
Hi, Robert --

I wonder if this is an instance of SPARK-2075:
https://issues.apache.org/jira/browse/SPARK-2075

-- Paul

—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/


On Wed, Jun 25, 2014 at 6:28 AM, Robert James srobertja...@gmail.com
wrote:

 On 6/24/14, Robert James srobertja...@gmail.com wrote:
  My app works fine under Spark 0.9.  I just tried upgrading to Spark
  1.0, by downloading the Spark distro to a dir, changing the sbt file,
  and running sbt assembly, but I get now NoSuchMethodErrors when trying
  to use spark-submit.
 
  I copied in the SimpleApp example from
  http://spark.apache.org/docs/latest/quick-start.html and get the same
  error:
 
  $/usr/local/share/spark/bin/spark-submit --class SimpleApp
  target/scala-2.10/myproj-assembly-1.0.jar
  Spark assembly has been built with Hive, including Datanucleus jars on
  classpath
  Exception in thread main java.lang.NoSuchMethodError:
 
 org.apache.spark.SparkContext$.$lessinit$greater$default$2()Lscala/collection/Map;
at SimpleApp$.main(SimpleApp.scala:10)
at SimpleApp.main(SimpleApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 
  How can I migrate to Spark 1.0.0?
 

 I've done `sbt clean`, deleted the entire ivy2 cache, and still get
 the above error on both my code and the official Spark example.  Can
 anyone guide me on how to debug this?

 How does Spark find the /usr/local/share/spark directory? Is there a
 variable somewhere I need to set to point to that, or that might point
 to the old spark? I've left the old spark dir on the machine (just
 changed the symlink) - can that be causing problems?

 How should I approach this?



Re: Strange problem with saveAsTextFile after upgrade Spark 0.9.0-1.0.0

2014-06-08 Thread Paul Brown
Moving over to the dev list, as this isn't a user-scope issue.

I just ran into this issue with the missing saveAsTextFile, and here's a
little additional information:

- Code ported from 0.9.1 up to 1.0.0; works with local[n] in both cases.
- Driver built as an uberjar via Maven.
- Deployed to smallish EC2 cluster in standalone mode (S3 storage) with
Spark 1.0.0-hadoop1 downloaded from Apache.

Given that it functions correctly in local mode but not in a standalone
cluster, this suggests to me that the issue is in a difference between the
Maven version and the hadoop1 version.

In the spirit of taking the computer at its word, we can just have a look
in the JAR files.  Here's what's in the Maven dep as of 1.0.0:

jar tvf
~/.m2/repository/org/apache/spark/spark-core_2.10/1.0.0/spark-core_2.10-1.0.0.jar
| grep 'rdd/RDD' | grep 'saveAs'
  1519 Mon May 26 13:57:58 PDT 2014
org/apache/spark/rdd/RDD$anonfun$saveAsTextFile$1.class
  1560 Mon May 26 13:57:58 PDT 2014
org/apache/spark/rdd/RDD$anonfun$saveAsTextFile$2.class


And here's what's in the hadoop1 distribution:

jar tvf spark-assembly-1.0.0-hadoop1.0.4.jar| grep 'rdd/RDD' | grep 'saveAs'


I.e., it's not there.  It is in the hadoop2 distribution:

jar tvf spark-assembly-1.0.0-hadoop2.2.0.jar| grep 'rdd/RDD' | grep 'saveAs'
  1519 Mon May 26 07:29:54 PDT 2014
org/apache/spark/rdd/RDD$anonfun$saveAsTextFile$1.class
  1560 Mon May 26 07:29:54 PDT 2014
org/apache/spark/rdd/RDD$anonfun$saveAsTextFile$2.class


So something's clearly broken with the way that the distribution assemblies
are created.

FWIW and IMHO, the right way to publish the hadoop1 and hadoop2 flavors
of Spark to Maven Central would be as *entirely different* artifacts
(spark-core-h1, spark-core-h2).

Logged as SPARK-2075 https://issues.apache.org/jira/browse/SPARK-2075.

Cheers.
-- Paul



—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/


On Fri, Jun 6, 2014 at 2:45 AM, HenriV henri.vanh...@vdab.be wrote:

 I'm experiencing the same error while upgrading from 0.9.1 to 1.0.0.
 I'm using Google Compute Engine and Cloud Storage, but saveAsTextFile is
 returning errors while saving in the cloud or saving locally. When I start a
 job in the cluster I do get an error, but after this error it keeps on
 running fine until the saveAsTextFile. (I don't know if the two are
 connected.)

 ---Error at job startup---
  ERROR metrics.MetricsSystem: Sink class
 org.apache.spark.metrics.sink.MetricsServlet cannot be instantialized
 java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
 at

 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
 at

 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at

 org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:136)
 at

 org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:130)
 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 at

 org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:130)
 at
 org.apache.spark.metrics.MetricsSystem.init(MetricsSystem.scala:84)
 at

 org.apache.spark.metrics.MetricsSystem$.createMetricsSystem(MetricsSystem.scala:167)
 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:230)
 at org.apache.spark.SparkContext.init(SparkContext.scala:202)
 at Hello$.main(Hello.scala:101)
 at Hello.main(Hello.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at sbt.Run.invokeMain(Run.scala:72)
 at sbt.Run.run0(Run.scala:65)
 at sbt.Run.sbt$Run$$execute$1(Run.scala:54)
 at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:58)
 at sbt.Run$$anonfun$run$1.apply(Run.scala:58)
 at sbt.Run$$anonfun$run$1.apply(Run.scala:58)
 at sbt.Logger$$anon$4.apply(Logger.scala:90)
 at sbt.TrapExit$App.run(TrapExit.scala:244)
 at java.lang.Thread.run(Thread.java:744)
 Caused by: java.lang.NoSuchMethodError:
 com.fasterxml.jackson.core.JsonFactory.requiresPropertyOrdering()Z

Unexpected results when caching data

2014-05-12 Thread paul
I have been experimenting with a data set with and without persisting the RDD
and have come across some unexpected results.  The files we are reading are
Avro files so we are using the following to define the RDD, what we end up
with is a RDD[CleansedLogFormat]:

 val f = new NewHadoopRDD(sc,
  classOf[AvroKeyInputFormat[CleansedLogFormat]],
  classOf[AvroKey[CleansedLogFormat]],
  classOf[NullWritable],
  job.getConfiguration).map(_._1.datum())

f.count()
= 110268763

f.persist(StorageLevel.MEMORY_AND_DISK).count()
= 110268763

So far so good.  Both the persisted and non-persisted RDDs return the same
results for the count.  Where things get weird is when I try and do some
reduce by key or other grouping operations.  Something like:

f.map(record => (record.getProviderId.toString,
  record)).join(bandwidthKv).map { pair =>
    val hour = new
      DateTime(pair._2._1.getTimeStamp).toString("yyyyMMddHH")
    (hour, Set(pair._2._1.getGuid))
  }.reduceByKey(_ ++ _).collect().foreach { a => println(a._1 + ": " +
    a._2.size)}


We then get different results from the non-persisted vs. the persisted
version.

Non-persisted:
2014050917: 7
2014050918: 42

Persisted:
2014050917: 7
2014050918: 12

Any idea what could account for the differences?  BTW I am using Spark
0.9.1.

Thanks,

Paul 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unexpected-results-when-caching-data-tp5619.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: missing method in my slf4j after excluding Spark ZK log4j

2014-05-12 Thread Paul Brown
Hi, Adrian --

If my memory serves, you need 1.7.7 of the various slf4j modules to avoid
that issue.
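
The dependency block I'd sketch for that (standard coordinates; adjust the Spark
module and the excludes to whatever your build already does) is roughly:

libraryDependencies ++= Seq(
  "org.slf4j"      % "slf4j-api"        % "1.7.7",
  "org.slf4j"      % "log4j-over-slf4j" % "1.7.7",   // bridges ZooKeeper's log4j calls
  "ch.qos.logback" % "logback-classic"  % "1.0.13",
  ("org.apache.spark" %% "spark-streaming" % "0.9.1")
    .exclude("org.slf4j", "slf4j-log4j12")
    .exclude("log4j", "log4j")
)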

Best.
-- Paul

—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/


On Mon, May 12, 2014 at 7:51 AM, Adrian Mocanu amoc...@verticalscope.comwrote:

  Hey guys,

 I've asked before, in Spark 0.9 - I now use 0.9.1, about removing log4j
 dependency and was told that it was gone. However I still find it part of
 zookeeper imports. This is fine since I exclude it myself in the sbt file,
 but another issue arises.

 I wonder if anyone else has run into this.



 Spark uses log4j v1.2.17 and slf4j-log4j12:1.7.2

 I use slf4j 1.7.5, logback 1.0.13, and log4joverslf4j v 1.7.5



 I think my slf4j 1.7.5 doesn't agree with what zookeeper expects in its
 log4j v 1.2.17 because I get missing method error:

 java.lang.NoSuchMethodError:
 org.apache.log4j.Logger.setLevel(Lorg/apache/log4j/Level;)V

 at
 org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)

 at
 org.apache.spark.util.AkkaUtils$$anonfun$createActorSystem$1.apply(AkkaUtils.scala:58)

 at scala.Option.map(Option.scala:145)

 at
 org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:58)

 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126)

 at
 org.apache.spark.SparkContext.init(SparkContext.scala:139)

 at
 org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:500)

 at
 org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:76)

 ...



 Is there a way to find out what versions of slf4j I need to make it work
 with log4j 1.2.17?



 -Adrian





CDH 5.0 and Spark 0.9.0

2014-04-30 Thread Paul Schooss
Hello,

So I was unable to run the following commands from the spark shell with CDH
5.0 and spark 0.9.0, see below.

Once I removed the property

property
nameio.compression.codec.lzo.class/name
valuecom.hadoop.compression.lzo.LzoCodec/value
finaltrue/final
/property

from the core-site.xml on the node, the spark commands worked. Is there a
specific setup I am missing?

scala> var log = sc.textFile("hdfs://jobs-ab-hnn1//input/core-site.xml")
14/04/30 22:43:16 INFO MemoryStore: ensureFreeSpace(78800) called with
curMem=150115, maxMem=308713881
14/04/30 22:43:16 INFO MemoryStore: Block broadcast_1 stored as values to
memory (estimated size 77.0 KB, free 294.2 MB)
14/04/30 22:43:16 WARN Configuration: mapred-default.xml:an attempt to
override final parameter: mapreduce.tasktracker.cache.local.size; Ignoring.
14/04/30 22:43:16 WARN Configuration: yarn-site.xml:an attempt to override
final parameter: mapreduce.output.fileoutputformat.compress.type; Ignoring.
14/04/30 22:43:16 WARN Configuration: hdfs-site.xml:an attempt to override
final parameter: mapreduce.map.output.compress.codec; Ignoring.
log: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at
console:12

scala> log.count()
14/04/30 22:43:03 WARN JobConf: The variable mapred.child.ulimit is no
longer used.
14/04/30 22:43:04 WARN Configuration: mapred-default.xml:an attempt to
override final parameter: mapreduce.tasktracker.cache.local.size; Ignoring.
14/04/30 22:43:04 WARN Configuration: yarn-site.xml:an attempt to override
final parameter: mapreduce.output.fileoutputformat.compress.type; Ignoring.
14/04/30 22:43:04 WARN Configuration: hdfs-site.xml:an attempt to override
final parameter: mapreduce.map.output.compress.codec; Ignoring.
java.lang.IllegalArgumentException: java.net.UnknownHostException:
jobs-a-hnn1
at
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)
at
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237)
at
org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141)
at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:576)
at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:521)
at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:146)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:902)
at org.apache.spark.rdd.RDD.count(RDD.scala:720)
at $iwC$$iwC$$iwC$$iwC.init(console:15)
at $iwC$$iwC$$iwC.init(console:20)
at $iwC$$iwC.init(console:22)
at $iwC.init(console:24)
at init(console:26)
at .init(console:30)
at .clinit(console)
at .init(console:7)
at .clinit(console)
at $print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:795)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:840)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:752)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:600)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:607)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:610)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:935)
at
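The java.net.UnknownHostException on jobs-a-hnn1 looks like the DFS client treating
what is probably a logical HA nameservice name as a plain hostname, which usually
means the client-side HDFS HA settings (dfs.nameservices and the namenode RPC
addresses) are not visible on the shell's classpath. A minimal sketch of one way to
expose them, assuming a conventional /etc/hadoop/conf location and that your build's
classpath script picks up HADOOP_CONF_DIR, as the bundled scripts generally do
(adjust the path to your environment):

  export HADOOP_CONF_DIR=/etc/hadoop/conf   # directory containing core-site.xml and hdfs-site.xml
  ./bin/spark-shell

With the HA configuration on the classpath, NameNodeProxies can map the nameservice
to the active namenode instead of failing in createNonHAProxy.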

Can't run a simple spark application with 0.9.1

2014-04-15 Thread Paul Schooss
Hello,

Currently I have deployed Spark 0.9.1 using a new way of starting it up:

exec start-stop-daemon --start --pidfile /var/run/spark.pid
--make-pidfile --chuid ${SPARK_USER}:${SPARK_GROUP} --chdir ${SPARK_HOME}
--exec /usr/bin/java -- -cp ${CLASSPATH}
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.port=10111
-Dspark.akka.logLifecycleEvents=true -Djava.library.path=
-XX:ReservedCodeCacheSize=512M -XX:+UseCodeCacheFlushing
-XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC
-Dspark.executor.memory=10G -Xmx10g ${MAIN_CLASS} ${MAIN_CLASS_ARGS}


where the classpath points to the Spark jar that we compile with sbt. When I
try to run a job, I receive the following warning:

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check
your cluster UI to ensure that workers are registered and have sufficient
memory


My first question is: do I need the entire Spark project on disk in order to
run jobs? Or what else am I doing wrong?
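
This warning usually means the application registered with the master but no worker
could satisfy the requested resources, either because no workers registered at all or
because the requested executor memory (10G here) exceeds what any worker advertises.
A minimal sketch of checking that from the application side with a deliberately small
request; the master URL and app name below are placeholders:

  val conf = new org.apache.spark.SparkConf()
    .setMaster("spark://master-host:7077")      // placeholder standalone master URL
    .setAppName("resource-smoke-test")
    .set("spark.executor.memory", "1g")         // keep this within what workers advertise
  val sc = new org.apache.spark.SparkContext(conf)
  println(sc.parallelize(1 to 1000).count())    // only runs if a registered worker accepted the executor

If even that stalls with the same warning, the master web UI on port 8080 should show
whether any workers registered and how much memory they offer.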


Re: Can't run a simple spark application with 0.9.1

2014-04-15 Thread Paul Schooss
I am a dork, please disregard this issue. I did not have the slaves
correctly configured. This error is very misleading.


On Tue, Apr 15, 2014 at 11:21 AM, Paul Schooss paulmscho...@gmail.com wrote:

 Hello,

 Currently I deployed 0.9.1 spark using a new way of starting up spark

 exec start-stop-daemon --start --pidfile /var/run/spark.pid
 --make-pidfile --chuid ${SPARK_USER}:${SPARK_GROUP} --chdir ${SPARK_HOME}
 --exec /usr/bin/java -- -cp ${CLASSPATH}
 -Dcom.sun.management.jmxremote.authenticate=false
 -Dcom.sun.management.jmxremote.ssl=false
 -Dcom.sun.management.jmxremote.port=10111
 -Dspark.akka.logLifecycleEvents=true -Djava.library.path=
 -XX:ReservedCodeCacheSize=512M -XX:+UseCodeCacheFlushing
 -XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC
 -Dspark.executor.memory=10G -Xmx10g ${MAIN_CLASS} ${MAIN_CLASS_ARGS}


 where class path points to the spark jar that we compile with sbt. When I
 try to run a job I receive the following warning

 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check
 your cluster UI to ensure that workers are registered and have sufficient
 memory


 My first question is do I need the entire spark project on disk in order
 to run jobs? Or what else am I doing wrong?
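
Since the root cause turned out to be the slave configuration, a minimal sketch of
the standalone pieces that have to line up, with placeholder host names:

  # conf/slaves on the master machine: one worker host per line
  worker01.example.com
  worker02.example.com

  # then, from the master:
  sbin/start-all.sh

After that, the master web UI (http://master-host:8080 by default) should list each
worker together with the memory it advertises; a job will only be scheduled if that
advertised memory covers the requested executor memory.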



JMX with Spark

2014-04-15 Thread Paul Schooss
Has anyone got this working? I have enabled the properties for it in the
metrics.conf file and ensured that it is placed under Spark's home
directory. Any ideas why I don't see Spark beans?
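
For anyone hitting this later: the metrics system reads conf/metrics.properties
(there is a metrics.properties.template next to it to copy), and the JMX sink has to
be enabled there in addition to opening the JMX port on the JVM. A minimal sketch,
assuming the sink class name in your build matches the one below:

  # conf/metrics.properties
  *.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink

With that in place and the usual -Dcom.sun.management.jmxremote.* flags on the
process, the spark.* MBeans should appear when jconsole is pointed at host:10111.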


Shutdown with streaming driver running in cluster broke master web UI permanently

2014-04-11 Thread Paul Mogren
I had a cluster running with a streaming driver deployed into it. I shut down 
the cluster using sbin/stop-all.sh. Upon restarting (and restarting, and 
restarting), the master web UI cannot respond to requests. The cluster seems to 
be otherwise functional. Below is the master's log, showing stack traces.


pmogren@streamproc01:~/streamproc/spark-0.9.1-bin-hadoop2$ cat 
/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/sbin/../logs/spark-pmogren-org.apache.spark.deploy.master.Master-1-streamproc01.out
Spark Command: /usr/lib/jvm/java-8-oracle-amd64/bin/java -cp 
:/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/conf:/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar
 -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m 
-Dspark.streaming.unpersist=true -Djava.net.preferIPv4Stack=true 
-Dsun.io.serialization.extendedDebugInfo=true 
-Dspark.deploy.recoveryMode=ZOOKEEPER 
-Dspark.deploy.zookeeper.url=pubsub01:2181 
org.apache.spark.deploy.master.Master --ip 10.10.41.19 --port 7077 --webui-port 
8080


log4j:WARN No appenders could be found for logger 
(akka.event.slf4j.Slf4jLogger).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
14/04/11 16:07:55 INFO Master: Using Spark's default log4j profile: 
org/apache/spark/log4j-defaults.properties
14/04/11 16:07:55 INFO Master: Starting Spark master at spark://10.10.41.19:7077
14/04/11 16:07:55 INFO MasterWebUI: Started Master web UI at 
http://10.10.41.19:8080
14/04/11 16:07:55 INFO Master: Persisting recovery state to ZooKeeper
14/04/11 16:07:55 INFO ZooKeeper: Client 
environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
14/04/11 16:07:55 INFO ZooKeeper: Client 
environment:host.name=streamproc01.nexus.commercehub.com
14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.version=1.8.0
14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.vendor=Oracle 
Corporation
14/04/11 16:07:55 INFO ZooKeeper: Client 
environment:java.home=/usr/lib/jvm/jdk1.8.0/jre
14/04/11 16:07:55 INFO ZooKeeper: Client 
environment:java.class.path=:/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/conf:/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar
14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.library.path=
14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.io.tmpdir=/tmp
14/04/11 16:07:55 INFO ZooKeeper: Client environment:java.compiler=NA
14/04/11 16:07:55 INFO ZooKeeper: Client environment:os.name=Linux
14/04/11 16:07:55 INFO ZooKeeper: Client environment:os.arch=amd64
14/04/11 16:07:55 INFO ZooKeeper: Client environment:os.version=3.5.0-23-generic
14/04/11 16:07:55 INFO ZooKeeper: Client environment:user.name=pmogren
14/04/11 16:07:55 INFO ZooKeeper: Client environment:user.home=/home/pmogren
14/04/11 16:07:55 INFO ZooKeeper: Client 
environment:user.dir=/home/pmogren/streamproc/spark-0.9.1-bin-hadoop2
14/04/11 16:07:55 INFO ZooKeeper: Initiating client connection, 
connectString=pubsub01:2181 sessionTimeout=3 
watcher=org.apache.spark.deploy.master.SparkZooKeeperSession$ZooKeeperWatcher@744bfbb6
14/04/11 16:07:55 INFO ZooKeeperLeaderElectionAgent: Starting ZooKeeper 
LeaderElection agent
14/04/11 16:07:55 INFO ZooKeeper: Initiating client connection, 
connectString=pubsub01:2181 sessionTimeout=3 
watcher=org.apache.spark.deploy.master.SparkZooKeeperSession$ZooKeeperWatcher@7f7e6043
14/04/11 16:07:55 INFO ClientCnxn: Opening socket connection to server 
pubsub01.nexus.commercehub.com/10.10.40.39:2181. Will not attempt to 
authenticate using SASL (unknown error)
14/04/11 16:07:55 INFO ClientCnxn: Socket connection established to 
pubsub01.nexus.commercehub.com/10.10.40.39:2181, initiating session
14/04/11 16:07:55 INFO ClientCnxn: Opening socket connection to server 
pubsub01.nexus.commercehub.com/10.10.40.39:2181. Will not attempt to 
authenticate using SASL (unknown error)
14/04/11 16:07:55 WARN ClientCnxnSocket: Connected to an old server; r-o mode 
will be unavailable
14/04/11 16:07:55 INFO ClientCnxn: Session establishment complete on server 
pubsub01.nexus.commercehub.com/10.10.40.39:2181, sessionid = 0x14515d9a11300ce, 
negotiated timeout = 3
14/04/11 16:07:55 INFO ClientCnxn: Socket connection established to 
pubsub01.nexus.commercehub.com/10.10.40.39:2181, initiating session
14/04/11 16:07:55 WARN ClientCnxnSocket: Connected to an old server; r-o mode 
will be unavailable
14/04/11 16:07:55 INFO ClientCnxn: Session establishment complete on server 
pubsub01.nexus.commercehub.com/10.10.40.39:2181, sessionid = 0x14515d9a11300cf, 
negotiated timeout = 3
14/04/11 16:07:55 WARN ZooKeeperLeaderElectionAgent: Cleaning up old ZK master 
election file that points to this master.
14/04/11 16:07:55 INFO ZooKeeperLeaderElectionAgent: Leader 
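
One thing worth checking here: with spark.deploy.recoveryMode=ZOOKEEPER the master
persists application, driver, and worker state under a znode
(spark.deploy.zookeeper.dir, /spark by default) and replays it on startup, so a stale
entry left behind by the old streaming driver can leave every restarted master
wedged. A hedged sketch of inspecting and clearing that state with the stock
ZooKeeper CLI, assuming the default znode path:

  # from a ZooKeeper installation, against the ensemble used above
  bin/zkCli.sh -server pubsub01:2181
  ls /spark        # look for leftover recovery entries
  rmr /spark       # wipes the persisted recovery state; stop the cluster first

After clearing it, sbin/start-master.sh should bring the web UI back on port 8080.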

CheckpointRDD has different number of partitions than original RDD

2014-04-07 Thread Paul Mogren
Hello, Spark community!  My name is Paul. I am a Spark newbie, evaluating 
version 0.9.0 without any Hadoop at all, and need some help. I run into the 
following error with the StatefulNetworkWordCount example (and similarly in my 
prototype app, when I use the updateStateByKey operation).  I get this when 
running against my small cluster, but not (so far) against local[2].

61904 [spark-akka.actor.default-dispatcher-2] ERROR 
org.apache.spark.streaming.scheduler.JobScheduler - Error running job streaming 
job 1396905956000 ms.0
org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[310] at take at 
DStream.scala:586(0) has different number of partitions than original RDD 
MapPartitionsRDD[309] at mapPartitions at StateDStream.scala:66(2)
at 
org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:99)
at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:989)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:855)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:870)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:884)
at org.apache.spark.rdd.RDD.take(RDD.scala:844)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:586)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:585)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:155)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)


Please let me know what other information would be helpful; I didn't find any 
question submission guidelines.

Thanks,
Paul
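
One pattern that bites updateStateByKey users on a real cluster but not on local[2]
is the checkpoint directory: the state DStream periodically checkpoints its RDDs
there, and if the configured path is only visible to one machine, the checkpointed
RDD read back can end up with a different number of partitions (often zero) than the
original. A minimal sketch of the setup being assumed, with the shared checkpoint
path as a placeholder (HDFS, NFS, or any filesystem every worker and the driver can
reach):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._

  val conf = new SparkConf().setMaster("spark://master-host:7077").setAppName("StatefulWordCount")
  val ssc = new StreamingContext(conf, Seconds(1))
  // must be reachable from the driver and every worker, not a node-local path
  ssc.checkpoint("hdfs://namenode:8020/user/paul/streaming-checkpoints")

  val updateFunc = (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))
  val counts = ssc.socketTextStream("localhost", 9999)
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    .updateStateByKey[Int](updateFunc)
  counts.print()

  ssc.start()
  ssc.awaitTermination()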

