[jira] [Updated] (SPARK-14163) SumEvaluator and countApprox cannot reliably handle RDDs of size 1

2016-04-03 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-14163:
--
Assignee: Marcin Tustin

> SumEvaluator and countApprox cannot reliably handle RDDs of size 1
> --
>
> Key: SPARK-14163
> URL: https://issues.apache.org/jira/browse/SPARK-14163
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.5.2, 1.6.0, 1.6.1, 2.0.0
>Reporter: Marcin Tustin
>Assignee: Marcin Tustin
>Priority: Minor
> Fix For: 2.0.0

[jira] [Updated] (SPARK-14163) SumEvaluator and countApprox cannot reliably handle RDDs of size 1

2016-03-25 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-14163:
--
Priority: Minor  (was: Major)

> SumEvaluator and countApprox cannot reliably handle RDDs of size 1
> --
>
> Key: SPARK-14163
> URL: https://issues.apache.org/jira/browse/SPARK-14163
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.5.2, 1.6.0, 1.6.1, 2.0.0
>Reporter: Marcin Tustin
>Priority: Minor

[jira] [Updated] (SPARK-14163) SumEvaluator and countApprox cannot reliably handle RDDs of size 1

2016-03-25 Thread Marcin Tustin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcin Tustin updated SPARK-14163:
--
Description: 
The bug exists in these lines: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala#L59-L61

In this code:
{code:title=SumEvaluator.scala|borderStyle=solid}
      val degreesOfFreedom = (counter.count - 1).toInt
      new TDistribution(degreesOfFreedom).inverseCumulativeProbability(1 - (1 - confidence) / 2)
{code}

If {{counter.count}} is 0 or 1, then {{degreesOfFreedom}} is 0 or negative, and 
{{new TDistribution(degreesOfFreedom)}} raises an exception, because 
{{TDistribution}} requires its {{degreesOfFreedom}} parameter to be 1 or greater.
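The guard a fix would need can be sketched as follows. This is a hypothetical illustration in Python, not Spark's actual patch: the function name and the fallback behaviour are invented for clarity; it only mirrors the arithmetic of the Scala snippet above and shows where a count of 0 or 1 must be intercepted before the distribution is constructed.

```python
def t_quantile_args(count, confidence):
    """Illustrative guard mirroring SumEvaluator's logic (not Spark's API).

    A Student's t-distribution requires degreesOfFreedom >= 1, i.e.
    count >= 2. Returns the (degreesOfFreedom, probability) pair that
    would be fed to the inverse CDF, or None when no t-based interval
    is defined.
    """
    degrees_of_freedom = count - 1
    if degrees_of_freedom < 1:
        # With 0 or 1 observations there is no spread to estimate, so
        # signal that instead of letting the constructor throw, as
        # TDistribution currently does.
        return None
    # In the Scala code this pair goes to
    # new TDistribution(dof).inverseCumulativeProbability(p).
    return (degrees_of_freedom, 1 - (1 - confidence) / 2)
```

Any real fix would additionally have to decide what interval to report in the degenerate case (e.g. a point estimate with zero width).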

An example (written in pyspark):

{noformat}
>>> rdd = sc.parallelize([1])
>>> rdd.countApprox(1000,0.5)
16/03/25 18:09:36 INFO SparkContext: Starting job: sumApprox at 
NativeMethodAccessorImpl.java:-2
16/03/25 18:09:36 INFO DAGScheduler: Got job 1 (sumApprox at 
NativeMethodAccessorImpl.java:-2) with 2 output partitions
16/03/25 18:09:36 INFO DAGScheduler: Final stage: ResultStage 1(sumApprox at 
NativeMethodAccessorImpl.java:-2)
16/03/25 18:09:36 INFO DAGScheduler: Parents of final stage: List()
16/03/25 18:09:36 INFO DAGScheduler: Missing parents: List()
16/03/25 18:09:36 INFO DAGScheduler: Submitting ResultStage 1 
(MapPartitionsRDD[6] at mapPartitions at SerDeUtil.scala:147), which has no 
missing parents
16/03/25 18:09:36 INFO MemoryStore: ensureFreeSpace(4328) called with 
curMem=7140, maxMem=555755765
16/03/25 18:09:36 INFO MemoryStore: Block broadcast_1 stored as values in 
memory (estimated size 4.2 KB, free 530.0 MB)
16/03/25 18:09:36 INFO MemoryStore: ensureFreeSpace(2821) called with 
curMem=11468, maxMem=555755765
16/03/25 18:09:36 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in 
memory (estimated size 2.8 KB, free 530.0 MB)
16/03/25 18:09:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
10.5.5.158:56348 (size: 2.8 KB, free: 530.0 MB)
16/03/25 18:09:36 INFO SparkContext: Created broadcast 1 from broadcast at 
DAGScheduler.scala:861
16/03/25 18:09:36 INFO DAGScheduler: Submitting 2 missing tasks from 
ResultStage 1 (MapPartitionsRDD[6] at mapPartitions at SerDeUtil.scala:147)
16/03/25 18:09:36 INFO YarnScheduler: Adding task set 1.0 with 2 tasks
16/03/25 18:09:36 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 
r-hadoopeco-data-66215afe.hbinternal.com, PROCESS_LOCAL, 2071 bytes)
16/03/25 18:09:36 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 
r-hadoopeco-data-84205b1c.hbinternal.com, PROCESS_LOCAL, 2090 bytes)
16/03/25 18:09:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
r-hadoopeco-data-66215afe.hbinternal.com:43011 (size: 2.8 KB, free: 530.0 MB)
16/03/25 18:09:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) 
in 66 ms on r-hadoopeco-data-66215afe.hbinternal.com (1/2)
16/03/25 18:09:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 
r-hadoopeco-data-84205b1c.hbinternal.com:41613 (size: 2.8 KB, free: 530.0 MB)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/rdd.py", line 2227, in 
countApprox
return int(drdd.sumApprox(timeout, confidence))
  File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/rdd.py", line 2243, in 
sumApprox
r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
  File 
"/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
 line 538, in __call__
  File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/utils.py", line 36, in 
deco
return f(*a, **kw)
  File 
"/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", 
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o69.sumApprox.
: org.apache.commons.math3.exception.NotStrictlyPositiveException: degrees of 
freedom (0)
at 
org.apache.commons.math3.distribution.TDistribution.<init>(TDistribution.java:120)
at 
org.apache.commons.math3.distribution.TDistribution.<init>(TDistribution.java:86)
at 
org.apache.commons.math3.distribution.TDistribution.<init>(TDistribution.java:63)
at 
org.apache.spark.partial.SumEvaluator.currentResult(SumEvaluator.scala:61)
at 
org.apache.spark.partial.SumEvaluator.currentResult(SumEvaluator.scala:29)
at 
org.apache.spark.partial.ApproximateActionListener.awaitResult(ApproximateActionListener.scala:79)
at 
org.apache.spark.scheduler.DAGScheduler.runApproximateJob(DAGScheduler.scala:586)
at 
org.apache.spark.SparkContext.runApproximateJob(SparkContext.scala:1962)
at 
org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sumApprox$1.apply(DoubleRDDFunctions.scala:99)
at 
org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sumApprox$1.apply(DoubleRDDFunctions.scala:96)
at