Checkpointing in Spark - Cleaning files and Support across app attempts

2016-10-11 Thread dhruve ashar
While checkpointing RDDs as part of an application that doesn't use
spark-streaming, I observed that the checkpointed files are not cleaned
up even after the application completes successfully.
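
For concreteness, here is a minimal sketch of the kind of non-streaming job
I mean (paths and names are hypothetical); the rdd-<id> directory it writes
under the checkpoint dir is still on HDFS after a successful run:

import org.apache.spark.{SparkConf, SparkContext}

object CheckpointCleanupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-cleanup-sketch"))

    // Checkpoint files land under <dir>/<uuid>/rdd-<id>/ on HDFS.
    sc.setCheckpointDir("hdfs:///tmp/checkpoints")   // hypothetical path

    val rdd = sc.parallelize(1 to 1000000).map(_ * 2)
    rdd.checkpoint()   // mark the RDD for checkpointing
    rdd.count()        // materializes the RDD and writes the checkpoint files

    // After sc.stop() and a successful exit, the rdd-<id> directory remains.
    // (As far as I can tell, spark.cleaner.referenceTracking.cleanCheckpoints
    // only removes checkpoint files when the RDD is garbage collected, and it
    // is off by default, so it doesn't help at application completion.)
    sc.stop()
  }
}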

Is it because we assume that checkpointing would be primarily used for
spark-streaming applications which run in continuum?

Also, the current mechanism supports recovery only in spark-streaming,
which can survive driver crashes. There is no support for recovering from
previously checkpointed RDDs in subsequent application attempts. It would
be consistent and nice to have the ability to recover across app attempts
in non-streaming jobs as well.

Is there any specific reason for the current behavior of not cleaning the
files, and for the lack of support across app attempts? If not, I can raise
a JIRA for this.

Thanks,
Dhruve


Re: Spark Improvement Proposals

2016-10-11 Thread Ryan Blue
I don't think we will have trouble with whatever rule is adopted for
accepting proposals. Considering committers' votes binding (if that is what
we choose) is an established practice as long as it isn't for specific
votes, like a release vote. From the Apache docs: "Who is permitted to vote
is, to some extent, a community-specific thing." [1] And I also don't see
why it would be a problem to choose consensus, as long as we have an open
discussion and vote about these rules.

rb

On Mon, Oct 10, 2016 at 4:15 PM, Cody Koeninger  wrote:

> If someone wants to tell me that it's OK and "The Apache Way" for
> Kafka and Flink to have a proposal process that ends in a lazy
> majority, but it's not OK for Spark to have a proposal process that
> ends in a non-lazy consensus...
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-Process
>
> In practice any PMC member can stop a proposal they don't like, so I'm
> not sure how much it matters.
>
>
>
> On Mon, Oct 10, 2016 at 5:59 PM, Mark Hamstra 
> wrote:
> > There is a larger issue to keep in mind, and that is that what you are
> > proposing is a procedure that, as far as I am aware, hasn't previously
> > been adopted in an Apache project, and thus is not an easy or exact fit
> > with established practices that have been blessed as "The Apache Way".
> > As such, we need to be careful, because we have run into some trouble in
> > the past with some inside the ASF but essentially outside the Spark
> > community who didn't like the way we were doing things.
> >
> > On Mon, Oct 10, 2016 at 3:53 PM, Cody Koeninger 
> wrote:
> >>
> >> Apache documents say lots of confusing stuff, including that committers
> >> are in practice given a vote.
> >>
> >> https://www.apache.org/foundation/voting.html
> >>
> >> I don't care either way; if someone wants me to substitute committer for
> >> PMC in the voting section, fine, we just need a clear outcome.
> >>
> >>
> >> On Oct 10, 2016 17:36, "Mark Hamstra"  wrote:
> >>>
> >>> If I'm correctly understanding the kind of voting that you are talking
> >>> about, then to be accurate, it is only the PMC members that have a
> >>> vote, not all committers:
> >>> https://www.apache.org/foundation/how-it-works.html#pmc-members
> >>>
> >>> On Mon, Oct 10, 2016 at 12:02 PM, Cody Koeninger 
> >>> wrote:
> 
>  I think the main value is in being honest about what's going on.  No
>  one other than committers can cast a meaningful vote, that's the
>  reality.  Beyond that, if people think it's more open to allow formal
>  proposals from anyone, I'm not necessarily against it, but my main
>  question would be this:
> 
>  If anyone can submit a proposal, are committers actually going to
>  clearly reject and close proposals that don't meet the requirements?
> 
>  Right now we have a serious problem with lack of clarity regarding
>  contributions, and that cannot spill over into goal-setting.
> 
>  On Mon, Oct 10, 2016 at 1:54 PM, Ryan Blue  wrote:
>  > +1 to votes to approve proposals. I agree that proposals should have an
>  > official mechanism to be accepted, and a vote is an established means of
>  > doing that well. I like that it includes a period to review the proposal
>  > and I think proposals should have been discussed enough ahead of a vote
>  > to survive the possibility of a veto.
>  >
>  > I also like the names that are short and (mostly) unique, like SEP.
>  >
>  > Where I disagree is with the requirement that a committer must formally
>  > propose an enhancement. I don't see the value of restricting this: if
>  > someone has the will to write up a proposal then they should be
>  > encouraged to do so and start a discussion about it. Even if there is a
>  > political reality as Cody says, what is the value of codifying that in
>  > our process? I think restricting who can submit proposals would only
>  > undermine them by pushing contributors out. Maybe I'm missing something
>  > here?
>  >
>  > rb
>  >
>  >
>  >
>  > On Mon, Oct 10, 2016 at 7:41 AM, Cody Koeninger  >
>  > wrote:
>  >>
>  >> Yes, users suggesting SIPs is a good thing and is explicitly called
>  >> out in the linked document under the Who? section.  Formally proposing
>  >> them, not so much, because of the political realities.
>  >>
>  >> Yes, implementation strategy definitely affects goals.  There are all
>  >> kinds of examples of this, I'll pick one that's my fault so as to
>  >> avoid sounding like I'm blaming:
>  >>
>  >> When I implemented the Kafka DStream, one of my (not explicitly
>  >> 

Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-11 Thread Shivaram Venkataraman
Thanks Fred - that is very helpful.

> Delivering low latency, high throughput, and stability simultaneously: Right
> now, our own tests indicate you can get at most two of these characteristics
> out of Spark Streaming at the same time. I know of two parties that have
> abandoned Spark Streaming because "pick any two" is not an acceptable answer
> to the latency/throughput/stability question for them.
>
Could you expand a little bit more on stability? Is it just bursty
workloads in terms of peak vs. average throughput? Also, what level of
latencies do you find users care about? Is it on the order of 2-3
seconds vs. 1 second vs. 100s of milliseconds?




Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-11 Thread Reynold Xin
On Tue, Oct 11, 2016 at 10:55 AM, Michael Armbrust 
wrote:

> *Complex event processing and state management:* Several groups I've
>> talked to want to run a large number (tens or hundreds of thousands now,
>> millions in the near future) of state machines over low-rate partitions of
>> a high-rate stream. Covering these use cases translates roughly into three
>> sub-requirements: maintaining lots of persistent state efficiently,
>> feeding tuples to each state machine in the right order, and exposing
>> convenient programmer APIs for complex event detection and signal
>> processing tasks.
>>
>
> I've heard this one too, but don't know of anyone actively working on it.
> Would be awesome to open a JIRA and start discussing what the APIs would
> look like.
>

There is an existing ticket for CEP:
https://issues.apache.org/jira/browse/SPARK-14745


Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-11 Thread Michael Armbrust
This is super helpful, thanks for writing it up!


> *Delivering low latency, high throughput, and stability simultaneously:* Right
> now, our own tests indicate you can get at most two of these
> characteristics out of Spark Streaming at the same time. I know of two
> parties that have abandoned Spark Streaming because "pick any two" is not
> an acceptable answer to the latency/throughput/stability question for them.
>

Agree, this should be the major focus.


> *Complex event processing and state management:* Several groups I've
> talked to want to run a large number (tens or hundreds of thousands now,
> millions in the near future) of state machines over low-rate partitions of
> a high-rate stream. Covering these use cases translates roughly into three
> sub-requirements: maintaining lots of persistent state efficiently,
> feeding tuples to each state machine in the right order, and exposing
> convenient programmer APIs for complex event detection and signal
> processing tasks.
>

I've heard this one too, but don't know of anyone actively working on it.
Would be awesome to open a JIRA and start discussing what the APIs would
look like.


> *Job graph scheduling and access to Dataset APIs: *These requirements
> come up in the context of groups who want to do streaming ETL. The general
> application profile that I've seen involves updating a large number of
> materialized views based on a smaller number of streams, using a mixture of
> SQL and nontraditional processing. The extended SQL that the Dataset APIs
> provide is useful in these applications. As for scheduling needs, it's
> common for multiple output tables to share intermediate computations. Users
> need an easy way to ensure that this shared computation happens only once,
> while controlling the peak memory utilization during each batch.
>

This sounds like two separate things to me: high-level APIs (are streaming
DataFrames / Datasets missing anything?) and multi-query optimization for
streams.  I've been thinking about the latter.  I think we probably want to
crush latency/throughput/stability in the simpler case first, but after
that I think there is a lot of machinery already in SQL we can reuse (e.g.
the sameResult calculations used for caching).
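
To make the shared-computation part concrete, here is a rough sketch (not
taken from any of the applications discussed here; the source, column names,
and sinks are hypothetical stand-ins) of two output tables built from one
shared intermediate Dataset. Today each start() launches an independent
query that recomputes the shared part; multi-query optimization would be
about running that shared subplan once per trigger:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SharedIntermediateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("shared-intermediate-sketch").getOrCreate()
    import spark.implicits._

    // Test source standing in for a real stream; host/port are placeholders.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    // Shared intermediate computation that both output tables depend on.
    val cleaned = lines
      .select(length($"value").as("len"))
      .withColumn("bucket", $"len" % 10)

    // Two "materialized views" over the same intermediate result.
    cleaned.groupBy($"bucket").count()
      .writeStream.outputMode("complete").format("memory")
      .queryName("counts_by_bucket").start()

    cleaned.groupBy().count()
      .writeStream.outputMode("complete").format("memory")
      .queryName("total_count").start()

    spark.streams.awaitAnyTermination()
  }
}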


StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-11 Thread Fred Reiss
On Thu, Oct 6, 2016 at 12:37 PM, Michael Armbrust wrote:
>
> [snip!]
> Relatedly, I'm curious to hear more about the types of questions you are
> getting.  I think the dev list is a good place to discuss applications and
> if/how structured streaming can handle them.
>

Details are difficult to share, but I can give the general gist without
revealing anything proprietary.

I find myself having the same conversation about twice a month. The other
party to the conversation is an IBM product group or an IBM client who
is using Spark for batch and interactive analytics. Their overall
application has or will soon have a real-time component. They want
information from the IBM Spark Technology Center on the relative merits
of different streaming systems for that part of the application or product.
Usually, the options on the table are Spark Streaming/Structured Streaming
and another more "pure" streaming system like Apache Flink or IBM Streams.

Right now, the best recommendation I can give is: "Spark Streaming has
known shortcomings; here's a list. If you are certain that your application
can work within these constraints, then we recommend you give Spark
Streaming a try. Otherwise, check back 12-18 months from now, when
Structured Streaming will hopefully provide a usable platform for your
application."

The specific unmet requirements that are most relevant to these
conversations are: latency, throughput, stability under bursty loads,
complex event processing support, state management, job graph scheduling,
and access to the newer Dataset-based Spark APIs.

Again, apologies for not being able to name names, but here's a redacted
description of why these requirements are relevant.

*Delivering low latency, high throughput, and stability simultaneously:* Right
now, our own tests indicate you can get at most two of these
characteristics out of Spark Streaming at the same time. I know of two
parties that have abandoned Spark Streaming because "pick any two" is not
an acceptable answer to the latency/throughput/stability question for them.

*Complex event processing and state management:* Several groups I've talked
to want to run a large number (tens or hundreds of thousands now, millions
in the near future) of state machines over low-rate partitions of a
high-rate stream. Covering these use cases translates roughly into three
sub-requirements: maintaining lots of persistent state efficiently, feeding
tuples to each state machine in the right order, and exposing convenient
programmer APIs for complex event detection and signal processing tasks.
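
To illustrate the shape of these workloads (only a rough sketch with
hypothetical types and a toy source, not any of the redacted applications),
the "many small state machines over a keyed stream" pattern with today's
DStream API looks roughly like the following, which is where the
state-volume and per-key ordering concerns come from:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class Event(key: String, value: Long)
case class MachineState(lastValue: Long, transitions: Long)

object StateMachineSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("cep-sketch"), Seconds(30))
    ssc.checkpoint("hdfs:///tmp/cep-checkpoints")  // required for stateful operators

    // Toy keyed stream; a real application would read from Kafka or similar.
    val events = ssc.socketTextStream("localhost", 9999)
      .map { line => val Array(k, v) = line.split(","); (k, Event(k, v.toLong)) }

    // One state machine per key. All per-key state lives inside Spark's own
    // partitions, so state volume, update efficiency, and the order in which
    // each key's tuples arrive all matter.
    val states = events.updateStateByKey[MachineState] { (batch, prev) =>
      val start = prev.getOrElse(MachineState(0L, 0L))
      Some(batch.foldLeft(start) { (acc, e) => MachineState(e.value, acc.transitions + 1) })
    }

    states.print()
    ssc.start()
    ssc.awaitTermination()
  }
}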

*Job graph scheduling and access to Dataset APIs: *These requirements come
up in the context of groups who want to do streaming ETL. The general
application profile that I've seen involves updating a large number of
materialized views based on a smaller number of streams, using a mixture of
SQL and nontraditional processing. The extended SQL that the Dataset APIs
provide is useful in these applications. As for scheduling needs, it's
common for multiple output tables to share intermediate computations. Users
need an easy way to ensure that this shared computation happens only once,
while controlling the peak memory utilization during each batch.

Hope this helps.

Fred


Spark Streaming deletes checkpointed RDD then tries to load it after restart

2016-10-11 Thread Cosmin Ciobanu
This is a follow up for this unanswered October 2015 issue:
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-streaming-failed-recovery-from-checkpoint-td14832.html

The issue is that the Spark driver checkpoints an RDD, deletes it, the job
restarts, and *the new driver tries to load the deleted checkpointed RDD*.

The application runs in YARN, which attempts to restart it a number of times
(100 in our case); all of these attempts fail because of the missing deleted
RDD.

Here is a Splunk log which shows the inconsistency in checkpoint behaviour:

*2016-10-09 02:48:43,533* [streaming-job-executor-0] INFO 
org.apache.spark.rdd.ReliableRDDCheckpointData - Done checkpointing RDD
73847 to
hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/*rdd-73847*,
new parent is RDD 73872
host = ip-10-1-1-13.ec2.internal
*2016-10-09 02:53:14,696* [JobGenerator] INFO 
org.apache.spark.streaming.dstream.DStreamCheckpointData - Deleted
checkpoint file
'hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/*rdd-73847*'
for time 147598131 ms
host = ip-10-1-1-13.ec2.internal
*Job restarts here, notice driver host change from ip-10-1-1-13.ec2.internal
to ip-10-1-1-25.ec2.internal.*
*2016-10-09 02:53:30,175* [Driver] INFO 
org.apache.spark.streaming.dstream.DStreamCheckpointData - Restoring
checkpointed RDD for time 147598131 ms from file
'hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/*rdd-73847*'
host = ip-10-1-1-25.ec2.internal
*2016-10-09 02:53:30,491* [Driver] ERROR
org.apache.spark.deploy.yarn.ApplicationMaster - User class threw exception:
java.lang.IllegalArgumentException: requirement failed: Checkpoint directory
does not exist:
hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/*rdd-73847*
java.lang.IllegalArgumentException: requirement failed: Checkpoint directory
does not exist:
hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/*rdd-73847*
host = ip-10-1-1-25.ec2.internal

Spark Streaming is configured with a microbatch interval of 30 seconds, a
checkpoint interval of 120 seconds, and spark.cleaner.ttl of 28800 (8 hours),
but as far as I can tell, this TTL only affects the metadata cleanup
interval. RDDs seem to be deleted every 4-5 minutes after being checkpointed.

Running on top of Spark 1.5.1.
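
For illustration, a simplified sketch of a driver using these intervals (the
real source, transformations, and paths differ; this is not our actual code):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableDriverSketch {
  val checkpointDir = "hdfs://proc-job/checkpoint/app"   // placeholder path

  def createContext(): StreamingContext = {
    val ssc = new StreamingContext(new SparkConf().setAppName("recoverable-sketch"), Seconds(30))
    ssc.checkpoint(checkpointDir)

    val lines = ssc.socketTextStream("localhost", 9999)   // stand-in for the real source
    val counts = lines.flatMap(_.split(" ")).map(word => (word, 1L))
      .reduceByKeyAndWindow(_ + _, Seconds(120))
    counts.checkpoint(Seconds(120))                       // 120-second checkpoint interval
    counts.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On restart (e.g. a new YARN application attempt) the new driver rebuilds
    // the DStream graph from the files under checkpointDir; this is the step
    // that fails in the logs above once rdd-73847 has already been deleted.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}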

Questions: 
- How is checkpoint deletion metadata saved, so that after a driver restart
the new driver does not read invalid metadata? Is there an interval between
the point when the checkpoint data is deleted and the point when this
information is recorded?
- Why does Spark try to load data checkpointed 4-5 minutes in the past, given
that the checkpoint interval is 120 seconds and thus 5-minute-old data is
stale? There should be a newer checkpoint.
- Would the memory management in Spark 2.0 handle this differently? Since we
have not been able to reproduce the issue outside production environments,
it might be useful to know in advance if there have been changes in this
area.

This issue is constantly causing serious data loss in production
environments; I'd appreciate any assistance with it.







Re: Looking for a Spark-Python expert

2016-10-11 Thread Hyukjin Kwon
Just as one of those who subscribe to the dev/user mailing lists, I would
like to avoid receiving a flood of emails about job recruiting.

In my personal opinion, allowing this would effectively mean the list is
being used as a means of profit for an organisation.

On 7 Oct 2016 5:05 p.m., "Sean Owen"  wrote:

> dev@ is for the project's own development discussions, so not the right
> place. user@ is better, but job postings are discouraged in general on
> ASF lists. I think people get away with the occasional legitimate, targeted
> message prefixed with [JOBS], but I hesitate to open the flood gates,
> because we also have no real way of banning the inevitable spam.
>
> On Fri, Oct 7, 2016 at 8:45 AM Boris Lenzinger 
> wrote:
>
>>
>> Hi all,
>>
>> I don't know where to post this announcement, so I really apologize for
>> polluting the ML with such a mail.
>>
>> I'm looking for an expert in Spark 2.0 and its Python API. I have a
>> customer that is looking for a consulting engagement (for one month, but I
>> guess it could stretch to two months given the goals to reach).
>>
>> Here is the context: there is a team (3 people) studying different
>> solutions for an image processing framework, and Spark has been identified
>> as a candidate. They want to build a proof of concept around it with a
>> known use case.
>>
>> Where does the mission take place? Sophia-Antipolis in France (French
>> Riviera). Remote? Not sure, but it could be a good option. I will check and
>> potentially update the post.
>>
>> Dates: the mission should ideally start mid-October, but tell me your
>> availability and I will try to negotiate.
>>
>> Price: first, let's get in touch and send me your resume (if you are one
>> of the authors of the framework, I guess that will do as a resume :-) but
>> I'm still interested in your general background, so please send me a
>> resume anyway).
>>
>> I know that the deadlines are quite short, so even if you cannot start
>> exactly on those dates, do not hesitate to apply.
>>
>> I hope that some of you will be interested in this.
>>
>> Again sorry for posting on the dev list.
>>
>> Have a nice day,
>>
>> boris
>>
>>
>>
>>