Hi, 

I have a use case where I've loaded data into a DataFrame and would like to
split it in two on some predicate, modify one of the splits with withColumn
(so I don't have to reprocess the data), union the two back together, and
write the result out to the filesystem; a rough sketch of the overall flow
follows the schema below.  An example of the schema (purely for testing's
sake) is as follows:

    val schema = StructType(
      Seq(
        StructField("a",
          MapType(StringType,
            StructType(
              Seq(
                StructField("b", StringType, true),
                StructField("c", MapType(StringType, StringType), true)
              )
            )
          )
        ),
        StructField("other", StringType, true)
      )
    )
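
Roughly, the flow I have in mind looks like this (the filter values, the
stand-in column rewrite, and the output path are just placeholders):

    // rough skeleton only; predicates and the output path are placeholders
    val untouched = df.filter(df("other") === "bar")
    val modified = df.filter(df("other") === "foo")
      .withColumn("a", df("a"))  // rewriting "a" here is the part I'm asking about
    modified.unionAll(untouched).write.parquet("/path/to/output")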

In this example I split the DataFrame on the value of "other", and in one of
the splits I want to take the struct stored under a particular key of the "a"
map and set its "c" field to null.  I can see how this could be done with
DataFrame.withColumn using struct and lit if "a" were itself a StructType,
but I haven't found a way to manipulate the value when "a"'s dataType is a
MapType rather than a StructType.

Example code:

    import org.apache.spark.sql.{functions, Row}
    import org.apache.spark.sql.types._

    val schema = StructType(
      Seq(
        StructField("a",
          MapType(StringType,
            StructType(
              Seq(
                StructField("b", StringType, true),
                StructField("c", MapType(StringType, StringType), true)
              )
            )
          )
        ),
        StructField("other", StringType, true)
      )
    )


    // nested struct values are represented as Rows
    val rdd = sc.parallelize(Seq(
      Row(Map("foo" -> Row("bar", Map("foo" -> "bar"))), "foo"),
      Row(Map("foo" -> Row("cat", Map("bar" -> "foo"))), "bar")
    ))

    val df = sqlContext.createDataFrame(rdd, schema)

    // I wish there were a functions.map
    // here I'd like to create a map with expressions for what goes into the map,
    // using functions.col("a").getItem(<only key I want>).getField(<field I want>)
    val col = functions.struct(
      functions.col("a").getItem("foo").getField("b").alias("b"),
      functions.lit(null).cast(MapType(StringType, StringType)).alias("c")
    )

    val rename = df.withColumn("a", col)

    val mutated = sqlContext.createDataFrame(rename.rdd, df.schema)

    mutated.unionAll(df).show()
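
If nothing else works, I suppose a UDF that rebuilds the map values would do
it, something like the untested sketch below (AValue is just a throwaway case
class mirroring the struct value, and "foo" is the key from the example), but
I'd much rather express this with Column expressions:

    case class AValue(b: String, c: Map[String, String])

    // rebuild every map value, nulling out "c" for the key of interest
    val clearC = functions.udf { (m: Map[String, Row]) =>
      m.map { case (k, v) =>
        if (k == "foo") k -> AValue(v.getAs[String]("b"), null)
        else k -> AValue(v.getAs[String]("b"), v.getAs[Map[String, String]]("c"))
      }
    }

    val viaUdf = df.withColumn("a", clearC(df("a")))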


Any help would be greatly appreciated!

Thanks.

dan


