[ https://issues.apache.org/jira/browse/SPARK-50603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Adam Binford updated SPARK-50603:
---------------------------------
    Description:

Using the `basePath` option on a streaming read from a file source can cause the query to fail when the `path` doesn't contain a glob.

Example:

{code:python}
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import tempfile

spark = SparkSession.Builder().getOrCreate()
spark.sparkContext.setLogLevel('WARN')

with tempfile.TemporaryDirectory() as dir:
    table_dir = f'{dir}/test-json'
    (spark.range(100)
        .withColumns({
            'part': F.col('id') % 5,
        })
        .write
        .partitionBy('part')
        .json(table_dir)
    )
    (spark.readStream
        .schema('id long, part long')
        .option('basePath', table_dir)
        .json(f'{table_dir}/part=0')
        .writeStream
        .format('console')
        .trigger(once=True)
        .start()
        .awaitTermination()
    )
{code}

Fails with

{code}
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 0 out of bounds for length 0
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:173)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt(rows.scala:36)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt$(rows.scala:36)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.isNullAt(rows.scala:167)
    at org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:80)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.$anonfun$apply$2(FileFormat.scala:161)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:211)
    at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask$IteratorWithMetrics.next(WriteToDataSourceV2Exec.scala:537)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask$IteratorWithMetrics.next(WriteToDataSourceV2Exec.scala:523)
    at org.apache.spark.sql.connector.write.DataWriter.writeAll(DataWriter.java:91)
    at org.apache.spark.sql.execution.streaming.sources.PackedRowDataWriter.writeAll(PackedRowWriterFactory.scala:53)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:545)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:470)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1323)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:519)
    at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:453)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:542)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:414)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
    at org.apache.spark.scheduler.Task.run(Task.scala:146)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$6(Executor.scala:658)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:661)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
{code}
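For context, a minimal sketch of what `basePath` normally provides and a possible way around the failure. The batch read below shows the behavior the streaming source would be expected to mirror (the `part` partition column recovered from the `part=0` directory name); the glob variant is only an assumption drawn from the issue title, which reports the failure when the path does *not* contain a glob, and has not been verified here. Both snippets reuse `spark` and `table_dir` from the repro above and would need to run inside its `with` block.

{code:python}
# Assumes `spark` and `table_dir` from the repro above (run inside its `with` block,
# before the temporary directory is cleaned up).

# Batch read of a single partition directory: `basePath` marks the table root,
# so the `part` partition column is parsed from the `part=0` directory name.
(spark.read
    .schema('id long, part long')
    .option('basePath', table_dir)
    .json(f'{table_dir}/part=0')
    .show())

# Possible workaround for the streaming case (untested assumption: the title
# says the failure occurs only when the path contains no glob, so introducing
# one may avoid the failing code path).
(spark.readStream
    .schema('id long, part long')
    .option('basePath', table_dir)
    .json(f'{table_dir}/part=[0]')  # character-class glob instead of a plain directory
    .writeStream
    .format('console')
    .trigger(once=True)
    .start()
    .awaitTermination())
{code}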
> basePath can cause failures on streaming reads for file sources
> ---------------------------------------------------------------
>
>                 Key: SPARK-50603
>                 URL: https://issues.apache.org/jira/browse/SPARK-50603
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.3
>            Reporter: Adam Binford
>            Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org