[ https://issues.apache.org/jira/browse/SPARK-48492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Julien Peloton updated SPARK-48492:
-----------------------------------
Description:

Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be
> nullable for compatibility reasons.

While this seems correct for static DataFrames, I have a counterexample for streaming ones:

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# Add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# Add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writeStream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read the streaming output back as a static DataFrame
df_stream_2 = spark.read.format("parquet").load("test_parquet")

print("Static dataframe from the streaming job")
df_stream_2.printSchema()

# Now read the streaming output back as a stream
df_stream_3 = (
    spark.readStream.format("parquet")
    .schema(df_stream_2.schema)
    .option("path", "test_parquet")
    .option("latestFirst", False)
    .load()
)

print("Streaming dataframe from the streaming job")
df_stream_3.printSchema()
{code}

which outputs:

{noformat}
Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Streaming dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = true)
{noformat}

So the column `d` is correctly set to `nullable = true` (expected), but the column `e` stays non-nullable when the streaming output is read back with the batch `read` method, while it is correctly set to `nullable = true` when read back with `readStream`. Is that expected? According to the old PR [https://github.com/apache/spark/pull/25382] this was supposed to be resolved. Any ideas?
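As a side note, one way to narrow down where the non-nullability comes from would be to inspect the physical Parquet footers of the streaming output directly, for instance with pyarrow (a minimal sketch, assuming pyarrow is installed; it is not part of the repro above). If the footers mark `e` as "not null", the flag is baked in by the streaming writer and the batch reader merely fails to relax it:

{code:python}
# Minimal sketch (assumes pyarrow is installed): print the physical
# Parquet schema of each part file written by the streaming job.
# Arrow displays non-nullable fields as "not null".
import glob

import pyarrow.parquet as pq

for path in sorted(glob.glob("test_parquet/part-*.parquet")):
    print(path)
    print(pq.read_schema(path))
{code}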
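In the meantime, a possible workaround is to relax the nullability by hand after the batch read. Below is a minimal sketch, not an official API: `make_nullable` is a helper written for this example, it only touches top-level fields, and it reuses the `spark` session from the repro above:

{code:python}
from pyspark.sql.types import StructField, StructType

def make_nullable(schema: StructType) -> StructType:
    # Copy of the schema with every top-level field marked nullable.
    # Nested struct fields would need a recursive variant of this helper.
    return StructType(
        [StructField(f.name, f.dataType, True, f.metadata) for f in schema.fields]
    )

df_back = spark.read.format("parquet").load("test_parquet")
# Round-tripping through the RDD makes Spark adopt the relaxed schema,
# so the batch-read DataFrame reports nullable = true for all columns.
df_nullable = spark.createDataFrame(df_back.rdd, make_nullable(df_back.schema))
df_nullable.printSchema()
{code}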
> batch-read parquet files written by streaming returns non-nullable fields in schema
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-48492
>                 URL: https://issues.apache.org/jira/browse/SPARK-48492
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 3.4.1
>         Environment: python --version
> Python 3.9.13
>
> spark-submit --version
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/ '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
>       /_/
>
> Using Scala version 2.12.17, OpenJDK 64-Bit Server VM, 1.8.0_302
> Branch HEAD
> Compiled by user centos on 2023-06-19T23:01:01Z
> Revision 6b1ff22dde1ead51cbf370be6e48a802daae58b6
>            Reporter: Julien Peloton
>            Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)