Re: NegativeArraySizeException / segfault

2016-06-08 Thread Andres Perez
We were able to reproduce it with a minimal example. I've opened a jira
issue:

https://issues.apache.org/jira/browse/SPARK-15825
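For reference, a minimal sketch of forcing the sort-merge-join code path in a
local test, per the spark.sql.autoBroadcastJoinThreshold=-1 setting Koert
mentions below; the SparkSession setup here is illustrative, not our actual
test harness:

  import org.apache.spark.sql.SparkSession

  // Disabling the broadcast threshold makes the planner fall back to
  // sort-merge join, which is the code path where the garbled results show up.
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("SPARK-15825-repro")
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .getOrCreate()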

On Wed, Jun 8, 2016 at 12:43 PM, Koert Kuipers  wrote:

> great!
>
> we weren't able to reproduce it because the unit tests use a
> broadcast-join while on the cluster it uses sort-merge-join. so the issue
> is in sort-merge-join.
>
> we are now able to reproduce it in tests using
> spark.sql.autoBroadcastJoinThreshold=-1;
> it produces weird-looking garbled results in the join.
> hoping to get a minimal reproducible example soon.
>
> On Wed, Jun 8, 2016 at 10:24 AM, Pete Robbins  wrote:
>
>> I just raised https://issues.apache.org/jira/browse/SPARK-15822 for a
>> similar looking issue. Analyzing the core dump from the segv with Memory
>> Analyzer it looks very much like a UTF8String is very corrupt.
>>
>> Cheers,
>>
>>
>> On Fri, 27 May 2016 at 21:00 Koert Kuipers  wrote:
>>
>>> hello all,
>>> after getting our unit tests to pass on spark 2.0.0-SNAPSHOT we are now
>>> trying to run some algorithms at scale on our cluster.
>>> unfortunately this means that when i see errors i am having a harder
>>> time boiling them down to a small reproducible example.
>>>
>>> today we are running an iterative algo using the dataset api and we are
>>> seeing tasks fail with errors which seem to be related to unsafe operations.
>>> the same tasks succeed without issues in our unit tests.
>>>
>>> i see either:
>>>
>>> 16/05/27 12:54:46 ERROR executor.Executor: Exception in task 31.0 in
>>> stage 21.0 (TID 1073)
>>> java.lang.NegativeArraySizeException
>>> at
>>> org.apache.spark.unsafe.types.UTF8String.getBytes(UTF8String.java:229)
>>> at
>>> org.apache.spark.unsafe.types.UTF8String.toString(UTF8String.java:821)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown
>>> Source)
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>>> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>>> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown
>>> Source)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>>> Source)
>>> at
>>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>> at
>>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:359)
>>> at
>>> org.apache.spark.sql.execution.aggregate.SortBasedAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregateExec.scala:74)
>>> at
>>> org.apache.spark.sql.execution.aggregate.SortBasedAggregateExec$$anonfun$doExecute$1$$anonfun$3.apply(SortBasedAggregateExec.scala:71)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:775)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:775)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>
>>> or alternatively:
>>>
>>> # A fatal error has been detected by the Java Runtime Environment:
>>> #
>>> #  SIGSEGV (0xb) at pc=0x7fe571041cba, pid=2450, tid=140622965913344
>>> #
>>> # JRE version: Java(TM) SE Runtime Environment (7.0_75-b13) (build
>>> 1.7.0_75-b13)
>>> # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.75-b04 mixed mode
>>> linux-amd64 compressed oops)
>>> # Problematic frame:
>>> # v  ~StubRoutines::jbyte_disjoint_arraycopy

Dataset reduceByKey

2016-05-19 Thread Andres Perez
Hi all,

We were in the process of porting an RDD program to one which uses
Datasets. Most things were easy to transition, but one hole in
functionality we found was the ability to reduce a Dataset by key,
something akin to PairRDDFunctions.reduceByKey. Our first attempt at adding
the functionality ourselves involved creating a KeyValueGroupedDataset and
calling reduceGroups to get the reduced Dataset.

  class RichPairDataset[K, V: ClassTag](val ds: Dataset[(K, V)]) {
    def reduceByKey(func: (V, V) => V)
                   (implicit e1: Encoder[K], e2: Encoder[V], e3: Encoder[(K, V)]): Dataset[(K, V)] =
      ds.groupByKey(_._1)
        .reduceGroups { (tup1, tup2) => (tup1._1, func(tup1._2, tup2._2)) }
        .map { case (k, (_, v)) => (k, v) }
  }
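
For illustration, a minimal usage sketch (the sample data and the name `spark`
for the SparkSession are assumptions, not part of our actual code):

  // Usage sketch: assumes `spark` is an existing SparkSession and that
  // org.apache.spark.sql.Dataset is already imported, as above.
  import spark.implicits._

  val pairs = List(("a", 1), ("a", 2), ("b", 3)).toDS()       // Dataset[(String, Int)]
  val summed = new RichPairDataset(pairs).reduceByKey(_ + _)  // Dataset[(String, Int)]
  // summed contains ("a", 3) and ("b", 3)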

Note that the function passed into .reduceGroups takes in the full key-value
pairs. It'd be nicer to pass in a function that operates on just the values,
i.e. reduceGroups(func). This would require the ability to map over the values
of the KeyValueGroupedDataset (which is returned by the .groupByKey call on a
Dataset). Such a function (e.g., KeyValueGroupedDataset.mapValues(func: V => U))
does not currently exist.
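
For comparison, this is roughly how reduceByKey could read if such a mapValues
existed; a hypothetical sketch against an API that is not in Spark at the time
of writing:

  // Hypothetical: relies on a KeyValueGroupedDataset.mapValues that does not exist yet.
  def reduceByKey(func: (V, V) => V)
                 (implicit e1: Encoder[K], e2: Encoder[V], e3: Encoder[(K, V)]): Dataset[(K, V)] =
    ds.groupByKey(_._1)
      .mapValues(_._2)      // keep only the value part of each pair
      .reduceGroups(func)   // reduce over the values, no key plumbing needed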

The more important issue, however, is the inefficiency of .reduceGroups. The
function does not support partial aggregation (i.e. map-side reduction), and
as a result requires shuffling all the data in the Dataset. A more efficient
alternative that we explored involved creating a Dataset from the
KeyValueGroupedDataset by creating an Aggregator and passing it as a
TypedColumn to KeyValueGroupedDataset's .agg function. Unfortunately, the
Aggregator requires a zero element so that the aggregation forms a valid
monoid, and that zero depends on the reduce function: the zero for addition
on Ints, for example, is different from the zero for taking the minimum over
Ints. The Aggregator also requires that we not break the rule
reduce(a, zero) == a. To satisfy this we had to create an Aggregator whose
buffer type stores the value along with a null flag (using Scala's nice
Option syntax yielded some mysterious errors that I haven't worked through
yet, unfortunately), which the zero element uses to signal that it should not
participate in the reduce function.
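
For illustration, a minimal sketch of such an Aggregator against Spark 2.0's
Aggregator API; the class name and the exact buffer layout here are mine, not
our production code:

  import org.apache.spark.sql.{Encoder, Encoders}
  import org.apache.spark.sql.expressions.Aggregator

  // Buffer is (defined, value): zero carries defined = false, so it never
  // reaches the user's reduce function.
  class ReduceAggregator[K, V](func: (V, V) => V)(implicit vEnc: Encoder[V])
      extends Aggregator[(K, V), (Boolean, V), V] {

    def zero: (Boolean, V) = (false, null.asInstanceOf[V])

    def reduce(b: (Boolean, V), a: (K, V)): (Boolean, V) =
      if (!b._1) (true, a._2) else (true, func(b._2, a._2))

    def merge(b1: (Boolean, V), b2: (Boolean, V)): (Boolean, V) =
      if (!b1._1) b2
      else if (!b2._1) b1
      else (true, func(b1._2, b2._2))

    def finish(reduction: (Boolean, V)): V = reduction._2

    def bufferEncoder: Encoder[(Boolean, V)] =
      Encoders.tuple(Encoders.scalaBoolean, vEnc)

    def outputEncoder: Encoder[V] = vEnc
  }

  // Usage sketch: reduce/merge run map-side before the shuffle.
  //   ds.groupByKey(_._1).agg(new ReduceAggregator[K, V](func).toColumn)  // Dataset[(K, V)]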

-Andy


right outer joins on Datasets

2016-05-19 Thread Andres Perez
Hi all, I'm getting some odd behavior when using the joinWith functionality
for Datasets. Here is a small test case:

  val left = List(("a", 1), ("a", 2), ("b", 3), ("c", 4)).toDS()
  val right = List(("a", "x"), ("b", "y"), ("d", "z")).toDS()

  val joined = left.toDF("k", "v").as[(String, Int)].alias("left")
    .joinWith(right.toDF("k", "u").as[(String, String)].alias("right"),
      functions.col("left.k") === functions.col("right.k"), "right_outer")
    .as[((String, Int), (String, String))]
    .map { case ((k, v), (_, u)) => (k, (v, u)) }
    .as[(String, (Int, String))]

I would expect the result of this right-join to be:

  (a,(1,x))
  (a,(2,x))
  (b,(3,y))
  (d,(null,z))

but instead I'm getting:

  (a,(1,x))
  (a,(2,x))
  (b,(3,y))
  (null,(-1,z))

Note that the key of the final tuple is null instead of "d". (Also, is there
a reason the value for the left side of the last tuple is -1 and not null?)

-Andy