[ https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198965#comment-16198965 ]

Jeremy Smith commented on SPARK-22231:
--------------------------------------

I implemented these at Netflix, so I wanted to provide some more context and 
answer some of the questions above.

First, [~nkronenfeld]:
1. I named it `withField` on a column because that makes it clear the column is 
expected to be a struct and that you're dealing with its fields. My take was 
that `withColumn` on a Column would be somewhat confusing.
2. Maybe `withFieldRenamed`, following the above reasoning?
3. `mapItems` works at the Dataset level as well. We have extension methods for 
`Column` and `Dataset` to provide `mapItems` and `filterItems`. On `Column` 
there are two versions:

  * `[map|filter]Items(fn: Column => Column)` assumes the column upon which 
it's invoked is an array column, and returns a new array column which is 
updated using the given `Column => Column` lambda. The input `Column` to the 
lambda represents an element of the array column.
  * `[map|filter]Items(fieldName: String)(fn: Column => Column): Column` 
assumes the column upon which it's invoked is a struct, and that `fieldName` is 
an array field of that struct. It applies the `Column => Column` lambda to each 
element of the array field, and returns a new struct column with the 
`fieldName` field updated and all other fields of the original struct preserved.

On `Dataset`, the enrichment provides only the second form of extension methods:

* `[map|filter]Items(columnName: String)(fn: Column => Column): Dataset[Row]` 
assumes the given `columnName` of the `Dataset` upon which it's invoked is an 
array column, and returns a new `Dataset` with that column mapped using the 
given `Column => Column` lambda. All other columns of the original `Dataset` 
are preserved.
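
Laid out as Scala signatures, the surface looks roughly like this. This is a 
sketch only: the bodies are elided, and the implicit-class packaging (and the 
`NestedOps` name) is illustrative rather than our actual code.

{code:java}
import org.apache.spark.sql.{Column, Dataset, Row}

object NestedOps {  // hypothetical packaging for the enrichments
  implicit class RichColumn(val col: Column) {
    // add or replace a struct field, e.g. col.withField(col("b") + 1 as "b")
    def withField(field: Column): Column = ???

    // treat `col` as an array column and map/filter its elements
    def mapItems(fn: Column => Column): Column = ???
    def filterItems(fn: Column => Column): Column = ???

    // treat `col` as a struct whose `fieldName` field is an array column
    def mapItems(fieldName: String)(fn: Column => Column): Column = ???
    def filterItems(fieldName: String)(fn: Column => Column): Column = ???
  }

  implicit class RichDataset(val ds: Dataset[_]) {
    // map/filter the elements of array column `columnName`, preserving all other columns
    def mapItems(columnName: String)(fn: Column => Column): Dataset[Row] = ???
    def filterItems(columnName: String)(fn: Column => Column): Dataset[Row] = ???
  }
}
{code}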

I also wanted to add a bit of context about how these are implemented and why 
we didn't simply make a PR with the code.

Behind the scenes, `mapItems` and `filterItems` are implemented with Catalyst 
expressions. `mapItems` simply provides an interface to the existing 
`MapObjects` expression, and `filterItems` uses a similar expression, 
`FilterObjects`, that we implemented ourselves (a rough sketch of the wrapping 
follows the list below). This presents a couple of implementation issues:

1. `MapObjects` does not support `eval`, so if you hit the `eval` codepath 
(i.e. with anything that does `CodegenFallback`) it will fail. We mitigated 
this by never using the default `CodegenFallback`; instead we implemented our 
own `CodegenFallback`-like traits, which use `eval` only for the current 
expression but use codegen to evaluate child expressions. We implemented both 
codegen and eval for `FilterObjects`, and we had originally intended to also 
implement eval for `MapObjects` and submit both in a PR. But it proved very 
difficult to test these in the existing expression test framework in the Spark 
codebase (we used our own test harness to test both code paths of 
`FilterObjects`), so that effort never came to fruition. Note that we're 
currently on Spark 2.0 with some backported patches, so maybe this will be 
easier when we migrate to Spark 2.1?
2. In order to work properly, we had to make sure that the columns given to 
`MapObjects` and `FilterObjects` are fully resolved when the expression is 
created; this was a while ago, but I think the reason was that transformation 
rules during analysis aren't easily extensible and the rules around 
`MapObjects` enforce that it must be fully resolved before analysis. You also 
need a DataType to pass to these, which you can only get from a resolved 
column. So you currently must do something like `df("col")` rather than 
`$"col"`, or you'll get an immediate exception. Another issue with this is that 
if you're doing nested transformations with `mapItems`, you cannot have fully 
resolved columns. We mitigated this by doing an out-of-band "ad-hoc analysis" 
step where we manually traverse the expression tree and replace 
`UnresolvedAlias` and `UnresolvedExtractValue` with their resolved versions, 
purely for the purpose of getting the DataType out. This is obviously not 
ideal, but again it's something that might require less of a workaround in 
newer Spark versions.
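
For reference, here is the flavor of the `MapObjects` wrapping (a simplified 
sketch against Spark 2.x internals, not our exact code). Note that it needs the 
array's element `DataType` up front, which is exactly where the resolution 
requirement in point 2 comes from:

{code:java}
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.objects.MapObjects
import org.apache.spark.sql.types.ArrayType

// Simplified sketch: expose MapObjects as a Column-level map over array elements.
// The input column must already be resolved (df("items"), not $"items"), because
// MapObjects needs the array's element type when the expression is constructed.
def mapItems(arrayCol: Column)(fn: Column => Column): Column = {
  val elementType = arrayCol.expr.dataType match {
    case ArrayType(et, _) => et
    case other => throw new IllegalArgumentException(s"expected an array column, got $other")
  }
  new Column(MapObjects(
    elemExpr => fn(new Column(elemExpr)).expr,  // apply the Column lambda to each element
    arrayCol.expr,
    elementType))
}
{code}

`filterItems` wraps `FilterObjects` in the same way, with the lambda treated as 
a predicate over each element.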

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-22231
>                 URL: https://issues.apache.org/jira/browse/SPARK-22231
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: DB Tsai
>
> On Netflix's algorithm team, we work on ranking problems to find great 
> content that fulfills the unique tastes of our members. Before building 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the context describing the setting in which a model is to be used (e.g. 
> profiles, country, time, device, etc.). Here is a blog post describing the 
> details: [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for ranking videos for a given profile_id in a given 
> country, our data schema looks like this:
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  |    |-- element: struct (containsNull = false)
>  |    |    |-- title_id: integer (nullable = true)
>  |    |    |-- scores: double (nullable = true)
> ...
> {code}
> We often need to work on the nested list of structs by applying functions to 
> them. Sometimes we drop columns from, or add new columns to, the nested list 
> of structs. Currently there is no easy way in open source Apache Spark to 
> perform those operations using SQL primitives; many people just convert the 
> data into an RDD to work on the nested level, and then reconstruct a new 
> dataframe as a workaround. This is extremely inefficient because SQL 
> optimizations like predicate pushdown cannot be performed, we cannot leverage 
> the columnar format, and the serialization and deserialization cost becomes 
> huge even if we just want to add a new column at the nested level.
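> To illustrate that workaround (a rough sketch for the schema above, assuming 
> a dataframe df and an active SparkSession spark; not code from our actual 
> pipeline), adding even one derived field to each nested item means 
> hand-building a new schema and round-tripping every row through an RDD:
> {code:java}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> 
> // Hand-build the new schema: add an example doubled_score field to every item struct.
> val newSchema = StructType(df.schema.fields.map {
>   case StructField("items", ArrayType(itemType: StructType, n), nullable, m) =>
>     StructField("items", ArrayType(itemType.add("doubled_score", DoubleType), n), nullable, m)
>   case other => other
> })
> 
> // Deserialize every row, rewrite each nested item, and rebuild the dataframe
> // (null handling omitted). All columnar and pushdown optimizations are lost here.
> val patchedRows = df.rdd.map { row =>
>   val items = row.getSeq[Row](row.fieldIndex("items")).map { item =>
>     Row.fromSeq(item.toSeq :+ (item.getAs[Double]("scores") * 2.0))
>   }
>   Row.fromSeq(row.toSeq.updated(row.fieldIndex("items"), items))
> }
> val rebuilt = spark.createDataFrame(patchedRows, newSchema)
> {code}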
> We built a solution internally at Netflix which we're very happy with, and we 
> plan to open source it in Spark upstream. We would like to socialize the API 
> design to see if we've missed any use cases.
> The first API we added is *mapItems* on a dataframe, which takes a function 
> from *Column* to *Column* and applies it to the nested data. Here is an 
> example:
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: double (containsNull = true)
> result.show()
> // +---+----+--------------------+
> // |foo| bar|               items|
> // +---+----+--------------------+
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+----+--------------------+
> {code}
> Now, with the ability to apply a function to the nested dataframe, we can add 
> a new function, *withColumn* on *Column*, to add a new column or replace an 
> existing column of the same name in the nested list of structs. Here are two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces an existing column:
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: struct (containsNull = true)
> // |    |    |-- a: integer (nullable = true)
> // |    |    |-- b: double (nullable = true)
> result.show(false)
> // +---+----+----------------------+
> // |foo|bar |items                 |
> // +---+----+----------------------+
> // |10 |10.0|[[10,11.0], [11,12.0]]|
> // |20 |20.0|[[20,21.0], [21,22.0]]|
> // +---+----+----------------------+
> {code}
> and the second one adds a new column to the nested dataframe:
> {code:java}
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "c")
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: struct (containsNull = true)
> // |    |    |-- a: integer (nullable = true)
> // |    |    |-- b: double (nullable = true)
> // |    |    |-- c: double (nullable = true)
> result.show(false)
> // +---+----+--------------------------------+
> // |foo|bar |items                           |
> // +---+----+--------------------------------+
> // |10 |10.0|[[10,10.0,11.0], [11,11.0,12.0]]|
> // |20 |20.0|[[20,20.0,21.0], [21,21.0,22.0]]|
> // +---+----+--------------------------------+
> {code}
> We also implement filtering on the nested list of structs; it returns only 
> the items that match the predicate. The following is an API example:
> {code:java}
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.filterItems("items") {
>   item => item("a") < 20
> }
> result.show(false)
> // +---+----+----------------------+
> // |foo|bar |items                 |
> // +---+----+----------------------+
> // |10 |10.0|[[10,10.0], [11,11.0]]|
> // |20 |20.0|[]                    |
> // +---+----+----------------------+
> {code}
> Dropping a column in the nested list of structs can be achieved with an API 
> similar to *withColumn*. We add a *drop* method to *Column* to implement 
> this. Here is an example:
> {code:java}
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.drop("b")
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: struct (containsNull = true)
> // |    |    |-- a: integer (nullable = true)
> result.show(false)
> // +---+----+------------+
> // |foo|bar |items       |
> // +---+----+------------+
> // |10 |10.0|[[10], [11]]|
> // |20 |20.0|[[20], [21]]|
> // +---+----+------------+
> {code}
> Note that all of those APIs are implemented as SQL expressions with codegen; 
> as a result, they are not opaque to the Spark optimizer and can fully take 
> advantage of the columnar data format.
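> As a quick (purely illustrative) way to see this, one can combine *mapItems* 
> with an ordinary filter and inspect the physical plan; the Parquet path below 
> is made up:
> {code:java}
> import org.apache.spark.sql.functions.col
> 
> // Illustrative only: the path is hypothetical; mapItems/withColumn are the
> // proposed APIs from this ticket.
> val scored = spark.read.parquet("/path/to/rankings")
>   .filter(col("country_iso_code") === "US")
>   .mapItems("items") { item => item.withColumn(item("scores") * 2.0 as "scores") }
> 
> // The country filter should still show up as a pushed filter on the Parquet
> // scan, since the nested transformation stays in the expression/codegen world.
> scored.explain(true)
> {code}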
> We're looking forward to the community's feedback and suggestions! Thanks.


