Re: partitionBy causing OOM

2017-09-26 Thread Amit Sela
Thanks for all the answers!
It looks like increasing the heap a little, and setting
spark.sql.shuffle.partitions to a much lower number (I used the recommended
input_size_mb/128 formula) did the trick.
As for partitionBy, unless I use repartition("dt") before the writer, it
actually writes more than one output file per "dt" partition so I guess the
same "dt" value is spread across multiple partitions, right?

On Mon, Sep 25, 2017 at 11:07 PM ayan guha <guha.a...@gmail.com> wrote:

> Another possible option would be creating partitioned table in hive and
> use dynamic partitioning while inserting. This will not require spark to do
> an explicit partitionBy.
>
> On Tue, 26 Sep 2017 at 12:39 pm, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>> Hi Amit,
>>
>> Spark keeps the partition that it is working on in memory (and does not
>> spill to disk even if it is running OOM). Also since you are getting OOM
>> when using partitionBy (and not when you just use flatMap), there should be
>> one (or few) dates on which your partition size is bigger than the heap.
>> You can do a count on dates to check if there is skewness in your data.
>>
>> The way out would be to increase the heap size or use more columns in partitionBy
>> (like date + hour) to distribute the data better.
>>
>> Hope this helps!
>>
>> Thanks
>> Ankur
>>
>> On Mon, Sep 25, 2017 at 7:30 PM, 孫澤恩 <gn00710...@gmail.com> wrote:
>>
>>> Hi, Amit,
>>>
>>> Maybe you can change this configuration spark.sql.shuffle.partitions.
>>> The default is 200; changing this property changes the number of tasks
>>> when you are using the DataFrame API.
>>>
>>> On 26 Sep 2017, at 1:25 AM, Amit Sela <amit.s...@venmo.com> wrote:
>>>
>>> I'm trying to run a simple pyspark application that reads from file
>>> (json), flattens it (explode) and writes back to file (json) partitioned by
>>> date using DataFrameWriter.partitionBy(*cols).
>>>
>>> I keep getting OOMEs like:
>>> java.lang.OutOfMemoryError: Java heap space
>>> at
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
>>> at
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
>>> at
>>> org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>>> ...
>>>
>>> Explode could make the underlying RDD grow a lot, and maybe in an
>>> unbalanced way sometimes; adding to that, partitioning by date (in daily
>>> ETLs for instance) would probably cause data skew (right?), but why am I
>>> getting OOMs? Isn't Spark
>>> supposed to spill to disk if the underlying RDD is too big to fit in memory?
>>>
>>> If I'm not using "partitionBy" with the writer (still exploding)
>>> everything works fine.
>>>
>>> This happens both in EMR and in local (mac) pyspark/spark shell (tried
>>> both in python and scala).
>>>
>>> Thanks!
>>>
>>>
>>>
>> --
> Best Regards,
> Ayan Guha
>


partitionBy causing OOM

2017-09-25 Thread Amit Sela
I'm trying to run a simple pyspark application that reads from file (json),
flattens it (explode) and writes back to file (json) partitioned by date
using DataFrameWriter.partitionBy(*cols).

I keep getting OOMEs like:
java.lang.OutOfMemoryError: Java heap space
at
org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
at
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
at
org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
...

Explode could make the underlying RDD grow a lot, and maybe in an
unbalanced way sometimes; adding to that, partitioning by date (in daily
ETLs for instance) would probably cause data skew (right?), but why am I
getting OOMs? Isn't Spark
supposed to spill to disk if the underlying RDD is too big to fit in memory?

If I'm not using "partitionBy" with the writer (still exploding) everything
works fine.

This happens both in EMR and in local (mac) pyspark/spark shell (tried both
in python and scala).

Thanks!


Re: Union of DStream and RDD

2017-02-11 Thread Amit Sela
Not specifically, I want to generally be able to union any form of
DStream/RDD. I'm working on Apache Beam's Spark runner, so the abstraction
there does not distinguish between streaming and batch (kinda like the Dataset API).
Since I wrote my own InputDStream I will simply stream any "batch source"
instead, because I really don't see a way to union both.
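
For completeness, a sketch of the queueStream approach suggested below, which emits
the RDD exactly once by using a single-element queue with oneAtATime = true. As the
thread notes, this does not work with checkpointing, which is why a custom
InputDStream was used instead; names here are placeholders.

import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

def unionOnce[T: ClassTag](ssc: StreamingContext,
                           liveStream: DStream[T],
                           batchRdd: RDD[T]): DStream[T] = {
  // The queue holds the RDD once; with oneAtATime = true it is drained in a single
  // batch interval, so later batches only see the live stream.
  val queue = mutable.Queue(batchRdd)
  val once = ssc.queueStream(queue, oneAtATime = true)
  liveStream.union(once)
}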

On Sun, Feb 12, 2017 at 6:49 AM Egor Pahomov <pahomov.e...@gmail.com> wrote:

> Interestingly, I just faced the same problem. By any chance, do you
> want to process old files in the directory as well as new ones? It's my
> motivation and checkpointing my problem as well.
>
> 2017-02-08 22:02 GMT-08:00 Amit Sela <amitsel...@gmail.com>:
>
> Not with checkpointing.
>
> On Thu, Feb 9, 2017, 04:58 Egor Pahomov <pahomov.e...@gmail.com> wrote:
>
> Just guessing here, but would
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources
> "*Queue of RDDs as a Stream*" work? Basically create DStream from your
> RDD and then union it with the other DStream.
>
> 2017-02-08 12:32 GMT-08:00 Amit Sela <amitsel...@gmail.com>:
>
> Hi all,
>
> I'm looking to union a DStream and RDD into a single stream.
> One important note is that the RDD has to be added to the DStream just
> once.
>
> Ideas ?
>
> Thanks,
> Amit
>
>
>
>
>
> --
>
>
> *Sincerely yoursEgor Pakhomov*
>
>
>
>
> --
>
>
> *Sincerely yoursEgor Pakhomov*
>


Re: Union of DStream and RDD

2017-02-08 Thread Amit Sela
Not with checkpointing.

On Thu, Feb 9, 2017, 04:58 Egor Pahomov <pahomov.e...@gmail.com> wrote:

> Just guessing here, but would
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources
> "*Queue of RDDs as a Stream*" work? Basically create DStream from your
> RDD and then union it with the other DStream.
>
> 2017-02-08 12:32 GMT-08:00 Amit Sela <amitsel...@gmail.com>:
>
> Hi all,
>
> I'm looking to union a DStream and RDD into a single stream.
> One important note is that the RDD has to be added to the DStream just
> once.
>
> Ideas ?
>
> Thanks,
> Amit
>
>
>
>
>
> --
>
>
> *Sincerely yoursEgor Pakhomov*
>


Union of DStream and RDD

2017-02-08 Thread Amit Sela
Hi all,

I'm looking to union a DStream and RDD into a single stream.
One important note is that the RDD has to be added to the DStream just once.

Ideas ?

Thanks,
Amit


Re: Fault tolerant broadcast in updateStateByKey

2017-02-07 Thread Amit Sela
I'm updating the Broadcast between batches, but I've ended up doing it in a
listener, thanks!
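
A rough sketch of the driver-side wrapper this kind of refresh usually relies on: the
broadcast handle lives in a singleton, and a StreamingListener callback (or any other
driver-side hook) rebroadcasts fresh data between batches. As noted below, the value
itself must be persisted and reloaded externally, since broadcasts are not part of the
checkpoint. All names and the Map payload are placeholders.

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object RefreshableBroadcast {
  @volatile private var instance: Broadcast[Map[String, String]] = _

  // Driver-side accessor; `load` reloads the data from the external store.
  def get(sc: SparkContext, load: () => Map[String, String]): Broadcast[Map[String, String]] = {
    if (instance == null) synchronized {
      if (instance == null) instance = sc.broadcast(load())
    }
    instance
  }

  // Called from e.g. StreamingListener.onBatchCompleted to swap in fresh data.
  def refresh(sc: SparkContext, load: () => Map[String, String]): Unit = synchronized {
    if (instance != null) instance.unpersist(blocking = false)
    instance = sc.broadcast(load())
  }
}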

On Wed, Feb 8, 2017 at 12:31 AM Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> broadcasts are not saved in checkpoints. so you have to save it externally
> yourself, and recover it before restarting the stream from checkpoints.
>
> On Tue, Feb 7, 2017 at 3:55 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
> I know this approach; the only thing is, it relies on the transformation being
> an RDD transformation as well, and so could be applied via foreachRDD,
> using the rdd context to avoid a stale context after recovery/resume.
> My question is how to avoid a stale context in a DStream-only transformation
> such as updateStateByKey / mapWithState ?
>
> On Tue, Feb 7, 2017 at 9:19 PM Shixiong(Ryan) Zhu <shixi...@databricks.com>
> wrote:
>
> It's documented here:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-broadcast-variables-and-checkpoints
>
> On Tue, Feb 7, 2017 at 8:12 AM, Amit Sela <amitsel...@gmail.com> wrote:
>
> Hi all,
>
> I was wondering if anyone ever used a broadcast variable within
> an updateStateByKey op. ? Using it is straight-forward but I was wondering
> how it'll work after resuming from checkpoint (using the rdd.context()
> trick is not possible here) ?
>
> Thanks,
> Amit
>
>
>
>


Re: Fault tolerant broadcast in updateStateByKey

2017-02-07 Thread Amit Sela
I know this approach; the only thing is, it relies on the transformation being
an RDD transformation as well, and so could be applied via foreachRDD,
using the rdd context to avoid a stale context after recovery/resume.
My question is how to avoid a stale context in a DStream-only transformation
such as updateStateByKey / mapWithState ?

On Tue, Feb 7, 2017 at 9:19 PM Shixiong(Ryan) Zhu <shixi...@databricks.com>
wrote:

> It's documented here:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-broadcast-variables-and-checkpoints
>
> On Tue, Feb 7, 2017 at 8:12 AM, Amit Sela <amitsel...@gmail.com> wrote:
>
> Hi all,
>
> I was wondering if anyone ever used a broadcast variable within
> an updateStateByKey op. ? Using it is straight-forward but I was wondering
> how it'll work after resuming from checkpoint (using the rdd.context()
> trick is not possible here) ?
>
> Thanks,
> Amit
>
>
>


Fault tolerant broadcast in updateStateByKey

2017-02-07 Thread Amit Sela
Hi all,

I was wondering if anyone ever used a broadcast variable within
an updateStateByKey op. ? Using it is straight-forward but I was wondering
how it'll work after resuming from checkpoint (using the rdd.context()
trick is not possible here) ?

Thanks,
Amit


RDD getPartitions() size and HashPartitioner numPartitions

2016-12-02 Thread Amit Sela
This might be a silly question, but I wanted to make sure, when
implementing my own RDD, if using a HashPartitioner as the RDD's
partitioner the number of partitions returned by the implementation of
getPartitions() has to match the number of partitions set in the
HashPartitioner, correct ?
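
A minimal skeleton illustrating the agreement being asked about: when a custom RDD
declares a HashPartitioner, the array returned by getPartitions should contain exactly
partitioner.numPartitions entries, since partitioner.getPartition(key) is used to index
into those partitions. This is an illustrative stub, not a working RDD.

import org.apache.spark.{HashPartitioner, Partition, Partitioner, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class KeyedRDD(sc: SparkContext, numParts: Int) extends RDD[(String, Int)](sc, Nil) {

  // The partitioner advertised to downstream operations.
  override val partitioner: Option[Partitioner] = Some(new HashPartitioner(numParts))

  // One Partition per HashPartitioner bucket -- the two sizes must agree.
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numParts)(i => new Partition { override def index: Int = i })

  override def compute(split: Partition, context: TaskContext): Iterator[(String, Int)] =
    Iterator.empty // placeholder: a real implementation reads the data belonging to `split`
}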


Fault-tolerant Accumulators in a DStream-only transformations.

2016-11-29 Thread Amit Sela
Hi all,

In order to recover Accumulators (functionally) from a Driver failure, it
is recommended to use them within a foreachRDD/transform and use the RDD
context with a Singleton wrapping the Accumulator, as shown in the examples.

I was wondering if there's a similar technique for DStream-only
transformations, such as *updateStateByKey/mapWithState* ?

Thanks,
Amit
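
For reference, this is roughly the singleton wrapper the examples describe (written
here against the Spark 2.x accumulator API); the open question above is that it leans
on having a SparkContext at hand (foreachRDD/transform), which updateStateByKey /
mapWithState do not expose. Names are placeholders.

import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator

object DroppedRecordsCounter {
  @volatile private var instance: LongAccumulator = _

  // Lazily (re)registers the accumulator after a driver restart from checkpoint.
  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) synchronized {
      if (instance == null) instance = sc.longAccumulator("droppedRecords")
    }
    instance
  }
}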


Does MapWithState follow with a shuffle ?

2016-11-29 Thread Amit Sela
Hi all,

I've been digging into the MapWithState code (branch 1.6), and I came across
the compute implementation in *InternalMapWithStateDStream*.

Looking at the defined partitioner, it looks like it could be different from
the parent RDD partitioner (if defaultParallelism() changed for instance, or
the input partitioning was smaller to begin with), which will eventually
create a ShuffledRDD.

Am I reading this right ?

Thanks,
Amit
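
Related sketch: StateSpec exposes partitioner(...) and numPartitions(...), so the
state RDD's partitioning can be pinned to match the parent stream and, presumably,
sidestep the ShuffledRDD path described above when the defaults would otherwise
differ. The state function, types and partition count are illustrative.

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{State, StateSpec}

// Running sum per key; emits (key, runningSum) on every update.
val updateFunc = (key: String, value: Option[Int], state: State[Int]) => {
  val sum = value.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (key, sum)
}

// Pin the state partitioner to the same one used upstream (32 is a made-up value):
val spec = StateSpec.function(updateFunc).partitioner(new HashPartitioner(32))
// Assuming `pairs: DStream[(String, Int)]` partitioned the same way:
// val stateful = pairs.mapWithState(spec)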


Fault-tolerant Accumulators in stateful operators.

2016-11-22 Thread Amit Sela
Hi all,

To recover (functionally) Accumulators from Driver failure in a streaming
application, we wrap them in a "getOrCreate" Singleton, as shown in the examples.
I was wondering how that works if I use the Accumulators inside a
mapWithState/updateStateByKey operator ? Has anyone used Accumulators in
one of the stateful operators ?

Thanks,
Amit


Many Spark metric names do not include the application name

2016-10-27 Thread Amit Sela
Hi guys,


It seems that JvmSource / DAGSchedulerSource / BlockManagerSource
/ ExecutorAllocationManager and other metrics sources (except for the
StreamingSource) publish their metrics directly under the "driver" fragment
(or its executor counter-part) of the metric path without including the
application name.


For instance:

   - "spark.application_.driver.DAGScheduler.job.allJobs"
   - while I would expect it to be something like:


   - *"*spark.application_.driver*.myAppName.*DAGScheduler.job.allJobs
   *"*
   - just like it currently is in the *streaming* metrics
   (StreamingSource):


   - "spark.application_.driver.*myAppName*
   .StreamingMetrics.streaming.lastCompletedBatch_processingDelay"

I was wondering if there is a reason for not including the application name
in the metric path?


Your help would be much appreciated!


Regards,

Amit


Re: Subscribe

2016-09-26 Thread Amit Sela
Please subscribe via the mailing list as described here:
http://beam.incubator.apache.org/use/mailing-lists/

On Mon, Sep 26, 2016, 12:11 Lakshmi Rajagopalan  wrote:

>
>


Dropping late data in Structured Streaming

2016-08-06 Thread Amit Sela
I've noticed that when using Structured Streaming with event-time windows
(fixed/sliding), all windows are retained. This is clearly how "late" data
is handled, but I was wondering if there is some pruning mechanism that I
might have missed? Or is this planned in future releases?

Thanks,
Amit


Re: spark 2.0 readStream from a REST API

2016-08-01 Thread Amit Sela
I think you're missing:

val query = wordCounts.writeStream

  .outputMode("complete")
  .format("console")
  .start()

Did it help?

On Mon, Aug 1, 2016 at 2:44 PM Jacek Laskowski  wrote:

> On Mon, Aug 1, 2016 at 11:01 AM, Ayoub Benali
>  wrote:
>
> > the problem now is that when I consume the dataframe for example with
> count
> > I get the stack trace below.
>
> Mind sharing the entire pipeline?
>
> > I followed the implementation of TextSocketSourceProvider to implement my
> > data source and Text Socket source is used in the official documentation
> > here.
>
> Right. Completely forgot about the provider. Thanks for reminding me about
> it!
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


init() and cleanup() for Spark map functions

2016-07-21 Thread Amit Sela
I have a use case where I use Spark (streaming) as a way to distribute a
set of computations, which requires (some) of the computations to call an
external service.
Naturally, I'd like to manage my connections (per executor/worker).

I know this pattern for DStream:
https://people.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/spark-2.0.1-SNAPSHOT-2016_07_21_04_05-f9367d6-docs/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
and I was wondering how I'd do the same for map functions, as I would like to
"commit" the output iterator and only afterwards "return" my connection.
And generally, how's this going to work with Structured Streaming ?

Thanks,
Amit
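
A hedged sketch of the usual workaround for per-partition setup/teardown in a
map-like transformation: mapPartitions borrows one connection per partition and
returns it only after the output iterator has been fully consumed, i.e. after the
partition's output is effectively "committed". Connection/ConnectionPool are
stand-ins for whatever client the external service actually uses.

import org.apache.spark.rdd.RDD

// Stand-ins for the real client/pool of the external service.
case class Connection() { def lookup(s: String): String = s.toUpperCase }
object ConnectionPool {
  def borrow(): Connection = Connection()
  def release(c: Connection): Unit = ()
}

def enrich(records: RDD[String]): RDD[String] =
  records.mapPartitions { iter =>
    val connection = ConnectionPool.borrow()                   // one borrow per partition
    val mapped = iter.map(record => connection.lookup(record))
    // Wrap the iterator so the connection is released only once it is exhausted.
    new Iterator[String] {
      private var released = false
      def hasNext: Boolean = {
        val more = mapped.hasNext
        if (!more && !released) { ConnectionPool.release(connection); released = true }
        more
      }
      def next(): String = mapped.next()
    }
  }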


Re: Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-07-01 Thread Amit Sela
Thanks for pointing that Koert!

I understand now why it's zero() and not init(a: IN), though I still don't see a
good reason to skip the aggregation if zero() returns null.
If the user did it, it's on him to take care of null cases in reduce/merge,
but it opens up the possibility to use the input to create the buffer for
the aggregator.
Wouldn't that at least enable the functionality discussed in SPARK-15598,
without changing how the Aggregator works?

I bypassed it by using Optional (Guava) because I'm using the Java API, but
it's a bit cumbersome...

Thanks,
Amit

On Thu, Jun 30, 2016 at 1:54 AM Koert Kuipers <ko...@tresata.com> wrote:

> its the difference between a semigroup and a monoid, and yes max does not
> easily fit into a monoid.
>
> see also discussion here:
> https://issues.apache.org/jira/browse/SPARK-15598
>
> On Mon, Jun 27, 2016 at 3:19 AM, Amit Sela <amitsel...@gmail.com> wrote:
>
>> OK. I see that, but the current (provided) implementations are very naive
>> - Sum, Count, Average -let's take Max for example: I guess zero() would be
>> set to some value like Long.MIN_VALUE, but what if you trigger (I assume in
>> the future Spark streaming will support time-based triggers) for a result
>> and there are no events ?
>>
>> And like I said, for a more general use case: What if my zero() function
>> depends on my input ?
>>
>> I just don't see the benefit of this behaviour, though I realise this is
>> the implementation.
>>
>> Thanks,
>> Amit
>>
>> On Sun, Jun 26, 2016 at 2:09 PM Takeshi Yamamuro <linguin@gmail.com>
>> wrote:
>>
>>> No, TypedAggregateExpression that uses Aggregator#zero is different
>>> between v2.0 and v1.6.
>>> v2.0:
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
>>> v1.6:
>>> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115
>>>
>>> // maropu
>>>
>>>
>>> On Sun, Jun 26, 2016 at 8:03 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>
>>>> This "if (value == null)" condition you point to exists in 1.6 branch
>>>> as well, so that's probably not the reason.
>>>>
>>>> On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro <linguin@gmail.com>
>>>> wrote:
>>>>
>>>>> Whatever it is, this is expected; if an initial value is null, spark
>>>>> codegen removes all the aggregates.
>>>>> See:
>>>>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
>>>>>
>>>>> // maropu
>>>>>
>>>>> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela <amitsel...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Not sure about what's the rule in case of `b + null = null` but the
>>>>>> same code works perfectly in 1.6.1, just tried it..
>>>>>>
>>>>>> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <
>>>>>> linguin@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> This behaviour seems to be expected because you must ensure `b +
>>>>>>> zero() = b`
>>>>>>> Your case `b + null = null` breaks this rule.
>>>>>>> This is the same with v1.6.1.
>>>>>>> See:
>>>>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>>>>>>>
>>>>>>> // maropu
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela <amitsel...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Sometimes, the BUF for the aggregator may depend on the actual
>>>>>>>> input.. and while this passes the responsibility to handle null in
>>>>>>>> merge/reduce to the developer, it sounds fine to me if he is the one 
>>>>>>>> who
>>>>>>>> put null in zero() anyway.
>>>>>>>> Now, it seems that the aggregation is skipped entirely when zero()
>>>>>>>> = null. Not sure if that was the behaviour in 1.6
>>>>>>>>
>>>>>>>> Is this behaviour wanted ?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Amit
>>>>>>>>
>>>>>>>> Aggregator example:
>>>>>>>>
>>>>>>>> public static class Agg extends Aggregator<Tuple2<String, Integer>, 
>>>>>>>> Integer, Integer> {
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Integer zero() {
>>>>>>>> return null;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>>>>>>>> if (b == null) {
>>>>>>>>   b = 0;
>>>>>>>> }
>>>>>>>> return b + a._2();
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Integer merge(Integer b1, Integer b2) {
>>>>>>>> if (b1 == null) {
>>>>>>>>   return b2;
>>>>>>>> } else if (b2 == null) {
>>>>>>>>   return b1;
>>>>>>>> } else {
>>>>>>>>   return b1 + b2;
>>>>>>>> }
>>>>>>>>   }
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> ---
>>>>>>> Takeshi Yamamuro
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> ---
>>>>> Takeshi Yamamuro
>>>>>
>>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>


Re: Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-06-27 Thread Amit Sela
OK. I see that, but the current (provided) implementations are very naive -
Sum, Count, Average -let's take Max for example: I guess zero() would be
set to some value like Long.MIN_VALUE, but what if you trigger (I assume in
the future Spark streaming will support time-based triggers) for a result
and there are no events ?

And like I said, for a more general use case: What if my zero() function
depends on my input ?

I just don't see the benefit of this behaviour, though I realise this is
the implementation.

Thanks,
Amit

On Sun, Jun 26, 2016 at 2:09 PM Takeshi Yamamuro <linguin@gmail.com>
wrote:

> No, TypedAggregateExpression that uses Aggregator#zero is different
> between v2.0 and v1.6.
> v2.0:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
> v1.6:
> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115
>
> // maropu
>
>
> On Sun, Jun 26, 2016 at 8:03 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
>> This "if (value == null)" condition you point to exists in 1.6 branch as
>> well, so that's probably not the reason.
>>
>> On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro <linguin@gmail.com>
>> wrote:
>>
>>> Whatever it is, this is expected; if an initial value is null, spark
>>> codegen removes all the aggregates.
>>> See:
>>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
>>>
>>> // maropu
>>>
>>> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>
>>>> Not sure about what's the rule in case of `b + null = null` but the
>>>> same code works perfectly in 1.6.1, just tried it..
>>>>
>>>> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <linguin@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> This behaviour seems to be expected because you must ensure `b +
>>>>> zero() = b`
>>>>> Your case `b + null = null` breaks this rule.
>>>>> This is the same with v1.6.1.
>>>>> See:
>>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>>>>>
>>>>> // maropu
>>>>>
>>>>>
>>>>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela <amitsel...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sometimes, the BUF for the aggregator may depend on the actual
>>>>>> input.. and while this passes the responsibility to handle null in
>>>>>> merge/reduce to the developer, it sounds fine to me if he is the one who
>>>>>> put null in zero() anyway.
>>>>>> Now, it seems that the aggregation is skipped entirely when zero() =
>>>>>> null. Not sure if that was the behaviour in 1.6
>>>>>>
>>>>>> Is this behaviour wanted ?
>>>>>>
>>>>>> Thanks,
>>>>>> Amit
>>>>>>
>>>>>> Aggregator example:
>>>>>>
>>>>>> public static class Agg extends Aggregator<Tuple2<String, Integer>, 
>>>>>> Integer, Integer> {
>>>>>>
>>>>>>   @Override
>>>>>>   public Integer zero() {
>>>>>> return null;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>>>>>> if (b == null) {
>>>>>>   b = 0;
>>>>>> }
>>>>>> return b + a._2();
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public Integer merge(Integer b1, Integer b2) {
>>>>>> if (b1 == null) {
>>>>>>   return b2;
>>>>>> } else if (b2 == null) {
>>>>>>   return b1;
>>>>>> } else {
>>>>>>   return b1 + b2;
>>>>>> }
>>>>>>   }
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> ---
>>>>> Takeshi Yamamuro
>>>>>
>>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-06-26 Thread Amit Sela
This "if (value == null)" condition you point to exists in 1.6 branch as
well, so that's probably not the reason.

On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Whatever it is, this is expected; if an initial value is null, spark
> codegen removes all the aggregates.
> See:
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
>
> // maropu
>
> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
>> Not sure about what's the rule in case of `b + null = null` but the same
>> code works perfectly in 1.6.1, just tried it..
>>
>> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <linguin@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> This behaviour seems to be expected because you must ensure `b + zero()
>>> = b`
>>> Your case `b + null = null` breaks this rule.
>>> This is the same with v1.6.1.
>>> See:
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>>>
>>> // maropu
>>>
>>>
>>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>>
>>>> Sometimes, the BUF for the aggregator may depend on the actual input..
>>>> and while this passes the responsibility to handle null in merge/reduce to
>>>> the developer, it sounds fine to me if he is the one who put null in zero()
>>>> anyway.
>>>> Now, it seems that the aggregation is skipped entirely when zero() =
>>>> null. Not sure if that was the behaviour in 1.6
>>>>
>>>> Is this behaviour wanted ?
>>>>
>>>> Thanks,
>>>> Amit
>>>>
>>>> Aggregator example:
>>>>
>>>> public static class Agg extends Aggregator<Tuple2<String, Integer>, 
>>>> Integer, Integer> {
>>>>
>>>>   @Override
>>>>   public Integer zero() {
>>>> return null;
>>>>   }
>>>>
>>>>   @Override
>>>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>>>> if (b == null) {
>>>>   b = 0;
>>>> }
>>>> return b + a._2();
>>>>   }
>>>>
>>>>   @Override
>>>>   public Integer merge(Integer b1, Integer b2) {
>>>> if (b1 == null) {
>>>>   return b2;
>>>> } else if (b2 == null) {
>>>>   return b1;
>>>> } else {
>>>>   return b1 + b2;
>>>> }
>>>>   }
>>>>
>>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-06-26 Thread Amit Sela
Not sure about what's the rule in case of `b + null = null` but the same
code works perfectly in 1.6.1, just tried it..

On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Hi,
>
> This behaviour seems to be expected because you must ensure `b + zero() =
> b`
> Your case `b + null = null` breaks this rule.
> This is the same with v1.6.1.
> See:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>
> // maropu
>
>
> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
>> Sometimes, the BUF for the aggregator may depend on the actual input..
>> and while this passes the responsibility to handle null in merge/reduce to
>> the developer, it sounds fine to me if he is the one who put null in zero()
>> anyway.
>> Now, it seems that the aggregation is skipped entirely when zero() =
>> null. Not sure if that was the behaviour in 1.6
>>
>> Is this behaviour wanted ?
>>
>> Thanks,
>> Amit
>>
>> Aggregator example:
>>
>> public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, 
>> Integer> {
>>
>>   @Override
>>   public Integer zero() {
>> return null;
>>   }
>>
>>   @Override
>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>> if (b == null) {
>>   b = 0;
>> }
>> return b + a._2();
>>   }
>>
>>   @Override
>>   public Integer merge(Integer b1, Integer b2) {
>> if (b1 == null) {
>>   return b2;
>> } else if (b2 == null) {
>>   return b1;
>> } else {
>>   return b1 + b2;
>> }
>>   }
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Aggregator (Spark 2.0) skips aggregation if zero() returns null

2016-06-26 Thread Amit Sela
Sometimes, the BUF for the aggregator may depend on the actual input.. and
while this passes the responsibility to handle null in merge/reduce to the
developer, it sounds fine to me if he is the one who put null in zero()
anyway.
Now, it seems that the aggregation is skipped entirely when zero() = null.
Not sure if that was the behaviour in 1.6

Is this behaviour wanted ?

Thanks,
Amit

Aggregator example:

public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {

  @Override
  public Integer zero() {
return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
if (b == null) {
  b = 0;
}
return b + a._2();
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
if (b1 == null) {
  return b2;
} else if (b2 == null) {
  return b1;
} else {
  return b1 + b2;
}
  }


Are ser/de optimizations relevant with Dataset API and Encoders ?

2016-06-19 Thread Amit Sela
With RDD API, you could optimize shuffling data by making sure that bytes
are shuffled instead of objects and using the appropriate ser/de mechanism
before and after the shuffle, for example:

Before parallelize, transform to bytes using a dedicated serializer,
parallelize, and immediately after, deserialize (happens on the nodes).
The same optimization could be applied in combinePerKey, and when
collecting the data to the driver.
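
A rough sketch of the RDD-level pattern described above, with plain Java serialization
standing in for the dedicated serializer: values are turned into byte arrays before
the shuffle boundary and decoded right after, so only bytes cross the wire.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.rdd.RDD

def toBytes(value: java.io.Serializable): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(value)
  oos.close()
  bos.toByteArray
}

def fromBytes[T](bytes: Array[Byte]): T =
  new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject().asInstanceOf[T]

// Encode before the shuffle, group the raw bytes, decode on the reducer side.
def groupAsBytes(pairs: RDD[(String, String)]): RDD[(String, Seq[String])] =
  pairs
    .mapValues(v => toBytes(v))
    .groupByKey()
    .mapValues(_.map(bytes => fromBytes[String](bytes)).toSeq)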

My question: is this relevant with the Dataset API ? Datasets have a
dedicated Encoder, and I guess that the binary encoder is less informative
than, say, the Integer/String or general Kryo encoder for Objects, and as a
result will "lose" some optimization abilities.

Is this correct ?

Thanks,
Amit


Re: LegacyAccumulatorWrapper basically requires the Accumulator value to implement equals() or it will fail on isZero()

2016-06-13 Thread Amit Sela
I thought so, and I agree. Still good to have this indexed here :)

On Mon, Jun 13, 2016 at 10:43 PM Sean Owen <so...@cloudera.com> wrote:

> I think that's right, but that seems as expected. If you're going to
> use this utility wrapper class, it can only determine if something is
> zero by comparing it to your 'zero' object, and that means defining
> equality. I suspect it's uncommon to accumulate things that aren't
> primitives or standard collections, which already define equality. But
> I'd expect I'd have to define equality for some custom user class in
> general if handing it over for a library to compare, add, clear, etc.
>
> On Mon, Jun 13, 2016 at 8:15 PM, Amit Sela <amitsel...@gmail.com> wrote:
> > It seems that if you have an AccumulatorParam (or AccumulableParam) where
> > "R" is not a primitive, it will need to implement equlas() if the
> > implementation of the zero() creates a new instance (which I guess it
> will
> > in those cases).
> > This is where isZero applies the comparison:
> >
> https://github.com/apache/spark/blob/254bc8c34e70241508bdfc8ff42a65491f5280cd/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L462
> > I overcame this by providing null in zero and instantiating in
> > addAccumulator() but that's more of a hack, on the other hand, I don't
> see a
> > trivial solution, which is one of the reasons I'm writing this.
> > Anyone encounter this when upgrading to 2.0 ?  with non-trivial
> Accumulators
> > of course..
> >
> >
> > Thanks,
> > Amit
>


LegacyAccumulatorWrapper basically requires the Accumulator value to implement equals() or it will fail on isZero()

2016-06-13 Thread Amit Sela
It seems that if you have an AccumulatorParam (or AccumulableParam) where
"R" is not a primitive, it will need to implement equlas() if the
implementation of the zero() creates a new instance (which I guess it will
in those cases).
This is where isZero applies the comparison:
https://github.com/apache/spark/blob/254bc8c34e70241508bdfc8ff42a65491f5280cd/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L462
I overcame this by providing null in zero and instantiating in
addAccumulator() but that's more of a hack, on the other hand, I don't see
a trivial solution, which is one of the reasons I'm writing this.
Anyone encounter this when upgrading to 2.0 ?  with non-trivial
Accumulators of course..
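
A minimal sketch of the cleaner alternative to the null hack described above: give
the accumulated value structural equality so LegacyAccumulatorWrapper.isZero, which
compares the current value against a fresh zero(), behaves as expected. The Stats
class and its fields are illustrative only; a Scala case class gets equals/hashCode
for free, while a Java class would need to override equals explicitly.

import org.apache.spark.AccumulatorParam

// Case classes get value-based equals/hashCode, which is what isZero relies on.
case class Stats(count: Long, sum: Long)

object StatsParam extends AccumulatorParam[Stats] {
  override def zero(initialValue: Stats): Stats = Stats(0L, 0L)
  override def addInPlace(a: Stats, b: Stats): Stats = Stats(a.count + b.count, a.sum + b.sum)
}

// Usage (legacy API, wrapped by LegacyAccumulatorWrapper in 2.0):
// val acc = sc.accumulator(Stats(0L, 0L))(StatsParam)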


Thanks,
Amit


Re: Dataset kryo encoder fails on Collections$UnmodifiableCollection

2016-05-23 Thread Amit Sela
See SPARK-15489 <https://issues.apache.org/jira/browse/SPARK-15489>
I'll try to figure this one out as well, any leads ? "immediate suspects" ?

Thanks,
Amit

On Mon, May 23, 2016 at 10:27 PM Michael Armbrust <mich...@databricks.com>
wrote:

> Can you open a JIRA?
>
> On Sun, May 22, 2016 at 2:50 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
>> I've been using Encoders with Kryo to support encoding of generically
>> typed Java classes, mostly with success, in the following manner:
>>
>> public static <T> Encoder<T> encoder() {
>>   return Encoders.kryo((Class<T>) Object.class);
>> }
>>
>> But at some point I got a decoding exception "Caused by:
>> java.lang.UnsupportedOperationException
>> at java.util.Collections$UnmodifiableCollection.add..."
>>
>> This seems to be because of Guava's `ImmutableList`.
>>
>> I tried registering `UnmodifiableCollectionsSerializer` and `
>> ImmutableListSerializer` from: https://github.com/magro/kryo-serializers
>> but it didn't help.
>>
>> Ideas ?
>>
>> Thanks,
>> Amit
>>
>
>


Re: What / Where / When / How questions in Spark 2.0 ?

2016-05-22 Thread Amit Sela
I need to update this ;)
To start with, you could just take a look at branch-2.0.

On Sun, May 22, 2016, 01:23 Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> Thank you, Amit! I was looking for this kind of information.
>
> I did not fully read your paper, I see in it a TODO with basically the
> same question(s) [1], maybe someone from Spark team (including Databricks)
> will be so kind to send some feedback..
>
> Best,
> Ovidiu
>
> [1] Integrate “Structured Streaming”: //TODO - What (and how) will Spark
> 2.0 support (out-of-order, event-time windows, watermarks, triggers,
> accumulation modes) - how straight forward will it be to integrate with the
> Beam Model ?
>
>
> On 21 May 2016, at 23:00, Sela, Amit  wrote:
>
> It seems I forgot to add the link to the “Technical Vision” paper so there
> it is -
> https://docs.google.com/document/d/1y4qlQinjjrusGWlgq-mYmbxRW2z7-_X5Xax-GG0YsC0/edit?usp=sharing
>
> From: "Sela, Amit" 
> Date: Saturday, May 21, 2016 at 11:52 PM
> To: Ovidiu-Cristian MARCU , "user @spark"
> 
> Cc: Ovidiu Cristian Marcu 
> Subject: Re: What / Where / When / How questions in Spark 2.0 ?
>
> This is a “Technical Vision” paper for the Spark runner, which provides
> general guidelines to the future development of Spark’s Beam support as
> part of the Apache Beam (incubating) project.
> This is our JIRA -
> https://issues.apache.org/jira/browse/BEAM/component/12328915/?selectedTab=com.atlassian.jira.jira-projects-plugin:component-summary-panel
>
> Generally, I’m currently working on Datasets integration for Batch (to
> replace RDD) against Spark 1.6, and going towards enhancing Stream
> processing capabilities with Structured Streaming (2.0)
>
> And you’re welcomed to ask those questions at the Apache Beam (incubating)
> mailing list as well ;)
> http://beam.incubator.apache.org/mailing_lists/
>
> Thanks,
> Amit
>
> From: Ovidiu-Cristian MARCU 
> Date: Tuesday, May 17, 2016 at 12:11 AM
> To: "user @spark" 
> Cc: Ovidiu Cristian Marcu 
> Subject: Re: What / Where / When / How questions in Spark 2.0 ?
>
> Could you please consider a short answer regarding the Apache Beam
> Capability Matrix todo’s for future Spark 2.0 release [4]? (some related
> references below [5][6])
>
> Thanks
>
> [4] http://beam.incubator.apache.org/capability-matrix/#cap-full-what
> [5] https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
> [6] https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
>
> On 16 May 2016, at 14:18, Ovidiu-Cristian MARCU <
> ovidiu-cristian.ma...@inria.fr> wrote:
>
> Hi,
>
> We can see in [2] many interesting (and expected!) improvements (promises)
> like extended SQL support, unified API (DataFrames, DataSets), improved
> engine (Tungsten relates to ideas from modern compilers and MPP databases -
> similar to Flink [3]), structured streaming, etc. It seems we are somehow
> witnessing a smart unification of Big Data analytics (Spark, Flink - best of two
> worlds)!
>
> *How does Spark respond to the missing What/Where/When/How questions
> (capabilities) highlighted in the unified model Beam [1] ?*
>
> Best,
> Ovidiu
>
> [1]
> https://cloud.google.com/blog/big-data/2016/05/why-apache-beam-a-google-perspective
> [2]
> https://databricks.com/blog/2016/05/11/spark-2-0-technical-preview-easier-faster-and-smarter.html
> [3] http://stratosphere.eu/project/publications/
>
>
>
>
>


Re: Datasets is extremely slow in comparison to RDD in standalone mode WordCount example

2016-05-13 Thread Amit Sela
Taking it to a more basic level, I compared a simple transformation
with RDDs and with Datasets. This is far simpler than Renato's use case and
it brings up two good questions:
1. Is the time it takes to "spin up" a standalone instance of Spark(SQL)
just an additional one-time overhead? That would be reasonable, especially for
the first version of Datasets..
2. Are Datasets, in some cases, slower than RDDs? If so, in which cases, and why?

*Datasets code*: ~2000 msec
SQLContext sqc = createSQLContext(createContext());
sqc.createDataset(WORDS, Encoders.STRING())
    .map(new MapFunction<String, String>() {
      @Override
      public String call(String value) throws Exception {
        return value.toUpperCase();
      }
    }, Encoders.STRING())
    .show();

*RDDs code*: < 500 msec
JavaSparkContext jsc = createContext();
List<String> res = jsc.parallelize(WORDS)
    .map(new Function<String, String>() {
      @Override
      public String call(String v1) throws Exception {
        return v1.toUpperCase();
      }
    })
    .collect();

*Those are the context creation functions:*
static SQLContext createSQLContext(JavaSparkContext jsc) {
  return new SQLContext(jsc);
}
static JavaSparkContext createContext() {
  return new JavaSparkContext(new SparkConf()
      .setMaster("local[*]").setAppName("WordCount")
      .set("spark.ui.enabled", "false"));
}

*And the input:*
List<String> WORDS = Arrays.asList("hi there", "hi", "hi sue bob", "hi sue", "bob hi");

On Thu, May 12, 2016 at 12:04 PM Renato Marroquín Mogrovejo <
renatoj.marroq...@gmail.com> wrote:

> Hi Amit,
>
> This is very interesting indeed because I have got similar resutls. I
> tried doing a filtter + groupBy using DataSet with a function, and using
> the inner RDD of the DF(RDD[row]). I used the inner RDD of a DataFrame
> because apparently there is no straight-forward way to create an RDD of
> Parquet data without creating a sqlContext. if anybody has some code to
> share with me, please share (:
> I used 1GB of parquet data and when doing the operations with the RDD it
> was much faster. After looking at the execution plans, it is clear why
> DataSets do worse. For using them an extra map operation is done to map row
> objects into the defined case class. Then the DataSet uses the whole query
> optimization platform (Catalyst and move objects in and out of Tungsten).
> Thus, I think for operations that are too "simple", it is more expensive to
> use the entire DS/DF infrastructure than the inner RDD.
> IMHO if you have complex SQL queries, it makes sense you use DS/DF but if
> you don't, then probably using RDDs directly is still faster.
>
>
> Renato M.
>
> 2016-05-11 20:17 GMT+02:00 Amit Sela <amitsel...@gmail.com>:
>
>> Some how missed that ;)
>> Anything about Datasets slowness ?
>>
>> On Wed, May 11, 2016, 21:02 Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Which release are you using ?
>>>
>>> You can use the following to disable UI:
>>> --conf spark.ui.enabled=false
>>>
>>> On Wed, May 11, 2016 at 10:59 AM, Amit Sela <amitsel...@gmail.com>
>>> wrote:
>>>
>>>> I've ran a simple WordCount example with a very small List as
>>>> input lines and ran it in standalone (local[*]), and Datasets is very 
>>>> slow..
>>>> We're talking ~700 msec for RDDs while Datasets takes ~3.5 sec.
>>>> Is this just start-up overhead ? please note that I'm not timing the
>>>> context creation...
>>>>
>>>> And in general, is there a way to run with local[*] "lightweight" mode
>>>> for testing ? something like without the WebUI server for example (and
>>>> anything else that's not needed for testing purposes)
>>>>
>>>> Thanks,
>>>> Amit
>>>>
>>>
>>>
>


Re: Datasets is extremely slow in comparison to RDD in standalone mode WordCount example

2016-05-11 Thread Amit Sela
Some how missed that ;)
Anything about Datasets slowness ?

On Wed, May 11, 2016, 21:02 Ted Yu <yuzhih...@gmail.com> wrote:

> Which release are you using ?
>
> You can use the following to disable UI:
> --conf spark.ui.enabled=false
>
> On Wed, May 11, 2016 at 10:59 AM, Amit Sela <amitsel...@gmail.com> wrote:
>
>> I've ran a simple WordCount example with a very small List as
>> input lines and ran it in standalone (local[*]), and Datasets is very slow..
>> We're talking ~700 msec for RDDs while Datasets takes ~3.5 sec.
>> Is this just start-up overhead ? please note that I'm not timing the
>> context creation...
>>
>> And in general, is there a way to run with local[*] "lightweight" mode
>> for testing ? something like without the WebUI server for example (and
>> anything else that's not needed for testing purposes)
>>
>> Thanks,
>> Amit
>>
>
>


Datasets is extremely slow in comparison to RDD in standalone mode WordCount example

2016-05-11 Thread Amit Sela
I've run a simple WordCount example with a very small List as input
lines and ran it in standalone (local[*]), and Datasets is very slow..
We're talking ~700 msec for RDDs while Datasets takes ~3.5 sec.
Is this just start-up overhead ? please note that I'm not timing the
context creation...

And in general, is there a way to run with local[*] "lightweight" mode for
testing ? something like without the WebUI server for example (and anything
else that's not needed for testing purposes)

Thanks,
Amit


Re: Datasets combineByKey

2016-04-10 Thread Amit Sela
I think *org.apache.spark.sql.expressions.Aggregator* is what I'm looking
for, makes sense ?
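
A hedged sketch of that direction (using the Spark 2.x Aggregator signature):
zero/reduce/merge play the roles of combineByKey's
createCombiner/mergeValue/mergeCombiners, applied through groupByKey(...).agg(...).
The Event type and the sum logic are illustrative only.

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Event(user: String, amount: Long)

object SumAmount extends Aggregator[Event, Long, Long] {
  def zero: Long = 0L                                      // ~ createCombiner
  def reduce(buf: Long, e: Event): Long = buf + e.amount   // ~ mergeValue
  def merge(b1: Long, b2: Long): Long = b1 + b2            // ~ mergeCombiners
  def finish(b: Long): Long = b
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

// Usage, assuming a SparkSession `spark` and a Dataset[Event] `events`:
// import spark.implicits._
// val totals = events.groupByKey(_.user).agg(SumAmount.toColumn)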

On Sun, Apr 10, 2016 at 4:08 PM Amit Sela <amitsel...@gmail.com> wrote:

> I'm mapping the RDD API to the Datasets API and I was wondering if I was missing
> something or if this functionality is missing.
>
>
> On Sun, Apr 10, 2016 at 3:00 PM Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Haven't found any JIRA w.r.t. combineByKey for Dataset.
>>
>> What's your use case ?
>>
>> Thanks
>>
>> On Sat, Apr 9, 2016 at 7:38 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>
>>> Is there (planned ?) a combineByKey support for Dataset ?
>>> Is / Will there be a support for combiner lifting ?
>>>
>>> Thanks,
>>> Amit
>>>
>>
>>


Re: Datasets combineByKey

2016-04-10 Thread Amit Sela
I'm mapping the RDD API to the Datasets API and I was wondering if I was missing
something or if this functionality is missing.

On Sun, Apr 10, 2016 at 3:00 PM Ted Yu <yuzhih...@gmail.com> wrote:

> Haven't found any JIRA w.r.t. combineByKey for Dataset.
>
> What's your use case ?
>
> Thanks
>
> On Sat, Apr 9, 2016 at 7:38 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
>> Is there (planned ?) a combineByKey support for Dataset ?
>> Is / Will there be a support for combiner lifting ?
>>
>> Thanks,
>> Amit
>>
>
>


Datasets combineByKey

2016-04-09 Thread Amit Sela
Is there (planned ?) a combineByKey support for Dataset ?
Is / Will there be a support for combiner lifting ?

Thanks,
Amit