masseyke opened a new pull request #34838:
URL: https://github.com/apache/spark/pull/34838


   This commit adds support for RDDs containing ShortWritables to pyspark. 
Right now if a user calls sc.newAPIHadoopRDD() with an InputFormat that 
provides ShortWritables, the call will fail with an error like the one below 
because ShortWritable is not explicitly handled by PythonHadoopUtil.
   ```
   >>> rdd = 
sc.newAPIHadoopRDD(inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
   ...                          keyClass="org.apache.hadoop.io.NullWritable",
   ...                          
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
   ...                          conf=conf)
   2021-12-08 14:38:40,439 ERROR scheduler.TaskSetManager: task 0.0 in stage 
15.0 (TID 31) had a not serializable result: org.apache.hadoop.io.ShortWritable
   Serialization stack:
        - object not serializable (class: org.apache.hadoop.io.ShortWritable, 
value: 1)
        - writeObject data (class: java.util.HashMap)
        - object (class java.util.HashMap, {price=1})
        - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
        - object (class scala.Tuple2, (1,{price=1}))
        - element of array (index: 0)
        - array (class [Lscala.Tuple2;, size 1); not retrying
   Traceback (most recent call last):
     File "<stdin>", line 4, in <module>
     File "/home/hduser/spark-3.1.2-bin-hadoop3.2/python/pyspark/context.py", 
line 853, in newAPIHadoopRDD
       jconf, batchSize)
     File 
"/home/hduser/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
 line 1305, in __call__
     File "/home/hduser/spark-3.1.2-bin-hadoop3.2/python/pyspark/sql/utils.py", 
line 111, in deco
       return f(*a, **kw)
     File 
"/home/hduser/spark-3.1.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py",
 line 328, in get_return_value
   py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
   : org.apache.spark.SparkException: Job aborted due to stage failure: task 
0.0 in stage 15.0 (TID 31) had a not serializable result: 
org.apache.hadoop.io.ShortWritable
   Serialization stack:
        - object not serializable (class: org.apache.hadoop.io.ShortWritable, 
value: 1)
        - writeObject data (class: java.util.HashMap)
        - object (class java.util.HashMap, {price=1})
        - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
        - object (class scala.Tuple2, (1,{price=1}))
        - element of array (index: 0)
        - array (class [Lscala.Tuple2;, size 1)
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
        at scala.Option.foreach(Option.scala:407)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
        at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
        at 
org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:173)
        at 
org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:385)
        at 
org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to