[jira] [Updated] (SPARK-48492) batch-read parquet files written by streaming returns non-nullable fields in schema
[ https://issues.apache.org/jira/browse/SPARK-48492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julien Peloton updated SPARK-48492:
-----------------------------------
    Issue Type: Bug  (was: New Feature)

             Key: SPARK-48492
             URL: https://issues.apache.org/jira/browse/SPARK-48492
         Project: Spark
      Issue Type: Bug
      Components: Structured Streaming
Affects Versions: 3.4.1
     Environment:
python --version
Python 3.9.13

spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
      /_/

Using Scala version 2.12.17, OpenJDK 64-Bit Server VM, 1.8.0_302
Branch HEAD
Compiled by user centos on 2023-06-19T23:01:01Z
Revision 6b1ff22dde1ead51cbf370be6e48a802daae58b6
        Reporter: Julien Peloton
        Priority: Major

Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons.

While this seems correct for static DataFrames, I have a counterexample for streaming ones:

{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writeStream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back with a batch read
df_stream_2 = spark.read.format("parquet").load("test_parquet")
print("Static dataframe from the streaming job (read)")
df_stream_2.printSchema()

# Now load a stream again
df_stream_3 = (
    spark.readStream.format("parquet")
    .schema(df_stream_2.schema)
    .option("path", "test_parquet")
    .option("latestFirst", False)
    .load()
)
print("Streaming dataframe from the streaming job (readStream)")
df_stream_3.printSchema()
{code}

which outputs:

{noformat}
Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job (read)
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Streaming dataframe from the streaming job (readStream)
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = true)
{noformat}

So the column `d` is correctly set to `nullable = true` (expected), but the column `e` stays non-nullable when the streaming output is read back with the batch `read` method, while it is correctly set to `nullable = true` when read with `readStream`. Is that expected? According to the old issue https://issues.apache.org/jira/browse/SPARK-28651, this was supposed to be resolved. Any ideas?
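As a possible interim workaround (a minimal sketch, not part of the original report): if downstream code needs every column of the batch-read DataFrame to be reported as nullable, the schema can be rewritten with all top-level fields forced to nullable and re-applied through an RDD round-trip. The helper name `make_nullable` below is illustrative only.

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType

spark = SparkSession.builder.getOrCreate()

def make_nullable(schema: StructType) -> StructType:
    # Copy the schema, forcing nullable=True on every top-level field.
    return StructType(
        [StructField(f.name, f.dataType, True, f.metadata) for f in schema.fields]
    )

# Batch-read the output of the streaming job, then re-apply an all-nullable schema.
df = spark.read.format("parquet").load("test_parquet")
df_nullable = spark.createDataFrame(df.rdd, make_nullable(df.schema))

df_nullable.printSchema()  # every field should now report nullable = true
{code}

This only normalizes the schema reported on the reader side; it does not change what the streaming writer records in the Parquet files themselves.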
[jira] [Updated] (SPARK-38435) Pandas UDF with type hints crashes at import
[ https://issues.apache.org/jira/browse/SPARK-38435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julien Peloton updated SPARK-38435:
-----------------------------------
    Description:

h2. Old style pandas UDF

Let's consider a pandas UDF defined in the old style:

{code:python}
# mymod.py
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def to_upper(s):
    return s.str.upper()
{code}

I can import it and use it as:

{code:python}
# main.py
from pyspark.sql import SparkSession
from mymod import to_upper

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show()
{code}

and launch it via:

{code:bash}
spark-submit main.py
spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py:392: UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details.
+--------------+
|to_upper(name)|
+--------------+
|      JOHN DOE|
+--------------+
{code}

As expected, we obtain the `UserWarning`, but the code works fine.

h2. New style pandas UDF: using type hints

Let's now switch to the version using type hints:

{code:python}
# mymod.py
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()
{code}

But this time, I obtain an `AttributeError`:

{code:bash}
spark-submit main.py
Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 835, in _parse_datatype_string
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 827, in from_ddl_datatype
AttributeError: 'NoneType' object has no attribute '_jvm'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 839, in _parse_datatype_string
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 827, in from_ddl_datatype
AttributeError: 'NoneType' object has no attribute '_jvm'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/myrepos/fink-filters/main.py", line 2, in <module>
    from mymod import to_upper
  File "/Users/julien/Documents/workspace/myrepos/fink-filters/mymod.py", line 5, in <module>
    def to_upper(s: pd.Series) -> pd.Series:
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py", line 432, in _create_pandas_udf
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 43, in _create_udf
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 206, in _wrapped
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 96, in returnType
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 841, in _parse_datatype_string
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 831, in _parse_datatype_string
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 823, in from_ddl_schema
AttributeError: 'NoneType' object has no attribute '_jvm'
log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{code}

The code crashes at import time. Looking at the code, the Spark context needs to exist:

[https://github.com/apache/spark/blob/8d70d5da3d74ebdd612b2cdc38201e121b88b24f/python/pyspark/sql/types.py#L819-L827]

which is not the case at the time of the import.

h2. Questions

First, am I doing something wrong? I do not see any mention of this in the documentation ([https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html]), and it seems it should affect many users who are moving from old-style to new-style pandas UDFs.

Second, is this the expected behaviour? Given that a module containing an old-style pandas UDF can be imported without any problem, the new behaviour looks like a regression. Why should users need an active Spark context just to import a module that contains a pandas UDF?

Third, what could we do? I see that an assert has recently been added in the master branch (https://issues.apache.org/jira/browse/SPARK-37620).
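A possible interim workaround (a sketch, under the assumption that the failure comes from parsing the DDL string "string" in the `returnType` property, which needs a live JVM): pass a `DataType` instance such as `StringType()` as the return type instead of a DDL string, which keeps the type-hinted style while letting the module be imported before any SparkSession exists.

{code:python}
# mymod.py -- workaround sketch: a DataType instance avoids the DDL-string parse
# that appears to require an active JVM at import time.
import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())  # instead of @pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()
{code}

Another option is to defer the `pandas_udf` decoration to a small factory function that is only called once the SparkSession has been created.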
[jira] [Updated] (SPARK-38435) Pandas UDF with type hints crashes at import
[ https://issues.apache.org/jira/browse/SPARK-38435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Julien Peloton updated SPARK-38435: --- Description: h2. Old style pandas UDF let's consider a pandas UDF defined in the old style: {code:python} # mymod.py from pyspark.sql.functions import pandas_udf, PandasUDFType from pyspark.sql.types import StringType @pandas_udf(StringType(), PandasUDFType.SCALAR) def to_upper(s): return s.str.upper() {code} I can import it and use it as: {code:python} # main.py from pyspark.sql import SparkSession from mymod import to_upper spark = SparkSession.builder.getOrCreate() df = spark.createDataFrame([("John Doe",)], ("name",)) df.select(to_upper("name")).show() {code} and launch it via: {code:python} spark-submit main.py spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py:392: UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details. +--+ |to_upper(name)| +--+ | JOHN DOE| +--+ {code} Except the `UserWarning`, the code is working as expected. h2. New style pandas UDF: using type hint Let's now switch to the version using type hints: {code:python} # mymod.py import pandas as pd from pyspark.sql.functions import pandas_udf @pandas_udf("string") def to_upper(s: pd.Series) -> pd.Series: return s.str.upper() {code} But this time, I obtain an `AttributeError`: {code:bash} spark-submit main.py Traceback (most recent call last): File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 835, in _parse_datatype_string File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 827, in from_ddl_datatype AttributeError: 'NoneType' object has no attribute '_jvm'During handling of the above exception, another exception occurred:Traceback (most recent call last): File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 839, in _parse_datatype_string File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 827, in from_ddl_datatype AttributeError: 'NoneType' object has no attribute '_jvm'During handling of the above exception, another exception occurred:Traceback (most recent call last): File "/Users/julien/Documents/workspace/myrepos/fink-filters/main.py", line 2, in from mymod import to_upper File "/Users/julien/Documents/workspace/myrepos/fink-filters/mymod.py", line 5, in def to_upper(s: pd.Series) -> pd.Series: File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py", line 432, in _create_pandas_udf File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 43, in _create_udf File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 206, in _wrapped File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 96, in returnType File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 841, in _parse_datatype_string File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 831, in _parse_datatype_string File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 823, in from_ddl_schema AttributeError: 'NoneType' object has no 
attribute '_jvm' log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. {code} The code crashes at the import level. Looking at the code, the spark context needs to exist: [https://github.com/apache/spark/blob/8d70d5da3d74ebdd612b2cdc38201e121b88b24f/python/pyspark/sql/types.py#L819-L827] which at the time of the import is not the case. h2. Questions First, am I doing something wrong? I do not see in the documentation ([https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html]) mention of this, and it seems it should affect many users that are moving from old style to new style pandas UDF. Second, is this the expected behaviour? Looking at the old style pandas UDF, where the module can be imported without problem, the new behaviour looks like a regress. Why users would have to have the spark context active to just import a module that contains pandas UDF? Third, what could we do? I see in the master branch that an assert has been recently added (https://issues.apache.org/jira/browse/SPARK-37620): [https://github.com/ap
[jira] [Updated] (SPARK-38435) Pandas UDF with type hints crashes at import
[ https://issues.apache.org/jira/browse/SPARK-38435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Julien Peloton updated SPARK-38435: --- Description: h2. Old style pandas UDF let's consider a pandas UDF defined in the old style: {code:python} # mymod.py import pandas as pd from pyspark.sql.functions import pandas_udf, PandasUDFType from pyspark.sql.types import StringType @pandas_udf(StringType(), PandasUDFType.SCALAR) def to_upper(s): return s.str.upper() {code} I can import it and use it as: {code:python} # main.py from pyspark.sql import SparkSession from mymod import to_upper spark = SparkSession.builder.getOrCreate() df = spark.createDataFrame([("John Doe",)], ("name",)) df.select(to_upper("name")).show() {code} and launch it via: {code:python} spark-submit main.py spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py:392: UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details. +--+ |to_upper(name)| +--+ | JOHN DOE| +--+ {code} Except the `UserWarning`, the code is working as expected. h2. New style pandas UDF: using type hint Let's now switch to the version using type hints: {code:python} # mymod.py import pandas as pd from pyspark.sql.functions import pandas_udf @pandas_udf("string") def to_upper(s: pd.Series) -> pd.Series: return s.str.upper() {code} But this time, I obtain an `AttributeError`: {code:bash} spark-submit main.py Traceback (most recent call last): File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 835, in _parse_datatype_string File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 827, in from_ddl_datatype AttributeError: 'NoneType' object has no attribute '_jvm'During handling of the above exception, another exception occurred:Traceback (most recent call last): File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 839, in _parse_datatype_string File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 827, in from_ddl_datatype AttributeError: 'NoneType' object has no attribute '_jvm'During handling of the above exception, another exception occurred:Traceback (most recent call last): File "/Users/julien/Documents/workspace/myrepos/fink-filters/main.py", line 2, in from mymod import to_upper File "/Users/julien/Documents/workspace/myrepos/fink-filters/mymod.py", line 5, in def to_upper(s: pd.Series) -> pd.Series: File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py", line 432, in _create_pandas_udf File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 43, in _create_udf File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 206, in _wrapped File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 96, in returnType File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 841, in _parse_datatype_string File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 831, in _parse_datatype_string File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 823, in from_ddl_schema AttributeError: 
'NoneType' object has no attribute '_jvm' log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. {code} The code crashes at the import level. Looking at the code, the spark context needs to exist: [https://github.com/apache/spark/blob/8d70d5da3d74ebdd612b2cdc38201e121b88b24f/python/pyspark/sql/types.py#L819-L827] which at the time of the import is not the case. h2. Questions First, am I doing something wrong? I do not see in the documentation ([https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html]) mention of this, and it seems it should affect many users that are moving from old style to new style pandas UDF. Second, is this the expected behaviour? Looking at the old style pandas UDF, where the module can be imported without problem, the new behaviour looks like a regress. Why users would have to have the spark context active to just import a module that contains pandas UDF? Third, what could we do? I see in the master branch that an assert has been recently added (https://issues.apache.org/jira/browse/SPARK-37620): [h
[jira] [Updated] (SPARK-38435) Pandas UDF with type hints crashes at import
[ https://issues.apache.org/jira/browse/SPARK-38435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Julien Peloton updated SPARK-38435: --- Description:
h2. Old style pandas UDF

Let's consider a pandas UDF defined in the old style:

{code:python}
# mymod.py
import pandas as pd

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def to_upper(s):
    return s.str.upper()
{code}

I can import it and use it as:

{code:python}
# main.py
from pyspark.sql import SparkSession
from mymod import to_upper

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show()
{code}

and launch it via:

{code:bash}
spark-submit main.py
spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py:392: UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details.
+--------------+
|to_upper(name)|
+--------------+
|      JOHN DOE|
+--------------+
{code}

Except for the `UserWarning`, the code works as expected.

h2. New style pandas UDF: using type hints

Let's now switch to the version using type hints:

{code:python}
# mymod.py
import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()
{code}

But this time, I obtain an `AttributeError`:

{code:bash}
spark-submit main.py
Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 835, in _parse_datatype_string
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 827, in from_ddl_datatype
AttributeError: 'NoneType' object has no attribute '_jvm'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 839, in _parse_datatype_string
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 827, in from_ddl_datatype
AttributeError: 'NoneType' object has no attribute '_jvm'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/myrepos/fink-filters/main.py", line 2, in <module>
    from mymod import to_upper
  File "/Users/julien/Documents/workspace/myrepos/fink-filters/mymod.py", line 5, in <module>
    def to_upper(s: pd.Series) -> pd.Series:
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py", line 432, in _create_pandas_udf
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 43, in _create_udf
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 206, in _wrapped
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 96, in returnType
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 841, in _parse_datatype_string
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 831, in _parse_datatype_string
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 823, in from_ddl_schema
AttributeError: 'NoneType' object has no attribute '_jvm'
log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{code}

The code crashes at import time. Looking at the code, the Spark context needs to exist ([https://github.com/apache/spark/blob/8d70d5da3d74ebdd612b2cdc38201e121b88b24f/python/pyspark/sql/types.py#L819-L827]), which is not the case at the time of the import.

h2. Questions

First, am I doing something wrong? I do not see any mention of this in the documentation ([https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html]), and it seems it would affect many users who are moving from old-style to new-style pandas UDFs.

Second, is this the expected behaviour? The old-style pandas UDF module can be imported without problem, so the new behaviour looks like a regression. Why should users need an active Spark context just to import a module that contains a pandas UDF?

Third, what could we do? I see in the master branch that an assert was recently added (https://issues.apache.org/jira/browse/SPARK-37620).
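A possible workaround (an editorial sketch, not taken from this ticket, and assuming the Spark 3.1 code path shown in the traceback): pass a `DataType` instance instead of a DDL string. The failing call is the DDL-string parser (`_parse_datatype_string`), which needs a live JVM; the old-style example above already resolves `StringType()` without a SparkContext, so the same should hold when combined with type hints:

{code:python}
# mymod.py -- hypothetical variant, not code from the ticket
import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

# Passing a DataType instance should avoid parsing a DDL string at decoration
# time, so importing this module should not require a SparkContext/JVM.
@pandas_udf(StringType())
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()
{code}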
[jira] [Created] (SPARK-38435) Pandas UDF with type hints crashes at import
Julien Peloton created SPARK-38435: -- Summary: Pandas UDF with type hints crashes at import Key: SPARK-38435 URL: https://issues.apache.org/jira/browse/SPARK-38435 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 3.1.0 Environment: Spark: 3.1 Python: 3.7 Reporter: Julien Peloton

h2. Old style pandas UDF

Let's consider a pandas UDF defined in the old style:

{code:python}
# mymod.py
import pandas as pd

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def to_upper(s):
    return s.str.upper()
{code}

I can import it and use it as:

{code:python}
# main.py
from pyspark.sql import SparkSession
from mymod import to_upper

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show()
{code}

and launch it via:

{code:bash}
spark-submit main.py
spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py:392: UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details.
+--------------+
|to_upper(name)|
+--------------+
|      JOHN DOE|
+--------------+
{code}

Except for the `UserWarning`, the code works as expected.

h2. New style pandas UDF: using type hints

Let's now switch to the version using type hints:

{code:python}
# mymod.py
import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()
{code}

But this time, I obtain an `AttributeError`:

{code:bash}
spark-submit main.py
Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 835, in _parse_datatype_string
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 827, in from_ddl_datatype
AttributeError: 'NoneType' object has no attribute '_jvm'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 839, in _parse_datatype_string
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 827, in from_ddl_datatype
AttributeError: 'NoneType' object has no attribute '_jvm'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/myrepos/fink-filters/main.py", line 2, in <module>
    from mymod import to_upper
  File "/Users/julien/Documents/workspace/myrepos/fink-filters/mymod.py", line 5, in <module>
    def to_upper(s: pd.Series) -> pd.Series:
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py", line 432, in _create_pandas_udf
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 43, in _create_udf
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 206, in _wrapped
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py", line 96, in returnType
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 841, in _parse_datatype_string
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 831, in _parse_datatype_string
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 823, in from_ddl_schema
AttributeError: 'NoneType' object has no attribute '_jvm'
log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
{code}

The code crashes at import time. Looking at the code, the Spark context needs to exist (https://github.com/apache/spark/blob/8d70d5da3d74ebdd612b2cdc38201e121b88b24f/python/pyspark/sql/types.py#L819-L827), which is not the case at the time of the import.

h2. Questions

First, am I doing something wrong? I do not see any mention of this in the documentation (https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html), and it seems it would affect many users who are moving from old-style to new-style pandas UDFs.

Second, is this the expected behaviour? The old-style pandas UDF module can be imported without problem, so the new behaviour looks like a regression. Why should users need an active Spark context just to import a module that contains a pandas UDF?

Third, what could we do? I see in the master branch that an assert was recently added (https://issues.apache.org/jira/browse/SPARK-37620).
[jira] [Commented] (SPARK-29367) pandas udf not working with latest pyarrow release (0.15.0)
[ https://issues.apache.org/jira/browse/SPARK-29367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16946266#comment-16946266 ] Julien Peloton commented on SPARK-29367: Thanks, it works like a charm! Julien > pandas udf not working with latest pyarrow release (0.15.0) > --- > > Key: SPARK-29367 > URL: https://issues.apache.org/jira/browse/SPARK-29367 > Project: Spark > Issue Type: Documentation > Components: PySpark >Affects Versions: 2.4.0, 2.4.1, 2.4.3 >Reporter: Julien Peloton >Assignee: Bryan Cutler >Priority: Major > > Hi, > I recently upgraded pyarrow from 0.14 to 0.15 (released on Oct 5th), and my > pyspark jobs using pandas udf are failing with > java.lang.IllegalArgumentException (tested with Spark 2.4.0, 2.4.1, and > 2.4.3). Here is a full example to reproduce the failure with pyarrow 0.15: > {code:python} > from pyspark.sql import SparkSession > from pyspark.sql.functions import pandas_udf, PandasUDFType > from pyspark.sql.types import BooleanType > import pandas as pd > @pandas_udf(BooleanType(), PandasUDFType.SCALAR) > def qualitycuts(nbad: int, rb: float, magdiff: float) -> pd.Series: > """ Apply simple quality cuts > Returns > -- > out: pandas.Series of booleans > Return a Pandas DataFrame with the appropriate flag: false for bad alert, > and true for good alert. > """ > mask = nbad.values == 0 > mask *= rb.values >= 0.55 > mask *= abs(magdiff.values) <= 0.1 > return pd.Series(mask) > spark = SparkSession.builder.getOrCreate() > # Create dummy DF > colnames = ["nbad", "rb", "magdiff"] > df = spark.sparkContext.parallelize( > zip( > [0, 1, 0, 0], > [0.01, 0.02, 0.6, 0.01], > [0.02, 0.05, 0.1, 0.01] > ) > ).toDF(colnames) > df.show() > # Apply cuts > df = df\ > .withColumn("toKeep", qualitycuts(*colnames))\ > .filter("toKeep == true")\ > .drop("toKeep") > # This will fail if latest pyarrow 0.15.0 is used > df.show() > {code} > and the log is: > {code} > Driver stacktrace: > 19/10/07 09:37:49 INFO DAGScheduler: Job 3 failed: showString at > NativeMethodAccessorImpl.java:0, took 0.660523 s > Traceback (most recent call last): > File > "/Users/julien/Documents/workspace/myrepos/fink-broker/test_pyarrow.py", line > 44, in > df.show() > File > "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", > line 378, in show > File > "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", > line 1257, in __call__ > File > "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", > line 63, in deco > File > "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", > line 328, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling o64.showString. 
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 > (TID 5, localhost, executor driver): java.lang.IllegalArgumentException > at java.nio.ByteBuffer.allocate(ByteBuffer.java:334) > at > org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543) > at > org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58) > at > org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132) > at > org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181) > at > org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172) > at > org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65) > at > org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162) > at > org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122) > at > org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406) > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > at > org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.(ArrowEvalPythonExec.scala:98) > at > org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96) > at > org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127) > at > org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$app
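The fix referred to in the comment above is not included in this digest. For the record, the compatibility setting documented for Spark 2.3.x/2.4.x with pyarrow >= 0.15.0 is the ARROW_PRE_0_15_IPC_FORMAT=1 environment variable, normally set in conf/spark-env.sh so that both the driver and the executors see it. A minimal sketch, assuming a local or standalone setup where spark.executorEnv.* is honoured:

{code:python}
import os

from pyspark.sql import SparkSession

# Ask pyarrow >= 0.15 to keep emitting the legacy (pre-0.15) Arrow IPC format
# that Spark 2.4.x expects -- driver side.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

spark = (
    SparkSession.builder
    # Executor side: propagate the same variable to the Python workers.
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .getOrCreate()
)
{code}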
[jira] [Created] (SPARK-29367) pandas udf not working with latest pyarrow release (0.15.0)
Julien Peloton created SPARK-29367: -- Summary: pandas udf not working with latest pyarrow release (0.15.0) Key: SPARK-29367 URL: https://issues.apache.org/jira/browse/SPARK-29367 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.4.3, 2.4.1, 2.4.0 Reporter: Julien Peloton

Hi,

I recently upgraded pyarrow from 0.14 to 0.15 (released on Oct 5th), and my pyspark jobs using pandas udf are failing with java.lang.IllegalArgumentException (tested with Spark 2.4.0, 2.4.1, and 2.4.3). Here is a full example to reproduce the failure with pyarrow 0.15:

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import BooleanType

import pandas as pd

@pandas_udf(BooleanType(), PandasUDFType.SCALAR)
def qualitycuts(nbad: int, rb: float, magdiff: float) -> pd.Series:
    """ Apply simple quality cuts

    Returns
    -------
    out: pandas.Series of booleans
        Return a Pandas DataFrame with the appropriate flag:
        false for bad alert, and true for good alert.
    """
    mask = nbad.values == 0
    mask *= rb.values >= 0.55
    mask *= abs(magdiff.values) <= 0.1

    return pd.Series(mask)

spark = SparkSession.builder.getOrCreate()

# Create dummy DF
colnames = ["nbad", "rb", "magdiff"]
df = spark.sparkContext.parallelize(
    zip(
        [0, 1, 0, 0],
        [0.01, 0.02, 0.6, 0.01],
        [0.02, 0.05, 0.1, 0.01]
    )
).toDF(colnames)
df.show()

# Apply cuts
df = df\
    .withColumn("toKeep", qualitycuts(*colnames))\
    .filter("toKeep == true")\
    .drop("toKeep")

# This will fail if latest pyarrow 0.15.0 is used
df.show()
{code}

and the log is:

{code}
Driver stacktrace:
19/10/07 09:37:49 INFO DAGScheduler: Job 3 failed: showString at NativeMethodAccessorImpl.java:0, took 0.660523 s
Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/myrepos/fink-broker/test_pyarrow.py", line 44, in <module>
    df.show()
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o64.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 5, localhost, executor driver): java.lang.IllegalArgumentException
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
	at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
	at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
	at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
	at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
	at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
	at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:98)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scal
{code}
[jira] [Commented] (SPARK-26024) Dataset API: repartitionByRange(...) has inconsistent behaviour
[ https://issues.apache.org/jira/browse/SPARK-26024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685638#comment-16685638 ] Julien Peloton commented on SPARK-26024: [~cloud_fan] I would not advocate to increase the size of {{sampleSizePerPartition}} as well, but mentioning on the doc its impact on the result makes sense to me. I will submit a patch then, thanks. ??Why would you need a super accurate range partitioner for your (large) data set?? It's more about reproducibility than anything else. For a given input set of parameters, I want the same repartitioned output - partition size included. In addition, I'm working in astronomy and I am used to the low-level RDD API (using {{partitioners}}) which is more flexible and detailed for custom partitioning to my opinion. > Dataset API: repartitionByRange(...) has inconsistent behaviour > --- > > Key: SPARK-26024 > URL: https://issues.apache.org/jira/browse/SPARK-26024 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.3.1, 2.3.2 > Environment: Spark version 2.3.2 >Reporter: Julien Peloton >Priority: Major > Labels: dataFrame, partitioning, repartition, spark-sql > > Hi, > I recently played with the {{repartitionByRange}} method for DataFrame > introduced in SPARK-22614. For DataFrames larger than the one tested in the > code (which has only 10 elements), the code sends back random results. > As a test for showing the inconsistent behaviour, I start as the unit code > used to test {{repartitionByRange}} > ([here|https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala#L352]) > but I increase the size of the initial array to 1000, repartition using 3 > partitions, and count the number of element per-partitions: > > {code} > // Shuffle numbers from 0 to 1000, and make a DataFrame > val df = Random.shuffle(0.to(1000)).toDF("val") > // Repartition it using 3 partitions > // Sum up number of elements in each partition, and collect it. > // And do it several times > for (i <- 0 to 9) { > var counts = df.repartitionByRange(3, col("val")) > .mapPartitions{part => Iterator(part.size)} > .collect() > println(counts.toList) > } > // -> the number of elements in each partition varies... > {code} > I do not know whether it is expected (I will dig further in the code), but it > sounds like a bug. > Or I just misinterpret what {{repartitionByRange}} is for? > Any ideas? > Thanks! > Julien -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
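As an illustration of the RDD-level alternative mentioned in the comment above (a sketch with a hypothetical `bucket` helper, not code from the ticket): with `RDD.partitionBy` and an explicit partition function, the placement of each record is a pure function of its key, so the per-partition counts are reproducible by construction, with no sampling involved:

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Key-value RDD with keys 0..1000.
rdd = sc.parallelize([(v, v) for v in range(1001)])

def bucket(key, n_buckets=3, max_val=1001):
    # Fixed-width range bucketing: deterministic, depends only on the key.
    return min(key * n_buckets // max_val, n_buckets - 1)

counts = rdd.partitionBy(3, bucket).glom().map(len).collect()
print(counts)  # identical on every run for the same input
{code}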
[jira] [Commented] (SPARK-26024) Dataset API: repartitionByRange(...) has inconsistent behaviour
[ https://issues.apache.org/jira/browse/SPARK-26024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685181#comment-16685181 ] Julien Peloton commented on SPARK-26024: Hi Marco and thanks for your quick reply. You are absolutely right, setting a higher {{sampleSizePerPartition}} fixes the number of elements to be the same at all iterations. I note however the lack of documentation on this. To my opinion, adding documentation appears to me quite crucial as the default {{sampleSizePerPartition}} is quite small (100). So in most of the real-life cases the same input to {{repartitionByRange}} will lead to random number of elements per partition. > Dataset API: repartitionByRange(...) has inconsistent behaviour > --- > > Key: SPARK-26024 > URL: https://issues.apache.org/jira/browse/SPARK-26024 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.3.1, 2.3.2 > Environment: Spark version 2.3.2 >Reporter: Julien Peloton >Priority: Major > Labels: dataFrame, partitioning, repartition, spark-sql > > Hi, > I recently played with the {{repartitionByRange}} method for DataFrame > introduced in SPARK-22614. For DataFrames larger than the one tested in the > code (which has only 10 elements), the code sends back random results. > As a test for showing the inconsistent behaviour, I start as the unit code > used to test {{repartitionByRange}} > ([here|https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala#L352]) > but I increase the size of the initial array to 1000, repartition using 3 > partitions, and count the number of element per-partitions: > > {code} > // Shuffle numbers from 0 to 1000, and make a DataFrame > val df = Random.shuffle(0.to(1000)).toDF("val") > // Repartition it using 3 partitions > // Sum up number of elements in each partition, and collect it. > // And do it several times > for (i <- 0 to 9) { > var counts = df.repartitionByRange(3, col("val")) > .mapPartitions{part => Iterator(part.size)} > .collect() > println(counts.toList) > } > // -> the number of elements in each partition varies... > {code} > I do not know whether it is expected (I will dig further in the code), but it > sounds like a bug. > Or I just misinterpret what {{repartitionByRange}} is for? > Any ideas? > Thanks! > Julien -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
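For reference, a sketch of the tuning discussed in this comment (assuming the SQL configuration key `spark.sql.execution.rangeExchange.sampleSizePerPartition`, whose default of 100 controls how many rows per output partition are sampled to compute the range boundaries): raising it should make the boundaries, and hence the per-partition counts, stable across runs for a dataset of this size, at the cost of a larger sampling step.

{code:python}
import random

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Assumed conf key: enlarge the sample used by range partitioning so that the
# computed boundaries no longer fluctuate from one run to the next.
spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", 10000)

vals = list(range(1001))
random.shuffle(vals)
df = spark.createDataFrame([(v,) for v in vals], ["val"])

for _ in range(10):
    counts = (
        df.repartitionByRange(3, col("val"))
        .rdd
        .mapPartitions(lambda part: iter([sum(1 for _ in part)]))
        .collect()
    )
    print(counts)  # per-partition counts should now be identical across runs
{code}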
[jira] [Updated] (SPARK-26024) Dataset API: repartitionByRange(...) has inconsistent behaviour
[ https://issues.apache.org/jira/browse/SPARK-26024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Julien Peloton updated SPARK-26024: --- Affects Version/s: 2.3.0 2.3.1 > Dataset API: repartitionByRange(...) has inconsistent behaviour > --- > > Key: SPARK-26024 > URL: https://issues.apache.org/jira/browse/SPARK-26024 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.3.1, 2.3.2 > Environment: Spark version 2.3.2 >Reporter: Julien Peloton >Priority: Major > Labels: dataFrame, partitioning, repartition, spark-sql > > Hi, > I recently played with the {{repartitionByRange}} method for DataFrame > introduced in SPARK-22614. For DataFrames larger than the one tested in the > code (which has only 10 elements), the code sends back random results. > As a test for showing the inconsistent behaviour, I start as the unit code > used to test {{repartitionByRange}} > ([here|https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala#L352]) > but I increase the size of the initial array to 1000, repartition using 3 > partitions, and count the number of element per-partitions: > > {code} > // Shuffle numbers from 0 to 1000, and make a DataFrame > val df = Random.shuffle(0.to(1000)).toDF("val") > // Repartition it using 3 partitions > // Sum up number of elements in each partition, and collect it. > // And do it several times > for (i <- 0 to 9) { > var counts = df.repartitionByRange(3, col("val")) > .mapPartitions{part => Iterator(part.size)} > .collect() > println(counts.toList) > } > // -> the number of elements in each partition varies... > {code} > I do not know whether it is expected (I will dig further in the code), but it > sounds like a bug. > Or I just misinterpret what {{repartitionByRange}} is for? > Any ideas? > Thanks! > Julien -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26024) Dataset API: repartitionByRange(...) has inconsistent behaviour
Julien Peloton created SPARK-26024: -- Summary: Dataset API: repartitionByRange(...) has inconsistent behaviour Key: SPARK-26024 URL: https://issues.apache.org/jira/browse/SPARK-26024 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.2 Environment: Spark version 2.3.2 Reporter: Julien Peloton

Hi,

I recently played with the {{repartitionByRange}} method for DataFrame introduced in SPARK-22614. For DataFrames larger than the one tested in the code (which has only 10 elements), the code sends back random results.

As a test showing the inconsistent behaviour, I start from the unit-test code used to test {{repartitionByRange}} ([here|https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala#L352]), but I increase the size of the initial array to 1000, repartition using 3 partitions, and count the number of elements per partition:

{code}
// Shuffle numbers from 0 to 1000, and make a DataFrame
val df = Random.shuffle(0.to(1000)).toDF("val")

// Repartition it using 3 partitions.
// Sum up the number of elements in each partition, and collect it.
// And do it several times.
for (i <- 0 to 9) {
  var counts = df.repartitionByRange(3, col("val"))
    .mapPartitions{part => Iterator(part.size)}
    .collect()
  println(counts.toList)
}
// -> the number of elements in each partition varies...
{code}

I do not know whether this is expected (I will dig further in the code), but it looks like a bug. Or did I just misinterpret what {{repartitionByRange}} is for?

Any ideas?

Thanks!
Julien

-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org