[GitHub] spark pull request #22347: [SPARK-25353][SQL] executeTake in SparkPlan is mo...
Github user Dooyoung-Hwang commented on a diff in the pull request: https://github.com/apache/spark/pull/22347#discussion_r222623133

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---

```diff
@@ -348,30 +349,30 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         // Otherwise, interpolate the number of partitions we need to try, but overestimate
         // it by 50%. We also cap the estimation in the end.
         val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
-        if (buf.isEmpty) {
+        if (scannedRowCount == 0) {
           numPartsToTry = partsScanned * limitScaleUpFactor
         } else {
-          val left = n - buf.size
+          val left = n - scannedRowCount
           // As left > 0, numPartsToTry is always >= 1
-          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
+          numPartsToTry = Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt
           numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
         }
       }

       val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
       val sc = sqlContext.sparkContext
-      val res = sc.runJob(childRDD,
-        (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty[Byte], p)
-
-      buf ++= res.flatMap(decodeUnsafeRows)
+      val res = sc.runJob(childRDD, (it: Iterator[(Long, Array[Byte])]) =>
+        if (it.hasNext) it.next() else (0L, Array.empty[Byte]), p)
+      buf ++= res.map(_._2)
+      scannedRowCount += res.map(_._1).sum

       partsScanned += p.size
     }

-    if (buf.size > n) {
-      buf.take(n).toArray
+    if (scannedRowCount > n) {
+      buf.toArray.view.flatMap(decodeUnsafeRows).take(n).force
```

--- End diff --

If the call to `view` is removed, there is no behavior change compared to before. If you don't want to use `view` here, what about `iterator` instead of `view`? It would look like this:

```scala
buf.iterator.flatMap(decodeUnsafeRows).take(n).toArray
```
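For context on the laziness being discussed above, here is a minimal, self-contained sketch. It is not the Spark code; `buf`, `decode`, and the string "batches" are toy stand-ins for the buffered byte arrays and `decodeUnsafeRows`. It shows that an `Iterator`-based `flatMap` only decodes as many batches as `take(n)` needs, while a strict `flatMap` on the buffer decodes everything first:

```scala
import scala.collection.mutable.ArrayBuffer

object LazyTakeSketch {
  def main(args: Array[String]): Unit = {
    // Ten toy "batches", each standing in for a serialized block of 100 rows.
    val buf = ArrayBuffer.tabulate(10)(i => s"batch-$i")

    var decodedBatches = 0
    // Toy stand-in for decodeUnsafeRows: expands one batch into its 100 "rows".
    def decode(batch: String): Iterator[String] = {
      decodedBatches += 1
      Iterator.tabulate(100)(j => s"$batch-row-$j")
    }

    val n = 250

    // Lazy: Iterator.flatMap pulls batches on demand, so take(n) stops pulling
    // once n rows have been produced (only 3 batches are decoded here).
    decodedBatches = 0
    val lazyRows = buf.iterator.flatMap(decode).take(n).toArray
    println(s"iterator: decoded $decodedBatches batches for ${lazyRows.length} rows")

    // Strict: ArrayBuffer.flatMap decodes every batch before take(n) runs
    // (all 10 batches are decoded here).
    decodedBatches = 0
    val strictRows = buf.flatMap(decode).take(n).toArray
    println(s"strict:   decoded $decodedBatches batches for ${strictRows.length} rows")
  }
}
```

The `view` in the patch is after the same effect: it defers decoding until `force`, so `take(n)` limits how many rows actually get decoded.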
[GitHub] spark pull request #22347: [SPARK-25353][SQL] executeTake in SparkPlan is mo...
Github user Dooyoung-Hwang commented on a diff in the pull request: https://github.com/apache/spark/pull/22347#discussion_r222622501

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---

```diff
@@ -348,30 +349,30 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         // Otherwise, interpolate the number of partitions we need to try, but overestimate
         // it by 50%. We also cap the estimation in the end.
         val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
-        if (buf.isEmpty) {
+        if (scannedRowCount == 0) {
          numPartsToTry = partsScanned * limitScaleUpFactor
         } else {
-          val left = n - buf.size
+          val left = n - scannedRowCount
           // As left > 0, numPartsToTry is always >= 1
-          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
+          numPartsToTry = Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt
           numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
         }
       }

       val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
       val sc = sqlContext.sparkContext
-      val res = sc.runJob(childRDD,
-        (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty[Byte], p)
-
-      buf ++= res.flatMap(decodeUnsafeRows)
+      val res = sc.runJob(childRDD, (it: Iterator[(Long, Array[Byte])]) =>
+        if (it.hasNext) it.next() else (0L, Array.empty[Byte]), p)
+      buf ++= res.map(_._2)
+      scannedRowCount += res.map(_._1).sum

       partsScanned += p.size
     }

-    if (buf.size > n) {
-      buf.take(n).toArray
+    if (scannedRowCount > n) {
+      buf.toArray.view.flatMap(decodeUnsafeRows).take(n).force
     } else {
-      buf.toArray
+      buf.toArray.view.flatMap(decodeUnsafeRows).force
```

--- End diff --

Ok~
[GitHub] spark pull request #22347: [SPARK-25353][SQL] executeTake in SparkPlan is mo...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22347#discussion_r222158101

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---

```diff
@@ -348,30 +349,30 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         // Otherwise, interpolate the number of partitions we need to try, but overestimate
         // it by 50%. We also cap the estimation in the end.
         val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
-        if (buf.isEmpty) {
+        if (scannedRowCount == 0) {
           numPartsToTry = partsScanned * limitScaleUpFactor
         } else {
-          val left = n - buf.size
+          val left = n - scannedRowCount
           // As left > 0, numPartsToTry is always >= 1
-          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
+          numPartsToTry = Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt
           numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
         }
       }

       val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
       val sc = sqlContext.sparkContext
-      val res = sc.runJob(childRDD,
-        (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty[Byte], p)
-
-      buf ++= res.flatMap(decodeUnsafeRows)
+      val res = sc.runJob(childRDD, (it: Iterator[(Long, Array[Byte])]) =>
+        if (it.hasNext) it.next() else (0L, Array.empty[Byte]), p)
+      buf ++= res.map(_._2)
+      scannedRowCount += res.map(_._1).sum

       partsScanned += p.size
     }

-    if (buf.size > n) {
-      buf.take(n).toArray
+    if (scannedRowCount > n) {
+      buf.toArray.view.flatMap(decodeUnsafeRows).take(n).force
     } else {
-      buf.toArray
+      buf.toArray.view.flatMap(decodeUnsafeRows).force
```

--- End diff --

`buf.flatMap(decodeUnsafeRows).toArray`?
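A small aside on this `else` branch, as a REPL-style toy sketch (made-up names, not the Spark code): when the scanned row count is at most `n`, every buffered batch contributes rows to the result, so a strict `flatMap` decodes nothing that the lazy `view` form would have skipped.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-ins: 4 batches of 100 "rows" each, and n larger than the total.
val buf = ArrayBuffer.tabulate(4)(i => s"batch-$i")
def decode(batch: String): Iterator[String] = Iterator.tabulate(100)(j => s"$batch-row-$j")
val n = 1000

// Every batch must be decoded to build the result, so the eager form wastes no work here.
val rows = buf.flatMap(decode).toArray
assert(rows.length == 400 && rows.length <= n)
```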
[GitHub] spark pull request #22347: [SPARK-25353][SQL] executeTake in SparkPlan is mo...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/22347#discussion_r222158041

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---

```diff
@@ -348,30 +349,30 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         // Otherwise, interpolate the number of partitions we need to try, but overestimate
         // it by 50%. We also cap the estimation in the end.
         val limitScaleUpFactor = Math.max(sqlContext.conf.limitScaleUpFactor, 2)
-        if (buf.isEmpty) {
+        if (scannedRowCount == 0) {
           numPartsToTry = partsScanned * limitScaleUpFactor
         } else {
-          val left = n - buf.size
+          val left = n - scannedRowCount
           // As left > 0, numPartsToTry is always >= 1
-          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
+          numPartsToTry = Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt
           numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)
         }
       }

       val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
       val sc = sqlContext.sparkContext
-      val res = sc.runJob(childRDD,
-        (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty[Byte], p)
-
-      buf ++= res.flatMap(decodeUnsafeRows)
+      val res = sc.runJob(childRDD, (it: Iterator[(Long, Array[Byte])]) =>
+        if (it.hasNext) it.next() else (0L, Array.empty[Byte]), p)
+      buf ++= res.map(_._2)
+      scannedRowCount += res.map(_._1).sum

       partsScanned += p.size
     }

-    if (buf.size > n) {
-      buf.take(n).toArray
+    if (scannedRowCount > n) {
+      buf.toArray.view.flatMap(decodeUnsafeRows).take(n).force
```

--- End diff --

Can we simplify like the following?

```scala
buf.flatMap(decodeUnsafeRows).take(n).toArray
```
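For readers skimming the quoted hunk, here is a worked example of the partition-growth estimate, using made-up numbers (the `limitScaleUpFactor` value below is an assumption for illustration, not a claim about Spark's configured default):

```scala
// Hypothetical numbers: 4 partitions scanned so far produced 200 of the
// n = 1000 requested rows; assume limitScaleUpFactor resolves to 4.
val limitScaleUpFactor = 4
val partsScanned = 4
val scannedRowCount = 200L
val n = 1000

val left = n - scannedRowCount                                                     // 800 rows still missing
var numPartsToTry = Math.ceil(1.5 * left * partsScanned / scannedRowCount).toInt   // ceil(24.0) = 24
numPartsToTry = Math.min(numPartsToTry, partsScanned * limitScaleUpFactor)         // capped at 4 * 4 = 16
```

So the next iteration would scan up to 16 more partitions (indices 4 until 20, clipped at totalParts), as computed by the `partsScanned.until(...)` line in the hunk.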