Metadata is not propagating with Dataset.map()
Hello,

It seems that metadata does not propagate when using Dataset.map(). Is there a workaround? Below are the steps to reproduce:

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.types._
import spark.implicits._

val columnName = "col1"
val meta = new MetadataBuilder().putString("foo", "bar").build()
val schema = StructType(Array(StructField(columnName, DoubleType, nullable = true, metadata = meta)))

def printSchema(d: Dataset[_]): Unit = {
  d.printSchema()
  d.schema.fields.foreach(field =>
    println("metadata for '" + field.name + "': " + field.metadata.json))
}

val rows = spark.sparkContext.parallelize(Seq(1.0, 5.0, 3.0, 2.0, 6.0, null).map(Row(_)))
val df = spark.createDataFrame(rows, schema)

printSchema(df)                          // metadata printed correctly
printSchema(df.select(columnName))       // metadata printed correctly
printSchema(df.map(r => r.getDouble(0))) // metadata is missing

Thank you,
-Matthew
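A possible workaround (an untested sketch): Column.as has an overload that takes a Metadata object, so the metadata can be re-attached to the column after the map:

import org.apache.spark.sql.functions.col

// Sketch: map() drops the schema metadata, so rename the mapped column
// back to columnName and alias it with the original Metadata object.
val mapped = df.map(r => r.getDouble(0)).toDF(columnName)
val withMeta = mapped.select(col(columnName).as(columnName, meta))
printSchema(withMeta) // metadata should be printed again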
Re: Incorrect results with reduceByKey
Deep copying the data solved the issue:

data.map { r =>
  val t = SpecificData.get().deepCopy(r.getSchema, r)
  (t.id, List(t))
}.reduceByKey(_ ++ _)

(The record-reuse behavior is noted here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1003)

Thanks to Igor Berman for pointing that out.
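To see why the deep copy is needed, here is a minimal sketch in plain Scala (no Spark or Avro required; MutableRecord is a stand-in for the reused Avro record):

// Hadoop RecordReaders hand back the SAME mutable object on every read,
// so keeping references without copying leaves N pointers to the last value.
class MutableRecord(var id: String)

val reused = new MutableRecord("")
val source = Seq("a", "b", "c")

// Broken: every element is the same reused object.
val refs = source.map { v => reused.id = v; reused }
println(refs.map(_.id)) // List(c, c, c)

// Fixed: copy before keeping, as SpecificData.deepCopy does above.
val copies = source.map { v => reused.id = v; new MutableRecord(reused.id) }
println(copies.map(_.id)) // List(a, b, c)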
Incorrect results with reduceByKey
Howdy,

We've noticed strange behavior with Avro-serialized data and the reduceByKey RDD functionality. Please see below:

// We're reading a bunch of Avro-serialized data. hadoopFile returns
// RDD[(AvroWrapper[T], NullWritable)], so the datum is unwrapped here.
val data: RDD[T] = sparkContext
  .hadoopFile(path, classOf[AvroInputFormat[T]], classOf[AvroWrapper[T]], classOf[NullWritable])
  .map(_._1.datum())

// Incorrect data returned
val bad: RDD[(String, List[T])] =
  data.map(r => (r.id, List(r))).reduceByKey(_ ++ _)

// After adding the partitioner we get everything as expected
val good: RDD[(String, List[T])] = data
  .map(r => (r.id, List(r)))
  .partitionBy(Partitioner.defaultPartitioner(data))
  .reduceByKey(_ ++ _)

Any ideas? Thanks in advance.
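The partitionBy call likely "fixes" things only as a side effect: it forces a shuffle, and the shuffle serializes and deserializes each record, which materializes an independent copy of it. A more direct fix, sketched below with MyAvroRecord standing in for the concrete T and r.id assumed to be its key field, is to deep-copy each record as it is read, before any grouping:

import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.avro.specific.SpecificData
import org.apache.hadoop.io.NullWritable
import org.apache.spark.rdd.RDD

// Copy each record out of the reused input buffer at read time,
// so downstream transformations only ever see independent objects.
val safe: RDD[MyAvroRecord] = sparkContext
  .hadoopFile(path, classOf[AvroInputFormat[MyAvroRecord]],
              classOf[AvroWrapper[MyAvroRecord]], classOf[NullWritable])
  .map { case (wrapper, _) =>
    val r = wrapper.datum()
    SpecificData.get().deepCopy(r.getSchema, r)
  }

// reduceByKey now returns correct results without the extra partitionBy.
val grouped = safe.map(r => (r.id, List(r))).reduceByKey(_ ++ _)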