[ https://issues.apache.org/jira/browse/SPARK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212280#comment-15212280 ]
Sean Owen commented on SPARK-14163:
-----------------------------------

Yeah, if n <= 1, stuff is already messed up here since the variance will be undefined. If n == 0 I suppose you'd have the same case as when 0 outputs have been merged and can return the same result straight away. When n == 1 I suppose the same thing can be returned, but in that case at least sumEstimate is valid.

> SumEvaluator and countApprox cannot reliably handle RDDs of size 1
> ------------------------------------------------------------------
>
>                 Key: SPARK-14163
>                 URL: https://issues.apache.org/jira/browse/SPARK-14163
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.2, 1.6.0, 1.6.1, 2.0.0
>            Reporter: Marcin Tustin
>
> The bug exists in these lines:
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala#L59-L61
> In this code:
> {code:title=SumEvaluator.scala|borderStyle=solid}
> val degreesOfFreedom = (counter.count - 1).toInt
> new TDistribution(degreesOfFreedom).inverseCumulativeProbability(1 - (1 - confidence) / 2)
> {code}
> If {{counter.count}} is 1 or 0, then {{new TDistribution(degreesOfFreedom)}} will raise an exception, because {{TDistribution}} expects its {{degreesOfFreedom}} parameter to be 1 or greater.
> An example (written in pyspark):
> {noformat}
> >>> rdd = sc.parallelize([1])
> >>> rdd.countApprox(1000,0.5)
> 16/03/25 18:09:36 INFO SparkContext: Starting job: sumApprox at NativeMethodAccessorImpl.java:-2
> 16/03/25 18:09:36 INFO DAGScheduler: Got job 1 (sumApprox at NativeMethodAccessorImpl.java:-2) with 2 output partitions
> 16/03/25 18:09:36 INFO DAGScheduler: Final stage: ResultStage 1(sumApprox at NativeMethodAccessorImpl.java:-2)
> 16/03/25 18:09:36 INFO DAGScheduler: Parents of final stage: List()
> 16/03/25 18:09:36 INFO DAGScheduler: Missing parents: List()
> 16/03/25 18:09:36 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at mapPartitions at SerDeUtil.scala:147), which has no missing parents
> 16/03/25 18:09:36 INFO MemoryStore: ensureFreeSpace(4328) called with curMem=7140, maxMem=555755765
> 16/03/25 18:09:36 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.2 KB, free 530.0 MB)
> 16/03/25 18:09:36 INFO MemoryStore: ensureFreeSpace(2821) called with curMem=11468, maxMem=555755765
> 16/03/25 18:09:36 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.8 KB, free 530.0 MB)
> 16/03/25 18:09:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.5.5.158:56348 (size: 2.8 KB, free: 530.0 MB)
> 16/03/25 18:09:36 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
> 16/03/25 18:09:36 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at mapPartitions at SerDeUtil.scala:147)
> 16/03/25 18:09:36 INFO YarnScheduler: Adding task set 1.0 with 2 tasks
> 16/03/25 18:09:36 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, r-hadoopeco-data-66215afe.hbinternal.com, PROCESS_LOCAL, 2071 bytes)
> 16/03/25 18:09:36 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, r-hadoopeco-data-84205b1c.hbinternal.com, PROCESS_LOCAL, 2090 bytes)
> 16/03/25 18:09:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on r-hadoopeco-data-66215afe.hbinternal.com:43011 (size: 2.8 KB, free: 530.0 MB)
> 16/03/25 18:09:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 66 ms on r-hadoopeco-data-66215afe.hbinternal.com (1/2)
> 16/03/25 18:09:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on r-hadoopeco-data-84205b1c.hbinternal.com:41613 (size: 2.8 KB, free: 530.0 MB)
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/rdd.py", line 2227, in countApprox
>     return int(drdd.sumApprox(timeout, confidence))
>   File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/rdd.py", line 2243, in sumApprox
>     r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
>   File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>   File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/utils.py", line 36, in deco
>     return f(*a, **kw)
>   File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o69.sumApprox.
> : org.apache.commons.math3.exception.NotStrictlyPositiveException: degrees of freedom (0)
>     at org.apache.commons.math3.distribution.TDistribution.<init>(TDistribution.java:120)
>     at org.apache.commons.math3.distribution.TDistribution.<init>(TDistribution.java:86)
>     at org.apache.commons.math3.distribution.TDistribution.<init>(TDistribution.java:63)
>     at org.apache.spark.partial.SumEvaluator.currentResult(SumEvaluator.scala:61)
>     at org.apache.spark.partial.SumEvaluator.currentResult(SumEvaluator.scala:29)
>     at org.apache.spark.partial.ApproximateActionListener.awaitResult(ApproximateActionListener.scala:79)
>     at org.apache.spark.scheduler.DAGScheduler.runApproximateJob(DAGScheduler.scala:586)
>     at org.apache.spark.SparkContext.runApproximateJob(SparkContext.scala:1962)
>     at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sumApprox$1.apply(DoubleRDDFunctions.scala:99)
>     at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sumApprox$1.apply(DoubleRDDFunctions.scala:96)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
>     at org.apache.spark.rdd.DoubleRDDFunctions.sumApprox(DoubleRDDFunctions.scala:96)
>     at org.apache.spark.api.java.JavaDoubleRDD.sumApprox(JavaDoubleRDD.scala:224)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>     at py4j.Gateway.invoke(Gateway.java:259)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:207)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> Note that this only happens occasionally, as befits a probabilistic counting method.
> A good way to reproduce is:
> {code}
> rdd = sc.parallelize([1])
> for x in xrange(1000):
>     rdd.countApprox(1 + x, 0.5)
> {code}
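
To make the suggested handling concrete, here is a minimal sketch of a guard around the {{TDistribution}} construction, following the comment above. It is illustrative only, not the committed fix: the helper name and its parameters ({{count}}, {{sumEstimate}}, {{sumStdev}}) are stand-ins for the corresponding values computed inside {{SumEvaluator.currentResult}}.

{code:title=Illustrative guard (sketch, not the committed fix)|borderStyle=solid}
import org.apache.commons.math3.distribution.TDistribution

// TDistribution requires degreesOfFreedom >= 1; the reported
// NotStrictlyPositiveException is thrown when counter.count <= 1.
def sumBounds(count: Long, sumEstimate: Double, sumStdev: Double,
    confidence: Double): (Double, Double, Double) = {
  if (count == 0) {
    // Same shape of result as when 0 outputs have been merged: no estimate.
    (0.0, Double.NegativeInfinity, Double.PositiveInfinity)
  } else if (count == 1) {
    // Variance is undefined for a single value, so no interval can be
    // built, but sumEstimate itself is still valid.
    (sumEstimate, Double.NegativeInfinity, Double.PositiveInfinity)
  } else {
    val degreesOfFreedom = (count - 1).toInt  // guaranteed >= 1 here
    val confFactor = new TDistribution(degreesOfFreedom)
      .inverseCumulativeProbability(1 - (1 - confidence) / 2)
    (sumEstimate, sumEstimate - confFactor * sumStdev,
      sumEstimate + confFactor * sumStdev)
  }
}
{code}

In {{currentResult}} these three values would feed the returned {{BoundedDouble(mean, confidence, low, high)}}; the tuple here just keeps the sketch self-contained.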