[ANNOUNCE] Apache Spark 2.1.3
We are happy to announce the availability of Spark 2.1.3! Apache Spark 2.1.3 is a maintenance release based on the branch-2.1 maintenance branch of Spark. We strongly recommend that all 2.1.x users upgrade to this stable release.

The release notes are available at http://spark.apache.org/releases/spark-release-2-1-3.html. To download Apache Spark 2.1.3, visit http://spark.apache.org/downloads.html. This version of Spark is also available on Maven and PyPI.

We would like to acknowledge all community members for contributing patches to this release. Special thanks to Marcelo Vanzin for making the release; I'm just handling the last few details this time.
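For anyone wiring the release into a build, here is a minimal sketch of pulling the Maven artifacts in via sbt, assuming a Scala 2.11 project (the 2.1.x line publishes Scala 2.11 artifacts by default; the exact scalaVersion below is illustrative):

    // build.sbt - minimal sketch; versions are for illustration only.
    scalaVersion := "2.11.8"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "2.1.3" % "provided",
      "org.apache.spark" %% "spark-sql"  % "2.1.3" % "provided"
    )

On the PyPI side, the matching release can be pinned with pip install pyspark==2.1.3.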
Unable to acquire N bytes of memory, got 0
Is it normal to get an exception like: "Previous exception in task: Unable to acquire 65536 bytes of memory, got 0"? In my understanding, under the current memory management, running short of memory should trigger a spill, so this kind of exception should not be thrown - unless some operators are implemented without spill support, so too many objects remain in memory. Please correct me if I'm wrong. The stack trace is pasted here:

Previous exception in task: Unable to acquire 65536 bytes of memory, got 0
	org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
	org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset(UnsafeInMemorySorter.java:186)
	org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:229)
	org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:204)
	org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:283)
	org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:96)
	org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:348)
	org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:403)
	org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)
	org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.sort_addToSorter$(generated.java:32)
	org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(generated.java:41)
	org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.findNextInnerJoinRows$(generated.java:407)
	org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.agg_doAggregateWithKeys$(generated.java:199)
	org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage10.processNext(generated.java:695)
	org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$12$$anon$2.hasNext(WholeStageCodegenExec.scala:633)
	scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
	org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	org.apache.spark.scheduler.Task.run(Task.scala:109)
	org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	java.lang.Thread.run(Thread.java:745)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:139)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
	at org.apache.spark.scheduler.Task.run(Task.scala:119)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Chrysan Wu
Phone: +86 17717640807
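Not a definitive diagnosis, but note that in the trace above the failed allocation happens inside the spill path itself: UnsafeExternalSorter.spill() calls UnsafeInMemorySorter.reset(), which must allocate a fresh pointer array, and it is that allocation that throws. So spilling alone cannot always recover once execution memory is fully exhausted. A common first step is to lower per-task memory pressure; below is a minimal sketch using standard Spark configuration keys, with purely illustrative values that would need tuning for a real cluster:

    import org.apache.spark.sql.SparkSession

    // Illustrative values only - tune for your cluster. More (smaller)
    // shuffle partitions mean each sort task buffers fewer rows, so the
    // in-memory sorter's pointer array stays small and spills are cheaper.
    val spark = SparkSession.builder()
      .appName("reduce-sort-memory-pressure")
      .config("spark.sql.shuffle.partitions", "800") // default is 200
      .config("spark.executor.memory", "8g")         // more headroom per executor
      .getOrCreate()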
Re: Repartition not working on a csv file
I prefer not to do a .cache() due to memory limits, but I did try a persist() with DISK_ONLY: I did the repartition(), followed by a .count(), followed by a persist() of DISK_ONLY. That didn't change the number of tasks either.

On Sun, Jul 1, 2018, 15:50 Alexander Czech wrote:

> You could try to force a repartition right at that point by producing a
> cached version of the DF with .cache() if memory allows it.
>
> On Sun, Jul 1, 2018 at 5:04 AM, Abdeali Kothari wrote:
>
>> I've tried that too - it doesn't work. It does a repartition, but not
>> right after the broadcast join - it does a lot more processing and does
>> the repartition right before I do my next sort-merge join (stage 12 I
>> described above).
>> As the heavy processing is before the sort-merge join, it still doesn't
>> help.
>>
>> On Sun, Jul 1, 2018, 08:30 yujhe.li wrote:
>>
>>> Abdeali Kothari wrote
>>> > My entire CSV is less than 20KB.
>>> > Somewhere in between, I do a broadcast join with 3500 records in
>>> > another file.
>>> > After the broadcast join I have a lot of processing to do. Overall,
>>> > the time to process a single record goes up to 5 min on 1 executor.
>>> >
>>> > I'm trying to increase the partitions that my data is in so that I
>>> > have at most 1 record per executor (currently it sets 2 tasks, and
>>> > hence 2 executors... I want it to split into at least 100 tasks at a
>>> > time so I get 5 records per task => ~20 min per task).
>>>
>>> Maybe you can try repartition(100) after the broadcast join; the task
>>> number should change to 100 for your later transformations.
Re: Repartition not working on a csv file
You could try to force a repartition right at that point by producing a cached version of the DF with .cache() if memory allows it.

On Sun, Jul 1, 2018 at 5:04 AM, Abdeali Kothari wrote:

> I've tried that too - it doesn't work. It does a repartition, but not
> right after the broadcast join - it does a lot more processing and does
> the repartition right before I do my next sort-merge join (stage 12 I
> described above).
> As the heavy processing is before the sort-merge join, it still doesn't
> help.
>
> On Sun, Jul 1, 2018, 08:30 yujhe.li wrote:
>
>> Abdeali Kothari wrote
>> > My entire CSV is less than 20KB.
>> > Somewhere in between, I do a broadcast join with 3500 records in
>> > another file.
>> > After the broadcast join I have a lot of processing to do. Overall,
>> > the time to process a single record goes up to 5 min on 1 executor.
>> >
>> > I'm trying to increase the partitions that my data is in so that I
>> > have at most 1 record per executor (currently it sets 2 tasks, and
>> > hence 2 executors... I want it to split into at least 100 tasks at a
>> > time so I get 5 records per task => ~20 min per task).
>>
>> Maybe you can try repartition(100) after the broadcast join; the task
>> number should change to 100 for your later transformations.
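For readers following the thread, here is a minimal sketch of the approach under discussion: repartition(100) immediately after the broadcast join, pinned with a disk-only persist and a count() to force materialization. The input paths, the join column "key", and heavyProcessing are all hypothetical placeholders, and note that, per the messages above, this did not change the task count in the original reporter's case:

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.broadcast
    import org.apache.spark.storage.StorageLevel

    val spark = SparkSession.builder()
      .appName("repartition-after-broadcast-join")
      .getOrCreate()

    // Hypothetical inputs standing in for the data in this thread: a small
    // (~20KB) CSV of records and a ~3500-row lookup table.
    val largeDf = spark.read.option("header", "true").csv("/path/to/records.csv")
    val smallDf = spark.read.option("header", "true").csv("/path/to/lookup.csv")

    // Broadcast join on a hypothetical "key" column.
    val joined = largeDf.join(broadcast(smallDf), Seq("key"))

    // Repartition right after the join, persist to disk (memory is tight
    // here), and force materialization with count(), so that the 100
    // partitions become a stage boundary before the expensive per-record
    // processing begins.
    val wide = joined.repartition(100).persist(StorageLevel.DISK_ONLY)
    wide.count()

    // heavyProcessing is a hypothetical stand-in for the slow per-record logic.
    def heavyProcessing(df: DataFrame): DataFrame = df
    val result = heavyProcessing(wide)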