[jira] [Commented] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)
[ https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958403#comment-15958403 ] Amit Sela commented on SPARK-19067: --- [~tdas] should I open a ticket, for future improvements, about allowing timeouts to execute even if there's no data in the pipeline? As I mentioned in the PR comments: using the {{EventTime}} timeout in the future, I assume the "clock" would be watermark-based instead of wall-time, and I see at least two use-cases where this would matter:
# Testing - being able to move the clock forward to end-of-time to force firing everything that still awaits the closing of windows.
# A pipeline where there is a filter before the stateful op, such that there is data and the watermark advances, but some of the events are dropped and don't reach the stateful operator, so it will hold off firing until "proper" data (that passes the filter) comes along - this again could cause an unknown delay in emitting results out of the stateful operator.

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming
> (similar to DStream.mapWithState)
> --
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Michael Armbrust
> Assignee: Tathagata Das
> Priority: Critical
> Fix For: 2.2.0
>
> Right now the only way to do stateful operations is with Aggregator or UDAF. However, this does not give users control of emission or expiration of state, making it hard to implement things like sessionization. We should add a more general construct (probably similar to {{DStream.mapWithState}}) to structured streaming. Here is the design.
> *Requirements*
> - Users should be able to specify a function that can do the following
> - Access the input rows corresponding to a key
> - Access the previous state corresponding to a key
> - Optionally, update or remove the state
> - Output any number of new rows (or none at all)
> *Proposed API*
> {code}
> // New methods on KeyValueGroupedDataset
> class KeyValueGroupedDataset[K, V] {
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], GroupState[S]) => U)
>   def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], GroupState[S]) => Iterator[U])
>   // Java friendly
>   def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
>   def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
> }
> // --- New Java-friendly function classes ---
> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
> }
> public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   Iterator<R> call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
> }
> // -- Wrapper class for state data --
> trait GroupState[S] {
>   def exists(): Boolean
>   def get(): S                // throws Exception if the state does not exist
>   def getOption(): Option[S]
>   def update(newState: S): Unit
>   def remove(): Unit          // exists() will be false after this
> }
> {code}
> Key Semantics of the State class
> - The state can be null.
> - If state.remove() is called, then state.exists() will return false, and getOption will return None.
> - After state.update(newState) is called, then state.exists() will return true, and getOption will return Some(...).
> - None of the operations are thread-safe. This is to avoid memory barriers.
> *Usage*
> {code}
> val stateFunc = (word: String, words: Iterator[String], runningCount: GroupState[Long]) => {
>   val newCount = words.size + runningCount.getOption.getOrElse(0L)
>   runningCount.update(newCount)
>   (word, newCount)
> }
> dataset                                                 // type is Dataset[String]
>   .groupByKey[String](w => w)                           // generates KeyValueGroupedDataset[String, String]
>   .mapGroupsWithState[Long, (String, Long)](stateFunc)  // returns Dataset[(String, Long)]
> {code}
> *Future Directions*
> - Timeout based state expiration (that has not received data for a while) - Done
> - General expression based expiration - TODO. Any real usecases that cannot be done with timeouts?
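To make the event-time-timeout concern above concrete, here is a minimal sessionization-style sketch. It assumes the {{GroupState}}/{{GroupStateTimeout}} shape that eventually shipped in 2.2 (names such as {{hasTimedOut}} and {{setTimeoutTimestamp}} are taken from that API, not from this design doc), and the timeout only fires when the watermark advances - i.e. only when some data flows through the query, which is exactly the limitation being discussed.

{code}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class SessionInfo(count: Long, lastEventMs: Long)

val gapMs = 30 * 60 * 1000L  // illustrative 30-minute session gap

// events are (userId, eventTimeMs) pairs, already grouped by userId
val sessionFunc = (user: String, events: Iterator[(String, Long)],
                   state: GroupState[SessionInfo]) => {
  if (state.hasTimedOut) {
    // The watermark passed the registered timestamp: emit the session and drop the state.
    val out = Iterator((user, state.get.count))
    state.remove()
    out
  } else {
    val evs = events.toSeq
    val old = state.getOption.getOrElse(SessionInfo(0L, 0L))
    val lastEventMs = (evs.map(_._2) :+ old.lastEventMs).max
    val updated = SessionInfo(old.count + evs.size, lastEventMs)
    state.update(updated)
    // Ask to be invoked again once the event-time watermark passes the session gap,
    // even if no further events arrive for this user.
    state.setTimeoutTimestamp(lastEventMs + gapMs)
    Iterator.empty
  }
}

// Wiring (assumes an upstream .withWatermark("eventTime", "10 minutes")):
// grouped.flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.EventTimeTimeout())(sessionFunc)
{code}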
[jira] [Comment Edited] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)
[ https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903532#comment-15903532 ] Amit Sela edited comment on SPARK-19067 at 3/9/17 6:15 PM: --- [~tdas] I just read the PR, and I'm very excited for Spark to provide such a powerful stateful operator! I added a few comments in the PR based on my first impressions as a user, hope you don't mind. I assume that for event-time timeouts you'd look at the Watermark time instead of Wall time, correct? How would that work? If I get it right, it's all represented as a table, so the "Watermark Manager" would constantly write updates to the table in the "Watermark Column"?

was (Author: amitsela): [~tdas] I just read the PR, and I'm very excited for Spark to provide such a powerful stateful operator! I added a few comments in the PR based on my first impressions, hope you don't mind. I assume that for event-time timeouts you'd look at the Watermark time instead of Wall time, correct? How would that work? If I get it right, it's all represented as a table, so the "Watermark Manager" would constantly write updates to the table in the "Watermark Column"?

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming
> (similar to DStream.mapWithState)
> --
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Michael Armbrust
> Assignee: Tathagata Das
> Priority: Critical
[jira] [Commented] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)
[ https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903532#comment-15903532 ] Amit Sela commented on SPARK-19067: --- [~tdas] I just read the PR, and I'm very excited for Spark to provide such a powerful stateful operator! I added a few comments in the PR based on my first impressions, hope you don't mind. I assume that for event-time timeouts you'd look at the Watermark time instead of Wall time, correct? How would that work? If I get it right, it's all represented as a table, so the "Watermark Manager" would constantly write updates to the table in the "Watermark Column"?

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming
> (similar to DStream.mapWithState)
> --
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Michael Armbrust
> Assignee: Tathagata Das
> Priority: Critical
[jira] [Commented] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)
[ https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897868#comment-15897868 ] Amit Sela commented on SPARK-19067: --- Sweet! I will make time tomorrow to go over the PR thoroughly (we're on ~opposite timezones ;)). I also see a note about the State Store API, which is something I'm really looking forward to. Any news there?

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming
> (similar to DStream.mapWithState)
> --
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Michael Armbrust
> Assignee: Tathagata Das
> Priority: Critical
[jira] [Commented] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)
[ https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15896866#comment-15896866 ] Amit Sela commented on SPARK-19067: --- It depends - will those timers be "resettable", so that once I visit the state on timeout I can set a new timeout? If so, that could work. You can get more insight into what I'm talking about from my implementation of Triggers in Apache Beam: https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java#L92 I used {{updateStateByKey}} because I need to visit the state even when there are no updates - for example, if the Watermark passed the end-of-window and it is time to fire a result (based on event-time). If the new {{mapWithState}} supported resettable timers, I could keep the "next time(s) to fire" in the state and set the timer to the closest time so it would initiate the visit, instead of re-visiting all state as I do today. Does this make sense?

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming
> (similar to DStream.mapWithState)
> --
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Michael Armbrust
> Assignee: Tathagata Das
> Priority: Critical
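For illustration, the "keep the next time(s) to fire in the state" idea could be emulated on top of a single resettable timeout roughly as follows. This is a hypothetical sketch against the {{GroupState}} API that eventually shipped ({{hasTimedOut}} and {{setTimeoutTimestamp}} are assumptions borrowed from it), not code from the PR; how new fire times get added to the state is elided.

{code}
import org.apache.spark.sql.streaming.GroupState

// Pending fire times (epoch millis) kept inside the state itself; the single
// timeout is always re-armed to the earliest one, i.e. effectively "reset" on
// every visit.
case class TriggerState(pendingFiresMs: List[Long], buffered: Long)

val triggerFunc = (key: String, values: Iterator[Long], state: GroupState[TriggerState]) => {
  val s = state.getOption.getOrElse(TriggerState(Nil, 0L))
  val (fired, updated) =
    if (state.hasTimedOut) {
      // The earliest pending fire time was reached: emit for it and keep the rest.
      (Iterator((key, s.buffered)), s.copy(pendingFiresMs = s.pendingFiresMs.sorted.drop(1)))
    } else {
      (Iterator.empty, s.copy(buffered = s.buffered + values.sum))
    }
  state.update(updated)
  // Re-arm the single timeout to the closest remaining fire time.
  updated.pendingFiresMs.sorted.headOption.foreach(ts => state.setTimeoutTimestamp(ts))
  fired
}
{code}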
[jira] [Commented] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)
[ https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15859228#comment-15859228 ] Amit Sela commented on SPARK-19067: --- [~tdas] I would like to add another feature to consider, one that could give great flexibility to stateful operators in Spark. We should be able to interact with the state on more than just an update (new input) or a timeout - this way you could fire based on things like Watermarks, Timers, etc. I'm currently working on this using {{updateStateByKey}} for the Beam Spark runner, but it could be made much more efficient if Spark implemented it at a low level, and it could also provide Spark with Watermark-based triggers and Timers (the user sets a Timer to react; once the Timer is done, the applied function runs, doing whatever it does, and a Timer could be reset or dropped). The simplest thing to do would be to make this feature able to scan through the entire state as well (not just when there's new input), and since this could be expensive, it could happen every X batches (configured) - doesn't this happen anyway today, on clean-up or checkpointing? On top of this, future development of Watermark-based triggers and Timers would be easier:
# Scanning through the state to fire if the Watermark has been reached.
# Scanning through the state to check if user-defined Timers are ready for action.
WDYT?

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming
> (similar to DStream.mapWithState)
> --
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Michael Armbrust
> Assignee: Tathagata Das
> Priority: Critical
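For context on why {{updateStateByKey}} can serve this purpose today: its update function is invoked for every key on every batch, whether or not new values arrived, so watermark and timer checks can piggy-back on that full scan. A minimal sketch (the watermark plumbing and window bookkeeping are illustrative, not the Beam runner's actual code):

{code}
case class WindowState(sum: Long, windowEndMs: Long, fired: Boolean)

// updateStateByKey calls this for EVERY key in the state on EVERY batch, even
// when newValues is empty - which is what allows checking the watermark and
// firing end-of-window results without new input for that key.
def update(currentWatermarkMs: Long)(newValues: Seq[Long], old: Option[WindowState]): Option[WindowState] = {
  val s = old.getOrElse(WindowState(0L, windowEndMs = 0L, fired = false))
  val withInput = s.copy(sum = s.sum + newValues.sum)
  if (!withInput.fired && currentWatermarkMs >= withInput.windowEndMs) {
    // Time to emit the result for this window (emission elided) and mark it fired.
    Some(withInput.copy(fired = true))
  } else {
    Some(withInput)
  }
}

// Usage on a DStream[(String, Long)], with the watermark captured per batch:
// stream.updateStateByKey(update(currentWatermarkMs) _)
{code}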
[jira] [Commented] (SPARK-10816) EventTime based sessionization
[ https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15674147#comment-15674147 ] Amit Sela commented on SPARK-10816: --- [~rxin] it might be worth taking into account a more generic implementation of "Merging Windows". Sessions are merging windows that use a "gap duration" to determine when to close the session, such that if we say that the first element arrives in window {{[event_time1, event_time1 + gap_duration)}} and the next one at {{[event_time2, event_time2 + gap_duration)}}, and {{event_time1 < event_time2}}, their combined value will belong to {{[event_time1, event_time2 + gap_duration)}}, right? But the same "merge" of windows could very well be determined by a "close session" element (using Kafka, for example, would guarantee order of messages), or any user-defined logic for that matter, as long as the "merge function" is provided by the user. Of course, providing a Sessions API out-of-the-box would prove most useful as it is the most common, but I don't see any downside to also having a more "advanced" API here. Thanks!

> EventTime based sessionization
> --
>
> Key: SPARK-10816
> URL: https://issues.apache.org/jira/browse/SPARK-10816
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Reynold Xin
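To make the "merging windows" idea concrete, here is a small, self-contained sketch of gap-based session merging over per-key event-time intervals. It is illustrative only - the names and the fold-style merge are assumptions, not a proposed Spark API.

{code}
// A session window as a half-open interval [startMs, endMs)
case class Session(startMs: Long, endMs: Long, count: Long)

val gapMs = 30 * 60 * 1000L  // 30-minute gap duration (illustrative)

// Each event seeds a window [t, t + gap); overlapping or adjacent windows merge.
def mergeSessions(sessions: Seq[Session]): Seq[Session] =
  sessions.sortBy(_.startMs).foldLeft(List.empty[Session]) {
    case (last :: rest, s) if s.startMs <= last.endMs =>
      // Overlapping: combine into [min(start), max(end)) and sum the counts.
      last.copy(endMs = math.max(last.endMs, s.endMs), count = last.count + s.count) :: rest
    case (acc, s) => s :: acc
  }.reverse

// Event times 1000 and 2000 (both within one gap) collapse into a single session:
// mergeSessions(Seq(Session(1000L, 1000L + gapMs, 1), Session(2000L, 2000L + gapMs, 1)))
//   == Seq(Session(1000L, 2000L + gapMs, 2))
{code}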
[jira] [Commented] (SPARK-16607) Aggregator with null initialisation will result in null
[ https://issues.apache.org/jira/browse/SPARK-16607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15382602#comment-15382602 ] Amit Sela commented on SPARK-16607: --- Copy of the thread discussing this in the user mailing list:

Aggregator (Spark 2.0) skips aggregation if zero() returns null

Amit Sela, Jun 26, to user:
Sometimes, the BUF for the aggregator may depend on the actual input, and while this passes the responsibility to handle null in merge/reduce to the developer, it sounds fine to me if he is the one who put null in zero() anyway. Now, it seems that the aggregation is skipped entirely when zero() = null. Not sure if that was the behaviour in 1.6. Is this behaviour wanted? Thanks, Amit
Aggregator example:
public static class Agg extends Aggregator<Tuple2<String, Integer>, Integer, Integer> {
  @Override public Integer zero() { return null; }
  @Override public Integer reduce(Integer b, Tuple2<String, Integer> a) { if (b == null) { b = 0; } return b + a._2(); }
  @Override public Integer merge(Integer b1, Integer b2) { if (b1 == null) { return b2; } else if (b2 == null) { return b1; } else { return b1 + b2; } }
}

Takeshi Yamamuro, Jun 26, to me, user:
Hi, this behaviour seems to be expected because you must ensure `b + zero() = b`. In your case `b + null = null` breaks this rule. This is the same with v1.6.1. See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57 // maropu

Amit Sela, Jun 26, to Takeshi, user:
Not sure about what's the rule in case of `b + null = null`, but the same code works perfectly in 1.6.1, just tried it..

Takeshi Yamamuro, Jun 26, to me, user:
Whatever it is, this is expected; if an initial value is null, spark codegen removes all the aggregates. See: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199 // maropu

Amit Sela, Jun 26, to Takeshi, user:
This "if (value == null)" condition you point to exists in the 1.6 branch as well, so that's probably not the reason.

Takeshi Yamamuro, Jun 26, to me, user:
No, TypedAggregateExpression that uses Aggregator#zero is different between v2.0 and v1.6.
v2.0: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
v1.6: https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115 // maropu

Amit Sela, Jun 27, to Takeshi, user:
OK. I see that, but the current (provided) implementations are very naive - Sum, Count, Average. Let's take Max for example: I guess zero() would be set to some value like Long.MIN_VALUE, but what if you trigger (I assume in the future Spark streaming will support time-based triggers) for a result and there are no events? And like I said, for a more general use case: what if my zero() function depends on my input? I just don't see the benefit of this behaviour, though I realise this is the implementation. Thanks, Amit

Koert Kuipers, Jun 30, to me, Takeshi, user:
It's the difference between a semigroup and a monoid, and yes, max does not easily fit into a monoid. See also the discussion here: https://issues.apache.org/jira/browse/SPARK-15598

Amit Sela, Jul 2, to Koert, Takeshi, user:
Thanks for pointing that out, Koert! I understand now why zero() and not init(a: IN), though I still don't see a good reason to skip the aggregation if zero returns null. If the user did it, it's on him to take care of null cases in reduce/merge, but it opens up the possibility of using the input to create the buffer for the aggregator. Wouldn't that at least enable the functionality discussed in SPARK-15598, without changing how the Aggregator works? I bypassed it by using Optional (Guava) because I'm using the Java API, but it's a bit cumbersome... Thanks, Amit

Koert Kuipers, Jul 2, to me, Takeshi, user:
Valid functions can be written for reduce and merge when the zero is null, so not being able to provide null as the initial value is troublesome. I guess the proper way to do this is to use Option, and have None be the zero, which is what I assumed you did? Unfortunately, last time I tried using Scala Options with Spark Aggregators it didn't work quite well. See: https://issues.apache.org/jira/browse/SPARK-15810 Lifting a semigroup into a monoid like this using Option is fairly typical, so either null or None has to work, or else this API will be somewhat unpleasant to use for anything practical. For an example of this lifting on a related Aggregator class:
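For reference, the Option lifting described at the end of the thread looks roughly like this for a max aggregation. This is a sketch only; the kryo buffer/output encoders are an assumption chosen to side-step the Option-encoder problem, which is exactly what SPARK-15810 tracks.

{code}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Lift max (a semigroup with no natural identity) into a monoid by using Option,
// with None as the zero, so that merge(b, zero) == b always holds.
object MaxAgg extends Aggregator[Long, Option[Long], Option[Long]] {
  def zero: Option[Long] = None
  def reduce(b: Option[Long], a: Long): Option[Long] =
    Some(b.map(x => math.max(x, a)).getOrElse(a))
  def merge(b1: Option[Long], b2: Option[Long]): Option[Long] = (b1, b2) match {
    case (Some(x), Some(y)) => Some(math.max(x, y))
    case _                  => b1.orElse(b2)
  }
  def finish(reduction: Option[Long]): Option[Long] = reduction
  def bufferEncoder: Encoder[Option[Long]] = Encoders.kryo[Option[Long]]
  def outputEncoder: Encoder[Option[Long]] = Encoders.kryo[Option[Long]]
}

// e.g. ds.select(MaxAgg.toColumn) on a Dataset[Long] yields None when there is no input.
{code}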
[jira] [Commented] (SPARK-15810) Aggregator doesn't play nice with Option
[ https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15382586#comment-15382586 ] Amit Sela commented on SPARK-15810: --- opened https://issues.apache.org/jira/browse/SPARK-16607

> Aggregator doesn't play nice with Option
> --
>
> Key: SPARK-15810
> URL: https://issues.apache.org/jira/browse/SPARK-15810
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Environment: spark 2.0.0-SNAPSHOT
> Reporter: koert kuipers
>
> {code}
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
> val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), Option[Int], Option[Int]]{
>   def zero: Option[Int] = None
>   def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
>   def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
>   def finish(reduction: Option[Int]): Option[Int] = reduction
>   def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
>   def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> }.toColumn)
> ds3.printSchema
> ds3.show
> {code}
> I get as output a somewhat odd looking schema, and after that the program just hangs pinning one cpu at 100%. The data never shows.
> output:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- $anon$1(scala.Tuple2): struct (nullable = true)
>  |    |-- value: integer (nullable = true)
> {noformat}
[jira] [Created] (SPARK-16607) Aggregator with null initialisation will result in null
Amit Sela created SPARK-16607: - Summary: Aggregator with null initialisation will result in null
Key: SPARK-16607
URL: https://issues.apache.org/jira/browse/SPARK-16607
Project: Spark
Issue Type: Sub-task
Components: SQL
Environment: spark 2.0-branch
Reporter: Amit Sela

Java code example:
{code}
SparkSession session = SparkSession.builder()
    .appName("TestAggregatorJava")
    .master("local[*]")
    .getOrCreate();

Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
    new Tuple2<>("a", 1),
    new Tuple2<>("a", 2),
    new Tuple2<>("a", 3)
), Encoders.tuple(Encoders.STRING(), Encoders.INT()));

Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
    new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws Exception {
        if (value._2() > 1) {
          return value;
        } else {
          return new Tuple2<>(value._1(), null);
        }
      }
    }, Encoders.tuple(Encoders.STRING(), Encoders.INT()));

Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
    new MapFunction<Tuple2<String, Integer>, String>() {
      @Override
      public String call(Tuple2<String, Integer> value) throws Exception {
        return value._1();
      }
    }, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, Integer>() {

  @Override
  public Integer zero() {
    return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
    return merge(b, a._2());
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
    if (b1 == null) {
      return b2;
    } else if (b2 == null) {
      return b1;
    } else {
      return b1 + b2;
    }
  }

  @Override
  public Integer finish(Integer reduction) {
    return reduction;
  }

  @Override
  public Encoder<Integer> bufferEncoder() {
    return Encoders.INT();
  }

  @Override
  public Encoder<Integer> outputEncoder() {
    return Encoders.INT();
  }
}.toColumn());

ds3.printSchema();
ds3.show();
{code}
I get this schema:
{noformat}
root
 |-- value: string (nullable = true)
 |-- (scala.Tuple2): integer (nullable = true)
{noformat}
And this result:
{noformat}
+-----+--------------+
|value|(scala.Tuple2)|
+-----+--------------+
|    a|          null|
+-----+--------------+
{noformat}
The same happens with Scala, just wrap Scala's Int with a case class (because it defaults to 0) and you'll get the same result.
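For the Scala variant mentioned at the end of the description ("wrap Scala's Int with a case class"), a minimal sketch could look like the following; the {{Wrapper}} case class and other names are illustrative, not taken from the report.

{code}
import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

val spark = SparkSession.builder().appName("NullZeroScala").master("local[*]").getOrCreate()
import spark.implicits._

case class Wrapper(value: Int)

// Same shape as the Java example: zero() returns null, merge/reduce handle it.
val agg = new Aggregator[(String, Wrapper), Wrapper, Wrapper] {
  def zero: Wrapper = null
  def reduce(b: Wrapper, a: (String, Wrapper)): Wrapper = merge(b, a._2)
  def merge(b1: Wrapper, b2: Wrapper): Wrapper =
    if (b1 == null) b2
    else if (b2 == null) b1
    else Wrapper(b1.value + b2.value)
  def finish(reduction: Wrapper): Wrapper = reduction
  def bufferEncoder: Encoder[Wrapper] = implicitly[Encoder[Wrapper]]
  def outputEncoder: Encoder[Wrapper] = implicitly[Encoder[Wrapper]]
}

// ds.groupByKey(_._1).agg(agg.toColumn) is then expected to show null,
// just like the Java version in the description.
{code}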
[jira] [Commented] (SPARK-15810) Aggregator doesn't play nice with Option
[ https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15370494#comment-15370494 ] Amit Sela commented on SPARK-15810: --- [~rxin] what's your take on the Java API issue with null? Does it belong here, or do you think it should be a separate issue?

> Aggregator doesn't play nice with Option
> --
>
> Key: SPARK-15810
> URL: https://issues.apache.org/jira/browse/SPARK-15810
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Environment: spark 2.0.0-SNAPSHOT
> Reporter: koert kuipers
[jira] [Closed] (SPARK-16474) Global Aggregation doesn't seem to work at all
[ https://issues.apache.org/jira/browse/SPARK-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela closed SPARK-16474. - Resolution: Not A Problem It seems the right way to use the agg() API directly on a Dataset/DataFrame is with select() or by providing an implicit conversion.

> Global Aggregation doesn't seem to work at all
> ---
>
> Key: SPARK-16474
> URL: https://issues.apache.org/jira/browse/SPARK-16474
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 1.6.2, 2.0.0
> Reporter: Amit Sela
>
> Executing a global aggregation (not grouped by key) fails.
> Take the following code for example:
> {code}
> val session = SparkSession.builder()
>   .appName("TestGlobalAggregator")
>   .master("local[*]")
>   .getOrCreate()
> import session.implicits._
>
> val ds1 = List(1, 2, 3).toDS
> val ds2 = ds1.agg(
>   new Aggregator[Int, Int, Int]{
>     def zero: Int = 0
>     def reduce(b: Int, a: Int): Int = b + a
>     def merge(b1: Int, b2: Int): Int = b1 + b2
>     def finish(reduction: Int): Int = reduction
>     def bufferEncoder: Encoder[Int] = implicitly[Encoder[Int]]
>     def outputEncoder: Encoder[Int] = implicitly[Encoder[Int]]
>   }.toColumn)
> ds2.printSchema
> ds2.show
> {code}
> I would expect the result to be 6, but instead I get the following exception:
> {noformat}
> java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.Integer
>   at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> .
> {noformat}
> Trying the same code on DataFrames in 1.6.2 results in:
> {noformat}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [(anon$1(),mode=Complete,isDistinct=false) AS anon$1()#8];
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
> ..
> {noformat}
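For anyone hitting the same thing, the working form referred to in the resolution looks roughly like this - a sketch that goes through the typed {{select()}} path, reusing the sum aggregator from the issue description.

{code}
import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

val session = SparkSession.builder().appName("GlobalAgg").master("local[*]").getOrCreate()
import session.implicits._

val sumAgg = new Aggregator[Int, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Int): Int = b + a
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(reduction: Int): Int = reduction
  def bufferEncoder: Encoder[Int] = implicitly[Encoder[Int]]
  def outputEncoder: Encoder[Int] = implicitly[Encoder[Int]]
}

val ds1 = List(1, 2, 3).toDS()
// Going through select() with the TypedColumn keeps the typed (Int) input,
// instead of handing the aggregator an untyped Row as the plain agg() call does here.
val ds2 = ds1.select(sumAgg.toColumn)
ds2.show()  // expected: a single row with 6
{code}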
[jira] [Commented] (SPARK-16474) Global Aggregation doesn't seem to work at all
[ https://issues.apache.org/jira/browse/SPARK-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369829#comment-15369829 ] Amit Sela commented on SPARK-16474: --- I thought the bufferEncoder was supposed to take care of that... unless it's because of the input? Would that be solved by https://issues.apache.org/jira/browse/SPARK-15769 ? Anyway, I'll close this, though it's a bit confusing because the API looks the same as if I were doing groupByKey().agg(), and in that case it works. Is this because groupByKey provides the conversion?

> Global Aggregation doesn't seem to work at all
> ---
>
> Key: SPARK-16474
> URL: https://issues.apache.org/jira/browse/SPARK-16474
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 1.6.2, 2.0.0
> Reporter: Amit Sela
[jira] [Commented] (SPARK-16474) Global Aggregation doesn't seem to work at all
[ https://issues.apache.org/jira/browse/SPARK-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369706#comment-15369706 ] Amit Sela commented on SPARK-16474: --- Thanks [~koert] that works.

> Global Aggregation doesn't seem to work at all
> ---
>
> Key: SPARK-16474
> URL: https://issues.apache.org/jira/browse/SPARK-16474
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 1.6.2, 2.0.0
> Reporter: Amit Sela
[jira] [Comment Edited] (SPARK-15810) Aggregator doesn't play nice with Option
[ https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369496#comment-15369496 ] Amit Sela edited comment on SPARK-15810 at 7/10/16 2:53 PM: --- Just ran this exact code, prefixed by:
{code}
val session = SparkSession.builder().appName("TestAggregator").master("local[*]").getOrCreate()
import session.implicits._
{code}
This is the schema I get:
{noformat}
root
 |-- value: string (nullable = true)
 |-- anon$1(scala.Tuple2): struct (nullable = true)
 |    |-- value: integer (nullable = true)
{noformat}
And this is the output:
{noformat}
+-----+--------------------+
|value|anon$1(scala.Tuple2)|
+-----+--------------------+
|    a|                 [5]|
+-----+--------------------+
{noformat}
So I don't know about the schema looking odd, but I get a result.

> Aggregator doesn't play nice with Option
> --
>
> Key: SPARK-15810
> URL: https://issues.apache.org/jira/browse/SPARK-15810
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Environment: spark 2.0.0-SNAPSHOT
> Reporter: koert kuipers
[jira] [Created] (SPARK-16474) Global Aggregation doesn't seem to work at all
Amit Sela created SPARK-16474: - Summary: Global Aggregation doesn't seem to work at all
Key: SPARK-16474
URL: https://issues.apache.org/jira/browse/SPARK-16474
Project: Spark
Issue Type: Sub-task
Components: SQL
Affects Versions: 1.6.2, 2.0.0
Reporter: Amit Sela

Executing a global aggregation (not grouped by key) fails.
Take the following code for example:
{code}
val session = SparkSession.builder()
  .appName("TestGlobalAggregator")
  .master("local[*]")
  .getOrCreate()
import session.implicits._

val ds1 = List(1, 2, 3).toDS
val ds2 = ds1.agg(
  new Aggregator[Int, Int, Int]{
    def zero: Int = 0
    def reduce(b: Int, a: Int): Int = b + a
    def merge(b1: Int, b2: Int): Int = b1 + b2
    def finish(reduction: Int): Int = reduction
    def bufferEncoder: Encoder[Int] = implicitly[Encoder[Int]]
    def outputEncoder: Encoder[Int] = implicitly[Encoder[Int]]
  }.toColumn)
ds2.printSchema
ds2.show
{code}
I would expect the result to be 6, but instead I get the following exception:
{noformat}
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.Integer
  at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
.
{noformat}
Trying the same code on DataFrames in 1.6.2 results in:
{noformat}
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [(anon$1(),mode=Complete,isDistinct=false) AS anon$1()#8];
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
..
{noformat}
[jira] [Comment Edited] (SPARK-15810) Aggregator doesn't play nice with Option
[ https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369509#comment-15369509 ] Amit Sela edited comment on SPARK-15810 at 7/10/16 9:01 AM: --- Running the (sort of) same Java code:
{code}
SparkSession session = SparkSession.builder()
    .appName("TestAggregatorJava")
    .master("local[*]")
    .getOrCreate();

Dataset<Tuple2<String, Integer>> ds1 = session.createDataset(Arrays.asList(
    new Tuple2<>("a", 1),
    new Tuple2<>("a", 2),
    new Tuple2<>("a", 3)
), Encoders.tuple(Encoders.STRING(), Encoders.INT()));

Dataset<Tuple2<String, Integer>> ds2 = ds1.map(
    new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
      @Override
      public Tuple2<String, Integer> call(Tuple2<String, Integer> value) throws Exception {
        if (value._2() > 1) {
          return value;
        } else {
          return new Tuple2<>(value._1(), null);
        }
      }
    }, Encoders.tuple(Encoders.STRING(), Encoders.INT()));

Dataset<Tuple2<String, Integer>> ds3 = ds2.groupByKey(
    new MapFunction<Tuple2<String, Integer>, String>() {
      @Override
      public String call(Tuple2<String, Integer> value) throws Exception {
        return value._1();
      }
    }, Encoders.STRING()).agg(new Aggregator<Tuple2<String, Integer>, Integer, Integer>() {

  @Override
  public Integer zero() {
    return null;
  }

  @Override
  public Integer reduce(Integer b, Tuple2<String, Integer> a) {
    return merge(b, a._2());
  }

  @Override
  public Integer merge(Integer b1, Integer b2) {
    if (b1 == null) {
      return b2;
    } else if (b2 == null) {
      return b1;
    } else {
      return b1 + b2;
    }
  }

  @Override
  public Integer finish(Integer reduction) {
    return reduction;
  }

  @Override
  public Encoder<Integer> bufferEncoder() {
    return Encoders.INT();
  }

  @Override
  public Encoder<Integer> outputEncoder() {
    return Encoders.INT();
  }
}.toColumn());

ds3.printSchema();
ds3.show();
{code}
I get this schema:
{noformat}
root
 |-- value: string (nullable = true)
 |-- (scala.Tuple2): integer (nullable = true)
{noformat}
And this result:
{noformat}
+-----+--------------+
|value|(scala.Tuple2)|
+-----+--------------+
|    a|          null|
+-----+--------------+
{noformat}
As for Scala, it's clear that `Option` is preferred over `null`, but because the Dataset API is supposed to support Java as well, it should not discard the aggregation if the zero method returns null. For Java, I currently use Guava's `Optional`, but that just seems cumbersome to me.
[jira] [Comment Edited] (SPARK-15810) Aggregator doesn't play nice with Option
[ https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15369496#comment-15369496 ] Amit Sela edited comment on SPARK-15810 at 7/10/16 8:28 AM: Just ran this exact code, prefixed by:
{code}
val session = SparkSession.builder().appName("TestAggregator").master("local[*]").getOrCreate()
import session.implicits._
{code}
This is the schema I get:
{noformat}
root
 |-- value: string (nullable = true)
 |-- anon$1(scala.Tuple2): struct (nullable = true)
 |    |-- value: integer (nullable = true)
{noformat}
And this is the output:
{noformat}
+-----+--------------------+
|value|anon$1(scala.Tuple2)|
+-----+--------------------+
|    a|                 [5]|
+-----+--------------------+
{noformat}
> Aggregator doesn't play nice with Option > > > Key: SPARK-15810 > URL: https://issues.apache.org/jira/browse/SPARK-15810 > Project: Spark > Issue Type: Sub-task > Components: SQL > Environment: spark 2.0.0-SNAPSHOT >Reporter: koert kuipers > > {code} > val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS > val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) } > val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), > Option[Int], Option[Int]]{ > def zero: Option[Int] = None > def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = > b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2) > def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => > b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2) > def finish(reduction: Option[Int]): Option[Int] = reduction > def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]] > def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]] > }.toColumn) > ds3.printSchema > ds3.show > {code} > i get as output a somewhat odd looking schema, and after that the program > just hangs pinning one cpu at 100%. the data never shows. > output: > {noformat} > root > |-- value: string (nullable = true) > |-- $anon$1(scala.Tuple2): struct (nullable = true) > ||-- value: integer (nullable = true) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15489) Dataset kryo encoder won't load custom user settings
[ https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305997#comment-15305997 ] Amit Sela commented on SPARK-15489: --- [~marmbrus] you can assign this one to me. > Dataset kryo encoder won't load custom user settings > - > > Key: SPARK-15489 > URL: https://issues.apache.org/jira/browse/SPARK-15489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Amit Sela > > When setting a custom "spark.kryo.registrator" (or any other configuration > for that matter) through the API, this configuration will not propagate to > the encoder that uses a KryoSerializer since it instantiates with "new > SparkConf()". > See: > https://github.com/apache/spark/blob/07c36a2f07fcf5da6fb395f830ebbfc10eb27dcc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L554 > This could be hacked by providing those configurations as System properties, > but this probably should be passed to the encoder and set in the > SerializerInstance after creation. > Example: > When using Encoders with kryo to encode generically typed Objects in the > following manner: > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > I get a decoding exception when trying to decode > `java.util.Collections$UnmodifiableCollection`, which probably comes from > Guava's `ImmutableList`. > This happens when running with master = local[1]. Same code had no problems > with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-15489) Dataset kryo encoder won't load custom user settings
[ https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela updated SPARK-15489: -- Description: When setting a custom "spark.kryo.registrator" (or any other configuration for that matter) through the API, this configuration will not propagate to the encoder that uses a KryoSerializer since it instantiates with "new SparkConf()". See: https://github.com/apache/spark/blob/07c36a2f07fcf5da6fb395f830ebbfc10eb27dcc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L554 This could be hacked by providing those configurations as System properties, but this probably should be passed to the encoder and set in the SerializerInstance after creation. Example: When using Encoders with kryo to encode generically typed Objects in the following manner: public static Encoder encoder() { return Encoders.kryo((Class) Object.class); } I get a decoding exception when trying to decode `java.util.Collections$UnmodifiableCollection`, which probably comes from Guava's `ImmutableList`. This happens when running with master = local[1]. Same code had no problems with RDD api. was: When setting a custom "spark.kryo.registrator" (or any other configuration for that matter) through the API, this configuration will not propagate to the encoder that uses a KryoSerializer since it instantiates with "new SparkConf()". See: Example: When using Encoders with kryo to encode generically typed Objects in the following manner: public static Encoder encoder() { return Encoders.kryo((Class) Object.class); } I get a decoding exception when trying to decode `java.util.Collections$UnmodifiableCollection`, which probably comes from Guava's `ImmutableList`. This happens when running with master = local[1]. Same code had no problems with RDD api. > Dataset kryo encoder won't load custom user settings > - > > Key: SPARK-15489 > URL: https://issues.apache.org/jira/browse/SPARK-15489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Amit Sela > > When setting a custom "spark.kryo.registrator" (or any other configuration > for that matter) through the API, this configuration will not propagate to > the encoder that uses a KryoSerializer since it instantiates with "new > SparkConf()". > See: > https://github.com/apache/spark/blob/07c36a2f07fcf5da6fb395f830ebbfc10eb27dcc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L554 > This could be hacked by providing those configurations as System properties, > but this probably should be passed to the encoder and set in the > SerializerInstance after creation. > Example: > When using Encoders with kryo to encode generically typed Objects in the > following manner: > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > I get a decoding exception when trying to decode > `java.util.Collections$UnmodifiableCollection`, which probably comes from > Guava's `ImmutableList`. > This happens when running with master = local[1]. Same code had no problems > with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-15489) Dataset kryo encoder won't load custom user settings
[ https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela updated SPARK-15489: -- Description: When setting a custom "spark.kryo.registrator" (or any other configuration for that matter) through the API, this configuration will not propagate to the encoder that uses a KryoSerializer since it instantiates with "new SparkConf()". See: Example: When using Encoders with kryo to encode generically typed Objects in the following manner: public static Encoder encoder() { return Encoders.kryo((Class) Object.class); } I get a decoding exception when trying to decode `java.util.Collections$UnmodifiableCollection`, which probably comes from Guava's `ImmutableList`. This happens when running with master = local[1]. Same code had no problems with RDD api. was: When using Encoders with kryo to encode generically typed Objects in the following manner: public static Encoder encoder() { return Encoders.kryo((Class) Object.class); } I get a decoding exception when trying to decode `java.util.Collections$UnmodifiableCollection`, which probably comes from Guava's `ImmutableList`. This happens when running with master = local[1]. Same code had no problems with RDD api. > Dataset kryo encoder won't load custom user settings > - > > Key: SPARK-15489 > URL: https://issues.apache.org/jira/browse/SPARK-15489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Amit Sela > > When setting a custom "spark.kryo.registrator" (or any other configuration > for that matter) through the API, this configuration will not propagate to > the encoder that uses a KryoSerializer since it instantiates with "new > SparkConf()". > See: > Example: > When using Encoders with kryo to encode generically typed Objects in the > following manner: > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > I get a decoding exception when trying to decode > `java.util.Collections$UnmodifiableCollection`, which probably comes from > Guava's `ImmutableList`. > This happens when running with master = local[1]. Same code had no problems > with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-15489) Dataset kryo encoder won't load custom user settings
[ https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Amit Sela updated SPARK-15489: -- Summary: Dataset kryo encoder won't load custom user settings (was: Dataset kryo encoder fails on Collections$UnmodifiableCollection) > Dataset kryo encoder won't load custom user settings > - > > Key: SPARK-15489 > URL: https://issues.apache.org/jira/browse/SPARK-15489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Amit Sela > > When using Encoders with kryo to encode generically typed Objects in the > following manner: > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > I get a decoding exception when trying to decode > `java.util.Collections$UnmodifiableCollection`, which probably comes from > Guava's `ImmutableList`. > This happens when running with master = local[1]. Same code had no problems > with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection
[ https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15304893#comment-15304893 ] Amit Sela commented on SPARK-15489: --- The issue here is the fact that setting the SparkConf does not propagate to the KryoSerializer used by the encoder. I managed to make this work by using Java System properties instead of SparkConf#set since the SparkConf constructor will take them into account, but it's a hack... For now I think I'll change the description of the issue, and propose this as a temporary solution. > Dataset kryo encoder fails on Collections$UnmodifiableCollection > > > Key: SPARK-15489 > URL: https://issues.apache.org/jira/browse/SPARK-15489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Amit Sela > > When using Encoders with kryo to encode generically typed Objects in the > following manner: > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > I get a decoding exception when trying to decode > `java.util.Collections$UnmodifiableCollection`, which probably comes from > Guava's `ImmutableList`. > This happens when running with master = local[1]. Same code had no problems > with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
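To make the hack described above concrete, here is a minimal sketch of it (the app name is a placeholder, and the registrator is the {{ImmutablesRegistrator}} shown in the following comment). It relies on the fact that a plain {{new SparkConf()}} loads JVM system properties prefixed with {{spark.}}, so serializer instances built from a fresh conf can still see the setting:
{code}
// Sketch of the system-property workaround (a hack, not a proper fix):
// set the registrator as a JVM system property *before* the session and encoders
// are created, so any internally created `new SparkConf()` picks it up as well.
System.setProperty("spark.kryo.registrator", ImmutablesRegistrator.class.getName());

SparkSession session = SparkSession.builder()
    .appName("KryoRegistratorWorkaround") // placeholder app name
    .master("local[*]")
    .getOrCreate();
{code}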
[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection
[ https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15304886#comment-15304886 ] Amit Sela commented on SPARK-15489: --- Got it! I wasn't using the custom registrator correctly; it works better like this:
{code}
public class ImmutablesRegistrator implements KryoRegistrator {
  @Override
  public void registerClasses(Kryo kryo) {
    UnmodifiableCollectionsSerializer.registerSerializers(kryo);
    // Guava
    ImmutableListSerializer.registerSerializers(kryo);
    ImmutableSetSerializer.registerSerializers(kryo);
    ImmutableMapSerializer.registerSerializers(kryo);
    ImmutableMultimapSerializer.registerSerializers(kryo);
  }
}
{code}
> Dataset kryo encoder fails on Collections$UnmodifiableCollection > > > Key: SPARK-15489 > URL: https://issues.apache.org/jira/browse/SPARK-15489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Amit Sela > > When using Encoders with kryo to encode generically typed Objects in the > following manner: > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > I get a decoding exception when trying to decode > `java.util.Collections$UnmodifiableCollection`, which probably comes from > Guava's `ImmutableList`. > This happens when running with master = local[1]. Same code had no problems > with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
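For completeness, a sketch of how a registrator like this is usually wired up through {{SparkConf}} (the app name is a placeholder). Note that, per the other comments in this ticket, this SparkConf route was not reaching the {{KryoSerializer}} instantiated inside the Dataset encoder, which is the bug being reported:
{code}
// Sketch only: the usual SparkConf wiring for a custom Kryo registrator.
SparkConf conf = new SparkConf()
    .setAppName("ImmutablesKryo") // placeholder app name
    .setMaster("local[*]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", ImmutablesRegistrator.class.getName());

SparkSession session = SparkSession.builder().config(conf).getOrCreate();
{code}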
[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection
[ https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15304222#comment-15304222 ] Amit Sela commented on SPARK-15489: --- I would expect this to be related to KryoSerializer not registering the user-provided registrator, but I've added a print in https://github.com/apache/spark/blob/07c36a2f07fcf5da6fb395f830ebbfc10eb27dcc/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala#L123 to check if any new Kryo is created without the provided registrator, but it seems that all instances have a user-provided registrator (if one is provided). > Dataset kryo encoder fails on Collections$UnmodifiableCollection > > > Key: SPARK-15489 > URL: https://issues.apache.org/jira/browse/SPARK-15489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Amit Sela > > When using Encoders with kryo to encode generically typed Objects in the > following manner: > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > I get a decoding exception when trying to decode > `java.util.Collections$UnmodifiableCollection`, which probably comes from > Guava's `ImmutableList`. > This happens when running with master = local[1]. Same code had no problems > with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection
[ https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298847#comment-15298847 ] Amit Sela commented on SPARK-15489: --- Well it looks like the codegen creates a ```java new SparkConf()```, instead of deserializing a "broadcasted" one. I've tried adding the registrator configuration as a System parameter (-D), but it didn't catch. Is the generated code executed in the JVM ? in the same one if running in standalone ? > Dataset kryo encoder fails on Collections$UnmodifiableCollection > > > Key: SPARK-15489 > URL: https://issues.apache.org/jira/browse/SPARK-15489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Amit Sela > > When using Encoders with kryo to encode generically typed Objects in the > following manner: > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > I get a decoding exception when trying to decode > `java.util.Collections$UnmodifiableCollection`, which probably comes from > Guava's `ImmutableList`. > This happens when running with master = local[1]. Same code had no problems > with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection
[ https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15298847#comment-15298847 ] Amit Sela edited comment on SPARK-15489 at 5/24/16 8:38 PM: Well it looks like the codegen creates a `new SparkConf()`, instead of deserializing a "broadcasted" one. I've tried adding the registrator configuration as a System parameter (-D), but it didn't catch. Is the generated code executed in the JVM ? in the same one if running in standalone ? was (Author: amitsela): Well it looks like the codegen creates a ```java new SparkConf()```, instead of deserializing a "broadcasted" one. I've tried adding the registrator configuration as a System parameter (-D), but it didn't catch. Is the generated code executed in the JVM ? in the same one if running in standalone ? > Dataset kryo encoder fails on Collections$UnmodifiableCollection > > > Key: SPARK-15489 > URL: https://issues.apache.org/jira/browse/SPARK-15489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Amit Sela > > When using Encoders with kryo to encode generically typed Objects in the > following manner: > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > I get a decoding exception when trying to decode > `java.util.Collections$UnmodifiableCollection`, which probably comes from > Guava's `ImmutableList`. > This happens when running with master = local[1]. Same code had no problems > with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection
[ https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15297087#comment-15297087 ] Amit Sela commented on SPARK-15489: --- This is my registrator:
{code}
public class ImmutableRegistrator implements KryoRegistrator {
  @Override
  public void registerClasses(Kryo kryo) {
    try {
      kryo.register(Class.forName("java.util.Collections$UnmodifiableCollection"),
          new UnmodifiableCollectionsSerializer());
      kryo.register(ImmutableList.class, new ImmutableListSerializer());
    } catch (ClassNotFoundException e) {
      // ignore
    }
  }
}
{code}
And I register with:
{code}
conf.set("spark.kryo.registrator", ImmutableRegistrator.class.getCanonicalName())
{code}
When KryoSerializer.deserializeStream is called I see my registrator in KryoSerializerInstance, but when KryoSerializer.deserialize[T: ClassTag](bytes: ByteBuffer) is called I'm not so sure; if the instance is this.ks then no, I don't see my registrator. > Dataset kryo encoder fails on Collections$UnmodifiableCollection > > > Key: SPARK-15489 > URL: https://issues.apache.org/jira/browse/SPARK-15489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Amit Sela > > When using Encoders with kryo to encode generically typed Objects in the > following manner: > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > I get a decoding exception when trying to decode > `java.util.Collections$UnmodifiableCollection`, which probably comes from > Guava's `ImmutableList`. > This happens when running with master = local[1]. Same code had no problems > with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection
[ https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296989#comment-15296989 ] Amit Sela commented on SPARK-15489: --- In 2.0.0-SNAPSHOT it manifests as: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy at scala.Predef$.assert(Predef.scala:179) at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:157) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1075) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) . at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240) at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:287) at org.apache.spark.sql.Dataset$$anonfun$collectAsList$1$$anonfun$apply$12.apply(Dataset.scala:2115) at org.apache.spark.sql.Dataset$$anonfun$collectAsList$1$$anonfun$apply$12.apply(Dataset.scala:2114) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2436) at org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2114) at org.apache.spark.sql.Dataset$$anonfun$collectAsList$1.apply(Dataset.scala:2113) at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2449) at org.apache.spark.sql.Dataset.collectAsList(Dataset.scala:2113) .. > Dataset kryo encoder fails on Collections$UnmodifiableCollection > > > Key: SPARK-15489 > URL: https://issues.apache.org/jira/browse/SPARK-15489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Amit Sela > > When using Encoders with kryo to encode generically typed Objects in the > following manner: > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > I get a decoding exception when trying to decode > `java.util.Collections$UnmodifiableCollection`, which probably comes from > Guava's `ImmutableList`. > This happens when running with master = local[1]. Same code had no problems > with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection
[ https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296950#comment-15296950 ] Amit Sela commented on SPARK-15489: --- I've tried registering `UnmodifiableCollectionsSerializer` and `ImmutableListSerializer` from: https://github.com/magro/kryo-serializers but no luck there.. > Dataset kryo encoder fails on Collections$UnmodifiableCollection > > > Key: SPARK-15489 > URL: https://issues.apache.org/jira/browse/SPARK-15489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Amit Sela > > When using Encoders with kryo to encode generically typed Objects in the > following manner: > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > I get a decoding exception when trying to decode > `java.util.Collections$UnmodifiableCollection`, which probably comes from > Guava's `ImmutableList`. > This happens when running with master = local[1]. Same code had no problems > with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection
[ https://issues.apache.org/jira/browse/SPARK-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15296951#comment-15296951 ] Amit Sela commented on SPARK-15489: --- Serialization trace: tupleTags (org.apache.beam.sdk.values.TupleTagList) tupleTagList (org.apache.beam.sdk.transforms.join.CoGbkResultSchema) schema (org.apache.beam.sdk.transforms.join.CoGbkResult) value (org.apache.beam.sdk.values.KV) value (org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:230) ... 44 more Caused by: java.lang.UnsupportedOperationException at java.util.Collections$UnmodifiableCollection.add(Collections.java:1075) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) ... 61 more > Dataset kryo encoder fails on Collections$UnmodifiableCollection > > > Key: SPARK-15489 > URL: https://issues.apache.org/jira/browse/SPARK-15489 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.1 >Reporter: Amit Sela > > When using Encoders with kryo to encode generically typed Objects in the > following manner: > public static Encoder encoder() { > return Encoders.kryo((Class) Object.class); > } > I get a decoding exception when trying to decode > `java.util.Collections$UnmodifiableCollection`, which probably comes from > Guava's `ImmutableList`. > This happens when running with master = local[1]. Same code had no problems > with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-15489) Dataset kryo encoder fails on Collections$UnmodifiableCollection
Amit Sela created SPARK-15489: - Summary: Dataset kryo encoder fails on Collections$UnmodifiableCollection Key: SPARK-15489 URL: https://issues.apache.org/jira/browse/SPARK-15489 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.6.1 Reporter: Amit Sela When using Encoders with kryo to encode generically typed Objects in the following manner: public static <T> Encoder<T> encoder() { return Encoders.kryo((Class<T>) Object.class); } I get a decoding exception when trying to decode `java.util.Collections$UnmodifiableCollection`, which probably comes from Guava's `ImmutableList`. This happens when running with master = local[1]. Same code had no problems with RDD api. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
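For reference, a hedged sketch of such a generically typed kryo-backed encoder and of how it ends up being used. The helper name, session variable, and dataset contents below are illustrative only and are not taken from the report:
{code}
// Illustrative sketch: a generic kryo-backed encoder like the one described above.
// Assumes the usual imports (Encoders, Encoder, Arrays, Guava's ImmutableList).
@SuppressWarnings("unchecked")
public static <T> Encoder<T> kryoEncoder() {
  return Encoders.kryo((Class<T>) Object.class);
}

// Usage sketch: values carrying a Guava ImmutableList, which is where the
// Collections$UnmodifiableCollection decoding failure reportedly shows up.
// Dataset<Object> ds = session.createDataset(
//     Arrays.asList((Object) ImmutableList.of("a", "b", "c")), kryoEncoder());
// ds.collectAsList();
{code}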