Ben created SPARK-19243:
---------------------------

             Summary: Error when selecting from DataFrame containing parsed data from files larger than 1MB
                 Key: SPARK-19243
                 URL: https://issues.apache.org/jira/browse/SPARK-19243
             Project: Spark
          Issue Type: Bug
            Reporter: Ben


I hope I can describe the problem clearly. This error happens with Spark 2.0.1. 
On my test PC, Spark 2.1.0 shows none of the issues below, but I cannot verify 
that on the test cluster, because Spark would first need to be upgraded there. 
I'm opening this ticket because, if this is a bug, something may still be 
partially present in Spark 2.1.0.

Initially I thought it was a problem in my script, so I kept debugging until I 
found out why this is happening.

Step by step: I load XML files through spark-xml into a DataFrame. In my case 
the rowTag is the root tag, so each XML file becomes one row. The XML structure 
is fairly complex and is converted to nested columns and arrays inside the DF. 
Since I need to flatten the whole table, and since the output is not fixed but 
selected dynamically, columns that were parsed as arrays are expanded with 
explode() only when needed.
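For reference, explode() produces one output row per array element, duplicating the other columns. A plain-Python model of that transformation (the column names here are made up for illustration, not from my actual schema):

{noformat}
def explode_rows(rows, array_col):
    """Model of Spark's explode(): one output row per element of the
    array column, with all other columns duplicated."""
    out = []
    for row in rows:
        for element in row[array_col]:
            new_row = dict(row)        # copy the non-array columns
            new_row[array_col] = element
            out.append(new_row)
    return out

# One input row with a 3-element array becomes 3 output rows.
rows = [{"doc_id": 1, "entries": ["a", "b", "c"]}]
print(explode_rows(rows, "entries"))
{noformat}

This is why a single XML file with one very large array can multiply into a huge number of rows after the explode.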

Normally I can select various columns that don't have many entries without a 
problem. 
But when I select a column that has a lot of entries into a new DF, e.g. simply 
through
{noformat}
df2 = df.select(...)
{noformat}
and then call count() or first() or anything else on it, Spark behaves in one 
of two ways:
1. If the source file was smaller than 1MB, it works.
2. If the source file was larger than 1MB, the following error occurs:

{noformat}
Traceback (most recent call last):
  File "/myCode.py", line 71, in main
    df.count()
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 299, in count
    return int(self._jdf.count())
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark-client/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o180.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, compname): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
  at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
  at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
  at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
  at org.apache.spark.storage.BlockManager.get(BlockManager.scala:606)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:663)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
  at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2526)
  at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
  at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
  at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2523)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:280)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:214)
  at java.lang.Thread.run(Thread.java:745)
  
Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
  at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
  at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1307)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
  at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:438)
  at org.apache.spark.storage.BlockManager.get(BlockManager.scala:606)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:663)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  ... 1 more
{noformat}

This doesn't happen only when calling count(); any action triggers it.
As mentioned, it only occurs when there are many entries. In my case that 
means exactly one configuration: selecting the column with by far the most 
entries, whose values were in an array and have been exploded. Otherwise it 
works fine regardless of the file size.

The same thing happens if the source is an archive containing the XML files: 
if the archive itself is larger than 1MB, the error occurs; if it is smaller, 
it works.

The Spark log shows that the error occurs when calling e.g. count(), although 
in my script's log the error appears after the select(...) command starts; 
maybe that's because of the parallel processing. Consequently, I'm not sure 
whether the error happens during the explode(), during the select(), or for 
some reason only after the DF has been prepared and I call e.g. count().
I have allocated more than enough memory. On another system I got a `Java heap 
space` error instead, I guess on the way to hitting the actual error.
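
If I read the trace correctly, the exception comes from FileChannelImpl.map (called via Spark's DiskStore, per the stack trace), which refuses to memory-map a region larger than Integer.MAX_VALUE bytes. So a single cached block apparently grew past that limit after the explode, even though the input file was only about 1MB. A quick sanity check of the limit:

{noformat}
# Integer.MAX_VALUE, the cap FileChannelImpl.map enforces on a single
# mapped region (and hence on one block read back by DiskStore.getBytes).
INT_MAX = 2**31 - 1
print(INT_MAX)                       # 2147483647
print(round(INT_MAX / 1024**3, 2))   # 2.0  -> roughly 2 GiB per block
{noformat}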



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
