[ https://issues.apache.org/jira/browse/SPARK-35700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Arghya Saha updated SPARK-35700:
--------------------------------
    Description: 
We are not able to upgrade from Spark 2.4.x to Spark 3.1.1 because joins on varchar columns fail; this is unexpected, and the same queries work on Spark 3.0.0. We are running Spark 3.1.1 (MR 3.2) on K8s.

Below is my use case:

The tables are external Hive tables, with the files stored as ORC. They contain varchar columns, and when we perform a join on a varchar column we get the exception below.

As I understand it, Spark 3.1.1 introduced the varchar data type, but it does not appear to be well tested with ORC and it is not backward compatible. I have even tried the config below, without luck:

*spark.sql.legacy.charVarcharAsString: "true"*

We do not get the error when *spark.sql.orc.filterPushdown=false*.
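
As a stopgap we disable the pushdown explicitly. Below is a minimal sketch of that workaround (assuming a SparkSession named {{spark}}; the same setting can also be passed via --conf at submit time):
{code:python}
# Temporary workaround sketch: disable ORC predicate pushdown so Spark evaluates
# the varchar(32) predicates itself instead of converting them into an ORC
# SearchArgument (that conversion is where the UnsupportedOperationException is thrown).
spark.conf.set("spark.sql.orc.filterPushdown", "false")

# ... run the join as before ...

# spark.conf.set("spark.sql.orc.filterPushdown", "true")  # re-enable afterwards if desired
{code}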

Below is the code (col1 is of type varchar(32) in Hive):
{code:python}
df = spark.sql("select a.col1, a.col2 from table1 a inner join table2 b on (a.col1 = b.col1 and a.col2 > b.col2)")
df.write.format("orc").option("compression", "zlib").mode("Append").save("<s3_path>")
{code}
Below is the error:

 
{code:java}
Job aborted due to stage failure: Task 43 in stage 5.0 failed 4 times, most recent failure: Lost task 43.3 in stage 5.0 (TID 524) (10.219.36.64 executor 5): java.lang.UnsupportedOperationException: DataType: varchar(32)
        at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.getPredicateLeafType(OrcFilters.scala:150)
        at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.getType$1(OrcFilters.scala:222)
        at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.buildLeafSearchArgument(OrcFilters.scala:266)
        at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.convertibleFiltersHelper$1(OrcFilters.scala:132)
        at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.$anonfun$convertibleFilters$4(OrcFilters.scala:135)
        at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
        at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
        at scala.collection.immutable.List.flatMap(List.scala:355)
        at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.convertibleFilters(OrcFilters.scala:134)
        at org.apache.spark.sql.execution.datasources.orc.OrcFilters$.createFilter(OrcFilters.scala:73)
        at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$4(OrcFileFormat.scala:189)
        at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$4$adapted(OrcFileFormat.scala:188)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$1(OrcFileFormat.scala:188)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:116)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:503)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.columnartorow_nextBatch_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:177)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)

Driver stacktrace:3
{code}
 

I can see there is no mapping for varchar in OrcFilters.getPredicateLeafType (OrcFilters.scala:150):

[https://github.com/apache/spark/blob/v3.1.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala#L142]
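
For what it's worth, a standalone reproduction along these lines should hit the same code path (a hypothetical sketch I have not run; the table and data are made up):
{code:python}
# Hypothetical repro sketch (untested): a Hive ORC table with a varchar column,
# plus any predicate on that column that Spark tries to push down into the reader.
spark.sql("CREATE TABLE varchar_repro (col1 varchar(32), col2 int) STORED AS ORC")
spark.sql("INSERT INTO varchar_repro VALUES ('abc', 1)")

# With spark.sql.orc.filterPushdown enabled (as in our environment), the filter
# below should reach OrcFilters.getPredicateLeafType with varchar(32) and fail on 3.1.1.
spark.sql("SELECT * FROM varchar_repro WHERE col1 = 'abc'").show()
{code}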

 

 


> spark.sql.orc.filterPushdown not working with Spark 3.1.1 for tables with 
> varchar data type
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-35700
>                 URL: https://issues.apache.org/jira/browse/SPARK-35700
>             Project: Spark
>          Issue Type: Bug
>          Components: Kubernetes, PySpark, Spark Core
>    Affects Versions: 3.1.1
>         Environment: Spark 3.1.1 on K8S
>            Reporter: Arghya Saha
>            Priority: Major
>


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
