[
https://issues.apache.org/jira/browse/SPARK-45644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781276#comment-17781276
]
Adi Wehrli commented on SPARK-45644:
------------------------------------
Thanks for this information, [~bersprockets].
I have now extracted the {{MapObjects_10}} method from the generated code of each version:
h4. Spark 3.5.0
{code:java}
// Generated code excerpt (Spark 3.5.0, serializer direction): converts the external value
// held in value_ExternalMapToCatalyst_value_lambda_variable_42 into a Catalyst ArrayData
// of UTF8String (or null), and records the result's null flag in globalIsNull_320.
// The lambda-variable fields, references[], resultIsNull_127 and mutableStateArray_0 are
// members of the enclosing generated class and are not shown in this excerpt.
private ArrayData MapObjects_10(InternalRow i) {
scala.Option value_2284 = null;
if (!isNull_ExternalMapToCatalyst_value_lambda_variable_42) {
// NOTE(review): this check accepts only Java arrays, scala.collection.Seq,
// scala.collection.immutable.Set and java.util.List, yet the accepted value is then
// cast to scala.Option. A scala.Some fails the check and reaches the throw below
// (message = class name + references[212], presumably " is not a valid external type
// for schema of array<string>"), which matches the reported error. Any value that does
// pass the check is not an Option, so the (scala.Option) cast would fail instead.
if
(value_ExternalMapToCatalyst_value_lambda_variable_42.getClass().isArray() ||
value_ExternalMapToCatalyst_value_lambda_variable_42 instanceof
scala.collection.Seq || value_ExternalMapToCatalyst_value_lambda_variable_42
instanceof scala.collection.immutable.Set ||
value_ExternalMapToCatalyst_value_lambda_variable_42 instanceof java.util.List)
{
value_2284 = (scala.Option)
value_ExternalMapToCatalyst_value_lambda_variable_42;
} else {
throw new
RuntimeException(value_ExternalMapToCatalyst_value_lambda_variable_42.getClass().getName()
+ ((java.lang.String) references[212] /* errMsg */));
}
}
// The result is null when the lambda variable is null or the Option is empty; otherwise
// the Option's content is unwrapped and cast to scala.collection.Seq.
final boolean isNull_1963 =
isNull_ExternalMapToCatalyst_value_lambda_variable_42 || value_2284.isEmpty();
scala.collection.Seq value_2283 = isNull_1963 ? null :
(scala.collection.Seq) value_2284.get();
ArrayData value_2282 = null;
if (!isNull_1963) {
// Copy each Seq element into a UTF8String[] sized by value_2283.size().
int dataLength_10 = value_2283.size();
UTF8String[] convertedArray_10 = null;
convertedArray_10 = new UTF8String[dataLength_10];
int loopIndex_10 = 0;
scala.collection.Iterator it_10 = value_2283.toIterator();
while (loopIndex_10 < dataLength_10) {
value_MapObject_lambda_variable_43 = (java.lang.Object) (it_10.next());
isNull_MapObject_lambda_variable_43 = value_MapObject_lambda_variable_43
== null;
resultIsNull_127 = false;
if (!resultIsNull_127) {
// Each non-null element must be a java.lang.String; any other type throws with
// references[213] appended to the element's class name.
java.lang.String value_2286 = null;
if (!isNull_MapObject_lambda_variable_43) {
if (value_MapObject_lambda_variable_43 instanceof java.lang.String) {
value_2286 = (java.lang.String) value_MapObject_lambda_variable_43;
} else {
throw new
RuntimeException(value_MapObject_lambda_variable_43.getClass().getName() +
((java.lang.String) references[213] /* errMsg */));
}
}
resultIsNull_127 = isNull_MapObject_lambda_variable_43;
mutableStateArray_0[121] = value_2286;
}
// Null elements are stored as null; other elements as UTF8String.fromString(element).
boolean isNull_1965 = resultIsNull_127;
UTF8String value_2285 = null;
if (!resultIsNull_127) {
value_2285 =
org.apache.spark.unsafe.types.UTF8String.fromString(mutableStateArray_0[121]);
}
if (isNull_1965) {
convertedArray_10[loopIndex_10] = null;
} else {
convertedArray_10[loopIndex_10] = value_2285;
}
loopIndex_10 += 1;
}
value_2282 = new
org.apache.spark.sql.catalyst.util.GenericArrayData(convertedArray_10);
}
globalIsNull_320 = isNull_1963;
return value_2282;
}
{code}
h4. Spark 3.3.3:
{code:java}
// Generated code excerpt (Spark 3.3.3, deserializer direction): reads the Catalyst array in
// value_CatalystToExternalMap_value_lambda_variable_21 and builds a scala.collection.Seq of
// java.lang.String, keeping null elements as null. It records the null flag in globalIsNull_81.
// NOTE(review): this method converts Catalyst -> external (CatalystToExternalMap,
// getUTF8String/isNullAt), whereas the 3.5.0 MapObjects_10 excerpt converts external ->
// Catalyst (ExternalMapToCatalyst). The shared "_10" suffix is presumably just a codegen
// counter, so the two excerpts are probably not counterparts of each other (TODO confirm).
private scala.collection.Seq MapObjects_10(InternalRow i) {
scala.collection.Seq value_1083 = null;
if (!isNull_CatalystToExternalMap_value_lambda_variable_21) {
// Build the Seq with a builder pre-sized to the number of array elements.
int dataLength_11 =
value_CatalystToExternalMap_value_lambda_variable_21.numElements();
scala.collection.mutable.Builder collectionBuilder_10 =
scala.collection.Seq$.MODULE$.newBuilder();
collectionBuilder_10.sizeHint(dataLength_11);
int loopIndex_11 = 0;
while (loopIndex_11 < dataLength_11) {
value_MapObject_lambda_variable_22 = (UTF8String)
(value_CatalystToExternalMap_value_lambda_variable_21.getUTF8String(loopIndex_11));
isNull_MapObject_lambda_variable_22 =
value_CatalystToExternalMap_value_lambda_variable_21.isNullAt(loopIndex_11);
// Non-null elements are converted with UTF8String.toString(); null elements are
// appended to the builder as null.
boolean isNull_957 = true;
java.lang.String value_1084 = null;
if (!isNull_MapObject_lambda_variable_22) {
isNull_957 = false;
if (!isNull_957) {
Object funcResult_121 = null;
funcResult_121 = value_MapObject_lambda_variable_22.toString();
value_1084 = (java.lang.String) funcResult_121;
}
}
if (isNull_957) {
collectionBuilder_10.$plus$eq(null);
} else {
collectionBuilder_10.$plus$eq(value_1084);
}
loopIndex_11 += 1;
}
value_1083 = (scala.collection.Seq) collectionBuilder_10.result();
}
globalIsNull_81 = isNull_CatalystToExternalMap_value_lambda_variable_21;
return value_1083;
}
{code}
> After upgrading to Spark 3.4.1 and 3.5.0 we receive RuntimeException
> "scala.Some is not a valid external type for schema of array<string>"
> ------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-45644
> URL: https://issues.apache.org/jira/browse/SPARK-45644
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, SQL
> Affects Versions: 3.4.1, 3.5.0
> Reporter: Adi Wehrli
> Priority: Major
>
> I am not sure whether this is a bug, but I have reached the limits of my knowledge.
> A Spark job ran successfully with Spark 3.2.x and 3.3.x.
> But after upgrading to 3.4.1 (and also with 3.5.0), running the same job
> with the same data now always fails with the following error:
> {code}
> scala.Some is not a valid external type for schema of array<string>
> {code}
> The corresponding stacktrace is:
> {code}
> 2023-10-24T06:28:50.932 level=ERROR logger=org.apache.spark.executor.Executor
> msg="Exception in task 0.0 in stage 0.0 (TID 0)" thread="Executor task launch
> worker for task 0.0 in stage 0.0 (TID 0)"
> java.lang.RuntimeException: scala.Some is not a valid external type for
> schema of array<string>
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_10$(Unknown
> Source) ~[?:?]
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ExternalMapToCatalyst_1$(Unknown
> Source) ~[?:?]
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.createNamedStruct_14_3$(Unknown
> Source) ~[?:?]
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_12$(Unknown
> Source) ~[?:?]
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
> Source) ~[?:?]
> at
> org.apache.spark.sql.execution.ObjectOperator$.$anonfun$serializeObjectToRow$1(objects.scala:165)
> ~[spark-sql_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.sql.execution.AppendColumnsWithObjectExec.$anonfun$doExecute$15(objects.scala:380)
> ~[spark-sql_2.12-3.5.0.jar:3.5.0]
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
> ~[scala-library-2.12.15.jar:?]
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
> ~[scala-library-2.12.15.jar:?]
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at org.apache.spark.scheduler.Task.run(Task.scala:141)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
> ~[spark-common-utils_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
> ~[spark-common-utils_2.12-3.5.0.jar:3.5.0]
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
> [spark-core_2.12-3.5.0.jar:3.5.0]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
> at java.lang.Thread.run(Thread.java:834) [?:?]
> 2023-10-24T06:28:50.932 level=ERROR logger=org.apache.spark.executor.Executor
> msg="Exception in task 1.0 in stage 0.0 (TID 1)" thread="Executor task launch
> worker for task 1.0 in stage 0.0 (TID 1)"
> java.lang.RuntimeException: scala.Some is not a valid external type for
> schema of array<string>
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_10$(Unknown
> Source) ~[?:?]
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ExternalMapToCatalyst_1$(Unknown
> Source) ~[?:?]
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.createNamedStruct_14_3$(Unknown
> Source) ~[?:?]
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_12$(Unknown
> Source) ~[?:?]
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
> Source) ~[?:?]
> at
> org.apache.spark.sql.execution.ObjectOperator$.$anonfun$serializeObjectToRow$1(objects.scala:165)
> ~[spark-sql_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.sql.execution.AppendColumnsWithObjectExec.$anonfun$doExecute$15(objects.scala:380)
> ~[spark-sql_2.12-3.5.0.jar:3.5.0]
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
> ~[scala-library-2.12.15.jar:?]
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
> ~[scala-library-2.12.15.jar:?]
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at org.apache.spark.scheduler.Task.run(Task.scala:141)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
> ~[spark-common-utils_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
> ~[spark-common-utils_2.12-3.5.0.jar:3.5.0]
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
> ~[spark-core_2.12-3.5.0.jar:3.5.0]
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
> [spark-core_2.12-3.5.0.jar:3.5.0]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> [?:?]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> [?:?]
> at java.lang.Thread.run(Thread.java:834) [?:?]
> {code}
> As the error occurs in generated code, we cannot debug what the actual
> cause was. We patched the {{ValidateExternalType}} case class (in trait
> {{org.apache.spark.sql.catalyst.expressions.InvokeLike}}) by adding some sysout
> statements, but we still could not determine which data structure was
> causing this.
> We also found nothing in the upgrade guides about this behaviour, or about
> a property that could be changed to restore the former behaviour.
> What could be the cause of this? In Spark 3.3.3, {{ScalaReflection}} was used
> in {{InvokeLike}}; Spark 3.4.x and 3.5.0 now use {{EncoderUtils}} instead.
> The same also occurs if we use Scala 2.12.18.
> h4. Some dependencies information:
> h5. Spark 3.3.3
> * Avro {{1.11.0}}
> * SnakeYAML {{1.31}}
> * FasterXML Jackson {{2.13.4}}
> * Json4s {{3.7.0-M11}}
> * scala-collection-compat_2.12 {{2.3.0}}
> * Kafka {{3.4.1}}
> * kafka-avro-serializer {{7.4.1}}
> h5. Spark 3.5.0
> * Avro {{1.11.2}}
> * SnakeYAML {{2.0}}
> * FasterXML Jackson {{2.15.2}}
> * Json4s {{3.7.0-M11}}
> * scala-collection-compat_2.12 {{2.3.0}}
> * Kafka {{3.5.1}}
> * kafka-avro-serializer {{7.5.1}}
> BTW: I tried Spark 3.5.0 with the same dependencies as listed above for Spark
> 3.3.3, but the error still occurred.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]