[ 
https://issues.apache.org/jira/browse/SPARK-15822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324475#comment-15324475
 ] 

Adam Roberts edited comment on SPARK-15822 at 6/10/16 9:39 PM:
---------------------------------------------------------------

Herman, here's the application. Note that my HashedRelation comment is only a 
theory at this stage (edit: it now looks irrelevant).

{code}
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

/**
 * Reproduction application for SPARK-15822.
 *
 * Loads a flights CSV, restricts it to departures from one airport, and prints
 * the (Dest, UniqueCarrier) pairs ranked by the percentage of departures that
 * match `Cancelled != 0 and CancellationCode = 'A'`.
 *
 * Arguments: `<input-csv> <airport-code>`.
 */
object SQLFlights {

  /**
   * Prints `title`, then the ten rows of `df` with the highest `rank` value.
   *
   * @param title heading printed before the rows
   * @param df    DataFrame to rank; it is sorted on a column named `rank`
   */
  def displayTop(title: String, df: DataFrame): Unit = {
    println(title)
    df.sort(desc("rank")).take(10).foreach(println)
  }

  /**
   * Entry point.
   *
   * @param args `args(0)` is the path of the CSV input file, `args(1)` is the
   *             airport code used in the `Origin` / `Dest` filters
   */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      System.err.println("Usage: SQLFlights <input-csv> <airport-code>")
      sys.exit(1)
    }
    val inputfile = args(0)
    val airport = args(1)

    val conf = new SparkConf().setAppName("SQL Flights")
    val spark = org.apache.spark.sql.SparkSession.builder.config(conf).getOrCreate()

    val df = spark.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(inputfile)
      .cache()

    // NOTE(review): `arrivals` is never read below. It is kept so that this
    // reproduction performs the same steps as the originally reported program.
    val arrivals = df.filter(s"Dest = '$airport'").cache()
    val departures = df.filter(s"Origin = '$airport'").cache()

    // Total departures per (Dest, UniqueCarrier); "count" is renamed to "total"
    // so it does not clash with the "count" column of `b` after the join.
    val departuresByCarrier = departures
      .groupBy("Dest", "UniqueCarrier")
      .count()
      .withColumnRenamed("count", "total")

    val a = departures.filter("Cancelled != 0 and CancellationCode = 'A'")
    println("done a")
    val b = a.groupBy("Dest", "UniqueCarrier").count()
    println("done b")
    val c = b.join(departuresByCarrier, Seq("Dest", "UniqueCarrier"))
    println("done c")
    // The expression must stay on one line: a plain string literal cannot
    // contain a line break.
    val d = c.selectExpr("Dest", "UniqueCarrier", "round(count * 100 / total, 2) as rank")
    println("done d")
    displayTop("Top Departure Carrier Cancellations:", d)
  }
}
{code}

in conf/spark-env.sh:
{code}
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=2
{code}

in conf/spark-defaults.conf:
{code}
spark.sql.warehouse.dir /home/aroberts/sql-flights
{code}

Submit including --packages com.databricks:spark-csv_2.11:1.4.0

The job will complete, but if you look in the $SPARK_HOME/work dir after our 
queries are done, you'll see that executors die due to the segv; the problem 
is visible in the executors' stderr files.

Data set to use as the first arg can be downloaded at 
http://stat-computing.org/dataexpo/2009/2008.csv.bz2 (after extracting we can 
do head -1000000 to create a smaller file and still get the problem without 
waiting so long).

As the second arg you can use ORD as the airport name.


was (Author: aroberts):
Herman, here's the application, note my HashedRelation comment is only a theory 
at this stage.

{code}
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

/**
 * Reproduction application for SPARK-15822.
 *
 * Loads a flights CSV, restricts it to departures from one airport, and prints
 * the (Dest, UniqueCarrier) pairs ranked by the percentage of departures that
 * match `Cancelled != 0 and CancellationCode = 'A'`.
 *
 * Arguments: `<input-csv> <airport-code>`.
 */
object SQLFlights {

  /**
   * Prints `title`, then the ten rows of `df` with the highest `rank` value.
   *
   * @param title heading printed before the rows
   * @param df    DataFrame to rank; it is sorted on a column named `rank`
   */
  def displayTop(title: String, df: DataFrame): Unit = {
    println(title)
    df.sort(desc("rank")).take(10).foreach(println)
  }

