[jira] [Created] (SPARK-26899) CountMinSketchAgg ExpressionDescription is inaccurate
tomzhu created SPARK-26899:
------------------------------

Summary: CountMinSketchAgg ExpressionDescription is inaccurate
Key: SPARK-26899
URL: https://issues.apache.org/jira/browse/SPARK-26899
Project: Spark
Issue Type: Documentation
Components: SQL
Affects Versions: 2.4.0
Reporter: tomzhu

Hi all, the ExpressionDescription comment in CountMinSketchAgg.scala is inaccurate. It currently says:
{code:java}
@ExpressionDescription(
  usage = """
    _FUNC_(col, eps, confidence, seed) - Returns a count-min sketch of a column with the given esp,
      confidence and seed. The result is an array of bytes, which can be deserialized to a
      `CountMinSketch` before usage. Count-min sketch is a probabilistic data structure used for
      cardinality estimation using sub-linear space.
  """,
  since = "2.2.0")
{code}
The claim that *Count-min sketch is a probabilistic data structure used for cardinality estimation* is misleading. A count-min sketch is mainly used for point queries (estimating the frequency of an individual item) and derived quantities such as self-join size; it does not estimate the number of distinct values, which is what "cardinality estimation" usually means (the job of structures like HyperLogLog). The doc string should be fixed accordingly.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
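To illustrate the distinction the reporter is drawing, here is a minimal, self-contained toy count-min sketch. The class name `CMS` and its internals are made up for illustration; Spark's real implementation is `org.apache.spark.util.sketch.CountMinSketch`, which exposes a similar `add`/`estimateCount` API. The point is that `estimateCount` answers "how many times was this item added?" (a point query), not "how many distinct items were added?" (cardinality).

```scala
import scala.util.hashing.MurmurHash3

// Toy count-min sketch (NOT Spark's implementation): depth rows of width
// counters, one hash function per row (MurmurHash3 seeded with the row index).
class CMS(depth: Int, width: Int) {
  private val table = Array.ofDim[Long](depth, width)

  // Map an item to a bucket in the given row, normalizing to [0, width).
  private def bucket(item: String, row: Int): Int = {
    val h = MurmurHash3.stringHash(item, row) % width
    if (h < 0) h + width else h
  }

  def add(item: String): Unit =
    for (row <- 0 until depth) table(row)(bucket(item, row)) += 1

  // Point query: the minimum over rows is an upper bound on the true count
  // (collisions can only inflate counters, never shrink them).
  def estimateCount(item: String): Long =
    (0 until depth).map(row => table(row)(bucket(item, row))).min
}

object Demo extends App {
  val cms = new CMS(depth = 5, width = 1024)
  Seq("a", "a", "a", "b").foreach(cms.add)
  println(cms.estimateCount("a")) // frequency estimate for "a": at least 3
}
```

Nothing in the structure tracks how many *distinct* items were inserted, which is why describing it as a cardinality estimator is incorrect.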
[jira] [Commented] (SPARK-22593) submitMissingTasks in DAGScheduler calls the partitions function many times, which may be time-consuming
[ https://issues.apache.org/jira/browse/SPARK-22593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16268525#comment-16268525 ]

tomzhu commented on SPARK-22593:
--------------------------------

Oh, my mistake. Sorry to bother you both. :(
[jira] [Commented] (SPARK-22593) submitMissingTasks in DAGScheduler calls the partitions function many times, which may be time-consuming
[ https://issues.apache.org/jira/browse/SPARK-22593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16268408#comment-16268408 ]

tomzhu commented on SPARK-22593:
--------------------------------

Yes, you are right: it only wastes a little time, which is insignificant compared to the job execution time on the executors in most cases. I haven't tested it yet. It's not a bug, and not really a question or an issue either; I raised it because it might be better to avoid the repeated calls.
[jira] [Created] (SPARK-22593) submitMissingTasks in DAGScheduler calls the partitions function many times, which may be time-consuming
tomzhu created SPARK-22593:
------------------------------

Summary: submitMissingTasks in DAGScheduler calls the partitions function many times, which may be time-consuming
Key: SPARK-22593
URL: https://issues.apache.org/jira/browse/SPARK-22593
Project: Spark
Issue Type: Question
Components: Spark Core
Affects Versions: 2.2.0
Reporter: tomzhu
Priority: Minor

When DAGScheduler runs submitMissingTasks, it creates the tasks for a stage and calls stage.rdd.partitions once per task, which may be time-consuming. The code is:
{code:java}
val tasks: Seq[Task[_]] = try {
  val serializedTaskMetrics =
    closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
  stage match {
    case stage: ShuffleMapStage =>
      stage.pendingPartitions.clear()
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        stage.pendingPartitions += id
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
          Option(sc.applicationId), sc.applicationAttemptId)
      }
    case stage: ResultStage =>
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p) // here is a little time-consuming
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, properties, serializedTaskMetrics,
          Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
      }
  }
}
{code}
For example, for a ParallelCollectionRDD with 3 slices (partitions), creating the tasks calls stage.rdd.partitions three times; since stage.rdd.partitions calls getPartitions, getPartitions would run three times, which is a little time-consuming.

The stage.rdd.partitions code:
{code:java}
final def partitions: Array[Partition] = {
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
      partitions_.zipWithIndex.foreach { case (partition, index) =>
        require(partition.index == index,
          s"partitions($index).partition == ${partition.index}, but it should equal $index")
      }
    }
    partitions_
  }
}
{code}
It would be better to avoid this.
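Note that the quoted partitions method already memoizes its result: getPartitions runs only on the first call, and later calls return the cached partitions_ array (which is why the reporter withdrew the concern in the comments above). A minimal sketch of that memoization pattern, with a simplified stand-in class (FakeRDD and its Int partitions are made up for illustration, not Spark's actual types):

```scala
// Simplified sketch of the caching in RDD.partitions: the expensive
// getPartitions runs once; subsequent calls return the cached array.
class FakeRDD(numSlices: Int) {
  var getPartitionsCalls = 0 // counts how often the expensive path runs

  private var partitions_ : Array[Int] = _

  private def getPartitions: Array[Int] = {
    getPartitionsCalls += 1 // expensive work would happen here
    Array.tabulate(numSlices)(identity)
  }

  final def partitions: Array[Int] = {
    if (partitions_ == null) partitions_ = getPartitions
    partitions_
  }
}

object MemoDemo extends App {
  val rdd = new FakeRDD(3)
  // Simulate submitMissingTasks touching partitions once per task:
  (0 until 3).foreach(i => rdd.partitions(i))
  println(rdd.getPartitionsCalls) // the expensive computation ran only once
}
```

So the repeated stage.rdd.partitions calls cost only a null check and an array lookup after the first call, not repeated getPartitions work.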