[ 
https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007191#comment-17007191
 ] 

fqaiser94 edited comment on SPARK-22231 at 1/3/20 3:07 AM:
-----------------------------------------------------------

I feel strongly that {{withField}} and {{withFieldRenamed}} methods should be 
on Column for the following reasons:

+*Reason #1*+

These methods are intended to only ever operate on {{StructType}} columns. 
 It does *not* make much sense to me to add methods to Dataframe that operate 
only on {{StructType}} columns. 
 It does however make sense to me to add methods to Column that operate only on 
{{StructType}} columns and there is lots of precedent for this e.g. the 
existing {{getField}} method on Column operates only on {{StructType}} columns, 
the existing {{getItem}} method on Column operates only on {{ArrayType}} and 
{{MapType}} columns, etc.

+*Reason #2*+

I'm struggling to see how one would express operations on deeply nested 
StructFields easily if you were to put these methods on Dataframe. Using the 
{{withField}} method I added to Column in my PR, you can add or replace deeply 
nested StructFields like this:
{code:java}
data.show(false)
+---------------------------------+
|a                                |
+---------------------------------+
|[[1, 2, 3], [[4,, 6], [7, 8, 9]]]|
+---------------------------------+

data.withColumn("a", 'a.withField(
  "b", $"a.b".withField(
    "a", $"a.b.a".withField(
      "b", lit(5))))).show(false)
+-----------------------------------+
|a                                  |
+-----------------------------------+
|[[1, 2, 3], [[4, 5, 6], [7, 8, 9]]]|
+-----------------------------------+
{code}
You can see a fully reproducible examples of this in my PR: 
[https://github.com/apache/spark/pull/27066]

Another common use-case that would be hard to express by adding the methods to 
Dataframe would be operations on {{ArrayType(StructType)}} or 
{{MapType(}}{{StructType}}{{, StructType)}} columns. By adding the methods to 
Column, you can express such operations naturally using the recently added 
higher-order-functions feature in Spark:
{code:java}
val data = spark.createDataFrame(
  sc.parallelize(Seq(Row(List(Row(1, 2, 3), Row(4, 5, 6))))),
  StructType(Seq(
    StructField("array", ArrayType(StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", IntegerType),
      StructField("c", IntegerType)))))
    ))
  ).cache

data.show(false)
+----------------------+
|array                 |
+----------------------+
|[[1, 2, 3], [4, 5, 6]]|
+----------------------+

data.withColumn("newArray", transform('array, structElem => 
structElem.withField("d", lit("hello")))).show(false)
+----------------------+------------------------------------+
|array                 |newArray                            |
+----------------------+------------------------------------+
|[[1, 2, 3], [4, 5, 6]]|[[1, 2, 3, hello], [4, 5, 6, hello]]|
+----------------------+------------------------------------+
{code}
 +*Reason #3*+

To add these methods to Dataframe, we would need signatures that would look 
strange compared to the methods available on Dataframe today. Off the top of my 
head, their signatures could look like this:
 * {{def withField(colName: String, structColumn: Column, fieldName: String, 
field: Column)}}{{: Dataframe}}{{}}
 Returns a new Dataframe with a new Column (colName) containing a copy of 
structColumn with field added/replaced based on name.
 * {{def withFieldRenamed(}}{{structColumn}}{{: Column, existingFieldName: 
String, newFieldName: String): Dataframe}}
 Returns a new Dataframe with existingFieldName in structColumn renamed to 
newFieldName.

Maybe these signatures could be refined further? I'm not sure. 

 

For these reasons, it seems more natural and logical to me to have the 
{{withField}} and {{withFieldRenamed}} methods on Column rather than Dataframe. 
Regarding the proposed {{drop}} method, I think its debatable: 
 * {{drop}} method could be added to Column. 
 I can't at present think of a particular use-case which would necessitate 
adding the {{drop}} method to Column.
 * The existing {{drop}} method on Dataframe could be augmented to support 
dropping StructField if a reference to a StructField is passed to it. I'm not 
sure how challenging this would be to implement but presumably this should be 
possible. 


was (Author: fqaiser94):
I feel strongly that {{withField}} and {{withFieldRenamed}} methods should be 
on Column for the following reasons:

+*Reason #1*+

These methods are intended to only ever operate on {{StructType}} columns. 
 It does *not* make much sense to me to add methods to Dataframe that operate 
only on {{StructType}} columns. 
 It does however make sense to me to add methods to Column that operate only on 
{{StructType}} columns and there is lots of precedent for this e.g. the 
existing {{getField}} method on Column operates only on {{StructType}} columns, 
the existing {{getItem}} method on Column operates only on {{ArrayType}} and 
{{MapType}} columns, etc.