  /**
   * Entry point.
   *
   * @param args `args(0)` is the path of the CSV input file, `args(1)` is the
   *             airport code used in the `Origin` / `Dest` filters
   */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      System.err.println("Usage: SQLFlights <input-csv> <airport-code>")
      sys.exit(1)
    }
    val inputfile = args(0)
    val airport = args(1)

    val conf = new SparkConf().setAppName("SQL Flights")
    val spark = org.apache.spark.sql.SparkSession.builder.config(conf).getOrCreate()

    val df = spark.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(inputfile)
      .cache()

    // NOTE(review): `arrivals` is never read below. It is kept so that this
    // reproduction performs the same steps as the originally reported program.
    val arrivals = df.filter(s"Dest = '$airport'").cache()
    val departures = df.filter(s"Origin = '$airport'").cache()

    // Total departures per (Dest, UniqueCarrier); "count" is renamed to "total"
    // so it does not clash with the "count" column of `b` after the join.
    val departuresByCarrier = departures
      .groupBy("Dest", "UniqueCarrier")
      .count()
      .withColumnRenamed("count", "total")

    val a = departures.filter("Cancelled != 0 and CancellationCode = 'A'")
    println("done a")
    val b = a.groupBy("Dest", "UniqueCarrier").count()
    println("done b")
    val c = b.join(departuresByCarrier, Seq("Dest", "UniqueCarrier"))
    println("done c")
    // The expression must stay on one line: a plain string literal cannot
    // contain a line break.
    val d = c.selectExpr("Dest", "UniqueCarrier", "round(count * 100 / total, 2) as rank")
    println("done d")
    displayTop("Top Departure Carrier Cancellations:", d)
  }
}
{code}

in conf/spark-env.sh:
{code}
export SPARK_WORKER_CORES=2
export SPARK_WORKER_INSTANCES=2
{code}

in conf/spark-defaults.conf:
{code}
spark.sql.warehouse.dir /home/aroberts/sql-flights
{code}

Submit including --packages com.databricks:spark-csv_2.11:1.4.0

The job will complete but if you look in the $SPARK_HOME/work dir you'll see 
that after our queries are done, executors will die due to the segv and by 
looking in the stderr files we can see the problem.

> segmentation violation in o.a.s.unsafe.types.UTF8String 
> --------------------------------------------------------
>
>                 Key: SPARK-15822
>                 URL: https://issues.apache.org/jira/browse/SPARK-15822
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>         Environment: linux amd64
> openjdk version "1.8.0_91"
> OpenJDK Runtime Environment (build 1.8.0_91-b14)
> OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)
>            Reporter: Pete Robbins
>            Assignee: Herman van Hovell
>            Priority: Blocker
>
> Executors fail with segmentation violation while running application with
> spark.memory.offHeap.enabled true
> spark.memory.offHeap.size 512m
> Also now reproduced with 
> spark.memory.offHeap.enabled false
> {noformat}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x00007f4559b4d4bd, pid=14182, tid=139935319750400
> #
> # JRE version: OpenJDK Runtime Environment (8.0_91-b14) (build 1.8.0_91-b14)
> # Java VM: OpenJDK 64-Bit Server VM (25.91-b14 mixed mode linux-amd64 
> compressed oops)
> # Problematic frame:
> # J 4816 C2 
> org.apache.spark.unsafe.types.UTF8String.compareTo(Lorg/apache/spark/unsafe/types/UTF8String;)I
>  (64 bytes) @ 0x00007f4559b4d4bd [0x00007f4559b4d460+0x5d]
> {noformat}
> We initially saw this with IBM Java on a PowerPC box, but it is recreatable on 
> Linux with OpenJDK. On Linux with IBM Java 8 we see a null pointer exception 
> at the same code point:
> {noformat}
> 16/06/08 11:14:58 ERROR Executor: Exception in task 1.0 in stage 5.0 (TID 48)
> java.lang.NullPointerException
>       at 
> org.apache.spark.unsafe.types.UTF8String.compareTo(UTF8String.java:831)
>       at org.apache.spark.unsafe.types.UTF8String.compare(UTF8String.java:844)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.findNextInnerJoinRows$(Unknown
>  Source)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>       at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$doExecute$2$$anon$2.hasNext(WholeStageCodegenExec.scala:377)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>       at 
> scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
>       at org.spark_project.guava.collect.Ordering.leastOf(Ordering.java:664)
>       at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1365)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$30.apply(RDD.scala:1362)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:757)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:757)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>       at org.apache.spark.scheduler.Task.run(Task.scala:85)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at java.lang.Thread.run(Thread.java:785)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to