[ https://issues.apache.org/jira/browse/SPARK-15822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324331#comment-15324331 ]
Pete Robbins commented on SPARK-15822:
--------------------------------------

I'm still looking into this, tracing back through the code using Memory Analyzer on the core dumps. Currently on the stack we have the following generated code:

{code}
public Object generate(Object[] references) {
  return new GeneratedIterator(references);
}

final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
  private Object[] references;
  private boolean sort_needToSort;
  private org.apache.spark.sql.execution.SortExec sort_plan;
  private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
  private org.apache.spark.executor.TaskMetrics sort_metrics;
  private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
  private boolean agg_initAgg;
  private boolean agg_bufIsNull;
  private long agg_bufValue;
  private org.apache.spark.sql.execution.aggregate.HashAggregateExec agg_plan;
  private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap;
  private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
  private org.apache.spark.unsafe.KVIterator agg_mapIter;
  private org.apache.spark.sql.execution.metric.SQLMetric agg_peakMemory;
  private org.apache.spark.sql.execution.metric.SQLMetric agg_spillSize;
  private scala.collection.Iterator inputadapter_input;
  private UnsafeRow agg_result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
  private UnsafeRow agg_result1;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder1;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter1;
  private org.apache.spark.sql.execution.metric.SQLMetric sort_numOutputRows;
  private org.apache.spark.sql.execution.metric.SQLMetric sort_aggTime;
  private org.apache.spark.sql.execution.metric.SQLMetric sort_peakMemory;
  private org.apache.spark.sql.execution.metric.SQLMetric sort_spillSize;
  private org.apache.spark.sql.execution.metric.SQLMetric sort_sortTime;

  public GeneratedIterator(Object[] references) {
    this.references = references;
  }

  public void init(int index, scala.collection.Iterator inputs[]) {
    partitionIndex = index;
    sort_needToSort = true;
    this.sort_plan = (org.apache.spark.sql.execution.SortExec) references[0];
    sort_sorter = sort_plan.createSorter();
    sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();
    agg_initAgg = false;
    this.agg_plan = (org.apache.spark.sql.execution.aggregate.HashAggregateExec) references[1];
    this.agg_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[2];
    this.agg_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) references[3];
    inputadapter_input = inputs[0];
    agg_result = new UnsafeRow(2);
    this.agg_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 64);
    this.agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 2);
    agg_result1 = new UnsafeRow(3);
    this.agg_holder1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 64);
    this.agg_rowWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder1, 3);
    this.sort_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) references[4];
    this.sort_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[5];
    this.sort_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) references[6];
    this.sort_spillSize =
        (org.apache.spark.sql.execution.metric.SQLMetric) references[7];
    this.sort_sortTime = (org.apache.spark.sql.execution.metric.SQLMetric) references[8];
  }

  private void agg_doAggregateWithKeys() throws java.io.IOException {
    agg_hashMap = agg_plan.createHashMap();

    while (inputadapter_input.hasNext()) {
      InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
      boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
      UTF8String inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getUTF8String(0));
      boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
      UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : (inputadapter_row.getUTF8String(1));
      long inputadapter_value2 = inputadapter_row.getLong(2);

      UnsafeRow agg_unsafeRowAggBuffer = null;
      org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row agg_vectorizedAggBuffer = null;

      if (agg_vectorizedAggBuffer == null) {
        // generate grouping key
        agg_holder.reset();
        agg_rowWriter.zeroOutNullBytes();
        if (inputadapter_isNull) {
          agg_rowWriter.setNullAt(0);
        } else {
          agg_rowWriter.write(0, inputadapter_value);
        }
        if (inputadapter_isNull1) {
          agg_rowWriter.setNullAt(1);
        } else {
          agg_rowWriter.write(1, inputadapter_value1);
        }
        agg_result.setTotalSize(agg_holder.totalSize());
        int agg_value6 = 42;
        if (!inputadapter_isNull) {
          agg_value6 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value.getBaseObject(), inputadapter_value.getBaseOffset(), inputadapter_value.numBytes(), agg_value6);
        }
        if (!inputadapter_isNull1) {
          agg_value6 = org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value1.getBaseObject(), inputadapter_value1.getBaseOffset(), inputadapter_value1.numBytes(), agg_value6);
        }
        if (true) {
          // try to get the buffer from hash map
          agg_unsafeRowAggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value6);
        }
        if (agg_unsafeRowAggBuffer == null) {
          if (agg_sorter == null) {
            agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
          } else {
            agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
          }
          // the hash map had be spilled, it should have enough memory now,
          // try to allocate buffer again.
          agg_unsafeRowAggBuffer = agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value6);
          if (agg_unsafeRowAggBuffer == null) {
            // failed to allocate the first page
            throw new OutOfMemoryError("No enough memory for aggregation");
          }
        }
      }

      if (agg_vectorizedAggBuffer != null) {
        // update vectorized row
      } else {
        // update unsafe row
        // common sub-expressions
        // evaluate aggregate function
        long agg_value10 = agg_unsafeRowAggBuffer.getLong(0);
        long agg_value9 = -1L;
        agg_value9 = agg_value10 + inputadapter_value2;
        // update unsafe row buffer
        agg_unsafeRowAggBuffer.setLong(0, agg_value9);
      }
      if (shouldStop()) return;
    }

    agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, agg_peakMemory, agg_spillSize);
  }

  private void sort_addToSorter() throws java.io.IOException {
    if (!agg_initAgg) {
      agg_initAgg = true;
      long sort_beforeAgg = System.nanoTime();
      agg_doAggregateWithKeys();
      sort_aggTime.add((System.nanoTime() - sort_beforeAgg) / 1000000);
    }

    // output the result
    while (agg_mapIter.next()) {
      sort_numOutputRows.add(1);
      UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
      UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();

      boolean agg_isNull11 = agg_aggKey.isNullAt(0);
      UTF8String agg_value12 = agg_isNull11 ? null : (agg_aggKey.getUTF8String(0));
      boolean agg_isNull12 = agg_aggKey.isNullAt(1);
      UTF8String agg_value13 = agg_isNull12 ?
          null : (agg_aggKey.getUTF8String(1));
      long agg_value14 = agg_aggBuffer.getLong(0);

      agg_holder1.reset();
      agg_rowWriter1.zeroOutNullBytes();
      if (agg_isNull11) {
        agg_rowWriter1.setNullAt(0);
      } else {
        agg_rowWriter1.write(0, agg_value12);
      }
      if (agg_isNull12) {
        agg_rowWriter1.setNullAt(1);
      } else {
        agg_rowWriter1.write(1, agg_value13);
      }
      agg_rowWriter1.write(2, agg_value14);
      agg_result1.setTotalSize(agg_holder1.totalSize());
      sort_sorter.insertRow((UnsafeRow) agg_result1);
      if (shouldStop()) return;
    }
    agg_mapIter.close();
    if (agg_sorter == null) {
      agg_hashMap.free();
    }
  }

  protected void processNext() throws java.io.IOException {
    if (sort_needToSort) {
      long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
      sort_addToSorter();
      sort_sortedIter = sort_sorter.sort();
      sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
      sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
      sort_spillSize.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
      sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
      sort_needToSort = false;
    }

    while (sort_sortedIter.hasNext()) {
      UnsafeRow sort_outputRow = (UnsafeRow) sort_sortedIter.next();
      append(sort_outputRow);
      if (shouldStop()) return;
    }
  }
}
{code}

The agg_mapIter is returning a key and value as UnsafeRows. The key looks like:

{noformat}
key: baseObject         null
     baseOffset         140362032947276
     numFields          2
     sizeInBytes        40
     bitSetWidthInBytes 8
{noformat}

i.e. it is corrupt, and the value is similarly corrupt. Now investigating at which point the corruption occurred.
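To narrow down where the rows go bad, a small debugging hook such as the sketch below could be dropped into a test harness (the RowDumper class and its placement are hypothetical, not part of Spark; it only uses public UnsafeRow accessors). Note that with spark.memory.offHeap.enabled=true a null baseObject with an absolute baseOffset is the normal shape of an off-heap UnsafeRow, so the dump is about spotting when the offset and size fields stop being plausible.

{code}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;

// Hypothetical helper, not part of Spark: prints the backing-memory fields of
// an UnsafeRow so successive dumps can show where corruption first appears.
public final class RowDumper {
  public static void dump(String label, UnsafeRow row) {
    System.err.println(label
        + ": baseObject=" + row.getBaseObject()   // null is expected for off-heap rows
        + " baseOffset=" + row.getBaseOffset()    // absolute address when off-heap
        + " numFields=" + row.numFields()
        + " sizeInBytes=" + row.getSizeInBytes());
  }
}
{code}

Inside the generated loop this would be called right after agg_mapIter.next(), e.g. RowDumper.dump("key", (UnsafeRow) agg_mapIter.getKey()) and RowDumper.dump("value", (UnsafeRow) agg_mapIter.getValue()).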
> segmentation violation in o.a.s.unsafe.types.UTF8String with
> spark.memory.offHeap.enabled=true
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-15822
>                 URL: https://issues.apache.org/jira/browse/SPARK-15822
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>         Environment: linux amd64
>                      openjdk version "1.8.0_91"
>                      OpenJDK Runtime Environment (build 1.8.0_91-b14)
>                      OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)
>            Reporter: Pete Robbins
>            Assignee: Herman van Hovell
>            Priority: Blocker
>
> Executors fail with a segmentation violation while running an application with
> spark.memory.offHeap.enabled true
> spark.memory.offHeap.size 512m
> {noformat}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> # SIGSEGV (0xb) at pc=0x00007f4559b4d4bd, pid=14182, tid=139935319750400
> #
> # JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 1.8.0_91-b14)
> # Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 compressed oops)
> # Problematic frame:
> # J 4816 C2 org.apache.spark.unsafe.types.UTF8String.compareTo(Lorg/apache/spark/unsafe/types/UTF8String;)I (64 bytes) @ 0x00007f4559b4d4bd [0x00007f4559b4d460+0x5d]
> {noformat}
> We initially saw this with IBM Java on a PowerPC box, but it is recreatable on Linux with OpenJDK.
> On Linux with IBM Java 8 we see a null pointer exception at the same code point:
> {noformat}
> 16/06/08 11:14:58 ERROR Executor: Exception in task 1.0 in stage 5.0 (TID 48)
> java.lang.NullPointerException
>     at org.apache.spark.unsafe.types.UTF8String.compareTo(UTF8String.java:831)
>     at org.apache.spark.unsafe.types.UTF8String.compare(UTF8String.java:844)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.findNextInnerJoinRows$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$doExecute$2$$anon$2.hasNext(WholeStageCodegenExec.scala:377)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
>     at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:664)
>     at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
>     at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1365)
>     at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1362)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:757)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:757)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>     at org.apache.spark.scheduler.Task.run(Task.scala:85)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.lang.Thread.run(Thread.java:785)
> {noformat}
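For anyone trying to recreate this, a minimal sketch of a session carrying the two off-heap settings from the description above (the app name, master, and workload are placeholders, not taken from the original report):

{code}
import org.apache.spark.sql.SparkSession;

public class OffHeapRepro {
  public static void main(String[] args) {
    // The two off-heap settings are verbatim from the issue description;
    // everything else here is a placeholder for illustration.
    SparkSession spark = SparkSession.builder()
        .appName("SPARK-15822-repro")   // hypothetical name
        .master("local[*]")             // hypothetical master
        .config("spark.memory.offHeap.enabled", "true")
        .config("spark.memory.offHeap.size", "512m")
        .getOrCreate();

    // ... run the aggregate/sort/join workload that triggers the crash ...

    spark.stop();
  }
}
{code}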