+*Reason #2*+

I'm struggling to see how one would express operations on deeply nested 
StructFields easily if you were to put these methods on Dataframe. Using the 
{{withField}} method I added to Column in my PR, you can add or replace deeply 
nested StructFields like this:
{code:java}
data.show(false)
+---------------------------------+
|a                                |
+---------------------------------+
|[[1, 2, 3], [[4,, 6], [7, 8, 9]]]|
+---------------------------------+

data.withColumn("a", 'a.withField(
  "b", $"a.b".withField(
    "a", $"a.b.a".withField(
      "b", lit(5))))).show(false)
+-----------------------------------+
|a                                  |
+-----------------------------------+
|[[1, 2, 3], [[4, 5, 6], [7, 8, 9]]]|
+-----------------------------------+
{code}
You can see a fully reproducible examples of this in my PR: 
[https://github.com/apache/spark/pull/27066]

Another common use-case that would be hard to express by adding the methods to 
Dataframe would be operations on {{ArrayType(StructType)}} or 
{{MapType(}}{{StructType}}{{, StructType)}} columns. By adding the methods to 
Column, you can express such operations naturally using the recently added 
higher-order-functions feature in Spark:
{code:java}
val data = spark.createDataFrame(
  sc.parallelize(Seq(Row(List(Row(1, 2, 3), Row(4, 5, 6))))),
  StructType(Seq(
    StructField("array", ArrayType(StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", IntegerType),
      StructField("c", IntegerType)))))
    ))
  ).cache

data.show(false)
+----------------------+
|array                 |
+----------------------+
|[[1, 2, 3], [4, 5, 6]]|
+----------------------+

data.withColumn("newArray", transform('array, structElem => 
structElem.withField("d", lit("hello")))).show(false)
+----------------------+------------------------------------+
|array                 |newArray                            |
+----------------------+------------------------------------+
|[[1, 2, 3], [4, 5, 6]]|[[1, 2, 3, hello], [4, 5, 6, hello]]|
+----------------------+------------------------------------+
{code}
 +*Reason #3*+

To add these methods to Dataframe, we would need signatures that would look 
strange compared to the methods available on Dataframe today. Off the top of my 
head, their signatures could look like this:
 * {{def withField(colName: String, structColumn: Column, fieldName: String, 
field: Column)}}
 Returns a new Dataframe with a new Column (colName) containing a copy of 
structColumn with field added/replaced based on name.
 * {{def withFieldRenamed(}}{{structColumn}}{{: Column, existingFieldName: 
String, newFieldName: String)}}
 Returns a new Dataframe with existingFieldName in structColumn renamed to 
newFieldName.

Maybe these signatures could be refined further? I'm not sure. 

 

For these reasons, it seems more natural and logical to me to have the 
{{withField}} and {{withFieldRenamed}} methods on Column rather than Dataframe. 
Regarding the proposed {{drop}} method, I think its debatable: 
 * {{drop}} method could be added to Column. 
 I can't at present think of a particular use-case which would necessitate 
adding the {{drop}} method to Column.
 * The existing {{drop}} method on Dataframe could be augmented to support 
dropping StructField if a reference to a StructField is passed to it. I'm not 
sure how challenging this would be to implement but presumably this should be 
possible. 

