[jira] [Commented] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)

2017-04-06 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958403#comment-15958403
 ] 

Amit Sela commented on SPARK-19067:
---

[~tdas] should I open a ticket, as a future improvement, to allow timeouts to 
fire even when there is no data in the pipeline?
As I mentioned in the PR comments: 

Using the {{EventTime}} timeout in the future, I assume the "clock" would be 
watermark-based instead of wall-clock time, and I see at least two use cases 
where this would matter:

# Testing - being able to move the clock forward to end-of-time to force firing 
everything that still awaits the closing of its window.
# A pipeline with a filter before the stateful operator, such that data arrives 
and the watermark advances, but some events are dropped and never reach the 
stateful operator, so it holds off firing until "proper" data (that passes the 
filter) comes along - this again could cause an unknown delay in emitting 
results out of the stateful operator (see the sketch below).
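
A minimal sketch of that second scenario, assuming the new {{flatMapGroupsWithState}} API with {{GroupStateTimeout.EventTimeTimeout}}; the event schema, source, and 30-second gap are made up for illustration:

{code}
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(user: String, eventTime: Timestamp, kind: String)

val spark = SparkSession.builder().appName("TimeoutSketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical source; any streaming Dataset[Event] would do.
val events = spark.readStream.format("socket")
  .option("host", "localhost").option("port", 9999).load()
  .as[String]
  .map { line =>
    val Array(u, t, k) = line.split(",")
    Event(u, Timestamp.valueOf(t), k)
  }

val counts = events
  .withWatermark("eventTime", "10 minutes")
  .filter(_.kind == "click")   // dropped events still advance the watermark upstream,
                               // but never reach the stateful operator below
  .groupByKey(_.user)
  .flatMapGroupsWithState[Long, (String, Long)](OutputMode.Update, GroupStateTimeout.EventTimeTimeout) {
    (user: String, batch: Iterator[Event], state: GroupState[Long]) =>
      if (state.hasTimedOut) {
        val out = Iterator((user, state.get))
        state.remove()
        out
      } else {
        val evs = batch.toSeq
        state.update(state.getOption.getOrElse(0L) + evs.size)
        // Arm an event-time timer 30s after the latest event seen; it only fires once the
        // watermark passes it *and* another micro-batch actually runs for this query.
        state.setTimeoutTimestamp(evs.map(_.eventTime.getTime).max + 30 * 1000)
        Iterator.empty
      }
  }
{code}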

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming 
> (similar to DStream.mapWithState)
> --
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Michael Armbrust
>Assignee: Tathagata Das
>Priority: Critical
> Fix For: 2.2.0
>
>
> Right now the only way to do stateful operations is with Aggregator or 
> UDAF. However, this does not give users control of emission or expiration of 
> state, making it hard to implement things like sessionization. We should add 
> a more general construct (probably similar to {{DStream.mapWithState}}) to 
> Structured Streaming. Here is the design.
> *Requirements*
> - Users should be able to specify a function that can do the following
> - Access the input row corresponding to a key
> - Access the previous state corresponding to a key
> - Optionally, update or remove the state
> - Output any number of new rows (or none at all)
> *Proposed API*
> {code}
> // ------ New methods on KeyValueGroupedDataset ------
> class KeyValueGroupedDataset[K, V] {
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], GroupState[S]) => U)
>   def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], GroupState[S]) => Iterator[U])
>
>   // Java friendly
>   def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
>   def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
> }
> // ------ New Java-friendly function classes ------
> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
> }
> public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   Iterator<R> call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
> }
> // ------ Wrapper class for state data ------
> trait GroupState[S] {
>   def exists(): Boolean
>   def get(): S                  // throws an exception if the state does not exist
>   def getOption(): Option[S]
>   def update(newState: S): Unit
>   def remove(): Unit            // exists() will be false after this
> }
> {code}
> Key Semantics of the State class
> - The state can be null.
> - If state.remove() is called, then state.exists() will return false, and 
> getOption will return None.
> - After state.update(newState) is called, state.exists() will 
> return true, and getOption will return Some(...). 
> - None of the operations are thread-safe. This is to avoid memory barriers.
> *Usage*
> {code}
> val stateFunc = (word: String, words: Iterator[String], runningCount: GroupState[Long]) => {
>   val newCount = words.size + runningCount.getOption.getOrElse(0L)
>   runningCount.update(newCount)
>   (word, newCount)
> }
>
> dataset                                                // type is Dataset[String]
>   .groupByKey[String](w => w)                          // generates KeyValueGroupedDataset[String, String]
>   .mapGroupsWithState[Long, (String, Long)](stateFunc) // returns Dataset[(String, Long)]
> {code}
> *Future Directions*
> - Timeout-based expiration of state that has not received data for a while - 
> Done
> - General expression-based expiration - TODO. Any real use cases that cannot 
> be done with timeouts?




[jira] [Comment Edited] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)

2017-03-09 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903532#comment-15903532
 ] 

Amit Sela edited comment on SPARK-19067 at 3/9/17 6:15 PM:
---

[~tdas] I just read the PR, and I'm very excited for Spark to provide such a 
powerful stateful operator!
I added a few comments in the PR based on my first impressions as a user, hope 
you don't mind.
I assume that for event-time timeouts you'd look at the watermark time instead 
of wall time, correct? How would that work? If I get it right, it's all 
represented as a table, so the "Watermark Manager" would constantly write 
updates to the table in the "Watermark Column"?


was (Author: amitsela):
[~tdas] I just read the PR, and I'm very excited for Spark to provide such a 
powerful stateful operator!
I added a few comments in the PR based on my first impressions, hope you don't 
mind.
I assume that for event-time timeouts you'd look at the watermark time instead 
of wall time, correct? How would that work? If I get it right, it's all 
represented as a table, so the "Watermark Manager" would constantly write 
updates to the table in the "Watermark Column"?

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming 
> (similar to DStream.mapWithState)
> --
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Michael Armbrust
>Assignee: Tathagata Das
>Priority: Critical
>
> Right now the only way to do stateful operations is with Aggregator or 
> UDAF. However, this does not give users control of emission or expiration of 
> state, making it hard to implement things like sessionization. We should add 
> a more general construct (probably similar to {{DStream.mapWithState}}) to 
> Structured Streaming. Here is the design.
> *Requirements*
> - Users should be able to specify a function that can do the following
> - Access the input row corresponding to a key
> - Access the previous state corresponding to a key
> - Optionally, update or remove the state
> - Output any number of new rows (or none at all)
> *Proposed API*
> {code}
> //  New methods on KeyValueGroupedDataset 
> class KeyValueGroupedDataset[K, V] {  
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], 
> State[S]) => U)
> def flatMapGroupsWithState[S: Encode, U: Encoder](func: (K, 
> Iterator[V], State[S]) => Iterator[U])
>   // Java friendly
>def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, 
> R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
>def flatMapGroupsWithState[S, U](func: 
> FlatMapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], 
> resultEncoder: Encoder[U])
> }
> // --- New Java-friendly function classes --- 
> public interface MapGroupsWithStateFunction extends Serializable {
>   R call(K key, Iterator values, state: State) throws Exception;
> }
> public interface FlatMapGroupsWithStateFunction extends 
> Serializable {
>   Iterator call(K key, Iterator values, state: State) throws 
> Exception;
> }
> // -- Wrapper class for state data -- 
> trait KeyedState[S] {
>   def exists(): Boolean   
>   def get(): S// throws Exception is state does not 
> exist
>   def getOption(): Option[S]   
>   def update(newState: S): Unit
>   def remove(): Unit  // exists() will be false after this
> }
> {code}
> Key Semantics of the State class
> - The state can be null.
> - If the state.remove() is called, then state.exists() will return false, and 
> getOption will returm None.
> - After that state.update(newState) is called, then state.exists() will 
> return true, and getOption will return Some(...). 
> - None of the operations are thread-safe. This is to avoid memory barriers.
> *Usage*
> {code}
> val stateFunc = (word: String, words: Iterator[String, runningCount: 
> KeyedState[Long]) => {
> val newCount = words.size + runningCount.getOption.getOrElse(0L)
> runningCount.update(newCount)
>(word, newCount)
> }
> dataset   // type 
> is Dataset[String]
>   .groupByKey[String](w => w) // generates 
> KeyValueGroupedDataset[String, String]
>   .mapGroupsWithState[Long, (String, Long)](stateFunc)// returns 
> Dataset[(String, Long)]
> {code}
> *Future Directions*
> - Timeout based state expiration (that has not received data for a while)
> - 

[jira] [Commented] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)

2017-03-09 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903532#comment-15903532
 ] 

Amit Sela commented on SPARK-19067:
---

[~tdas] I just read the PR, and I'm very excited for Spark to provide such a 
powerful stateful operator!
I added a few comments in the PR based on my first impressions, hope you don't 
mind.
I assume that for event-time timeouts you'd look at the watermark time instead 
of wall time, correct? How would that work? If I get it right, it's all 
represented as a table, so the "Watermark Manager" would constantly write 
updates to the table in the "Watermark Column"?

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming 
> (similar to DStream.mapWithState)
> --
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Michael Armbrust
>Assignee: Tathagata Das
>Priority: Critical
>
> Right now the only way to do stateful operations is with Aggregator or 
> UDAF. However, this does not give users control of emission or expiration of 
> state, making it hard to implement things like sessionization. We should add 
> a more general construct (probably similar to {{DStream.mapWithState}}) to 
> Structured Streaming. Here is the design.
> *Requirements*
> - Users should be able to specify a function that can do the following
> - Access the input row corresponding to a key
> - Access the previous state corresponding to a key
> - Optionally, update or remove the state
> - Output any number of new rows (or none at all)
> *Proposed API*
> {code}
> //  New methods on KeyValueGroupedDataset 
> class KeyValueGroupedDataset[K, V] {  
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], 
> State[S]) => U)
> def flatMapGroupsWithState[S: Encode, U: Encoder](func: (K, 
> Iterator[V], State[S]) => Iterator[U])
>   // Java friendly
>def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, 
> R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
>def flatMapGroupsWithState[S, U](func: 
> FlatMapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], 
> resultEncoder: Encoder[U])
> }
> // --- New Java-friendly function classes --- 
> public interface MapGroupsWithStateFunction extends Serializable {
>   R call(K key, Iterator values, state: State) throws Exception;
> }
> public interface FlatMapGroupsWithStateFunction extends 
> Serializable {
>   Iterator call(K key, Iterator values, state: State) throws 
> Exception;
> }
> // -- Wrapper class for state data -- 
> trait KeyedState[S] {
>   def exists(): Boolean   
>   def get(): S// throws Exception is state does not 
> exist
>   def getOption(): Option[S]   
>   def update(newState: S): Unit
>   def remove(): Unit  // exists() will be false after this
> }
> {code}
> Key Semantics of the State class
> - The state can be null.
> - If the state.remove() is called, then state.exists() will return false, and 
> getOption will returm None.
> - After that state.update(newState) is called, then state.exists() will 
> return true, and getOption will return Some(...). 
> - None of the operations are thread-safe. This is to avoid memory barriers.
> *Usage*
> {code}
> val stateFunc = (word: String, words: Iterator[String, runningCount: 
> KeyedState[Long]) => {
> val newCount = words.size + runningCount.getOption.getOrElse(0L)
> runningCount.update(newCount)
>(word, newCount)
> }
> dataset   // type 
> is Dataset[String]
>   .groupByKey[String](w => w) // generates 
> KeyValueGroupedDataset[String, String]
>   .mapGroupsWithState[Long, (String, Long)](stateFunc)// returns 
> Dataset[(String, Long)]
> {code}
> *Future Directions*
> - Timeout based state expiration (that has not received data for a while)
> - General expression based expiration 






[jira] [Commented] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)

2017-03-06 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897868#comment-15897868
 ] 

Amit Sela commented on SPARK-19067:
---

Sweet!
I will make time tomorrow to go over the PR thoroughly (we're in ~opposite 
timezones ;) ).
I also see a note about the State Store API, which is something I'm really 
looking forward to. Any news there?

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming 
> (similar to DStream.mapWithState)
> --
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Michael Armbrust
>Assignee: Tathagata Das
>Priority: Critical
>
> Right now the only way to do stateful operations is with Aggregator or 
> UDAF. However, this does not give users control of emission or expiration of 
> state, making it hard to implement things like sessionization. We should add 
> a more general construct (probably similar to {{DStream.mapWithState}}) to 
> Structured Streaming. Here is the design.
> *Requirements*
> - Users should be able to specify a function that can do the following
> - Access the input row corresponding to a key
> - Access the previous state corresponding to a key
> - Optionally, update or remove the state
> - Output any number of new rows (or none at all)
> *Proposed API*
> {code}
> //  New methods on KeyValueGroupedDataset 
> class KeyValueGroupedDataset[K, V] {  
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], 
> State[S]) => U)
> def flatMapGroupsWithState[S: Encode, U: Encoder](func: (K, 
> Iterator[V], State[S]) => Iterator[U])
>   // Java friendly
>def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, 
> R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
>def flatMapGroupsWithState[S, U](func: 
> FlatMapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], 
> resultEncoder: Encoder[U])
> }
> // --- New Java-friendly function classes --- 
> public interface MapGroupsWithStateFunction extends Serializable {
>   R call(K key, Iterator values, state: State) throws Exception;
> }
> public interface FlatMapGroupsWithStateFunction extends 
> Serializable {
>   Iterator call(K key, Iterator values, state: State) throws 
> Exception;
> }
> // -- Wrapper class for state data -- 
> trait KeyedState[S] {
>   def exists(): Boolean   
>   def get(): S// throws Exception is state does not 
> exist
>   def getOption(): Option[S]   
>   def update(newState: S): Unit
>   def remove(): Unit  // exists() will be false after this
> }
> {code}
> Key Semantics of the State class
> - The state can be null.
> - If the state.remove() is called, then state.exists() will return false, and 
> getOption will returm None.
> - After that state.update(newState) is called, then state.exists() will 
> return true, and getOption will return Some(...). 
> - None of the operations are thread-safe. This is to avoid memory barriers.
> *Usage*
> {code}
> val stateFunc = (word: String, words: Iterator[String, runningCount: 
> KeyedState[Long]) => {
> val newCount = words.size + runningCount.getOption.getOrElse(0L)
> runningCount.update(newCount)
>(word, newCount)
> }
> dataset   // type 
> is Dataset[String]
>   .groupByKey[String](w => w) // generates 
> KeyValueGroupedDataset[String, String]
>   .mapGroupsWithState[Long, (String, Long)](stateFunc)// returns 
> Dataset[(String, Long)]
> {code}
> *Future Directions*
> - Timeout based state expiration (that has not received data for a while)
> - General expression based expiration 






[jira] [Commented] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)

2017-03-06 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896866#comment-15896866
 ] 

Amit Sela commented on SPARK-19067:
---

It depends: will those timers be "resettable", so that once I visit the state 
on timeout I can set a new timeout? If so, that could work.
You can get more insight into what I'm talking about in my implementation of 
Triggers in Apache Beam: 
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java#L92
I used {{updateStateByKey}} because I need to visit the state even when 
there are no updates - for example, if the watermark passed the end of the 
window and it is time to fire a result (based on event time). If the new 
{{mapWithState}} supported resettable timers, I could keep the "next time(s) to 
fire" in the state and set the timer to the closest time, so it would initiate 
the visit instead of re-visiting all state as I do today.
Does this make sense?
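
For illustration, a rough sketch of what I mean, assuming the operator is configured with an event-time timeout; the {{TriggerState}} shape and where the fire times come from are hypothetical:

{code}
import org.apache.spark.sql.streaming.GroupState

// Hypothetical per-key state: buffered values plus the event-time instants at which we still want to fire.
case class TriggerState(buffer: Seq[String], fireTimesMs: List[Long])

def onGroup(
    key: String,
    values: Iterator[String],
    state: GroupState[TriggerState]): Iterator[(String, Seq[String])] = {
  val current = state.getOption.getOrElse(TriggerState(Seq.empty, Nil))
  val pending = current.fireTimesMs.sorted   // fire times would come from the window/trigger logic (omitted)
  if (state.hasTimedOut) {
    // A timer fired: emit what we have, drop the expired fire time, and re-arm for the next
    // pending one - effectively a "resettable" timer driven by the state itself.
    val remaining = pending.drop(1)
    state.update(current.copy(fireTimesMs = remaining))
    remaining.headOption.foreach(t => state.setTimeoutTimestamp(t))
    Iterator((key, current.buffer))
  } else {
    state.update(current.copy(buffer = current.buffer ++ values))
    // Always arm the timeout for the closest pending fire time, so this key's state is
    // visited when that time passes even if no new data arrives for it.
    pending.headOption.foreach(t => state.setTimeoutTimestamp(t))
    Iterator.empty
  }
}
{code}

A function like this would then be passed to {{flatMapGroupsWithState}}.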

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming 
> (similar to DStream.mapWithState)
> --
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Michael Armbrust
>Assignee: Tathagata Das
>Priority: Critical
>
> Right now the only way to do stateful operations is with Aggregator or 
> UDAF. However, this does not give users control of emission or expiration of 
> state, making it hard to implement things like sessionization. We should add 
> a more general construct (probably similar to {{DStream.mapWithState}}) to 
> Structured Streaming. Here is the design.
> *Requirements*
> - Users should be able to specify a function that can do the following
> - Access the input row corresponding to a key
> - Access the previous state corresponding to a key
> - Optionally, update or remove the state
> - Output any number of new rows (or none at all)
> *Proposed API*
> {code}
> //  New methods on KeyValueGroupedDataset 
> class KeyValueGroupedDataset[K, V] {  
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], 
> State[S]) => U)
> def flatMapGroupsWithState[S: Encode, U: Encoder](func: (K, 
> Iterator[V], State[S]) => Iterator[U])
>   // Java friendly
>def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, 
> R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
>def flatMapGroupsWithState[S, U](func: 
> FlatMapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], 
> resultEncoder: Encoder[U])
> }
> // --- New Java-friendly function classes --- 
> public interface MapGroupsWithStateFunction extends Serializable {
>   R call(K key, Iterator values, state: State) throws Exception;
> }
> public interface FlatMapGroupsWithStateFunction extends 
> Serializable {
>   Iterator call(K key, Iterator values, state: State) throws 
> Exception;
> }
> // -- Wrapper class for state data -- 
> trait KeyedState[S] {
>   def exists(): Boolean   
>   def get(): S// throws Exception is state does not 
> exist
>   def getOption(): Option[S]   
>   def update(newState: S): Unit
>   def remove(): Unit  // exists() will be false after this
> }
> {code}
> Key Semantics of the State class
> - The state can be null.
> - If the state.remove() is called, then state.exists() will return false, and 
> getOption will returm None.
> - After that state.update(newState) is called, then state.exists() will 
> return true, and getOption will return Some(...). 
> - None of the operations are thread-safe. This is to avoid memory barriers.
> *Usage*
> {code}
> val stateFunc = (word: String, words: Iterator[String, runningCount: 
> KeyedState[Long]) => {
> val newCount = words.size + runningCount.getOption.getOrElse(0L)
> runningCount.update(newCount)
>(word, newCount)
> }
> dataset   // type 
> is Dataset[String]
>   .groupByKey[String](w => w) // generates 
> KeyValueGroupedDataset[String, String]
>   .mapGroupsWithState[Long, (String, Long)](stateFunc)// returns 
> Dataset[(String, Long)]
> {code}
> *Future Directions*
> - Timeout based state expiration (that has not received data for a while)
> - General expression based expiration 





[jira] [Commented] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)

2017-02-09 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15859228#comment-15859228
 ] 

Amit Sela commented on SPARK-19067:
---

[~tdas] I would like to add another feature to consider that could give great 
flexibility to stateful operators in Spark.

We should be able to interact with the state for more than just an update (new 
input) or a timeout - this way you could fire based on things like watermarks, 
timers, etc.

I'm currently working on this using {{updateStateByKey}} for the Beam Spark 
runner, but it could be made much more efficient if Spark implemented it at a 
low level, and it could also provide Spark with watermark-based triggers and 
timers (the user sets a timer to react; once the timer is done, the applied 
function runs, and the timer can be reset or dropped).

The simplest thing to do would be to make this feature able to scan through the 
entire state as well (not just when there is new input), and since this could 
be expensive, it could happen every X batches (configurable) - doesn't this 
happen anyway today, e.g. on clean-up or checkpointing?

On top of this, future development of watermark-based triggers and timers would 
be easier:
# Scanning through the state to fire once the watermark is reached.
# Scanning through the state to check whether user-defined timers are ready for 
action.

WDYT?
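
For reference, roughly the pattern I use today with DStreams - the update function is invoked for every key on every batch, which is what makes watermark-based firing possible but also what makes it expensive. The Beam windowing details are omitted and the names are illustrative:

{code}
import org.apache.spark.streaming.dstream.DStream

// Illustrative per-key state: buffered values and the end of the window we are waiting to close.
case class WindowState(buffer: Seq[Int], windowEndMs: Long, fired: Boolean)

def trackState(
    input: DStream[(String, Int)],
    currentWatermarkMs: () => Long): DStream[(String, WindowState)] = {
  // updateStateByKey calls the update function for *every* key on *every* batch,
  // even keys that received no new values, so the watermark check below runs regardless of input.
  input.updateStateByKey[WindowState] { (newValues: Seq[Int], old: Option[WindowState]) =>
    val st = old.getOrElse(WindowState(Seq.empty, windowEndMs = Long.MaxValue, fired = false))
    val merged = st.copy(buffer = st.buffer ++ newValues)
    if (!merged.fired && currentWatermarkMs() > merged.windowEndMs) {
      Some(merged.copy(fired = true)) // window closed by the watermark: mark as fired (emission handled elsewhere)
    } else {
      Some(merged)
    }
  }
}
{code}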
   

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming 
> (similar to DStream.mapWithState)
> --
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Michael Armbrust
>Assignee: Tathagata Das
>Priority: Critical
>
> Right now the only way to do stateful operations is with Aggregator or 
> UDAF. However, this does not give users control of emission or expiration of 
> state, making it hard to implement things like sessionization. We should add 
> a more general construct (probably similar to {{DStream.mapWithState}}) to 
> Structured Streaming. Here is the design.
> *Requirements*
> - Users should be able to specify a function that can do the following
> - Access the input row corresponding to a key
> - Access the previous state corresponding to a key
> - Optionally, update or remove the state
> - Output any number of new rows (or none at all)
> *Proposed API*
> {code}
> //  New methods on KeyValueGroupedDataset 
> class KeyValueGroupedDataset[K, V] {  
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], 
> State[S]) => U)
> def flatMapGroupsWithState[S: Encode, U: Encoder](func: (K, 
> Iterator[V], State[S]) => Iterator[U])
>   // Java friendly
>def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, 
> R], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
>def flatMapGroupsWithState[S, U](func: 
> FlatMapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], 
> resultEncoder: Encoder[U])
> }
> // --- New Java-friendly function classes --- 
> public interface MapGroupsWithStateFunction extends Serializable {
>   R call(K key, Iterator values, state: State) throws Exception;
> }
> public interface FlatMapGroupsWithStateFunction extends 
> Serializable {
>   Iterator call(K key, Iterator values, state: State) throws 
> Exception;
> }
> // -- Wrapper class for state data -- 
> trait KeyedState[S] {
>   def exists(): Boolean   
>   def get(): S// throws Exception is state does not 
> exist
>   def getOption(): Option[S]   
>   def update(newState: S): Unit
>   def remove(): Unit  // exists() will be false after this
> }
> {code}
> Key Semantics of the State class
> - The state can be null.
> - If the state.remove() is called, then state.exists() will return false, and 
> getOption will returm None.
> - After that state.update(newState) is called, then state.exists() will 
> return true, and getOption will return Some(...). 
> - None of the operations are thread-safe. This is to avoid memory barriers.
> *Usage*
> {code}
> val stateFunc = (word: String, words: Iterator[String, runningCount: 
> KeyedState[Long]) => {
> val newCount = words.size + runningCount.getOption.getOrElse(0L)
> runningCount.update(newCount)
>(word, newCount)
> }
> dataset   // type 
> is Dataset[String]
>   .groupByKey[String](w => w) // generates 
> KeyValueGroupedDataset[String, 

[jira] [Commented] (SPARK-10816) EventTime based sessionization

2016-11-17 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15674147#comment-15674147
 ] 

Amit Sela commented on SPARK-10816:
---

[~rxin] it might be worth considering a more generic implementation of 
"merging windows".
Sessions are merging windows that use a "gap duration" to determine when to 
close the session: if the first element arrives in window 
{{[event_time1, event_time1 + gap_duration)}}, the next one in 
{{[event_time2, event_time2 + gap_duration)}}, {{event_time1 < 
event_time2}}, and the two windows overlap, their combined value will belong 
to {{[event_time1, event_time2 + gap_duration)}}, right?
But the same "merge" of windows could just as well be determined by a "close 
session" element (using Kafka, for example, would guarantee the order of 
messages), or any user-defined logic for that matter, as long as the "merge 
function" is provided by the user.
Of course, providing a Sessions API out of the box would prove most useful, as 
it is the most common case, but I don't see any downside to also having a more 
"advanced" API here.
Thanks!
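
To make the merge rule concrete, a small plain-Scala sketch of gap-based session merging (no Spark API assumed):

{code}
// Merge event timestamps (ms) into session intervals [start, end), where each event
// initially spans [t, t + gapMs) and overlapping spans are merged into one session.
def mergeSessions(eventTimesMs: Seq[Long], gapMs: Long): Seq[(Long, Long)] =
  eventTimesMs.sorted.foldLeft(List.empty[(Long, Long)]) {
    case ((start, end) :: rest, t) if t < end =>
      (start, math.max(end, t + gapMs)) :: rest   // event falls inside the current session: extend it
    case (sessions, t) =>
      (t, t + gapMs) :: sessions                  // otherwise start a new session
  }.reverse

// mergeSessions(Seq(0L, 5000L, 60000L), gapMs = 10000L) == Seq((0L, 15000L), (60000L, 70000L))
{code}

A user-provided merge function would simply replace the gap predicate and the interval-combining step here.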

> EventTime based sessionization
> --
>
> Key: SPARK-10816
> URL: https://issues.apache.org/jira/browse/SPARK-10816
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Reynold Xin
>







[jira] [Commented] (SPARK-16607) Aggregator with null initialisation will result in null

2016-07-18 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15382602#comment-15382602
 ] 

Amit Sela commented on SPARK-16607:
---

Copy of the thread discussing this in the user mailing list:

Aggregator (Spark 2.0) skips aggregation if zero() returns null

Amit Sela 
Jun 26

to user

Sometimes the BUF for the aggregator may depend on the actual input, and 
while this passes the responsibility for handling null in merge/reduce to the 
developer, that sounds fine to me if the developer is the one who put null in 
zero() anyway.
Now, it seems that the aggregation is skipped entirely when zero() = null. Not 
sure if that was the behaviour in 1.6.

Is this behaviour wanted?

Thanks,
Amit

Aggregator example:

public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {

  @Override
  public Integer zero() {
    return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
    if (b == null) {
      b = 0;
    }
    return b + a._2();
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
    if (b1 == null) {
      return b2;
    } else if (b2 == null) {
      return b1;
    } else {
      return b1 + b2;
    }
  }

  // finish() and the encoder methods were omitted in the original message.
}

Takeshi Yamamuro 
Jun 26

to me, user

Hi,

This behaviour seems to be expected because you must ensure `b + zero() = b`.
In your case, `b + null = null` breaks this rule.
This is the same in v1.6.1.
See: 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
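
To illustrate the rule (a tiny sketch; the helper name is illustrative, not Spark's):

{code}
// zero() must be an identity element for merge: merge(b, zero()) == b for every buffer b.
def identityLawHolds[B](merge: (B, B) => B, zero: B)(b: B): Boolean =
  merge(b, zero) == b

identityLawHolds[Long]((x, y) => x + y, zero = 0L)(5L)   // true: 0 is an identity for +
// A null zero gives no such guarantee; Spark's codegen relies on the law,
// so it short-circuits the whole aggregate instead of calling the user's merge.
{code}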

// maropu


Amit Sela 
Jun 26

to Takeshi, user

Not sure what the rule says about `b + null = null`, but the same code 
works perfectly in 1.6.1; I just tried it.


Takeshi Yamamuro
Jun 26

to me, user

Whatever it is, this is expected; if the initial value is null, Spark codegen 
removes all the aggregates.
See: 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199

// maropu


Amit Sela 
Jun 26

to Takeshi, user

This "if (value == null)" condition you point to exists in 1.6 branch as well, 
so that's probably not the reason. 


Takeshi Yamamuro
Jun 26

to me, user

No, TypedAggregateExpression that uses Aggregator#zero is different between 
v2.0 and v1.6.
v2.0: 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
v1.6: 
https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115

// maropu


Amit Sela 
Jun 27

to Takeshi, user

OK, I see that, but the current (provided) implementations are very naive - 
Sum, Count, Average. Let's take Max for example: I guess zero() would be set to 
some value like Long.MIN_VALUE, but what if you trigger for a result (I assume 
that in the future Spark Streaming will support time-based triggers) and there 
are no events?

And like I said, for a more general use case: what if my zero() function 
depends on my input?

I just don't see the benefit of this behaviour, though I realise this is how it 
is implemented.
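
For example, Max has no natural zero, but it can be lifted into a monoid with {{Option}} and None as the zero - a sketch (note SPARK-15810, where Option buffers currently don't play nice with the built-in encoders, hence the Kryo encoders here):

{code}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

object MaxAgg extends Aggregator[Int, Option[Int], Option[Int]] {
  def zero: Option[Int] = None                                   // identity: merge(b, None) == b
  def reduce(b: Option[Int], a: Int): Option[Int] =
    Some(b.fold(a)(prev => math.max(prev, a)))
  def merge(b1: Option[Int], b2: Option[Int]): Option[Int] =
    (b1 ++ b2).reduceOption((x, y) => math.max(x, y))
  def finish(reduction: Option[Int]): Option[Int] = reduction    // None when no events were seen
  // Kryo encoders used only to sidestep the Option encoder issue from SPARK-15810.
  def bufferEncoder: Encoder[Option[Int]] = Encoders.kryo[Option[Int]]
  def outputEncoder: Encoder[Option[Int]] = Encoders.kryo[Option[Int]]
}
{code}

With something like this, a trigger firing on no events would return None instead of a sentinel like Long.MIN_VALUE.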

Thanks,
Amit


Koert Kuipers 
Jun 30

to me, Takeshi, user

it's the difference between a semigroup and a monoid, and yes, max does not 
easily fit into a monoid.

see also discussion here:
https://issues.apache.org/jira/browse/SPARK-15598


Amit Sela 
Jul 2

to Koert, Takeshi, user

Thanks for pointing that Koert!

I understand now why it's zero() and not init(a: IN), though I still don't see 
a good reason to skip the aggregation if zero returns null.
If the user did that, it's on them to take care of null cases in reduce/merge, 
but it opens up the possibility of using the input to create the buffer for the 
aggregator.
Wouldn't that at least enable the functionality discussed in SPARK-15598, 
without changing how the Aggregator works?

I bypassed it by using Optional (Guava) because I'm using the Java API, but 
it's a bit cumbersome...

Thanks,
Amit


Koert Kuipers
Jul 2

to me, Takeshi, user

valid functions can be written for reduce and merge when the zero is null, so 
not being able to provide null as the initial value is troublesome.

i guess the proper way to do this is to use Option, and have None be the zero, 
which is what i assumed you did?
unfortunately, the last time i tried using Scala Options with Spark Aggregators 
it didn't work quite well. see:
https://issues.apache.org/jira/browse/SPARK-15810

lifting a semigroup into a monoid like this using Option is fairly typical, so 
either null or None has to work, or else this api will be somewhat unpleasant 
to use for anything practical.

for an example of this lifting on a related Aggregator class:

[jira] [Commented] (SPARK-15810) Aggregator doesn't play nice with Option

2016-07-18 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15382586#comment-15382586
 ] 

Amit Sela commented on SPARK-15810:
---

opened https://issues.apache.org/jira/browse/SPARK-16607

> Aggregator doesn't play nice with Option
> 
>
> Key: SPARK-15810
> URL: https://issues.apache.org/jira/browse/SPARK-15810
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
> Environment: spark 2.0.0-SNAPSHOT
>Reporter: koert kuipers
>
> {code}
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
> val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), 
> Option[Int], Option[Int]]{
>   def zero: Option[Int] = None
>   def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = 
> b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
>   def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => 
> b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
>   def finish(reduction: Option[Int]): Option[Int] = reduction
>   def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
>   def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> }.toColumn)
> ds3.printSchema
> ds3.show
> {code}
> i get as output a somewhat odd looking schema, and after that the program 
> just hangs pinning one cpu at 100%. the data never shows.
> output:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- $anon$1(scala.Tuple2): struct (nullable = true)
>  ||-- value: integer (nullable = true)
> {noformat}






[jira] [Updated] (SPARK-16607) Aggregator with null initialisation will result in null

2016-07-18 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated SPARK-16607:
--
Description: 
Java code example:
{code}
SparkSession session = SparkSession.builder()
    .appName("TestAggregatorJava")
    .master("local[*]")
    .getOrCreate();

Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
    new Tuple2<>("a", 1),
    new Tuple2<>("a", 2),
    new Tuple2<>("a", 3)
), Encoders.tuple(Encoders.STRING(), Encoders.INT()));

Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
    new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

      @Override
      public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws Exception {
        if (value._2() > 1) {
          return value;
        } else {
          return new Tuple2<>(value._1, null);
        }
      }
    }, Encoders.tuple(Encoders.STRING(), Encoders.INT()));

Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
    new MapFunction<Tuple2<String, Integer>, String>() {

      @Override
      public String call(Tuple2<String, Integer> value) throws Exception {
        return value._1();
      }
    }, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, Integer>() {

      @Override
      public Integer zero() {
        return null;
      }

      @Override
      public Integer reduce(Integer b, Tuple2<String, Integer> a) {
        return merge(b, a._2());
      }

      @Override
      public Integer merge(Integer b1, Integer b2) {
        if (b1 == null) {
          return b2;
        } else if (b2 == null) {
          return b1;
        } else {
          return b1 + b2;
        }
      }

      @Override
      public Integer finish(Integer reduction) {
        return reduction;
      }

      @Override
      public Encoder<Integer> bufferEncoder() {
        return Encoders.INT();
      }

      @Override
      public Encoder<Integer> outputEncoder() {
        return Encoders.INT();
      }
    }.toColumn());

ds3.printSchema();
ds3.show();
{code}
I get this schema:
{noformat}
root
 |-- value: string (nullable = true)
 |-- (scala.Tuple2): integer (nullable = true)
{noformat}
And this result:
{noformat}
+-----+--------------+
|value|(scala.Tuple2)|
+-----+--------------+
|    a|          null|
+-----+--------------+
{noformat}

The same happens in Scala: simply wrap Scala's Int with a case class (because a 
plain Int defaults to 0 and cannot be null) and you'll get the same result.
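
A minimal Scala sketch of that, with a made-up {{Box}} case class as the buffer:

{code}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Box(value: Int)   // unlike a plain Int (which defaults to 0), a case-class buffer can be null

object NullZeroSum extends Aggregator[(String, Box), Box, Box] {
  def zero: Box = null
  def reduce(b: Box, a: (String, Box)): Box = merge(b, a._2)
  def merge(b1: Box, b2: Box): Box =
    if (b1 == null) b2 else if (b2 == null) b1 else Box(b1.value + b2.value)
  def finish(reduction: Box): Box = reduction
  def bufferEncoder: Encoder[Box] = Encoders.product[Box]
  def outputEncoder: Encoder[Box] = Encoders.product[Box]
}
{code}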

  was:
Java code example:
{code}
SparkSession session = SparkSession.builder()
   .appName("TestAggregatorJava")
   .master("local[*]")
   .getOrCreate();
Dataset> ds1 = session.createDataset(Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("a", 2),
new Tuple2<>("a", 3)
), Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset> ds2 = ds1.map(
new MapFunction, Tuple2>() {

  @Override
  public Tuple2 call(Tuple2 value) throws 
Exception {
if (value._2() > 1) {
  return value;
} else {
  return new Tuple2<>(value._1, null);
}
  }
}, Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset> ds3 = ds2.groupByKey(
new MapFunction, String>() {

  @Override
  public String call(Tuple2 value) throws Exception {
return value._1();
  }
}, Encoders.STRING()).agg(new Aggregator, Integer, 
Integer>() {
  @Override
  public Integer zero() {
return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2 a) {
return merge(b, a._2());
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
if (b1 == null) {
  return b2;
} else if (b2 == null){
  return b1;
} else {
  return b1 + b2;
}
  }

  @Override
  public Integer finish(Integer reduction) {
return reduction;
  }

  @Override
  public Encoder bufferEncoder() {
return Encoders.INT();
  }

  @Override
  public Encoder outputEncoder() {
return Encoders.INT();
  }
}.toColumn());

ds3.printSchema();
ds3.show();
  }
{code} 
I get this schema:
{noformat}
root
 |-- value: string (nullable = true)
 |-- (scala.Tuple2): integer (nullable = true)
{noformat}
And this result:
{noformat}
+-+--+
|value|(scala.Tuple2)|
+-+--+
|

[jira] [Created] (SPARK-16607) Aggregator with null initialisation will result in null

2016-07-18 Thread Amit Sela (JIRA)
Amit Sela created SPARK-16607:
-

 Summary: Aggregator with null initialisation will result in null 
 Key: SPARK-16607
 URL: https://issues.apache.org/jira/browse/SPARK-16607
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
 Environment: spark 2.0-branch
Reporter: Amit Sela


Java code example:
{code}
SparkSession session = SparkSession.builder()
   .appName("TestAggregatorJava")
   .master("local[*]")
   .getOrCreate();
Dataset> ds1 = session.createDataset(Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("a", 2),
new Tuple2<>("a", 3)
), Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset> ds2 = ds1.map(
new MapFunction, Tuple2>() {

  @Override
  public Tuple2 call(Tuple2 value) throws 
Exception {
if (value._2() > 1) {
  return value;
} else {
  return new Tuple2<>(value._1, null);
}
  }
}, Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset> ds3 = ds2.groupByKey(
new MapFunction, String>() {

  @Override
  public String call(Tuple2 value) throws Exception {
return value._1();
  }
}, Encoders.STRING()).agg(new Aggregator, Integer, 
Integer>() {
  @Override
  public Integer zero() {
return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2 a) {
return merge(b, a._2());
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
if (b1 == null) {
  return b2;
} else if (b2 == null){
  return b1;
} else {
  return b1 + b2;
}
  }

  @Override
  public Integer finish(Integer reduction) {
return reduction;
  }

  @Override
  public Encoder bufferEncoder() {
return Encoders.INT();
  }

  @Override
  public Encoder outputEncoder() {
return Encoders.INT();
  }
}.toColumn());

ds3.printSchema();
ds3.show();
  }
{code} 
I get this schema:
{noformat}
root
 |-- value: string (nullable = true)
 |-- (scala.Tuple2): integer (nullable = true)
{noformat}
And this result:
{noformat}
+-+--+
|value|(scala.Tuple2)|
+-+--+
|a|  null|
+-+--+
{noformat}

The same happens with Scala, just wrap Scala's Int with a case class (because 
it defaults to 0) and you'll get the same result.






[jira] [Commented] (SPARK-15810) Aggregator doesn't play nice with Option

2016-07-11 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370494#comment-15370494
 ] 

Amit Sela commented on SPARK-15810:
---

[~rxin] what's your take on the Java API issue with null ? does this belong 
here ? do you think it should be a separate issue ?

> Aggregator doesn't play nice with Option
> 
>
> Key: SPARK-15810
> URL: https://issues.apache.org/jira/browse/SPARK-15810
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
> Environment: spark 2.0.0-SNAPSHOT
>Reporter: koert kuipers
>
> {code}
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
> val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), 
> Option[Int], Option[Int]]{
>   def zero: Option[Int] = None
>   def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = 
> b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
>   def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => 
> b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
>   def finish(reduction: Option[Int]): Option[Int] = reduction
>   def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
>   def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> }.toColumn)
> ds3.printSchema
> ds3.show
> {code}
> i get as output a somewhat odd looking schema, and after that the program 
> just hangs pinning one cpu at 100%. the data never shows.
> output:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- $anon$1(scala.Tuple2): struct (nullable = true)
>  ||-- value: integer (nullable = true)
> {noformat}






[jira] [Closed] (SPARK-16474) Global Aggregation doesn't seem to work at all

2016-07-10 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela closed SPARK-16474.
-
Resolution: Not A Problem

It seems that the right way to use the agg() API directly on a Dataset/DataFrame 
is via select(), or by providing an implicit conversion.
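
Concretely, the pattern that works (a sketch, as I understand it):

{code}
import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

val session = SparkSession.builder()
  .appName("GlobalAggWithSelect")
  .master("local[*]")
  .getOrCreate()
import session.implicits._

val sum = new Aggregator[Int, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Int): Int = b + a
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(reduction: Int): Int = reduction
  def bufferEncoder: Encoder[Int] = implicitly[Encoder[Int]]
  def outputEncoder: Encoder[Int] = implicitly[Encoder[Int]]
}

// select() with the typed column works where ds.agg(...) failed:
List(1, 2, 3).toDS().select(sum.toColumn).show()   // single row containing 6
{code}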

> Global Aggregation doesn't seem to work at all 
> ---
>
> Key: SPARK-16474
> URL: https://issues.apache.org/jira/browse/SPARK-16474
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 1.6.2, 2.0.0
>Reporter: Amit Sela
>
> Executing a global aggregation (not grouped by key) fails.
> Take the following code for example:
> {code}
> val session = SparkSession.builder()
>   .appName("TestGlobalAggregator")
>   .master("local[*]")
>   .getOrCreate()
> import session.implicits._
> val ds1 = List(1, 2, 3).toDS
> val ds2 = ds1.agg(
>   new Aggregator[Int, Int, Int]{
>   def zero: Int = 0
>   def reduce(b: Int, a: Int): Int = b + a
>   def merge(b1: Int, b2: Int): Int = b1 + b2
>   def finish(reduction: Int): Int = reduction
>   def bufferEncoder: Encoder[Int] = implicitly[Encoder[Int]]
>   def outputEncoder: Encoder[Int] = implicitly[Encoder[Int]]
> }.toColumn)
> ds2.printSchema
> ds2.show
> {code}
> I would expect the result to be 6, but instead I get the following exception:
> {noformat}
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to java.lang.Integer 
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> .
> {noformat}
> Trying the same code on DataFrames in 1.6.2 results in:
> {noformat}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved 
> operator 'Aggregate [(anon$1(),mode=Complete,isDistinct=false) AS 
> anon$1()#8]; 
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
> ..
> {noformat}






[jira] [Commented] (SPARK-16474) Global Aggregation doesn't seem to work at all

2016-07-10 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369829#comment-15369829
 ] 

Amit Sela commented on SPARK-16474:
---

I thought the bufferEncoder was supposed to take care of that... unless it's 
because of the input? Would that be solved by 
https://issues.apache.org/jira/browse/SPARK-15769 ?

Anyway, I'll close this, though it is a bit confusing because the API looks 
the same as when doing groupByKey().agg(), and in that case it works. Is 
this because groupByKey provides the conversion?

> Global Aggregation doesn't seem to work at all 
> ---
>
> Key: SPARK-16474
> URL: https://issues.apache.org/jira/browse/SPARK-16474
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 1.6.2, 2.0.0
>Reporter: Amit Sela
>
> Executing a global aggregation (not grouped by key) fails.
> Take the following code for example:
> {code}
> val session = SparkSession.builder()
>   .appName("TestGlobalAggregator")
>   .master("local[*]")
>   .getOrCreate()
> import session.implicits._
> val ds1 = List(1, 2, 3).toDS
> val ds2 = ds1.agg(
>   new Aggregator[Int, Int, Int]{
>   def zero: Int = 0
>   def reduce(b: Int, a: Int): Int = b + a
>   def merge(b1: Int, b2: Int): Int = b1 + b2
>   def finish(reduction: Int): Int = reduction
>   def bufferEncoder: Encoder[Int] = implicitly[Encoder[Int]]
>   def outputEncoder: Encoder[Int] = implicitly[Encoder[Int]]
> }.toColumn)
> ds2.printSchema
> ds2.show
> {code}
> I would expect the result to be 6, but instead I get the following exception:
> {noformat}
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to java.lang.Integer 
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> .
> {noformat}
> Trying the same code on DataFrames in 1.6.2 results in:
> {noformat}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved 
> operator 'Aggregate [(anon$1(),mode=Complete,isDistinct=false) AS 
> anon$1()#8]; 
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
> ..
> {noformat}






[jira] [Commented] (SPARK-16474) Global Aggregation doesn't seem to work at all

2016-07-10 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369706#comment-15369706
 ] 

Amit Sela commented on SPARK-16474:
---

Thanks [~koert] that works.

> Global Aggregation doesn't seem to work at all 
> ---
>
> Key: SPARK-16474
> URL: https://issues.apache.org/jira/browse/SPARK-16474
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 1.6.2, 2.0.0
>Reporter: Amit Sela
>
> Executing a global aggregation (not grouped by key) fails.
> Take the following code for example:
> {code}
> val session = SparkSession.builder()
>   .appName("TestGlobalAggregator")
>   .master("local[*]")
>   .getOrCreate()
> import session.implicits._
> val ds1 = List(1, 2, 3).toDS
> val ds2 = ds1.agg(
>   new Aggregator[Int, Int, Int]{
>   def zero: Int = 0
>   def reduce(b: Int, a: Int): Int = b + a
>   def merge(b1: Int, b2: Int): Int = b1 + b2
>   def finish(reduction: Int): Int = reduction
>   def bufferEncoder: Encoder[Int] = implicitly[Encoder[Int]]
>   def outputEncoder: Encoder[Int] = implicitly[Encoder[Int]]
> }.toColumn)
> ds2.printSchema
> ds2.show
> {code}
> I would expect the result to be 6, but instead I get the following exception:
> {noformat}
> java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
> to java.lang.Integer 
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> .
> {noformat}
> Trying the same code on DataFrames in 1.6.2 results in:
> {noformat}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved 
> operator 'Aggregate [(anon$1(),mode=Complete,isDistinct=false) AS 
> anon$1()#8]; 
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
> ..
> {noformat}






[jira] [Comment Edited] (SPARK-15810) Aggregator doesn't play nice with Option

2016-07-10 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369496#comment-15369496
 ] 

Amit Sela edited comment on SPARK-15810 at 7/10/16 2:53 PM:


Just ran this exact code, prefixed by:
{code}
val session = 
SparkSession.builder().appName("TestAggregator").master("local[*]").getOrCreate()
import session.implicits._
{code}
This is the schema I get:
{noformat}
root
 |-- value: string (nullable = true)
 |-- anon$1(scala.Tuple2): struct (nullable = true)
 ||-- value: integer (nullable = true)
{noformat}
And this is the output:
{noformat}
+-++
|value|anon$1(scala.Tuple2)|
+-++
|a| [5]|
+-++
{noformat}

So I don't know about the schema looking odd, but I get a result.


was (Author: amitsela):
Just ran this exact code, prefixed by:
{code}
val session = 
SparkSession.builder().appName("TestAggregator").master("local[*]").getOrCreate()
import session.implicits._
{code}
This is the schema I get:
{noformat}
root
 |-- value: string (nullable = true)
 |-- anon$1(scala.Tuple2): struct (nullable = true)
 ||-- value: integer (nullable = true)
{noformat}
And this is the output:
{noformat}
+-++
|value|anon$1(scala.Tuple2)|
+-++
|a| [5]|
+-++
{noformat}

> Aggregator doesn't play nice with Option
> 
>
> Key: SPARK-15810
> URL: https://issues.apache.org/jira/browse/SPARK-15810
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
> Environment: spark 2.0.0-SNAPSHOT
>Reporter: koert kuipers
>
> {code}
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
> val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), 
> Option[Int], Option[Int]]{
>   def zero: Option[Int] = None
>   def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = 
> b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
>   def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => 
> b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
>   def finish(reduction: Option[Int]): Option[Int] = reduction
>   def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
>   def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> }.toColumn)
> ds3.printSchema
> ds3.show
> {code}
> i get as output a somewhat odd looking schema, and after that the program 
> just hangs pinning one cpu at 100%. the data never shows.
> output:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- $anon$1(scala.Tuple2): struct (nullable = true)
>  ||-- value: integer (nullable = true)
> {noformat}






[jira] [Created] (SPARK-16474) Global Aggregation doesn't seem to work at all

2016-07-10 Thread Amit Sela (JIRA)
Amit Sela created SPARK-16474:
-

 Summary: Global Aggregation doesn't seem to work at all 
 Key: SPARK-16474
 URL: https://issues.apache.org/jira/browse/SPARK-16474
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 1.6.2, 2.0.0
Reporter: Amit Sela


Executing a global aggregation (not grouped by key) fails.
Take the following code for example:
{code}
val session = SparkSession.builder()
  .appName("TestGlobalAggregator")
  .master("local[*]")
  .getOrCreate()
import session.implicits._

val ds1 = List(1, 2, 3).toDS
val ds2 = ds1.agg(
  new Aggregator[Int, Int, Int]{

  def zero: Int = 0

  def reduce(b: Int, a: Int): Int = b + a

  def merge(b1: Int, b2: Int): Int = b1 + b2

  def finish(reduction: Int): Int = reduction

  def bufferEncoder: Encoder[Int] = implicitly[Encoder[Int]]

  def outputEncoder: Encoder[Int] = implicitly[Encoder[Int]]
}.toColumn)

ds2.printSchema
ds2.show
{code}
I would expect the result to be 6, but instead I get the following exception:
{noformat}
java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast 
to java.lang.Integer 
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
.
{noformat}
Trying the same code on DataFrames in 1.6.2 results in:
{noformat}
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved 
operator 'Aggregate [(anon$1(),mode=Complete,isDistinct=false) AS anon$1()#8]; 
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
..
{noformat}






[jira] [Comment Edited] (SPARK-15810) Aggregator doesn't play nice with Option

2016-07-10 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369509#comment-15369509
 ] 

Amit Sela edited comment on SPARK-15810 at 7/10/16 9:01 AM:


Running the (sort of) same Java code:
{code}
SparkSession session = SparkSession.builder()
   .appName("TestAggregatorJava")
   .master("local[*]")
   .getOrCreate();
Dataset> ds1 = session.createDataset(Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("a", 2),
new Tuple2<>("a", 3)
), Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset> ds2 = ds1.map(
new MapFunction, Tuple2>() {

  @Override
  public Tuple2 call(Tuple2 value) throws 
Exception {
if (value._2() > 1) {
  return value;
} else {
  return new Tuple2<>(value._1, null);
}
  }
}, Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset> ds3 = ds2.groupByKey(
new MapFunction, String>() {

  @Override
  public String call(Tuple2 value) throws Exception {
return value._1();
  }
}, Encoders.STRING()).agg(new Aggregator, Integer, 
Integer>() {
  @Override
  public Integer zero() {
return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2 a) {
return merge(b, a._2());
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
if (b1 == null) {
  return b2;
} else if (b2 == null){
  return b1;
} else {
  return b1 + b2;
}
  }

  @Override
  public Integer finish(Integer reduction) {
return reduction;
  }

  @Override
  public Encoder bufferEncoder() {
return Encoders.INT();
  }

  @Override
  public Encoder outputEncoder() {
return Encoders.INT();
  }
}.toColumn());

ds3.printSchema();
ds3.show();
  }
{code} 
I get this schema:
{noformat}
root
 |-- value: string (nullable = true)
 |-- (scala.Tuple2): integer (nullable = true)
{noformat}
And this result:
{noformat}
+-+--+
|value|(scala.Tuple2)|
+-+--+
|a|  null|
+-+--+
{noformat}

As for Scala, it's clear that `Option` is preferred over `null`, but because the 
Dataset API is supposed to support Java as well, it should not discard the 
aggregation if the zero method returns null.
For Java, I currently use Guava's `Optional`, but that just seems cumbersome to 
me.
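
A minimal sketch of that `Optional`-based workaround (illustrative only; the class name is 
made up, and the buffer is kryo-encoded since there is no built-in encoder for Guava's 
`Optional`):
{code}
import com.google.common.base.Optional;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import scala.Tuple2;

// Sums the Integer side of (String, Integer) pairs, using Optional.absent()
// instead of a null zero value.
public class OptionalSumAggregator
    extends Aggregator<Tuple2<String, Integer>, Optional<Integer>, Integer> {

  @Override
  public Optional<Integer> zero() {
    return Optional.absent();
  }

  @Override
  public Optional<Integer> reduce(Optional<Integer> b, Tuple2<String, Integer> a) {
    return merge(b, Optional.fromNullable(a._2()));
  }

  @Override
  public Optional<Integer> merge(Optional<Integer> b1, Optional<Integer> b2) {
    if (!b1.isPresent()) {
      return b2;
    } else if (!b2.isPresent()) {
      return b1;
    } else {
      return Optional.of(b1.get() + b2.get());
    }
  }

  @Override
  public Integer finish(Optional<Integer> reduction) {
    // Surface null (if at all) only once the aggregation has finished.
    return reduction.orNull();
  }

  @Override
  @SuppressWarnings("unchecked")
  public Encoder<Optional<Integer>> bufferEncoder() {
    return Encoders.kryo((Class<Optional<Integer>>) (Class<?>) Optional.class);
  }

  @Override
  public Encoder<Integer> outputEncoder() {
    return Encoders.INT();
  }
}
{code}
It would plug into the same `groupByKey(...).agg(new OptionalSumAggregator().toColumn())` 
call as the snippet above.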


was (Author: amitsela):
Running the (sort of) same Java code:
{code}
SparkSession session = SparkSession.builder()
   .appName("TestAggregatorJava")
   .master("local[*]")
   .getOrCreate();
Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("a", 2),
new Tuple2<>("a", 3)
), Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

  @Override
  public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws 
Exception {
if (value._2() > 1) {
  return value;
} else {
  return new Tuple2<>(value._1, null);
}
  }
}, Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
new MapFunction<Tuple2<String, Integer>, String>() {

  @Override
  public String call(Tuple2<String, Integer> value) throws Exception {
return value._1();
  }
}, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, 
Integer>() {
  @Override
  public Integer zero() {
return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
return merge(b, a._2());
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
if (b1 == null) {
  return b2;
} else if (b2 == null){
  return b1;
} else {
  return b1 + b2;
}
  }

  @Override
  public Integer finish(Integer reduction) {
return reduction;
  }

  @Override
  public Encoder<Integer> bufferEncoder() {
return Encoders.INT();
  }

  @Override
  public Encoder<Integer> outputEncoder() {
return Encoders.INT();
  }
}.toColumn());

 

[jira] [Comment Edited] (SPARK-15810) Aggregator doesn't play nice with Option

2016-07-10 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369509#comment-15369509
 ] 

Amit Sela edited comment on SPARK-15810 at 7/10/16 8:59 AM:


Running the (sort of) same Java code:
{code}
SparkSession session = SparkSession.builder()
   .appName("TestAggregatorJava")
   .master("local[*]")
   .getOrCreate();
Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("a", 2),
new Tuple2<>("a", 3)
), Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

  @Override
  public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws 
Exception {
if (value._2() > 1) {
  return value;
} else {
  return new Tuple2<>(value._1, null);
}
  }
}, Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
new MapFunction<Tuple2<String, Integer>, String>() {

  @Override
  public String call(Tuple2<String, Integer> value) throws Exception {
return value._1();
  }
}, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, 
Integer>() {
  @Override
  public Integer zero() {
return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
return merge(b, a._2());
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
if (b1 == null) {
  return b2;
} else if (b2 == null){
  return b1;
} else {
  return b1 + b2;
}
  }

  @Override
  public Integer finish(Integer reduction) {
return reduction;
  }

  @Override
  public Encoder<Integer> bufferEncoder() {
return Encoders.INT();
  }

  @Override
  public Encoder<Integer> outputEncoder() {
return Encoders.INT();
  }
}.toColumn());

ds3.printSchema();
ds3.show();
  }
{code} 
I get this schema:
{noformat}
root
 |-- value: string (nullable = true)
 |-- (scala.Tuple2): integer (nullable = true)
{noformat}
And this result:
{noformat}
+-+--+
|value|(scala.Tuple2)|
+-+--+
|a|  null|
+-+--+
{noformat}

As for Scala, it's clear that `Option` is preferred over `null`, but because the 
Dataset API is supposed to support Java as well, it should not discard the 
aggregation if the zero method returns null.
For Java, I currently use Guava's `Optional`, but that just seems cumbersome to 
me.


was (Author: amitsela):
Running the (sort of) same Java code:
{code}
SparkSession session = SparkSession.builder()
   .appName("TestAggregatorJava")
   .master("local[*]")
   .getOrCreate();
Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("a", 2),
new Tuple2<>("a", 3)
), Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

  @Override
  public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws 
Exception {
if (value._2() > 1) {
  return value;
} else {
  return new Tuple2<>(value._1, null);
}
  }
}, Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
new MapFunction<Tuple2<String, Integer>, String>() {

  @Override
  public String call(Tuple2<String, Integer> value) throws Exception {
return value._1();
  }
}, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, 
Integer>() {
  @Override
  public Integer zero() {
return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
return merge(b, a._2());
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
if (b1 == null) {
  return b2;
} else if (b2 == null){
  return b1;
} else {
  return b1 + b2;
}
  }

  @Override
  public Integer finish(Integer reduction) {
return reduction;
  }

  @Override
  public Encoder<Integer> bufferEncoder() {
return Encoders.INT();
  }

  @Override
  public Encoder<Integer> outputEncoder() {
return Encoders.INT();
  }
}.toColumn());


[jira] [Comment Edited] (SPARK-15810) Aggregator doesn't play nice with Option

2016-07-10 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369509#comment-15369509
 ] 

Amit Sela edited comment on SPARK-15810 at 7/10/16 8:59 AM:


Running the (sort of) same Java code:
{code}
SparkSession session = SparkSession.builder()
   .appName("TestAggregatorJava")
   .master("local[*]")
   .getOrCreate();
Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("a", 2),
new Tuple2<>("a", 3)
), Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

  @Override
  public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws 
Exception {
if (value._2() > 1) {
  return value;
} else {
  return new Tuple2<>(value._1, null);
}
  }
}, Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
new MapFunction<Tuple2<String, Integer>, String>() {

  @Override
  public String call(Tuple2<String, Integer> value) throws Exception {
return value._1();
  }
}, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, 
Integer>() {
  @Override
  public Integer zero() {
return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
return merge(b, a._2());
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
if (b1 == null) {
  return b2;
} else if (b2 == null){
  return b1;
} else {
  return b1 + b2;
}
  }

  @Override
  public Integer finish(Integer reduction) {
return reduction;
  }

  @Override
  public Encoder<Integer> bufferEncoder() {
return Encoders.INT();
  }

  @Override
  public Encoder<Integer> outputEncoder() {
return Encoders.INT();
  }
}.toColumn());

ds3.printSchema();
ds3.show();
  }
{code} 
I get this schema:
{noformat}
root
 |-- value: string (nullable = true)
 |-- (scala.Tuple2): integer (nullable = true)
{noformat}
And this result:
{noformat}
+-+--+
|value|(scala.Tuple2)|
+-+--+
|a|  null|
+-+--+
{noformat}

As for Scala, it's clear that `Option` is preferred over `null`, but because the 
Dataset API is supposed to support Java as well, it should not discard the 
aggregation if the zero method returns null.
For Java, I currently use Guava's `Optional`, but that just seems cumbersome to 
me.


was (Author: amitsela):
Running the (sort of) same Java code:
{code}
SparkSession session = SparkSession.builder()
   .appName("TestAggregatorJava")
   .master("local[*]")
   .getOrCreate();
Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("a", 2),
new Tuple2<>("a", 3)
), Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

  @Override
  public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws 
Exception {
if (value._2() > 1) {
  return value;
} else {
  return new Tuple2<>(value._1, null);
}
  }
}, Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
new MapFunction<Tuple2<String, Integer>, String>() {

  @Override
  public String call(Tuple2<String, Integer> value) throws Exception {
return value._1();
  }
}, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, 
Integer>() {
  @Override
  public Integer zero() {
return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
return merge(b, a._2());
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
if (b1 == null) {
  return b2;
} else if (b2 == null){
  return b1;
} else {
  return b1 + b2;
}
  }

  @Override
  public Integer finish(Integer reduction) {
return reduction;
  }

  @Override
  public Encoder<Integer> bufferEncoder() {
return Encoders.INT();
  }

  @Override
  public Encoder<Integer> outputEncoder() {
return Encoders.INT();
  }

[jira] [Commented] (SPARK-15810) Aggregator doesn't play nice with Option

2016-07-10 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369509#comment-15369509
 ] 

Amit Sela commented on SPARK-15810:
---

Running the (sort of) same Java code:
{code}
SparkSession session = SparkSession.builder()
   .appName("TestAggregatorJava")
   .master("local[*]")
   .getOrCreate();
Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("a", 2),
new Tuple2<>("a", 3)
), Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {

  @Override
  public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws 
Exception {
if (value._2() > 1) {
  return value;
} else {
  return new Tuple2<>(value._1, null);
}
  }
}, Encoders.tuple(Encoders.STRING(), Encoders.INT()));
Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
new MapFunction<Tuple2<String, Integer>, String>() {

  @Override
  public String call(Tuple2<String, Integer> value) throws Exception {
return value._1();
  }
}, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, 
Integer>() {
  @Override
  public Integer zero() {
return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
return merge(b, a._2());
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
if (b1 == null) {
  return b2;
} else if (b2 == null){
  return b1;
} else {
  return b1 + b2;
}
  }

  @Override
  public Integer finish(Integer reduction) {
return reduction;
  }

  @Override
  public Encoder<Integer> bufferEncoder() {
return Encoders.INT();
  }

  @Override
  public Encoder<Integer> outputEncoder() {
return Encoders.INT();
  }
}.toColumn());

ds3.printSchema();
ds3.show();
  }
{code} 
I get this schema:
{noformat}
root
 |-- value: string (nullable = true)
 |-- (scala.Tuple2): integer (nullable = true)
{noformat}
And this result:
{noformat}
+-+--+
|value|(scala.Tuple2)|
+-+--+
|a|  null|
+-+--+
{noformat}

As for Scala, it's clear that `Option` is preferred over `null`, but because the 
Dataset API is supposed to support Java as well, it should not discard the 
aggregation if the zero method returns null.
For Java, I currently use Guava's `Optional`, but that just seems cumbersome to 
me.

> Aggregator doesn't play nice with Option
> 
>
> Key: SPARK-15810
> URL: https://issues.apache.org/jira/browse/SPARK-15810
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
> Environment: spark 2.0.0-SNAPSHOT
>Reporter: koert kuipers
>
> {code}
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
> val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), 
> Option[Int], Option[Int]]{
>   def zero: Option[Int] = None
>   def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = 
> b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
>   def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => 
> b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
>   def finish(reduction: Option[Int]): Option[Int] = reduction
>   def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
>   def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> }.toColumn)
> ds3.printSchema
> ds3.show
> {code}
> i get as output a somewhat odd looking schema, and after that the program 
> just hangs pinning one cpu at 100%. the data never shows.
> output:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- $anon$1(scala.Tuple2): struct (nullable = true)
>  ||-- value: integer (nullable = true)
> {noformat}






[jira] [Comment Edited] (SPARK-15810) Aggregator doesn't play nice with Option

2016-07-10 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369496#comment-15369496
 ] 

Amit Sela edited comment on SPARK-15810 at 7/10/16 8:28 AM:


Just ran this exact code, prefixed by:
{code}
val session = 
SparkSession.builder().appName("TestAggregator").master("local[*]").getOrCreate()
import session.implicits._
{code}
This is the schema I get:
{noformat}
root
 |-- value: string (nullable = true)
 |-- anon$1(scala.Tuple2): struct (nullable = true)
 ||-- value: integer (nullable = true)
{noformat}
And this is the output:
{noformat}
+-++
|value|anon$1(scala.Tuple2)|
+-++
|a| [5]|
+-++
{noformat}


was (Author: amitsela):
Just ran this exact code, prefixed by:
{code}
val session = 
SparkSession.builder().appName("TestAggregator").master("local[*]").getOrCreate()
import session.implicits._
{code}
This is the schema I get:
{noformat}
root
 |-- value: string (nullable = true)
 |-- anon$1(scala.Tuple2): struct (nullable = true)
 ||-- value: integer (nullable = true)
{noformat}
And this is the output:
{noformat}
+-++
|value|anon$1(scala.Tuple2)|
+-++
|a| [5]|
+-++
{noformat}

> Aggregator doesn't play nice with Option
> 
>
> Key: SPARK-15810
> URL: https://issues.apache.org/jira/browse/SPARK-15810
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
> Environment: spark 2.0.0-SNAPSHOT
>Reporter: koert kuipers
>
> {code}
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
> val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), 
> Option[Int], Option[Int]]{
>   def zero: Option[Int] = None
>   def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = 
> b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
>   def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => 
> b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
>   def finish(reduction: Option[Int]): Option[Int] = reduction
>   def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
>   def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> }.toColumn)
> ds3.printSchema
> ds3.show
> {code}
> i get as output a somewhat odd looking schema, and after that the program 
> just hangs pinning one cpu at 100%. the data never shows.
> output:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- $anon$1(scala.Tuple2): struct (nullable = true)
>  ||-- value: integer (nullable = true)
> {noformat}






[jira] [Comment Edited] (SPARK-15810) Aggregator doesn't play nice with Option

2016-07-10 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369496#comment-15369496
 ] 

Amit Sela edited comment on SPARK-15810 at 7/10/16 8:28 AM:


Just ran this exact code, prefixed by:
{code}
val session = 
SparkSession.builder().appName("TestAggregator").master("local[*]").getOrCreate()
import session.implicits._
{code}
This is the schema I get:
{noformat}
root
 |-- value: string (nullable = true)
 |-- anon$1(scala.Tuple2): struct (nullable = true)
 ||-- value: integer (nullable = true)
{noformat}
And this is the output:
{noformat}
+-++
|value|anon$1(scala.Tuple2)|
+-++
|a| [5]|
+-++
{noformat}


was (Author: amitsela):
Just ran this exact code, prefixed by:
{code}
val session = 
SparkSession.builder().appName("TestAggregator").master("local[*]").getOrCreate()
import session.implicits._val session = 
SparkSession.builder().appName("TestAggregator").master("local[*]").getOrCreate()
import session.implicits._
{code}
This is the schema I get:
{noformat}
root
 |-- value: string (nullable = true)
 |-- anon$1(scala.Tuple2): struct (nullable = true)
 ||-- value: integer (nullable = true)
{noformat}
And this is the output:
{noformat}
+-++
|value|anon$1(scala.Tuple2)|
+-++
|a| [5]|
+-++
{noformat}

> Aggregator doesn't play nice with Option
> 
>
> Key: SPARK-15810
> URL: https://issues.apache.org/jira/browse/SPARK-15810
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
> Environment: spark 2.0.0-SNAPSHOT
>Reporter: koert kuipers
>
> {code}
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
> val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), 
> Option[Int], Option[Int]]{
>   def zero: Option[Int] = None
>   def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = 
> b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
>   def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => 
> b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
>   def finish(reduction: Option[Int]): Option[Int] = reduction
>   def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
>   def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> }.toColumn)
> ds3.printSchema
> ds3.show
> {code}
> i get as output a somewhat odd looking schema, and after that the program 
> just hangs pinning one cpu at 100%. the data never shows.
> output:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- $anon$1(scala.Tuple2): struct (nullable = true)
>  ||-- value: integer (nullable = true)
> {noformat}






[jira] [Commented] (SPARK-15810) Aggregator doesn't play nice with Option

2016-07-10 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369496#comment-15369496
 ] 

Amit Sela commented on SPARK-15810:
---

Just ran this exact code, prefixed by:
{code}
val session = 
SparkSession.builder().appName("TestAggregator").master("local[*]").getOrCreate()
import session.implicits._val session = 
SparkSession.builder().appName("TestAggregator").master("local[*]").getOrCreate()
import session.implicits._
{code}
This is the schema I get:
{noformat}
root
 |-- value: string (nullable = true)
 |-- anon$1(scala.Tuple2): struct (nullable = true)
 ||-- value: integer (nullable = true)
{noformat}
And this is the output:
{noformat}
+-++
|value|anon$1(scala.Tuple2)|
+-++
|a| [5]|
+-++
{noformat}

> Aggregator doesn't play nice with Option
> 
>
> Key: SPARK-15810
> URL: https://issues.apache.org/jira/browse/SPARK-15810
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
> Environment: spark 2.0.0-SNAPSHOT
>Reporter: koert kuipers
>
> {code}
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
> val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), 
> Option[Int], Option[Int]]{
>   def zero: Option[Int] = None
>   def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = 
> b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
>   def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => 
> b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
>   def finish(reduction: Option[Int]): Option[Int] = reduction
>   def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
>   def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> }.toColumn)
> ds3.printSchema
> ds3.show
> {code}
> i get as output a somewhat odd looking schema, and after that the program 
> just hangs pinning one cpu at 100%. the data never shows.
> output:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- $anon$1(scala.Tuple2): struct (nullable = true)
>  ||-- value: integer (nullable = true)
> {noformat}






[jira] [Commented] (SPARK-15489) Dataset kryo encoder won't load custom user settings

2016-05-29 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305997#comment-15305997
 ] 

Amit Sela commented on SPARK-15489:
---

[~marmbrus] you can assign this one to me.

> Dataset kryo encoder won't load custom user settings 
> -
>
> Key: SPARK-15489
> URL: https://issues.apache.org/jira/browse/SPARK-15489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Amit Sela
>
> When setting a custom "spark.kryo.registrator" (or any other configuration 
> for that matter) through the API, this configuration will not propagate to 
> the encoder that uses a KryoSerializer since it instantiates with "new 
> SparkConf()".
> See:  
> https://github.com/apache/spark/blob/07c36a2f07fcf5da6fb395f830ebbfc10eb27dcc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L554
> This could be hacked by providing those configurations as System properties, 
> but this probably should be passed to the encoder and set in the 
> SerializerInstance after creation.
> Example:
> When using Encoders with kryo to encode generically typed Objects in the 
> following manner:
> public static <T> Encoder<T> encoder() {
>   return Encoders.kryo((Class<T>) Object.class);
> }
> I get a decoding exception when trying to decode 
> `java.util.Collections$UnmodifiableCollection`, which probably comes from 
> Guava's `ImmutableList`.
> This happens when running with master = local[1]. Same code had no problems 
> with RDD api.






[jira] [Updated] (SPARK-15489) Dataset kryo encoder won't load custom user settings

2016-05-27 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated SPARK-15489:
--
Description: 
When setting a custom "spark.kryo.registrator" (or any other configuration for 
that matter) through the API, this configuration will not propagate to the 
encoder that uses a KryoSerializer since it instantiates with "new SparkConf()".
See:  
https://github.com/apache/spark/blob/07c36a2f07fcf5da6fb395f830ebbfc10eb27dcc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L554

This could be hacked by providing those configurations as System properties, 
but this probably should be passed to the encoder and set in the 
SerializerInstance after creation.

Example:
When using Encoders with kryo to encode generically typed Objects in the 
following manner:

public static <T> Encoder<T> encoder() {
  return Encoders.kryo((Class<T>) Object.class);
}

I get a decoding exception when trying to decode 
`java.util.Collections$UnmodifiableCollection`, which probably comes from 
Guava's `ImmutableList`.

This happens when running with master = local[1]. Same code had no problems 
with RDD api.


  was:
When setting a custom "spark.kryo.registrator" (or any other configuration for 
that matter) through the API, this configuration will not propagate to the 
encoder that uses a KryoSerializer since it instantiates with "new SparkConf()".
See:  

Example:
When using Encoders with kryo to encode generically typed Objects in the 
following manner:

public static <T> Encoder<T> encoder() {
  return Encoders.kryo((Class<T>) Object.class);
}

I get a decoding exception when trying to decode 
`java.util.Collections$UnmodifiableCollection`, which probably comes from 
Guava's `ImmutableList`.

This happens when running with master = local[1]. Same code had no problems 
with RDD api.



> Dataset kryo encoder won't load custom user settings 
> -
>
> Key: SPARK-15489
> URL: https://issues.apache.org/jira/browse/SPARK-15489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Amit Sela
>
> When setting a custom "spark.kryo.registrator" (or any other configuration 
> for that matter) through the API, this configuration will not propagate to 
> the encoder that uses a KryoSerializer since it instantiates with "new 
> SparkConf()".
> See:  
> https://github.com/apache/spark/blob/07c36a2f07fcf5da6fb395f830ebbfc10eb27dcc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L554
> This could be hacked by providing those configurations as System properties, 
> but this probably should be passed to the encoder and set in the 
> SerializerInstance after creation.
> Example:
> When using Encoders with kryo to encode generically typed Objects in the 
> following manner:
> public static <T> Encoder<T> encoder() {
>   return Encoders.kryo((Class<T>) Object.class);
> }
> I get a decoding exception when trying to decode 
> `java.util.Collections$UnmodifiableCollection`, which probably comes from 
> Guava's `ImmutableList`.
> This happens when running with master = local[1]. Same code had no problems 
> with RDD api.






[jira] [Updated] (SPARK-15489) Dataset kryo encoder won't load custom user settings

2016-05-27 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated SPARK-15489:
--
Description: 
When setting a custom "spark.kryo.registrator" (or any other configuration for 
that matter) through the API, this configuration will not propagate to the 
encoder that uses a KryoSerializer since it instantiates with "new SparkConf()".
See:  

Example:
When using Encoders with kryo to encode generically typed Objects in the 
following manner:

public static <T> Encoder<T> encoder() {
  return Encoders.kryo((Class<T>) Object.class);
}

I get a decoding exception when trying to decode 
`java.util.Collections$UnmodifiableCollection`, which probably comes from 
Guava's `ImmutableList`.

This happens when running with master = local[1]. Same code had no problems 
with RDD api.


  was:
When using Encoders with kryo to encode generically typed Objects in the 
following manner:

public static <T> Encoder<T> encoder() {
  return Encoders.kryo((Class<T>) Object.class);
}

I get a decoding exception when trying to decode 
`java.util.Collections$UnmodifiableCollection`, which probably comes from 
Guava's `ImmutableList`.

This happens when running with master = local[1]. Same code had no problems 
with RDD api.



> Dataset kryo encoder won't load custom user settings 
> -
>
> Key: SPARK-15489
> URL: https://issues.apache.org/jira/browse/SPARK-15489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Amit Sela
>
> When setting a custom "spark.kryo.registrator" (or any other configuration 
> for that matter) through the API, this configuration will not propagate to 
> the encoder that uses a KryoSerializer since it instantiates with "new 
> SparkConf()".
> See:  
> Example:
> When using Encoders with kryo to encode generically typed Objects in the 
> following manner:
> public static <T> Encoder<T> encoder() {
>   return Encoders.kryo((Class<T>) Object.class);
> }
> I get a decoding exception when trying to decode 
> `java.util.Collections$UnmodifiableCollection`, which probably comes from 
> Guava's `ImmutableList`.
> This happens when running with master = local[1]. Same code had no problems 
> with RDD api.






[jira] [Updated] (SPARK-15489) Dataset kryo encoder won't load custom user settings

2016-05-27 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated SPARK-15489:
--
Summary: Dataset kryo encoder won't load custom user settings   (was: 
Dataset kryo encoder fails on Collections$UnmodifiableCollection)

> Dataset kryo encoder won't load custom user settings 
> -
>
> Key: SPARK-15489
> URL: https://issues.apache.org/jira/browse/SPARK-15489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Amit Sela
>
> When using Encoders with kryo to encode generically typed Objects in the 
> following manner:
> public static <T> Encoder<T> encoder() {
>   return Encoders.kryo((Class<T>) Object.class);
> }
> I get a decoding exception when trying to decode 
> `java.util.Collections$UnmodifiableCollection`, which probably comes from 
> Guava's `ImmutableList`.
> This happens when running with master = local[1]. Same code had no problems 
> with RDD api.






[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection

2016-05-27 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15304893#comment-15304893
 ] 

Amit Sela commented on SPARK-15489:
---

The issue here is that settings applied to the SparkConf do not propagate to the 
KryoSerializer used by the encoder.
I managed to make this work by using Java System properties instead of 
SparkConf#set since the SparkConf constructor will take them into account, but 
it's a hack...

For now I think I'll change the description of the issue, and propose this as a 
temporary solution.
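
A rough sketch of that temporary workaround (assuming the `ImmutableRegistrator` shown in 
another comment on this issue; names are illustrative only):
{code}
import org.apache.spark.sql.SparkSession;

public class KryoEncoderWorkaround {
  public static void main(String[] args) {
    // Set as a JVM system property *before* any SparkConf is constructed:
    // the "new SparkConf()" created on the encoder code path loads spark.*
    // system properties, whereas a value set only through SparkConf#set on
    // the application's conf does not reach it.
    System.setProperty("spark.kryo.registrator",
        ImmutableRegistrator.class.getCanonicalName());

    SparkSession session = SparkSession.builder()
        .appName("KryoEncoderWorkaround")
        .master("local[1]")
        .getOrCreate();

    // ... build Datasets with Encoders.kryo(...) as usual ...

    session.stop();
  }
}
{code}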

> Dataset kryo encoder fails on Collections$UnmodifiableCollection
> 
>
> Key: SPARK-15489
> URL: https://issues.apache.org/jira/browse/SPARK-15489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Amit Sela
>
> When using Encoders with kryo to encode generically typed Objects in the 
> following manner:
> public static <T> Encoder<T> encoder() {
>   return Encoders.kryo((Class<T>) Object.class);
> }
> I get a decoding exception when trying to decode 
> `java.util.Collections$UnmodifiableCollection`, which probably comes from 
> Guava's `ImmutableList`.
> This happens when running with master = local[1]. Same code had no problems 
> with RDD api.






[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection

2016-05-27 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15304886#comment-15304886
 ] 

Amit Sela commented on SPARK-15489:
---

Got it!

So I wasn't using the custom registrator correctly; it works better like this:

public class ImmutablesRegistrator implements KryoRegistrator {

  @Override
  public void registerClasses(Kryo kryo) {
UnmodifiableCollectionsSerializer.registerSerializers(kryo);
// Guava
ImmutableListSerializer.registerSerializers(kryo);
ImmutableSetSerializer.registerSerializers(kryo);
ImmutableMapSerializer.registerSerializers(kryo);
ImmutableMultimapSerializer.registerSerializers(kryo);
  }
}
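
For completeness, a sketch of how such a registrator would typically be wired in (standard 
`spark.serializer` / `spark.kryo.registrator` settings; illustrative only):
{code}
// Illustrative fragment: register the class above on the application's conf.
SparkConf conf = new SparkConf()
    .setAppName("ImmutablesWithKryo")
    .setMaster("local[1]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", ImmutablesRegistrator.class.getCanonicalName());
{code}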


> Dataset kryo encoder fails on Collections$UnmodifiableCollection
> 
>
> Key: SPARK-15489
> URL: https://issues.apache.org/jira/browse/SPARK-15489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Amit Sela
>
> When using Encoders with kryo to encode generically typed Objects in the 
> following manner:
> public static <T> Encoder<T> encoder() {
>   return Encoders.kryo((Class<T>) Object.class);
> }
> I get a decoding exception when trying to decode 
> `java.util.Collections$UnmodifiableCollection`, which probably comes from 
> Guava's `ImmutableList`.
> This happens when running with master = local[1]. Same code had no problems 
> with RDD api.






[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection

2016-05-27 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15304222#comment-15304222
 ] 

Amit Sela commented on SPARK-15489:
---

I would expect this to be related to KryoSerializer not registering the 
user-provided registrator, so I added a print in 
https://github.com/apache/spark/blob/07c36a2f07fcf5da6fb395f830ebbfc10eb27dcc/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala#L123
 to check whether any new Kryo instance is created without the provided registrator, but it 
seems that all instances have a user-provided registrator (if one is provided).

> Dataset kryo encoder fails on Collections$UnmodifiableCollection
> 
>
> Key: SPARK-15489
> URL: https://issues.apache.org/jira/browse/SPARK-15489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Amit Sela
>
> When using Encoders with kryo to encode generically typed Objects in the 
> following manner:
> public static <T> Encoder<T> encoder() {
>   return Encoders.kryo((Class<T>) Object.class);
> }
> I get a decoding exception when trying to decode 
> `java.util.Collections$UnmodifiableCollection`, which probably comes from 
> Guava's `ImmutableList`.
> This happens when running with master = local[1]. Same code had no problems 
> with RDD api.






[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection

2016-05-24 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298847#comment-15298847
 ] 

Amit Sela commented on SPARK-15489:
---

Well it looks like the codegen creates a ```java new SparkConf()```, instead of 
deserializing a "broadcasted" one. 
I've tried adding the registrator configuration as a System property (-D), but 
it didn't take effect. Is the generated code executed in the JVM? In the same one when 
running in standalone?

> Dataset kryo encoder fails on Collections$UnmodifiableCollection
> 
>
> Key: SPARK-15489
> URL: https://issues.apache.org/jira/browse/SPARK-15489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Amit Sela
>
> When using Encoders with kryo to encode generically typed Objects in the 
> following manner:
> public static <T> Encoder<T> encoder() {
>   return Encoders.kryo((Class<T>) Object.class);
> }
> I get a decoding exception when trying to decode 
> `java.util.Collections$UnmodifiableCollection`, which probably comes from 
> Guava's `ImmutableList`.
> This happens when running with master = local[1]. Same code had no problems 
> with RDD api.






[jira] [Comment Edited] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection

2016-05-24 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298847#comment-15298847
 ] 

Amit Sela edited comment on SPARK-15489 at 5/24/16 8:38 PM:


Well it looks like the codegen creates a `new SparkConf()`, instead of 
deserializing a "broadcasted" one. 
I've tried adding the registrator configuration as a System property (-D), but 
it didn't take effect. Is the generated code executed in the JVM? In the same one when 
running in standalone?


was (Author: amitsela):
Well it looks like the codegen creates a ```java new SparkConf()```, instead of 
deserializing a "broadcasted" one. 
I've tried adding the registrator configuration as a System parameter (-D), but 
it didn't catch. Is the generated code executed in the JVM ? in the same one if 
running in standalone ?

> Dataset kryo encoder fails on Collections$UnmodifiableCollection
> 
>
> Key: SPARK-15489
> URL: https://issues.apache.org/jira/browse/SPARK-15489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Amit Sela
>
> When using Encoders with kryo to encode generically typed Objects in the 
> following manner:
> public static <T> Encoder<T> encoder() {
>   return Encoders.kryo((Class<T>) Object.class);
> }
> I get a decoding exception when trying to decode 
> `java.util.Collections$UnmodifiableCollection`, which probably comes from 
> Guava's `ImmutableList`.
> This happens when running with master = local[1]. Same code had no problems 
> with RDD api.






[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection

2016-05-23 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297087#comment-15297087
 ] 

Amit Sela commented on SPARK-15489:
---

This is my registrator:

public class ImmutableRegistrator implements KryoRegistrator {

  @Override
  public void registerClasses(Kryo kryo) {
try {
  
kryo.register(Class.forName("java.util.Collections$UnmodifiableCollection"),
  new UnmodifiableCollectionsSerializer());
  kryo.register(ImmutableList.class, new ImmutableListSerializer());
} catch (ClassNotFoundException e) {
  //
}
  }
}

And I register with: 
conf.set("spark.kryo.registrator", 
ImmutableRegistrator.class.getCanonicalName())

When KryoSerializer.deserializeStream is called I see my registrator in 
KryoSerializerInstance, but when KryoSerializer.deserialize[T: ClassTag](bytes: 
ByteBuffer) is called I'm not so sure; if the instance is this.ks then no, I 
don't see my registrator.


> Dataset kryo encoder fails on Collections$UnmodifiableCollection
> 
>
> Key: SPARK-15489
> URL: https://issues.apache.org/jira/browse/SPARK-15489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Amit Sela
>
> When using Encoders with kryo to encode generically typed Objects in the 
> following manner:
> public static <T> Encoder<T> encoder() {
>   return Encoders.kryo((Class<T>) Object.class);
> }
> I get a decoding exception when trying to decode 
> `java.util.Collections$UnmodifiableCollection`, which probably comes from 
> Guava's `ImmutableList`.
> This happens when running with master = local[1]. Same code had no problems 
> with RDD api.






[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection

2016-05-23 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296989#comment-15296989
 ] 

Amit Sela commented on SPARK-15489:
---

In 2.0.0-SNAPSHOT it manifests as:

java.lang.AssertionError: assertion failed: copyAndReset must return a zero 
value copy
at scala.Predef$.assert(Predef.scala:179)
at 
org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:157)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at 
java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1075)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
.
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at 
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:287)
at 
org.apache.spark.sql.Dataset$$anonfun$collectAsList$1$$anonfun$apply$12.apply(Dataset.scala:2115)
at 
org.apache.spark.sql.Dataset$$anonfun$collectAsList$1$$anonfun$apply$12.apply(Dataset.scala:2114)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2436)
at 
org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2114)
at 
org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2113)
at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2449)
at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2113)
..

> Dataset kryo encoder fails on Collections$UnmodifiableCollection
> 
>
> Key: SPARK-15489
> URL: https://issues.apache.org/jira/browse/SPARK-15489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Amit Sela
>
> When using Encoders with kryo to encode generically typed Objects in the 
> following manner:
> public static <T> Encoder<T> encoder() {
>   return Encoders.kryo((Class<T>) Object.class);
> }
> I get a decoding exception when trying to decode 
> `java.util.Collections$UnmodifiableCollection`, which probably comes from 
> Guava's `ImmutableList`.
> This happens when running with master = local[1]. Same code had no problems 
> with RDD api.






[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection

2016-05-23 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296950#comment-15296950
 ] 

Amit Sela commented on SPARK-15489:
---

I've tried registering `UnmodifiableCollectionsSerializer` and 
`ImmutableListSerializer` from: https://github.com/magro/kryo-serializers 
but no luck there.

> Dataset kryo encoder fails on Collections$UnmodifiableCollection
> 
>
> Key: SPARK-15489
> URL: https://issues.apache.org/jira/browse/SPARK-15489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Amit Sela
>
> When using Encoders with kryo to encode generically typed Objects in the 
> following manner:
> public static <T> Encoder<T> encoder() {
>   return Encoders.kryo((Class<T>) Object.class);
> }
> I get a decoding exception when trying to decode 
> `java.util.Collections$UnmodifiableCollection`, which probably comes from 
> Guava's `ImmutableList`.
> This happens when running with master = local[1]. Same code had no problems 
> with RDD api.






[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection

2016-05-23 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296951#comment-15296951
 ] 

Amit Sela commented on SPARK-15489:
---

Serialization trace:
tupleTags (org.apache.beam.sdk.values.TupleTagList)
tupleTagList (org.apache.beam.sdk.transforms.join.CoGbkResultSchema)
schema (org.apache.beam.sdk.transforms.join.CoGbkResult)
value (org.apache.beam.sdk.values.KV)
value (org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at 
org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:230)
... 44 more
Caused by: java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableCollection.add(Collections.java:1075)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
... 61 more

> Dataset kryo encoder fails on Collections$UnmodifiableCollection
> 
>
> Key: SPARK-15489
> URL: https://issues.apache.org/jira/browse/SPARK-15489
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.1
>Reporter: Amit Sela
>
> When using Encoders with kryo to encode generically typed Objects in the 
> following manner:
> public static <T> Encoder<T> encoder() {
>   return Encoders.kryo((Class<T>) Object.class);
> }
> I get a decoding exception when trying to decode 
> `java.util.Collections$UnmodifiableCollection`, which probably comes from 
> Guava's `ImmutableList`.
> This happens when running with master = local[1]. Same code had no problems 
> with RDD api.






[jira] [Created] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection

2016-05-23 Thread Amit Sela (JIRA)
Amit Sela created SPARK-15489:
-

 Summary: Dataset kryo encoder fails on 
Collections$UnmodifiableCollection
 Key: SPARK-15489
 URL: https://issues.apache.org/jira/browse/SPARK-15489
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.1
Reporter: Amit Sela


When using Encoders with kryo to encode generically typed Objects in the 
following manner:

public static <T> Encoder<T> encoder() {
  return Encoders.kryo((Class<T>) Object.class);
}

I get a decoding exception when trying to decode 
`java.util.Collections$UnmodifiableCollection`, which probably comes from 
Guava's `ImmutableList`.

This happens when running with master = local[1]. Same code had no problems 
with RDD api.



