Re: Using flatMap on Dataframes with Spark 2.0
Just a follow-up on my last question: the RowEncoder has to be defined AFTER declaring the new columns, or else they won't be serialized and will disappear after the flatMap. So the code should look like:

    var df1 = spark.read.parquet(fileName)
    df1 = df1.withColumn("newCol", df1.col("anyExistingCol"))
    df1.printSchema() // here newCol exists
    implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df1.schema)
    df1 = df1.flatMap(x => List(x))
    df1.printSchema() // newCol still exists!

Julien

----- Original Message -----
From: "Julien Nauroy"
To: "Sun Rui"
Cc: user@spark.apache.org
Sent: Sunday, July 24, 2016 12:43:42
Subject: Re: Using flatMap on Dataframes with Spark 2.0
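To make the fix above reproducible end-to-end, here is a self-contained sketch of the same pattern against the Spark 2.x API. The local-mode session and the in-memory stand-in for the parquet file are illustrative assumptions, not part of the thread.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}

// Local session for illustration only; the thread reads a parquet file instead.
val spark = SparkSession.builder().master("local[*]").appName("rowencoder-order").getOrCreate()

// Stand-in data; spark.read.parquet(fileName) would go here.
var df1 = spark.range(3).toDF("anyExistingCol")
df1 = df1.withColumn("newCol", df1.col("anyExistingCol"))

// Define the encoder AFTER adding the column, so it captures the extended schema.
implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df1.schema)
df1 = df1.flatMap(x => List(x))

df1.printSchema() // newCol is still present
```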
Re: Using flatMap on Dataframes with Spark 2.0
Hi again,

Just another strange behavior I stumbled upon. Can anybody reproduce it? Here's the code snippet in Scala:

    var df1 = spark.read.parquet(fileName)
    df1 = df1.withColumn("newCol", df1.col("anyExistingCol"))
    df1.printSchema() // here newCol exists
    df1 = df1.flatMap(x => List(x))
    df1.printSchema() // newCol has disappeared

Any idea what I could be doing wrong? Why would newCol disappear?

Cheers,
Julien

----- Original Message -----
From: "Julien Nauroy"
To: "Sun Rui"
Cc: user@spark.apache.org
Sent: Saturday, July 23, 2016 23:39:08
Subject: Re: Using flatMap on Dataframes with Spark 2.0

Thanks, it works like a charm now! Not sure how I could have found it by myself though. Maybe the error message when you don't specify the encoder should point to RowEncoder.

Cheers,
Julien

----- Original Message -----
From: "Sun Rui"
To: "Julien Nauroy"
Cc: user@spark.apache.org
Sent: Saturday, July 23, 2016 16:27:43
Subject: Re: Using flatMap on Dataframes with Spark 2.0
Re: Using flatMap on Dataframes with Spark 2.0
You should use:

    import org.apache.spark.sql.catalyst.encoders.RowEncoder

    val df = spark.read.parquet(fileName)
    implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)
    val df1 = df.flatMap { x => List(x) }

> On Jul 23, 2016, at 22:01, Julien Nauroy <julien.nau...@u-psud.fr> wrote:
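A small aside not stated in the thread: because flatMap's encoder is an ordinary implicit parameter, it can also be passed explicitly at the call site, which sidesteps any question of whether the implicit is in scope. A minimal sketch, assuming Spark 2.x in local mode with stand-in data:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Local session and stand-in data for illustration; the thread reads parquet.
val spark = SparkSession.builder().master("local[*]").appName("explicit-encoder").getOrCreate()
val df = spark.range(3).toDF("id")

// Same effect as declaring `implicit val encoder`, but passed explicitly.
val df1 = df.flatMap(x => List(x))(RowEncoder(df.schema))

df1.printSchema() // schema preserved: id, not a single binary column
```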
Re: Using flatMap on Dataframes with Spark 2.0
Thanks for your quick reply.

I've tried with this encoder:

    implicit def RowEncoder: org.apache.spark.sql.Encoder[Row] =
      org.apache.spark.sql.Encoders.kryo[Row]

using a suggestion from http://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-a-dataset-in-spark-1-6

How did you set up your encoder?

----- Original Message -----
From: "Sun Rui"
To: "Julien Nauroy"
Cc: user@spark.apache.org
Sent: Saturday, July 23, 2016 15:55:21
Subject: Re: Using flatMap on Dataframes with Spark 2.0
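The `value: binary` schema reported earlier follows directly from this choice of encoder: a Kryo encoder serializes the whole object into one opaque byte array, whatever structure the Row has. A quick check (an illustrative sketch; it needs only spark-sql on the classpath, no running session):

```scala
import org.apache.spark.sql.{Encoders, Row}

// Kryo-based encoders store each value as a single serialized blob,
// so the resulting schema is always one binary column named "value".
val kryoSchema = Encoders.kryo[Row].schema
println(kryoSchema.treeString)
// root
//  |-- value: binary (nullable = true)
```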
Re: Using flatMap on Dataframes with Spark 2.0
I did a try. The schema after flatMap is the same, which is expected.

What’s your Row encoder?

> On Jul 23, 2016, at 20:36, Julien Nauroy <julien.nau...@u-psud.fr> wrote:
>
> Hi,
>
> I'm trying to call flatMap on a Dataframe with Spark 2.0 (rc5).
> The code is the following:
>
>     var data = spark.read.parquet(fileName).flatMap(x => List(x))
>
> Of course it's an overly simplified example, but the result is the same.
> The dataframe schema goes from this:
>
>     root
>      |-- field1: double (nullable = true)
>      |-- field2: integer (nullable = true)
>      (etc)
>
> to this:
>
>     root
>      |-- value: binary (nullable = true)
>
> Plus I have to provide an encoder for Row.
> I expect to get the same schema after calling flatMap.
> Any idea what I could be doing wrong?
>
> Best regards,
> Julien