> Support of map, filter, withColumn, dropColumn in nested list of structures
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-22231
>                 URL: https://issues.apache.org/jira/browse/SPARK-22231
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: DB Tsai
>            Assignee: Jeremy Smith
>            Priority: Major
>
> At Netflix's algorithm team, we work on ranking problems to find the great 
> content to fulfill the unique tastes of our members. Before building a 
> recommendation algorithms, we need to prepare the training, testing, and 
> validation datasets in Apache Spark. Due to the nature of ranking problems, 
> we have a nested list of items to be ranked in one column, and the top level 
> is the contexts describing the setting for where a model is to be used (e.g. 
> profiles, country, time, device, etc.)  Here is a blog post describing the 
> details, [Distributed Time Travel for Feature 
> Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907].
>  
> To be more concrete, for the ranks of videos for a given profile_id at a 
> given country, our data schema can be looked like this,
> {code:java}
> root
>  |-- profile_id: long (nullable = true)
>  |-- country_iso_code: string (nullable = true)
>  |-- items: array (nullable = false)
>  |    |-- element: struct (containsNull = false)
>  |    |    |-- title_id: integer (nullable = true)
>  |    |    |-- scores: double (nullable = true)
> ...
> {code}
> We oftentimes need to work on the nested list of structs by applying some 
> functions on them. Sometimes, we're dropping or adding new columns in the 
> nested list of structs. Currently, there is no easy solution in open source 
> Apache Spark to perform those operations using SQL primitives; many people 
> just convert the data into RDD to work on the nested level of data, and then 
> reconstruct the new dataframe as workaround. This is extremely inefficient 
> because all the optimizations like predicate pushdown in SQL can not be 
> performed, we can not leverage on the columnar format, and the serialization 
> and deserialization cost becomes really huge even we just want to add a new 
> column in the nested level.
> We built a solution internally at Netflix which we're very happy with. We 
> plan to make it open source in Spark upstream. We would like to socialize the 
> API design to see if we miss any use-case.  
> The first API we added is *mapItems* on dataframe which take a function from 
> *Column* to *Column*, and then apply the function on nested dataframe. Here 
> is an example,
> {code:java}
> case class Data(foo: Int, bar: Double, items: Seq[Double])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)),
>   Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4))
> ))
> val result = df.mapItems("items") {
>   item => item * 2.0
> }
> result.printSchema()
> // root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: double (containsNull = true)
> result.show()
> // +---+----+--------------------+
> // |foo| bar|               items|
> // +---+----+--------------------+
> // | 10|10.0|[20.2, 20.4, 20.6...|
> // | 20|20.0|[40.2, 40.4, 40.6...|
> // +---+----+--------------------+
> {code}
> Now, with the ability of applying a function in the nested dataframe, we can 
> add a new function, *withColumn* in *Column* to add or replace the existing 
> column that has the same name in the nested list of struct. Here is two 
> examples demonstrating the API together with *mapItems*; the first one 
> replaces the existing column,
> {code:java}
> case class Item(a: Int, b: Double)
> case class Data(foo: Int, bar: Double, items: Seq[Item])
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: struct (containsNull = true)
> // |    |    |-- a: integer (nullable = true)
> // |    |    |-- b: double (nullable = true)
> result.show(false)
> // +---+----+----------------------+
> // |foo|bar |items                 |
> // +---+----+----------------------+
> // |10 |10.0|[[10,11.0], [11,12.0]]|
> // |20 |20.0|[[20,21.0], [21,22.0]]|
> // +---+----+----------------------+
> {code}
> and the second one adds a new column in the nested dataframe.
> {code:java}
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.withColumn(item("b") + 1 as "c")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: struct (containsNull = true)
> // |    |    |-- a: integer (nullable = true)
> // |    |    |-- b: double (nullable = true)
> // |    |    |-- c: double (nullable = true)
> result.show(false)
> // +---+----+--------------------------------+
> // |foo|bar |items                           |
> // +---+----+--------------------------------+
> // |10 |10.0|[[10,10.0,11.0], [11,11.0,12.0]]|
> // |20 |20.0|[[20,20.0,21.0], [21,21.0,22.0]]|
> // +---+----+--------------------------------+
> {code}
> We also implement a filter predicate to nested list of struct, and it will 
> return those items which matched the predicate. The following is the API 
> example,
> {code:java}
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.filterItems("items") {
>   item => item("a") < 20
> }
> // +---+----+----------------------+
> // |foo|bar |items                 |
> // +---+----+----------------------+
> // |10 |10.0|[[10,10.0], [11,11.0]]|
> // |20 |20.0|[]                    |
> // +---+----+----------------------+
> {code}
> Dropping a column in the nested list of struct can be achieved by similar API 
> to *withColumn*. We add *drop* method to *Column* to implement this. Here is 
> an example,
> {code:java}
> val df: Dataset[Data] = spark.createDataset(Seq(
>   Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))),
>   Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0)))
> ))
> val result = df.mapItems("items") {
>   item => item.drop("b")
> }
> result.printSchema
> root
> // |-- foo: integer (nullable = false)
> // |-- bar: double (nullable = false)
> // |-- items: array (nullable = true)
> // |    |-- element: struct (containsNull = true)
> // |    |    |-- a: integer (nullable = true)
> result.show(false)
> // +---+----+------------+
> // |foo|bar |items       |
> // +---+----+------------+
> // |10 |10.0|[[10], [11]]|
> // |20 |20.0|[[20], [21]]|
> // +---+----+------------+
> {code}
> Note that all of those APIs are implemented by SQL expression with codegen; 
> as a result, those APIs are not opaque to Spark optimizers, and can fully 
> take advantage of columnar data structure. 
> We're looking forward to the community feedback and suggestion! Thanks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to