masseyke opened a new pull request #34838: URL: https://github.com/apache/spark/pull/34838
This commit adds support for RDDs containing `ShortWritable`s to PySpark. Right now, if a user calls `sc.newAPIHadoopRDD()` with an InputFormat that produces `ShortWritable`s, the call fails with an error like the one below, because `ShortWritable` is not explicitly handled by `PythonHadoopUtil`:

```
>>> rdd = sc.newAPIHadoopRDD(inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
...                          keyClass="org.apache.hadoop.io.NullWritable",
...                          valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
...                          conf=conf)
2021-12-08 14:38:40,439 ERROR scheduler.TaskSetManager: task 0.0 in stage 15.0 (TID 31) had a not serializable result: org.apache.hadoop.io.ShortWritable
Serialization stack:
	- object not serializable (class: org.apache.hadoop.io.ShortWritable, value: 1)
	- writeObject data (class: java.util.HashMap)
	- object (class java.util.HashMap, {price=1})
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (1,{price=1}))
	- element of array (index: 0)
	- array (class [Lscala.Tuple2;, size 1); not retrying
Traceback (most recent call last):
  File "<stdin>", line 4, in <module>
  File "/home/hduser/spark-3.1.2-bin-hadoop3.2/python/pyspark/context.py", line 853, in newAPIHadoopRDD
    jconf, batchSize)
  File "/home/hduser/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/hduser/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/hduser/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.apache.spark.SparkException: Job aborted due to stage failure: task 0.0 in stage 15.0 (TID 31) had a not serializable result: org.apache.hadoop.io.ShortWritable
Serialization stack:
	- object not serializable (class: org.apache.hadoop.io.ShortWritable, value: 1)
	- writeObject data (class: java.util.HashMap)
	- object (class java.util.HashMap, {price=1})
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (1,{price=1}))
	- element of array (index: 0)
	- array (class [Lscala.Tuple2;, size 1)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
	at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:173)
	at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:385)
	at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
```
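For context, here is a minimal standalone sketch of the kind of `Writable`-to-Java conversion involved; this is not the actual `PythonHadoopUtil` code, and the object and method names (`WritableConversionSketch`, `convertWritable`) are purely illustrative. It assumes the conversion is a pattern match over `Writable` subclasses, with the `ShortWritable` case being the kind of handling this commit adds:

```scala
import org.apache.hadoop.io._

// Hypothetical illustration (not the real PythonHadoopUtil code) of unwrapping
// Hadoop Writables into plain Java values before results are serialized back
// to the driver. Without a ShortWritable case, the raw ShortWritable falls
// through unconverted and fails serialization as in the stack trace above.
object WritableConversionSketch {
  def convertWritable(writable: Writable): Any = writable match {
    case iw: IntWritable    => iw.get()
    case lw: LongWritable   => lw.get()
    case dw: DoubleWritable => dw.get()
    case sw: ShortWritable  => sw.get() // unwrap to a boxed (serializable) Short
    case t: Text            => t.toString
    case _: NullWritable    => null
    case other              => other // unconverted Writables may not be java.io.Serializable
  }

  def main(args: Array[String]): Unit = {
    // The failing value in the error above, {price=1}, wrapped a short value of 1.
    println(convertWritable(new ShortWritable(1.toShort))) // prints: 1
  }
}
```

Once the value is unwrapped to a plain `Short`, the resulting tuple serializes normally and the `newAPIHadoopRDD()` call above succeeds.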