[ https://issues.apache.org/jira/browse/SPARK-22231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
DB Tsai updated SPARK-22231: ---------------------------- Summary: Support of map, filter, withField, dropFields in nested list of structures (was: Support of map, filter, withColumn, dropColumn in nested list of structures) > Support of map, filter, withField, dropFields in nested list of structures > -------------------------------------------------------------------------- > > Key: SPARK-22231 > URL: https://issues.apache.org/jira/browse/SPARK-22231 > Project: Spark > Issue Type: New Feature > Components: SQL > Affects Versions: 2.2.0 > Reporter: DB Tsai > Priority: Major > > At Netflix's algorithm team, we work on ranking problems to find the great > content to fulfill the unique tastes of our members. Before building a > recommendation algorithms, we need to prepare the training, testing, and > validation datasets in Apache Spark. Due to the nature of ranking problems, > we have a nested list of items to be ranked in one column, and the top level > is the contexts describing the setting for where a model is to be used (e.g. > profiles, country, time, device, etc.) Here is a blog post describing the > details, [Distributed Time Travel for Feature > Generation|https://medium.com/netflix-techblog/distributed-time-travel-for-feature-generation-389cccdd3907]. > > To be more concrete, for the ranks of videos for a given profile_id at a > given country, our data schema can be looked like this, > {code:java} > root > |-- profile_id: long (nullable = true) > |-- country_iso_code: string (nullable = true) > |-- items: array (nullable = false) > | |-- element: struct (containsNull = false) > | | |-- title_id: integer (nullable = true) > | | |-- scores: double (nullable = true) > ... > {code} > We oftentimes need to work on the nested list of structs by applying some > functions on them. Sometimes, we're dropping or adding new columns in the > nested list of structs. Currently, there is no easy solution in open source > Apache Spark to perform those operations using SQL primitives; many people > just convert the data into RDD to work on the nested level of data, and then > reconstruct the new dataframe as workaround. This is extremely inefficient > because all the optimizations like predicate pushdown in SQL can not be > performed, we can not leverage on the columnar format, and the serialization > and deserialization cost becomes really huge even we just want to add a new > column in the nested level. > We built a solution internally at Netflix which we're very happy with. We > plan to make it open source in Spark upstream. We would like to socialize the > API design to see if we miss any use-case. > The first API we added is *mapItems* on dataframe which take a function from > *Column* to *Column*, and then apply the function on nested dataframe. Here > is an example, > {code:java} > case class Data(foo: Int, bar: Double, items: Seq[Double]) > val df: Dataset[Data] = spark.createDataset(Seq( > Data(10, 10.0, Seq(10.1, 10.2, 10.3, 10.4)), > Data(20, 20.0, Seq(20.1, 20.2, 20.3, 20.4)) > )) > val result = df.mapItems("items") { > item => item * 2.0 > } > result.printSchema() > // root > // |-- foo: integer (nullable = false) > // |-- bar: double (nullable = false) > // |-- items: array (nullable = true) > // | |-- element: double (containsNull = true) > result.show() > // +---+----+--------------------+ > // |foo| bar| items| > // +---+----+--------------------+ > // | 10|10.0|[20.2, 20.4, 20.6...| > // | 20|20.0|[40.2, 40.4, 40.6...| > // +---+----+--------------------+ > {code} > Now, with the ability of applying a function in the nested dataframe, we can > add a new function, *withColumn* in *Column* to add or replace the existing > column that has the same name in the nested list of struct. Here is two > examples demonstrating the API together with *mapItems*; the first one > replaces the existing column, > {code:java} > case class Item(a: Int, b: Double) > case class Data(foo: Int, bar: Double, items: Seq[Item]) > val df: Dataset[Data] = spark.createDataset(Seq( > Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))), > Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0))) > )) > val result = df.mapItems("items") { > item => item.withColumn(item("b") + 1 as "b") > } > result.printSchema > root > // |-- foo: integer (nullable = false) > // |-- bar: double (nullable = false) > // |-- items: array (nullable = true) > // | |-- element: struct (containsNull = true) > // | | |-- a: integer (nullable = true) > // | | |-- b: double (nullable = true) > result.show(false) > // +---+----+----------------------+ > // |foo|bar |items | > // +---+----+----------------------+ > // |10 |10.0|[[10,11.0], [11,12.0]]| > // |20 |20.0|[[20,21.0], [21,22.0]]| > // +---+----+----------------------+ > {code} > and the second one adds a new column in the nested dataframe. > {code:java} > val df: Dataset[Data] = spark.createDataset(Seq( > Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))), > Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0))) > )) > val result = df.mapItems("items") { > item => item.withColumn(item("b") + 1 as "c") > } > result.printSchema > root > // |-- foo: integer (nullable = false) > // |-- bar: double (nullable = false) > // |-- items: array (nullable = true) > // | |-- element: struct (containsNull = true) > // | | |-- a: integer (nullable = true) > // | | |-- b: double (nullable = true) > // | | |-- c: double (nullable = true) > result.show(false) > // +---+----+--------------------------------+ > // |foo|bar |items | > // +---+----+--------------------------------+ > // |10 |10.0|[[10,10.0,11.0], [11,11.0,12.0]]| > // |20 |20.0|[[20,20.0,21.0], [21,21.0,22.0]]| > // +---+----+--------------------------------+ > {code} > We also implement a filter predicate to nested list of struct, and it will > return those items which matched the predicate. The following is the API > example, > {code:java} > val df: Dataset[Data] = spark.createDataset(Seq( > Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))), > Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0))) > )) > val result = df.filterItems("items") { > item => item("a") < 20 > } > // +---+----+----------------------+ > // |foo|bar |items | > // +---+----+----------------------+ > // |10 |10.0|[[10,10.0], [11,11.0]]| > // |20 |20.0|[] | > // +---+----+----------------------+ > {code} > Dropping a column in the nested list of struct can be achieved by similar API > to *withColumn*. We add *drop* method to *Column* to implement this. Here is > an example, > {code:java} > val df: Dataset[Data] = spark.createDataset(Seq( > Data(10, 10.0, Seq(Item(10, 10.0), Item(11, 11.0))), > Data(20, 20.0, Seq(Item(20, 20.0), Item(21, 21.0))) > )) > val result = df.mapItems("items") { > item => item.drop("b") > } > result.printSchema > root > // |-- foo: integer (nullable = false) > // |-- bar: double (nullable = false) > // |-- items: array (nullable = true) > // | |-- element: struct (containsNull = true) > // | | |-- a: integer (nullable = true) > result.show(false) > // +---+----+------------+ > // |foo|bar |items | > // +---+----+------------+ > // |10 |10.0|[[10], [11]]| > // |20 |20.0|[[20], [21]]| > // +---+----+------------+ > {code} > Note that all of those APIs are implemented by SQL expression with codegen; > as a result, those APIs are not opaque to Spark optimizers, and can fully > take advantage of columnar data structure. > We're looking forward to the community feedback and suggestion! Thanks. -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org