Re: Get size of intermediate results

2016-10-20 Thread Egor Pahomov
I needed the same thing for debugging, and I just added a "count" action in
debug mode for every step I was interested in. It's very time-consuming, but
I don't debug very often.
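
Roughly, the sketch below is what I mean (illustrative only -- the debug flag
and the helper name are mine, not Spark APIs): cache the intermediate
Dataset, force it with a count, and then read the materialized size from the
storage info.

import org.apache.spark.sql.{Dataset, SparkSession}

// Sketch of the "count in debug mode" idea: cache the intermediate result,
// force it with a count, and read its materialized size from the storage info.
// `debug` and `debugSize` are illustrative names, not Spark APIs.
def debugSize[T](spark: SparkSession, step: String, ds: Dataset[T], debug: Boolean): Unit = {
  if (debug) {
    ds.cache()
    ds.count()  // the extra action that forces this step to run
    spark.sparkContext.getRDDStorageInfo.foreach { info =>
      println(s"$step / ${info.name}: memory=${info.memSize} bytes, disk=${info.diskSize} bytes")
    }
  }
}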

2016-10-20 2:17 GMT-07:00 Andreas Hechenberger :

> Hey awesome Spark-Dev's :)
>
> I am new to Spark and I have read a lot, but now I am stuck :( so please
> be kind if I ask silly questions.
>
> I want to analyze some algorithms and strategies in Spark, and for one
> experiment I want to know the size of the intermediate results between
> iterations/jobs. Some of them are written to disk and some are in the
> cache, I guess. I am not afraid of looking into the code (I already did),
> but it's complex and I have no clue where to start :( It would be nice if
> someone could point me in the right direction or tell me where I can find
> more information about the structure of Spark core development :)
>
> I already set up the development environment and I can compile Spark. It
> was really awesome how smoothly the setup went :) Thx for that.
>
> Servus
> Andy
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


-- 

Sincerely yours,
Egor Pakhomov


Re: StructuredStreaming status

2016-10-20 Thread Michael Armbrust
>
> On a personal note, I'm quite surprised that this is all the progress in
> Structured Streaming over the last three months since 2.0 was released. I
> was under the impression that this was one of the biggest things that the
> Spark community actively works on, but that is clearly not the case, given
> that most of the activity is a couple of (very important) JIRAs from the
> last several weeks. Not really sure how to parse that yet...
> I think having some clearer, prioritized roadmap going forward will be a
> good first step to recalibrate expectations for 2.2 and for graduating
> from an alpha state.
>

I totally agree we should spend more time making sure the roadmap is clear
to everyone, but I disagree with this characterization.  There is a lot of
work happening in Structured Streaming. In this next release (2.1, as well
as 2.0.1 and 2.0.2) it has been more about stability and scalability than
about user-visible features.  We are running it for real on production jobs
and working to make it rock solid (everyone can help here!). Just look at
the list of commits.

Regarding the timeline to graduation, I think it's instructive to look at
what happened with Spark SQL.

 - Spark 1.0 - added to Spark
 - Spark 1.1 - basic apis, and stability
 - Spark 1.2 - stabilization of Data Source APIs for plugging in external
sources
 - Spark 1.3 - GA
 - Spark 1.4-1.5 - Tungsten
 - Spark 1.6 - Fully-codegened / memory managed
 - Spark 2.0 - Whole stage codegen, experimental streaming support

We probably won't follow that exactly, and we clearly are not done yet.
However, I think the trajectory is good.

> But Streaming Query sources are still designed with microbatches in mind;
> can this be removed, leaving offset tracking to the executors?


It certainly could be, but what Matei is saying is that user code should be
able to seamlessly upgrade.  A lot of early focus and thought was towards
this goal.  However, these kinds of concerns are exactly why I think it is
premature to expose these internal APIs to end users. Let's build several
Sources and Sinks internally, and figure out what works and what doesn't.
Spark SQL had JSON, Hive, Parquet, and RDDs before we opened up the APIs.
This experience allowed us to keep the Data Source API stable into 2.x and
build a large library of connectors.


Re: Mini-Proposal: Make it easier to contribute to the contributing to Spark Guide

2016-10-20 Thread Fred Reiss
Great idea! If the developer docs are in github, then new contributors who
find errors or omissions can update the docs as an introduction to the PR
process.

Fred

On Wed, Oct 19, 2016 at 5:46 PM, Reynold Xin  wrote:

> For the contributing guide I think it makes more sense to put it in
> apache/spark github, since that's where contributors start. I'd also link
> to it from the website ...
>
>
> On Tue, Oct 18, 2016 at 10:03 AM, Shivaram Venkataraman <
> shiva...@eecs.berkeley.edu> wrote:
>
>> +1 - Given that our website is now on github
>> (https://github.com/apache/spark-website), I think we can move most of
>> our wiki into the main website. That way we'll only have two sources
>> of documentation to maintain: A release specific one in the main repo
>> and the website which is more long lived.
>>
>> Thanks
>> Shivaram
>>
>> On Tue, Oct 18, 2016 at 9:59 AM, Matei Zaharia 
>> wrote:
>> > Is there any way to tie wiki accounts with JIRA accounts? I found it
>> weird
>> > that they're not tied at the ASF.
>> >
>> > Otherwise, moving this into the docs might make sense.
>> >
>> > Matei
>> >
>> > On Oct 18, 2016, at 6:19 AM, Cody Koeninger  wrote:
>> >
>> > +1 to putting docs in one clear place.
>> >
>> >
>> > On Oct 18, 2016 6:40 AM, "Sean Owen"  wrote:
>> >>
>> >> I'm OK with that. The upside to the wiki is that it can be edited
>> directly
>> >> outside of a release cycle. However, in practice I find that the wiki
>> is
>> >> rarely changed. To me it also serves as a place for information that
>> isn't
>> >> exactly project documentation like "powered by" listings.
>> >>
>> >> In a way I'd like to get rid of the wiki to have one less place for
>> docs,
>> >> that doesn't have the same accessibility (I don't know who can give
>> edit
>> >> access), and doesn't have a review process.
>> >>
>> >> For now I'd settle for bringing over a few key docs like the one you
>> >> mention. I spent a little time a while ago removing some duplication
>> across
>> >> the wiki and project docs and think there's a bit more than could be
>> done.
>> >>
>> >>
>> >> On Tue, Oct 18, 2016 at 12:25 PM Holden Karau 
>> >> wrote:
>> >>>
>> >>> Right now the wiki isn't particularly accessible to updates by
>> external
>> >>> contributors. We've already got a contributing to spark page which
>> just
>> >>> links to the wiki - how about if we just move the wiki contents over?
>> This
>> >>> way contributors can contribute to our documentation about how to
>> contribute
>> >>> probably helping clear up points of confusion for new contributors
>> which the
>> >>> rest of us may be blind to.
>> >>>
>> >>> If we do this we would probably want to update the wiki page to point
>> to
>> >>> the documentation generated from markdown. It would also mean that the
>> >>> results of any update to the contributing guide take a full release
>> cycle to
>> >>> be visible. Another alternative would be opening up the wiki to a
>> broader
>> >>> set of people.
>> >>>
>> >>> I know a lot of people are probably getting ready for Spark Summit EU
>> >>> (and I hope to catch up with some of y'all there) but I figured this is a
>> >>> relatively minor proposal.
>> >>> --
>> >>> Cell : 425-233-8271
>> >>> Twitter: https://twitter.com/holdenkarau
>> >
>> >
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>


Re: [PSA] TaskContext.partitionId != the actual logical partition index

2016-10-20 Thread Cody Koeninger
Yep, I had submitted a PR that included it way back in the original
direct stream for Kafka, but it got nixed in favor of
TaskContext.partitionId ;)  The concern then was about too many
xWithBlah APIs on RDD.

If we do want to deprecate TaskContext.partitionId and add
foreachPartitionWithIndex, I think that makes sense; I can start a
ticket.

On Thu, Oct 20, 2016 at 1:16 PM, Reynold Xin  wrote:
> Seems like a good new API to add?
>
>
> On Thu, Oct 20, 2016 at 11:14 AM, Cody Koeninger  wrote:
>>
>> Access to the partition ID is necessary for basically every single one
> >> of my jobs, and there isn't a foreachPartitionWithIndex equivalent.
>> You can kind of work around it with empty foreach after the map, but
>> it's really awkward to explain to people.
>>
>> On Thu, Oct 20, 2016 at 12:52 PM, Reynold Xin  wrote:
>> > FYI - Xiangrui submitted an amazing pull request to fix a long standing
>> > issue with a lot of the nondeterministic expressions (rand, randn,
>> > monotonically_increasing_id): https://github.com/apache/spark/pull/15567
>> >
>> > Prior to this PR, we were using TaskContext.partitionId as the partition
>> > index in initializing expressions. However, that is actually not a good
>> > index to use in most cases, because it is the physical task's partition
>> > id
>> > and does not always reflect the partition index at the time the RDD is
>> > created (or in the Spark SQL physical plan). This makes a big difference
>> > once there is a union or coalesce operation.
>> >
>> > The "index" given by mapPartitionsWithIndex, on the other hand, does not
>> > have this problem because it actually reflects the logical partition
>> > index
>> > at the time the RDD is created.
>> >
>> > When is it safe to use TaskContext.partitionId? It is safe at the very
>> > end
>> > of a query plan (the root node), because there partitionId is guaranteed
>> > based on the current implementation to be the same as the physical task
>> > partition id.
>> >
>> >
>> > For example, prior to Xiangrui's PR, the following query would return 2
>> > rows, whereas the correct behavior should be 1 entry:
>> >
>> >
>> > spark.range(1).selectExpr("rand(1)").union(spark.range(1).selectExpr("rand(1)")).distinct.show()
>> >
>> > The reason it'd return 2 rows is because rand was using
>> > TaskContext.partitionId as the per-partition seed, and as a result the
>> > two
>> > sides of the union are using different seeds.
>> >
>> >
>> > I'm starting to think we should deprecate the API and ban the use of it
>> > within the project to be safe ...
>> >
>> >
>
>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: [PSA] TaskContext.partitionId != the actual logical partition index

2016-10-20 Thread Reynold Xin
Seems like a good new API to add?


On Thu, Oct 20, 2016 at 11:14 AM, Cody Koeninger  wrote:

> Access to the partition ID is necessary for basically every single one
> of my jobs, and there isn't a foreachPartitionWithIndex equivalent.
> You can kind of work around it with empty foreach after the map, but
> it's really awkward to explain to people.
>
> On Thu, Oct 20, 2016 at 12:52 PM, Reynold Xin  wrote:
> > FYI - Xiangrui submitted an amazing pull request to fix a long standing
> > issue with a lot of the nondeterministic expressions (rand, randn,
> > monotonically_increasing_id): https://github.com/apache/spark/pull/15567
> >
> > Prior to this PR, we were using TaskContext.partitionId as the partition
> > index in initializing expressions. However, that is actually not a good
> > index to use in most cases, because it is the physical task's partition
> id
> > and does not always reflect the partition index at the time the RDD is
> > created (or in the Spark SQL physical plan). This makes a big difference
> > once there is a union or coalesce operation.
> >
> > The "index" given by mapPartitionsWithIndex, on the other hand, does not
> > have this problem because it actually reflects the logical partition
> index
> > at the time the RDD is created.
> >
> > When is it safe to use TaskContext.partitionId? It is safe at the very
> end
> > of a query plan (the root node), because there partitionId is guaranteed
> > based on the current implementation to be the same as the physical task
> > partition id.
> >
> >
> > For example, prior to Xiangrui's PR, the following query would return 2
> > rows, whereas the correct behavior should be 1 entry:
> >
> > spark.range(1).selectExpr("rand(1)").union(spark.range(1).selectExpr("rand(1)")).distinct.show()
> >
> > The reason it'd return 2 rows is because rand was using
> > TaskContext.partitionId as the per-partition seed, and as a result the
> two
> > sides of the union are using different seeds.
> >
> >
> > I'm starting to think we should deprecate the API and ban the use of it
> > within the project to be safe ...
> >
> >
>


Re: [PSA] TaskContext.partitionId != the actual logical partition index

2016-10-20 Thread Cody Koeninger
Access to the partition ID is necessary for basically every single one
of my jobs, and there isn't a foreachPartitionWithIndex equivalent.
You can kind of work around it with empty foreach after the map, but
it's really awkward to explain to people.
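
Something like the sketch below; the foreachPartitionWithIndex at the end is
the API I wish existed, not something in Spark today.

import org.apache.spark.rdd.RDD

// The workaround today: grab the logical partition index with
// mapPartitionsWithIndex, do the per-record work inside the map, and force
// the job with an empty foreach.
def workaround(rdd: RDD[String]): Unit = {
  rdd.mapPartitionsWithIndex { (index, iter) =>
    iter.map { record =>
      println(s"partition $index: $record")  // stand-in for the real side effect
      record
    }
  }.foreach(_ => ())  // empty foreach just to trigger execution

  // What I'd actually like to write (hypothetical, not a Spark API today):
  // rdd.foreachPartitionWithIndex { (index, iter) => ... }
}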

On Thu, Oct 20, 2016 at 12:52 PM, Reynold Xin  wrote:
> FYI - Xiangrui submitted an amazing pull request to fix a long standing
> issue with a lot of the nondeterministic expressions (rand, randn,
> monotonically_increasing_id): https://github.com/apache/spark/pull/15567
>
> Prior to this PR, we were using TaskContext.partitionId as the partition
> index in initializing expressions. However, that is actually not a good
> index to use in most cases, because it is the physical task's partition id
> and does not always reflect the partition index at the time the RDD is
> created (or in the Spark SQL physical plan). This makes a big difference
> once there is a union or coalesce operation.
>
> The "index" given by mapPartitionsWithIndex, on the other hand, does not
> have this problem because it actually reflects the logical partition index
> at the time the RDD is created.
>
> When is it safe to use TaskContext.partitionId? It is safe at the very end
> of a query plan (the root node), because there partitionId is guaranteed
> based on the current implementation to be the same as the physical task
> partition id.
>
>
> For example, prior to Xiangrui's PR, the following query would return 2
> rows, whereas the correct behavior should be 1 entry:
>
> spark.range(1).selectExpr("rand(1)").union(spark.range(1).selectExpr("rand(1)")).distinct.show()
>
> The reason it'd return 2 rows is because rand was using
> TaskContext.partitionId as the per-partition seed, and as a result the two
> sides of the union are using different seeds.
>
>
> I'm starting to think we should deprecate the API and ban the use of it
> within the project to be safe ...
>
>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



[PSA] TaskContext.partitionId != the actual logical partition index

2016-10-20 Thread Reynold Xin
FYI - Xiangrui submitted an amazing pull request to fix a long standing
issue with a lot of the nondeterministic expressions (rand, randn,
monotonically_increasing_id): https://github.com/apache/spark/pull/15567

Prior to this PR, we were using TaskContext.partitionId as the partition
index in initializing expressions. However, that is actually not a good
index to use in most cases, because it is the physical task's partition id
and does not always reflect the partition index at the time the RDD is
created (or in the Spark SQL physical plan). This makes a big difference
once there is a union or coalesce operation.

The "index" given by mapPartitionsWithIndex, on the other hand, does not
have this problem because it actually reflects the logical partition index
at the time the RDD is created.
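
As a rough spark-shell illustration of the discrepancy (illustrative only;
sc is the shell's SparkContext):

import org.apache.spark.TaskContext

// Tag each record with (value, logical index from mapPartitionsWithIndex,
// physical task partition id).
val a = sc.parallelize(1 to 4, 2).mapPartitionsWithIndex { (index, iter) =>
  iter.map(x => (x, index, TaskContext.getPartitionId()))
}
val b = sc.parallelize(5 to 8, 2).mapPartitionsWithIndex { (index, iter) =>
  iter.map(x => (x, index, TaskContext.getPartitionId()))
}

// For records coming from b, the logical index is still 0 or 1, but the
// physical task partition id is 2 or 3, because the union renumbers the
// task partitions.
a.union(b).collect().foreach(println)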

When is it safe to use TaskContext.partitionId? It is safe at the very end
of a query plan (the root node), because there partitionId is guaranteed
based on the current implementation to be the same as the physical task
partition id.


For example, prior to Xiangrui's PR, the following query would return 2
rows, whereas the correct behavior should be 1 entry:

spark.range(1).selectExpr("rand(1)").union(spark.range(1).selectExpr("rand(1)")).distinct.show()

The reason it'd return 2 rows is because rand was using
TaskContext.partitionId as the per-partition seed, and as a result the two
sides of the union are using different seeds.


I'm starting to think we should deprecate the API and ban the use of it
within the project to be safe ...


Re: StructuredStreaming status

2016-10-20 Thread Amit Sela
On Thu, Oct 20, 2016 at 7:40 AM Matei Zaharia 
wrote:

> Yeah, as Shivaram pointed out, there have been research projects that
> looked at it. Also, Structured Streaming was explicitly designed to not
> make microbatching part of the API or part of the output behavior (tying
> triggers to it).
>
But Streaming Query sources are still designed with microbatches in mind;
can this be removed, leaving offset tracking to the executors?
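
For context, the current Source contract is shaped roughly like the
paraphrase below (abbreviated from memory, not authoritative; Offset here is
a stand-in for the internal type). getBatch handing back everything between
two offsets is what I mean by microbatch-oriented.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

trait Offset  // stand-in for the internal streaming Offset type

// Rough paraphrase of org.apache.spark.sql.execution.streaming.Source as of
// Spark 2.0 (from memory, not authoritative).
trait MicrobatchStyleSource {
  def schema: StructType                                        // schema of the produced records
  def getOffset: Option[Offset]                                 // highest offset currently available
  def getBatch(start: Option[Offset], end: Offset): DataFrame   // data between two offsets = one microbatch
  def stop(): Unit
}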

> However, when people begin working on that is a function of demand
> relative to other features. I don't think we can commit to one plan before
> exploring more options, but basically there is Shivaram's project, which
> adds a few new concepts to the scheduler, and there's the option to reduce
> control plane latency in the current system, which hasn't been heavily
> optimized yet but should be doable (lots of systems can handle 10,000s of
> RPCs per second).
>
> Matei
>
> On Oct 19, 2016, at 9:20 PM, Cody Koeninger  wrote:
>
> I don't think it's just about what to target - if you could target 1ms
> batches, without harming 1 second or 1 minute batches why wouldn't you?
> I think it's about having a clear strategy and dedicating resources to it.
> If scheduling batches at one or two orders of magnitude lower latency is the
> strategy, and that's actually feasible, that's great. But I haven't seen
> that clear direction, and this is by no means a recent issue.
>
> On Oct 19, 2016 7:36 PM, "Matei Zaharia"  wrote:
>
> I'm also curious whether there are concerns other than latency with the
> way stuff executes in Structured Streaming (now that the time steps don't
> have to act as triggers), as well as what latency people want for various
> apps.
>
> The stateful operator designs for streaming systems aren't inherently
> "better" than micro-batching -- they lose a lot of stuff that is possible
> in Spark, such as load balancing work dynamically across nodes, speculative
> execution for stragglers, scaling clusters up and down elastically, etc.
> Moreover, Spark itself could execute the current model with much lower
> latency. The question is just what combinations of latency, throughput,
> fault recovery, etc to target.
>
> Matei
>
> On Oct 19, 2016, at 2:18 PM, Amit Sela  wrote:
>
>
>
> On Thu, Oct 20, 2016 at 12:07 AM Shivaram Venkataraman <
> shiva...@eecs.berkeley.edu> wrote:
>
> At the AMPLab we've been working on a research project that looks at
> just the scheduling latencies and on techniques to get lower
> scheduling latency. It moves away from the micro-batch model, but
> reuses the fault tolerance etc. in Spark. However, we haven't yet
> figured out all the parts of integrating this with the rest of
> structured streaming. I'll try to post a design doc / SIP about this
> soon.
>
> On a related note - are there other problems users face with
> micro-batch other than latency ?
>
> I think that the fact that they serve as an output trigger is a problem,
> but Structured Streaming seems to resolve this now.
>
>
> Thanks
> Shivaram
>
> On Wed, Oct 19, 2016 at 1:29 PM, Michael Armbrust
>  wrote:
> > I know people are seriously thinking about latency.  So far that has not
> > been the limiting factor in the users I've been working with.
> >
> > On Wed, Oct 19, 2016 at 1:11 PM, Cody Koeninger 
> wrote:
> >>
> >> Is anyone seriously thinking about alternatives to microbatches?
> >>
> >> On Wed, Oct 19, 2016 at 2:45 PM, Michael Armbrust
> >>  wrote:
> >> > Anything that is actively being designed should be in JIRA, and it
> seems
> >> > like you found most of it.  In general, release windows can be found
> on
> >> > the
> >> > wiki.
> >> >
> >> > 2.1 has a lot of stability fixes as well as the kafka support you
> >> > mentioned.
> >> > It may also include some of the following.
> >> >
> >> > The items I'd like to start thinking about next are:
> >> >  - Evicting state from the store based on event time watermarks
> >> >  - Sessionization (grouping together related events by key /
> eventTime)
> >> >  - Improvements to the query planner (remove some of the restrictions
> on
> >> > what queries can be run).
> >> >
> >> > This is roughly in order based on what I've been hearing users hit the
> >> > most.
> >> > Would love more feedback on what is blocking real use cases.
> >> >
> >> > On Tue, Oct 18, 2016 at 1:51 AM, Ofir Manor 
> >> > wrote:
> >> >>
> >> >> Hi,
> >> >> I hope it is the right forum.
> >> >> I am looking for some information of what to expect from
> >> >> StructuredStreaming in its next releases to help me choose when /
> where
> >> >> to
> >> >> start using it more seriously (or where to invest in workarounds and
> >> >> where
> >> >> to wait). I couldn't 

Get size of intermediate results

2016-10-20 Thread Andreas Hechenberger
Hey awesome Spark-Dev's :)

I am new to Spark and I have read a lot, but now I am stuck :( so please be
kind if I ask silly questions.

I want to analyze some algorithms and strategies in Spark, and for one
experiment I want to know the size of the intermediate results between
iterations/jobs. Some of them are written to disk and some are in the
cache, I guess. I am not afraid of looking into the code (I already did),
but it's complex and I have no clue where to start :( It would be nice if
someone could point me in the right direction or tell me where I can find
more information about the structure of Spark core development :)

I already set up the development environment and I can compile Spark. It was
really awesome how smoothly the setup went :) Thx for that.

Servus
Andy

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



RE: StructuredStreaming status

2016-10-20 Thread assaf.mendelson
My thoughts were of handling just the “current” state of the sliding window
(i.e. the “last” window). The idea is that, at least in the cases I have
encountered, the sliding window is used to “forget” irrelevant information,
and therefore when a step goes out of date for the “current” window it
becomes irrelevant.
I agree that this use case is just an example and will also have issues if 
there is a combination of windows. My main issue was that if we need to have a 
relatively large buffer (such as full distinct count) then the memory overhead 
of this can be very high.

As for the example of the map you gave, if I understand correctly how this
would work behind the scenes, it just provides the map, but the memory cost
of having multiple versions of the data remains. As I said, my issue is with
the high memory overhead.

Consider a simple example: I do a sliding window of 1 day with a 1-minute
step. There are 1440 minutes per day, which means the groupBy effectively
multiplies all aggregation state by 1440. For something such as a count or a
sum this might not be a big issue, but if we have an array of, say, 100
elements, then this can quickly become very costly.
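
For concreteness, the kind of query I have in mind is roughly the sketch
below, assuming a streaming DataFrame named events with an eventTime column
(and spark.implicits._ imported for the $ syntax):

import org.apache.spark.sql.functions.window

// Sketch of the case above: a 1-day window sliding every 1 minute. Each
// incoming row falls into 1440 overlapping windows, so any per-window buffer
// (e.g. a distinct-value map or a 100-element array) is effectively kept
// 1440 times in the aggregation state.
val slidingCounts = events
  .groupBy(window($"eventTime", "1 day", "1 minute"))
  .count()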

As I said, it is just an idea for optimization for specific use cases.


From: Michael Armbrust [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n1952...@n3.nabble.com]
Sent: Thursday, October 20, 2016 11:16 AM
To: Mendelson, Assaf
Subject: Re: StructuredStreaming status

let’s say we would have implemented distinct count by saving a map with the key 
being the distinct value and the value being the last time we saw this value. 
This would mean that we wouldn’t really need to save all the steps in the 
middle and copy the data, we could only save the last portion.

I don't think you can calculate count distinct in each event time window 
correctly using this map if there is late data, which is one of the key 
problems we are trying to solve with this API.  If you are only tracking the 
last time you saw this value, how do you know if a late data item was already 
accounted for in any given window that is earlier than this "last time"?

We would currently need to track the items seen in each window (though much 
less space is required for approx count distinct).  However, the state eviction 
I mentioned above should also let you give us a boundary on how late data can 
be, and thus how many windows we need to retain state for.  You should also be 
able to group by processing time instead of event time if you want something 
closer to the semantics of DStreams.

Finally, you can already construct the map you describe using structured 
streaming and use its result to output statistics at each trigger window:

df.groupBy($"value")
  .select(max($"eventTime") as 'lastSeen)
  .writeStream
  .outputMode("complete")
  .trigger(ProcessingTime("5 minutes"))
  .foreach(  )



Re: StructuredStreaming status

2016-10-20 Thread Michael Armbrust
>
> let’s say we would have implemented distinct count by saving a map with
> the key being the distinct value and the value being the last time we saw
> this value. This would mean that we wouldn’t really need to save all the
> steps in the middle and copy the data, we could only save the last portion.
>

I don't think you can calculate count distinct in each event time window
correctly using this map if there is late data, which is one of the key
problems we are trying to solve with this API.  If you are only tracking
the last time you saw this value, how do you know if a late data item was
already accounted for in any given window that is earlier than this "last
time"?

We would currently need to track the items seen in each window (though much
less space is required for approx count distinct).  However, the state
eviction I mentioned above should also let you give us a boundary on how
late data can be, and thus how many windows we need to retain state for.  You
should also be able to group by processing time instead of event time if
you want something closer to the semantics of DStreams.
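
As a rough sketch of that per-window tracking with the current API
(illustrative only; it assumes df is a streaming DataFrame with eventTime and
value columns and spark.implicits._ imported), approximate distinct counts
per 10-minute event-time window look like:

import org.apache.spark.sql.functions.{approxCountDistinct, window}
import org.apache.spark.sql.streaming.ProcessingTime

// Without state eviction / watermarks, the per-window state below is
// retained indefinitely, which is exactly the memory concern discussed above.
df.groupBy(window($"eventTime", "10 minutes"))
  .agg(approxCountDistinct($"value") as "distinctValues")
  .writeStream
  .outputMode("complete")
  .trigger(ProcessingTime("5 minutes"))
  .format("console")
  .start()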

Finally, you can already construct the map you describe using structured
streaming and use its result to output statistics at each trigger window:

df.groupBy($"value")
  .select(max($"eventTime") as 'lastSeen)
  .writeStream
  .outputMode("complete")
  .trigger(ProcessingTime("5 minutes"))
  .foreach(  )