[GitHub] spark pull request: Removed some HashMaps from DAGScheduler by sto...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15331043 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -355,14 +351,13 @@ class DAGScheduler( logDebug("Removing running stage %d".format(stageId)) runningStages -= stage } -stageToInfos -= stage for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) { shuffleToMapStage.remove(k) } -if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) { +if (stage.pendingTasks.isDefined && !stage.pendingTasks.isEmpty) { logDebug("Removing pending status for stage %d".format(stageId)) } -pendingTasks -= stage +stage.pendingTasks = None --- End diff -- Is it necessary to clear this? Aren't all references to the Stage removed here anyway? Is the issue that you want this cleared in case the stage is resubmitted? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r15331051 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -225,9 +374,18 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) return ResultWithDroppedBlocks(success = false, droppedBlocks) } -if (maxMemory - currentMemory < space) { +// Take into account the amount of memory currently occupied by unrolling blocks +val freeSpace = + if (!unrolling) { +val unrollMemoryMap = SparkEnv.get.unrollMemoryMap +unrollMemoryMap.synchronized { freeMemory - unrollMemoryMap.values.sum } --- End diff -- If you remove the separate locking on unrollMemoryMap you can get rid of the synchronized here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-2250: show stage RDDs in UI
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1188#discussion_r15331036 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala --- @@ -99,19 +99,30 @@ private[ui] class StageTableBase( {s.name} /a +val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions 0) val details = if (s.details.nonEmpty) { span onclick=this.parentNode.querySelector('.stage-details').classList.toggle('collapsed') class=expand-details -+show details - /span - pre class=stage-details collapsed{s.details}/pre ++call stack + /span ++ + div class=stage-details collapsed +{if (cachedRddInfos.nonEmpty) { + strongPersisted RDD:/strong ++ + cachedRddInfos.map { i = +a href={%s/storage/rdd?id=%d.format(UIUtils.prependBaseUri(basePath), i.id)} + {i.name} +/a + } +}} +pre{s.details}/pre + /div } val stageDataOption = listener.stageIdToData.get(s.stageId) // Too many nested map/flatMaps with options are just annoying to read. Do this imperatively. if (stageDataOption.isDefined stageDataOption.get.description.isDefined) { val desc = stageDataOption.get.description - divem{desc}/em/divdiv{nameLink} {killLink}/div + divem{desc}/em/divdiv{killLink} {nameLink} {details}/div --- End diff -- Is this a separate fix here? (looks like we didn't correctly add details in this case) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r15331070 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -215,7 +361,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) * * Return whether there is enough free space, along with the blocks dropped in the process. */ - private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = { + private def ensureFreeSpace( + blockIdToAdd: BlockId, + space: Long, + unrolling: Boolean = false): ResultWithDroppedBlocks = { --- End diff -- Why is there a boolean parameter for unrolling? Some threads might be unrolling stuff while others are doing put() for other reasons, so we need to account for the unroll memory the same way both times to avoid overcommitting it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-2250: show stage RDDs in UI
Github user nevillelyh commented on a diff in the pull request: https://github.com/apache/spark/pull/1188#discussion_r15331106 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala --- @@ -99,19 +99,30 @@ private[ui] class StageTableBase( {s.name} /a +val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions 0) val details = if (s.details.nonEmpty) { span onclick=this.parentNode.querySelector('.stage-details').classList.toggle('collapsed') class=expand-details -+show details - /span - pre class=stage-details collapsed{s.details}/pre ++call stack + /span ++ + div class=stage-details collapsed +{if (cachedRddInfos.nonEmpty) { + strongPersisted RDD:/strong ++ + cachedRddInfos.map { i = +a href={%s/storage/rdd?id=%d.format(UIUtils.prependBaseUri(basePath), i.id)} + {i.name} +/a + } +}} +pre{s.details}/pre + /div } val stageDataOption = listener.stageIdToData.get(s.stageId) // Too many nested map/flatMaps with options are just annoying to read. Do this imperatively. if (stageDataOption.isDefined stageDataOption.get.description.isDefined) { val desc = stageDataOption.get.description - divem{desc}/em/divdiv{nameLink} {killLink}/div + divem{desc}/em/divdiv{killLink} {nameLink} {details}/div --- End diff -- Yeah I reused SPARK-2035, the ticket that introduced details tab. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r15331108 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -215,7 +361,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) * * Return whether there is enough free space, along with the blocks dropped in the process. */ - private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = { + private def ensureFreeSpace( + blockIdToAdd: BlockId, + space: Long, + unrolling: Boolean = false): ResultWithDroppedBlocks = { --- End diff -- If you want a different calculation of how much space to ensure, it would be better to just pass a different value of space when we're unrolling. That way this method always does the same thing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
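For readers following this thread, here is a self-contained toy model of the single-rule accounting mateiz is suggesting; all names and numbers below are illustrative and not part of the actual patch.

```scala
import scala.collection.mutable

// Toy model only: ensureFreeSpace applies one rule no matter who calls it, and
// an unrolling caller simply passes the extra amount it needs instead of a
// special `unrolling` flag.
object UnrollAccountingSketch {
  val maxMemory = 1000L
  var blockMemory = 600L                              // memory held by stored blocks
  val unrollMemory = mutable.Map("thread-1" -> 150L)  // memory reserved by unrolling threads

  // Free memory after discounting everything already spoken for.
  def freeMemory: Long = maxMemory - blockMemory - unrollMemory.values.sum

  // Single accounting rule for every caller.
  def ensureFreeSpace(space: Long): Boolean = freeMemory >= space

  def main(args: Array[String]): Unit = {
    val extraAmountNeeded = 200L                      // what one unrolling thread wants next
    println(ensureFreeSpace(extraAmountNeeded))       // true: 1000 - 600 - 150 = 250 >= 200
    println(ensureFreeSpace(300L))                    // false: only 250 bytes remain
  }
}
```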
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15331132 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala --- @@ -29,6 +29,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi override def eval(input: Row): Any = { children.size match { + case 0 => function.asInstanceOf[() => Any]() --- End diff -- this is for another time, but if you add an explicit init to expressions, we can move all of these branches from the inner loop (once per row) directly to the outer loop (once per partition). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
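To illustrate the optimization rxin is describing (hoisting the arity match out of the per-row path), here is a small self-contained sketch; the types and names are simplified stand-ins, not the Catalyst API.

```scala
// Sketch: resolve the UDF's arity once, producing a Row => Any closure, so the
// hot per-row path is a plain function call with no match on children.size.
object UdfDispatchSketch {
  type Row = Seq[Any]

  def specialize(function: AnyRef, children: Seq[Row => Any]): Row => Any =
    children.size match {
      case 0 =>
        val f = function.asInstanceOf[() => Any]
        (_: Row) => f()
      case 1 =>
        val f = function.asInstanceOf[Any => Any]
        (row: Row) => f(children(0)(row))
      // ... higher arities would follow the same pattern
    }

  def main(args: Array[String]): Unit = {
    val upper: Any => Any = s => s.toString.toUpperCase
    val eval = specialize(upper, Seq(row => row(0)))   // built once per partition
    println(Seq(Seq("spark"), Seq("sql")).map(eval))   // List(SPARK, SQL)
  }
}
```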
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15331160 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import com.google.common.cache.{CacheLoader, CacheBuilder} + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code to perform expression evaluation. Includes a set of + * helpers for referring to Catalyst types and building trees that perform evaluation of individual + * expressions. + */ +abstract class CodeGenerator[InType : AnyRef, OutType : AnyRef] extends Logging { + import scala.reflect.runtime.{universe = ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + protected val rowType = typeOf[Row] + protected val mutableRowType = typeOf[MutableRow] + protected val genericRowType = typeOf[GenericRow] + protected val genericMutableRowType = typeOf[GenericMutableRow] + + protected val projectionType = typeOf[Projection] + protected val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeparator = $ + + /** + * Generates a class for a given input expression. Called when there is not cached code + * already available. + */ + protected def create(in: InType): OutType + + /** + * Canonicalizes an input expression. Used to avoid double caching expressions that differ only + * cosmetically. + */ + protected def canonicalize(in: InType): InType + + /** Binds an input expression to a given input schema */ + protected def bind(in: InType, inputSchema: Seq[Attribute]): InType + + protected val cache = CacheBuilder.newBuilder() --- End diff -- we should comment on the behavior of this cache so readers don't have to go read Guava documentation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
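As a concrete suggestion for the comment requested above, something along these lines could sit on the declaration; the wording is only a proposal, and the cache code itself is unchanged from the diff.

```scala
/**
 * Cache of generated classes, keyed by the canonicalized input expression.
 *
 * Entries are built lazily: the first lookup of a key invokes `create` via the
 * CacheLoader, and subsequent lookups reuse the compiled result. At most 1000
 * entries are retained; beyond that, Guava evicts entries on an approximately
 * least-recently-used basis. Compilation runs under `globalLock` because the
 * Scala reflection toolbox is assumed not to be thread-safe.
 */
protected val cache = CacheBuilder.newBuilder()
  .maximumSize(1000)
  .build(
    new CacheLoader[InType, OutType]() {
      override def load(in: InType): OutType = globalLock.synchronized {
        create(in)
      }
    })
```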
[GitHub] spark pull request: SPARK-2099. Report progress while task is runn...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/1056#issuecomment-49970751 I don't entirely understand the advantage of having a separate PartialTaskMetrics. Ultimately every field of TaskMetrics except for maybe shuffleFinishTime will be able to be updated incrementally. Any time the driver is dealing with a TaskMetrics, the context makes it pretty clear on whether it's full or not. We do already have code that touches the TaskMetrics from multiple threads. For example, the ShuffleReadMetrics gets updated like this. I think you're right that I've missed some race conditions - we probably need to clone the metrics in a synchronized block before sending them off, but I'm not convinced that a superclass makes these easier to fix. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
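A minimal, self-contained sketch of the "clone the metrics in a synchronized block before sending them off" idea mentioned above; the class and field names are illustrative, not the actual TaskMetrics API.

```scala
// Sketch: in-place updates and snapshots share one lock, so a heartbeat never
// ships a half-updated metrics object to the driver.
class IncrementalMetricsSketch {
  private var recordsRead = 0L
  private var bytesRead = 0L

  def update(records: Long, bytes: Long): Unit = synchronized {
    recordsRead += records
    bytesRead += bytes
  }

  // Returns an independent copy that is safe to serialize and send.
  def snapshot(): IncrementalMetricsSketch = synchronized {
    val copy = new IncrementalMetricsSketch
    copy.recordsRead = recordsRead
    copy.bytesRead = bytesRead
    copy
  }
}
```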
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r15331171 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -141,6 +189,104 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** + * Unroll the given block in memory safely. + * + * The safety of this operation refers to avoiding potential OOM exceptions caused by + * unrolling the entirety of the block in memory at once. This is achieved by periodically + * checking whether the memory restrictions for unrolling blocks are still satisfied, + * stopping immediately if not. This check is a safeguard against the scenario in which + * there is not enough free memory to accommodate the entirety of a single block. + * + * This method returns either an array with the contents of the entire block or an iterator + * containing the values of the block (if the array would have exceeded available memory). + */ + def unrollSafely( + blockId: BlockId, + values: Iterator[Any], + droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) +: Either[Array[Any], Iterator[Any]] = { + +// Number of elements unrolled so far +var elementsUnrolled = 0 +// Whether there is still enough memory for us to continue unrolling this block +var keepUnrolling = true +// Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing. +val initialMemoryThreshold = conf.getLong(spark.storage.unrollMemoryThreshold, 1024 * 1024) +// How often to check whether we need to request more memory. Exposed for testing. +val memoryCheckPeriod = conf.getLong(spark.storage.unrollCheckPeriod, 16) +// Memory currently reserved by this thread (bytes) +var memoryThreshold = initialMemoryThreshold +// Memory to request as a multiple of current vector size +val memoryGrowthFactor = 1.5 + +val threadId = Thread.currentThread().getId +val unrollMemoryMap = SparkEnv.get.unrollMemoryMap +var vector = new SizeTrackingVector[Any] + +// Request memory for our vector and return whether request is granted. +// This involves synchronizing across all threads and can be expensive if called frequently. 
+def requestMemory(memoryToRequest: Long): Boolean = { + unrollMemoryMap.synchronized { +val previouslyOccupiedMemory = unrollMemoryMap.get(threadId).getOrElse(0L) +val otherThreadsMemory = unrollMemoryMap.values.sum - previouslyOccupiedMemory +val availableMemory = freeMemory - otherThreadsMemory +val granted = availableMemory >= memoryToRequest +if (granted) { unrollMemoryMap(threadId) = memoryToRequest } +granted + } +} + +// Request enough memory to begin unrolling +keepUnrolling = requestMemory(memoryThreshold) + +// Unroll this block safely, checking whether we have exceeded our threshold periodically +try { + while (values.hasNext && keepUnrolling) { +vector += values.next() +if (elementsUnrolled % memoryCheckPeriod == 0) { + // If our vector's size has exceeded the threshold, request more memory + val currentSize = vector.estimateSize() + if (currentSize >= memoryThreshold) { +val amountToRequest = (currentSize * memoryGrowthFactor).toLong +// Hold the put lock, in case another thread concurrently puts a block that +// takes up the free space we just ensured for unrolling here +putLock.synchronized { + if (!requestMemory(amountToRequest)) { +// If the first request is not granted, try again after ensuring free space +// If there is still not enough space, give up and drop the partition +val result = ensureFreeSpace(blockId, globalUnrollMemory, unrolling = true) --- End diff -- Instead of trying to ensureFreeSpace on all of the globalUnrollMemory at once, let's maybe ensure only enough for the amount we want to request. If you change ensureFreeSpace to not do accounting differently when we're unrolling (as per my comment below), I think this will become something like this: ``` val maxWeCanFree = globalUnrollMemory - unrollMemory // (the sum of your map) if (maxWeCanFree >= extraAmountWeNeed) { ensureFreeSpace(blockId, extraAmountWeNeed) } ``` Basically this means we can only request extra space until 20% of the memory pool is allocated to unrolling. If we can fit our next request within that 20%, we go ahead and do it. Otherwise we don't. --- If your project is set up for
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15331172 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -395,16 +389,9 @@ class DAGScheduler( activeJobs -= job if (resultStage.isEmpty) { --- End diff -- Great question...I think we just didn't notice that. I think it's safe to remove it, and you can also remove it as a parameter to failJobAndIndependentStages --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1165#issuecomment-49970876 QA results for PR 1165: - This patch FAILED unit tests. - This patch merges cleanly. - This patch adds the following public classes (experimental): case class Sample(size: Long, numUpdates: Long). For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17099/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r15331224 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -141,6 +189,104 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** + * Unroll the given block in memory safely. + * + * The safety of this operation refers to avoiding potential OOM exceptions caused by + * unrolling the entirety of the block in memory at once. This is achieved by periodically + * checking whether the memory restrictions for unrolling blocks are still satisfied, + * stopping immediately if not. This check is a safeguard against the scenario in which + * there is not enough free memory to accommodate the entirety of a single block. + * + * This method returns either an array with the contents of the entire block or an iterator + * containing the values of the block (if the array would have exceeded available memory). + */ + def unrollSafely( + blockId: BlockId, + values: Iterator[Any], + droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) +: Either[Array[Any], Iterator[Any]] = { + +// Number of elements unrolled so far +var elementsUnrolled = 0 +// Whether there is still enough memory for us to continue unrolling this block +var keepUnrolling = true +// Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing. +val initialMemoryThreshold = conf.getLong(spark.storage.unrollMemoryThreshold, 1024 * 1024) +// How often to check whether we need to request more memory. Exposed for testing. +val memoryCheckPeriod = conf.getLong(spark.storage.unrollCheckPeriod, 16) --- End diff -- Can we just keep this at 16? It will be slower if we make it configurable, because the compiler won't be able to optimize mod 16, and it also seems like something we don't need to add a config setting for. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r15331245 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -141,6 +189,104 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** + * Unroll the given block in memory safely. + * + * The safety of this operation refers to avoiding potential OOM exceptions caused by + * unrolling the entirety of the block in memory at once. This is achieved by periodically + * checking whether the memory restrictions for unrolling blocks are still satisfied, + * stopping immediately if not. This check is a safeguard against the scenario in which + * there is not enough free memory to accommodate the entirety of a single block. + * + * This method returns either an array with the contents of the entire block or an iterator + * containing the values of the block (if the array would have exceeded available memory). + */ + def unrollSafely( + blockId: BlockId, + values: Iterator[Any], + droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) +: Either[Array[Any], Iterator[Any]] = { + +// Number of elements unrolled so far +var elementsUnrolled = 0 +// Whether there is still enough memory for us to continue unrolling this block +var keepUnrolling = true +// Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing. +val initialMemoryThreshold = conf.getLong(spark.storage.unrollMemoryThreshold, 1024 * 1024) +// How often to check whether we need to request more memory. Exposed for testing. +val memoryCheckPeriod = conf.getLong(spark.storage.unrollCheckPeriod, 16) --- End diff -- BTW this would also make the tests slightly more realistic in that there will be fewer differences between them and real-world operation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
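For what it's worth, a tiny self-contained illustration of the hard-coded period being discussed (the constant name is made up): with a compile-time power-of-two, the modulus can be reduced to a bit mask, which a configurable value read from SparkConf at runtime cannot.

```scala
object CheckPeriodSketch {
  private val MemoryCheckPeriod = 16  // fixed power of two, not configurable

  // Cheap periodic check: true on every 16th element.
  def shouldCheckMemory(elementsUnrolled: Int): Boolean =
    elementsUnrolled % MemoryCheckPeriod == 0

  def main(args: Array[String]): Unit =
    println((0 until 40).filter(shouldCheckMemory))  // Vector(0, 16, 32)
}
```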
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15331280 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -395,16 +389,9 @@ class DAGScheduler( activeJobs -= job if (resultStage.isEmpty) { - // Clean up result stages. - val resultStagesForJob = resultStageToJob.keySet.filter( -stage => resultStageToJob(stage).jobId == job.jobId) - if (resultStagesForJob.size != 1) { -logWarning( - s"${resultStagesForJob.size} result stages for job ${job.jobId} (expect exactly 1)") - } - resultStageToJob --= resultStagesForJob + job.finalStage.resultOfJob = None } else { - resultStageToJob -= resultStage.get + resultStage.get.resultOfJob = None --- End diff -- And as a result of the above, you can consolidate this if/else into one thing --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15331310 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -355,14 +351,13 @@ class DAGScheduler( logDebug("Removing running stage %d".format(stageId)) runningStages -= stage } -stageToInfos -= stage for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) { shuffleToMapStage.remove(k) } -if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) { +if (stage.pendingTasks.isDefined && !stage.pendingTasks.isEmpty) { logDebug("Removing pending status for stage %d".format(stageId)) } -pendingTasks -= stage +stage.pendingTasks = None --- End diff -- Ok don't worry about it, as discussed. I want to remove this pendingTasks anyway, which is redundant with information stored by the TSM --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2014] Make PySpark store RDDs in MEMORY...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1051#issuecomment-49971143 QA tests have started for PR 1051. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17104/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2410][SQL] Cherry picked Hive Thrift/JD...
Github user liancheng commented on the pull request: https://github.com/apache/spark/pull/1399#issuecomment-49971137 Removed the WIP tag. It's ready to go. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r15331347 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -87,9 +97,47 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) values: Iterator[Any], level: StorageLevel, returnValues: Boolean): PutResult = { -val valueEntries = new ArrayBuffer[Any]() -valueEntries ++= values -putValues(blockId, valueEntries, level, returnValues) +putValues(blockId, values, level, returnValues, allowPersistToDisk = true) + } + + /** + * Attempt to put the given block in memory store. + * + * There may not be enough space to fully unroll the iterator in memory, in which case we + * optionally drop the values to disk if + * (1) the block's storage level specifies useDisk, and + * (2) `allowPersistToDisk` is true. + * + * One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block + * back from disk and attempts to cache it in memory. In this case, we should not persist the + * block back on disk again, as it is already in disk store. + */ + private[storage] def putValues( --- End diff -- Nit: it's kind of weird that this is called putValues when it takes an Iterator, and BlockManager has something called putIterator. Maybe this should also be putIterator. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-2310. Support arbitrary Spark properties...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/1253 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user colorant commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15331341 --- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import java.io.{DataInputStream, FileInputStream} + +import org.apache.spark.shuffle._ +import org.apache.spark.{TaskContext, ShuffleDependency} +import org.apache.spark.shuffle.hash.HashShuffleReader +import org.apache.spark.storage.{DiskBlockManager, FileSegment, ShuffleBlockId} + +private[spark] class SortShuffleManager extends ShuffleManager { + /** + * Register a shuffle with the manager and obtain a handle for it to pass to tasks. + */ + override def registerShuffle[K, V, C]( + shuffleId: Int, + numMaps: Int, + dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { +new BaseShuffleHandle(shuffleId, numMaps, dependency) + } + + /** + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Called on executors by reduce tasks. + */ + override def getReader[K, C]( + handle: ShuffleHandle, + startPartition: Int, + endPartition: Int, + context: TaskContext): ShuffleReader[K, C] = { +// We currently use the same block store shuffle fetcher as the hash-based shuffle. +new HashShuffleReader( + handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context) + } + + /** Get a writer for a given partition. Called on executors by map tasks. */ + override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) + : ShuffleWriter[K, V] = { +new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context) + } + + /** Remove a shuffle's metadata from the ShuffleManager. */ + override def unregisterShuffle(shuffleId: Int): Unit = {} --- End diff -- The shuffle output files written by SortShuffleWriter do not get cleaned. We might need to add a map to save registered shuffle handles, and then try to remove the data files in the unregisterShuffle method. At present this is also not implemented in HashShuffleManager, but HashShuffleManager depends on ShuffleBlockManager and the files will be cleaned there. I have a PR (#1241) to generalize ShuffleBlockManager and hide it behind ShuffleManager; when BlockManager removes a shuffle, it will call into this unregisterShuffle method. I will rebase it once this PR has been merged. Should we fix this issue in my PR, or add the store/clean shuffle handle logic here in this PR first? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
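A rough sketch of the bookkeeping being proposed here, purely for illustration; the class name, the file layout assumed by the delete call, and the question of where this logic should live are all hypothetical (and are discussed further below).

```scala
package org.apache.spark.shuffle.sort

import scala.collection.concurrent.TrieMap

import org.apache.spark.ShuffleDependency
import org.apache.spark.shuffle.ShuffleHandle
import org.apache.spark.storage.{DiskBlockManager, ShuffleBlockId}

// Sketch: remember how many map outputs each shuffle has, so that
// unregisterShuffle can delete the per-map data files written by SortShuffleWriter.
private[spark] class SortShuffleManagerWithCleanup(diskBlockManager: DiskBlockManager)
  extends SortShuffleManager {

  private val numMapsForShuffle = TrieMap.empty[Int, Int]

  override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    numMapsForShuffle.put(shuffleId, numMaps)
    super.registerShuffle(shuffleId, numMaps, dependency)
  }

  override def unregisterShuffle(shuffleId: Int): Unit = {
    numMapsForShuffle.remove(shuffleId).foreach { numMaps =>
      (0 until numMaps).foreach { mapId =>
        // Assumes each map task's sorted output is addressable as a shuffle block file.
        diskBlockManager.getFile(ShuffleBlockId(shuffleId, mapId, 0)).delete()
      }
    }
  }
}
```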
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15331362 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import com.google.common.cache.{CacheLoader, CacheBuilder} + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code to perform expression evaluation. Includes a set of + * helpers for referring to Catalyst types and building trees that perform evaluation of individual + * expressions. + */ +abstract class CodeGenerator[InType : AnyRef, OutType : AnyRef] extends Logging { + import scala.reflect.runtime.{universe = ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + protected val rowType = typeOf[Row] + protected val mutableRowType = typeOf[MutableRow] + protected val genericRowType = typeOf[GenericRow] + protected val genericMutableRowType = typeOf[GenericMutableRow] + + protected val projectionType = typeOf[Projection] + protected val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeparator = $ + + /** + * Generates a class for a given input expression. Called when there is not cached code + * already available. + */ + protected def create(in: InType): OutType + + /** + * Canonicalizes an input expression. Used to avoid double caching expressions that differ only + * cosmetically. + */ + protected def canonicalize(in: InType): InType + + /** Binds an input expression to a given input schema */ + protected def bind(in: InType, inputSchema: Seq[Attribute]): InType + + protected val cache = CacheBuilder.newBuilder() +.maximumSize(1000) +.build( + new CacheLoader[InType, OutType]() { +override def load(in: InType): OutType = globalLock.synchronized { + create(in) +} + }) + + def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType= --- End diff -- nit: space before = Also would be great to add inline doc on what these apply's are doing, since apply's are used quite a lot in catalyst with different semantics. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2260] Fix standalone-cluster mode, whic...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1538#issuecomment-49971356 QA results for PR 1538: - This patch PASSES unit tests. - This patch merges cleanly. - This patch adds no public classes. For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17100/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15331481 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -22,6 +22,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.CallSite +import scala.collection.mutable.HashSet --- End diff -- ha this is WIP. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15331465 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -22,6 +22,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.CallSite +import scala.collection.mutable.HashSet --- End diff -- Mr. Xin your imports are ordered incorrectly! I expected better of you! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15331488 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -56,6 +58,16 @@ private[spark] class Stage( val numPartitions = rdd.partitions.size val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) var numAvailableOutputs = 0 + + /** List of jobs that this stage belong to. */ --- End diff -- as long as we're nit-ing...belong - belongs --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15331519 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -56,6 +58,16 @@ private[spark] class Stage( val numPartitions = rdd.partitions.size val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) var numAvailableOutputs = 0 + + /** List of jobs that this stage belong to. */ + val jobIds = new HashSet[Int] + /** For stages that are the final (consists of only ResultTasks), link to the ActiveJob. */ + var resultOfJob: Option[ActiveJob] = None + var pendingTasks: Option[HashSet[Task[_]]] = None --- End diff -- Why use an option here, as opposed to just a (possibly empty) hash set? The option makes many of the accesses more clumsy. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user kayousterhout commented on the pull request: https://github.com/apache/spark/pull/1561#issuecomment-49971683 Love the cleanup here!!! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2410][SQL] Cherry picked Hive Thrift/JD...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/1399#issuecomment-49971698 @liancheng I just merged a patch related to SparkSubmit... can you rebase? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-2310. Support arbitrary Spark properties...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/1253#issuecomment-49971888 @sryza I merged this just now because another patch was going to change this code and I wanted to avoid you having to rebase again. That said, I found an issue with this after merging. Would you be able to fix this? https://issues.apache.org/jira/browse/SPARK-2664 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15331775 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -355,14 +351,13 @@ class DAGScheduler( logDebug("Removing running stage %d".format(stageId)) runningStages -= stage } -stageToInfos -= stage for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) { shuffleToMapStage.remove(k) } -if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) { +if (stage.pendingTasks.isDefined && !stage.pendingTasks.isEmpty) { logDebug("Removing pending status for stage %d".format(stageId)) } -pendingTasks -= stage +stage.pendingTasks = None --- End diff -- The old pendingTasks needed the clearing step so that it wouldn't continue to have removed stages mapping to empty sets of remaining tasks. I agree that a similar step shouldn't be necessary now that pendingTasks is encapsulated in a Stage that is going away. In fact, in the event of resubmission don't we want this to _not_ be None, else stage.pendingTasks.get will fail? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15331819 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -22,6 +22,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.CallSite +import scala.collection.mutable.HashSet --- End diff -- I'll have to remember that one! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Build should not run hive tests by default.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1565#issuecomment-49972683 QA results for PR 1565: - This patch PASSES unit tests. - This patch merges cleanly. - This patch adds no public classes. For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17095/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15332026 --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala --- @@ -56,6 +58,16 @@ private[spark] class Stage( val numPartitions = rdd.partitions.size val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) var numAvailableOutputs = 0 + + /** List of jobs that this stage belong to. */ + val jobIds = new HashSet[Int] + /** For stages that are the final (consists of only ResultTasks), link to the ActiveJob. */ + var resultOfJob: Option[ActiveJob] = None + var pendingTasks: Option[HashSet[Task[_]]] = None --- End diff -- And potentially error prone. I don't think there is anyplace left where the distinction between None and Some(emptySet) is useful except for the check in removeStage (https://github.com/rxin/spark/blob/dagSchedulerHashMaps/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L357), and that's not really even useful anymore. On the other hand, having a None where we expect at least Some(emptySet) can be a problem. I think we're better off without the Option. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
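For concreteness, the alternative both reviewers are describing would look roughly like this; it is a sketch against the PR branch, not the merged change.

```scala
// In Stage.scala: a plain, possibly-empty set instead of Option[HashSet[...]],
// so call sites never have to distinguish None from Some(emptySet).
val pendingTasks = new HashSet[Task[_]]

// removeStage / resubmission in DAGScheduler then simplifies to:
//   if (stage.pendingTasks.nonEmpty) {
//     logDebug("Removing pending status for stage %d".format(stageId))
//   }
//   stage.pendingTasks.clear()
```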
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15332107 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + + +/** + * Generates bytecode that produces a new [[Row]] object based on a fixed set of input + * [[Expression Expressions]] and a given input [[Row]]. The returned [[Row]] object is custom + * generated based on the output types of the [[Expression]] to avoid boxing of primitive values. + */ +object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] { + import scala.reflect.runtime.{universe = ru} + import scala.reflect.runtime.universe._ + + protected def canonicalize(in: Seq[Expression]): Seq[Expression] = +in.map(ExpressionCanonicalizer(_)) + + protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = +in.map(BindReferences.bindReference(_, inputSchema)) + + // Make Mutablility optional... + protected def create(expressions: Seq[Expression]): Projection = { +val tupleLength = ru.Literal(Constant(expressions.length)) +val lengthDef = qfinal val length = $tupleLength + +/* TODO: Configurable... +val nullFunctions = + q +private final val nullSet = new org.apache.spark.util.collection.BitSet(length) +final def setNullAt(i: Int) = nullSet.set(i) +final def isNullAt(i: Int) = nullSet.get(i) + + */ + +val nullFunctions = + q +private[this] var nullBits = new Array[Boolean](${expressions.size}) +final def setNullAt(i: Int) = { nullBits(i) = true } +final def isNullAt(i: Int) = nullBits(i) + .children + +val tupleElements = expressions.zipWithIndex.flatMap { + case (e, i) = +val elementName = newTermName(sc$i) +val evaluatedExpression = expressionEvaluator(e) --- End diff -- where is expressionEvaluator defined? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15332175 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import com.google.common.cache.{CacheLoader, CacheBuilder} + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code to perform expression evaluation. Includes a set of + * helpers for referring to Catalyst types and building trees that perform evaluation of individual + * expressions. + */ +abstract class CodeGenerator[InType : AnyRef, OutType : AnyRef] extends Logging { + import scala.reflect.runtime.{universe = ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + protected val rowType = typeOf[Row] + protected val mutableRowType = typeOf[MutableRow] + protected val genericRowType = typeOf[GenericRow] + protected val genericMutableRowType = typeOf[GenericMutableRow] + + protected val projectionType = typeOf[Projection] + protected val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeparator = $ + + /** + * Generates a class for a given input expression. Called when there is not cached code + * already available. + */ + protected def create(in: InType): OutType + + /** + * Canonicalizes an input expression. Used to avoid double caching expressions that differ only + * cosmetically. + */ + protected def canonicalize(in: InType): InType + + /** Binds an input expression to a given input schema */ + protected def bind(in: InType, inputSchema: Seq[Attribute]): InType + + protected val cache = CacheBuilder.newBuilder() +.maximumSize(1000) +.build( + new CacheLoader[InType, OutType]() { +override def load(in: InType): OutType = globalLock.synchronized { + create(in) +} + }) + + def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType= --- End diff -- actually after reading through the code more, i'd argue we should rename this function from apply to something more informative. unless semantically very explicit obvious (e.g. array), the use of apply makes it harder to understand code around its usage. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
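One possible shape for that rename, sketched under the assumption that the cached apply simply binds, canonicalizes, and consults the Guava cache; the method name and doc wording are suggestions only.

```scala
/**
 * Binds `expressions` to `inputSchema`, canonicalizes the result, and returns a
 * generated evaluator for it, reusing a previously compiled class whenever the
 * canonical form has been seen before.
 */
def generate(expressions: InType, inputSchema: Seq[Attribute]): OutType =
  cache.get(canonicalize(bind(expressions, inputSchema)))
```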
[GitHub] spark pull request: SPARK-2250: show stage RDDs in UI
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1188#discussion_r15332185 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala --- @@ -99,19 +99,30 @@ private[ui] class StageTableBase( {s.name} /a +val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions 0) val details = if (s.details.nonEmpty) { span onclick=this.parentNode.querySelector('.stage-details').classList.toggle('collapsed') class=expand-details -+show details - /span - pre class=stage-details collapsed{s.details}/pre ++call stack + /span ++ + div class=stage-details collapsed +{if (cachedRddInfos.nonEmpty) { + strongPersisted RDD:/strong ++ + cachedRddInfos.map { i = +a href={%s/storage/rdd?id=%d.format(UIUtils.prependBaseUri(basePath), i.id)} --- End diff -- I tested this locally and it would be good to pull this onto one line, because otherwise the hyperlink renders with a space at the beginning. If you need to violate the style guidelines you can disable scalastyle using `//scalastyle off` and `//scalastyle on` (see where we do this elsewhere in the UI code). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
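In other words, something like the following, assuming the usual scalastyle directives (`// scalastyle:off` / `// scalastyle:on`) and keeping the link on a single line so no whitespace leaks into the anchor text:

```scala
// scalastyle:off
<a href={"%s/storage/rdd?id=%d".format(UIUtils.prependBaseUri(basePath), i.id)}>{i.name}</a>
// scalastyle:on
```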
[GitHub] spark pull request: SPARK-2250: show stage RDDs in UI
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1188#discussion_r15332227 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala --- @@ -99,19 +99,30 @@ private[ui] class StageTableBase( {s.name} /a +val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions 0) val details = if (s.details.nonEmpty) { span onclick=this.parentNode.querySelector('.stage-details').classList.toggle('collapsed') class=expand-details -+show details - /span - pre class=stage-details collapsed{s.details}/pre ++call stack + /span ++ + div class=stage-details collapsed +{if (cachedRddInfos.nonEmpty) { + strongPersisted RDD:/strong ++ --- End diff -- I prefer this being normal text instead of bold. It already pops out because the guidelines are all grey. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-2250: show stage RDDs in UI
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/1188#issuecomment-49973457 Built and tested it locally. I was actually proposing pulling the "Persisted RDD" part outside of the toggle component. I think it's fine either way, but if we decide to put it in the toggle box, I agree the name should be the more generic "details". If we decide to pull it outside of the toggle box, then we can leave the box as "stack trace". I slightly prefer leaving it out, but maybe if the name is really long that will be annoying? I'm worried people will never see it if it's hidden in the toggle. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2014] Make PySpark store RDDs in MEMORY...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1051#issuecomment-49973607 QA results for PR 1051: - This patch PASSES unit tests. - This patch merges cleanly. - This patch adds no public classes. For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17104/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15332387 --- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import java.io.{DataInputStream, FileInputStream} + +import org.apache.spark.shuffle._ +import org.apache.spark.{TaskContext, ShuffleDependency} +import org.apache.spark.shuffle.hash.HashShuffleReader +import org.apache.spark.storage.{DiskBlockManager, FileSegment, ShuffleBlockId} + +private[spark] class SortShuffleManager extends ShuffleManager { + /** + * Register a shuffle with the manager and obtain a handle for it to pass to tasks. + */ + override def registerShuffle[K, V, C]( + shuffleId: Int, + numMaps: Int, + dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { +new BaseShuffleHandle(shuffleId, numMaps, dependency) + } + + /** + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Called on executors by reduce tasks. + */ + override def getReader[K, C]( + handle: ShuffleHandle, + startPartition: Int, + endPartition: Int, + context: TaskContext): ShuffleReader[K, C] = { +// We currently use the same block store shuffle fetcher as the hash-based shuffle. +new HashShuffleReader( + handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context) + } + + /** Get a writer for a given partition. Called on executors by map tasks. */ + override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) + : ShuffleWriter[K, V] = { +new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context) + } + + /** Remove a shuffle's metadata from the ShuffleManager. */ + override def unregisterShuffle(shuffleId: Int): Unit = {} --- End diff -- Good question, but files on disk actually get cleaned by BlockManager directly if cleaning is turned on, and sort-based shuffle is set up to only use files. ShuffleBlockManager had extra state in memory in the form of metadata, that's why it needs its own cleaner. But I don't think there's an issue here. It will be important to do something for unregisterShuffle, but right now nothing calls that. That's part of the API because I wanted to move the MapOutputTracker behind the ShuffleManager as well. But any patches to move more stuff behind it are welcome, and I agree the ShuffleBlockManager should be specific to hash-based shuffle. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
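To make the distinction concrete, a hedged sketch (illustrative classes, not Spark's code): a manager that keeps per-shuffle metadata in memory has to drop it in `unregisterShuffle`, while a manager whose only state is files on disk can leave that hook empty and rely on the disk cleanup path.

```scala
import scala.collection.concurrent.TrieMap

// Illustrative only: this is the kind of in-memory bookkeeping that forces the hash-based
// ShuffleBlockManager to have its own cleaner.
class MetadataTrackingManager {
  private val shuffleFiles = TrieMap.empty[Int, Seq[String]] // shuffleId -> output file names

  def registerShuffle(shuffleId: Int, files: Seq[String]): Unit = shuffleFiles.put(shuffleId, files)

  def unregisterShuffle(shuffleId: Int): Unit = shuffleFiles.remove(shuffleId) // must drop state
}

// Illustrative only: with no in-memory state, an empty unregisterShuffle is not a leak.
class FilesOnlyManager {
  def unregisterShuffle(shuffleId: Int): Unit = {} // files are removed by the disk cleanup path
}
```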
[GitHub] spark pull request: SPARK-2250: show stage RDDs in UI
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/1188#issuecomment-49973700 One more thing - after playing in my browser I think just saying "RDD: XX" is sufficient. Not sure "Persisted" adds much, and it will save space. Still prefer it non-bold. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1499#issuecomment-49973755 Added one more commit that fixes the type of ShuffledRDD, because in this new shuffle it's not possible to return a custom Product2 the way it's written now, and in the old one it wasn't possible anyway with aggregation. See https://github.com/mateiz/spark/commit/9c299579f13f004f5fd1f4dd0b98b7d76cac2a55 for details. @rxin you should look at this one. With this commit there's now a version of ShuffleSuite that passes using SortBasedShuffle. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
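Roughly, the signature change being described looks like this (simplified stand-ins, not the actual ShuffledRDD): the element type goes from a caller-chosen `Product2` subclass to plain key/combiner pairs, which is all the new write path can promise once aggregation may replace values with combiners.

```scala
import scala.reflect.ClassTag

// Before (roughly): callers could request an arbitrary Product2 subclass as the element type.
abstract class OldStyleShuffledRDD[K, V, P <: Product2[K, V] : ClassTag] {
  def compute(): Iterator[P]
}

// After (roughly): elements are always plain (key, combiner) pairs.
abstract class NewStyleShuffledRDD[K, V, C] {
  def compute(): Iterator[(K, C)]
}
```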
[GitHub] spark pull request: [SPARK-2652] [PySpark] Turning some default co...
GitHub user davies opened a pull request: https://github.com/apache/spark/pull/1568 [SPARK-2652] [PySpark] Turning some default configs for PySpark Add several default configs for PySpark, related to serialization in the JVM. spark.serializer = org.apache.spark.serializer.KryoSerializer spark.kryo.referenceTracking = False spark.serializer.objectStreamReset = 1 This will help reduce the memory usage during RDD.partitionBy(). You can merge this pull request into a Git repository by running: $ git pull https://github.com/davies/spark conf Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1568.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1568 commit c04a83dc311c5e073c8227665ac318647be9a0e4 Author: Davies Liu davies@gmail.com Date: 2014-07-23T20:25:11Z some default configs for PySpark --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
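For reference, the same defaults expressed against the Scala `SparkConf` API (a sketch; `setIfMissing` leaves any value the user already set untouched):

```scala
import org.apache.spark.SparkConf

// Sketch: apply the proposed defaults without overriding user-provided settings.
val conf = new SparkConf()
  .setIfMissing("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .setIfMissing("spark.kryo.referenceTracking", "false")
  .setIfMissing("spark.serializer.objectStreamReset", "1")
```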
[GitHub] spark pull request: [SPARK-2125] Add sort flag and move sort into ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1210#issuecomment-49973939 QA results for PR 1210: - This patch PASSES unit tests. For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17098/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Update HiveMetastoreCatalog.scala
GitHub user baishuo opened a pull request: https://github.com/apache/spark/pull/1569 Update HiveMetastoreCatalog.scala I think it's better to define hiveQlTable as a val You can merge this pull request into a Git repository by running: $ git pull https://github.com/baishuo/spark patch-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1569.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1569 commit a7b32a28a59886dfac45331d781a548fc18b098f Author: baishuo(白硕) vc_j...@hotmail.com Date: 2014-07-24T07:01:33Z Update HiveMetastoreCatalog.scala I think it's better to define hiveQlTable as a val --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
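The change amounts to caching the metastore lookup instead of recomputing it on each access; a simplified sketch of the difference (stand-in types, not the actual HiveMetastoreCatalog code):

```scala
// Stand-in for a relation whose table metadata comes from an expensive metastore call.
class MetastoreRelationSketch(lookupTable: () => String) {
  // As a def, every access repeats the lookup.
  def hiveQlTableAsDef: String = lookupTable()

  // As a (lazy) val, the lookup runs once and the result is reused.
  lazy val hiveQlTable: String = lookupTable()
}

object MetastoreRelationDemo extends App {
  var calls = 0
  val rel = new MetastoreRelationSketch(() => { calls += 1; "my_table" })
  rel.hiveQlTable
  rel.hiveQlTable
  println(calls) // 1 -- the cached val avoids repeated lookups
}
```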
[GitHub] spark pull request: [SPARK-2652] [PySpark] Turning some default co...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1568#issuecomment-49974134 QA tests have started for PR 1568. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17107/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2260] Fix standalone-cluster mode, whic...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1538#discussion_r15332601 --- Diff: core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala --- @@ -62,20 +62,23 @@ object CommandUtils extends Logging { val joined = command.libraryPathEntries.mkString(File.pathSeparator) Seq(s"-Djava.library.path=$joined") } else { - Seq() +Seq() } val permGenOpt = Seq("-XX:MaxPermSize=128m") +// Convert Spark properties to java system properties +val sparkOpts = command.sparkProps.map { case (k, v) => s"-D$k=$v" } --- End diff -- Yeah, I'd prefer not to change the executor code path here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
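In isolation, the line being discussed just turns Spark properties into `-D` JVM options; a self-contained sketch:

```scala
object SparkPropsToJavaOpts extends App {
  val sparkProps = Seq(
    "spark.driver.memory" -> "1g",
    "spark.app.name"      -> "demo")

  // Same shape as the new line in CommandUtils: each (key, value) pair becomes a -D flag.
  val sparkOpts = sparkProps.map { case (k, v) => s"-D$k=$v" }

  println(sparkOpts.mkString(" ")) // -Dspark.driver.memory=1g -Dspark.app.name=demo
}
```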
[GitHub] spark pull request: Update HiveMetastoreCatalog.scala
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1569#issuecomment-49974351 Can one of the admins verify this patch? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2125] Add sort flag and move sort into ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1210#issuecomment-49974441 QA tests have started for PR 1210. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17108/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2661][bagel]unpersist old processed rdd
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/1519 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2661][bagel]unpersist old processed rdd
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1519#issuecomment-49974606 Thanks Adrian, merged this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2661][bagel]unpersist old processed rdd
Github user adrian-wang commented on the pull request: https://github.com/apache/spark/pull/1519#issuecomment-49975077 Thanks Matei! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2014] Make PySpark store RDDs in MEMORY...
Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/spark/pull/1051#discussion_r15333083 --- Diff: python/pyspark/conf.py --- @@ -99,6 +99,12 @@ def set(self, key, value): self._jconf.set(key, unicode(value)) return self +def setIfMissing(self, key, value): +"""Set a configuration property, if not already set.""" +if self.get(key) == None: --- End diff -- We may have to make SparkConf iterable for this to work. It does not work as is. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1460#discussion_r15333099 --- Diff: python/pyspark/shuffle.py --- @@ -0,0 +1,436 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the License); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an AS IS BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import platform +import shutil +import warnings + +from pyspark.serializers import BatchedSerializer, PickleSerializer + +try: +import psutil + +def get_used_memory(): + Return the used memory in MB +process = psutil.Process(os.getpid()) +if hasattr(process, memory_info): +info = process.memory_info() +else: +info = process.get_memory_info() +return info.rss 20 +except ImportError: + +def get_used_memory(): + Return the used memory in MB +if platform.system() == 'Linux': +for line in open('/proc/self/status'): +if line.startswith('VmRSS:'): +return int(line.split()[1]) 10 +else: +warnings.warn(Please install psutil to have better +support with spilling) +if platform.system() == Darwin: +import resource +rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss +return rss 20 +# TODO: support windows +return 0 + + +class Aggregator(object): + + +Aggregator has tree functions to merge values into combiner. + +createCombiner: (value) - combiner +mergeValue: (combine, value) - combiner +mergeCombiners: (combiner, combiner) - combiner + + +def __init__(self, createCombiner, mergeValue, mergeCombiners): +self.createCombiner = createCombiner +self.mergeValue = mergeValue +self.mergeCombiners = mergeCombiners + + +class SimpleAggregator(Aggregator): + + +SimpleAggregator is useful for the cases that combiners have +same type with values + + +def __init__(self, combiner): +Aggregator.__init__(self, lambda x: x, combiner, combiner) + + +class Merger(object): + + +Merge shuffled data together by aggregator + + +def __init__(self, aggregator): +self.agg = aggregator + +def mergeValues(self, iterator): + Combine the items by creator and combiner +raise NotImplementedError + +def mergeCombiners(self, iterator): + Merge the combined items by mergeCombiner +raise NotImplementedError + +def iteritems(self): + Return the merged items ad iterator +raise NotImplementedError + + +class InMemoryMerger(Merger): + + +In memory merger based on in-memory dict. 
+ + +def __init__(self, aggregator): +Merger.__init__(self, aggregator) +self.data = {} + +def mergeValues(self, iterator): + Combine the items by creator and combiner +# speed up attributes lookup +d, creator = self.data, self.agg.createCombiner +comb = self.agg.mergeValue +for k, v in iterator: +d[k] = comb(d[k], v) if k in d else creator(v) + +def mergeCombiners(self, iterator): + Merge the combined items by mergeCombiner +# speed up attributes lookup +d, comb = self.data, self.agg.mergeCombiners +for k, v in iterator: +d[k] = comb(d[k], v) if k in d else v + +def iteritems(self): + Return the merged items ad iterator +return self.data.iteritems() + + +class ExternalMerger(Merger): + + +External merger will dump the aggregated data into disks when +memory usage goes above the limit, then merge them together. + +This
[GitHub] spark pull request: SPARK-2099. Report progress while task is runn...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/1056#issuecomment-49975385 I was actually a bit confused why `updateShuffleReadMetrics` is synchronized. Can that be called from multiple threads as-is? I wasn't aware of cases where we had multi-threaded access to `TaskMetrics` prior to this patch. To your point, there are really only two fields here that are not just commutative counters - `hostname` and `shuffleFinishTime`. Is there a race where `hostname` gets sent over as null in a heartbeat and it triggers an NPE downstream? Maybe we should set `shuffleFinishTime` to -1 by default to indicate it's not populated. My motivation for adding extra typing was to make it explicit for people extending TaskMetrics in the future. We already have some proposals for augmenting the `TaskMetrics` class and some of them add more complicated fields. Perhaps an alternative is just to document clearly in the class that we might take a snapshot of the TaskMetrics at any time and send it to consumers of the SparkListener interface. Over time, I'd actually hope these get re-implemented as accumulators... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
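A minimal sketch of the sentinel idea being floated (illustrative class, not the real TaskMetrics): counters can be snapshotted at any time, but a timestamp-style field needs a "not populated yet" value so a mid-task heartbeat is unambiguous.

```scala
// Illustrative only: how a -1 default distinguishes "not finished yet" from a real timestamp
// when metrics are snapshotted mid-task by the heartbeat.
class ShuffleMetricsSketch {
  @volatile var bytesWritten: Long = 0L       // commutative counter, safe to read at any time
  @volatile var shuffleFinishTime: Long = -1L // -1 means the shuffle has not finished

  def finishTime: Option[Long] =
    if (shuffleFinishTime >= 0) Some(shuffleFinishTime) else None
}
```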
[GitHub] spark pull request: (WIP) SPARK-2045 Sort-based shuffle
Github user colorant commented on a diff in the pull request: https://github.com/apache/spark/pull/1499#discussion_r15333148 --- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort + +import java.io.{DataInputStream, FileInputStream} + +import org.apache.spark.shuffle._ +import org.apache.spark.{TaskContext, ShuffleDependency} +import org.apache.spark.shuffle.hash.HashShuffleReader +import org.apache.spark.storage.{DiskBlockManager, FileSegment, ShuffleBlockId} + +private[spark] class SortShuffleManager extends ShuffleManager { + /** + * Register a shuffle with the manager and obtain a handle for it to pass to tasks. + */ + override def registerShuffle[K, V, C]( + shuffleId: Int, + numMaps: Int, + dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { +new BaseShuffleHandle(shuffleId, numMaps, dependency) + } + + /** + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive). + * Called on executors by reduce tasks. + */ + override def getReader[K, C]( + handle: ShuffleHandle, + startPartition: Int, + endPartition: Int, + context: TaskContext): ShuffleReader[K, C] = { +// We currently use the same block store shuffle fetcher as the hash-based shuffle. +new HashShuffleReader( + handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context) + } + + /** Get a writer for a given partition. Called on executors by map tasks. */ + override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext) + : ShuffleWriter[K, V] = { +new SortShuffleWriter(handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context) + } + + /** Remove a shuffle's metadata from the ShuffleManager. */ + override def unregisterShuffle(shuffleId: Int): Unit = {} --- End diff -- Maybe I miss some code? But from what I understanding, when the cleaning is turned on, it seems to me, if by timestamp approaching, blockManager won't remove shuffle data,ShuffleBlockManager will do the work by itself. And if by auto clean approaching when doCleanShuffle is called, it go through the current ShuffleBlockManager interface. In neither case, these sortShuffleWritter generated file will be cleaned? Or do you mean, as long as there are no metadata in memory, shuffle file on disk is ok to not be removed until application exit? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-2099. Report progress while task is runn...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/1056#discussion_r15333186 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -348,4 +352,46 @@ private[spark] class Executor( } } } + + def stop() { +isStopped = true +threadPool.shutdown() + } + + def startDriverHeartbeater() { +val interval = conf.getInt(spark.executor.heartbeatInterval, 2000) +val timeout = AkkaUtils.lookupTimeout(conf) +val retryAttempts = AkkaUtils.numRetries(conf) +val retryIntervalMs = AkkaUtils.retryWaitMs(conf) +val heartbeatReceiverRef = AkkaUtils.makeDriverRef(HeartbeatReceiver, conf, env.actorSystem) + +val t = new Thread() { + override def run() { +// Sleep a random interval so the heartbeats don't end up in sync +Thread.sleep(interval + (math.random * interval).asInstanceOf[Int]) + +while (!isStopped) { + val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]() + for (taskRunner - runningTasks.values()) { +Option(taskRunner.task).flatMap(_.metrics).foreach { metrics = + tasksMetrics += ((taskRunner.taskId, metrics)) +} + } + + val message = Heartbeat(executorId, tasksMetrics.toArray, --- End diff -- I'm getting 101 :( --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
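For what it's worth, a sketch of how the offending call could be wrapped to stay under the 100-character limit (the types and arguments below are stand-ins for the ones in Executor.scala):

```scala
// Stand-ins for the real Heartbeat message and its arguments.
case class Heartbeat(executorId: String, taskMetrics: Array[(Long, String)], blockManagerId: String)

object WrapDemo extends App {
  val executorId = "exec-1"
  val tasksMetrics = Array(1L -> "metrics")
  val blockManagerId = "bm-1"

  // Breaking right after the opening parenthesis keeps each line well under 100 characters.
  val message = Heartbeat(
    executorId,
    tasksMetrics,
    blockManagerId)

  println(message.executorId)
}
```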
[GitHub] spark pull request: [SPARK-2492][Streaming] kafkaReceiver minor ch...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1420#issuecomment-49975753 QA tests have started for PR 1420. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17109/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r1512 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import com.google.common.cache.{CacheLoader, CacheBuilder} + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code to perform expression evaluation. Includes a set of + * helpers for referring to Catalyst types and building trees that perform evaluation of individual + * expressions. + */ +abstract class CodeGenerator[InType : AnyRef, OutType : AnyRef] extends Logging { + import scala.reflect.runtime.{universe = ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + protected val rowType = typeOf[Row] + protected val mutableRowType = typeOf[MutableRow] + protected val genericRowType = typeOf[GenericRow] + protected val genericMutableRowType = typeOf[GenericMutableRow] + + protected val projectionType = typeOf[Projection] + protected val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeparator = $ + + /** + * Generates a class for a given input expression. Called when there is not cached code + * already available. + */ + protected def create(in: InType): OutType + + /** + * Canonicalizes an input expression. Used to avoid double caching expressions that differ only + * cosmetically. + */ + protected def canonicalize(in: InType): InType + + /** Binds an input expression to a given input schema */ + protected def bind(in: InType, inputSchema: Seq[Attribute]): InType + + protected val cache = CacheBuilder.newBuilder() +.maximumSize(1000) +.build( + new CacheLoader[InType, OutType]() { +override def load(in: InType): OutType = globalLock.synchronized { + create(in) +} + }) + + def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType= +apply(bind(expressions, inputSchema)) + + def apply(expressions: InType): OutType = cache.get(canonicalize(expressions)) + + /** + * Returns a term name that is unique within this instance of a `CodeGenerator`. + * + * (Since we aren't in a macro context we do not seem to have access to the built in `freshName` + * function.) 
+ */ + protected def freshName(prefix: String): TermName = { +newTermName(s$prefix$javaSeparator${curId.getAndIncrement}) + } + + /** + * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input. + * + * @param code The sequence of statements required to evaluate the expression. + * @param nullTerm A term that holds a boolean value representing whether the expression evaluated + * to null. + * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not + * valid if `nullTerm` is set to `false`. + * @param objectTerm A possibly boxed version of the result of evaluating this expression. + */ + protected case class EvaluatedExpression( + code: Seq[Tree], + nullTerm: TermName, + primitiveTerm: TermName, + objectTerm: TermName) + + /** + * Given an expression tree returns an [[EvaluatedExpression]], which contains Scala trees that + * can be
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r1552 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import com.google.common.cache.{CacheLoader, CacheBuilder} + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code to perform expression evaluation. Includes a set of + * helpers for referring to Catalyst types and building trees that perform evaluation of individual + * expressions. + */ +abstract class CodeGenerator[InType : AnyRef, OutType : AnyRef] extends Logging { + import scala.reflect.runtime.{universe = ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + protected val rowType = typeOf[Row] + protected val mutableRowType = typeOf[MutableRow] + protected val genericRowType = typeOf[GenericRow] + protected val genericMutableRowType = typeOf[GenericMutableRow] + + protected val projectionType = typeOf[Projection] + protected val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeparator = $ + + /** + * Generates a class for a given input expression. Called when there is not cached code + * already available. + */ + protected def create(in: InType): OutType + + /** + * Canonicalizes an input expression. Used to avoid double caching expressions that differ only + * cosmetically. + */ + protected def canonicalize(in: InType): InType + + /** Binds an input expression to a given input schema */ + protected def bind(in: InType, inputSchema: Seq[Attribute]): InType + + protected val cache = CacheBuilder.newBuilder() +.maximumSize(1000) +.build( + new CacheLoader[InType, OutType]() { +override def load(in: InType): OutType = globalLock.synchronized { + create(in) +} + }) + + def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType= +apply(bind(expressions, inputSchema)) + + def apply(expressions: InType): OutType = cache.get(canonicalize(expressions)) + + /** + * Returns a term name that is unique within this instance of a `CodeGenerator`. + * + * (Since we aren't in a macro context we do not seem to have access to the built in `freshName` + * function.) 
+ */ + protected def freshName(prefix: String): TermName = { +newTermName(s$prefix$javaSeparator${curId.getAndIncrement}) + } + + /** + * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input. + * + * @param code The sequence of statements required to evaluate the expression. + * @param nullTerm A term that holds a boolean value representing whether the expression evaluated + * to null. + * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not + * valid if `nullTerm` is set to `false`. + * @param objectTerm A possibly boxed version of the result of evaluating this expression. + */ + protected case class EvaluatedExpression( + code: Seq[Tree], + nullTerm: TermName, + primitiveTerm: TermName, + objectTerm: TermName) + + /** + * Given an expression tree returns an [[EvaluatedExpression]], which contains Scala trees that + * can be
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r1579 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import com.google.common.cache.{CacheLoader, CacheBuilder} + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code to perform expression evaluation. Includes a set of + * helpers for referring to Catalyst types and building trees that perform evaluation of individual + * expressions. + */ +abstract class CodeGenerator[InType : AnyRef, OutType : AnyRef] extends Logging { + import scala.reflect.runtime.{universe = ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + protected val rowType = typeOf[Row] + protected val mutableRowType = typeOf[MutableRow] + protected val genericRowType = typeOf[GenericRow] + protected val genericMutableRowType = typeOf[GenericMutableRow] + + protected val projectionType = typeOf[Projection] + protected val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeparator = $ + + /** + * Generates a class for a given input expression. Called when there is not cached code + * already available. + */ + protected def create(in: InType): OutType + + /** + * Canonicalizes an input expression. Used to avoid double caching expressions that differ only + * cosmetically. + */ + protected def canonicalize(in: InType): InType + + /** Binds an input expression to a given input schema */ + protected def bind(in: InType, inputSchema: Seq[Attribute]): InType + + protected val cache = CacheBuilder.newBuilder() +.maximumSize(1000) +.build( + new CacheLoader[InType, OutType]() { +override def load(in: InType): OutType = globalLock.synchronized { + create(in) +} + }) + + def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType= +apply(bind(expressions, inputSchema)) + + def apply(expressions: InType): OutType = cache.get(canonicalize(expressions)) + + /** + * Returns a term name that is unique within this instance of a `CodeGenerator`. + * + * (Since we aren't in a macro context we do not seem to have access to the built in `freshName` + * function.) 
+ */ + protected def freshName(prefix: String): TermName = { +newTermName(s$prefix$javaSeparator${curId.getAndIncrement}) + } + + /** + * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input. + * + * @param code The sequence of statements required to evaluate the expression. + * @param nullTerm A term that holds a boolean value representing whether the expression evaluated + * to null. + * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not + * valid if `nullTerm` is set to `false`. + * @param objectTerm A possibly boxed version of the result of evaluating this expression. + */ + protected case class EvaluatedExpression( + code: Seq[Tree], + nullTerm: TermName, + primitiveTerm: TermName, + objectTerm: TermName) + + /** + * Given an expression tree returns an [[EvaluatedExpression]], which contains Scala trees that + * can be
[GitHub] spark pull request: [SPARK-2014] Make PySpark store RDDs in MEMORY...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/1051#discussion_r15333403 --- Diff: python/pyspark/conf.py --- @@ -99,6 +99,12 @@ def set(self, key, value): self._jconf.set(key, unicode(value)) return self +def setIfMissing(self, key, value): +"""Set a configuration property, if not already set.""" +if self.get(key) == None: --- End diff -- ah, yes. But `self.get(key) is None` sounds more Pythonic :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333476 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import com.google.common.cache.{CacheLoader, CacheBuilder} + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code to perform expression evaluation. Includes a set of + * helpers for referring to Catalyst types and building trees that perform evaluation of individual + * expressions. + */ +abstract class CodeGenerator[InType : AnyRef, OutType : AnyRef] extends Logging { + import scala.reflect.runtime.{universe = ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + protected val rowType = typeOf[Row] + protected val mutableRowType = typeOf[MutableRow] + protected val genericRowType = typeOf[GenericRow] + protected val genericMutableRowType = typeOf[GenericMutableRow] + + protected val projectionType = typeOf[Projection] + protected val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeparator = $ + + /** + * Generates a class for a given input expression. Called when there is not cached code + * already available. + */ + protected def create(in: InType): OutType + + /** + * Canonicalizes an input expression. Used to avoid double caching expressions that differ only + * cosmetically. + */ + protected def canonicalize(in: InType): InType + + /** Binds an input expression to a given input schema */ + protected def bind(in: InType, inputSchema: Seq[Attribute]): InType + + protected val cache = CacheBuilder.newBuilder() +.maximumSize(1000) +.build( + new CacheLoader[InType, OutType]() { +override def load(in: InType): OutType = globalLock.synchronized { + create(in) +} + }) + + def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType= +apply(bind(expressions, inputSchema)) + + def apply(expressions: InType): OutType = cache.get(canonicalize(expressions)) + + /** + * Returns a term name that is unique within this instance of a `CodeGenerator`. + * + * (Since we aren't in a macro context we do not seem to have access to the built in `freshName` + * function.) 
+ */ + protected def freshName(prefix: String): TermName = { +newTermName(s$prefix$javaSeparator${curId.getAndIncrement}) + } + + /** + * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input. + * + * @param code The sequence of statements required to evaluate the expression. + * @param nullTerm A term that holds a boolean value representing whether the expression evaluated + * to null. + * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not + * valid if `nullTerm` is set to `false`. + * @param objectTerm A possibly boxed version of the result of evaluating this expression. + */ + protected case class EvaluatedExpression( + code: Seq[Tree], + nullTerm: TermName, + primitiveTerm: TermName, + objectTerm: TermName) + + /** + * Given an expression tree returns an [[EvaluatedExpression]], which contains Scala trees that + * can be
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333508 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import com.google.common.cache.{CacheLoader, CacheBuilder} + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code to perform expression evaluation. Includes a set of + * helpers for referring to Catalyst types and building trees that perform evaluation of individual + * expressions. + */ +abstract class CodeGenerator[InType : AnyRef, OutType : AnyRef] extends Logging { + import scala.reflect.runtime.{universe = ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + protected val rowType = typeOf[Row] + protected val mutableRowType = typeOf[MutableRow] + protected val genericRowType = typeOf[GenericRow] + protected val genericMutableRowType = typeOf[GenericMutableRow] + + protected val projectionType = typeOf[Projection] + protected val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeparator = $ + + /** + * Generates a class for a given input expression. Called when there is not cached code + * already available. + */ + protected def create(in: InType): OutType + + /** + * Canonicalizes an input expression. Used to avoid double caching expressions that differ only + * cosmetically. + */ + protected def canonicalize(in: InType): InType + + /** Binds an input expression to a given input schema */ + protected def bind(in: InType, inputSchema: Seq[Attribute]): InType + + protected val cache = CacheBuilder.newBuilder() +.maximumSize(1000) +.build( + new CacheLoader[InType, OutType]() { +override def load(in: InType): OutType = globalLock.synchronized { + create(in) +} + }) + + def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType= +apply(bind(expressions, inputSchema)) + + def apply(expressions: InType): OutType = cache.get(canonicalize(expressions)) + + /** + * Returns a term name that is unique within this instance of a `CodeGenerator`. + * + * (Since we aren't in a macro context we do not seem to have access to the built in `freshName` + * function.) 
+ */ + protected def freshName(prefix: String): TermName = { +newTermName(s$prefix$javaSeparator${curId.getAndIncrement}) + } + + /** + * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input. + * + * @param code The sequence of statements required to evaluate the expression. + * @param nullTerm A term that holds a boolean value representing whether the expression evaluated + * to null. + * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not + * valid if `nullTerm` is set to `false`. + * @param objectTerm A possibly boxed version of the result of evaluating this expression. + */ + protected case class EvaluatedExpression( + code: Seq[Tree], + nullTerm: TermName, + primitiveTerm: TermName, + objectTerm: TermName) + + /** + * Given an expression tree returns an [[EvaluatedExpression]], which contains Scala trees that + * can be
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333558 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import com.google.common.cache.{CacheLoader, CacheBuilder} + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code to perform expression evaluation. Includes a set of + * helpers for referring to Catalyst types and building trees that perform evaluation of individual + * expressions. + */ +abstract class CodeGenerator[InType : AnyRef, OutType : AnyRef] extends Logging { + import scala.reflect.runtime.{universe = ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + protected val rowType = typeOf[Row] + protected val mutableRowType = typeOf[MutableRow] + protected val genericRowType = typeOf[GenericRow] + protected val genericMutableRowType = typeOf[GenericMutableRow] + + protected val projectionType = typeOf[Projection] + protected val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeparator = $ + + /** + * Generates a class for a given input expression. Called when there is not cached code + * already available. + */ + protected def create(in: InType): OutType + + /** + * Canonicalizes an input expression. Used to avoid double caching expressions that differ only + * cosmetically. + */ + protected def canonicalize(in: InType): InType + + /** Binds an input expression to a given input schema */ + protected def bind(in: InType, inputSchema: Seq[Attribute]): InType + + protected val cache = CacheBuilder.newBuilder() +.maximumSize(1000) +.build( + new CacheLoader[InType, OutType]() { +override def load(in: InType): OutType = globalLock.synchronized { + create(in) +} + }) + + def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType= +apply(bind(expressions, inputSchema)) + + def apply(expressions: InType): OutType = cache.get(canonicalize(expressions)) + + /** + * Returns a term name that is unique within this instance of a `CodeGenerator`. + * + * (Since we aren't in a macro context we do not seem to have access to the built in `freshName` + * function.) 
+ */ + protected def freshName(prefix: String): TermName = { +newTermName(s$prefix$javaSeparator${curId.getAndIncrement}) + } + + /** + * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input. + * + * @param code The sequence of statements required to evaluate the expression. + * @param nullTerm A term that holds a boolean value representing whether the expression evaluated + * to null. + * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not + * valid if `nullTerm` is set to `false`. + * @param objectTerm A possibly boxed version of the result of evaluating this expression. + */ + protected case class EvaluatedExpression( + code: Seq[Tree], + nullTerm: TermName, + primitiveTerm: TermName, + objectTerm: TermName) + + /** + * Given an expression tree returns an [[EvaluatedExpression]], which contains Scala trees that + * can be
[GitHub] spark pull request: Build should not run hive tests by default.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1565#issuecomment-49976502 QA results for PR 1565: - This patch PASSES unit tests. - This patch merges cleanly. - This patch adds no public classes. For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17102/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333592 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala --- @@ -47,4 +47,30 @@ package org.apache.spark.sql.catalyst * ==Evaluation== * The result of expressions can be evaluated using the `Expression.apply(Row)` method. */ -package object expressions +package object expressions { + + /** + * Converts a [[Row]] to another Row given a sequence of expression that define each column of the + * new row. If the schema of the input row is specified, then the given expression will be bound + * to that schema. + */ + abstract class Projection extends (Row = Row) + + /** + * Converts a [[Row]] to another Row given a sequence of expression that define each column of the + * new row. If the schema of the input row is specified, then the given expression will be bound + * to that schema. + * + * In contrast to a normal projection, a MutableProjection reuses the same underlying row object + * each time an input row is added. This significantly reduces the cost of calculating the + * projection, but means that it is not safe to hold on to a reference to a [[Row]] after `next()` + * has been called on the [[Iterator]] that produced it. Instead, the user must call `Row.copy()` + * and hold on to the returned [[Row]] before calling `next()`. + */ + abstract class MutableProjection extends Projection { +def currentValue: Row + +/** Updates the target of this projection to a new MutableRow */ --- End diff -- maybe ```Uses the given row to store the output of the projection.``` ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
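The behaviour that doc comment describes, where one underlying row is reused and a result must therefore be copied before the next input is processed, can be seen in a simplified stand-in (not the Catalyst classes):

```scala
// Simplified stand-in for a mutable projection: every result is written into the same buffer.
class ReusedRowProjection {
  private val currentRow = new Array[Any](1)

  def apply(input: Int): Array[Any] = {
    currentRow(0) = input * 2 // overwrite the shared row in place
    currentRow
  }
}

object CopyBeforeNextDemo extends App {
  val project = new ReusedRowProjection

  // Holding the returned reference is unsafe: the next call mutates it.
  val unsafe = project(1)
  project(2)
  println(unsafe(0)) // 4, not 2 -- the shared row was reused

  // Copying first (the analogue of Row.copy()) preserves the value.
  val safe = project(3).clone()
  project(4)
  println(safe(0)) // 6
}
```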
[GitHub] spark pull request: [SPARK-2014] Make PySpark store RDDs in MEMORY...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1051#issuecomment-49976557 QA tests have started for PR 1051. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17111/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2538] [PySpark] Hash based disk spillin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1460#issuecomment-49976553 QA tests have started for PR 1460. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17110/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333688 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratedEvaluationSuite.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen._ + +/** + * Overrides our expression evaluation tests to use code generation for evaluation. + */ +class GeneratedEvaluationSuite extends ExpressionEvaluationSuite { + override def checkEvaluation( + expression: Expression, + expected: Any, + inputRow: Row = EmptyRow): Unit = { +val plan = try { + GenerateMutableProjection(Alias(expression, sOptimized($expression))() :: Nil)() +} catch { + case e: Throwable = +val evaluated = GenerateProjection.expressionEvaluator(expression) +fail( + s +|Code generation of $expression failed: +|${evaluated.code.mkString(\n)} +|$e + .stripMargin) +} + +val actual = plan(inputRow).apply(0) +if(actual != expected) { + val input = if(inputRow == EmptyRow) else s, input: $inputRow + fail(sIncorrect Evaluation: $expression, actual: $actual, expected: $expected$input) +} + } + + + test(multithreaded eval) { +import scala.concurrent._ +import ExecutionContext.Implicits.global +import scala.concurrent.duration._ + +val futures = (1 to 20).map { _ = + future { +GeneratePredicate(EqualTo(Literal(1), Literal(1))) +GenerateProjection(EqualTo(Literal(1), Literal(1)) :: Nil) +GenerateMutableProjection(EqualTo(Literal(1), Literal(1)) :: Nil) +GenerateOrdering(Add(Literal(1), Literal(1)).asc :: Nil) + } +} + +futures.foreach(Await.result(_, 10.seconds)) + } +} + +/** + * Overrides our expression evaluation tests to use generated code on mutable rows. + */ +class GeneratedMutableEvaluationSuite extends ExpressionEvaluationSuite { --- End diff -- Best to take this out into a separate file, since that's the common standard across all Spark modules (each test suite has its own file) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333694 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratedEvaluationSuite.scala --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen._ + +/** + * Overrides our expression evaluation tests to use code generation for evaluation. + */ +class GeneratedEvaluationSuite extends ExpressionEvaluationSuite { + override def checkEvaluation( + expression: Expression, + expected: Any, + inputRow: Row = EmptyRow): Unit = { +val plan = try { + GenerateMutableProjection(Alias(expression, sOptimized($expression))() :: Nil)() +} catch { + case e: Throwable = +val evaluated = GenerateProjection.expressionEvaluator(expression) +fail( + s +|Code generation of $expression failed: +|${evaluated.code.mkString(\n)} +|$e + .stripMargin) +} + +val actual = plan(inputRow).apply(0) +if(actual != expected) { + val input = if(inputRow == EmptyRow) else s, input: $inputRow + fail(sIncorrect Evaluation: $expression, actual: $actual, expected: $expected$input) +} + } + + + test(multithreaded eval) { +import scala.concurrent._ +import ExecutionContext.Implicits.global +import scala.concurrent.duration._ + +val futures = (1 to 20).map { _ = + future { +GeneratePredicate(EqualTo(Literal(1), Literal(1))) +GenerateProjection(EqualTo(Literal(1), Literal(1)) :: Nil) +GenerateMutableProjection(EqualTo(Literal(1), Literal(1)) :: Nil) +GenerateOrdering(Add(Literal(1), Literal(1)).asc :: Nil) + } +} + +futures.foreach(Await.result(_, 10.seconds)) + } +} + +/** + * Overrides our expression evaluation tests to use generated code on mutable rows. 
+ */ +class GeneratedMutableEvaluationSuite extends ExpressionEvaluationSuite { + override def checkEvaluation( + expression: Expression, + expected: Any, + inputRow: Row = EmptyRow): Unit = { +lazy val evaluated = GenerateProjection.expressionEvaluator(expression) + +val plan = try { + GenerateProjection(Alias(expression, sOptimized($expression))() :: Nil) +} catch { + case e: Throwable = +fail( + s +|Code generation of $expression failed: +|${evaluated.code.mkString(\n)} +|$e + .stripMargin) +} + +val actual = plan(inputRow) +val expectedRow = new GenericRow(Array[Any](expected)) +if (actual.hashCode() != expectedRow.hashCode()) { + fail( +s + |Mismatched hashCodes for values: $actual, $expectedRow + |Hash Codes: ${actual.hashCode()} != ${expectedRow.hashCode()} + |${evaluated.code.mkString(\n)} +.stripMargin) +} +if (actual != expectedRow) { + val input = if(inputRow == EmptyRow) else s, input: $inputRow + fail(sIncorrect Evaluation: $expression, actual: $actual, expected: $expected$input) +} + } +} --- End diff -- nitpick - add a new line --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333762 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala --- @@ -47,23 +47,29 @@ case class Generate( } } - override def output = + // This must be a val since the generator output expr ids are not preserved by serialization. + override val output = if (join) child.output ++ generatorOutput else generatorOutput + val boundGenerator = BindReferences.bindReference(generator, child.output) + + /** Codegenned rows are not serializable... */ --- End diff -- TODO? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333803 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.types._ + +case class AggregateEvaluation( +schema: Seq[Attribute], +initialValues: Seq[Expression], +update: Seq[Expression], +result: Expression) + +/** + * :: DeveloperApi :: + * Alternate version of aggregation that leverages projection and thus code generation. + * Aggregations are converted into a set of projections from a aggregation buffer tuple back onto + * itself. Currently only used for simple aggregations like SUM, COUNT, or AVERAGE are supported. + * + * @param partial if true then aggregation is done partially on local data without shuffling to + *ensure all values where `groupingExpressions` are equal are present. + * @param groupingExpressions expressions that are evaluated to determine grouping. + * @param aggregateExpressions expressions that are computed for each group. + * @param child the input data source. + */ +@DeveloperApi +case class GeneratedAggregate( +partial: Boolean, +groupingExpressions: Seq[Expression], +aggregateExpressions: Seq[NamedExpression], +child: SparkPlan) + extends UnaryNode { + + override def requiredChildDistribution = +if (partial) { + UnspecifiedDistribution :: Nil +} else { + if (groupingExpressions == Nil) { +AllTuples :: Nil + } else { +ClusteredDistribution(groupingExpressions) :: Nil + } +} + + override def output = aggregateExpressions.map(_.toAttribute) + + override def execute() = { +val aggregatesToCompute = aggregateExpressions.flatMap { a = + a.collect { case agg: AggregateExpression = agg} +} + +val computeFunctions = aggregatesToCompute.map { + case c @ Count(expr) = +val currentCount = AttributeReference(currentCount, LongType, nullable = false)() +val initialValue = Literal(0L) +val updateFunction = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount) +val result = currentCount + +AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + + case Sum(expr) = +val currentSum = AttributeReference(currentSum, expr.dataType, nullable = false)() +val initialValue = Cast(Literal(0L), expr.dataType) + +// Coalasce avoids double calculation... 
+// but really, common sub expression elimination would be better +val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum :: Nil) +val result = currentSum + +AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + + case a @ Average(expr) = +val currentCount = AttributeReference(currentCount, LongType, nullable = false)() +val currentSum = AttributeReference(currentSum, expr.dataType, nullable = false)() +val initialCount = Literal(0L) +val initialSum = Cast(Literal(0L), expr.dataType) +val updateCount = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount) +val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: Nil) + +val result = Divide(Cast(currentSum, DoubleType), Cast(currentCount, DoubleType)) + +AggregateEvaluation( + currentCount :: currentSum :: Nil, +
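The GeneratedAggregate diff above describes each aggregate as an (initial value, update expression, result expression) triple applied to a reusable buffer. As a reading aid only, here is the same evaluation model written with plain Scala functions instead of Catalyst expressions; the names and types are made up.

```scala
// Hypothetical model of AggregateEvaluation: initial buffer, per-row update, final result.
object AggSketch extends App {
  final case class Agg[B, T](initial: B, update: (B, Option[Int]) => B, result: B => T)

  // COUNT: start at 0, add 1 for every non-null input, result is the counter itself.
  val count = Agg[Long, Long](0L, (c, v) => if (v.isDefined) c + 1 else c, identity)

  // SUM: start at 0; on null keep the previous sum (the Coalesce in the diff plays this role).
  val sum = Agg[Long, Long](0L, (s, v) => v.fold(s)(_ + s), identity)

  // One pass over the "rows" drives both buffers, mirroring the generated update projection.
  val rows: Seq[Option[Int]] = Seq(Some(1), None, Some(3))
  println((rows.foldLeft(count.initial)(count.update),   // 2
           rows.foldLeft(sum.initial)(sum.update)))      // 4
}
```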
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333814 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.types._ + +case class AggregateEvaluation( +schema: Seq[Attribute], +initialValues: Seq[Expression], +update: Seq[Expression], +result: Expression) + +/** + * :: DeveloperApi :: + * Alternate version of aggregation that leverages projection and thus code generation. + * Aggregations are converted into a set of projections from a aggregation buffer tuple back onto + * itself. Currently only used for simple aggregations like SUM, COUNT, or AVERAGE are supported. + * + * @param partial if true then aggregation is done partially on local data without shuffling to + *ensure all values where `groupingExpressions` are equal are present. + * @param groupingExpressions expressions that are evaluated to determine grouping. + * @param aggregateExpressions expressions that are computed for each group. + * @param child the input data source. + */ +@DeveloperApi +case class GeneratedAggregate( +partial: Boolean, +groupingExpressions: Seq[Expression], +aggregateExpressions: Seq[NamedExpression], +child: SparkPlan) + extends UnaryNode { + + override def requiredChildDistribution = +if (partial) { + UnspecifiedDistribution :: Nil +} else { + if (groupingExpressions == Nil) { +AllTuples :: Nil + } else { +ClusteredDistribution(groupingExpressions) :: Nil + } +} + + override def output = aggregateExpressions.map(_.toAttribute) + + override def execute() = { +val aggregatesToCompute = aggregateExpressions.flatMap { a = + a.collect { case agg: AggregateExpression = agg} +} + +val computeFunctions = aggregatesToCompute.map { + case c @ Count(expr) = +val currentCount = AttributeReference(currentCount, LongType, nullable = false)() +val initialValue = Literal(0L) +val updateFunction = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount) +val result = currentCount + +AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + + case Sum(expr) = +val currentSum = AttributeReference(currentSum, expr.dataType, nullable = false)() +val initialValue = Cast(Literal(0L), expr.dataType) + +// Coalasce avoids double calculation... 
+// but really, common sub expression elimination would be better +val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum :: Nil) +val result = currentSum + +AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + + case a @ Average(expr) = +val currentCount = AttributeReference(currentCount, LongType, nullable = false)() +val currentSum = AttributeReference(currentSum, expr.dataType, nullable = false)() +val initialCount = Literal(0L) +val initialSum = Cast(Literal(0L), expr.dataType) +val updateCount = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount) +val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: Nil) + +val result = Divide(Cast(currentSum, DoubleType), Cast(currentCount, DoubleType)) + +AggregateEvaluation( + currentCount :: currentSum :: Nil, +
[GitHub] spark pull request: [SPARK-2652] [PySpark] Turning some default co...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1568#issuecomment-49977116 QA results for PR 1568:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17107/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333832 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.types._ + +case class AggregateEvaluation( +schema: Seq[Attribute], +initialValues: Seq[Expression], +update: Seq[Expression], +result: Expression) + +/** + * :: DeveloperApi :: + * Alternate version of aggregation that leverages projection and thus code generation. + * Aggregations are converted into a set of projections from a aggregation buffer tuple back onto + * itself. Currently only used for simple aggregations like SUM, COUNT, or AVERAGE are supported. + * + * @param partial if true then aggregation is done partially on local data without shuffling to + *ensure all values where `groupingExpressions` are equal are present. + * @param groupingExpressions expressions that are evaluated to determine grouping. + * @param aggregateExpressions expressions that are computed for each group. + * @param child the input data source. + */ +@DeveloperApi +case class GeneratedAggregate( +partial: Boolean, +groupingExpressions: Seq[Expression], +aggregateExpressions: Seq[NamedExpression], +child: SparkPlan) + extends UnaryNode { + + override def requiredChildDistribution = +if (partial) { + UnspecifiedDistribution :: Nil +} else { + if (groupingExpressions == Nil) { +AllTuples :: Nil + } else { +ClusteredDistribution(groupingExpressions) :: Nil + } +} + + override def output = aggregateExpressions.map(_.toAttribute) + + override def execute() = { +val aggregatesToCompute = aggregateExpressions.flatMap { a = + a.collect { case agg: AggregateExpression = agg} +} + +val computeFunctions = aggregatesToCompute.map { + case c @ Count(expr) = +val currentCount = AttributeReference(currentCount, LongType, nullable = false)() +val initialValue = Literal(0L) +val updateFunction = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount) +val result = currentCount + +AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + + case Sum(expr) = +val currentSum = AttributeReference(currentSum, expr.dataType, nullable = false)() +val initialValue = Cast(Literal(0L), expr.dataType) + +// Coalasce avoids double calculation... 
+// but really, common sub expression elimination would be better +val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum :: Nil) +val result = currentSum + +AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + + case a @ Average(expr) = +val currentCount = AttributeReference(currentCount, LongType, nullable = false)() +val currentSum = AttributeReference(currentSum, expr.dataType, nullable = false)() +val initialCount = Literal(0L) +val initialSum = Cast(Literal(0L), expr.dataType) +val updateCount = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount) +val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: Nil) + +val result = Divide(Cast(currentSum, DoubleType), Cast(currentCount, DoubleType)) + +AggregateEvaluation( + currentCount :: currentSum :: Nil, +
[GitHub] spark pull request: Build should not run hive tests by default.
Github user ScrapCodes commented on the pull request: https://github.com/apache/spark/pull/1565#issuecomment-49977140 I feel that making those two examples compile optionally (by isolating them) is better than isolating all the Hive tests like this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333860 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.types._ + +case class AggregateEvaluation( +schema: Seq[Attribute], +initialValues: Seq[Expression], +update: Seq[Expression], +result: Expression) + +/** + * :: DeveloperApi :: + * Alternate version of aggregation that leverages projection and thus code generation. + * Aggregations are converted into a set of projections from a aggregation buffer tuple back onto + * itself. Currently only used for simple aggregations like SUM, COUNT, or AVERAGE are supported. + * + * @param partial if true then aggregation is done partially on local data without shuffling to + *ensure all values where `groupingExpressions` are equal are present. + * @param groupingExpressions expressions that are evaluated to determine grouping. + * @param aggregateExpressions expressions that are computed for each group. + * @param child the input data source. + */ +@DeveloperApi +case class GeneratedAggregate( +partial: Boolean, +groupingExpressions: Seq[Expression], +aggregateExpressions: Seq[NamedExpression], +child: SparkPlan) + extends UnaryNode { + + override def requiredChildDistribution = +if (partial) { + UnspecifiedDistribution :: Nil +} else { + if (groupingExpressions == Nil) { +AllTuples :: Nil + } else { +ClusteredDistribution(groupingExpressions) :: Nil + } +} + + override def output = aggregateExpressions.map(_.toAttribute) + + override def execute() = { +val aggregatesToCompute = aggregateExpressions.flatMap { a = + a.collect { case agg: AggregateExpression = agg} +} + +val computeFunctions = aggregatesToCompute.map { + case c @ Count(expr) = +val currentCount = AttributeReference(currentCount, LongType, nullable = false)() +val initialValue = Literal(0L) +val updateFunction = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount) +val result = currentCount + +AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + + case Sum(expr) = +val currentSum = AttributeReference(currentSum, expr.dataType, nullable = false)() +val initialValue = Cast(Literal(0L), expr.dataType) + +// Coalasce avoids double calculation... 
+// but really, common sub expression elimination would be better +val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum :: Nil) +val result = currentSum + +AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + + case a @ Average(expr) = +val currentCount = AttributeReference(currentCount, LongType, nullable = false)() +val currentSum = AttributeReference(currentSum, expr.dataType, nullable = false)() +val initialCount = Literal(0L) +val initialSum = Cast(Literal(0L), expr.dataType) +val updateCount = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount) +val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: Nil) + +val result = Divide(Cast(currentSum, DoubleType), Cast(currentCount, DoubleType)) + +AggregateEvaluation( + currentCount :: currentSum :: Nil, +
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333900 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.types._ + +case class AggregateEvaluation( +schema: Seq[Attribute], +initialValues: Seq[Expression], +update: Seq[Expression], +result: Expression) + +/** + * :: DeveloperApi :: + * Alternate version of aggregation that leverages projection and thus code generation. + * Aggregations are converted into a set of projections from a aggregation buffer tuple back onto + * itself. Currently only used for simple aggregations like SUM, COUNT, or AVERAGE are supported. + * + * @param partial if true then aggregation is done partially on local data without shuffling to + *ensure all values where `groupingExpressions` are equal are present. + * @param groupingExpressions expressions that are evaluated to determine grouping. + * @param aggregateExpressions expressions that are computed for each group. + * @param child the input data source. + */ +@DeveloperApi +case class GeneratedAggregate( +partial: Boolean, +groupingExpressions: Seq[Expression], +aggregateExpressions: Seq[NamedExpression], +child: SparkPlan) + extends UnaryNode { + + override def requiredChildDistribution = +if (partial) { + UnspecifiedDistribution :: Nil +} else { + if (groupingExpressions == Nil) { +AllTuples :: Nil + } else { +ClusteredDistribution(groupingExpressions) :: Nil + } +} + + override def output = aggregateExpressions.map(_.toAttribute) + + override def execute() = { +val aggregatesToCompute = aggregateExpressions.flatMap { a = + a.collect { case agg: AggregateExpression = agg} +} + +val computeFunctions = aggregatesToCompute.map { + case c @ Count(expr) = +val currentCount = AttributeReference(currentCount, LongType, nullable = false)() +val initialValue = Literal(0L) +val updateFunction = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount) +val result = currentCount + +AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + + case Sum(expr) = +val currentSum = AttributeReference(currentSum, expr.dataType, nullable = false)() +val initialValue = Cast(Literal(0L), expr.dataType) + +// Coalasce avoids double calculation... 
+// but really, common sub expression elimination would be better +val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum :: Nil) +val result = currentSum + +AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + + case a @ Average(expr) = +val currentCount = AttributeReference(currentCount, LongType, nullable = false)() +val currentSum = AttributeReference(currentSum, expr.dataType, nullable = false)() +val initialCount = Literal(0L) +val initialSum = Cast(Literal(0L), expr.dataType) +val updateCount = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount) +val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: Nil) + +val result = Divide(Cast(currentSum, DoubleType), Cast(currentCount, DoubleType)) + +AggregateEvaluation( + currentCount :: currentSum :: Nil, +
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333888 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.types._ + +case class AggregateEvaluation( +schema: Seq[Attribute], +initialValues: Seq[Expression], +update: Seq[Expression], +result: Expression) + +/** + * :: DeveloperApi :: + * Alternate version of aggregation that leverages projection and thus code generation. + * Aggregations are converted into a set of projections from a aggregation buffer tuple back onto + * itself. Currently only used for simple aggregations like SUM, COUNT, or AVERAGE are supported. + * + * @param partial if true then aggregation is done partially on local data without shuffling to + *ensure all values where `groupingExpressions` are equal are present. + * @param groupingExpressions expressions that are evaluated to determine grouping. + * @param aggregateExpressions expressions that are computed for each group. + * @param child the input data source. + */ +@DeveloperApi +case class GeneratedAggregate( +partial: Boolean, +groupingExpressions: Seq[Expression], +aggregateExpressions: Seq[NamedExpression], +child: SparkPlan) + extends UnaryNode { + + override def requiredChildDistribution = +if (partial) { + UnspecifiedDistribution :: Nil +} else { + if (groupingExpressions == Nil) { +AllTuples :: Nil + } else { +ClusteredDistribution(groupingExpressions) :: Nil + } +} + + override def output = aggregateExpressions.map(_.toAttribute) + + override def execute() = { +val aggregatesToCompute = aggregateExpressions.flatMap { a = + a.collect { case agg: AggregateExpression = agg} +} + +val computeFunctions = aggregatesToCompute.map { + case c @ Count(expr) = +val currentCount = AttributeReference(currentCount, LongType, nullable = false)() +val initialValue = Literal(0L) +val updateFunction = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount) +val result = currentCount + +AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + + case Sum(expr) = +val currentSum = AttributeReference(currentSum, expr.dataType, nullable = false)() +val initialValue = Cast(Literal(0L), expr.dataType) + +// Coalasce avoids double calculation... 
+// but really, common sub expression elimination would be better +val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum :: Nil) +val result = currentSum + +AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result) + + case a @ Average(expr) = +val currentCount = AttributeReference(currentCount, LongType, nullable = false)() +val currentSum = AttributeReference(currentSum, expr.dataType, nullable = false)() +val initialCount = Literal(0L) +val initialSum = Cast(Literal(0L), expr.dataType) +val updateCount = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount) +val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: Nil) + +val result = Divide(Cast(currentSum, DoubleType), Cast(currentCount, DoubleType)) + +AggregateEvaluation( + currentCount :: currentSum :: Nil, +
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333936 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -18,22 +18,53 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.Logging import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Logging, Row} +import org.apache.spark.sql.{SQLContext, Row} import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.GenericRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.BaseRelation import org.apache.spark.sql.catalyst.plans.physical._ + +object SparkPlan { + protected[sql] val currentContext = new ThreadLocal[SQLContext]() +} + /** * :: DeveloperApi :: */ @DeveloperApi -abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging { +abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable { self: Product = + /** + * A handle to the SQL Context that was used to create this plan. Since many operators need + * access to the sqlContext for RDD operations or configuration this field is automatically + * populated by the query planning infrastructure. + */ + @transient + protected val sqlContext = SparkPlan.currentContext.get() + + protected def sparkContext = sqlContext.sparkContext + + def logger = log + + val codegenEnabled: Boolean = if(sqlContext != null) { --- End diff -- when is context null? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
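For readers skimming the diff: `SparkPlan.currentContext` is a ThreadLocal that the planning infrastructure sets so that plan constructors can pick the context up without it being passed explicitly, and a `@transient` field read that way is naturally null on any thread where nothing set it (a plan deserialized on another machine being one plausible case). Below is a stripped-down sketch of the handoff pattern, using made-up names rather than Spark's classes.

```scala
// Made-up names; sketches the ThreadLocal handoff, not Spark's actual classes.
object PlanContext {
  val current = new ThreadLocal[String]()   // stands in for the SQLContext handle
}

class Plan {
  // Captured at construction time; null if nothing set the ThreadLocal on this thread.
  @transient val ctx: String = PlanContext.current.get()
  val codegenEnabled: Boolean = if (ctx != null) ctx.contains("codegen=on") else false
}

object PlannerDemo extends App {
  PlanContext.current.set("codegen=on")
  val planned = new Plan          // built by the "planner", sees the installed context
  PlanContext.current.remove()
  val orphan = new Plan           // built on a thread with no context: ctx is null
  println((planned.codegenEnabled, orphan.codegenEnabled))  // (true, false)
}
```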
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333966 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -51,8 +82,46 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging { */ def executeCollect(): Array[Row] = execute().map(_.copy()).collect() - protected def buildRow(values: Seq[Any]): Row = -new GenericRow(values.toArray) + protected def newProjection( + expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = { +log.debug( + sCreating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled) +if (codegenEnabled) { + GenerateProjection(expressions, inputSchema) +} else { + new InterpretedProjection(expressions, inputSchema) +} + } + + protected def newMutableProjection( + expressions: Seq[Expression], + inputSchema: Seq[Attribute]): () = MutableProjection = { +log.debug( + sCreating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled) +if(codegenEnabled) { + GenerateMutableProjection(expressions, inputSchema) --- End diff -- This is another use of apply that is very confusing, because it is not obvious GenerateMutableProjection is a closure. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
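To spell out the closure point being made here: `GenerateMutableProjection(...)` returns a `() => MutableProjection`, i.e. a factory, presumably so the generated (and cached) code is produced once while each consumer can still obtain its own mutable instance. The toy sketch below uses invented names only to show why that shape is easy to misread at a call site.

```scala
// Invented names; only illustrates the "apply returns a factory" shape.
object GenerateDoubler {
  def apply(): () => (Int => Int) = {
    println("expensive code generation happens once here")
    () => {
      var calls = 0                       // per-instance mutable state
      (x: Int) => { calls += 1; x * 2 }
    }
  }
}

object FactoryDemo extends App {
  val factory = GenerateDoubler()   // reads like "build a doubler", actually builds a factory
  val forTask1 = factory()          // each task/partition mints its own instance
  val forTask2 = factory()
  println(forTask1(21))             // 42
  println(forTask2(4))              // 8
}
```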
[GitHub] spark pull request: [SPARK-2125] Add sort flag and move sort into ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1210#issuecomment-49977441 QA results for PR 1210:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17108/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333978 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -192,9 +187,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { val relation = ParquetRelation.create(path, child, sparkContext.hadoopConfiguration) // Note: overwrite=false because otherwise the metadata we just created will be deleted -InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sqlContext) :: Nil +InsertIntoParquetTable(relation, planLater(child), overwrite=false) :: Nil --- End diff -- nit: space after/before = --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15334051 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala --- @@ -300,8 +298,16 @@ case class LeftSemiJoinBNL( case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode { def output = left.output ++ right.output - def execute() = left.execute().map(_.copy()).cartesian(right.execute().map(_.copy())).map { -case (l: Row, r: Row) = buildRow(l ++ r) + def execute() = { +val leftResults = left.execute().map(_.copy()) +val rightResults = right.execute().map(_.copy()) + +leftResults.cartesian(rightResults).mapPartitions { iter = + val joinedRow = new JoinedRow + iter.map { +case (l: Row, r: Row) = joinedRow(l, r) --- End diff -- maybe perf doesn't matter too much here since it is a cartesian product already, but you can remove the pattern matching to improve perf. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
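A minimal sketch of the suggested rewrite (read the tuple fields directly instead of pattern matching with type tests); the Row stand-in below is invented, not the real class.

```scala
object JoinSketch extends App {
  type Row = Seq[Any]                       // invented stand-in
  def joinedRow(l: Row, r: Row): Row = l ++ r

  val pairs = Iterator((Seq[Any](1), Seq[Any]("a")), (Seq[Any](2), Seq[Any]("b")))

  // With the pattern match, every element goes through an extractor plus type tests:
  //   pairs.map { case (l: Row, r: Row) => joinedRow(l, r) }
  // The suggestion: plain field access, no match needed.
  println(pairs.map(p => joinedRow(p._1, p._2)).toList)  // List(List(1, a), List(2, b))
}
```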
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15334075 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala --- @@ -17,6 +17,7 @@ package org.apache.spark.sql.parquet +import org.apache.spark.sql.execution.SparkPlan --- End diff -- import order --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15334329 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -341,8 +336,9 @@ class DAGScheduler( if (registeredStages.isEmpty || registeredStages.get.isEmpty) { logError(No stages registered for job + job.jobId) } else { - stageIdToJobIds.filterKeys(stageId = registeredStages.get.contains(stageId)).foreach { -case (stageId, jobSet) = + stageIdToStage.filter(s = registeredStages.get.contains(s._1)).foreach { --- End diff -- i'd need to do more hash lookups if it is filterKeys --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
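The comment above weighs `filter` against `filterKeys`. As a generic refresher only (not a claim about what the DAGScheduler code needs): `filter` produces the (key, value) pairs eagerly in one pass, while `filterKeys` on pre-2.13 Scala collections returns a lazy view that re-applies the predicate on every access.

```scala
object FilterSketch extends App {
  val stageIdToStage = Map(1 -> "stage-1", 2 -> "stage-2", 3 -> "stage-3")
  val registered = Set(1, 3)

  // Eager: one pass over the map, yielding concrete (id, stage) pairs.
  val eager = stageIdToStage.filter { case (id, _) => registered.contains(id) }

  // Lazy (pre-2.13 filterKeys): a view; the predicate runs again on every access.
  val view = stageIdToStage.filterKeys(registered.contains)

  println(eager)         // Map(1 -> stage-1, 3 -> stage-3)
  println(view.toMap)    // same contents, materialized only here
}
```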
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15334407 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.codegen + +import com.google.common.cache.{CacheLoader, CacheBuilder} + +import scala.language.existentials + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ + +/** + * A base class for generators of byte code to perform expression evaluation. Includes a set of + * helpers for referring to Catalyst types and building trees that perform evaluation of individual + * expressions. + */ +abstract class CodeGenerator[InType : AnyRef, OutType : AnyRef] extends Logging { + import scala.reflect.runtime.{universe = ru} + import scala.reflect.runtime.universe._ + + import scala.tools.reflect.ToolBox + + protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox() + + protected val rowType = typeOf[Row] + protected val mutableRowType = typeOf[MutableRow] + protected val genericRowType = typeOf[GenericRow] + protected val genericMutableRowType = typeOf[GenericMutableRow] + + protected val projectionType = typeOf[Projection] + protected val mutableProjectionType = typeOf[MutableProjection] + + private val curId = new java.util.concurrent.atomic.AtomicInteger() + private val javaSeparator = $ + + /** + * Generates a class for a given input expression. Called when there is not cached code + * already available. + */ + protected def create(in: InType): OutType + + /** + * Canonicalizes an input expression. Used to avoid double caching expressions that differ only + * cosmetically. + */ + protected def canonicalize(in: InType): InType + + /** Binds an input expression to a given input schema */ + protected def bind(in: InType, inputSchema: Seq[Attribute]): InType + + protected val cache = CacheBuilder.newBuilder() +.maximumSize(1000) +.build( + new CacheLoader[InType, OutType]() { +override def load(in: InType): OutType = globalLock.synchronized { + create(in) +} + }) + + def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType= +apply(bind(expressions, inputSchema)) + + def apply(expressions: InType): OutType = cache.get(canonicalize(expressions)) + + /** + * Returns a term name that is unique within this instance of a `CodeGenerator`. + * + * (Since we aren't in a macro context we do not seem to have access to the built in `freshName` + * function.) 
+ */ + protected def freshName(prefix: String): TermName = { +newTermName(s$prefix$javaSeparator${curId.getAndIncrement}) + } + + /** + * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input. + * + * @param code The sequence of statements required to evaluate the expression. + * @param nullTerm A term that holds a boolean value representing whether the expression evaluated + * to null. + * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not + * valid if `nullTerm` is set to `false`. + * @param objectTerm A possibly boxed version of the result of evaluating this expression. + */ + protected case class EvaluatedExpression( + code: Seq[Tree], + nullTerm: TermName, + primitiveTerm: TermName, + objectTerm: TermName) + + /** + * Given an expression tree returns an [[EvaluatedExpression]], which contains Scala trees that + *
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1561#issuecomment-49978964 Thanks. Pushed a version that should have addressed most of the comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1561#discussion_r15334478 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -728,8 +697,6 @@ class DAGScheduler( private def submitMissingTasks(stage: Stage, jobId: Int) { logDebug(submitMissingTasks( + stage + )) // Get our pending tasks and remember them in our pendingTasks entry -val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet) --- End diff -- do i need to call ```stage.pendingTasks = new HashSet``` here? @kayousterhout @markhamstra --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2492][Streaming] kafkaReceiver minor ch...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1420#issuecomment-49979126 QA results for PR 1420:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17109/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2665] [SQL] Add EqualNS Unit Tests
GitHub user chenghao-intel opened a pull request: https://github.com/apache/spark/pull/1570 [SPARK-2665] [SQL] Add EqualNS Unit Tests
Hive supports the operator <=>, which returns the same result as the EQUAL (=) operator for non-null operands, but returns TRUE if both operands are NULL and FALSE if only one of them is NULL.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/chenghao-intel/spark equalns
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1570.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1570
commit 7af4b0b3894fbd416ba2ecacbe2a705219f52619 Author: Cheng Hao hao.ch...@intel.com Date: 2014-07-24T08:03:01Z Add EqualNS Unit Tests --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
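For readers unfamiliar with Hive's null-safe equality operator being tested here, its behaviour can be sketched in plain Scala, with Option standing in for a nullable column; this is an illustration, not Catalyst's actual expression class.

```scala
object NullSafeEqSketch extends App {
  // <=> semantics: TRUE if both NULL, FALSE if exactly one is NULL, otherwise like =.
  def equalNullSafe(a: Option[Int], b: Option[Int]): Boolean = (a, b) match {
    case (None, None)          => true
    case (None, _) | (_, None) => false
    case (Some(x), Some(y))    => x == y
  }

  println(equalNullSafe(Some(1), Some(1)))  // true
  println(equalNullSafe(None, None))        // true  (plain = would yield NULL here)
  println(equalNullSafe(Some(1), None))     // false (plain = would yield NULL here)
}
```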
[GitHub] spark pull request: Part of [SPARK-2456] Removed some HashMaps fro...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1561#issuecomment-49979231 QA tests have started for PR 1561. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17112/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2665] [SQL] Add EqualNS Unit Tests
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1570#issuecomment-49979539 This is really nice - passes a lot more tests. I guess we will eventually run into the problem that the unit tests without parallelization will take too long... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2665] [SQL] Add EqualNS Unit Tests
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1570#issuecomment-49979557 Do you mind looking into how we can parallelize the hive compatibility tests? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2665] [SQL] Add EqualNS Unit Tests
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1570#issuecomment-49979649 QA tests have started for PR 1570. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17113/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2663] [SQL] Support the Grouping Set
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1567#issuecomment-49979679 QA tests have started for PR 1567. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17114/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2665] [SQL] Add EqualNS Unit Tests
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/1570#issuecomment-49979808 Yes, I will think about how to parallelize those tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: SPARK-2657 Use more compact data structures th...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1555#discussion_r15334913 --- Diff: core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +/** + * An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers. + * ArrayBuffer always allocates an Object array to store the data, with 16 entries by default, + * so it has about 80-100 bytes of overhead. In contrast, CompactBuffer can keep up to two + * elements in fields of the main object, and only allocates an Array[AnyRef] if there are more + * entries than that. This makes it more efficient for operations like groupBy where we expect + * some keys to have very few elements. + */ +private[spark] class CompactBuffer[T] extends Seq[T] with Serializable { + // First two elements + private var element0: T = _ + private var element1: T = _ + + // Number of elements, including our two in the main object + private var curSize = 0 + + // Array for extra elements + private var otherElements: Array[AnyRef] = null + + def apply(position: Int): T = { +if (position 0 || position = curSize) { + throw new IndexOutOfBoundsException +} +if (position == 0) { + element0 +} else if (position == 1) { + element1 +} else { + otherElements(position - 2).asInstanceOf[T] +} + } + + def update(position: Int, value: T): Unit = { --- End diff -- i guess internally. maybe private? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
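Since the review comment only touches visibility, the idea the CompactBuffer diff optimizes for (most groups hold one or two values, so keep those inline and allocate an array lazily) may be easier to see in the miniature, hypothetical version below; it is not Spark's actual class.

```scala
// Miniature, hypothetical version of the "first two elements live in fields" idea.
final class TinyBuffer[T] {
  private var e0: T = _
  private var e1: T = _
  private var n = 0
  private var rest: Array[AnyRef] = null   // only allocated for the 3rd element onwards

  def +=(value: T): this.type = {
    if (n == 0) e0 = value
    else if (n == 1) e1 = value
    else {
      if (rest == null) rest = new Array[AnyRef](8)
      else if (n - 2 == rest.length) rest = java.util.Arrays.copyOf(rest, rest.length * 2)
      rest(n - 2) = value.asInstanceOf[AnyRef]
    }
    n += 1
    this
  }

  def apply(i: Int): T =
    if (i == 0) e0 else if (i == 1) e1 else rest(i - 2).asInstanceOf[T]

  def length: Int = n
}

object TinyBufferDemo extends App {
  val b = new TinyBuffer[String]
  Seq("a", "b", "c", "d").foreach(b += _)
  println((b.length, b(0), b(3)))   // (4, a, d)
}
```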