[ https://issues.apache.org/jira/browse/SPARK-50603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Adam Binford updated SPARK-50603:
---------------------------------
    Description: 
Using the `basePath` option on a streaming read from a file source can cause 
the query to fail when the `path` doesn't contain a glob.

Example:

{code:python}
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import tempfile

spark = SparkSession.Builder().getOrCreate()
spark.sparkContext.setLogLevel('WARN')

with tempfile.TemporaryDirectory() as dir:
    table_dir = f'{dir}/test-json'

    # Write a small JSON table partitioned by 'part'
    (spark.range(100)
        .withColumns({
            'part': F.col('id') % 5,
        })
        .write
        .partitionBy('part')
        .json(table_dir)
    )

    # Stream from a single partition directory, with basePath pointing at the table root
    (spark.readStream
        .schema('id long, part long')
        .option('basePath', table_dir)
        .json(f'{table_dir}/part=0')
        .writeStream
        .format('console')
        .trigger(once=True)
        .start()
        .awaitTermination()
    )
{code}


Fails with:
{code:java}
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 0 out of bounds for length 0
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:173)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt(rows.scala:36)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.isNullAt$(rows.scala:36)
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.isNullAt(rows.scala:167)
        at org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:80)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.$anonfun$apply$2(FileFormat.scala:161)
        at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.next(FileScanRDD.scala:211)
        at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask$IteratorWithMetrics.next(WriteToDataSourceV2Exec.scala:537)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask$IteratorWithMetrics.next(WriteToDataSourceV2Exec.scala:523)
        at org.apache.spark.sql.connector.write.DataWriter.writeAll(DataWriter.java:91)
        at org.apache.spark.sql.execution.streaming.sources.PackedRowDataWriter.writeAll(PackedRowWriterFactory.scala:53)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:545)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$5(WriteToDataSourceV2Exec.scala:470)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1323)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:519)
        at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:453)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:542)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:414)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
        at org.apache.spark.scheduler.Task.run(Task.scala:146)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$6(Executor.scala:658)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:661)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
{code}

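Adding a glob to the read path looks like a possible workaround until this is fixed: per the description above, the failure only shows up when the `path` contains no glob, so a glob that still resolves to the same partition directory should take the working code path. A minimal, untested sketch reusing `table_dir` from the repro (the exact pattern `part=0/*` is my assumption):

{code:python}
# Untested workaround sketch, not a confirmed fix: the same streaming read as the
# repro, but with a glob in the path so it avoids the "path without a glob" case
# described in this issue. 'part=0/*' is an assumed pattern that still targets
# only the part=0 directory.
(spark.readStream
    .schema('id long, part long')
    .option('basePath', table_dir)
    .json(f'{table_dir}/part=0/*')
    .writeStream
    .format('console')
    .trigger(once=True)
    .start()
    .awaitTermination()
)
{code}

Leaving `basePath` unset should also sidestep the error, since the failure only occurs with that option set, at the cost of losing partition discovery relative to the table root.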

> basePath can cause failures on streaming reads for file sources
> ---------------------------------------------------------------
>
>                 Key: SPARK-50603
>                 URL: https://issues.apache.org/jira/browse/SPARK-50603
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.3
>            Reporter: Adam Binford
>            Priority: Major
>