Upon downsizing to 20 partitions some of your partitions become too big,
and I see that you're doing caching, and executors try to write big
partitions to disk, but fail because they exceed 2GiB

> Caused by: java.lang.IllegalArgumentException: Size exceeds
Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
apply(DiskStore.scala:125)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
apply(DiskStore.scala:124)

You can try to coalesce to 100 and reduce the number of executors to keep
the load on MySQL reasonable

On Wed, May 16, 2018 at 5:36 AM, Davide Brambilla <
davide.brambi...@contentwise.tv> wrote:

> Hi all,
>    we have a dataframe with 1000 partitions and we need to write the
> dataframe into a MySQL using this command:
>
> df.coalesce(20)
> df.write.jdbc(url=url,
>                   table=table,
>                   mode=mode,
>                   properties=properties)
>
> and we get this errors randomly
>
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
> apply(DiskStore.scala:125)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
> apply(DiskStore.scala:124)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:126)
> at org.apache.spark.storage.BlockManager.getLocalValues(
> BlockManager.scala:520)
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:693)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(
> BlockManager.scala:753)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
> scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1690)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1678)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1677)
> at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1677)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
> DAGScheduler.scala:855)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> doOnReceive(DAGScheduler.scala:1905)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1860)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1849)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
> at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:446)
> at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:280)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:214)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: Size exceeds
> Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
> apply(DiskStore.scala:125)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
> apply(DiskStore.scala:124)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:126)
> at org.apache.spark.storage.BlockManager.getLocalValues(
> BlockManager.scala:520)
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:693)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(
> BlockManager.scala:753)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
> ... 1 more
>
> According to JDBC connector *numPartitions* determines the maximum write
> parallelism, we use coalesce instead to set partitions to 20 to avoid
> generating too much load on MySQL and it seems that the issue is due to the
> fact that Spark is repartitioning from 1000 to 20 before writing.
>
> If the issue is due to this configuration which is the best practice to
> write a dataframe with lots of partitions into our database limiting the
> concurrent connections?
>
> Thanks
>
>
> Davide B.
> ------------------------------------------------------------
> ----------------------------
> Davide Brambilla
> ContentWise R&D
> ContentWise
> davide.brambi...@contentwise.tv
> phone: +39 02 49517001 mobile: 345 71 13 800
> ContentWise SrL - Via Schiaffino, 11 - 20158 Milano (MI) – ITALY
> <https://maps.google.com/?q=Via+Schiaffino,+11+-+20158+Milano+(MI)+%E2%80%93+ITALY&entry=gmail&source=g>
>



-- 
Sent from my iPhone
  • [no subject] Davide Brambilla
    • Re: Jörn Franke
      • Re: Davide Brambilla
    • Re: Vadim Semenov
      • Re: Nicolas Paris

Reply via email to