[ https://issues.apache.org/jira/browse/SPARK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marcin Tustin updated SPARK-14163:
----------------------------------
    Description: 
The bug exists in these lines: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala#L59-L61

In this code:
{code:title=SumEvaluator.scala|borderStyle=solid}
          val degreesOfFreedom = (counter.count - 1).toInt
          new TDistribution(degreesOfFreedom).inverseCumulativeProbability(1 - (1 - confidence) / 2)
{code}

If {{counter.count}} is 0 or 1, then {{degreesOfFreedom}} is -1 or 0, and {{new TDistribution(degreesOfFreedom)}} throws a {{NotStrictlyPositiveException}}, because {{TDistribution}} requires its {{degreesOfFreedom}} parameter to be strictly positive (i.e. at least 1 for an integer value).
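
This is easy to confirm outside of Spark with only commons-math3 on the classpath. The standalone sketch below (the object name and the loop over counts are mine, purely for illustration) mirrors the two lines above for {{counter.count}} values of 0, 1 and 2; the first two cases throw {{NotStrictlyPositiveException}}, exactly as in the traceback further down:

{code:title=TDistDfCheck.scala|borderStyle=solid}
import org.apache.commons.math3.distribution.TDistribution
import org.apache.commons.math3.exception.NotStrictlyPositiveException

// Standalone illustration of the failure mode: mirrors the two lines from
// SumEvaluator for counter.count values of 0, 1 and 2.
object TDistDfCheck {
  def main(args: Array[String]): Unit = {
    val confidence = 0.95
    for (count <- Seq(0L, 1L, 2L)) {
      val degreesOfFreedom = (count - 1).toInt // -1, 0, 1
      try {
        val high = new TDistribution(degreesOfFreedom)
          .inverseCumulativeProbability(1 - (1 - confidence) / 2)
        println(s"count=$count, df=$degreesOfFreedom -> quantile=$high")
      } catch {
        case e: NotStrictlyPositiveException =>
          // Thrown for count <= 1, i.e. for zero or negative degrees of freedom.
          println(s"count=$count, df=$degreesOfFreedom -> ${e.getClass.getSimpleName}")
      }
    }
  }
}
{code}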

An example (written in pyspark):

{noformat}
>>> rdd = sc.parallelize([1])
>>> rdd.countApprox(1000,0.5)
16/03/25 18:09:36 INFO SparkContext: Starting job: sumApprox at NativeMethodAccessorImpl.java:-2
16/03/25 18:09:36 INFO DAGScheduler: Got job 1 (sumApprox at NativeMethodAccessorImpl.java:-2) with 2 output partitions
16/03/25 18:09:36 INFO DAGScheduler: Final stage: ResultStage 1(sumApprox at NativeMethodAccessorImpl.java:-2)
16/03/25 18:09:36 INFO DAGScheduler: Parents of final stage: List()
16/03/25 18:09:36 INFO DAGScheduler: Missing parents: List()
16/03/25 18:09:36 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at mapPartitions at SerDeUtil.scala:147), which has no missing parents
16/03/25 18:09:36 INFO MemoryStore: ensureFreeSpace(4328) called with curMem=7140, maxMem=555755765
16/03/25 18:09:36 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.2 KB, free 530.0 MB)
16/03/25 18:09:36 INFO MemoryStore: ensureFreeSpace(2821) called with curMem=11468, maxMem=555755765
16/03/25 18:09:36 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.8 KB, free 530.0 MB)
16/03/25 18:09:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.5.5.158:56348 (size: 2.8 KB, free: 530.0 MB)
16/03/25 18:09:36 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
16/03/25 18:09:36 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at mapPartitions at SerDeUtil.scala:147)
16/03/25 18:09:36 INFO YarnScheduler: Adding task set 1.0 with 2 tasks
16/03/25 18:09:36 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, r-hadoopeco-data-66215afe.hbinternal.com, PROCESS_LOCAL, 2071 bytes)
16/03/25 18:09:36 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, r-hadoopeco-data-84205b1c.hbinternal.com, PROCESS_LOCAL, 2090 bytes)
16/03/25 18:09:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on r-hadoopeco-data-66215afe.hbinternal.com:43011 (size: 2.8 KB, free: 530.0 MB)
16/03/25 18:09:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 66 ms on r-hadoopeco-data-66215afe.hbinternal.com (1/2)
16/03/25 18:09:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on r-hadoopeco-data-84205b1c.hbinternal.com:41613 (size: 2.8 KB, free: 530.0 MB)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/rdd.py", line 2227, in countApprox
    return int(drdd.sumApprox(timeout, confidence))
  File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/rdd.py", line 2243, in sumApprox
    r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
  File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/utils.py", line 36, in deco
    return f(*a, **kw)
  File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o69.sumApprox.
: org.apache.commons.math3.exception.NotStrictlyPositiveException: degrees of freedom (0)
        at org.apache.commons.math3.distribution.TDistribution.<init>(TDistribution.java:120)
        at org.apache.commons.math3.distribution.TDistribution.<init>(TDistribution.java:86)
        at org.apache.commons.math3.distribution.TDistribution.<init>(TDistribution.java:63)
        at org.apache.spark.partial.SumEvaluator.currentResult(SumEvaluator.scala:61)
        at org.apache.spark.partial.SumEvaluator.currentResult(SumEvaluator.scala:29)
        at org.apache.spark.partial.ApproximateActionListener.awaitResult(ApproximateActionListener.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.runApproximateJob(DAGScheduler.scala:586)
        at org.apache.spark.SparkContext.runApproximateJob(SparkContext.scala:1962)
        at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sumApprox$1.apply(DoubleRDDFunctions.scala:99)
        at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sumApprox$1.apply(DoubleRDDFunctions.scala:96)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
        at org.apache.spark.rdd.DoubleRDDFunctions.sumApprox(DoubleRDDFunctions.scala:96)
        at org.apache.spark.api.java.JavaDoubleRDD.sumApprox(JavaDoubleRDD.scala:224)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

Note that this only happens occasionally, as befits a probabilistic counting method: it depends on how few values have been counted by the time the timeout expires. A good way to reproduce it is:

{code}
rdd = sc.parallelize([1])
for x in xrange(1000): rdd.countApprox(1 + x, 0.5)
{code}
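
One possible mitigation, sketched here only as an illustration (the helper below is hypothetical, not the actual Spark fix), is to construct the {{TDistribution}} only when at least one degree of freedom is available, and otherwise signal that no t-based interval can be computed:

{code:title=SafeTQuantile.scala|borderStyle=solid}
import org.apache.commons.math3.distribution.TDistribution

// Hypothetical helper sketching one way to guard SumEvaluator's interval
// computation: only build a TDistribution when degreesOfFreedom >= 1.
object SafeTQuantile {
  // Returns the upper-tail quantile used for the confidence interval, or None
  // when fewer than two values have been counted (degreesOfFreedom < 1).
  def highConfidenceBound(count: Long, confidence: Double): Option[Double] = {
    val degreesOfFreedom = (count - 1).toInt
    if (degreesOfFreedom >= 1) {
      Some(new TDistribution(degreesOfFreedom)
        .inverseCumulativeProbability(1 - (1 - confidence) / 2))
    } else {
      None // too few counted values for a t-based confidence interval
    }
  }
}
{code}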


> SumEvaluator and countApprox cannot reliably handle RDDs of size 1
> ------------------------------------------------------------------
>
>                 Key: SPARK-14163
>                 URL: https://issues.apache.org/jira/browse/SPARK-14163
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.2, 1.6.0, 1.6.1, 2.0.0
>            Reporter: Marcin Tustin
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
