Re: Why is spark running multiple stages with the same code line?

2022-04-21 Thread Joe
Hi Sean,
Thanks for replying, but my question was about multiple stages running
the same line of code, not about multiple stages in general. Yes, a single
job can have multiple stages, but as far as I know they should not be
repeated if you're caching/persisting your intermediate outputs.

My question is: why am I seeing multiple stages running the same line of
code? As I understand it, a stage is a grouping of operations that can be
executed without shuffling data or invoking a new action, and stages are
divided into tasks. Tasks are what run in parallel, and they can have the
same line of code running on different executors. Or is this assumption
wrong?
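
For illustration, here is a minimal, self-contained PySpark sketch (not the
original MySparkJob code): a single count() action triggers one job, but the job
is split into several stages because each shuffle (groupBy, join, ...) introduces
a stage boundary, and those stages typically all show the same "count at ..."
call site in the UI.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("stage-demo").getOrCreate()

df = spark.range(100000).withColumn("k", F.col("id") % 100)

n = (df.groupBy("k").count()       # shuffle -> stage boundary
       .groupBy("count").count()   # another shuffle -> another stage
       .count())                   # the single action: one job, several stages
print(n)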
Thanks,

Joe


On Thu, 2022-04-21 at 09:14 -0500, Sean Owen wrote:
> A job can have multiple stages for sure. One action triggers a job.
> This seems normal. 
> 
> On Thu, Apr 21, 2022, 9:10 AM Joe  wrote:
> > Hi,
> > When looking at application UI (in Amazon EMR) I'm seeing one job
> > for
> > my particular line of code, for example:
> > 64 Running count at MySparkJob.scala:540
> > 
> > When I click into the job and go to stages I can see over 100 stages
> > running the same line of code (stages are active, pending or
> > completed):
> > 190 Pending count at MySparkJob.scala:540
> > ...
> > 162 Active count at MySparkJob.scala:540
> > ...
> > 108 Completed count at MySparkJob.scala:540
> > ...
> > 
> > I'm not sure what that means, I thought that stage was a logical
> > operation boundary and you could have only one stage in the job
> > (unless
> > you executed the same dataset+action many times on purpose) and
> > tasks
> > were the ones that were replicated across partitions. But here I'm
> > seeing many stages running, each with the same line of code?
> > 
> > I don't have a situation where my code is re-processing the same
> > set of
> > data many times, all intermediate sets are persisted.
> > I'm not sure if EMR UI display is wrong or if spark stages are not
> > what
> > I thought they were?
> > Thanks,
> > 
> > Joe
> > 
> > 
> > 
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > 



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Why is spark running multiple stages with the same code line?

2022-04-21 Thread Joe
Hi,
When looking at the application UI (in Amazon EMR) I'm seeing one job for
my particular line of code, for example:
64 Running count at MySparkJob.scala:540

When I click into the job and go to stages I can see over 100 stages
running the same line of code (stages are active, pending or
completed):
190 Pending count at MySparkJob.scala:540
...
162 Active count at MySparkJob.scala:540
...
108 Completed count at MySparkJob.scala:540
...

I'm not sure what that means. I thought that a stage was a logical
operation boundary, that you could have only one stage in a job (unless
you executed the same dataset+action many times on purpose), and that
tasks were the ones replicated across partitions. But here I'm seeing
many stages running, each with the same line of code?

I don't have a situation where my code is re-processing the same set of
data many times; all intermediate sets are persisted.
I'm not sure if the EMR UI display is wrong or if Spark stages are not
what I thought they were.
Thanks,

Joe



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Apache Spark Meetup - Wednesday 1st July

2020-06-30 Thread Joe Davies
Good morning,
 I hope this email finds you well.
 I am the host for an on-going series of live webinars/virtual meetups, and the
next 2 weeks are focused on Apache Spark. I was wondering if you could share
this within your group?
 It's free to sign up and there will be live Q&A throughout the presentation.
 Here is the link -
https://www.meetup.com/OrbisConnect/events/271400656/
 Thanks,


Joe Davies
Senior Consultant

Direct: +44 (0)203 854 0015
Mobile: +44 (0)7391 650 347
2 Leman Street, We Work, Aldgate Tower, London E1 8FA
www.orbisconsultants.com

Orbis Consultants Limited · Company Reg No. 09749682 · Registered Office: 82 St
John's Street, London, EC1M 4JN
Save a tree - we only print the emails we really need.




subscribe

2020-02-05 Thread Cool Joe
subscribe


Low-level behavior of Exchange

2019-10-30 Thread Joe Naegele
Hello,

I’m benchmarking our hardware and Spark deployment by repartitioning large 
datasets. We recently corrected a misconfigured aggregate network link (bonded) 
that was causing fatal network timeouts in long-running jobs. Now that it’s 
fixed, we still observe less-than-desirable performance while simply 
repartitioning a randomly-generated dataset.

My primary question is: What is the low-level behavior of the “Exchange 
hashpartitioning(…)” operation, with respect to moving data to and from 
disk/hosts?
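
As a way to see where that operation sits, here is a hedged sketch (the path and
the columns "a".."d" follow the job described further down and are purely
illustrative) that prints the physical plan, in which the shuffle shows up as an
Exchange hashpartitioning node:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("exchange-plan").getOrCreate()

df = spark.read.parquet("input.dat")              # illustrative path
repartitioned = df.repartition(200, "a", "b", "c", "d")

# The physical plan contains something like:
#   Exchange hashpartitioning(a, b, c, d, 200)
#   +- FileScan parquet [a, b, c, d] ...
repartitioned.explain(True)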

I used AWS EMR to set up a similar Spark cluster and run the exact same job, 
under nearly identical conditions. Our cluster performs the first stage nearly 
2x faster than on the AWS cluster, but takes over 2x as long to complete the 
second stage (Explicit partitioning appears to always require two stages). I 
want to better understand the Exchange operation in order to describe this 
performance discrepancy, and hopefully find a correlation to our cluster’s 
resource limitations (e.g. disk I/O or IOPS capability).

I’ve already benchmarked our system resources and compared to AWS, so I can 
make some assumptions. I have investigated the Exchange (and some related) 
source code but it’s not clear to me what actually occurs with respect to I/O. 
It seems to me that the first stage is basically a scan, and is very fast 
because it’s only really limited by sequential disk I/O speed. The second stage 
does not appear to stress any resource on the cluster, yet can take 10x as
long to complete as the first stage… Finally, the only hint that something 
might be “wrong” is a proportionally high “Shuffle Read Blocked Time” for each 
task during the second stage (90% of task duration).

If I’m not mistaken, my assessment thus far can be applied to shuffles in 
general, since they often require repartitioning.

Current configuration, although the AWS EMR comparison used a significantly 
reduced set of Executors:


  *   5 hosts, 5x 10 TB disks each
  *   54 executors, 5 vcores and 23 GB each
  *   50+ billion records of form Record(a: Long, b: Long, c: Long, d: Long), 
where ‘a’-‘d’ are randomly-generated values. 1.5+ TB total size, Parquet format
  *   The job is as simple as `spark.read.parquet("input.dat").repartition(N,
"a", "b", "c", "d").write.parquet("output.dat")`, where N is roughly
(input_data_size / 128 MB).

Thanks!
---
Joe Naegele
Grier Forensics
410.220.0968



Re: Spark structured streaming leftOuter join not working as I expect

2019-06-10 Thread Joe Ammann
Hi all

it took me some time to get the issues extracted into a piece of standalone 
code. I created the following gist

https://gist.github.com/jammann/b58bfbe0f4374b89ecea63c1e32c8f17

It has messages for 4 topics A/B/C/D and a simple Python program which shows 6
use cases, with my expectations and observations with Spark 2.4.3.

It would be great if you could have a look and check whether I'm doing something
wrong, or whether this is indeed a limitation of Spark.

On 6/5/19 5:35 PM, Jungtaek Lim wrote:
> Nice to hear you're investigating the issue deeply.
> 
> Btw, if attaching code is not easy, maybe you could share logical/physical 
> plan on any batch: "detail" in SQL tab would show up the plan as string. 
> Plans from sequential batches would be much helpful - and streaming query 
> status in these batch (especially watermark) should be helpful too.
> 


-- 
CU, Joe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark structured streaming leftOuter join not working as I expect

2019-06-05 Thread Joe Ammann
Hi Jungtaek

Thanks for your response!

I actually have set watermarks on all the streams A/B/C with the respective 
event time
column A/B/C_LAST_MOD. So I think this should not be the reason.

Of course, the event time on the C stream (the "optional one") progresses much 
slower
than on the other 2. I try to adjust for this by setting 

   spark.sql.streaming.multipleWatermarkPolicy=max

and judging from the microbatch results, this also works. The global watermark 
seems
to progress as expected with the event time from A/B stream.
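
For reference, a minimal sketch of where this setting goes (the key and its
"min" default come from the Spark SQL configuration; the session name is
illustrative):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("multi-watermark")   # illustrative name
         .config("spark.sql.streaming.multipleWatermarkPolicy", "max")  # default is "min"
         .getOrCreate())

# It can also be changed at runtime, before the streaming query is started:
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")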

I will try to put together an isolated test case to reproduce the issue; that
whole code is embedded in a larger app and hence not easy to rip out.

I did some more testing, and for now these are my observations
 - inner join followed by aggregation works as expected
 - inner join with 1 left outer (and no aggregation) works as expected
 - inner join with 2 left outer only produces results where both outer have a 
match
 - inner join with 1 left outer followed by aggregation only produces the 
messages with a match 

Of course, all are stream-stream joins
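
For comparison, here is a generic, hedged sketch of the documented shape of a
stream-stream left outer join (watermarks on both inputs plus an event-time
range condition); the rate source and the column names merely stand in for the
real B and C topics:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("left-outer-sketch").getOrCreate()

# Illustrative stand-ins for the B and C streams.
bDf = (spark.readStream.format("rate").option("rowsPerSecond", 5).load()
       .select(F.col("value").alias("C_FK"), F.col("timestamp").alias("B_LAST_MOD")))
cDf = (spark.readStream.format("rate").option("rowsPerSecond", 5).load()
       .select(F.col("value").alias("C_ID"), F.col("timestamp").alias("C_LAST_MOD")))

joined = (bDf.withWatermark("B_LAST_MOD", "30 seconds")
          .join(cDf.withWatermark("C_LAST_MOD", "30 seconds"),
                F.expr("""C_FK = C_ID AND
                          C_LAST_MOD >= B_LAST_MOD AND
                          C_LAST_MOD <= B_LAST_MOD + interval 10 seconds"""),
                "leftOuter"))

query = joined.writeStream.format("console").outputMode("append").start()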

CU, Joe
 
On Wednesday, June 5, 2019 09:17 CEST, Jungtaek Lim  wrote: 
> I would suspect that rows are never evicted in state in second join. To
> determine whether the row is NOT matched to other side, Spark should check
> whether the row is ever matched before evicted. You need to set watermark
> either B_LAST_MOD or C_LAST_MOD.
> 
> If you already did but not exposed to here, please paste all codes
> (assuming you've already redacted) to gist or attach zipped file for
> project.
> 
> Btw, there's known "correctness" issue on streaming-streaming left/right
> outer join. Please refer SPARK-26154 [1] for details. That's not a same
> case, but should be good to know once you're dealing with
> streaming-streaming join.
> 
> Thanks,
> Jungtaek Lim (HeartSaVioR)
> 
> 1. https://issues.apache.org/jira/browse/SPARK-26154
> 
> On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann  wrote:
> 
> > Hi all
> >
> > sorry, tl;dr
> >
> > I'm on my first Python Spark structured streaming app, in the end joining
> > messages from ~10 different Kafka topics. I've recently upgraded to Spark
> > 2.4.3, which has resolved all my issues with the time handling (watermarks,
> > join windows) I had before with Spark 2.3.2.
> >
> > My current problem happens during a leftOuter join, where messages from 3
> > topics are joined, the results are then aggregated with a groupBy and
> > finally put onto a result Kafka topic. On the 3 input topics involved, all
> > messages have ID and LAST_MOD fields. I use the ID for joining, and the
> > LAST_MOD as event timestamp on all incoming streams. Since the fields on
> > the incoming messages are all named the same (ID and LAST_MOD), I rename
> > them on all incoming streams with
> >
> >  aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as
> > A_LAST_MOD").drop(*["ID", "LAST_MOD"])
> >
> > For those data frames, I then take the watermark with the A/B/C_LAST_MOD
> > as event time, before joining. I know that the LAST_MOD timestamps are
> > equal on the messages that I want to join together.
> >
> > The first join is an inner join, where a field on stream A links with the
> > ID of stream B. So I have
> >
> >  aDf
> > .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
> > .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> > .agg(
> > collect_list(struct("*")).alias("RESULTS"),
> > count("A_ID").alias("NUM_RESULTS"),
> > # just add a timestamp to watermark on, they are all the
> > min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> > )
> > .withWatermark("RESULT_LAST_MOD", "30 seconds")
> > )
> >
> > This works perfectly and generates (on my current data set) some 10'000
> > records. This is the expected result.
> >
> > When I add the leftOuter join of the third topic as follows
> >
> >  aDf
> > .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
> > # here the additional left join
> > .join(cDF, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"),
> > "leftOuter")  # C_FK is the field in stream B
> > .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> > .agg(
> > collect_list(struct("*")).alias("RESULT

Spark structured streaming leftOuter join not working as I expect

2019-06-04 Thread Joe Ammann
Hi all

sorry, tl;dr

I'm on my first Python Spark structured streaming app, in the end joining 
messages from ~10 different Kafka topics. I've recently upgraded to Spark 
2.4.3, which has resolved all my issues with the time handling (watermarks, 
join windows) I had before with Spark 2.3.2.

My current problem happens during a leftOuter join, where messages from 3 
topics are joined, the results are then aggregated with a groupBy and finally 
put onto a result Kafka topic. On the 3 input topics involved, all messages 
have ID and LAST_MOD fields. I use the ID for joining, and the LAST_MOD as 
event timestamp on all incoming streams. Since the fields on the incoming 
messages are all named the same (ID and LAST_MOD), I rename them on all 
incoming streams with

 aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as 
A_LAST_MOD").drop(*["ID", "LAST_MOD"])

For those data frames, I then take the watermark with the A/B/C_LAST_MOD as 
event time, before joining. I know that the LAST_MOD timestamps are equal on 
the messages that I want to join together.

The first join is an inner join, where a field on stream A links with the ID of 
stream B. So I have

(aDf
    .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
    .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
    .agg(
        collect_list(struct("*")).alias("RESULTS"),
        count("A_ID").alias("NUM_RESULTS"),
        # just add a timestamp to watermark on, they are all the same
        min("A_LAST_MOD").alias("RESULT_LAST_MOD")
    )
    .withWatermark("RESULT_LAST_MOD", "30 seconds")
)

This works perfectly and generates (on my current data set) some 10'000 
records. This is the expected result.

When I add the leftOuter join of the third topic as follows

(aDf
    .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
    # here the additional left join
    .join(cDF, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"),
          "leftOuter")                # C_FK is the field in stream B
    .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
    .agg(
        collect_list(struct("*")).alias("RESULTS"),
        count("A_ID").alias("NUM_RESULTS"),
        # just add a timestamp to watermark on, they are all the same
        min("A_LAST_MOD").alias("RESULT_LAST_MOD")
    )
    .withWatermark("RESULT_LAST_MOD", "30 seconds")
)

then what I would expect is that I get the same number of output records 
(~10'000), and some of them have the additional fields from the C stream.

But what happens is that my output is reduced to ~1'500 records, exactly those
which have a successful join on records on topic C. The others are not shown in
the output.

I already tried

   * making sure that the optional FK on topic B is never null, by using an
NVL2(C_FK, C_FK, '')
   * widening the time window join on the leftOuter to "B_LAST_MOD <
C_LAST_MOD - interval 5 seconds ..."
   * using various combinations of joinWindows and watermarkLateThreshold

The result is always the same: I'm "losing" the ~8'500 records for which the 
optional join FK is NULL on topic B.

Did I totally misunderstand the concept of stream-stream left outer join? Or
what else could be wrong?

-- 
CU, Joe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Watermark handling on initial query start (Structured Streaming)

2019-05-20 Thread Joe Ammann
Hi all

I'm currently developing a Spark structured streaming application which 
joins/aggregates messages from ~7 Kafka topics and produces messages onto 
another Kafka topic.

Quite often in my development cycle, I want to "reprocess from scratch": I stop 
the program, delete the target topic and associated checkpoint information, and 
restart the application with the query.

My assumption would be that the newly started query then processes all messages 
that are on the input topics, sets the watermark according to the freshest 
messages on the topic and produces the output messages which have moved past 
the watermark and can thus be safely produced. As an example, if the freshest
message on the topic has an event time of "2019-05-20 10:13", I restart the
query at "2019-05-20 11:30", and I have a watermark duration of 10 minutes, I
would expect the query to have an eventTime watermark of "2019-05-20 10:03" and
all earlier results to be produced.

But my observations indicate that after initial query startup and reading all 
input topics, the watermark stays at Unix epoch (1970-01-01) and no messages 
are produced. Only once a new message comes in after the start of the query
does the watermark move ahead and all the messages get produced.
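
One way to watch this (a sketch, assuming `query` is the StreamingQuery handle
returned by writeStream...start() for the job described above):

import json
import time

# Wait until at least one micro-batch has reported progress.
while query.lastProgress is None:
    time.sleep(1)

# The "eventTime" section of the progress report contains the current watermark,
# e.g. "1970-01-01T00:00:00.000Z" right after a restart, until new input arrives.
print(json.dumps(query.lastProgress["eventTime"], indent=2))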

Is this the expected behaviour and is my assumption wrong? Or am I doing
something wrong during query setup?

-- 
CU, Joe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Handling of watermark in structured streaming

2019-05-14 Thread Joe Ammann
Hi Suket, Anastasios

Many thanks for your time and your suggestions!

I tried again with various settings for the watermarks and the trigger time

- watermark 20sec, trigger 2sec
- watermark 10sec, trigger 1sec
- watermark 20sec, trigger 0sec

I also tried continuous processing mode, but since I want to do aggregations, 
this did not work at all.

With all the combinations above, my observations (using 'append' output mode) 
are the same: the latest group/aggregation of events is not output/published to 
the target Kafka topic, until another event arrives with a later event 
timestamp that moves the watermark ahead far enough, so that this waiting group 
of events can safely be published. Neither the processing time (wall clock 
time) nor the trigger time play any role in that decision. Only a new event can 
move the watermark ahead, and cause the publishing/output. As long as no new 
events arrive, new mini-batches will be triggered very frequently, but will not 
produce new results.

In the meantime, I read a lot about the semantics of such event time handling 
in various streaming systems. And I think the Spark behaviour that I'm observing
actually makes sense and is fully in line with the documentation. It just does
not match my naive intuition.

Using 'update' mode instead of 'append' solves this and aggregates are 
immediately published (may be amended later). But 'update' mode is not very 
useful for my application, because I need to join these aggregates with other 
streams. Using 'update' would force me to persist those intermediate 
aggregation results. But I'm getting the impression this is what I will have to 
do.
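
To make the trade-off concrete, a small hedged sketch of the two sinks side by
side (`agg_df` stands for the aggregated streaming DataFrame discussed above;
the console sink and query names are just for illustration):

# "append": a window's aggregate is emitted only once, after the watermark has
# passed the end of the window.
appended = (agg_df.writeStream
            .format("console")
            .outputMode("append")
            .queryName("agg_append")
            .start())

# "update": rows changed in this trigger are emitted immediately and may be
# amended in later triggers.
updated = (agg_df.writeStream
           .format("console")
           .outputMode("update")
           .queryName("agg_update")
           .start())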

On 5/14/19 6:49 PM, Suket Arora wrote:
>   df = inputStream.withWatermark("eventtime", "20 
> seconds").groupBy("sharedId", window("20 seconds", "10 seconds")
> 
> // ProcessingTime trigger with two-seconds micro-batch interval
> 
> df.writeStream.format("console").trigger(Trigger.ProcessingTime("2
> seconds")).start()
> 
> 
> On Tue, 14 May 2019 at 20:40, Joe Ammann <j...@pyx.ch> wrote:
> 
> Hi Anastasios
> 
> On 5/14/19 4:15 PM, Anastasios Zouzias wrote:
> > Hi Joe,
> >
> > How often do you trigger your mini-batch? Maybe you can specify the 
> trigger time explicitly to a low value or even better set it off.
> >
> > See: 
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
> 
> I tried different values for the trigger, and settled on 10 seconds. I 
> can see in the logs that this actually works (it outputs a mini-batch summary 
> in the log every 10 seconds).
> 
> There in these log entries I also see that the watermark does not 
> progress, if no new data is coming in. This is how I came to my suspicion on
> how it works internally.
> 
> I understand that it is quite uncommon to have such "slowly moving 
> topics", but unfortunately in my use case I have them.
> 
> > On Tue, May 14, 2019 at 3:49 PM Joe Ammann <j...@pyx.ch> wrote:
> >
> >     Hi all
> >
> >     I'm fairly new to Spark structured streaming and I'm only starting 
> to develop an understanding for the watermark handling.
> >
> >     Our application reads data from a Kafka input topic and as one of 
> the first steps, it has to group incoming messages. Those messages come in 
> bulks, e.g. 5 messages which belong to the same "business event" (share a 
> common key), with event timestamps differing in only a few millisecs. And 
> then no messages for say 3 minutes. And after that another bulk of 3 messages 
> with very close event timestamps.
> >
> >     I have set a watermark of 20 seconds on my streaming query, and a 
> groupBy on the shared common key, and a window of 20 seconds (10 seconds 
> sliding). So something like
> >
> >         df = inputStream.withWatermark("eventtime", "20 
> seconds").groupBy("sharedId", window("20 seconds", "10 seconds")
> >
> >     The output mode is set to append, since I intend to join this 
> streams with other streams later in the application.
> >
> >     Naively, I would have expected to see any incoming bulk of messages 
> as an aggregated message ~20 seconds after it's eventtime on the output 
> stream. But my observations indicate that the "latest bulk of events" always 
> stays queued inside the query, until a new bulk of events arrive and bump up 
> the watermark. In my example above, this means that I see the first bulk of 
> events only after 3 mi

Re: Handling of watermark in structured streaming

2019-05-14 Thread Joe Ammann
Hi Anastasios

On 5/14/19 4:15 PM, Anastasios Zouzias wrote:
> Hi Joe,
> 
> How often do you trigger your mini-batch? Maybe you can specify the trigger 
> time explicitly to a low value or even better set it off.
> 
> See: 
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

I tried different values for the trigger, and settled on 10 seconds. I can see 
in the logs that this actually works (it outputs a mini-batch summary in the 
log every 10 seconds).

In these log entries I also see that the watermark does not progress if
no new data is coming in. This is how I came to my suspicion about how it works
internally.

I understand that it is quite uncommon to have such "slowly moving topics", but 
unfortunately in my use case I have them.

> On Tue, May 14, 2019 at 3:49 PM Joe Ammann <j...@pyx.ch> wrote:
> 
> Hi all
> 
> I'm fairly new to Spark structured streaming and I'm only starting to 
> develop an understanding for the watermark handling.
> 
> Our application reads data from a Kafka input topic and as one of the 
> first steps, it has to group incoming messages. Those messages come in bulks, 
> e.g. 5 messages which belong to the same "business event" (share a common 
> key), with event timestamps differing in only a few millisecs. And then no 
> messages for say 3 minutes. And after that another bulk of 3 messages with 
> very close event timestamps.
> 
> I have set a watermark of 20 seconds on my streaming query, and a groupBy 
> on the shared common key, and a window of 20 seconds (10 seconds sliding). So 
> something like
> 
>     df = inputStream.withWatermark("eventtime", "20 
> seconds").groupBy("sharedId", window("20 seconds", "10 seconds")
> 
> The output mode is set to append, since I intend to join this streams 
> with other streams later in the application.
> 
> Naively, I would have expected to see any incoming bulk of messages as an 
> aggregated message ~20 seconds after it's eventtime on the output stream. But 
> my observations indicate that the "latest bulk of events" always stays queued 
> inside the query, until a new bulk of events arrive and bump up the 
> watermark. In my example above, this means that I see the first bulk of 
> events only after 3 minutes, when the second bulk comes in.
> 
> This does indeed make some sense, and if I understand the documentation 
> correctly the watermark is only ever updated upon arrival of new inputs. The 
> "real time" does not play a role in the setting of watermarks.
> 
> But to me this means that any bulk of events is prohibited from being 
> sent downstreams until a new bulk comes in. This is not what I intended.
> 
> Is my understanding more or less correct? And is there any way of 
> bringing "the real time" into the calculation of the watermark (short of 
> producing regular dummy messages which are then again filtered out).
> -- 
> -- Anastasios Zouzias
> <mailto:a...@zurich.ibm.com>


-- 
CU, Joe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Handling of watermark in structured streaming

2019-05-14 Thread Joe Ammann
Hi Suket

Sorry, this was a typo in the pseudo-code I sent. Of course, what you
suggested (using the same eventtime attribute for the watermark and the window)
is what my code does in reality. Sorry to confuse people.

On 5/14/19 4:14 PM, suket arora wrote:
> Hi Joe,
> As per the spark structured streaming documentation and I quote
> |"withWatermark| must be called on the same column as the timestamp column 
> used in the aggregate. For example, |df.withWatermark("time", "1 
> min").groupBy("time2").count()| is invalid in Append output mode, as 
> watermark is defined on a different column from the aggregation column."
> 
> *And after referring to the following code:*
>          // Group the data by window and word and compute the count of each
> group
> 
> val windowedCounts = words.withWatermark("timestamp", "10 minutes")
>     .groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word").count()
> 
> 
> I would suggest you to try following code
> 
>  df = inputStream.withWatermark("eventtime", "20 
> seconds").groupBy($"sharedId", window($"eventtime","20 seconds", "10 
> seconds"))
> 
> And If this doesn't work, you can try trigger on query.

Can you maybe explain what you mean by "try trigger on query" - I don't 
understand that.

-- 
CU, Joe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Handling of watermark in structured streaming

2019-05-14 Thread Joe Ammann
Hi all

I'm fairly new to Spark structured streaming and I'm only starting to develop
an understanding of the watermark handling.

Our application reads data from a Kafka input topic and as one of the first 
steps, it has to group incoming messages. Those messages come in bulks, e.g. 5 
messages which belong to the same "business event" (share a common key), with 
event timestamps differing in only a few millisecs. And then no messages for 
say 3 minutes. And after that another bulk of 3 messages with very close event 
timestamps.

I have set a watermark of 20 seconds on my streaming query, and a groupBy on 
the shared common key, and a window of 20 seconds (10 seconds sliding). So 
something like

df = inputStream.withWatermark("eventtime", "20 
seconds").groupBy("sharedId", window("20 seconds", "10 seconds")

The output mode is set to append, since I intend to join this streams with 
other streams later in the application.

Naively, I would have expected to see any incoming bulk of messages as an 
aggregated message ~20 seconds after its eventtime on the output stream. But
my observations indicate that the "latest bulk of events" always stays queued 
inside the query, until a new bulk of events arrive and bump up the watermark. 
In my example above, this means that I see the first bulk of events only after 
3 minutes, when the second bulk comes in.

This does indeed make some sense, and if I understand the documentation 
correctly the watermark is only ever updated upon arrival of new inputs. The 
"real time" does not play a role in the setting of watermarks.

But to me this means that any bulk of events is prohibited from being sent 
downstream until a new bulk comes in. This is not what I intended.

Is my understanding more or less correct? And is there any way of bringing "the 
real time" into the calculation of the watermark (short of producing regular 
dummy messages which are then filtered out again)?

-- 
CU, Joe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark structured streaming watermarks on nested attributes

2019-05-07 Thread Joe Ammann
Hi Yuanjian

On 5/7/19 4:55 AM, Yuanjian Li wrote:
> Hi Joe
> 
> I think you met this issue: https://issues.apache.org/jira/browse/SPARK-27340
> You can check the description in Jira and PR. We also met this in our 
> production env and fixed by the providing PR.
> 
> The PR is still in review. cc Langchang Zhu (zhuliangch...@baidu.com), who's
> the author of the fix.

Yes, this very much looks like the issue I'm having. As an exercise for me
(I've never built Spark locally) I will try to build your branch and see if it
fixes my issue.

-- 
CU, Joe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark structured streaming watermarks on nested attributes

2019-05-06 Thread Joe Ammann
On 5/6/19 6:23 PM, Pat Ferrel wrote:
> Streams have no end until watermarked or closed. Joins need bounded datasets, 
> et voila. Something tells me you should consider the streaming nature of your 
> data and whether your joins need to use increments/snippets of infinite 
> streams or to re-join the entire contents of the streams accumulated at 
> checkpoints.

I certainly don't question the need for watermarks. 

What I was wondering is why, when I use fields called
"entityX_LAST_MODIFICATION" for watermarks/conditions, my joins work as
expected.

Whereas when I nest the attributes and use "entityX.LAST_MODIFICATION" (notice
the dot for the nesting), the joins fail.

I have a feeling that the Spark execution plan gets somewhat confused, because
in the latter case there are multiple fields called "LAST_MODIFICATION" with
differing nesting prefixes.

> From: Joe Ammann <j...@pyx.ch>
> Reply: Joe Ammann <j...@pyx.ch>
> Date: May 6, 2019 at 6:45:13 AM
> To: user@spark.apache.org
> Subject: Spark structured streaming watermarks on nested attributes
> 
>> Hi all
>>
>> I'm pretty new to Spark and implementing my first non-trivial structured 
>> streaming job with outer joins. My environment is a Hortonworks HDP 3.1 
>> cluster with Spark 2.3.2, working with Python.
>>
>> I understood that I need to provide watermarks and join conditions for left 
>> outer joins to work. All my incoming Kafka streams have an attribute 
>> "LAST_MODIFICATION" which is well suited to indicate the event time, so I 
>> chose that for watermarking. Since I'm joining from multiple topics where 
>> the incoming messages have common attributes, I though I'd prefix/nest all 
>> incoming messages. Something like
>>
>> entity1DF.select(struct("*").alias("entity1")).withWatermark("entity1.LAST_MODIFICATION")
>> entity2DF.select(struct("*").alias("entity2")).withWatermark("entity2.LAST_MODIFICATION")
>>
>> Now when I try to join such 2 streams, it would fail and tell me that I need 
>> to use watermarks
>>
>> When I leave the watermarking attribute "at the top level", everything works 
>> as expected, e.g.
>>
>> entity1DF.select(struct("*").alias("entity1"), 
>> col("LAST_MODIFICATION").alias("entity1_LAST_MODIFICATION")).withWatermark("entity1_LAST_MODIFICATION")
>>
>> Before I hunt this down any further, is this kind of a known limitation? Or 
>> am I doing something fundamentally wrong?


-- 
CU, Joe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark structured streaming watermarks on nested attributes

2019-05-06 Thread Joe Ammann
Hi all

I'm pretty new to Spark and implementing my first non-trivial structured 
streaming job with outer joins. My environment is a Hortonworks HDP 3.1 cluster 
with Spark 2.3.2, working with Python.

I understood that I need to provide watermarks and join conditions for left 
outer joins to work. All my incoming Kafka streams have an attribute 
"LAST_MODIFICATION" which is well suited to indicate the event time, so I chose 
that for watermarking. Since I'm joining from multiple topics where the 
incoming messages have common attributes, I thought I'd prefix/nest all incoming
messages. Something like


entity1DF.select(struct("*").alias("entity1")).withWatermark("entity1.LAST_MODIFICATION")

entity2DF.select(struct("*").alias("entity2")).withWatermark("entity2.LAST_MODIFICATION")

Now when I try to join two such streams, it fails and tells me that I need to
use watermarks.

When I leave the watermarking attribute "at the top level", everything works as 
expected, e.g.

entity1DF.select(struct("*").alias("entity1"), 
col("LAST_MODIFICATION").alias("entity1_LAST_MODIFICATION")).withWatermark("entity1_LAST_MODIFICATION")

Before I hunt this down any further, is this kind of a known limitation? Or am 
I doing something fundamentally wrong?

-- 
CU, Joe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



question about barrier execution mode in Spark 2.4.0

2018-11-12 Thread Joe

Hello,
I was reading Spark 2.4.0 release docs and I'd like to find out more 
about barrier execution mode.
In particular I'd like to know what happens when the number of partitions
exceeds the number of nodes (which I think is allowed; the Spark tuning doc
mentions that).
Does Spark guarantee that all tasks process all partitions
simultaneously? If not, then how does barrier mode handle partitions that
are waiting to be processed?
If there are partitions waiting to be processed, then I don't think it's
possible to send all the data from a given stage to a DL process, even when
using barrier mode?
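
For reference, a minimal hypothetical sketch of the barrier API itself (Spark
2.4, PySpark); it only shows how a barrier stage is expressed and synchronised,
not how the scheduler behaves when slots are scarce, which is exactly the
question above:

from pyspark import SparkContext
from pyspark.taskcontext import BarrierTaskContext

sc = SparkContext(appName="barrier-demo")

def train_partition(rows):
    ctx = BarrierTaskContext.get()
    ctx.barrier()                   # all tasks of the stage wait here for each other
    infos = ctx.getTaskInfos()      # one entry per task in the barrier stage
    yield (ctx.partitionId(), len(infos), sum(1 for _ in rows))

rdd = sc.parallelize(range(1000), numSlices=4)
print(rdd.barrier().mapPartitions(train_partition).collect())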

Thanks a lot,

Joe


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: writing to local files on a worker

2018-11-11 Thread Joe

Hello,
You could try using the mapPartitions function if you can send partial data
to your C++ program:


mapPartitions(func):
Similar to map, but runs separately on each partition (block) of the
RDD, so func must be of type Iterator<T> => Iterator<U> when running
on an RDD of type T.


That way you can write partition data to temp file, call your C++ app, 
then delete the temp file. Of course your data would be limited to all 
rows in one partition.
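
A hedged sketch of that temp-file approach (the program path
"/usr/local/bin/myapp" and the line-per-record format are assumptions for
illustration):

import os
import subprocess
import tempfile

def run_external(rows):
    # Write this partition's rows to a local temp file.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".in", delete=False) as f:
        in_path = f.name
        for row in rows:
            f.write(str(row) + "\n")
    out_path = in_path + ".out"
    try:
        # Call the local external program on the temp file.
        subprocess.check_call(["/usr/local/bin/myapp", in_path, out_path])
        with open(out_path) as out:
            for line in out:
                yield line.rstrip("\n")
    finally:
        # Clean up the temporary files afterwards.
        for p in (in_path, out_path):
            if os.path.exists(p):
                os.remove(p)

# results = rdd.mapPartitions(run_external).collect()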


Also the latest release of Spark (2.4.0) introduced barrier execution mode:
https://issues.apache.org/jira/browse/SPARK-24374

Maybe you could combine the two; just using mapPartitions will give you
single-partition data only, and your app call will be repeated on all
nodes, not necessarily at the same time.


Spark's strong point is parallel execution, so what you're trying to do 
kind of defeats that.
But if you do not need to combine all the data before calling your app 
then you could do it.

Or you could split your job into a Spark -> app -> Spark chain.
Good luck,

Joe



On 11/11/2018 02:13 PM, Steve Lewis wrote:
I have a problem where a critical step needs to be performed by  a 
third party c++ application. I can send or install this program on the 
worker nodes. I can construct  a function holding all the data this 
program needs to process. The problem is that the program is designed 
to read and write from the local file system. I can call the program 
from Java and read its output as  a  local file - then deleting all 
temporary files but I doubt that it is possible to get the program to 
read from hdfs or any shared file system.
My question is can a function running on a worker node create 
temporary files and pass the names of these to a local process 
assuming everything is cleaned up after the call?


--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How does shuffle operation work in Spark?

2018-11-07 Thread Joe

Hello,
I'm looking for a detailed description of the shuffle operation in
Spark, something that would explain the criteria for assigning
blocks to nodes, how many go where, what happens when there are memory
constraints, etc.
If anyone knows of such a document I'd appreciate a link (or a detailed 
answer).

Thanks a lot,

Joe


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark SQL bucket pruning support

2018-01-22 Thread Joe Wang
Hi,

I'm wondering if the current version of Spark still supports bucket
pruning? I see the pull request <https://github.com/apache/spark/pull/10942>
that incorporated the change, but the logic to actually skip reading
buckets has since been removed as part of other PRs
<https://github.com/apache/spark/pull/12300>, and the logic in the
BucketedReadSuite to verify that pruned buckets are empty is currently
commented
out
<https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L114>
.
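
For context, here is a hedged sketch of the scenario where bucket pruning would
matter (table and column names are illustrative); whether current Spark actually
skips the non-matching buckets here is precisely the question:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("bucket-demo").getOrCreate()

df = spark.range(1000000).withColumn("payload", F.rand())

# Write a table bucketed by "id" into 32 buckets.
(df.write.format("parquet")
   .bucketBy(32, "id")
   .sortBy("id")
   .saveAsTable("bucketed_events"))

# An equality filter on the bucketing column is the case where pruning
# (reading only the one matching bucket) could apply.
spark.table("bucketed_events").where(F.col("id") == 42).explain(True)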

Thanks,
Joe


share datasets across multiple spark-streaming applications for lookup

2017-10-30 Thread roshan joe
Hi,

What is the recommended way to share datasets across multiple
spark-streaming applications, so that the incoming data can be looked up
against this shared dataset?

The shared dataset is also incrementally refreshed and stored on S3. Below
is the scenario.

Streaming App-1 consumes data from Source-1 and writes to DS-1 in S3.
Streaming App-2 consumes data from Source-2 and writes to DS-2 in S3.


Streaming App-3 consumes data from Source-3, *needs to lookup against DS-1
and DS-2* and write to DS-3 in S3.
Streaming App-4 consumes data from Source-4, *needs to lookup against DS-1
and DS-2 *and write to DS-3 in S3.
Streaming App-n consumes data from Source-n, *needs to lookup against DS-1
and DS-2 *and write to DS-n in S3.

So DS-1 and DS-2 ideally should be shared for lookup across multiple
streaming apps. Any input is appreciated. Thank you!
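
For illustration only, a hedged sketch of one app doing the lookup with a
stream-static join (the S3 paths, schema, and the join key "lookup_key" are
assumptions; how the static side picks up the incremental refreshes of DS-1 and
DS-2 is exactly the open question here):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("lookup-app").getOrCreate()

# Shared datasets maintained by the other apps (illustrative paths).
ds1 = spark.read.parquet("s3://shared-bucket/ds-1/")
ds2 = spark.read.parquet("s3://shared-bucket/ds-2/")

# Stand-in for this app's own incoming stream (Source-3).
incoming = (spark.readStream.format("rate").load()
            .withColumn("lookup_key", F.col("value") % 1000))

enriched = (incoming
            .join(ds1, "lookup_key", "left")   # stream-static lookups
            .join(ds2, "lookup_key", "left"))

query = (enriched.writeStream
         .format("parquet")
         .option("path", "s3://shared-bucket/ds-3/")                 # illustrative
         .option("checkpointLocation", "s3://shared-bucket/chk/ds-3/")
         .start())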


Re: spark-submit question

2017-02-28 Thread Joe Olson
> Everything after the jar path is passed to the main class as parameters.

I don't think that is accurate if your application arguments contain double 
dashes. I've tried with several permutations of with and without '\'s and 
newlines.

Just thought I'd ask here before I have to re-configure and re-compile all my 
jars.

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar
  --num-decimals=1000
  --second-argument=Arg2

{
  "action" : "CreateSubmissionResponse",
  "serverSparkVersion" : "2.1.0",
  "submissionId" : "driver-20170228155848-0016",
  "success" : true
}
./test3.sh: line 15: --num-decimals=1000: command not found
./test3.sh: line 16: --second-argument=Arg2: command not found
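
As a sanity check of the "everything after the application path becomes
application arguments" behaviour, here is a hedged, minimal PySpark example
(script name and argument values are illustrative). Note also that in a
multi-line shell invocation every intermediate line still needs a trailing
backslash, which would be consistent with the "command not found" messages
above.

# show_args.py -- submit on a single line or with trailing backslashes, e.g.:
#   ./bin/spark-submit --master local[2] show_args.py --num-decimals=1000 --second-argument=Arg2
import sys

from pyspark import SparkContext

sc = SparkContext(appName="show-args")
print("application arguments: %s" % sys.argv[1:])
# expected: ['--num-decimals=1000', '--second-argument=Arg2']
sc.stop()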




From: Marcelo Vanzin <van...@cloudera.com>
Sent: Tuesday, February 28, 2017 12:17:49 PM
To: Joe Olson
Cc: user@spark.apache.org
Subject: Re: spark-submit question

Everything after the jar path is passed to the main class as
parameters. So if it's not working you're probably doing something
wrong in your code (that you haven't posted).

On Tue, Feb 28, 2017 at 7:05 AM, Joe Olson <jo4...@outlook.com> wrote:
> For spark-submit, I know I can submit application level command line
> parameters to my .jar.
>
>
> However, can I prefix them with switches? My command line params are
> processed in my applications using JCommander. I've tried several variations
> of the below with no success.
>
>
> An example of what I am trying to do is below in the --num-decimals
> argument.
>
>
> ./bin/spark-submit \
>   --class org.apache.spark.examples.SparkPi \
>   --master spark://207.184.161.138:7077 \
>   --deploy-mode cluster \
>   --supervise \
>   --executor-memory 20G \
>   --total-executor-cores 100 \
>   /path/to/examples.jar \
>   --num-decimals=1000 \
>   --second-argument=Arg2
>
>



--
Marcelo


spark-submit question

2017-02-28 Thread Joe Olson
For spark-submit, I know I can submit application level command line parameters 
to my .jar.


However, can I prefix them with switches? My command line params are processed 
in my applications using JCommander. I've tried several variations of the below 
with no success.


An example of what I am trying to do is below in the --num-decimals argument.


./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  --num-decimals=1000 \
  --second-argument=Arg2



Re: Fatal error when using broadcast variables and checkpointing in Spark Streaming

2016-07-22 Thread Joe Panciera
I realized that there's an error in the code. Corrected:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream



sc = SparkContext(appName="FileAutomation")

# Create streaming context from existing spark context
ssc = StreamingContext(sc, 10)
alert_stream = KinesisUtils.createStream(ssc,
                                         "Events",      # App Name
                                         "Event_Test",  # Stream Name
                                         "https://kinesis.us-west-2.amazonaws.com",
                                         "us-west-2",
                                         InitialPositionInStream.LATEST,
                                         1
                                         )

events = sc.broadcast(25)


def test(rdd):

    global events
    num = events.value
    print num

    events.unpersist()
    events = sc.broadcast(num + 1)


alert_stream.foreachRDD(test)

# Comment this line and no error occurs
ssc.checkpoint('dir')
ssc.start()
ssc.awaitTermination()


On Fri, Jul 22, 2016 at 1:50 PM, Joe Panciera <joe.panci...@gmail.com>
wrote:

> Hi,
>
> I'm attempting to use broadcast variables to update stateful values used
> across the cluster for processing. Essentially, I have a function that is
> executed in .foreachRDD that updates the broadcast variable by calling
> unpersist() and then rebroadcasting. This works without issues when I
> execute the code without checkpointing, but as soon as I include
> checkpointing it seems to be unable to pickle the function. I get this
> error:
>
> *It appears that you are attempting to reference SparkContext from a
> broadcast *
>
>   File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line
> 268, in __getnewargs__
> "It appears that you are attempting to reference SparkContext from a
> broadcast "
> Exception: It appears that you are attempting to reference SparkContext
> from a broadcast variable, action, or transformation. SparkContext can only
> be used on the driver, not in code that it run on workers. For more
> information, see SPARK-5063.
>
> at
> org.apache.spark.streaming.api.python.PythonTransformFunctionSerializer$.serialize(PythonDStream.scala:144)
> at
> org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:101)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
> ... 61 more
>
>
> Here's some simple code that shows this occurring:
>
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
>
>
>
> sc = SparkContext(appName="FileAutomation")
>
> # Create streaming context from existing spark context
> ssc = StreamingContext(sc, 10)
> alert_stream = KinesisUtils.createStream(ssc,
>  "Events",  # App Name
>  "Event_Test",  # Stream Name
>  
> "https://kinesis.us-west-2.amazonaws.com;,
>  "us-west-2",
>  InitialPositionInStream.LATEST,
>  1
>  )
>
> events = sc.broadcast(25)
>
>
> def test(rdd):
>
> global events
> num = events.value
> print num
>
> events.unpersist()
> events = sc.broadcast(num + 1)
>
>
> events.foreachRDD(test)
>
> # Comment this line and no error occurs
> ssc.checkpoint('dir')
> ssc.start()
> ssc.awaitTermination()
>
>


Fatal error when using broadcast variables and checkpointing in Spark Streaming

2016-07-22 Thread Joe Panciera
Hi,

I'm attempting to use broadcast variables to update stateful values used
across the cluster for processing. Essentially, I have a function that is
executed in .foreachRDD that updates the broadcast variable by calling
unpersist() and then rebroadcasting. This works without issues when I
execute the code without checkpointing, but as soon as I include
checkpointing it seems to be unable to pickle the function. I get this
error:

*It appears that you are attempting to reference SparkContext from a
broadcast *

  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line
268, in __getnewargs__
"It appears that you are attempting to reference SparkContext from a
broadcast "
Exception: It appears that you are attempting to reference SparkContext
from a broadcast variable, action, or transformation. SparkContext can only
be used on the driver, not in code that it run on workers. For more
information, see SPARK-5063.

at
org.apache.spark.streaming.api.python.PythonTransformFunctionSerializer$.serialize(PythonDStream.scala:144)
at
org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:101)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
... 61 more


Here's some simple code that shows this occurring:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream



sc = SparkContext(appName="FileAutomation")

# Create streaming context from existing spark context
ssc = StreamingContext(sc, 10)
alert_stream = KinesisUtils.createStream(ssc,
                                         "Events",      # App Name
                                         "Event_Test",  # Stream Name
                                         "https://kinesis.us-west-2.amazonaws.com",
                                         "us-west-2",
                                         InitialPositionInStream.LATEST,
                                         1
                                         )

events = sc.broadcast(25)


def test(rdd):

    global events
    num = events.value
    print num

    events.unpersist()
    events = sc.broadcast(num + 1)


events.foreachRDD(test)

# Comment this line and no error occurs
ssc.checkpoint('dir')
ssc.start()
ssc.awaitTermination()
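
A hedged sketch of the lazily-initialised accessor pattern (along the lines of
the broadcast/checkpoint example in the Spark Streaming programming guide),
which avoids pickling the broadcast handle into the checkpoint. It only covers
re-creating the broadcast after a restart, not the per-batch unpersist and
re-broadcast above, and it assumes the same `alert_stream` as in the code,
registered before ssc.start():

def get_events_broadcast(spark_context):
    # Lazily (re)create the broadcast instead of capturing it in checkpointed state.
    if "events_broadcast" not in globals():
        globals()["events_broadcast"] = spark_context.broadcast(25)
    return globals()["events_broadcast"]

def test(rdd):
    events = get_events_broadcast(rdd.context)   # rdd.context is the SparkContext
    print(events.value)

alert_stream.foreachRDD(test)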


Using multiple data sources in one stream

2016-07-20 Thread Joe Panciera
Hi,

I have a rather complicated situation that's raised an issue regarding
consuming multiple data sources for processing. Unlike the use cases I've
found, I have 3 sources of different formats. There's one 'main' stream A
that does the processing, and 2 sources B and C that provide elements
required for processing. All 3 sources come from Kinesis streams and are
updated in real time, so using a broadcast variable or passing a file to
each node won't work. Also, each data source is different so I won't be
able to join the streams.

Source A is an event stream, source B is used for processing events, and
source C is used for filtering events. State needs to be maintained for
source A and source B. So my question is, how can I pull in data from
multiple, disparate sources and feed it into a single stream?

Thanks


Re: Standalone cluster node utilization

2016-07-14 Thread Zhou (Joe) Xing

I have seen similar behavior in my standalone cluster. I tried to increase the
number of partitions, and at some point it seems all the executors or worker
nodes start to make parallel connections to the remote data store. But it would
be nice if someone could point us to some references on how to make proper use
of repartitioning data read from a remote data store by Spark SQL. Thanks a lot!
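
Purely as a hedged illustration (assuming a JDBC-style remote store; the URL,
table, column and bounds are placeholders): read parallelism is set at read
time, while .repartition() afterwards only reshuffles whatever the read
connections already pulled in.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitioned-read").getOrCreate()

df = (spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://db-host:5432/mydb")   # placeholder
      .option("dbtable", "events")
      .option("partitionColumn", "id")      # numeric column to split the read on
      .option("lowerBound", "1")
      .option("upperBound", "10000000")
      .option("numPartitions", "54")        # 54 parallel read connections
      .load())

df = df.repartition(216)   # optional reshuffle after the parallel read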

zhou




> On Jul 14, 2016, at 9:18 AM, Jakub Stransky  wrote:
> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Matrix Factorization Model model.save error "NullPointerException"

2016-07-12 Thread Zhou (Joe) Xing
Does anyone have an idea what this NPE issue below is about? Thank you!

cheers

zhou

On Jul 11, 2016, at 11:27 PM, Zhou (Joe) Xing <joe.x...@nextev.com> wrote:


Hi Guys,

I searched for the archive and also googled this problem when saving the ALS 
trained Matrix Factorization Model to local file system using Model.save() 
method, I found some hints such as partition the model before saving, etc. But 
it does not seem to solve my problem. I’m always getting this NPE error when 
running in a cluster of several nodes, while it’s totally fine when running in 
local node.

I’m using spark 1.6.2, pyspark. Any hint would be appreciated! thank you

cheers

zhou




model = ALS.train(ratingsRDD, rank, numIter, lmbda, 5)



16/07/12 02:14:32 INFO ParquetFileReader: Initiating action with parallelism: 5
16/07/12 02:14:32 WARN ParquetOutputCommitter: could not write summary file for 
file:/home/ec2-user/myCollaborativeFilterNoTesting_2016_07_12_02_13_35.dat/data/product
java.lang.NullPointerException
at 
org.apache.parquet.hadoop.ParquetFileWriter.mergeFooters(ParquetFileWriter.java:456)
at 
org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:420)
at 
org.apache.parquet.hadoop.ParquetOutputCommitter.writeMetaDataFile(ParquetOutputCommitter.java:58)
at 
org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at 
org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:149)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:106)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
at 
org.apache.spark.mllib.recommendation.MatrixFactorizationModel$SaveLoadV1_0$.save(MatrixFactorizationModel.scala:362)
at 
org.apache.spark.mllib.recommendation.MatrixFactorizationModel.save(MatrixFactorizationModel.scala:205)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)



Matrix Factorization Model model.save error "NullPointerException"

2016-07-12 Thread Zhou (Joe) Xing

Hi Guys,

I searched the archive and also googled this problem of saving the ALS-trained
matrix factorization model to the local file system using the model.save()
method. I found some hints, such as partitioning the model before saving, but
they do not seem to solve my problem. I'm always getting this NPE error when
running in a cluster of several nodes, while it's totally fine when running on a
local node.

I'm using Spark 1.6.2 with PySpark. Any hint would be appreciated! Thank you.

cheers

zhou




model = ALS.train(ratingsRDD, rank, numIter, lmbda, 5)



16/07/12 02:14:32 INFO ParquetFileReader: Initiating action with parallelism: 5
16/07/12 02:14:32 WARN ParquetOutputCommitter: could not write summary file for 
file:/home/ec2-user/myCollaborativeFilterNoTesting_2016_07_12_02_13_35.dat/data/product
java.lang.NullPointerException
at 
org.apache.parquet.hadoop.ParquetFileWriter.mergeFooters(ParquetFileWriter.java:456)
at 
org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:420)
at 
org.apache.parquet.hadoop.ParquetOutputCommitter.writeMetaDataFile(ParquetOutputCommitter.java:58)
at 
org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at 
org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:149)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:106)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
at 
org.apache.spark.mllib.recommendation.MatrixFactorizationModel$SaveLoadV1_0$.save(MatrixFactorizationModel.scala:362)
at 
org.apache.spark.mllib.recommendation.MatrixFactorizationModel.save(MatrixFactorizationModel.scala:205)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)



Re: Variable in UpdateStateByKey Not Updating After Restarting Application from Checkpoint

2016-06-09 Thread Joe Panciera
Could it be possible that this is a bug? I hate to throw that word around,
but this is definitely not expected behavior (as far as I can tell). If
anyone has a suggestion for a workaround or a better way to handle
a global value in updateStateByKey, that would be fantastic.

Thanks

On Wed, Jun 8, 2016 at 1:27 PM, Joe Panciera <joe.panci...@gmail.com> wrote:

> I've run into an issue where a global variable used within an
> UpdateStateByKey function isn't being assigned after the application
> restarts from a checkpoint. Using ForEachRDD I have a global variable 'A'
> that is propagated from a file every time a batch runs, and A is then used
> in an UpdateStateByKey. When I initially run the application, it functions
> as expected and the value of A is referenced correctly within the scope of
> the update function.
>
> However, when I bring the application down and restart, I see a different
> behavior. Variable A is assigned the correct value by its corresponding
> ForEachRDD function, but when the UpdateStateByKey function is executed the
> new value for A isn't used. It just... disappears.
>
> I could be going about the implementation of this wrong, but I'm hoping
> that someone can point me in the correct direction.
>
> Here's some pseudocode:
>
> def readfile(rdd):
>     global A
>     A = readFromFile()
>
> def update(new, old):
>     if old in A:
>         # do something
>         ...
>
> dstream.foreachRDD(readfile)
> dstream.updateStateByKey(update)
>
> ssc.checkpoint('checkpoint')
>
> A is correct the first time this is run, but when the application is
> killed and restarted A doesn't seem to be reassigned correctly.
>
>


Variable in UpdateStateByKey Not Updating After Restarting Application from Checkpoint

2016-06-08 Thread Joe Panciera
I've run into an issue where a global variable used within an
UpdateStateByKey function isn't being assigned after the application
restarts from a checkpoint. Using ForEachRDD I have a global variable 'A'
that is propagated from a file every time a batch runs, and A is then used
in an UpdateStateByKey. When I initially run the application, it functions
as expected and the value of A is referenced correctly within the scope of
the update function.

However, when I bring the application down and restart, I see a different
behavior. Variable A is assigned the correct value by its corresponding
ForEachRDD function, but when the UpdateStateByKey function is executed the
new value for A isn't used. It just... disappears.

I could be going about the implementation of this wrong, but I'm hoping
that someone can point me in the correct direction.

Here's some pseudocode:

def readfile(rdd):
    global A
    A = readFromFile()

def update(new, old):
    if old in A:
        # do something
        ...

dstream.foreachRDD(readfile)
dstream.updateStateByKey(update)

ssc.checkpoint('checkpoint')

A is correct the first time this is run, but when the application is killed
and restarted A doesn't seem to be reassigned correctly.


Choosing an Algorithm in Spark MLlib

2016-04-12 Thread Joe San

I'm working on a problem wherein I have some data sets about some power
generating units. Each of these units has been activated to run in the
past, and during activation some units ran into issues. I now have all
this data and I would like to come up with some sort of ranking for these
generating units. The criteria for ranking would be pretty simple to start
with. They are:

   1. Maximum number of times a particular generating unit was activated
   2. How many times the generating unit ran into problems during
   activation

Later on I would expand on this ranking algorithm by adding more criteria.
I will be using the Apache Spark MLlib library and I can already see that
there are quite a few algorithms already in place.

http://spark.apache.org/docs/latest/mllib-guide.html

I'm just not sure which algorithm would fit my purpose. Any suggestions?
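
For the two criteria above, this may be plain aggregation rather than a
learned model, so a simple scoring pass could be enough before reaching for
an MLlib algorithm. A rough Scala sketch, where the input layout, the path
and the weighting are made up for illustration:

case class Activation(unitId: String, hadProblem: Boolean)

val activations = sc.textFile("hdfs:///activations.csv")        // illustrative path
  .map(_.split(","))
  .map(f => Activation(f(0), f(1).toBoolean))

val ranked = activations
  .map(a => (a.unitId, (1L, if (a.hadProblem) 1L else 0L)))     // (activations, problems)
  .reduceByKey { case ((a1, p1), (a2, p2)) => (a1 + a2, p1 + p2) }
  .mapValues { case (acts, probs) => acts - 2.0 * probs }       // illustrative weighting
  .sortBy(_._2, ascending = false)                              // highest score first

ranked.take(10).foreach(println)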


Spark GraphX + TitanDB + Cassandra?

2016-01-26 Thread Joe Bako
I’ve found some references online to various implementations (such as Dendrite) 
leveraging HDFS via TitanDB + HBase for graph processing.  GraphLab also uses 
HDFS/Hadoop.  I am wondering if (and how) one might use TitanDB + Cassandra as 
the data source for Spark GraphX?  The Gremlin language seems more targeted 
towards basic traversals rather than analytics, and I’m unsure the performance 
of attempting to use Gremlin to load sub-graphs up into GraphX for analysis.  
For example, if I have a large property graph and wish to run algorithms to 
find similar sub-graphs within, would TitanDB/Gremlin even be a consideration?  
The underlying data model that Titan uses in Cassandra does not seem accessible 
for direct querying via CQL/Thrift.

Any guidance around this nebulous subject is much appreciated!
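
For context, a minimal Scala sketch of what GraphX ultimately needs regardless
of the storage layer: an RDD of vertices keyed by Long ids and an RDD of edges.
The question then reduces to producing those two RDDs from Titan's Cassandra
data model; the toy data below is purely illustrative and not a Titan API.

import org.apache.spark.graphx.{Edge, Graph}

val vertices = sc.parallelize(Seq(
  (1L, "recording:A"),
  (2L, "recording:B"),
  (3L, "artist:X")))

val edges = sc.parallelize(Seq(
  Edge(1L, 3L, "performedBy"),
  Edge(2L, 3L, "performedBy")))

val graph = Graph(vertices, edges)

// Once the graph exists, the usual GraphX algorithms apply, e.g.:
graph.connectedComponents().vertices.collect().foreach(println)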

Joe Bako
Software Architect
Gracenote, Inc.
Mobile: 925.818.2230
http://www.gracenote.com/




Re: Spark dramatically slow when I add saveAsTextFile

2015-05-24 Thread Joe Wass
This may sound like an obvious question, but are you sure that the program
is doing any work when you don't have a saveAsTextFile? If there are
transformations but no actions to actually collect the data, there's no
need for Spark to execute the transformations.
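
To illustrate the point about laziness with a small hedged sketch (paths and
parsing are made up): nothing runs until an action such as count or
saveAsTextFile is called.

val parsed = sc.textFile("hdfs:///input")    // transformation: nothing runs yet
  .map(_.split(","))                         // transformation: still nothing
  .filter(_.length > 1)                      // transformation: still nothing

parsed.count()                               // action: the whole pipeline executes
parsed.saveAsTextFile("hdfs:///output")      // action: executes it again unless persisted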

As to the question of 'is this taking too long', I can't answer that. But
your code was HTML escaped and therefore difficult to read, perhaps you
should post a link to a Gist.

Joe

On 24 May 2015 at 10:36, allanjie allanmcgr...@gmail.com wrote:

 *Problem Description*:

 The program running in  stand-alone spark cluster (1 master, 6 workers with
 8g ram and 2 cores).
 Input: a 468MB file with 133433 records stored in HDFS.
 Output: just 2MB file will stored in HDFS
 The program has two map operations and one reduceByKey operation.
 Finally I save the result to HDFS using *saveAsTextFile*.
 *Problem*: if I don't add saveAsTextFile, the program runs very fast(a
 few
 seconds), otherwise extremely slow until about 30 mins.

 *My program (is very Simple)*
 public static void main(String[] args) throws IOException {
     /** Parameter Setting ***/
     String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
     String remoteFilePath = "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
     String outputPath = "hdfs://HadoopV26Master:9000/user/sparkoutput/";
     final int row = 133433;
     final int col = 458;
     final double dc = Double.valueOf(args[0]);

     SparkConf conf = new SparkConf()
         .setAppName("distance")
         .set("spark.executor.memory", "4g")
         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .set("spark.eventLog.enabled", "true");
     JavaSparkContext sc = new JavaSparkContext(conf);

     JavaRDD<String> textFile = sc.textFile(remoteFilePath);

     // Broadcast variable, the dimension of this double array: 133433*458
     final Broadcast<double[][]> broadcastPoints =
         sc.broadcast(createBroadcastPoints(localPointPath, row, col));
     /**
      * Compute the distance in terms of each point on each instance.
      * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
      */
     JavaPairRDD<Integer, Double> distance =
         textFile.flatMapToPair(new PairFlatMapFunction<String, Integer, Double>() {
             public Iterable<Tuple2<Integer, Double>> call(String v1) throws Exception {
                 List<String> al = Arrays.asList(v1.split(","));
                 double[] featureVals = new double[al.size()];
                 for (int j = 0; j < al.size() - 1; j++)
                     featureVals[j] = Double.valueOf(al.get(j + 1));
                 int jIndex = Integer.valueOf(al.get(0));
                 double[][] allPoints = broadcastPoints.value();
                 double sum = 0;
                 List<Tuple2<Integer, Double>> list = new ArrayList<Tuple2<Integer, Double>>();
                 for (int i = 0; i < row; i++) {
                     sum = 0;
                     for (int j = 0; j < al.size() - 1; j++) {
                         sum += (allPoints[i][j] - featureVals[j]) * (allPoints[i][j] - featureVals[j]);
                     }
                     list.add(new Tuple2<Integer, Double>(jIndex, Math.sqrt(sum)));
                 }
                 return list;
             }
         });

     // Create zeroOne density
     JavaPairRDD<Integer, Integer> densityZeroOne = distance.mapValues(new Function<Double, Integer>() {
         public Integer call(Double v1) throws Exception {
             if (v1 < dc)
                 return 1;
             else return 0;
         }
     });
     // Combine the density
     JavaPairRDD<Integer, Integer> counts = densityZeroOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
         public Integer call(Integer v1, Integer v2) throws Exception {
             return v1 + v2;
         }
     });
     counts.saveAsTextFile(outputPath + args[1]);
     sc.stop();
 }

 *If I comment saveAsTextFile, log will be:*
 Picked up _JAVA_OPTIONS: -Xmx4g
 15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1
 15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load

Re: Is anyone using Amazon EC2?

2015-05-23 Thread Joe Wass
Sorry guys, my email submitted before I finished writing it. Check my other
message (with the same subject)!

On 23 May 2015 at 20:25, Shafaq s.abdullah...@gmail.com wrote:

 Yes-Spark EC2 cluster . Looking into migrating to spark emr.
 Adding more ec2 is not possible afaik.
 On May 23, 2015 11:22 AM, Johan Beisser j...@caustic.org wrote:

 Yes.

 We're looking at bootstrapping in EMR...
 On Sat, May 23, 2015 at 07:21 Joe Wass jw...@crossref.org wrote:

 I used Spark on EC2 a while ago




Is anyone using Amazon EC2?

2015-05-23 Thread Joe Wass
I used Spark on EC2 a while ago


Running out of space (when there's no shortage)

2015-02-24 Thread Joe Wass
I'm running a cluster of 3 Amazon EC2 machines (small number because it's
expensive when experiments keep crashing after a day!).

Today's crash looks like this (stacktrace at end of message).
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0

On my three nodes, I have plenty of space and inodes:

A $ df -i
FilesystemInodes   IUsed   IFree IUse% Mounted on
/dev/xvda1524288   97937  426351   19% /
tmpfs1909200   1 19091991% /dev/shm
/dev/xvdb2457600  54 24575461% /mnt
/dev/xvdc2457600  24 24575761% /mnt2
/dev/xvds831869296   23844 8318454521% /vol0

A $ df -h
FilesystemSize  Used Avail Use% Mounted on
/dev/xvda17.9G  3.4G  4.5G  44% /
tmpfs 7.3G 0  7.3G   0% /dev/shm
/dev/xvdb  37G  1.2G   34G   4% /mnt
/dev/xvdc  37G  177M   35G   1% /mnt2
/dev/xvds1000G  802G  199G  81% /vol0

B $ df -i
FilesystemInodes   IUsed   IFree IUse% Mounted on
/dev/xvda1524288   97947  426341   19% /
tmpfs1906639   1 19066381% /dev/shm
/dev/xvdb2457600  54 24575461% /mnt
/dev/xvdc2457600  24 24575761% /mnt2
/dev/xvds816200704   24223 8161764811% /vol0

B $ df -h
FilesystemSize  Used Avail Use% Mounted on
/dev/xvda17.9G  3.6G  4.3G  46% /
tmpfs 7.3G 0  7.3G   0% /dev/shm
/dev/xvdb  37G  1.2G   34G   4% /mnt
/dev/xvdc  37G  177M   35G   1% /mnt2
/dev/xvds1000G  805G  195G  81% /vol0

C $df -i
FilesystemInodes   IUsed   IFree IUse% Mounted on
/dev/xvda1524288   97938  426350   19% /
tmpfs1906897   1 19068961% /dev/shm
/dev/xvdb2457600  54 24575461% /mnt
/dev/xvdc2457600  24 24575761% /mnt2
/dev/xvds755218352   24024 7551943281% /vol0

C $ df -h
FilesystemSize  Used Avail Use% Mounted on
/dev/xvda17.9G  3.4G  4.5G  44% /
tmpfs 7.3G 0  7.3G   0% /dev/shm
/dev/xvdb  37G  1.2G   34G   4% /mnt
/dev/xvdc  37G  177M   35G   1% /mnt2
/dev/xvds1000G  820G  181G  82% /vol0

The devices may be ~80% full but that still leaves ~200G free on each. My
spark-env.sh has

export SPARK_LOCAL_DIRS=/vol0/spark

I have manually verified that on each slave the only temporary files are
stored on /vol0, all looking something like this

/vol0/spark/spark-f05d407c/spark-fca3e573/spark-78c06215/spark-4f0c4236/20/rdd_8_884

So it looks like all the files are being stored on the large drives
(incidentally they're AWS EBS volumes, but that's the only way to get
enough storage). My process crashed before with a slightly different
exception under the same circumstances: kryo.KryoException:
java.io.IOException: No space left on device

These both happen after several hours and several GB of temporary files.

Why does Spark think it's run out of space?

TIA

Joe

Stack trace 1:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
at
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
at
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:109

Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
Thanks for your reply Sean.

Looks like it's happening in a map:

15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks
from Stage 1 (MappedRDD[17] at mapToPair at
NativeMethodAccessorImpl.java:-2)

That's my initial 'parse' stage, done before repartitioning. It reduces the
data size significantly so I thought it would be sensible to do before
repartitioning, which involves moving lots of data around. That might be a
stupid idea in hindsight!

So the obvious thing to try would be to try repartitioning before the map
as the first transformation. I would have done that if I could be sure that
it would succeed or fail quickly.

I'm not entirely clear about the lazy execution of transformations in DAG.
It could be that the error is manifesting during the mapToPair, but caused
by the earlier read from text file stage.

Thanks for pointers to those compression formats. I'll give them a go
(although it's not trivial to re-encode 200 GB of data on S3, so if I can
get this working reasonably with gzip I'd like to).

Any advice about whether this error can be worked round with an early
partition?

Cheers

Joe


On 19 February 2015 at 09:51, Sean Owen so...@cloudera.com wrote:

 gzip and zip are not splittable compression formats; bzip and lzo are.
 Ideally, use a splittable compression format.

 Repartitioning is not a great solution since it means a shuffle, typically.

 This is not necessarily related to how big your partitions are. The
 question is, when does this happen? what operation?

 On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote:
  On the advice of some recent discussions on this list, I thought I would
 try
  and consume gz files directly. I'm reading them, doing a preliminary map,
  then repartitioning, then doing normal spark things.
 
  As I understand it, zip files aren't readable in partitions because of
 the
  format, so I thought that repartitioning would be the next best thing for
  parallelism. I have about 200 files, some about 1GB compressed and some
 over
  2GB uncompressed.
 
  I'm hitting the 2GB maximum partition size. It's been discussed on this
 list
  (topic: 2GB limit for partitions?, tickets SPARK-1476 and SPARK-1391).
  Stack trace at the end. This happened at 10 hours in (probably when it
 saw
  its first file). I can't just re-run it quickly!
 
  Does anyone have any advice? Might I solve this by re-partitioning as the
  first step after reading the file(s)? Or is it effectively impossible to
  read a gz file that expands to over 2GB? Does anyone have any experience
  with this?
 
  Thanks in advance
 
  Joe
 
  Stack trace:
 
  Exception in thread main 15/02/18 20:44:25 INFO
 scheduler.TaskSetManager:
  Lost task 5.3 in stage 1.0 (TID 283) on executor:
  java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
  [duplicate 6]
  org.apache.spark.SparkException: Job aborted due to stage failure: Task
 2 in
  stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage
 1.0:
  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
  at
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
  at
 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
  at
  org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
  at
  org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
  at
 org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
  at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)



Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
On the advice of some recent discussions on this list, I thought I would
try and consume gz files directly. I'm reading them, doing a preliminary
map, then repartitioning, then doing normal spark things.

As I understand it, zip files aren't readable in partitions because of the
format, so I thought that repartitioning would be the next best thing for
parallelism. I have about 200 files, some about 1GB compressed and some
over 2GB uncompressed.

I'm hitting the 2GB maximum partition size. It's been discussed on this
list (topic: 2GB limit for partitions?, tickets SPARK-1476 and
SPARK-1391).  Stack trace at the end. This happened at 10 hours in
(probably when it saw its first file). I can't just re-run it quickly!

Does anyone have any advice? Might I solve this by re-partitioning as the
first step after reading the file(s)? Or is it effectively impossible to
read a gz file that expands to over 2GB? Does anyone have any experience
with this?

Thanks in advance

Joe

Stack trace:

Exception in thread main 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
Lost task 5.3 in stage 1.0 (TID 283) on executor:
java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
[duplicate 6]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage
1.0: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
at
org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)


Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
Thanks for your detailed reply Imran. I'm writing this in Clojure (using
Flambo which uses the Java API) but I don't think that's relevant. So
here's the pseudocode (sorry I've not written Scala for a long time):

val rawData = sc.hadoopFile("/dir/to/gzfiles")  // NB multiple files.
val parsedFiles = rawData.map(parseFunction)   // can return nil on failure
val filtered = parsedFiles.filter(notNil)
val partitioned = filtered.repartition(100) // guessed number
val persisted = partitioned.persist(StorageLevels.DISK_ONLY)

val resultA = stuffA(persisted)
val resultB = stuffB(persisted)
val resultC = stuffC(persisted)

So, I think I'm already doing what you suggested. I would have assumed that
partition size would be («size of expanded file» / «number of partitions»).
In this case, 100 (which I picked out of the air).

I wonder whether the «size of expanded file» is actually the size of all
concatenated input files (probably about 800 GB)? In that case should I
multiply it by the number of files? Or perhaps I'm barking up completely
the wrong tree.

Joe




On 19 February 2015 at 14:44, Imran Rashid iras...@cloudera.com wrote:

 Hi Joe,

 The issue is not that you have input partitions that are bigger than 2GB
 -- its just that they are getting cached.  You can see in the stack trace,
 the problem is when you try to read data out of the DiskStore:

 org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)

 Also, just because you see this:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 it doesn't *necessarily* mean that this is coming from your map.  It can
 be pretty confusing how your operations on RDDs get turned into stages, it
 could be a lot more than just your map.  and actually, it might not even be
 your map at all -- some of the other operations you invoke call map
 underneath the covers.  So its hard to say what is going on here w/ out
 seeing more code.  Anyway, maybe you've already considered all this (you
 did mention the lazy execution of the DAG), but I wanted to make sure.  it
 might help to use rdd.setName() and also to look at rdd.toDebugString.

 As far as what you can do about this -- it could be as simple as moving
 your rdd.persist() to after you have compressed and repartitioned your
 data.  eg., I'm blindly guessing you have something like this:

 val rawData = sc.hadoopFile(...)
 rawData.persist(DISK)
 rawData.count()
 val compressedData = rawData.map{...}
 val repartitionedData = compressedData.repartition(N)
 ...

 change it to something like:

 val rawData = sc.hadoopFile(...)
 val compressedData = rawData.map{...}
 val repartitionedData = compressedData.repartition(N)
 repartitionedData.persist(DISK)
 repartitionedData.count()
 ...


 The point is, you avoid caching any data until you have ensured that the
 partitions are small.  You might have big partitions before that in
 rawData, but that is OK.

 Imran


 On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass jw...@crossref.org wrote:

 Thanks for your reply Sean.

 Looks like it's happening in a map:

 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
 tasks from Stage 1 (MappedRDD[17] at mapToPair at
 NativeMethodAccessorImpl.java:-2)

 That's my initial 'parse' stage, done before repartitioning. It reduces
 the data size significantly so I thought it would be sensible to do before
 repartitioning, which involves moving lots of data around. That might be a
 stupid idea in hindsight!

 So the obvious thing to try would be to try repartitioning before the map
 as the first transformation. I would have done that if I could be sure that
 it would succeed or fail quickly.

 I'm not entirely clear about the lazy execution of transformations in
 DAG. It could be that the error is manifesting during the mapToPair, but
 caused by the earlier read from text file stage.

 Thanks for pointers to those compression formats. I'll give them a go
 (although it's not trivial to re-encode 200 GB of data on S3, so if I can
 get this working reasonably with gzip I'd like to).

 Any advice about whether this error can be worked round with an early
 partition?

 Cheers

 Joe


 On 19 February 2015 at 09:51, Sean Owen so...@cloudera.com wrote:

 gzip and zip are not splittable compression formats; bzip and lzo are.
 Ideally, use a splittable compression format.

 Repartitioning is not a great solution since it means a shuffle,
 typically.

 This is not necessarily related to how big your partitions are. The
 question is, when does this happen? what operation?

 On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass jw...@crossref.org wrote:
  On the advice of some recent discussions on this list, I thought I
 would try
  and consume gz files directly. I'm reading them, doing a preliminary
 map,
  then repartitioning, then doing normal spark things.
 
  As I understand it, zip files aren't readable in partitions because of
 the
  format

Has Spark 1.2.0 changed EC2 persistent-hdfs?

2015-02-13 Thread Joe Wass
I've updated to Spark 1.2.0 on EC2 and the persistent-hdfs behaviour
appears to have changed.

My launch script is

spark-1.2.0-bin-hadoop2.4/ec2/spark-ec2 --instance-type=m3.xlarge -s 5
--ebs-vol-size=1000 launch myproject

When I ssh into master I get:

$ df -h
FilesystemSize  Used Avail Use% Mounted on
/dev/xvda17.9G  2.9G  5.0G  37% /
tmpfs 7.3G 0  7.3G   0% /dev/shm
/dev/xvdb  37G  1.3G   34G   4% /mnt
/dev/xvdc  37G  177M   35G   1% /mnt2
/dev/xvds1000G   33M 1000G   1% /vol0

that /vol0 is the place I want (and assume) persistent-hdfs to go. But when
I look at the size I get:

$ persistent-hdfs/bin/start-all.sh
$ persistent-hdfs/bin/hadoop dfsadmin -report
Warning: $HADOOP_HOME is deprecated.

Configured Capacity: 42275430400 (39.37 GB)
Present Capacity: 2644878 (24.63 GB)
DFS Remaining: 26448601088 (24.63 GB)
DFS Used: 143360 (140 KB)
DFS Used%: 0%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0

-
Datanodes available: 5 (5 total, 0 dead)

Name: 10.46.11.156:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165372416 (2.95 GB)
DFS Remaining: 5289684992(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


Name: 10.41.51.155:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165364224 (2.95 GB)
DFS Remaining: 5289693184(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


Name: 10.38.30.254:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165249536 (2.95 GB)
DFS Remaining: 5289807872(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


Name: 10.204.134.84:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165343744 (2.95 GB)
DFS Remaining: 5289713664(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


Name: 10.33.15.134:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165356032 (2.95 GB)
DFS Remaining: 5289701376(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


That's tiny. My suspicions are aroused when I see:

$ ls /vol
persistent-hdfs

/vol is on the small /dev/xvda1 not the large EBS /dev/xvds

I thought I'd be able to edit persistent-hdfs/conf/core-site.xml to change
the volume:

<property>
  <name>hadoop.tmp.dir</name>
  <value>/vol0/persistent-hdfs</value>  <!-- was /vol/persistent-hdfs -->
</property>

And then

persistent-hdfs/bin/stop-all.sh && persistent-hdfs/bin/start-all.sh

but when I do that, the persistent HDFS won't start for whatever reason. I
can't run

$ persistent-hdfs/bin/hadoop dfsadmin -report

15/02/13 18:50:25 INFO ipc.Client: Retrying connect to server:
ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already
tried 0 time(s).
15/02/13 18:50:26 INFO ipc.Client: Retrying connect to server:
ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already
tried 1 time(s).
15/02/13 18:50:27 INFO ipc.Client: Retrying connect to server:
ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already
tried 2 time(s).

So, it looks like I can't use the EBS for persistent-hdfs. I was doing it
before, so something must have changed in the last couple of weeks (last
time I was using 1.1.0).

Is this a bug? Has the behaviour of AWS changed? Am I doing something
stupid? How do I fix it?

Thanks in advance!

Joe


Re: Has Spark 1.2.0 changed EC2 persistent-hdfs?

2015-02-13 Thread Joe Wass
Looks like this is caused by issue SPARK-5008:
https://issues.apache.org/jira/browse/SPARK-5008

On 13 February 2015 at 19:04, Joe Wass jw...@crossref.org wrote:

 I've updated to Spark 1.2.0 on EC2 and the persistent-hdfs behaviour
 appears to have changed.

 My launch script is

 spark-1.2.0-bin-hadoop2.4/ec2/spark-ec2 --instance-type=m3.xlarge -s 5
 --ebs-vol-size=1000 launch myproject

 When I ssh into master I get:

 $ df -h
 FilesystemSize  Used Avail Use% Mounted on
 /dev/xvda17.9G  2.9G  5.0G  37% /
 tmpfs 7.3G 0  7.3G   0% /dev/shm
 /dev/xvdb  37G  1.3G   34G   4% /mnt
 /dev/xvdc  37G  177M   35G   1% /mnt2
 /dev/xvds1000G   33M 1000G   1% /vol0

 that /vol0 is the place I want (and assume) persistent-hdfs to go. But
 when I look at the size I get:

 $ persistent-hdfs/bin/start-all.sh
 $ persistent-hdfs/bin/hadoop dfsadmin -report
 Warning: $HADOOP_HOME is deprecated.

 Configured Capacity: 42275430400 (39.37 GB)
 Present Capacity: 2644878 (24.63 GB)
 DFS Remaining: 26448601088 (24.63 GB)
 DFS Used: 143360 (140 KB)
 DFS Used%: 0%
 Under replicated blocks: 0
 Blocks with corrupt replicas: 0
 Missing blocks: 0

 -
 Datanodes available: 5 (5 total, 0 dead)

 Name: 10.46.11.156:60010
 Decommission Status : Normal
 Configured Capacity: 8455086080 (7.87 GB)
 DFS Used: 28672 (28 KB)
 Non DFS Used: 3165372416 (2.95 GB)
 DFS Remaining: 5289684992(4.93 GB)
 DFS Used%: 0%
 DFS Remaining%: 62.56%
 Last contact: Fri Feb 13 17:41:46 UTC 2015


 Name: 10.41.51.155:60010
 Decommission Status : Normal
 Configured Capacity: 8455086080 (7.87 GB)
 DFS Used: 28672 (28 KB)
 Non DFS Used: 3165364224 (2.95 GB)
 DFS Remaining: 5289693184(4.93 GB)
 DFS Used%: 0%
 DFS Remaining%: 62.56%
 Last contact: Fri Feb 13 17:41:46 UTC 2015


 Name: 10.38.30.254:60010
 Decommission Status : Normal
 Configured Capacity: 8455086080 (7.87 GB)
 DFS Used: 28672 (28 KB)
 Non DFS Used: 3165249536 (2.95 GB)
 DFS Remaining: 5289807872(4.93 GB)
 DFS Used%: 0%
 DFS Remaining%: 62.56%
 Last contact: Fri Feb 13 17:41:46 UTC 2015


 Name: 10.204.134.84:60010
 Decommission Status : Normal
 Configured Capacity: 8455086080 (7.87 GB)
 DFS Used: 28672 (28 KB)
 Non DFS Used: 3165343744 (2.95 GB)
 DFS Remaining: 5289713664(4.93 GB)
 DFS Used%: 0%
 DFS Remaining%: 62.56%
 Last contact: Fri Feb 13 17:41:46 UTC 2015


 Name: 10.33.15.134:60010
 Decommission Status : Normal
 Configured Capacity: 8455086080 (7.87 GB)
 DFS Used: 28672 (28 KB)
 Non DFS Used: 3165356032 (2.95 GB)
 DFS Remaining: 5289701376(4.93 GB)
 DFS Used%: 0%
 DFS Remaining%: 62.56%
 Last contact: Fri Feb 13 17:41:46 UTC 2015


 That's tiny. My suspicions are aroused when I see:

 $ ls /vol
 persistent-hdfs

 /vol is on the small /dev/xvda1 not the large EBS /dev/xvds

 I thought I'd be able to edit persistent-hdfs/conf/core-site.xml to change
 the volume:

  <property>
    <name>hadoop.tmp.dir</name>
    <value>/vol0/persistent-hdfs</value>  <!-- was /vol/persistent-hdfs -->
  </property>

 And then

 persistent-hdfs/bin/stop-all.sh && persistent-hdfs/bin/start-all.sh

 but when I do that, the persistent HDFS won't start for whatever reason. I
 can't run

 $ persistent-hdfs/bin/hadoop dfsadmin -report

 15/02/13 18:50:25 INFO ipc.Client: Retrying connect to server:
 ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010.
 Already tried 0 time(s).
 15/02/13 18:50:26 INFO ipc.Client: Retrying connect to server:
 ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010.
 Already tried 1 time(s).
 15/02/13 18:50:27 INFO ipc.Client: Retrying connect to server:
 ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010.
 Already tried 2 time(s).

 So, it looks like I can't use the EBS for persistent-hdfs. I was doing it
 before, so something must have changed in the last couple of weeks (last
 time I was using 1.1.0).

 Is this a bug? Has the behaviour of AWS changed? Am I doing something
 stupid? How do I fix it?

 Thanks in advance!

 Joe





How do I set spark.local.dirs?

2015-02-06 Thread Joe Wass
I'm running on EC2 and I want to set the directory to use on the slaves
(mounted EBS volumes).

I have set:
spark.local.dir /vol3/my-spark-dir
in
   /root/spark/conf/spark-defaults.conf

and replicated to all nodes. I have verified that in the console the value
in the config corresponds. I have checked that these values are present in
nodes.

But it's still creating temp files in the wrong (default) place:

/mnt2/spark

How do I get my slaves to pick up this value? How can I verify that they
have?
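
One thing worth checking (hedged, since I can't see your cluster): on clusters
launched with the EC2 scripts the workers' spark-env.sh usually exports
SPARK_LOCAL_DIRS (pointing at the ephemeral disks, e.g. /mnt/spark,/mnt2/spark),
and that environment variable takes precedence over spark.local.dir from
spark-defaults.conf or SparkConf, which would explain temp files still landing
under /mnt2/spark. For reference, the SparkConf side looks like this:

val conf = new org.apache.spark.SparkConf()
  .setAppName("local-dir-example")                 // illustrative app name
  .set("spark.local.dir", "/vol3/my-spark-dir")    // ignored wherever SPARK_LOCAL_DIRS is exported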

Thanks!

Joe


Re: How many stages in my application?

2015-02-05 Thread Joe Wass
Thanks Akhil and Mark. I can of course count events (assuming I can deduce
the shuffle boundaries), but like I said the program isn't simple and I'd
have to do this manually every time I change the code. So I'd rather find a
way of doing this automatically, if possible.
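
For what it's worth, one semi-automatic route (a hedged sketch; the pipeline
below is illustrative): toDebugString prints the RDD lineage with its shuffle
dependencies, so counting the shuffle boundaries on a tiny sample gives a
rough stage count without running the full job.

val counts = sc.textFile("hdfs:///sample")   // illustrative input
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)                        // shuffle => new stage

// Each shuffle dependency (ShuffledRDD) in the printout marks a stage boundary.
println(counts.toDebugString)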

On 4 February 2015 at 19:41, Mark Hamstra m...@clearstorydata.com wrote:

 But there isn't a 1-1 mapping from operations to stages since multiple
 operations will be pipelined into a single stage if no shuffle is
 required.  To determine the number of stages in a job you really need to be
 looking for shuffle boundaries.

 On Wed, Feb 4, 2015 at 11:27 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 You can easily understand the flow by looking at the number of operations
 in your program (like map, groupBy, join etc.), first of all you list out
 the number of operations happening in your application and then from the
 webui you will be able to see how many operations have happened so far.

 Thanks
 Best Regards

 On Wed, Feb 4, 2015 at 4:33 PM, Joe Wass jw...@crossref.org wrote:

 I'm sitting here looking at my application crunching gigabytes of data
 on a cluster and I have no idea if it's an hour away from completion or a
 minute. The web UI shows progress through each stage, but not how many
 stages remaining. How can I work out how many stages my program will take
 automatically?

 My application has a slightly interesting DAG (re-use of functions that
 contain Spark transformations, persistent RDDs). Not that complex, but not
 'step 1, step 2, step 3'.

 I'm guessing that if the driver program runs sequentially sending
 messages to Spark, then Spark has no knowledge of the structure of the
 driver program. Therefore it's necessary to execute it on a small test
 dataset and see how many stages result?

 When I set spark.eventLog.enabled = true and run on (very small) test
 data I don't get any stage messages in my STDOUT or in the log file. This
 is on a `local` instance.

 Did I miss something obvious?

 Thanks!

 Joe






How many stages in my application?

2015-02-04 Thread Joe Wass
I'm sitting here looking at my application crunching gigabytes of data on a
cluster and I have no idea if it's an hour away from completion or a
minute. The web UI shows progress through each stage, but not how many
stages remaining. How can I work out how many stages my program will take
automatically?

My application has a slightly interesting DAG (re-use of functions that
contain Spark transformations, persistent RDDs). Not that complex, but not
'step 1, step 2, step 3'.

I'm guessing that if the driver program runs sequentially sending messages
to Spark, then Spark has no knowledge of the structure of the driver
program. Therefore it's necessary to execute it on a small test dataset and
see how many stages result?

When I set spark.eventLog.enabled = true and run on (very small) test data
I don't get any stage messages in my STDOUT or in the log file. This is on
a `local` instance.

Did I miss something obvious?

Thanks!

Joe


ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread Joe Wass
I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need
to store the input in HDFS somehow.

I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk.
Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

If I want to process 800 GB of data (assuming I can't split the jobs up),
I'm guessing I need to get persistent-hdfs involved.

1 - Does persistent-hdfs have noticeably different performance than
ephemeral-hdfs?
2 - If so, is there a recommended configuration (like storing input and
output on persistent, but persisted RDDs on ephemeral?)

This seems like a common use-case, so sorry if this has already been
covered.

Joe


Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread Joe Wass
The data is coming from S3 in the first place, and the results will be
uploaded back there. But even in the same availability zone, fetching 170
GB (that's gzipped) is slow. From what I understand of the pipelines,
multiple transforms on the same RDD might involve re-reading the input,
which very quickly adds up in comparison to having the data locally. Unless
I persisted the data (which I am in fact doing), but that would involve
storing approximately the same amount of data in HDFS, which wouldn't fit.

Also, I understood that S3 was unsuitable for practical use? See "Why you
cannot use S3 as a replacement for HDFS" [0]. I'd love to be proved wrong,
though; that would make things a lot easier.

[0] http://wiki.apache.org/hadoop/AmazonS3



On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net wrote:

 You could also just push the data to Amazon S3, which would un-link the
 size of the cluster needed to process the data from the size of the data.

 DR


 On 02/03/2015 11:43 AM, Joe Wass wrote:

 I want to process about 800 GB of data on an Amazon EC2 cluster. So, I
 need
 to store the input in HDFS somehow.

 I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk.
 Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

 If I want to process 800 GB of data (assuming I can't split the jobs up),
 I'm guessing I need to get persistent-hdfs involved.

 1 - Does persistent-hdfs have noticeably different performance than
 ephemeral-hdfs?
 2 - If so, is there a recommended configuration (like storing input and
 output on persistent, but persisted RDDs on ephemeral?)

 This seems like a common use-case, so sorry if this has already been
 covered.

 Joe



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Kryo serialization and OOM

2015-02-03 Thread Joe Wass
I have about 500 MB of data and I'm trying to process it on a single
`local` instance. I'm getting an Out of Memory exception. Stack trace at
the end.

Spark 1.1.1
My JVM has --Xmx2g

spark.driver.memory = 1000M
spark.executor.memory = 1000M
spark.kryoserializer.buffer.mb = 256
spark.kryoserializer.buffer.max.mb = 256

The objects I'm dealing with are well constrained. Each can be no more than
500 bytes at the very most. I ran into problems with the kryo buffer being
too small but I think that 256 MB should do the job. The docs say "This
must be larger than any object you attempt to serialize." No danger of that.

My input is a single file (on average each line is 500 bytes). I'm
performing various filter, map, flatMap, groupByKey and reduceByKey. The
only 'actions' I'm performing are foreach, which inserts values into a
database.

On input, I'm parsing the lines and then persisting with DISK_ONLY.

I'm foreaching over the keys and then foreaching over the values of key
value RDDs. The docs say that groupByKey returns (K, Iterable<V>). So the
values (which can be large) shouldn't be serialized as a single list.

So I don't think I should be loading anything larger than 256 MB at once.

My code works for small sample toy data and I'm trying it out on a bit
more. As I understand it, the way that Spark partitions data means that (in
most cases) any job that will run on a cluster will also run on a single
instance, given enough time.

I think I've given enough memory to cover my serialization needs as I
understand them. Have I misunderstood?
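
One hedged observation for later readers: if the initial Kryo buffer
(spark.kryoserializer.buffer.mb) is allocated up front for each serializer
instance, then 256 MB per task serializer adds up very quickly inside a 2 GB
heap; the OOM in the stack trace below is thrown while creating exactly that
Output buffer. A minimal configuration sketch along those lines, with
illustrative sizes:

val conf = new org.apache.spark.SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "2")        // small initial buffer, allocated eagerly
  .set("spark.kryoserializer.buffer.max.mb", "256")  // upper bound, only grown to if needed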

Joe

Stack trace:

INFO  org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in
stage 30.0 (TID 116, localhost, PROCESS_LOCAL, 993 bytes)
INFO  org.apache.spark.executor.Executor - Running task 0.0 in stage 30.0
(TID 116)

...

ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage
30.0 (TID 116)
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
at
org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:58)
at
org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:151)
at
org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:151)
at
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:155)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:188)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

...

WARN  org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage
30.0 (TID 116, localhost): java.lang.OutOfMemoryError: Java heap space
com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)

org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:58)

org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:151)

org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:151)

org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:155)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:188)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)


Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread Joe Wass
Thanks very much, that's good to know, I'll certainly give it a look.

Can you give me a hint about how you unzip your input files on the fly? I
thought that it wasn't possible to parallelize zipped inputs unless they
were unzipped before passing to Spark?
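
(For later readers, a hedged sketch of what reading gzipped input directly
usually looks like: Hadoop's codecs decompress .gz on the fly when reading
with textFile, but each .gz file still arrives as a single, non-splittable
partition, so parallelism comes from having many files or from repartitioning
straight after the read. Bucket and paths are illustrative.)

val logs = sc.textFile("s3n://my-bucket/logs/*.gz")  // gzip decompressed transparently, one partition per file
val spread = logs.repartition(200)                   // spread the decompressed records across the cluster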

Joe

On 3 February 2015 at 17:48, David Rosenstrauch dar...@darose.net wrote:

 We use S3 as a main storage for all our input data and our generated
 (output) data.  (10's of terabytes of data daily.)  We read gzipped data
 directly from S3 in our Hadoop/Spark jobs - it's not crazily slow, as long
 as you parallelize the work well by distributing the processing across
 enough machines.  (About 100 nodes, in our case.)

 The way we generally operate is re: storage is:  read input directly from
 s3, write output from Hadoop/Spark jobs to HDFS, then after job is complete
 distcp the relevant output from HDFS back to S3.  Works for us ... YMMV.
 :-)

 HTH,

 DR


 On 02/03/2015 12:32 PM, Joe Wass wrote:

 The data is coming from S3 in the first place, and the results will be
 uploaded back there. But even in the same availability zone, fetching 170
 GB (that's gzipped) is slow. From what I understand of the pipelines,
 multiple transforms on the same RDD might involve re-reading the input,
 which very quickly add up in comparison to having the data locally. Unless
 I persisted the data (which I am in fact doing) but that would involve
 storing approximately the same amount of data in HDFS, which wouldn't fit.

  Also, I understood that S3 was unsuitable for practical use? See "Why you
  cannot use S3 as a replacement for HDFS" [0]. I'd love to be proved wrong,
  though; that would make things a lot easier.

 [0] http://wiki.apache.org/hadoop/AmazonS3



 On 3 February 2015 at 16:45, David Rosenstrauch dar...@darose.net
 wrote:

  You could also just push the data to Amazon S3, which would un-link the
 size of the cluster needed to process the data from the size of the data.

 DR


 On 02/03/2015 11:43 AM, Joe Wass wrote:

  I want to process about 800 GB of data on an Amazon EC2 cluster. So, I
 need
 to store the input in HDFS somehow.

 I currently have a cluster of 5 x m3.xlarge, each of which has 80GB
 disk.
 Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

 If I want to process 800 GB of data (assuming I can't split the jobs
 up),
 I'm guessing I need to get persistent-hdfs involved.

 1 - Does persistent-hdfs have noticeably different performance than
 ephemeral-hdfs?
 2 - If so, is there a recommended configuration (like storing input and
 output on persistent, but persisted RDDs on ephemeral?)

 This seems like a common use-case, so sorry if this has already been
 covered.

 Joe



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: PermGen issues on AWS

2015-01-09 Thread Joe Wass
Thanks, I noticed this after posting. I'll try that.
I also think that perhaps Clojure might be creating more classes than the
equivalent Java would, so I'll nudge it a bit higher.
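
For reference, a minimal sketch of where the flag could go (sizes
illustrative): spark.executor.extraJavaOptions reaches the executor JVMs,
while driver-side JVM flags in client mode generally have to go on the launch
command itself, since that JVM is already running.

val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.extraJavaOptions",
       "-XX:MaxPermSize=256m -XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled")
  .set("spark.driver.extraJavaOptions", "-XX:MaxPermSize=256m")  // driver side; in client mode prefer --driver-java-options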

On 9 January 2015 at 11:45, Sean Owen so...@cloudera.com wrote:

 It's normal for PermGen to be a bit more of an issue with Spark than
 for other JVM-based applications. You should simply increase the
 PermGen size, which I don't see in your command. -XX:MaxPermSize=256m
 allows it to grow to 256m for example. The right size depends on your
 total heap size and app.

 Also, Java 8 no longer has a permanent generation, so this particular
 type of problem and tuning is not needed. You might consider running
 on Java 8.

 On Fri, Jan 9, 2015 at 10:38 AM, Joe Wass jw...@crossref.org wrote:
  I'm running on an AWS cluster of 10 x m1.large (64 bit, 7.5 GiB RAM).
 FWIW
  I'm using the Flambo Clojure wrapper which uses the Java API but I don't
  think that should make any difference. I'm running with the following
  command:
 
  spark/bin/spark-submit --class mything.core --name "My Thing" \
    --conf spark.yarn.executor.memoryOverhead=4096 \
    --conf "spark.executor.extraJavaOptions=-XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled" \
    /root/spark/code/myjar.jar
 
  For one of the stages I'm getting errors:
 
   - ExecutorLostFailure (executor lost)
   - Resubmitted (resubmitted due to lost executor)
 
  And I think they're caused by slave executor JVMs dying up with this
 error:
 
  java.lang.OutOfMemoryError: PermGen space
  java.lang.Class.getDeclaredConstructors0(Native Method)
  java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
  java.lang.Class.getConstructor0(Class.java:2885)
  java.lang.Class.newInstance(Class.java:350)
 
 
 sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
 
 
 sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
  java.security.AccessController.doPrivileged(Native Method)
 
 
 sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
 
 
 sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
 
 
 sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
 
 
 java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
  java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
  java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
  java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
  java.security.AccessController.doPrivileged(Native Method)
  java.io.ObjectStreamClass.init(ObjectStreamClass.java:468)
  java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
 
  java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
 
  java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
 
  java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
  java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 
  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
  java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 
  java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
 
  1 stage out of 14 (so far) is failing. My failing stage is 1768
 succeeded /
  1862 (940 failed). 7 tasks failed with OOM, 919 were Resubmitted
  (resubmitted due to lost executor).
 
  Now my Aggregated Metrics by Executor shows that 10 out of 16 executors
  show CANNOT FIND ADDRESS which I imagine means the JVM blew up and
 hasn't
  been restarted. Now the 'Executors' tab shows only 7 executors.
 
   - Is this normal?
   - Any ideas why this is happening?
   - Any other measures I can take to prevent this?
   - Is the rest of my app going to run on a reduced number of executors?
   - Can I re-start the executors mid-application? This is a long-running
 job,
  so I'd like to do what I can whilst it's running, if possible.
   - Am I correct in thinking that the --conf arguments are supplied to the
  JVMs of the slave executors, so they will be receiving the
 extraJavaOptions
  and memoryOverhead?
 
  Thanks very much!
 
  Joe



Accidental kill in UI

2015-01-09 Thread Joe Wass
So I had a Spark job with various failures, and I decided to kill it and
start again. I clicked the 'kill' link in the web console, restarted the
job on the command line and headed back to the web console and refreshed to
see how my job was doing... the URL at the time was:

/stages/stage/kill?id=1terminate=true

Which of course terminated the stage again. No loss, but if I'd waited a
few hours before doing that, I would have lost data.

I know to be careful next time, but isn't 'don't modify state as a result
of a GET request' the first rule of HTTP? It could lead to an expensive
mistake. Making this a POST would be a simple fix.

Does anyone else think this is worth creating an issue for?


PermGen issues on AWS

2015-01-09 Thread Joe Wass
I'm running on an AWS cluster of 10 x m1.large (64 bit, 7.5 GiB RAM). FWIW
I'm using the Flambo Clojure wrapper which uses the Java API but I don't
think that should make any difference. I'm running with the following
command:

spark/bin/spark-submit --class mything.core --name "My Thing" \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  --conf "spark.executor.extraJavaOptions=-XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled" \
  /root/spark/code/myjar.jar

For one of the stages I'm getting errors:

 - ExecutorLostFailure (executor lost)
 - Resubmitted (resubmitted due to lost executor)

And I think they're caused by slave executor JVMs dying up with this error:

java.lang.OutOfMemoryError: PermGen space
java.lang.Class.getDeclaredConstructors0(Native Method)
java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
java.lang.Class.getConstructor0(Class.java:2885)
java.lang.Class.newInstance(Class.java:350)

sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)

sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
java.security.AccessController.doPrivileged(Native Method)

sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)

sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)

sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)

java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
java.security.AccessController.doPrivileged(Native Method)
java.io.ObjectStreamClass.init(ObjectStreamClass.java:468)
java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)

java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)


1 stage out of 14 (so far) is failing. My failing stage is 1768 succeeded /
1862 (940 failed). 7 tasks failed with OOM, 919 were Resubmitted
(resubmitted due to lost executor).

Now my Aggregated Metrics by Executor shows that 10 out of 16 executors
show CANNOT FIND ADDRESS which I imagine means the JVM blew up and hasn't
been restarted. Now the 'Executors' tab shows only 7 executors.

 - Is this normal?
 - Any ideas why this is happening?
 - Any other measures I can take to prevent this?
 - Is the rest of my app going to run on a reduced number of executors?
 - Can I re-start the executors mid-application? This is a long-running
job, so I'd like to do what I can whilst it's running, if possible.
 - Am I correct in thinking that the --conf arguments are supplied to the
JVMs of the slave executors, so they will be receiving the extraJavaOptions
and memoryOverhead?

Thanks very much!

Joe


Are failures normal / to be expected on an AWS cluster?

2014-12-20 Thread Joe Wass
I have a Spark job running on about 300 GB of log files, on Amazon EC2,
with 10 x Large instances (each with 600 GB disk). The job hasn't yet
completed.

So far, 18 stages have completed (2 of which have retries) and 3 stages
have failed. In each failed stage there are ~160 successful tasks, but
CANNOT FIND ADDRESS for half of the executors.

Are these numbers normal for AWS? Should a certain number of faults be
expected? I know that AWS isn't meant to be perfect, but this doesn't seem
right.

Cheers

Joe


GC problem while filtering large data

2014-12-16 Thread Joe L
Hi, I am trying to filter a large table with 3 columns. Spark SQL might be a
good choice, but I want to do it without SQL. The goal is to filter the big
table with multiple clauses. I filtered the big table 3 times; the first
filtering takes about 50 seconds, but the second and third filter
transformations took about 5 seconds. I wonder if it is because of lazy
evaluation, but I already evaluated my RDD by parsing it when I first read it
using sc.textFile and then counting it.
Running times:
t1 = 50seconds
t2 = 5seconds
t3 = 4seconds

val clause = List(
  ("http://www.w3.org/1999/02/22-rdf-syntax-ns#type", "www.ssu.ac.kr#GraduateStudent"),
  ("www.ssu.ac.kr#memberOf", "?Z"),
  ("www.ssu.ac.kr#undergraduateDegreeFrom", "?Y")
)

val bcastedSubj: Broadcast[String] = sc.broadcast("?X")
val bcastedCls: Broadcast[List[(String, String)]] = sc.broadcast(clause)
var n = clause.length

val t0 = System.currentTimeMillis()

val subgraph1 = bigtable.mapPartitions(
  iterator => {
    val bcls = bcastedCls.value
    val bsubj = bcastedSubj.value
    n = bcls.length
    for ((s, grp) <- iterator;
         if {
           val flag = if (!bsubj.startsWith("?") && !bsubj.equals(s))
             false
           else {
             var k = 0
             val m = grp.length
             var flag1 = true

             while (k < n) {
               var flag2 = false
               var l = 0
               while (l < m) {
                 if (grp(l)._1.equals(bcls(k)._1) && grp(l)._2.equals(bcls(k)._2)) flag2 = true
                 else if (bcls(k)._1.startsWith("?") && grp(l)._2.equals(bcls(k)._2)) flag2 = true
                 else if (bcls(k)._2.startsWith("?") && grp(l)._1.equals(bcls(k)._1)) flag2 = true
                 l += 1
               }
               if (!flag2) flag1 = false

               k += 1
             }

             flag1
           }

           flag
         }
    ) yield (s, grp)
  }, preservesPartitioning = true).cache()
val num1 = subgraph1.count()



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GC-problem-while-filtering-large-data-tp20702.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



classnotfound error due to groupByKey

2014-07-04 Thread Joe L

Hi, 

When I run the following piece of code, it throws a ClassNotFoundException.
Any suggestion would be appreciated.

Wanted to group an RDD by key:
val t = rdd.groupByKey()

Error message:
java.lang.ClassNotFoundException:
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/classnotfound-error-due-to-groupByKey-tp8836.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


ec2 deployment regions supported

2014-06-07 Thread Joe Mathai
Hi ,

I am interested in deploying Spark 1.0.0 on EC2 and wanted to know
which regions are supported. I was able to deploy the previous version in
the east region, but I had a hard time launching the cluster due to a bad
connection: the provided script would fail to ssh into a node after a
couple of tries and stop.


Need equallyWeightedPartitioner Algorithm

2014-06-03 Thread Joe L
I need to partition my data into equally weighted partitions. Suppose I have
20GB of data and I want 4 partitions, where each partition has 5GB of the data.
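
A minimal sketch, not from the original thread, assuming a spark-shell
SparkContext `sc` and a hypothetical input path: when records are of roughly
similar size, repartition(4) shuffles them about evenly across 4 partitions,
which approximates equally weighted partitions; exact 5GB splits would need a
custom Partitioner driven by record sizes.

val data = sc.textFile("hdfs:///path/to/20gb-input")   // hypothetical path
val balanced = data.repartition(4)                     // roughly 5GB each, if records are uniform
// rough sanity check: record count per partition
println(balanced.mapPartitions(it => Iterator(it.size)).collect().mkString(", "))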

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Need-equallyWeightedPartitioner-Algorithm-tp6788.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Map failed [duplicate 1] error

2014-05-27 Thread Joe L
Hi, I am getting the following error but I don't understand what the problem
is. 


14/05/27 17:44:29 INFO TaskSetManager: Loss was due to java.io.IOException:
Map failed [duplicate 15]
14/05/27 17:44:30 INFO TaskSetManager: Starting task 47.0:43 as TID 60281 on
executor 0: cm07 (PROCESS_LOCAL)
14/05/27 17:44:30 INFO TaskSetManager: Serialized task 47.0:43 as 2132 bytes
in 0 ms
14/05/27 17:44:30 WARN TaskSetManager: Lost TID 60235 (task 47.0:3)
14/05/27 17:44:30 INFO TaskSetManager: Loss was due to java.io.IOException:
Map failed [duplicate 16]
14/05/27 17:44:30 INFO TaskSetManager: Starting task 47.0:3 as TID 60282 on
executor 3: cm04 (PROCESS_LOCAL)
14/05/27 17:44:30 INFO TaskSetManager: Serialized task 47.0:3 as 2132 bytes
in 0 ms
14/05/27 17:44:30 WARN TaskSetManager: Lost TID 60273 (task 47.0:29)
14/05/27 17:44:30 INFO TaskSetManager: Loss was due to java.io.IOException:
Map failed [duplicate 17]
14/05/27 17:44:30 ERROR TaskSetManager: Task 47.0:29 failed 4 times;
aborting job
14/05/27 17:44:30 INFO DAGScheduler: Failed to run count at
reasoner1.scala:144
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
47.0:29 failed 4 times (most recent failure: Exception failure:
java.io.IOException: Map failed)
org.apache.spark.SparkException: Job aborted: Task 47.0:29 failed 4 times
(most recent failure: Exception failure: java.io.IOException: Map failed)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[trace] Stack trace suppressed: run last compile:run for the full output.
14/05/27 17:44:30 INFO ConnectionManager: Selector thread was interrupted!
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1
[error] Total time: 172 s, completed 2014. 5. 27 5:44:30 PM




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Map-failed-dupliacte-1-error-tp6415.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


facebook data mining with Spark

2014-05-19 Thread Joe L
Is there any way to get Facebook data into Spark and filter its content?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/facebook-data-mining-with-Spark-tp6072.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


ClassNotFoundException

2014-05-01 Thread Joe L
Hi, I am getting the following error. How could I fix this problem?

Joe

14/05/02 03:51:48 WARN TaskSetManager: Lost TID 12 (task 2.0:1)
14/05/02 03:51:48 INFO TaskSetManager: Loss was due to
java.lang.ClassNotFoundException:
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4 [duplicate 6]
14/05/02 03:51:48 ERROR TaskSetManager: Task 2.0:1 failed 4 times; aborting
job
14/05/02 03:51:48 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks
have all completed, from pool 
14/05/02 03:51:48 INFO TaskSetManager: Loss was due to
java.lang.ClassNotFoundException:
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4 [duplicate 7]
14/05/02 03:51:48 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks
have all completed, from pool 
14/05/02 03:51:48 INFO DAGScheduler: Failed to run count at
reasoner.scala:70
[error] (run-main-0) org.apache.spark.SparkException: Job aborted: Task
2.0:1 failed 4 times (most recent failure: Exception failure:
java.lang.ClassNotFoundException:
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4)
org.apache.spark.SparkException: Job aborted: Task 2.0:1 failed 4 times
(most recent failure: Exception failure: java.lang.ClassNotFoundException:
org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ClassNotFoundException-tp5182.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


help

2014-04-27 Thread Joe L
I am getting this error; please help me fix it.

4/04/28 02:16:20 INFO SparkDeploySchedulerBackend: Executor
app-20140428021620-0007/10 removed: class java.io.IOException: Cannot run
program /home/exobrain/install/spark-0.9.1/bin/compute-classpath.sh (in
directory .): error=13,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/help-tp4901.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


read file from hdfs

2014-04-25 Thread Joe L
I have just two questions:

sc.textFile("hdfs://host:port/user/matei/whatever.txt")

Is host the master node?
What port should we use?
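
A minimal sketch, assuming a spark-shell SparkContext `sc`: the host is the
HDFS namenode (often, but not necessarily, the same machine as the Spark
master), and the port is the namenode RPC port from fs.defaultFS in
core-site.xml, commonly 8020 or 9000 depending on the distribution; if Spark
already picks up your Hadoop configuration, host:port can be omitted.

// "namenode-host" and 8020 are placeholders; they must match fs.defaultFS
val lines = sc.textFile("hdfs://namenode-host:8020/user/matei/whatever.txt")
println(lines.count())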




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/read-file-from-hdfs-tp4824.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


help

2014-04-25 Thread Joe L
I need someone's help, please. I am getting the following error.

[error] 14/04/26 03:09:47 INFO cluster.SparkDeploySchedulerBackend: Executor
app-20140426030946-0004/8 removed: class java.io.IOException: Cannot run
program /home/exobrain/install/spark-0.9.1/bin/compute-classpath.sh (in
directory .): error=13



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/help-tp4841.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: help

2014-04-25 Thread Joe L
Hi, thank you for your reply, but I could not find it; it says no such
file or directory.

 
http://apache-spark-user-list.1001560.n3.nabble.com/file/n4848/Capture.png 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/help-tp4841p4848.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


help me

2014-04-22 Thread Joe L
I got the following performance; is it normal for Spark to behave like this?
Sometimes Spark switches from PROCESS_LOCAL into NODE_LOCAL mode and it becomes
10x faster. I am very confused.

scala> val a = sc.textFile("/user/exobrain/batselem/LUBM1000")
scala> f.count()

Long = 137805557
took 130.809661618 s




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/help-me-tp4598.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark is slow

2014-04-21 Thread Joe L
g1 = pairs1.groupByKey().count() 
pairs1 = pairs1.groupByKey(g1).cache() 
g2 = triples.groupByKey().count() 
pairs2 = pairs2.groupByKey(g2) 

pairs = pairs2.join(pairs1) 

Hi, I want to implement a hash-partitioned join as shown above, but somehow
it is taking very long to perform. As I understand it, the above join should
run locally, since both sides are partitioned; after partitioning, matching
keys should reside on the same node. So isn't it supposed to be fast when we
partition by key? Thank you.
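
A minimal sketch, not the original code, assuming a spark-shell SparkContext
`sc`: partitioning both pair RDDs with the same HashPartitioner before join()
keeps matching keys on the same node, so the join becomes a narrow dependency
and does not trigger another full shuffle.

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(4)
val left  = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))).partitionBy(partitioner).cache()
val right = sc.parallelize(Seq(("a", 10), ("b", 20))).partitionBy(partitioner)
val joined = right.join(left)            // both sides share the partitioner, so no re-shuffle
println(joined.collect().mkString(", "))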



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-slow-tp4539p4577.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


evaluate spark

2014-04-20 Thread Joe L
I want to evaluate Spark performance by measuring the running time of
transformation operations such as map and join. To do so, is it enough to
materialize them with just a count action? As far as I know, transformations
are lazy operations and don't do any computation until we call an action on
them, but when I use an action to run them I don't want to include the
running time of that count action in my measurement.
How could I do it? Thank you
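
A minimal sketch, assuming a spark-shell SparkContext `sc`, a hypothetical
input path and a hypothetical transformation: one common approach is to force
the transformation with the cheapest possible action and time around that
action, accepting that the (small) action overhead is included, since a lazy
transformation cannot be timed in isolation.

val input  = sc.textFile("hdfs:///path/to/input")        // hypothetical path
val mapped = input.map(line => line.split("\t").length)  // stand-in "expensive" transformation
val t0 = System.currentTimeMillis()
mapped.foreach(_ => ())                                  // materializes the map; near-zero action work
val t1 = System.currentTimeMillis()
println(s"map + action took ${t1 - t0} ms")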



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/evaluate-spark-tp4508.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


what is a partition? how it works?

2014-04-16 Thread Joe L
I want to know the following:

What is a partition? How does it work?
How is it different from a Hadoop partition?

For example:
 sc.parallelize([1,2,3,4]).map(lambda x: (x,x)).partitionBy(2).glom().collect()
[[(2,2), (4,4)], [(1,1), (3,3)]]

From this we get 2 partitions, but what does that mean? How do they reside in
memory across the cluster?

I am sorry for such a simple question, but I couldn't find any specific
information about what happens underneath partitioning.

Thank you, Joe



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-a-partition-how-it-works-tp4325.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


groupByKey returns a single partition in a RDD?

2014-04-15 Thread Joe L
I want to apply the following transformation to 60 GB of data on 7 nodes with
10 GB of memory each, and I am wondering if the groupByKey() function returns
an RDD with a single partition for each key. If so, what will happen if the
size of that partition doesn't fit into the particular node it lands on?

rdd = sc.textFile("hdfs://...").map(parserFunc).groupByKey()
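
A Scala sketch, assuming a spark-shell SparkContext `sc`, of one way to check
this directly rather than a statement of the answer: inspect partitions.length
and the keys that land in each partition after groupByKey().

val pairs   = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1), ("c", 1)), 4)
val grouped = pairs.groupByKey()
println(grouped.partitions.length)                                   // partition count of the result
grouped.glom().map(_.map(_._1).toList).collect().foreach(println)    // keys present in each partition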



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/groupByKey-returns-a-single-partition-in-a-RDD-tp4264.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


what is the difference between element and partition?

2014-04-15 Thread Joe L




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-difference-between-element-and-partition-tp4317.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


groupByKey(None) returns partitions according to the keys?

2014-04-15 Thread Joe L
I was wondering if groupByKey returns 2 partitions in the example below?

 x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
 sorted(x.groupByKey().collect())
[('a', [1, 1]), ('b', [1])]



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/groupByKey-None-returns-partitions-according-to-the-keys-tp4318.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Proper caching method

2014-04-14 Thread Joe L
Hi, I am trying to cache 2 GB of data and to implement the following procedure.
In order to cache it I did as follows. Is it necessary to cache rdd2 since
rdd1 is already cached?

rdd1 = sc.textFile("hdfs://...").cache()

rdd2 = rdd1.filter(userDefinedFunc1).cache()
rdd3 = rdd1.filter(userDefinedFunc2).cache()
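
A minimal Scala sketch, assuming a spark-shell SparkContext `sc` and
hypothetical paths and predicates, of the trade-off being asked about: with
only rdd1 cached, every action on rdd2 re-runs the filter over rdd1's cached
blocks, so caching rdd2 as well only pays off if rdd2 is reused by several
actions and fits in memory alongside rdd1.

val rdd1 = sc.textFile("hdfs:///path/to/input").cache()   // hypothetical path
val rdd2 = rdd1.filter(_.contains("ERROR")).cache()       // hypothetical predicate
val rdd3 = rdd1.filter(_.contains("WARN"))                // left uncached on purpose
println(rdd2.count())   // fills rdd1's cache and rdd2's cache
println(rdd3.count())   // reads rdd1 from cache, recomputes the WARN filter each time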






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Proper-caching-method-tp4206.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


shuffle vs performance

2014-04-14 Thread Joe L
I was wondering whether using fewer partitions for RDDs could help Spark
performance and reduce shuffling. Is that true?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/shuffle-vs-performance-tp4255.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


how to use a single filter instead of multiple filters

2014-04-13 Thread Joe L

Hi, I have multiple filters as shown below; should I use a single combined
filter instead? Can these filters degrade Spark's performance?


http://apache-spark-user-list.1001560.n3.nabble.com/file/n4185/Capture.png 
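
A minimal sketch, assuming a spark-shell SparkContext `sc` (the actual filters
are only in the screenshot above, so the predicates here are made up): chained
filter() calls are narrow transformations that Spark pipelines within a single
stage, so both forms below typically make one pass over the data; the combined
predicate mainly saves some per-element function-call overhead.

val rdd      = sc.parallelize(1 to 1000)
val chained  = rdd.filter(_ > 0).filter(_ % 2 == 0).filter(_ < 100)
val combined = rdd.filter(x => x > 0 && x % 2 == 0 && x < 100)
println(chained.count() == combined.count())   // same result either way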



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-use-a-single-filter-instead-of-multiple-filters-tp4185.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


how to count maps without shuffling too much data?

2014-04-13 Thread Joe L




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-count-maps-without-shuffling-too-much-data-tp4194.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.