[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2065#discussion_r16758766 --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.flume + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + +import com.google.common.base.Throwables + +import org.apache.spark.Logging +import org.apache.spark.streaming.flume.sink._ + +private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with --- End diff -- I missed this earlier. Can you add docs on what this class does?? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: Added support for :cp jar that was broken in...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1929#issuecomment-53529843 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19293/consoleFull) for PR 1929 at commit [`f420cbf`](https://github.com/apache/spark/commit/f420cbf00a5f98c8eec73d251ed1d6b9352ad063). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2065#discussion_r16758798 --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.flume + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + +import com.google.common.base.Throwables + +import org.apache.spark.Logging +import org.apache.spark.streaming.flume.sink._ + +private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with + Logging { + + def run(): Unit = { +while (!receiver.isStopped()) { + val connection = receiver.getConnections.poll() + val client = connection.client + var batchReceived = false + var seq: CharSequence = null + try { +getBatch(client) match { + case Some(eventBatch) => +batchReceived = true +seq = eventBatch.getSequenceNumber +val events = toSparkFlumeEvents(eventBatch.getEvents) +if (store(events)) { + sendAck(client, seq) +} else { + sendNack(batchReceived, client, seq) +} + case None => +} + } catch { +case e: Exception => + Throwables.getRootCause(e) match { +// If the cause was an InterruptedException, then check if the receiver is stopped - +// if yes, just break out of the loop. Else send a Nack and log a warning. +// In the unlikely case, the cause was not an Exception, +// then just throw it out and exit. +case interrupted: InterruptedException => + if (!receiver.isStopped()) { +logWarning("Interrupted while receiving data from Flume", interrupted) +sendNack(batchReceived, client, seq) + } +case exception: Exception => + logWarning("Error while receiving data from Flume", exception) + sendNack(batchReceived, client, seq) + } + } finally { +receiver.getConnections.add(connection) + } +} + } + + /** + * Gets a batch of events from the specified client. This method does not handle any exceptions + * which will be propagated to the caller. 
+ * @param client Client to get events from + * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]] + */ + private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = { +val eventBatch = client.getEventBatch(receiver.getMaxBatchSize) +if (!SparkSinkUtils.isErrorBatch(eventBatch)) { + // No error, proceed with processing data + logDebug("Received batch of " + eventBatch.getEvents.size + " events with sequence number: " --- End diff -- nit: With so many clauses in the string, it's better to use string interpolation. `logDebug(s"Received batch of ${eventBatch.getEvents.size} events with sequence number: ${eventBatch.getSequenceNumber}")`
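The interpolation nit above can be illustrated with a minimal sketch. `Batch` is a hypothetical stand-in for the Avro-generated `EventBatch` in the PR (an assumption, not the real class); both methods should build the same message, and the second is the `s`-interpolator form the reviewer suggests.

```scala
// Hypothetical stand-in for the EventBatch type used in the PR (assumption).
final case class Batch(events: Seq[String], sequenceNumber: CharSequence)

object InterpolationSketch {
  // Concatenation, as in the diff under review.
  def concatenated(b: Batch): String =
    "Received batch of " + b.events.size + " events with sequence number: " + b.sequenceNumber

  // The s-interpolator form suggested in the review comment.
  def interpolated(b: Batch): String =
    s"Received batch of ${b.events.size} events with sequence number: ${b.sequenceNumber}"
}
```

The interpolated form keeps the whole message in one literal, which also makes it easier to keep the call on a single line.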
[GitHub] spark pull request: [SPARK-3198] [SQL] Remove the TreeNode.id
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/2155#issuecomment-53529874 This is a follow-up to #2114, after discussing it with @marmbrus.
[GitHub] spark pull request: [SPARK-3198] [SQL] Remove the TreeNode.id
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2155#issuecomment-53529888 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19292/consoleFull) for PR 2155 at commit [`bec42d2`](https://github.com/apache/spark/commit/bec42d23fa6d3c3f956a1fd291d76e0919daf7d7). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/2065#discussion_r16758820 --- Diff: external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.streaming.flume + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer + +import com.google.common.base.Throwables + +import org.apache.spark.Logging +import org.apache.spark.streaming.flume.sink._ + +private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends Runnable with + Logging { + + def run(): Unit = { +while (!receiver.isStopped()) { + val connection = receiver.getConnections.poll() + val client = connection.client + var batchReceived = false + var seq: CharSequence = null + try { +getBatch(client) match { + case Some(eventBatch) => +batchReceived = true +seq = eventBatch.getSequenceNumber +val events = toSparkFlumeEvents(eventBatch.getEvents) +if (store(events)) { + sendAck(client, seq) +} else { + sendNack(batchReceived, client, seq) +} + case None => +} + } catch { +case e: Exception => + Throwables.getRootCause(e) match { +// If the cause was an InterruptedException, then check if the receiver is stopped - +// if yes, just break out of the loop. Else send a Nack and log a warning. +// In the unlikely case, the cause was not an Exception, +// then just throw it out and exit. +case interrupted: InterruptedException => + if (!receiver.isStopped()) { +logWarning("Interrupted while receiving data from Flume", interrupted) +sendNack(batchReceived, client, seq) + } +case exception: Exception => + logWarning("Error while receiving data from Flume", exception) + sendNack(batchReceived, client, seq) + } + } finally { +receiver.getConnections.add(connection) + } +} + } + + /** + * Gets a batch of events from the specified client. This method does not handle any exceptions + * which will be propagated to the caller. 
+ * @param client Client to get events from + * @return [[Some]] which contains the event batch if Flume sent any events back, else [[None]] + */ + private def getBatch(client: SparkFlumeProtocol.Callback): Option[EventBatch] = { +val eventBatch = client.getEventBatch(receiver.getMaxBatchSize) +if (!SparkSinkUtils.isErrorBatch(eventBatch)) { + // No error, proceed with processing data + logDebug("Received batch of " + eventBatch.getEvents.size + " events with sequence number: " + + eventBatch.getSequenceNumber) + Some(eventBatch) +} else { + logWarning( --- End diff -- nit: Again, spilling into three lines is weird.
```
logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
  eventBatch.getErrorMsg)
```
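The fetch loop quoted in this diff acks a batch only after it has been stored and otherwise nacks it, dispatching on the root cause of any exception. A simplified sketch of that control flow follows. All names here (`Outcome`, `process`, `rootCause`) are hypothetical, the sketch does not model the `batchReceived` flag or the connection queue, and `rootCause` only approximates Guava's `Throwables.getRootCause`.

```scala
import scala.annotation.tailrec

object AckNackSketch {
  sealed trait Outcome
  case object Ack extends Outcome
  case object Nack extends Outcome
  case object Skip extends Outcome // nothing to acknowledge

  // Follows the cause chain to its end (assumes the chain has no cycles).
  @tailrec
  def rootCause(t: Throwable): Throwable =
    if (t.getCause == null || (t.getCause eq t)) t else rootCause(t.getCause)

  // One iteration of the loop: fetch a batch, store it, decide what to send back.
  def process(
      fetch: () => Option[Seq[String]],
      store: Seq[String] => Boolean,
      isStopped: () => Boolean): Outcome =
    try {
      fetch() match {
        case Some(events) => if (store(events)) Ack else Nack
        case None => Skip
      }
    } catch {
      case e: Exception =>
        rootCause(e) match {
          // Interrupted during shutdown: nothing to report back.
          case _: InterruptedException if isStopped() => Skip
          // Any other Exception (including an interrupt while still running): nack.
          case _: Exception => Nack
          // Root cause is not an Exception (for example an Error): rethrow.
          case other => throw other
        }
    }
}
```

Returning an `Outcome` instead of calling `sendAck`/`sendNack` directly is a choice made only to keep the sketch testable without a Flume client.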
[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2065#issuecomment-53530019 This now looks quite good to me. There are a few more formatting issues, which should take about 5 minutes to fix :)
[GitHub] spark pull request: [SPARK-3167] Handle special driver configs in ...
GitHub user andrewor14 opened a pull request:

https://github.com/apache/spark/pull/2156

[SPARK-3167] Handle special driver configs in Windows (Branch 1.1)

This is an effort to bring the Windows scripts up to speed after recent splashing changes in #1845.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andrewor14/spark windows-config-branch-1.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2156.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2156

commit 00b9dfe7012aab765f4d3fbf44f52f0cf335a01d
Author: Andrew Or andrewo...@gmail.com
Date: 2014-08-27T05:52:16Z

[SPARK-3167] Handle special driver configs in Windows

This is an effort to bring the Windows scripts up to speed after recent splashing changes in #1845.

Author: Andrew Or andrewo...@gmail.com

Closes #2129 from andrewor14/windows-config and squashes the following commits:

881a8f0 [Andrew Or] Add reference to Windows taskkill
92e6047 [Andrew Or] Update a few comments (minor)
22b1acd [Andrew Or] Fix style again (minor)
afcffea [Andrew Or] Fix style (minor)
72004c2 [Andrew Or] Actually respect --driver-java-options
803218b [Andrew Or] Actually respect SPARK_*_CLASSPATH
eeb34a0 [Andrew Or] Update outdated comment (minor)
35caecc [Andrew Or] In Windows, actually kill Java processes on exit
f97daa2 [Andrew Or] Fix Windows spark shell stdin issue
83ebe60 [Andrew Or] Parse special driver configs in Windows (broken)

Conflicts: bin/spark-class2.cmd
[GitHub] spark pull request: [SPARK-3139] Made ContextCleaner to not block ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2143#issuecomment-53530146 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19289/consoleFull) for PR 2143 at commit [`9c84202`](https://github.com/apache/spark/commit/9c84202631ccc82a99179e7a9dbfdff3a1d32c55). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-2917] [SQL] Avoid table creation in log...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1846#issuecomment-53530198 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19287/consoleFull) for PR 1846 at commit [`ae1e072`](https://github.com/apache/spark/commit/ae1e072e78d3653f5850487e825afb6ba4571bcd). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `$FWDIR/bin/spark-submit --class $CLASS $` * `class ExternalSorter(object):` * `$FWDIR/bin/spark-submit --class $CLASS $` * `protected class AttributeEquals(val a: Attribute) ` * `case class CreateTableAsSelect(`
[GitHub] spark pull request: [SPARK-3139] Made ContextCleaner to not block ...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/2143#issuecomment-53530260 Jenkins, test this please.
[GitHub] spark pull request: [SPARK-3139] Made ContextCleaner to not block ...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2143#issuecomment-53530257 The failed unit test is unrelated to this.
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759047 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.storage.{FileSegment, ShuffleBlockId} --- End diff -- sort the import
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759034 --- Diff: core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala --- @@ -15,21 +15,24 @@ * limitations under the License. */ -package org.apache.spark.storage +package org.apache.spark.shuffle import java.io.File +import java.nio.ByteBuffer import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConversions._ +import scala.Some --- End diff -- remove this
[GitHub] spark pull request: [SPARK-3139] Made ContextCleaner to not block ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2143#issuecomment-53530699 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19296/consoleFull) for PR 2143 at commit [`9c84202`](https://github.com/apache/spark/commit/9c84202631ccc82a99179e7a9dbfdff3a1d32c55). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759141 --- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala --- @@ -17,14 +17,18 @@ package org.apache.spark.shuffle.sort -import java.io.{DataInputStream, FileInputStream} +import java.util.concurrent.ConcurrentHashMap +import org.apache.spark.{SparkConf, TaskContext, ShuffleDependency} import org.apache.spark.shuffle._ -import org.apache.spark.{TaskContext, ShuffleDependency} import org.apache.spark.shuffle.hash.HashShuffleReader -import org.apache.spark.storage.{DiskBlockManager, FileSegment, ShuffleBlockId} +import org.apache.spark.storage.ShuffleBlockId + +private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager { + + val indexShuffleBlockManager = new IndexShuffleBlockManager(conf) --- End diff -- tighten the accessibility - make everything private
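As a rough illustration of the accessibility comment above: a `val` in a Scala class body is public unless marked otherwise, so a collaborator like `indexShuffleBlockManager` becomes part of the class's API by default. The classes below are hypothetical stand-ins (assumptions, not Spark's real types) showing the tightened form.

```scala
// Hypothetical collaborator standing in for IndexShuffleBlockManager (assumption).
class IndexManager(val name: String)

class Manager(name: String) {
  // `private` keeps the collaborator out of the public API...
  private val indexManager = new IndexManager(name)

  // ...while a narrow method exposes only what callers actually need.
  def indexName: String = indexManager.name
}
```

With this shape, `new Manager("idx").indexManager` no longer compiles from outside the class, while `indexName` stays available.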
[GitHub] spark pull request: [SPARK-1812] Enable cross build for scala 2.11...
Github user ScrapCodes closed the pull request at: https://github.com/apache/spark/pull/996
[GitHub] spark pull request: [SPARK-2805] akka 2.3.4
Github user ScrapCodes commented on the pull request: https://github.com/apache/spark/pull/1685#issuecomment-53530947 This looks good to me, provided the automated tests pass.
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759235 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -34,8 +34,11 @@ import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ import org.apache.spark.network.netty.client.BlockFetchingClientFactory import org.apache.spark.network.netty.server.BlockServer +import org.apache.spark.network.netty.PathResolver --- End diff -- sort the imports
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759256 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -85,18 +87,62 @@ private[spark] class BlockManager( val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}" val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998") val tachyonBlockManager = - new TachyonBlockManager(shuffleBlockManager, tachyonStorePath, tachyonMaster) + new TachyonBlockManager(this, tachyonStorePath, tachyonMaster) tachyonInitialized = true new TachyonStore(this, tachyonBlockManager) } - private val useNetty = conf.getBoolean("spark.shuffle.use.netty", false) + val shuffleManager = { +// Let the user specify short names for shuffle managers +val shortShuffleMgrNames = Map( + "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", + "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager") +val shuffleMgrName = conf.get("spark.shuffle.manager", "hash") +val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) + +val clazz = Class.forName(shuffleMgrClass, true, Utils.getContextOrSparkClassLoader) +try { + clazz.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[ShuffleManager] +} catch { + case _: NoSuchMethodException => +clazz.getConstructor().newInstance().asInstanceOf[ShuffleManager] +} + } + + var shuffleBlockPathResolver: PathResolver = null + private val useNetty = { +val userConfigNetty = conf.getBoolean("spark.shuffle.use.netty", false) + +if (userConfigNetty && (shuffleManager.isInstanceOf[HashShuffleManager] + || shuffleManager.isInstanceOf[SortShuffleManager])) { + shuffleBlockPathResolver = shuffleManager.shuffleBlockManager.asInstanceOf[PathResolver] + true +} else { + false +} + } // If we use Netty for shuffle, start a new Netty-based shuffle sender service. private[storage] val nettyBlockClientFactory: BlockFetchingClientFactory = { if (useNetty) new BlockFetchingClientFactory(conf) else null } + /* --- End diff -- You can remove this code.
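The `shuffleManager` block in the diff above resolves a short name to a class name, then tries a one-argument constructor and falls back to the no-argument one. A minimal sketch of that pattern follows, using JDK classes so that it runs without Spark. The short names `builder` and `list`, the `String` constructor argument, and the `ReflectiveFactory` object are all assumptions for illustration; the PR itself passes a `SparkConf`.

```scala
object ReflectiveFactory {
  // Hypothetical short names mapped to fully qualified class names.
  val shortNames: Map[String, String] = Map(
    "builder" -> "java.lang.StringBuilder",
    "list" -> "java.util.ArrayList")

  def instantiate(nameOrClass: String, arg: String): AnyRef = {
    // Unknown names are treated as fully qualified class names.
    val className = shortNames.getOrElse(nameOrClass.toLowerCase, nameOrClass)
    val clazz = Class.forName(className)
    try {
      // Prefer a constructor that accepts the argument...
      clazz.getConstructor(classOf[String]).newInstance(arg).asInstanceOf[AnyRef]
    } catch {
      // ...and fall back to the no-argument constructor if there is none.
      case _: NoSuchMethodException =>
        clazz.getConstructor().newInstance().asInstanceOf[AnyRef]
    }
  }
}
```

Here `instantiate("builder", "abc")` takes the first branch because `StringBuilder` has a `String` constructor, while `instantiate("list", "abc")` falls through to the no-argument constructor of `ArrayList`.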
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759266 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -107,6 +153,11 @@ private[spark] class BlockManager( } } + if (useNetty && (shuffleManager.isInstanceOf[HashShuffleManager] +|| shuffleManager.isInstanceOf[SortShuffleManager])) { +val resolver = shuffleManager.shuffleBlockManager.asInstanceOf[PathResolver] --- End diff -- does this actually do anything?
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759276 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -573,7 +634,7 @@ private[spark] class BlockManager( serializer: Serializer, readMetrics: ShuffleReadMetrics): BlockFetcherIterator = { val iter = - if (conf.getBoolean("spark.shuffle.use.netty", false)) { +if (useNetty) { --- End diff -- indent is off here
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759291 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -573,7 +634,7 @@ private[spark] class BlockManager( serializer: Serializer, readMetrics: ShuffleReadMetrics): BlockFetcherIterator = { val iter = - if (conf.getBoolean("spark.shuffle.use.netty", false)) { +if (useNetty) { --- End diff -- But FYI: as part of https://issues.apache.org/jira/browse/SPARK-3019 I'm cleaning all of this up and there won't be any Netty-specific code anymore, so it's probably OK not to clean this up here (but don't worry about it, since you have already done it).
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759331 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -89,25 +89,33 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc } } - override def getBytes(blockId: BlockId): Option[ByteBuffer] = { -val segment = diskManager.getBlockLocation(blockId) -val channel = new RandomAccessFile(segment.file, "r").getChannel + private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = { +val channel = new RandomAccessFile(file, "r").getChannel try { // For small files, directly read rather than memory map - if (segment.length < minMemoryMapBytes) { -val buf = ByteBuffer.allocate(segment.length.toInt) -channel.read(buf, segment.offset) + if (file.length < minMemoryMapBytes) { --- End diff -- I think you want length instead of file.length here
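A minimal sketch of the point raised in the comment above: when a caller asks for a segment of a larger file, the choice between a direct read and a memory map should depend on the requested `length`, not on the size of the whole file. The object name `SegmentReader` and the threshold value are assumptions made for illustration; they are not taken from the pull request.

```scala
import java.io.{EOFException, File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

object SegmentReader {
  // Hypothetical threshold; in Spark the real value comes from configuration.
  val minMemoryMapBytes: Long = 2L * 1024 * 1024

  /** Reads `length` bytes starting at `offset`, picking the strategy from the segment length. */
  def getBytes(file: File, offset: Long, length: Long): ByteBuffer = {
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      if (length < minMemoryMapBytes) {
        // Small segment: copy it into a heap buffer.
        val buf = ByteBuffer.allocate(length.toInt)
        var pos = offset
        // A single read may return fewer bytes than requested, so loop until full.
        while (buf.hasRemaining) {
          val n = channel.read(buf, pos)
          if (n < 0) throw new EOFException(s"Segment extends past end of ${file.getName}")
          pos += n
        }
        buf.flip()
        buf
      } else {
        // Large segment: map only the requested region.
        channel.map(MapMode.READ_ONLY, offset, length)
      }
    } finally {
      channel.close()
    }
  }
}
```

With `file.length` in the condition, a small segment inside a large consolidated file would take the memory-map branch even though a direct read is cheaper for it.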
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1241#issuecomment-53531388 Thanks for doing this. To help with the review, can you write a short design doc discussing the interfaces between different components, similar to the one attached here https://issues.apache.org/jira/browse/SPARK-3019 ?
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759577 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -89,25 +89,33 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc } } - override def getBytes(blockId: BlockId): Option[ByteBuffer] = { -val segment = diskManager.getBlockLocation(blockId) -val channel = new RandomAccessFile(segment.file, "r").getChannel + private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = { +val channel = new RandomAccessFile(file, "r").getChannel try { // For small files, directly read rather than memory map - if (segment.length < minMemoryMapBytes) { -val buf = ByteBuffer.allocate(segment.length.toInt) -channel.read(buf, segment.offset) + if (file.length < minMemoryMapBytes) { +val buf = ByteBuffer.allocate(length.toInt) +channel.read(buf, offset) buf.flip() Some(buf) } else { -Some(channel.map(MapMode.READ_ONLY, segment.offset, segment.length)) +Some(channel.map(MapMode.READ_ONLY, offset, length)) } } finally { channel.close() } } + override def getBytes(blockId: BlockId): Option[ByteBuffer] = { +val file = diskManager.getFile(blockId.name) +getBytes(file, 0, file.length) + } + + def getBytes(segment: FileSegment): Option[ByteBuffer] = { +getBytes(segment.file, segment.offset, segment.length) + } + override def getValues(blockId: BlockId): Option[Iterator[Any]] = { --- End diff -- can you add a TODO here that getValues should bypass getBytes and use stream-based APIs? Otherwise this uses a lot of memory during external sort merge.
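To illustrate the stream-based alternative suggested in the comment above, here is a rough sketch that decodes values lazily from an input stream instead of first loading the whole block into a `ByteBuffer`. It reads fixed-width `Long` values only to stay self-contained; the object name `StreamingValues` and its signature are hypothetical, and Spark's real `getValues` would go through a serializer rather than `DataInputStream`.

```scala
import java.io.{BufferedInputStream, DataInputStream, File, FileInputStream}

object StreamingValues {
  /** Lazily decodes `count` Long values starting at byte `offset`, one at a time. */
  def readLongs(file: File, offset: Long, count: Int): Iterator[Long] = {
    val fis = new FileInputStream(file)
    // Repositioning the channel also moves the stream's read position.
    fis.getChannel.position(offset)
    val in = new DataInputStream(new BufferedInputStream(fis))
    var remaining = count
    new Iterator[Long] {
      def hasNext: Boolean = {
        // Close the stream once every value has been consumed.
        if (remaining == 0) in.close()
        remaining > 0
      }
      def next(): Long = {
        if (!hasNext) throw new NoSuchElementException("no more values")
        remaining -= 1
        in.readLong()
      }
    }
  }
}
```

Only the stream's internal buffer is held in memory at any time, which is the property that matters when many blocks are merged at once.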
[GitHub] spark pull request: [SPARK-3239] [PySpark] randomize the dirs for ...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/2152#issuecomment-53531754 test this please
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759644 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -121,20 +129,16 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc } override def remove(blockId: BlockId): Boolean = { -val fileSegment = diskManager.getBlockLocation(blockId) -val file = fileSegment.file -if (file.exists() && file.length() == fileSegment.length) { +val file = diskManager.getFile(blockId.name) --- End diff -- this changes the behavior to always delete the file even if consolidation is on. Is this intended / correct?
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759699 --- Diff: core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockManager.scala --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import org.apache.spark.storage.{FileSegment, ShuffleBlockId} +import java.nio.ByteBuffer + +private[spark] +trait ShuffleBlockManager { + type ShuffleId = Int + + def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] --- End diff -- can you add some javadoc explaining what this method is for? e.g. is it for getting local blocks or remote blocks? In what condition does it return Some(buf) vs None?
[GitHub] spark pull request: [SPARK-3239] [PySpark] randomize the dirs for ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2152#issuecomment-53532166 QA tests have started for PR 2152. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19297/consoleFull
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759777 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io._ +import java.nio.ByteBuffer + +import org.apache.spark.{SparkEnv, SparkConf} +import org.apache.spark.network.netty.PathResolver +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage._ +import org.apache.spark.storage.ShuffleBlockId + +private[spark] +class IndexShuffleBlockManager(conf: SparkConf) + extends ShuffleBlockManager with PathResolver { + + private lazy val blockManager = SparkEnv.get.blockManager + + // Mapping to a single shuffleBlockId with reduce ID 0 that we'll used to write all results to. 
+ private def consolidatedId(blockId: ShuffleBlockId): ShuffleBlockId = { +blockId.copy(reduceId = 0) + } + + def getDataFile(blockId: ShuffleBlockId): File = { +blockManager.diskBlockManager.getFile(consolidatedId(blockId)) + } + + private def getIndexFile(blockId: ShuffleBlockId): File = { +blockManager.diskBlockManager.getFile(consolidatedId(blockId).name + ".index") + } + + // Remove data file and index file that contain the output data from one map. + // Only need to be called once for all shuffleBlockId belong to one map output. + def removeDataByMap(blockId: ShuffleBlockId): Unit = { +var file = getDataFile(blockId) +if (file.exists) { + file.delete --- End diff -- add parentheses since delete has a side effect
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759788 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io._ +import java.nio.ByteBuffer + +import org.apache.spark.{SparkEnv, SparkConf} +import org.apache.spark.network.netty.PathResolver +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage._ +import org.apache.spark.storage.ShuffleBlockId + +private[spark] +class IndexShuffleBlockManager(conf: SparkConf) + extends ShuffleBlockManager with PathResolver { + + private lazy val blockManager = SparkEnv.get.blockManager + + // Mapping to a single shuffleBlockId with reduce ID 0 that we'll used to write all results to. 
+ private def consolidatedId(blockId: ShuffleBlockId): ShuffleBlockId = { +blockId.copy(reduceId = 0) + } + + def getDataFile(blockId: ShuffleBlockId): File = { +blockManager.diskBlockManager.getFile(consolidatedId(blockId)) + } + + private def getIndexFile(blockId: ShuffleBlockId): File = { +blockManager.diskBlockManager.getFile(consolidatedId(blockId).name + ".index") + } + + // Remove data file and index file that contain the output data from one map. + // Only need to be called once for all shuffleBlockId belong to one map output. + def removeDataByMap(blockId: ShuffleBlockId): Unit = { +var file = getDataFile(blockId) +if (file.exists) { + file.delete +} + +file = getIndexFile(blockId) +if (file.exists) { --- End diff -- add parentheses since exists / delete have side effects (in the case of exists it looks up the inode)
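The convention behind the two comments above can be shown in a few lines: in Scala, zero-argument calls that have side effects or touch external state are written with parentheses, while pure accessors are written without. The helper `removeIfExists` below is a hypothetical name used only for this sketch.

```scala
import java.io.File

object FileCleanup {
  /** Deletes the file if it is present; returns true only when a file was removed. */
  def removeIfExists(file: File): Boolean = {
    // exists() queries the file system and delete() mutates it,
    // so both calls keep their parentheses.
    if (file.exists()) file.delete() else false
  }
}
```

By contrast, a call such as `file.getName` only returns a value already held by the object, so it is usually written without parentheses.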
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759807 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io._ +import java.nio.ByteBuffer + +import org.apache.spark.{SparkEnv, SparkConf} +import org.apache.spark.network.netty.PathResolver +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage._ +import org.apache.spark.storage.ShuffleBlockId + +private[spark] +class IndexShuffleBlockManager(conf: SparkConf) + extends ShuffleBlockManager with PathResolver { + + private lazy val blockManager = SparkEnv.get.blockManager + + // Mapping to a single shuffleBlockId with reduce ID 0 that we'll used to write all results to. 
+ private def consolidatedId(blockId: ShuffleBlockId): ShuffleBlockId = { +blockId.copy(reduceId = 0) + } + + def getDataFile(blockId: ShuffleBlockId): File = { +blockManager.diskBlockManager.getFile(consolidatedId(blockId)) + } + + private def getIndexFile(blockId: ShuffleBlockId): File = { +blockManager.diskBlockManager.getFile(consolidatedId(blockId).name + ".index") + } + + // Remove data file and index file that contain the output data from one map. + // Only need to be called once for all shuffleBlockId belong to one map output. + def removeDataByMap(blockId: ShuffleBlockId): Unit = { --- End diff -- this is a fairly awkward way of specifying a map .. why don't you just take the mapId in?
[GitHub] spark pull request: [SPARK-3167] Handle special driver configs in ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2129#issuecomment-53532419 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19291/consoleFull) for PR 2129 at commit [`881a8f0`](https://github.com/apache/spark/commit/881a8f0d03046bf074776a8e6c820a99fad02d11). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.`
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759871 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io._ +import java.nio.ByteBuffer + +import org.apache.spark.{SparkEnv, SparkConf} +import org.apache.spark.network.netty.PathResolver +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage._ +import org.apache.spark.storage.ShuffleBlockId + +private[spark] +class IndexShuffleBlockManager(conf: SparkConf) + extends ShuffleBlockManager with PathResolver { + + private lazy val blockManager = SparkEnv.get.blockManager + + // Mapping to a single shuffleBlockId with reduce ID 0 that we'll used to write all results to. 
+ private def consolidatedId(blockId: ShuffleBlockId): ShuffleBlockId = { +blockId.copy(reduceId = 0) + } + + def getDataFile(blockId: ShuffleBlockId): File = { +blockManager.diskBlockManager.getFile(consolidatedId(blockId)) + } + + private def getIndexFile(blockId: ShuffleBlockId): File = { +blockManager.diskBlockManager.getFile(consolidatedId(blockId).name + ".index") + } + + // Remove data file and index file that contain the output data from one map. + // Only need to be called once for all shuffleBlockId belong to one map output. + def removeDataByMap(blockId: ShuffleBlockId): Unit = { +var file = getDataFile(blockId) +if (file.exists) { + file.delete +} + +file = getIndexFile(blockId) +if (file.exists) { + file.delete +} + } + + // Write an index file with the offsets of each block, plus a final offset at the end for the + // end of the output file. This will be used by getBlockLocation to figure out where each block + // begins and ends. + def writeIndexFile(blockId: ShuffleBlockId, offsets: Array[Long]) = { +val indexFile = getIndexFile(blockId) +val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))) +try { + for(offset <- offsets) { --- End diff -- space after for
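The index-file layout described in the quoted code comment (one 8-byte offset per block, plus a final offset marking the end of the data file) can be sketched as follows. The body of `getBlockLocation` is not shown in the diff, so the `locate` function here is an assumption about how such a lookup might work; `IndexFileSketch` and its method names are hypothetical.

```scala
import java.io.{BufferedOutputStream, DataOutputStream, File, FileOutputStream, RandomAccessFile}

object IndexFileSketch {
  /** Writes one Long per block start, followed by the end offset of the data file. */
  def write(indexFile: File, offsets: Array[Long]): Unit = {
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
    try {
      for (offset <- offsets) {
        out.writeLong(offset)
      }
    } finally {
      out.close()
    }
  }

  /** Returns (offset, length) of block `reduceId` by reading two adjacent index entries. */
  def locate(indexFile: File, reduceId: Int): (Long, Long) = {
    val in = new RandomAccessFile(indexFile, "r")
    try {
      // Each entry is 8 bytes, so entry i starts at byte i * 8.
      in.seek(reduceId * 8L)
      val start = in.readLong()
      val end = in.readLong()
      (start, end - start)
    } finally {
      in.close()
    }
  }
}
```

Because the final offset is stored too, the length of the last block falls out of the same subtraction as every other block, and an empty block simply has two equal neighbouring offsets.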
[GitHub] spark pull request: [SPARK-3243] Don't use stale spark-driver.* sy...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2154#issuecomment-53532443 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19290/consoleFull) for PR 2154 at commit [`17ec6fc`](https://github.com/apache/spark/commit/17ec6fc4923ea1159fc90c2a4356fd438e0bc033). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3241][SQL] create NumberFormat instance...
GitHub user baishuo opened a pull request: https://github.com/apache/spark/pull/2157 [SPARK-3241][SQL] create NumberFormat instance by threadsafe way You can merge this pull request into a Git repository by running: $ git pull https://github.com/baishuo/spark patch-threadlocal Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2157.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2157 commit 5d05a01d7737ee86ed42cb004b01d0cf22d4d695 Author: baishuo vc_j...@hotmail.com Date: 2014-08-27T03:12:24Z create NumberFormat instance by threadsafe way
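The pull request's diff is not included in this message, but the branch name (`patch-threadlocal`) suggests the usual remedy for the fact that `java.text.NumberFormat` instances are not thread-safe: give each thread its own instance through a `ThreadLocal`. The sketch below shows that general technique only; the object name, the locale, and the formatting settings are assumptions, not the contents of the patch.

```scala
import java.text.NumberFormat
import java.util.Locale

object SafeNumberFormat {
  // One formatter per thread, created lazily on first use by that thread.
  private val formatter = new ThreadLocal[NumberFormat] {
    override def initialValue(): NumberFormat = {
      val f = NumberFormat.getInstance(Locale.US)
      f.setMinimumIntegerDigits(5) // hypothetical setting, for illustration
      f.setGroupingUsed(false)
      f
    }
  }

  /** Formats `n` with the calling thread's own NumberFormat instance. */
  def format(n: Long): String = formatter.get().format(n)
}
```

Callers never share a formatter, so no locking is needed, at the cost of one small object per thread.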
[GitHub] spark pull request: [SPARK-3167] Handle special driver configs in ...
Github user andrewor14 closed the pull request at: https://github.com/apache/spark/pull/2156
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759951 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io._ +import java.nio.ByteBuffer + +import org.apache.spark.{SparkEnv, SparkConf} +import org.apache.spark.network.netty.PathResolver +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage._ +import org.apache.spark.storage.ShuffleBlockId + +private[spark] +class IndexShuffleBlockManager(conf: SparkConf) + extends ShuffleBlockManager with PathResolver { + + private lazy val blockManager = SparkEnv.get.blockManager + + // Mapping to a single shuffleBlockId with reduce ID 0 that we'll used to write all results to. 
+ private def consolidatedId(blockId: ShuffleBlockId): ShuffleBlockId = { +blockId.copy(reduceId = 0) + } + + def getDataFile(blockId: ShuffleBlockId): File = { +blockManager.diskBlockManager.getFile(consolidatedId(blockId)) + } + + private def getIndexFile(blockId: ShuffleBlockId): File = { +blockManager.diskBlockManager.getFile(consolidatedId(blockId).name + ".index") + } + + // Remove data file and index file that contain the output data from one map. + // Only need to be called once for all shuffleBlockId belong to one map output. + def removeDataByMap(blockId: ShuffleBlockId): Unit = { +var file = getDataFile(blockId) +if (file.exists) { + file.delete +} + +file = getIndexFile(blockId) +if (file.exists) { + file.delete +} + } + + // Write an index file with the offsets of each block, plus a final offset at the end for the + // end of the output file. This will be used by getBlockLocation to figure out where each block + // begins and ends. + def writeIndexFile(blockId: ShuffleBlockId, offsets: Array[Long]) = { +val indexFile = getIndexFile(blockId) +val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))) +try { + for(offset <- offsets) { +out.writeLong(offset) + } +} finally { + out.close() +} + } + + /** + * Get the location of a block in a map output file. Uses the index file we create for it. + * */ + def getBlockLocation(blockId: ShuffleBlockId): FileSegment = { --- End diff -- since ShuffleBlockId is just a BlockId, can you make this private?
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16759963 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io._ +import java.nio.ByteBuffer + +import org.apache.spark.{SparkEnv, SparkConf} +import org.apache.spark.network.netty.PathResolver +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage._ +import org.apache.spark.storage.ShuffleBlockId --- End diff -- this import is redundant
[GitHub] spark pull request: [SPARK-3241][SQL] create NumberFormat instance...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2157#issuecomment-53532690 Can one of the admins verify this patch?
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16760004 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io._ +import java.nio.ByteBuffer + +import org.apache.spark.{SparkEnv, SparkConf} +import org.apache.spark.network.netty.PathResolver +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage._ +import org.apache.spark.storage.ShuffleBlockId + +private[spark] +class IndexShuffleBlockManager(conf: SparkConf) + extends ShuffleBlockManager with PathResolver { + + private lazy val blockManager = SparkEnv.get.blockManager + + // Mapping to a single shuffleBlockId with reduce ID 0 that we'll used to write all results to. 
+ private def consolidatedId(blockId: ShuffleBlockId): ShuffleBlockId = { +blockId.copy(reduceId = 0) + } + + def getDataFile(blockId: ShuffleBlockId): File = { +blockManager.diskBlockManager.getFile(consolidatedId(blockId)) + } + + private def getIndexFile(blockId: ShuffleBlockId): File = { +blockManager.diskBlockManager.getFile(consolidatedId(blockId).name + ".index") + } + + // Remove data file and index file that contain the output data from one map. --- End diff -- use /** */ instead of //
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16760032 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io._ +import java.nio.ByteBuffer + +import org.apache.spark.{SparkEnv, SparkConf} +import org.apache.spark.network.netty.PathResolver +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage._ +import org.apache.spark.storage.ShuffleBlockId + +private[spark] +class IndexShuffleBlockManager(conf: SparkConf) --- End diff -- Can you add Javadoc explaining what this class does, and the structure of the blocks (e.g. reduce ID 0 is where we put all the blocks)?
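The "reduce ID 0" convention mentioned in the review can be illustrated with a minimal sketch. It assumes a simplified stand-in for `ShuffleBlockId` with a `shuffle_<shuffleId>_<mapId>_<reduceId>` name; the object and helper names below are hypothetical and are not Spark's actual classes.

```scala
object ConsolidatedIdSketch {
  // Simplified stand-in for Spark's ShuffleBlockId (assumption: three integer fields
  // and a name built from them; the real class lives in org.apache.spark.storage).
  final case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) {
    def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
  }

  // Every block of one map output is mapped to the same ID, the one with reduce ID 0.
  def consolidatedId(id: ShuffleBlockId): ShuffleBlockId = id.copy(reduceId = 0)

  // Hypothetical helpers: the data file and the index file share the consolidated name.
  def dataFileName(id: ShuffleBlockId): String = consolidatedId(id).name
  def indexFileName(id: ShuffleBlockId): String = consolidatedId(id).name + ".index"

  def main(args: Array[String]): Unit = {
    val block = ShuffleBlockId(shuffleId = 3, mapId = 7, reduceId = 5)
    println(dataFileName(block))
    println(indexFileName(block))
  }
}
```

Under these assumptions, blocks that differ only in their reduce ID resolve to the same pair of files, which is the structure the reviewer asks to have documented.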
[GitHub] spark pull request: [SPARK-3170][CORE][BUG]:RDD info loss in Stor...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2131#discussion_r16760036 --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala --- @@ -68,7 +68,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { // Otherwise, cache the values and keep track of any updates in block statuses val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) - context.taskMetrics.updatedBlocks = Some(updatedBlocks) + val metrics = context.taskMetrics + val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq) --- End diff -- I raised this in the other PR and since we have a new one I'll just ask again. Can you explain what this fixes? My understanding is that this can only be called once per task, and since this is the only place where we set `updatedBlocks` I don't see how the original `TaskMetrics` could already have updated blocks.
[GitHub] spark pull request: [SPARK-3170][CORE][BUG]:RDD info loss in Stor...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2131#discussion_r16760052 --- Diff: core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala --- @@ -70,8 +70,10 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Spar } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized { -// Remove all partitions that are no longer cached -_rddInfoMap.retain { case (_, info) => info.numCachedPartitions > 0 } +// Remove all partitions that are no longer cached in current completed stage +val completedRddInfoIds = Set[Int]() ++ stageCompleted.stageInfo.rddInfos.map(r => r.id) +_rddInfoMap.retain { case (id, info) => + !completedRddInfoIds.contains(id) || info.numCachedPartitions > 0 } --- End diff -- also, the style should be ``` _rddInfoMap.retain { case (id, info) => !completed... } ```
[GitHub] spark pull request: Added support for :cp jar that was broken in...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1929#issuecomment-53533093 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19293/consoleFull) for PR 1929 at commit [`f420cbf`](https://github.com/apache/spark/commit/f420cbf00a5f98c8eec73d251ed1d6b9352ad063). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.`
[GitHub] spark pull request: [SPARK-3170][CORE][BUG]:RDD info loss in Stor...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2131#discussion_r16760122 --- Diff: core/src/test/scala/org/apache/spark/CacheManagerSuite.scala --- @@ -87,4 +99,12 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar assert(value.toList === List(1, 2, 3, 4)) } } + + test("verity task metrics updated correctly") { +blockManager = sc.env.blockManager +cacheManager = new CacheManager(blockManager) --- End diff -- Is there a reason why you're not using `sc.env.cacheManager`?
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16760213 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.io._ +import java.nio.ByteBuffer + +import org.apache.spark.{SparkEnv, SparkConf} +import org.apache.spark.network.netty.PathResolver +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage._ +import org.apache.spark.storage.ShuffleBlockId + +private[spark] +class IndexShuffleBlockManager(conf: SparkConf) --- End diff -- Also, I don't think `conf` is used at all? Maybe remove it.
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16760248 --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala --- @@ -25,6 +25,9 @@ import org.apache.spark.shuffle._ * mapper (possibly reusing these across waves of tasks). */ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager { + + val fileShuffleBlockManager = new FileShuffleBlockManager(conf) --- End diff -- private
[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2065#issuecomment-53533488 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19298/consoleFull) for PR 2065 at commit [`f93a07c`](https://github.com/apache/spark/commit/f93a07c9e6cd4b34a276e9c75aa601fdafa67ef7). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16760326 --- Diff: core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala --- @@ -181,17 +171,30 @@ class ShuffleBlockManager(blockManager: BlockManager, /** * Returns the physical file segment in which the given BlockId is located. - * This function should only be called if shuffle file consolidation is enabled, as it is - * an error condition if we don't find the expected block. */ def getBlockLocation(id: ShuffleBlockId): FileSegment = { -// Search all file groups associated with this shuffle. -val shuffleState = shuffleStates(id.shuffleId) -for (fileGroup <- shuffleState.allFileGroups) { - val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId) - if (segment.isDefined) { return segment.get } +if (consolidateShuffleFiles) { + // Search all file groups associated with this shuffle. + val shuffleState = shuffleStates(id.shuffleId) + val iter = shuffleState.allFileGroups.iterator + while (iter.hasNext) { +val segment = iter.next.getFileSegmentFor(id.mapId, id.reduceId) +if (segment.isDefined) { return segment.get } + } + throw new IllegalStateException("Failed to find shuffle block: " + id) +} else { + val file = blockManager.diskBlockManager.getFile(id) --- End diff -- this no longer supports file consolidation, does it?
[GitHub] spark pull request: [SPARK-3170][CORE][BUG]:RDD info loss in Stor...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/2131#discussion_r16760352 --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala --- @@ -68,7 +68,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { // Otherwise, cache the values and keep track of any updates in block statuses val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) - context.taskMetrics.updatedBlocks = Some(updatedBlocks) + val metrics = context.taskMetrics + val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq) --- End diff -- A single task could call `getOrCompute` multiple times in a chain if it is computing several pipelined RDDs. ``` a.cache().filter(---).cache().count ```
[GitHub] spark pull request: [SPARK-3170][CORE][BUG]:RDD info loss in Stor...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2131#discussion_r16760497 --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala --- @@ -68,7 +68,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { // Otherwise, cache the values and keep track of any updates in block statuses val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) - context.taskMetrics.updatedBlocks = Some(updatedBlocks) + val metrics = context.taskMetrics + val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq) --- End diff -- Ah I see. Makes sense.
[GitHub] spark pull request: [SPARK-3139] Made ContextCleaner to not block ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2143#issuecomment-53534120 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19296/consoleFull) for PR 2143 at commit [`9c84202`](https://github.com/apache/spark/commit/9c84202631ccc82a99179e7a9dbfdff3a1d32c55). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.`
[GitHub] spark pull request: [SPARK-3200][REPL] Classes defined with refere...
GitHub user ScrapCodes opened a pull request: https://github.com/apache/spark/pull/2158 [SPARK-3200][REPL] Classes defined with reference to external variables,... ... should not crash REPL. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ScrapCodes/spark-1 SPARK-3200/repl_importing_mech Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2158.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2158 commit bd74e0bdeaa2f29f6d19d517fe1bc25d3142644a Author: Prashant Sharma prashan...@imaginea.com Date: 2014-08-27T07:10:05Z [SPARK-3200][REPL] Classes defined with reference to external variables, should not crash REPL.
[GitHub] spark pull request: [SPARK-3198] [SQL] Remove the TreeNode.id
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2155#issuecomment-53534171 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19292/consoleFull) for PR 2155 at commit [`bec42d2`](https://github.com/apache/spark/commit/bec42d23fa6d3c3f956a1fd291d76e0919daf7d7). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.` * `$FWDIR/bin/spark-submit --class $CLASS $` * `class ExternalSorter(object):` * `$FWDIR/bin/spark-submit --class $CLASS $`
[GitHub] spark pull request: [SPARK-3170][CORE][BUG]:RDD info loss in Stor...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2131#discussion_r16760627 --- Diff: core/src/test/scala/org/apache/spark/CacheManagerSuite.scala --- @@ -87,4 +99,12 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar assert(value.toList === List(1, 2, 3, 4)) } } + + test("verity task metrics updated correctly") { +blockManager = sc.env.blockManager +cacheManager = new CacheManager(blockManager) +val context = new TaskContext(0, 0, 0) +cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY) +assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size == 2) --- End diff -- Can you use `===` here instead of `==`?
[GitHub] spark pull request: [SPARK-3139] Made ContextCleaner to not block ...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/2143#issuecomment-53534279 Okay thanks TD and Andrew - I'm pulling this in.
[GitHub] spark pull request: [SPARK-3200][REPL] Classes defined with refere...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2158#issuecomment-53534504 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19299/consoleFull) for PR 2158 at commit [`051ed3c`](https://github.com/apache/spark/commit/051ed3cd255203ca29c23b46dca767836940dc20). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user colorant commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16760826 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -107,6 +153,11 @@ private[spark] class BlockManager( } } + if (useNetty && (shuffleManager.isInstanceOf[HashShuffleManager] +|| shuffleManager.isInstanceOf[SortShuffleManager])) { +val resolver = shuffleManager.shuffleBlockManager.asInstanceOf[PathResolver] --- End diff -- Yep, this assumes that other, non-file-based shuffleBlockManager implementations do not implement PathResolver.
[GitHub] spark pull request: [SPARK-3139] Akka timeouts from ContextCleaner...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/2056#issuecomment-53534681 This was fixed by a different approach in #2143.
[GitHub] spark pull request: [SPARK-3139] Made ContextCleaner to not block ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2143
[GitHub] spark pull request: [SPARK-3241][SQL] create NumberFormat instance...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/2157#issuecomment-53534767 I don't think we need to change anything. `NumberFormat` not being thread-safe means we cannot share the same `NumberFormat` instance among different threads (because they may call the `parse` or `format` methods concurrently). Obviously, that is not the case here. And `NumberFormat.getInstance()` itself should be thread-safe, if I understand correctly.
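The distinction drawn in this comment (sharing one formatter across threads is unsafe, while giving each thread its own is fine) can be sketched as follows. This is a minimal illustration, not code from the PR: the object name, the `ThreadLocal` holder, and the choice of `Locale.US` are assumptions made for the example.

```scala
import java.text.NumberFormat
import java.util.Locale

object NumberFormatSketch {
  // One formatter per thread: no instance is ever shared, so no locking is needed.
  // Locale.US is an arbitrary choice made only to keep the output predictable.
  private val perThread = new ThreadLocal[NumberFormat] {
    override def initialValue(): NumberFormat = NumberFormat.getInstance(Locale.US)
  }

  def format(value: Long): String = perThread.get().format(value)

  def main(args: Array[String]): Unit = {
    // Each thread formats with its own instance obtained through the ThreadLocal.
    val threads = (1 to 4).map { i =>
      new Thread(new Runnable {
        def run(): Unit = println(format(i * 1000000L))
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}
```

Whether the code under review needs anything like this depends on whether an instance is actually shared between threads, which is the point the commenter disputes.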
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user colorant commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16760900 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -107,6 +153,11 @@ private[spark] class BlockManager( } } + if (useNetty && (shuffleManager.isInstanceOf[HashShuffleManager] +|| shuffleManager.isInstanceOf[SortShuffleManager])) { +val resolver = shuffleManager.shuffleBlockManager.asInstanceOf[PathResolver] --- End diff -- Oh, I see, sorry about that. I should have cleaned up the redundant code before pushing.
[GitHub] spark pull request: [SPARK-3170][CORE][BUG]:RDD info loss in Stor...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/2131#discussion_r16761022 --- Diff: core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala --- @@ -70,8 +70,10 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Spar } override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized { -// Remove all partitions that are no longer cached -_rddInfoMap.retain { case (_, info) => info.numCachedPartitions > 0 } +// Remove all partitions that are no longer cached in current completed stage +val completedRddInfoIds = Set[Int]() ++ stageCompleted.stageInfo.rddInfos.map(r => r.id) --- End diff -- Also, I would just call this `completedRddIds`
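The pruning rule in this diff can be tried out in isolation with a small sketch. `Info` is a hypothetical stand-in for Spark's `RDDInfo` carrying only the field the predicate reads, and `prune` is an illustrative name; the predicate itself is the one from the diff, with the variable renamed to `completedRddIds` as the reviewer suggests.

```scala
import scala.collection.mutable

object RetainSketch {
  // Hypothetical stand-in for RDDInfo: only the field used by the predicate.
  final case class Info(numCachedPartitions: Int)

  // Drops an entry only if its RDD belongs to the completed stage AND has nothing cached.
  // Note: mutable.Map.retain is deprecated since Scala 2.13 in favour of filterInPlace.
  def prune(infos: mutable.Map[Int, Info], completedRddIds: Set[Int]): Unit =
    infos.retain { case (id, info) =>
      !completedRddIds.contains(id) || info.numCachedPartitions > 0
    }

  def main(args: Array[String]): Unit = {
    val infos = mutable.Map(1 -> Info(0), 2 -> Info(3), 3 -> Info(0))
    prune(infos, completedRddIds = Set(1, 2))
    // RDD 1 is removed; RDD 2 (still cached) and RDD 3 (not in the stage) remain.
    println(infos.keys.toList.sorted)
  }
}
```

Compared with the original predicate, which removed every entry with no cached partitions, this version leaves RDDs from other stages untouched.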
[GitHub] spark pull request: [SPARK-2608][Core] Fixed command line option p...
Github user tnachen commented on the pull request: https://github.com/apache/spark/pull/2145#issuecomment-53535667 OK, I just tried it on a Mesos cluster and it didn't work. The classpath it put in the Mesos command points to where Spark lives on the host running the Spark shell, not to the spark-executor that was just pulled down from the tar. sh -c 'cd spark-1*; /usr/bin/java -cp ::/home/jclouds/src/spark/conf:/home/jclouds/src/spark/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop1.0.4.jar -XX:MaxPermSize=128m -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@highly-available-182-e54.c.modern-saga-648.internal:47860/user/CoarseGrainedScheduler 20140818-071808-3483423754-5050-2070-4 10.151.50.130 2' So we must still compute the classpath after the tar is pulled down, not on whichever host spark-shell is being executed, and assume the executor is going to run the tarred Spark.
[GitHub] spark pull request: Added support for :cp jar that was broken in...
Github user gkossakowski commented on the pull request: https://github.com/apache/spark/pull/1929#issuecomment-53536315
> @gkossakowski, thanks for the detailed reply. From my point of view, what we want when new JARs are added is for earlier JARs to take precedence. This is what makes the most sense. If you already instantiated an object from the previous version of the class and stored it in a variable, it's not possible for it to suddenly change class. So instead the effect should be the same as tacking on another JAR at the end of your classpath -- only classes that are not found in earlier JARs come from there. Would these semantics be possible to implement for 2.11?
We agree on semantics. I called changing an existing class "shadowing", but we mean the same thing: changes to existing classes should not be allowed. Adding JARs to the classpath means just adding new classes that were not previously available. For that we need merging of packages, as I explained earlier. It's possible to implement this kind of API for 2.11, but it doesn't exist yet. I hope we can figure out how to merge your changes and work on the API on the compiler side. The current approach of going deep into the internals of `Global`, as seen in this PR, is fine as short-term experimentation so you can quickly deliver a fix to your users. The long-term solution would be migrating most of Spark's code that talks to compiler internals into the Scala code base.
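The "earlier JARs take precedence" semantics agreed on here can be modelled with a small sketch. It is a toy model only: each JAR is represented as a map from class name to a version label, and `ClasspathSketch`, `Jar`, and `merge` are hypothetical names with no relation to the compiler API discussed in the thread.

```scala
object ClasspathSketch {
  // Toy model (assumption): a JAR is a map from fully qualified class name to a version label.
  type Jar = Map[String, String]

  // Folds the classpath left to right. A later JAR only contributes class names that
  // no earlier JAR has defined, so existing classes are never replaced.
  def merge(classpath: Seq[Jar]): Map[String, String] =
    classpath.foldLeft(Map.empty[String, String]) { (seen, jar) =>
      jar ++ seen // on a name clash the entry already in `seen` wins
    }

  def main(args: Array[String]): Unit = {
    val first: Jar = Map("a.Foo" -> "v1")
    val added: Jar = Map("a.Foo" -> "v2", "a.Bar" -> "v2")
    // a.Foo stays at v1; a.Bar is new and comes from the added JAR.
    println(merge(Seq(first, added)))
  }
}
```

In this model, adding a JAR behaves like appending it to the end of the classpath, which matches the behaviour both commenters describe.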
[GitHub] spark pull request: [SPARK-3170][CORE][BUG]:RDD info loss in Stor...
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/2131#discussion_r16761636 --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala --- @@ -68,7 +68,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { // Otherwise, cache the values and keep track of any updates in block statuses val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) - context.taskMetrics.updatedBlocks = Some(updatedBlocks) + val metrics = context.taskMetrics + val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq) --- End diff -- @andrewor14 IMHO, getOrCompute can be called more than once per task (indirectly and recursively). Consider this code snippet: val rdd1 = sc.parallelize(...).cache() val rdd2 = rdd1.map(...).cache() val count = rdd2.count() This snippet submits one stage. Take task-1 as an example. Task-1 first calls getOrCompute(rdd-2), which in turn calls getOrCompute(rdd-1). Therefore, it generates and caches block rdd-1-1 and then block rdd-2-1. At the end of getOrCompute(rdd-1), the taskMetrics.updatedBlocks of task-1 will be Seq(rdd-1-1). Then, at the end of getOrCompute(rdd-2), taskMetrics.updatedBlocks will be Seq(rdd-1-1, rdd-2-1).
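The nested-call scenario in the comment above can be traced with a minimal standalone sketch. It deliberately avoids Spark's real classes: `Metrics`, `Rdd`, this simplified `getOrCompute`, and the block-name strings are hypothetical stand-ins, assumed only to show why appending (rather than overwriting) preserves the inner call's update.

```scala
import scala.collection.mutable.ArrayBuffer

object UpdatedBlocksSketch {
  // Hypothetical stand-in for the per-task metrics object.
  final class Metrics {
    var updatedBlocks: Option[Seq[String]] = None
  }

  // Hypothetical stand-in for a cached RDD with at most one cached parent.
  final case class Rdd(id: Int, parent: Option[Rdd])

  // Mirrors the shape of the patched logic: append to whatever an inner call recorded.
  def getOrCompute(rdd: Rdd, partition: Int, metrics: Metrics): Unit = {
    // Computing this RDD first computes (and caches) its parent, if any.
    rdd.parent.foreach(p => getOrCompute(p, partition, metrics))
    val updated = ArrayBuffer(s"rdd-${rdd.id}-$partition")
    val last = metrics.updatedBlocks.getOrElse(Seq.empty[String])
    metrics.updatedBlocks = Some(last ++ updated)
  }

  // Runs the two-RDD example for partition 1 and returns the recorded blocks.
  def run(): Seq[String] = {
    val rdd1 = Rdd(1, None)
    val rdd2 = Rdd(2, Some(rdd1))
    val metrics = new Metrics
    getOrCompute(rdd2, 1, metrics)
    metrics.updatedBlocks.getOrElse(Seq.empty[String])
  }
}
```

If the last assignment were `Some(updated)` instead, the outer call would overwrite the entry written by the inner call, which is the information loss the patch is meant to avoid.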
[GitHub] spark pull request: [SPARK-2608][Core] Fixed command line option p...
Github user liancheng commented on the pull request: https://github.com/apache/spark/pull/2145#issuecomment-53536988 @tnachen Thanks a lot! I'm currently working on another version that can figure out the executor-side classpath correctly. The basic idea is: 1. we still start the executor with `spark-class`, and 2. we pass `extraJavaOpts` and `extraLibraryPath` via `SPARK_EXECUTOR_OPTS`, which is recognized by `spark-class` and not used anywhere else. You may find the WIP version [here](https://github.com/liancheng/spark/compare/apache:branch-1.1...liancheng:mesos-fix-with-env-var?expand=1#diff-d425d35aa23c47a62fbb538554f2f2cfR123). I discussed this solution with @pwendell tonight, and it seems workable. It's also much simpler. For now, the only known issue is that it cannot handle a quoted string with spaces correctly (i.e. `-Dfoo="bar bar"`). It might be buggy in other ways though; I'm still testing it.
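The quoted-string limitation mentioned above comes down to how an options string is split into arguments. The following is a standalone sketch of the general problem only; in the WIP version the splitting happens in the shell via `spark-class`, not in Scala, and `naiveSplit` and `quoteAwareSplit` are hypothetical helpers that handle double quotes only (no escapes, no single quotes).

```scala
object OptsSplitSketch {
  // Naive approach: split on runs of whitespace, ignoring quotes entirely.
  def naiveSplit(opts: String): Seq[String] =
    opts.trim.split("\\s+").toSeq

  // Quote-aware approach: whitespace inside double quotes does not end a token,
  // and the quote characters themselves are dropped from the result.
  def quoteAwareSplit(opts: String): Seq[String] = {
    val tokens = Seq.newBuilder[String]
    val current = new StringBuilder
    var inQuotes = false
    var hasToken = false
    for (c <- opts) {
      if (c == '"') {
        inQuotes = !inQuotes
        hasToken = true
      } else if (c.isWhitespace && !inQuotes) {
        if (hasToken) {
          tokens += current.toString
          current.clear()
          hasToken = false
        }
      } else {
        current.append(c)
        hasToken = true
      }
    }
    if (hasToken) tokens += current.toString
    tokens.result()
  }
}
```

For the input `-Dfoo="bar bar" -Xmx512M`, the naive split yields three fragments and breaks the property value apart, while the quote-aware split keeps `-Dfoo=bar bar` as a single argument.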
[GitHub] spark pull request: [SPARK-2608][Core] Fixed command line option p...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/2145#issuecomment-53537143 Hey guys, yeah, this is an issue with the approach of using the utilities from the standalone deploy mode for this: it makes assumptions that don't hold in Mesos mode. I spoke a bit offline with @liancheng and I think there is a much simpler, more surgical fix that will unblock the Spark 1.1 release. But we should have a nicer way of building up the command in Scala, as is done here. It might mean we slightly refactor things so that parts of the utility functions for standalone mode can be used here.
[GitHub] spark pull request: Added support for :cp jar that was broken in...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/1929#issuecomment-53537357 @mateiz I'd prefer not to merge this into branch-1.1 at this point unless you see a really compelling need. I'm scarred from Spark 1.0.0, which actually released with a major REPL bug (which was itself an attempt to fix another bug!).
[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2065#issuecomment-53537410 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19298/consoleFull) for PR 2065 at commit [`f93a07c`](https://github.com/apache/spark/commit/f93a07c9e6cd4b34a276e9c75aa601fdafa67ef7). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.`
[GitHub] spark pull request: [SPARK-3170][CORE][BUG]:RDD info loss in Stor...
Github user uncleGen commented on a diff in the pull request: https://github.com/apache/spark/pull/2131#discussion_r16762096 --- Diff: core/src/test/scala/org/apache/spark/CacheManagerSuite.scala --- @@ -87,4 +99,12 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar assert(value.toList === List(1, 2, 3, 4)) } } + + test("verity task metrics updated correctly") { +blockManager = sc.env.blockManager +cacheManager = new CacheManager(blockManager) +val context = new TaskContext(0, 0, 0) +cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY) +assert(context.taskMetrics.updatedBlocks.getOrElse(Seq()).size == 2) --- End diff -- Sorry for my poor coding; I will review it again.
[GitHub] spark pull request: [SPARK-2608][Core] Fixed command line option p...
Github user tnachen commented on the pull request: https://github.com/apache/spark/pull/2145#issuecomment-53537607 I'm glad we're having these conversations :) This really helps folks who have had a bad experience using Mesos with Spark. I'm looking forward to the fix, and once it's updated I can verify it on our Mesos cluster. I'm chatting with Mesos committers about the different issues people are hitting, and I'll be addressing those in future patches.
[GitHub] spark pull request: [SQL] [SPARK-3236] Reading Parquet tables from...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/2150#issuecomment-53537794 Jenkins, test this please.
[GitHub] spark pull request: [SPARK-3235][SQL] Ensure in-memory tables don'...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/2147#issuecomment-53537765 Jenkins, test this please.
[GitHub] spark pull request: [SPARK-3239] [PySpark] randomize the dirs for ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2152#issuecomment-53537875 QA results for PR 2152: * This patch PASSES unit tests. * This patch merges cleanly. * This patch adds no public classes. For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19297/consoleFull
[GitHub] spark pull request: [SPARK-3237][SQL] Fix parquet filters with UDF...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2153
[GitHub] spark pull request: [SPARK-3237][SQL] Fix parquet filters with UDF...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/2153#issuecomment-53538008 Thanks for looking this over! Merged to master and 1.1.
[GitHub] spark pull request: [SPARK-2608][Core] Fixed command line option p...
Github user tnachen commented on the pull request: https://github.com/apache/spark/pull/2145#issuecomment-53538424 Also, I don't think I mentioned this explicitly: I've been testing with a Spark tarball available through an HTTP server and SPARK_EXECUTOR_URI set to that, and the slaves have no Spark installed. I know folks are using both cases, where the executor URI is either set or unset; when it is unset, it defaults to spark_home.
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user colorant commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16762489 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -121,20 +129,16 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc } override def remove(blockId: BlockId): Boolean = { -val fileSegment = diskManager.getBlockLocation(blockId) -val file = fileSegment.file -if (file.exists() && file.length() == fileSegment.length) { +val file = diskManager.getFile(blockId.name) --- End diff -- No, it won't change the behavior. When consolidation is on, the physical file name is different from blockId.name, so it won't delete anything.
[GitHub] spark pull request: [SPARK-3200][REPL] Classes defined with refere...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2158#issuecomment-53538916 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19299/consoleFull) for PR 2158 at commit [`051ed3c`](https://github.com/apache/spark/commit/051ed3cd255203ca29c23b46dca767836940dc20). * This patch **passes** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16762641 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -121,20 +129,16 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc } override def remove(blockId: BlockId): Boolean = { -val fileSegment = diskManager.getBlockLocation(blockId) -val file = fileSegment.file -if (file.exists() && file.length() == fileSegment.length) { +val file = diskManager.getFile(blockId.name) --- End diff -- Are you saying diskManager.getFile would return a file that doesn't exist when consolidation is on? If that's the case, can you add an inline comment explaining this?
[GitHub] spark pull request: [SPARK-2608][Core] Fixed command line option p...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/2145#issuecomment-53539011 Hi @liancheng, is there a situation where we should cover `-Dfoo="bar bar"`?
[GitHub] spark pull request: [SPARK-2830][MLLIB] doc update for 1.1
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2151#issuecomment-53539701 Merged into master and branch-1.1.
[GitHub] spark pull request: [SPARK-2830][MLLIB] doc update for 1.1
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2151
[GitHub] spark pull request: [SPARK-2288] Hide ShuffleBlockManager behind S...
Github user colorant commented on a diff in the pull request: https://github.com/apache/spark/pull/1241#discussion_r16763705 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -121,20 +129,16 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc } override def remove(blockId: BlockId): Boolean = { -val fileSegment = diskManager.getBlockLocation(blockId) -val file = fileSegment.file -if (file.exists() && file.length() == fileSegment.length) { +val file = diskManager.getFile(blockId.name) --- End diff -- Yep, for consolidation. But thinking again about sortShuffleManager: since all data is currently stored in the reduceId 0 file, this will lead to a problem there. I am considering changing the physical filename mapping there too to solve this, since we don't want the disk manager to know anything about shuffle block name mapping.
[GitHub] spark pull request: [SPARK-3227] [mllib] Added migration guide for...
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/2146#issuecomment-53542182 LGTM. Merged into master and branch-1.1. Thanks! (This doesn't touch code, so I skipped Jenkins.)
[GitHub] spark pull request: [SPARK-3227] [mllib] Added migration guide for...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2146
[GitHub] spark pull request: [SPARK-3173][SQL] Timestamp support in the par...
Github user byF commented on the pull request: https://github.com/apache/spark/pull/2084#issuecomment-53542655 I had a problem running the tests, but eventually [figured it out](http://mail-archives.apache.org/mod_mbox/spark-dev/201406.mbox/%3cdfe1084a-3c58-44c2-9b7c-9161e76e5...@gmail.com%3E). The tests are added and the literal conversion works.
[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2065#issuecomment-53542841 This looks really good now. Thanks @harishreedharan for all the changes and the wonderful refactoring. I am going to quickly test this in my local Flume setup for double confirmation. If it works out, I will merge this in.
[GitHub] spark pull request: [SPARK-3241][SQL] create NumberFormat instance...
Github user baishuo commented on the pull request: https://github.com/apache/spark/pull/2157#issuecomment-53544332 Thank you @chenghao-intel. I think I didn't express my reasoning clearly. The reason for the ThreadLocal is to ensure there is one and only one NumberFormat instance per thread. Otherwise, if open was called more than once, there may be more than one instance of NumberFormat.
[GitHub] spark pull request: [SPARK-3139] Akka timeouts from ContextCleaner...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2056#issuecomment-53544987 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19302/consoleFull) for PR 2056 at commit [`eb6aa5a`](https://github.com/apache/spark/commit/eb6aa5ad4524d9afba7512d545636cda1673374c). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3139] Akka timeouts from ContextCleaner...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2056#issuecomment-53545458 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19303/consoleFull) for PR 2056 at commit [`dbf3834`](https://github.com/apache/spark/commit/dbf38340ebc1c34924403e04639cebe37afa27bc). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3139] Akka timeouts from ContextCleaner...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2056#issuecomment-53545805 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19303/consoleFull) for PR 2056 at commit [`dbf3834`](https://github.com/apache/spark/commit/dbf38340ebc1c34924403e04639cebe37afa27bc). * This patch **fails** unit tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3241][SQL] create NumberFormat instance...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/2157#discussion_r16765443 --- Diff: sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala --- @@ -71,7 +71,7 @@ private[hive] class SparkHiveHadoopWriter( } def open() { -val numfmt = NumberFormat.getInstance() +val numfmt = SparkHiveHadoopWriter.threadLocalNumberFormat.get() --- End diff -- This is a local variable, right? It's not shared with other threads. It is already creating a new instance for each call. Or if you argue it could be the same instance -- a `ThreadLocal` wouldn't help. But it really can't be a singleton.
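To make the distinction in this exchange concrete, here is a standalone sketch of what a `ThreadLocal[NumberFormat]` does and does not provide. It is not the code from the PR: the name `threadLocalNumberFormat` is taken from the diff, but its definition here and the `instanceOnNewThread` helper are assumptions made for illustration.

```scala
import java.text.NumberFormat

object NumberFormatSketch {
  // One NumberFormat per thread, created lazily on that thread's first get().
  val threadLocalNumberFormat: ThreadLocal[NumberFormat] =
    new ThreadLocal[NumberFormat] {
      override def initialValue(): NumberFormat = NumberFormat.getInstance()
    }

  // Returns the instance observed by a freshly started thread.
  def instanceOnNewThread(): NumberFormat = {
    var seen: NumberFormat = null
    val t = new Thread(new Runnable {
      override def run(): Unit = { seen = threadLocalNumberFormat.get() }
    })
    t.start()
    t.join() // join() makes the write to `seen` visible to the caller
    seen
  }
}
```

Repeated `get()` calls on one thread return the same object, while another thread gets its own. A formatter held only in a local variable inside `open()` is already confined to the calling thread, so the `ThreadLocal` changes how many instances are created, not whether the usage is thread-safe.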
[GitHub] spark pull request: [SPARK-3139] Akka timeouts from ContextCleaner...
Github user witgo commented on the pull request: https://github.com/apache/spark/pull/2056#issuecomment-53546333 @tdas @pwendell We do not need to wait for clearing `RDD` and `Broadcast`. #2143 does not solve the timeout in the [removeShuffle method](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala#L159).
[GitHub] spark pull request: [SPARK-3154][STREAMING] Make FlumePollingInput...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/2065#issuecomment-53546318 Alright, tested this. Merging it.
[GitHub] spark pull request: [SPARK-3241][SQL] create NumberFormat instance...
Github user srowen commented on the pull request: https://github.com/apache/spark/pull/2157#issuecomment-53546624 Sorry for commenting out of order -- I missed the original comment. @baishuo Why do you want to avoid multiple instances per thread? That does not solve a thread-safety problem.
[GitHub] spark pull request: [SPARK-3139] Akka timeouts from ContextCleaner...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2056#issuecomment-53546851 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19304/consoleFull) for PR 2056 at commit [`a7a0347`](https://github.com/apache/spark/commit/a7a0347ed1c7ea37a6148ecb5e8f571e5d6937b5). * This patch merges cleanly.