[ https://issues.apache.org/jira/browse/SPARK-15822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324331#comment-15324331 ]

Pete Robbins commented on SPARK-15822:
--------------------------------------

I'm still looking into this, tracing back through the code with Memory Analyzer 
on the core dumps.

Currently on the stack we have the following generated code:

{code}
public Object generate(Object[] references) {
return new GeneratedIterator(references);
}

final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
private Object[] references;
private boolean sort_needToSort;
private org.apache.spark.sql.execution.SortExec sort_plan;
private org.apache.spark.sql.execution.UnsafeExternalRowSorter sort_sorter;
private org.apache.spark.executor.TaskMetrics sort_metrics;
private scala.collection.Iterator<UnsafeRow> sort_sortedIter;
private boolean agg_initAgg;
private boolean agg_bufIsNull;
private long agg_bufValue;
private org.apache.spark.sql.execution.aggregate.HashAggregateExec agg_plan;
private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap 
agg_hashMap;
private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
private org.apache.spark.unsafe.KVIterator agg_mapIter;
private org.apache.spark.sql.execution.metric.SQLMetric agg_peakMemory;
private org.apache.spark.sql.execution.metric.SQLMetric agg_spillSize;
private scala.collection.Iterator inputadapter_input;
private UnsafeRow agg_result;
private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
agg_holder;
private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
agg_rowWriter;
private UnsafeRow agg_result1;
private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
agg_holder1;
private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
agg_rowWriter1;
private org.apache.spark.sql.execution.metric.SQLMetric sort_numOutputRows;
private org.apache.spark.sql.execution.metric.SQLMetric sort_aggTime;
private org.apache.spark.sql.execution.metric.SQLMetric sort_peakMemory;
private org.apache.spark.sql.execution.metric.SQLMetric sort_spillSize;
private org.apache.spark.sql.execution.metric.SQLMetric sort_sortTime;

public GeneratedIterator(Object[] references) {
this.references = references;
}

public void init(int index, scala.collection.Iterator inputs[]) {
partitionIndex = index;
sort_needToSort = true;
this.sort_plan = (org.apache.spark.sql.execution.SortExec) references[0];
sort_sorter = sort_plan.createSorter();
sort_metrics = org.apache.spark.TaskContext.get().taskMetrics();

agg_initAgg = false;

this.agg_plan = (org.apache.spark.sql.execution.aggregate.HashAggregateExec) 
references[1];

this.agg_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) 
references[2];
this.agg_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) 
references[3];
inputadapter_input = inputs[0];
agg_result = new UnsafeRow(2);
this.agg_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result, 64);
this.agg_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder, 
2);
agg_result1 = new UnsafeRow(3);
this.agg_holder1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result1, 64);
this.agg_rowWriter1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_holder1, 
3);
this.sort_numOutputRows = (org.apache.spark.sql.execution.metric.SQLMetric) 
references[4];
this.sort_aggTime = (org.apache.spark.sql.execution.metric.SQLMetric) 
references[5];
this.sort_peakMemory = (org.apache.spark.sql.execution.metric.SQLMetric) 
references[6];
this.sort_spillSize = (org.apache.spark.sql.execution.metric.SQLMetric) 
references[7];
this.sort_sortTime = (org.apache.spark.sql.execution.metric.SQLMetric) 
references[8];
}

private void agg_doAggregateWithKeys() throws java.io.IOException {
agg_hashMap = agg_plan.createHashMap();

while (inputadapter_input.hasNext()) {
InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
UTF8String inputadapter_value = inputadapter_isNull ? null : 
(inputadapter_row.getUTF8String(0));
boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : 
(inputadapter_row.getUTF8String(1));
long inputadapter_value2 = inputadapter_row.getLong(2);

UnsafeRow agg_unsafeRowAggBuffer = null;
org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row 
agg_vectorizedAggBuffer = null;

if (agg_vectorizedAggBuffer == null) {
// generate grouping key
agg_holder.reset();

agg_rowWriter.zeroOutNullBytes();

if (inputadapter_isNull) {
agg_rowWriter.setNullAt(0);
} else {
agg_rowWriter.write(0, inputadapter_value);
}

if (inputadapter_isNull1) {
agg_rowWriter.setNullAt(1);
} else {
agg_rowWriter.write(1, inputadapter_value1);
}
agg_result.setTotalSize(agg_holder.totalSize());
int agg_value6 = 42;

if (!inputadapter_isNull) {
agg_value6 = 
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value.getBaseObject(),
 inputadapter_value.getBaseOffset(), inputadapter_value.numBytes(), agg_value6);
}

if (!inputadapter_isNull1) {
agg_value6 = 
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(inputadapter_value1.getBaseObject(),
 inputadapter_value1.getBaseOffset(), inputadapter_value1.numBytes(), 
agg_value6);
}
if (true) {
// try to get the buffer from hash map
agg_unsafeRowAggBuffer =
agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value6);
}
if (agg_unsafeRowAggBuffer == null) {
if (agg_sorter == null) {
agg_sorter = agg_hashMap.destructAndCreateExternalSorter();
} else {
agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
}

// the hash map had be spilled, it should have enough memory now,
// try  to allocate buffer again.
agg_unsafeRowAggBuffer =
agg_hashMap.getAggregationBufferFromUnsafeRow(agg_result, agg_value6);
if (agg_unsafeRowAggBuffer == null) {
// failed to allocate the first page
throw new OutOfMemoryError("No enough memory for aggregation");
}
}
}

if (agg_vectorizedAggBuffer != null) {
// update vectorized row

} else {
// update unsafe row

// common sub-expressions

// evaluate aggregate function
long agg_value10 = agg_unsafeRowAggBuffer.getLong(0);

long agg_value9 = -1L;
agg_value9 = agg_value10 + inputadapter_value2;
// update unsafe row buffer
agg_unsafeRowAggBuffer.setLong(0, agg_value9);

}
if (shouldStop()) return;
}

agg_mapIter = agg_plan.finishAggregate(agg_hashMap, agg_sorter, agg_peakMemory, 
agg_spillSize);
}

private void sort_addToSorter() throws java.io.IOException {
if (!agg_initAgg) {
agg_initAgg = true;
long sort_beforeAgg = System.nanoTime();
agg_doAggregateWithKeys();
sort_aggTime.add((System.nanoTime() - sort_beforeAgg) / 1000000);
}

// output the result

while (agg_mapIter.next()) {
sort_numOutputRows.add(1);
UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();

boolean agg_isNull11 = agg_aggKey.isNullAt(0);
UTF8String agg_value12 = agg_isNull11 ? null : (agg_aggKey.getUTF8String(0));
boolean agg_isNull12 = agg_aggKey.isNullAt(1);
UTF8String agg_value13 = agg_isNull12 ? null : (agg_aggKey.getUTF8String(1));
long agg_value14 = agg_aggBuffer.getLong(0);

agg_holder1.reset();

agg_rowWriter1.zeroOutNullBytes();

if (agg_isNull11) {
agg_rowWriter1.setNullAt(0);
} else {
agg_rowWriter1.write(0, agg_value12);
}

if (agg_isNull12) {
agg_rowWriter1.setNullAt(1);
} else {
agg_rowWriter1.write(1, agg_value13);
}

agg_rowWriter1.write(2, agg_value14);
agg_result1.setTotalSize(agg_holder1.totalSize());
sort_sorter.insertRow((UnsafeRow)agg_result1);

if (shouldStop()) return;
}

agg_mapIter.close();
if (agg_sorter == null) {
agg_hashMap.free();
}

}

protected void processNext() throws java.io.IOException {
if (sort_needToSort) {
long sort_spillSizeBefore = sort_metrics.memoryBytesSpilled();
sort_addToSorter();
sort_sortedIter = sort_sorter.sort();
sort_sortTime.add(sort_sorter.getSortTimeNanos() / 1000000);
sort_peakMemory.add(sort_sorter.getPeakMemoryUsage());
sort_spillSize.add(sort_metrics.memoryBytesSpilled() - sort_spillSizeBefore);
sort_metrics.incPeakExecutionMemory(sort_sorter.getPeakMemoryUsage());
sort_needToSort = false;
}

while (sort_sortedIter.hasNext()) {
UnsafeRow sort_outputRow = (UnsafeRow)sort_sortedIter.next();

append(sort_outputRow);

if (shouldStop()) return;
}
}
}

{code}

agg_mapIter is returning the key and value as UnsafeRows. The key looks like this:

{noformat}
baseObject          null
baseOffset          140362032947276
numFields           2
sizeInBytes         40
bitSetWidthInBytes  8
{noformat}

i.e. it is corrupt, and the value is similarly corrupt.
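For reference, here is a minimal sketch (illustration only, not Spark code or part of any fix) of what those identity fields mean, assuming the standard UnsafeRow layout of a null-tracking bitset followed by one 8-byte word per field and then variable-length data. Note that with spark.memory.offHeap.enabled an off-heap row legitimately has baseObject == null and an absolute baseOffset; the class and method names below (RowSanityCheck, describe) are made up for the sketch.

{code}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;

public class RowSanityCheck {
  // Dump the same identity fields as in the Memory Analyzer output above, plus
  // the minimum size the row would need just for its bitset and fixed-width slots.
  static void describe(UnsafeRow row) {
    System.out.println("baseObject  = " + row.getBaseObject());   // null for an off-heap row
    System.out.println("baseOffset  = " + row.getBaseOffset());   // absolute address when off-heap
    System.out.println("numFields   = " + row.numFields());
    System.out.println("sizeInBytes = " + row.getSizeInBytes());

    // 8 bytes of null bits per 64 fields, so 2 fields => 8 bytes,
    // matching the bitSetWidthInBytes seen in the dump.
    int bitSetWidth = UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
    int minSize = bitSetWidth + row.numFields() * 8;
    System.out.println("bitSetWidthInBytes = " + bitSetWidth
        + ", minimum plausible sizeInBytes = " + minSize);
  }
}
{code}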

Now investigating at which point the corruption occurred


> segmentation violation in o.a.s.unsafe.types.UTF8String with 
> spark.memory.offHeap.enabled=true
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-15822
>                 URL: https://issues.apache.org/jira/browse/SPARK-15822
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>         Environment: linux amd64
> openjdk version "1.8.0_91"
> OpenJDK Runtime Environment (build 1.8.0_91-b14)
> OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)
>            Reporter: Pete Robbins
>            Assignee: Herman van Hovell
>            Priority: Blocker
>
> Executors fail with segmentation violation while running application with
> spark.memory.offHeap.enabled true
> spark.memory.offHeap.size 512m
> {noformat}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x00007f4559b4d4bd, pid=14182, tid=139935319750400
> #
> # JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 1.8.0_91-b14)
> # Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
> # Problematic frame:
> # J 4816 C2 
> org.apache.spark.unsafe.types.UTF8String.compareTo(Lorg/apache/spark/unsafe/types/UTF8String;)I
>  (64 bytes) @ 0x00007f4559b4d4bd [0x00007f4559b4d460+0x5d]
> {noformat}
> We initially saw this with IBM Java on a PowerPC box, but it is recreatable on Linux 
> with OpenJDK. On Linux with IBM Java 8 we see a null pointer exception at the 
> same code point:
> {noformat}
> 16/06/08 11:14:58 ERROR Executor: Exception in task 1.0 in stage 5.0 (TID 48)
> java.lang.NullPointerException
>       at 
> org.apache.spark.unsafe.types.UTF8String.compareTo(UTF8String.java:831)
>       at org.apache.spark.unsafe.types.UTF8String.compare(UTF8String.java:844)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.findNextInnerJoinRows$(Unknown
>  Source)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>       at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$doExecute$2$$anon$2.hasNext(WholeStageCodegenExec.scala:377)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at 
> scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
>       at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:664)
>       at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1365)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1362)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:757)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:757)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>       at org.apache.spark.scheduler.Task.run(Task.scala:85)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at java.lang.Thread.run(Thread.java:785)
> {noformat}


