[ 
https://issues.apache.org/jira/browse/SPARK-45644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17781276#comment-17781276
 ] 

Adi Wehrli commented on SPARK-45644:
------------------------------------

Thanks for this information, [~bersprockets]. 

I could now extract the {{MapObjects_10}} method for both versions:

h4. Spark 3.5.0
{code:java}
private ArrayData MapObjects_10(InternalRow i) {
  scala.Option value_2284 = null;
  if (!isNull_ExternalMapToCatalyst_value_lambda_variable_42) {
    if (value_ExternalMapToCatalyst_value_lambda_variable_42.getClass().isArray() ||
        value_ExternalMapToCatalyst_value_lambda_variable_42 instanceof scala.collection.Seq ||
        value_ExternalMapToCatalyst_value_lambda_variable_42 instanceof scala.collection.immutable.Set ||
        value_ExternalMapToCatalyst_value_lambda_variable_42 instanceof java.util.List) {
      value_2284 = (scala.Option) value_ExternalMapToCatalyst_value_lambda_variable_42;
    } else {
      throw new RuntimeException(value_ExternalMapToCatalyst_value_lambda_variable_42.getClass().getName()
        + ((java.lang.String) references[212] /* errMsg */));
    }
  }
  final boolean isNull_1963 = isNull_ExternalMapToCatalyst_value_lambda_variable_42 || value_2284.isEmpty();
  scala.collection.Seq value_2283 = isNull_1963 ? null : (scala.collection.Seq) value_2284.get();
  ArrayData value_2282 = null;

  if (!isNull_1963) {

    int dataLength_10 = value_2283.size();

    UTF8String[] convertedArray_10 = null;
    convertedArray_10 = new UTF8String[dataLength_10];

    int loopIndex_10 = 0;
    scala.collection.Iterator it_10 = value_2283.toIterator();
    while (loopIndex_10 < dataLength_10) {
      value_MapObject_lambda_variable_43 = (java.lang.Object) (it_10.next());
      isNull_MapObject_lambda_variable_43 = value_MapObject_lambda_variable_43 == null;

      resultIsNull_127 = false;
      if (!resultIsNull_127) {
        java.lang.String value_2286 = null;
        if (!isNull_MapObject_lambda_variable_43) {
          if (value_MapObject_lambda_variable_43 instanceof java.lang.String) {
            value_2286 = (java.lang.String) value_MapObject_lambda_variable_43;
          } else {
            throw new RuntimeException(value_MapObject_lambda_variable_43.getClass().getName()
              + ((java.lang.String) references[213] /* errMsg */));
          }
        }
        resultIsNull_127 = isNull_MapObject_lambda_variable_43;
        mutableStateArray_0[121] = value_2286;
      }

      boolean isNull_1965 = resultIsNull_127;
      UTF8String value_2285 = null;
      if (!resultIsNull_127) {
        value_2285 = org.apache.spark.unsafe.types.UTF8String.fromString(mutableStateArray_0[121]);
      }
      if (isNull_1965) {
        convertedArray_10[loopIndex_10] = null;
      } else {
        convertedArray_10[loopIndex_10] = value_2285;
      }

      loopIndex_10 += 1;
    }

    value_2282 = new org.apache.spark.sql.catalyst.util.GenericArrayData(convertedArray_10);
  }
  globalIsNull_320 = isNull_1963;
  return value_2282;
}
{code}

h4. Spark 3.3.3
{code:java}
private scala.collection.Seq MapObjects_10(InternalRow i) {
  scala.collection.Seq value_1083 = null;

  if (!isNull_CatalystToExternalMap_value_lambda_variable_21) {

    int dataLength_11 = value_CatalystToExternalMap_value_lambda_variable_21.numElements();

    scala.collection.mutable.Builder collectionBuilder_10 = scala.collection.Seq$.MODULE$.newBuilder();
    collectionBuilder_10.sizeHint(dataLength_11);

    int loopIndex_11 = 0;

    while (loopIndex_11 < dataLength_11) {
      value_MapObject_lambda_variable_22 = (UTF8String) (value_CatalystToExternalMap_value_lambda_variable_21.getUTF8String(loopIndex_11));
      isNull_MapObject_lambda_variable_22 = value_CatalystToExternalMap_value_lambda_variable_21.isNullAt(loopIndex_11);

      boolean isNull_957 = true;
      java.lang.String value_1084 = null;
      if (!isNull_MapObject_lambda_variable_22) {
        isNull_957 = false;
        if (!isNull_957) {

          Object funcResult_121 = null;
          funcResult_121 = value_MapObject_lambda_variable_22.toString();
          value_1084 = (java.lang.String) funcResult_121;

        }
      }
      if (isNull_957) {
        collectionBuilder_10.$plus$eq(null);
      } else {
        collectionBuilder_10.$plus$eq(value_1084);
      }

      loopIndex_11 += 1;
    }

    value_1083 = (scala.collection.Seq) collectionBuilder_10.result();
  }
  globalIsNull_81 = isNull_CatalystToExternalMap_value_lambda_variable_21;
  return value_1083;
}
{code}
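
For reference, here is a rough sketch (all names and data are invented, and this is *not* a verified reproduction of our job) of the kind of Scala shape that would seem to match the Spark 3.5.0 code above: a case class field of type {{Map[String, Option[Seq[String]]]}}, serialized on the object-serialization path ({{groupByKey}} over a mapped {{Dataset}}, as in the stack trace in the issue description):

{code:scala}
import org.apache.spark.sql.SparkSession

// Hypothetical data shape only -- field and class names are made up.
// The Option[Seq[String]] map value is what would produce the
// scala.Option -> scala.collection.Seq cast chain seen in MapObjects_10 above.
case class Payload(id: String, attributes: Map[String, Option[Seq[String]]])

object Spark45644Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-45644-sketch")
      .getOrCreate()
    import spark.implicits._

    val ds = Seq(
      Payload("a", Map("tags" -> Some(Seq("x", "y")), "empty" -> None))
    ).toDS()

    // groupByKey over an object-producing Dataset (here after map) should go
    // through AppendColumnsWithObjectExec / serializeObjectToRow, as in the
    // reported stack trace, and re-serialize the whole Payload object.
    ds.map(identity)
      .groupByKey(_.id)
      .count()
      .show()

    spark.stop()
  }
}
{code}

If a shape like this is indeed involved, the type check in the 3.5.0 {{MapObjects_10}} above (array / {{Seq}} / {{Set}} / {{List}}) would reject a {{scala.Some}} value before the cast to {{scala.Option}}, which would match the reported error message.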


> After upgrading to Spark 3.4.1 and 3.5.0 we receive RuntimeException 
> "scala.Some is not a valid external type for schema of array<string>"
> ------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-45644
>                 URL: https://issues.apache.org/jira/browse/SPARK-45644
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 3.4.1, 3.5.0
>            Reporter: Adi Wehrli
>            Priority: Major
>
> I do not really know if this is a bug, but I am at the end of my knowledge.
> A Spark job ran successfully with Spark 3.2.x and 3.3.x. 
> But after upgrading to 3.4.1 (as well as 3.5.0), running the same job with 
> the same data now always fails with:
> {code}
> scala.Some is not a valid external type for schema of array<string>
> {code}
> The corresponding stacktrace is:
> {code}
> 2023-10-24T06:28:50.932 level=ERROR logger=org.apache.spark.executor.Executor msg="Exception in task 0.0 in stage 0.0 (TID 0)" thread="Executor task launch worker for task 0.0 in stage 0.0 (TID 0)"
> java.lang.RuntimeException: scala.Some is not a valid external type for schema of array<string>
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_10$(Unknown Source) ~[?:?]
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ExternalMapToCatalyst_1$(Unknown Source) ~[?:?]
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.createNamedStruct_14_3$(Unknown Source) ~[?:?]
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_12$(Unknown Source) ~[?:?]
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) ~[?:?]
>       at org.apache.spark.sql.execution.ObjectOperator$.$anonfun$serializeObjectToRow$1(objects.scala:165) ~[spark-sql_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.sql.execution.AppendColumnsWithObjectExec.$anonfun$doExecute$15(objects.scala:380) ~[spark-sql_2.12-3.5.0.jar:3.5.0]
>       at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) ~[scala-library-2.12.15.jar:?]
>       at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) ~[scala-library-2.12.15.jar:?]
>       at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) ~[spark-common-utils_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) ~[spark-common-utils_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) [spark-core_2.12-3.5.0.jar:3.5.0]
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
>       at java.lang.Thread.run(Thread.java:834) [?:?]
> 2023-10-24T06:28:50.932 level=ERROR logger=org.apache.spark.executor.Executor msg="Exception in task 1.0 in stage 0.0 (TID 1)" thread="Executor task launch worker for task 1.0 in stage 0.0 (TID 1)"
> java.lang.RuntimeException: scala.Some is not a valid external type for schema of array<string>
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_10$(Unknown Source) ~[?:?]
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ExternalMapToCatalyst_1$(Unknown Source) ~[?:?]
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.createNamedStruct_14_3$(Unknown Source) ~[?:?]
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_12$(Unknown Source) ~[?:?]
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) ~[?:?]
>       at org.apache.spark.sql.execution.ObjectOperator$.$anonfun$serializeObjectToRow$1(objects.scala:165) ~[spark-sql_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.sql.execution.AppendColumnsWithObjectExec.$anonfun$doExecute$15(objects.scala:380) ~[spark-sql_2.12-3.5.0.jar:3.5.0]
>       at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) ~[scala-library-2.12.15.jar:?]
>       at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) ~[scala-library-2.12.15.jar:?]
>       at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) ~[spark-common-utils_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) ~[spark-common-utils_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) ~[spark-core_2.12-3.5.0.jar:3.5.0]
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) [spark-core_2.12-3.5.0.jar:3.5.0]
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
>       at java.lang.Thread.run(Thread.java:834) [?:?]
> {code}
> As the error occurs in generated code, we cannot debug what the real cause 
> was. We patched the {{ValidateExternalType}} case class (in trait 
> {{org.apache.spark.sql.catalyst.expressions.InvokeLike}}) by adding some sysout 
> statements, but we still could not determine which data structure was causing 
> this.
> And we did not find anything in the upgrade guides about such a behaviour 
> change, or about a property we could set to regain the former behaviour.
> What could be the cause for this? In Spark 3.3.3, {{ScalaReflection}} was used 
> in {{InvokeLike}}; Spark 3.4.x and 3.5.0 now use {{EncoderUtils}} instead. 
> The same also occurs if we use Scala 2.12.18.
> h4. Some dependency information:
> h5. Spark 3.3.3
> * Avro {{1.11.0}}
> * SnakeYAML {{1.31}}
> * FasterXML Jackson {{2.13.4}}
> * Json4s {{3.7.0-M11}}
> * scala-collection-compat_2.12 {{2.3.0}}
> * Kafka {{3.4.1}}
> * kafka-avro-serializer {{7.4.1}}
> h5. Spark 3.5.0
> * Avro {{1.11.2}}
> * SnakeYAML {{2.0}}
> * FasterXML Jackson {{2.15.2}}
> * Json4s {{3.7.0-M11}}
> * scala-collection-compat_2.12 {{2.3.0}}
> * Kafka {{3.5.1}}
> * kafka-avro-serializer {{7.5.1}}
> BTW: I tried Spark 3.5.0 with the same dependencies as listed above for Spark 
> 3.3.3, but the error still occurred.



