Metadata is not propagating with Dataset.map()

2017-01-16 Thread tovbinm
Hello,

It seems that metadata is not propagating when using Dataset.map(). Is there
a workaround?

Below are the steps to reproduce:

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.types._
import spark.implicits._

val columnName = "col1"
val meta = new MetadataBuilder().putString("foo", "bar").build()
val schema = StructType(Array(
  StructField(columnName, DoubleType, nullable = true, metadata = meta)))

def printSchema(d: Dataset[_]): Unit = {
  d.printSchema()
  d.schema.fields.foreach(field =>
    println("metadata for '" + field.name + "': " + field.metadata.json))
}

val rows = spark.sparkContext.parallelize(
  Seq(1.0, 5.0, 3.0, 2.0, 6.0, null).map(Row(_)))
val df = spark.createDataFrame(rows, schema)

printSchema(df)                          // metadata printed correctly
printSchema(df.select(columnName))       // metadata printed correctly
printSchema(df.map(r => r.getDouble(0))) // metadata is missing
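
A possible workaround (untested sketch; it assumes Column.as(alias, metadata) can be used
to re-attach the saved metadata to the mapped column) would be to restore the metadata on
the result of the map explicitly:

val mapped = df.map(r => r.getDouble(0)).toDF(columnName)
// re-apply the original metadata to the resulting column
val restored = mapped.select(mapped(columnName).as(columnName, meta))
printSchema(restored) // metadata should be printed again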


Thank you,

-Matthew




Re: Incorrect results with reduceByKey

2015-11-18 Thread tovbinm
Deep copying the data solved the issue:
data.map { r =>
  val t = SpecificData.get().deepCopy(r.getSchema, r)
  (t.id, List(t))
}.reduceByKey(_ ++ _)

(noted here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1003)
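
For completeness, a rough end-to-end sketch of the fix (the Avro classes, the record type
T, path, and the id field are as in the original post; exact generics may differ):

import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.avro.specific.SpecificData
import org.apache.hadoop.io.NullWritable

// The Hadoop RecordReader behind hadoopFile re-uses the same underlying record
// object for every row, so each datum is deep-copied before any grouping or caching.
val copied = sparkContext
  .hadoopFile(path, classOf[AvroInputFormat[T]], classOf[AvroWrapper[T]], classOf[NullWritable])
  .map { case (wrapper, _) =>
    val r = wrapper.datum()
    SpecificData.get().deepCopy(r.getSchema, r)
  }

val grouped = copied.map(t => (t.id, List(t))).reduceByKey(_ ++ _)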


Thanks, Igor Berman, for pointing that out.



Incorrect results with reduceByKey

2015-11-17 Thread tovbinm
Howdy,

We've noticed some strange behavior with Avro-serialized data and the reduceByKey
RDD operation. Please see below:

// We're reading a bunch of Avro-serialized data
val data: RDD[T] = sparkContext.hadoopFile(
    path, classOf[AvroInputFormat[T]], classOf[AvroWrapper[T]], classOf[NullWritable])
  .map(_._1.datum()) // unwrap the Avro datum (assumed; simplified in the original post)

// Incorrect data returned
val bad: RDD[(String, List[T])] =
  data.map(r => (r.id, List(r))).reduceByKey(_ ++ _)

// After adding the partitioner we get everything as expected
val good: RDD[(String, List[T])] =
  data.map(r => (r.id, List(r)))
    .partitionBy(Partitioner.defaultPartitioner(data))
    .reduceByKey(_ ++ _)
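
Concretely, the symptom looks roughly like this when inspecting a single group from each
RDD (sketch; id and the record type are as above):

// the "bad" group typically contains repeated copies of a single record,
// while the "good" group contains the expected distinct records
bad.take(1).foreach { case (id, recs) => println(s"bad:  $id -> $recs") }
good.take(1).foreach { case (id, recs) => println(s"good: $id -> $recs") }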


Any ideas? 

Thanks in advance


