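Upon downsizing to 20 partitions, some of your partitions become too big. I see that you're caching, so executors keep those large cached partitions on disk, and reading one back fails once it exceeds 2 GiB: DiskStore memory-maps the block file with FileChannel.map, which rejects any size above Integer.MAX_VALUE bytes.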
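A quick way to sanity-check a coalesce target against the 2 GiB limit is to divide the size of the cached data by the number of partitions. The sketch below assumes partitions are roughly evenly sized (skewed data makes the largest one bigger than the average) and that the total size comes from somewhere like the Storage tab of the Spark UI. `avg_partition_bytes`, `min_partitions_for_limit` and the 100 GiB figure are hypothetical, not Spark APIs or numbers from this thread.

```python
import math

# Integer.MAX_VALUE: the largest size FileChannel.map accepts for one mapping,
# one byte short of 2 GiB.
MAX_BLOCK_BYTES = 2**31 - 1


def avg_partition_bytes(total_bytes: int, num_partitions: int) -> float:
    """Average partition size, assuming data is spread evenly across partitions."""
    return total_bytes / num_partitions


def min_partitions_for_limit(total_bytes: int, headroom: float = 0.5) -> int:
    """Smallest partition count whose average partition stays at or below
    headroom * MAX_BLOCK_BYTES; headroom < 1 leaves margin for skew."""
    if not 0 < headroom <= 1:
        raise ValueError("headroom must be in (0, 1]")
    return math.ceil(total_bytes / (MAX_BLOCK_BYTES * headroom))


if __name__ == "__main__":
    total = 100 * 2**30  # hypothetical: 100 GiB of cached data
    for n in (20, 100):
        gib = avg_partition_bytes(total, n) / 2**30
        print(f"{n:>4} partitions: ~{gib:.1f} GiB per partition")
    print("minimum partitions with 50% headroom:", min_partitions_for_limit(total))
```

With the hypothetical 100 GiB, 20 partitions average about 5 GiB each, well over the limit, while 100 partitions average about 1 GiB.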
> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:125)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:124)

You can try to coalesce to 100 instead, which makes each partition roughly five times smaller than with 20, and reduce the number of executors to keep the load on MySQL reasonable. Each partition is written over its own JDBC connection, so the number of concurrent connections is bounded by the number of tasks that can run at the same time.

On Wed, May 16, 2018 at 5:36 AM, Davide Brambilla <davide.brambi...@contentwise.tv> wrote:
> Hi all,
> we have a dataframe with 1000 partitions and we need to write the
> dataframe into a MySQL database using this command:
>
> df.coalesce(20).write.jdbc(url=url,
>                            table=table,
>                            mode=mode,
>                            properties=properties)
>
> and we get these errors randomly:
>
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:125)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:124)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:126)
> at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:520)
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:693)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:753)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1678)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1677)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1677)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:855)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1905)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
> at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:446)
> at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:280)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:214)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:125)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.apply(DiskStore.scala:124)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:126)
> at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:520)
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:693)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:753)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
>
> According to the JDBC connector documentation, *numPartitions* determines
> the maximum write parallelism. We use coalesce instead to set the partitions
> to 20, to avoid generating too much load on MySQL, and it seems that the
> issue is due to Spark repartitioning from 1000 to 20 partitions before
> writing.
>
> If the issue is due to this configuration, what is the best practice for
> writing a dataframe with lots of partitions into our database while
> limiting the number of concurrent connections?
>
> Thanks
>
> Davide B.
> --
> Davide Brambilla
> ContentWise R&D
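On the question of limiting concurrent connections: Spark's JDBC writer opens one connection per partition, inside the task that writes it, so the number of connections open at the same time is at most the number of partitions and at most the number of task slots (executors times cores per executor, divided by `spark.task.cpus`). Below is a minimal sketch of that bound, plus a hypothetical wrapper that chains `coalesce` into the write (a bare `df.coalesce(n)` returns a new DataFrame and leaves `df` unchanged). The function names and the cluster sizes are illustrative assumptions, not part of Spark.

```python
def max_concurrent_jdbc_connections(
    num_partitions: int,
    num_executors: int,
    cores_per_executor: int,
    task_cpus: int = 1,
) -> int:
    """Upper bound on simultaneous JDBC connections during df.write.jdbc.

    Each partition is written by one task over its own connection, and,
    assuming a fixed number of executors, at most
    num_executors * (cores_per_executor // task_cpus) tasks run at once.
    """
    slots = num_executors * (cores_per_executor // task_cpus)
    return min(num_partitions, slots)


def write_limited(df, url, table, mode, properties, num_partitions=100):
    """Hypothetical wrapper around the PySpark call used in this thread.

    coalesce() returns a new DataFrame, so it is chained into the write
    instead of being called on its own.
    """
    df.coalesce(num_partitions).write.jdbc(
        url=url, table=table, mode=mode, properties=properties
    )


if __name__ == "__main__":
    # Hypothetical cluster: 100 partitions, but only 5 executors x 4 cores.
    print(max_concurrent_jdbc_connections(100, 5, 4))
```

Under these assumptions, 100 partitions on 5 executors with 4 cores each never hold more than 20 connections at once, while each partition is roughly five times smaller than with coalesce(20).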