[ 
https://issues.apache.org/jira/browse/SPARK-48492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Peloton updated SPARK-48492:
-----------------------------------
    Description: 
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for static DataFrames, I have a counterexample with 
streaming ones:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writeStream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")

print("Static dataframe from the streaming job")
df_stream_2.printSchema() 

# Now load a stream
df_stream_3 = (
    spark.readStream.format("parquet")
    .schema(df_stream_2.schema)
    .option("path", "test_parquet")
    .option("latestFirst", False)
    .load()
)

print("Streaming dataframe from the streaming job")
df_stream_3.printSchema(){code}
 

 

which outputs:
{noformat}
Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Streaming dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = true){noformat}
 

So the column `d` is correctly reported as `nullable = true` (expected), but the 
column `e` stays non-nullable when the output is read back with the batch `read` 
method, while it is correctly reported as `nullable = true` when read with 
`readStream`. Is that expected? According to the old PR 
[https://github.com/apache/spark/pull/25382] this was supposed to be resolved. 
Any ideas?
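
For what it is worth, one way to narrow this down could be to inspect the 
physical Parquet footers written by the streaming query. The sketch below 
assumes pyarrow is installed (it is not part of the reproduction above); a 
field printed with a `not null` annotation would mean the non-nullability is 
baked in at write time rather than introduced by the batch reader:

{code:python}
import glob

# Diagnostic sketch only: requires pyarrow, which is not used by the
# reproduction script above.
import pyarrow.parquet as pq

for path in glob.glob("test_parquet/*.parquet"):
    print(path)
    # A field shown as e.g. "e: string not null" means the streaming writer
    # declared the column as required in the Parquet file itself.
    print(pq.read_schema(path))
{code}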
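
In the meantime, a possible workaround (only a sketch, not a fix) is to rebuild 
the batch DataFrame with an explicitly nullable copy of its schema, so that 
downstream code sees `nullable = true` for every column, as the documentation 
describes:

{code:python}
from pyspark.sql.types import StructField, StructType

# Copy the schema returned by the batch read, forcing every field to nullable.
nullable_schema = StructType(
    [StructField(f.name, f.dataType, nullable=True) for f in df_stream_2.schema]
)

# Rebuild the DataFrame with the relaxed schema. This goes through the RDD API,
# so it is meant to illustrate the expected schema rather than to be efficient.
df_nullable = spark.createDataFrame(df_stream_2.rdd, schema=nullable_schema)
df_nullable.printSchema()  # every column should now report nullable = true
{code}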



> Batch-reading Parquet files written by streaming returns non-nullable fields 
> in the schema
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-48492
>                 URL: https://issues.apache.org/jira/browse/SPARK-48492
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>    Affects Versions: 3.4.1
>         Environment: python --version
> Python 3.9.13
>  
> spark-submit --version
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
>       /_/
>                         
> Using Scala version 2.12.17, OpenJDK 64-Bit Server VM, 1.8.0_302
> Branch HEAD
> Compiled by user centos on 2023-06-19T23:01:01Z
> Revision 6b1ff22dde1ead51cbf370be6e48a802daae58b6
>            Reporter: Julien Peloton
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
