spark git commit: [SPARK-12321][SQL] JSON format for TreeNode (use reflection)
Repository: spark
Updated Branches: refs/heads/master 474eb21a3 -> 7634fe951

[SPARK-12321][SQL] JSON format for TreeNode (use reflection)

An alternative solution to https://github.com/apache/spark/pull/10295: instead of implementing a JSON format for every logical/physical plan and expression, use reflection to implement it once in `TreeNode`.

Here I use a pre-order traversal to flatten a plan tree into a plan list, and add an extra field `num-children` to each plan node, so that we can reconstruct the tree from the list.

Example JSON for a logical plan tree:

```
[ {
  "class" : "org.apache.spark.sql.catalyst.plans.logical.Sort",
  "num-children" : 1,
  "order" : [ [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.SortOrder",
    "num-children" : 1,
    "child" : 0,
    "direction" : "Ascending"
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    "num-children" : 0,
    "name" : "i",
    "dataType" : "integer",
    "nullable" : true,
    "metadata" : { },
    "exprId" : { "id" : 10, "jvmId" : "cd1313c7-3f66-4ed7-a320-7d91e4633ac6" },
    "qualifiers" : [ ]
  } ] ],
  "global" : false,
  "child" : 0
}, {
  "class" : "org.apache.spark.sql.catalyst.plans.logical.Project",
  "num-children" : 1,
  "projectList" : [ [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.Alias",
    "num-children" : 1,
    "child" : 0,
    "name" : "i",
    "exprId" : { "id" : 10, "jvmId" : "cd1313c7-3f66-4ed7-a320-7d91e4633ac6" },
    "qualifiers" : [ ]
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.Add",
    "num-children" : 2,
    "left" : 0,
    "right" : 1
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    "num-children" : 0,
    "name" : "a",
    "dataType" : "integer",
    "nullable" : true,
    "metadata" : { },
    "exprId" : { "id" : 0, "jvmId" : "cd1313c7-3f66-4ed7-a320-7d91e4633ac6" },
    "qualifiers" : [ ]
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.Literal",
    "num-children" : 0,
    "value" : "1",
    "dataType" : "integer"
  } ], [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.Alias",
    "num-children" : 1,
    "child" : 0,
    "name" : "j",
    "exprId" : { "id" : 11, "jvmId" : "cd1313c7-3f66-4ed7-a320-7d91e4633ac6" },
    "qualifiers" : [ ]
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.Multiply",
    "num-children" : 2,
    "left" : 0,
    "right" : 1
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    "num-children" : 0,
    "name" : "a",
    "dataType" : "integer",
    "nullable" : true,
    "metadata" : { },
    "exprId" : { "id" : 0, "jvmId" : "cd1313c7-3f66-4ed7-a320-7d91e4633ac6" },
    "qualifiers" : [ ]
  }, {
    "class" : "org.apache.spark.sql.catalyst.expressions.Literal",
    "num-children" : 0,
    "value" : "2",
    "dataType" : "integer"
  } ] ],
  "child" : 0
}, {
  "class" : "org.apache.spark.sql.catalyst.plans.logical.LocalRelation",
  "num-children" : 0,
  "output" : [ [ {
    "class" : "org.apache.spark.sql.catalyst.expressions.AttributeReference",
    "num-children" : 0,
    "name" : "a",
    "dataType" : "integer",
    "nullable" : true,
    "metadata" : { },
    "exprId" : { "id" : 0, "jvmId" : "cd1313c7-3f66-4ed7-a320-7d91e4633ac6" },
    "qualifiers" : [ ]
  } ] ],
  "data" : [ ]
} ]
```

Author: Wenchen Fan

Closes #10311 from cloud-fan/toJson-reflection.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7634fe95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7634fe95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7634fe95

Branch: refs/heads/master
Commit: 7634fe9511e1a8fb94979624b1b617b495b48ad3
Parents: 474eb21
Author: Wenchen Fan
Authored: Mon Dec 21 12:47:07 2015 -0800
Committer: Michael Armbrust
Committed: Mon Dec 21 12:47:07 2015 -0800

--
 .../spark/sql/catalyst/ScalaReflection.scala    | 114
 .../expressions/aggregate/interfaces.scala      |   1 -
 .../sql/catalyst/expressions/literals.scala     |  41 +++
 .../catalyst/expressions/namedExpressions.scala |   4 +
 .../spark/sql/catalyst/plans/QueryPlan.scala    |   2 +
 .../spark/sql/catalyst/trees/TreeNode.scala     | 258 ++-
 .../org/apache/spark/sql/types/DataType.scala   |   6 +-
 .../spark/sql/execution/ExistingRDD.scala       |   4 +-
 .../columnar/InMemoryColumnarTableScan.scala    |   6 +-
 .../scala/org/apache/spark/sql/QueryTest.scala  | 102 +++-
 .../apache/spark/sql/UserDefinedTypeSuite.scala |   5 +
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   2 +
 .../hive/execution/ScriptTransformation.scala   |   2 +-
 13 files changed, 472 insertions(+), 75 deletions(-)
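The flatten/rebuild scheme above is easy to see on a toy tree. Below is a minimal standalone sketch — the `Node` type and helper names are hypothetical stand-ins, not Spark's actual `TreeNode` code:

```scala
// Hypothetical stand-in for a plan tree node.
case class Node(name: String, children: Seq[Node])

// Pre-order traversal: emit (name, numChildren) for each node.
def flatten(n: Node): Seq[(String, Int)] =
  (n.name, n.children.size) +: n.children.flatMap(flatten)

// Rebuild the tree by consuming the flattened list front-to-back;
// num-children tells us how many following entries belong to each node.
def rebuild(flat: List[(String, Int)]): (Node, List[(String, Int)]) = {
  val (name, numChildren) :: rest = flat
  var remaining = rest
  val children = (1 to numChildren).map { _ =>
    val (child, next) = rebuild(remaining)
    remaining = next
    child
  }
  (Node(name, children), remaining)
}

val tree = Node("Sort", Seq(Node("Project", Seq(Node("LocalRelation", Nil)))))
assert(rebuild(flatten(tree).toList)._1 == tree)  // round-trips
```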
spark git commit: Doc typo: ltrim = trim from left end, not right
Repository: spark
Updated Branches: refs/heads/master 1eb90bc9c -> fc6dbcc70

Doc typo: ltrim = trim from left end, not right

Author: pshearer

Closes #10414 from pshearer/patch-1.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc6dbcc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc6dbcc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc6dbcc7

Branch: refs/heads/master
Commit: fc6dbcc7038c2b030ef6a2dc8be5848499ccee1c
Parents: 1eb90bc
Author: pshearer
Authored: Mon Dec 21 14:04:59 2015 -0800
Committer: Andrew Or
Committed: Mon Dec 21 14:04:59 2015 -0800

--
 python/pyspark/sql/functions.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/fc6dbcc7/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 9062594..25594d7 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1053,7 +1053,7 @@ _string_functions = {
     'lower': 'Converts a string column to lower case.',
     'upper': 'Converts a string column to upper case.',
     'reverse': 'Reverses the string column and returns it as a new string column.',
-    'ltrim': 'Trim the spaces from right end for the specified string value.',
+    'ltrim': 'Trim the spaces from left end for the specified string value.',
     'rtrim': 'Trim the spaces from right end for the specified string value.',
     'trim': 'Trim the spaces from both ends for the specified string column.',
 }
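The corrected semantics are easy to check against the Scala side of the same API. A minimal sketch, assuming a `SQLContext` named `sqlContext` is in scope:

```scala
import org.apache.spark.sql.functions.{ltrim, rtrim, trim}
import sqlContext.implicits._  // for toDF and $"..." column syntax

val df = Seq("  padded  ").toDF("s")
// ltrim drops leading spaces, rtrim drops trailing spaces, trim drops both:
// "padded  ", "  padded", "padded"
df.select(ltrim($"s"), rtrim($"s"), trim($"s")).collect()
```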
spark git commit: Doc typo: ltrim = trim from left end, not right
Repository: spark
Updated Branches: refs/heads/branch-1.6 d6a519ff2 -> c754a0879

Doc typo: ltrim = trim from left end, not right

Author: pshearer

Closes #10414 from pshearer/patch-1.

(cherry picked from commit fc6dbcc7038c2b030ef6a2dc8be5848499ccee1c)
Signed-off-by: Andrew Or

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c754a087
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c754a087
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c754a087

Branch: refs/heads/branch-1.6
Commit: c754a08793458813d608e48ad1b158da770cd992
Parents: d6a519f
Author: pshearer
Authored: Mon Dec 21 14:04:59 2015 -0800
Committer: Andrew Or
Committed: Mon Dec 21 14:05:07 2015 -0800

--
 python/pyspark/sql/functions.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c754a087/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 9062594..25594d7 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1053,7 +1053,7 @@ _string_functions = {
     'lower': 'Converts a string column to lower case.',
     'upper': 'Converts a string column to upper case.',
     'reverse': 'Reverses the string column and returns it as a new string column.',
-    'ltrim': 'Trim the spaces from right end for the specified string value.',
+    'ltrim': 'Trim the spaces from left end for the specified string value.',
     'rtrim': 'Trim the spaces from right end for the specified string value.',
     'trim': 'Trim the spaces from both ends for the specified string column.',
 }
spark git commit: Doc typo: ltrim = trim from left end, not right
Repository: spark
Updated Branches: refs/heads/branch-1.5 eb54c914a -> 4d54ba896

Doc typo: ltrim = trim from left end, not right

Author: pshearer

Closes #10414 from pshearer/patch-1.

(cherry picked from commit fc6dbcc7038c2b030ef6a2dc8be5848499ccee1c)
Signed-off-by: Andrew Or

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d54ba89
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d54ba89
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d54ba89

Branch: refs/heads/branch-1.5
Commit: 4d54ba896345f60fc240fd03d89ec590f02bb06e
Parents: eb54c91
Author: pshearer
Authored: Mon Dec 21 14:04:59 2015 -0800
Committer: Andrew Or
Committed: Mon Dec 21 14:05:19 2015 -0800

--
 python/pyspark/sql/functions.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4d54ba89/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index fa3bd3f..f6762d4 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -956,7 +956,7 @@ _string_functions = {
     'lower': 'Converts a string column to lower case.',
     'upper': 'Converts a string column to upper case.',
     'reverse': 'Reverses the string column and returns it as a new string column.',
-    'ltrim': 'Trim the spaces from right end for the specified string value.',
+    'ltrim': 'Trim the spaces from left end for the specified string value.',
     'rtrim': 'Trim the spaces from right end for the specified string value.',
     'trim': 'Trim the spaces from both ends for the specified string column.',
 }
spark git commit: [SPARK-12374][SPARK-12150][SQL] Adding logical/physical operators for Range
Repository: spark Updated Branches: refs/heads/master 7634fe951 -> 4883a5087 [SPARK-12374][SPARK-12150][SQL] Adding logical/physical operators for Range Based on the suggestions from marmbrus , added logical/physical operators for Range for improving the performance. Also added another API for resolving the JIRA Spark-12150. Could you take a look at my implementation, marmbrus ? If not good, I can rework it. : ) Thank you very much! Author: gatorsmileCloses #10335 from gatorsmile/rangeOperators. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4883a508 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4883a508 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4883a508 Branch: refs/heads/master Commit: 4883a5087d481d4de5d3beabbd709853de01399a Parents: 7634fe9 Author: gatorsmile Authored: Mon Dec 21 13:46:58 2015 -0800 Committer: Michael Armbrust Committed: Mon Dec 21 13:46:58 2015 -0800 -- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../catalyst/plans/logical/basicOperators.scala | 32 ++ .../scala/org/apache/spark/sql/SQLContext.scala | 23 +--- .../spark/sql/execution/SparkStrategies.scala | 2 + .../spark/sql/execution/basicOperators.scala| 62 .../org/apache/spark/sql/DataFrameSuite.scala | 5 ++ .../execution/ExchangeCoordinatorSuite.scala| 1 + 7 files changed, 119 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4883a508/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 194ecc0..81a4d0a 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -759,7 +759,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val numElements: BigInt = { val safeStart = BigInt(start) val safeEnd = BigInt(end) - if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) { + if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) { (safeEnd - safeStart) / step } else { // the remainder has the same sign with range, could add 1 more http://git-wip-us.apache.org/repos/asf/spark/blob/4883a508/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index ec42b76..64ef4d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -210,6 +210,38 @@ case class Sort( override def output: Seq[Attribute] = child.output } +/** Factory for constructing new `Range` nodes. 
*/ +object Range { + def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = { +val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes +new Range(start, end, step, numSlices, output) + } +} + +case class Range( +start: Long, +end: Long, +step: Long, +numSlices: Int, +output: Seq[Attribute]) extends LeafNode { + require(step != 0, "step cannot be 0") + val numElements: BigInt = { +val safeStart = BigInt(start) +val safeEnd = BigInt(end) +if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) { + (safeEnd - safeStart) / step +} else { + // the remainder has the same sign with range, could add 1 more + (safeEnd - safeStart) / step + 1 +} + } + + override def statistics: Statistics = { +val sizeInBytes = LongType.defaultSize * numElements +Statistics( sizeInBytes = sizeInBytes ) + } +} + case class Aggregate( groupingExpressions: Seq[Expression], aggregateExpressions: Seq[NamedExpression], http://git-wip-us.apache.org/repos/asf/spark/blob/4883a508/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index db286ea..eadf5cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++
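The `numElements` computation in the new `Range` node is worth a worked example. This standalone sketch copies the arithmetic out of the diff above so it can be checked by hand; `BigInt` avoids overflow when `end - start` does not fit in a `Long`:

```scala
def numElements(start: Long, end: Long, step: Long): BigInt = {
  require(step != 0, "step cannot be 0")
  val safeStart = BigInt(start)
  val safeEnd = BigInt(end)
  if ((safeEnd - safeStart) % step == 0 || (safeEnd > safeStart) != (step > 0)) {
    (safeEnd - safeStart) / step      // exact multiple of step
  } else {
    (safeEnd - safeStart) / step + 1  // remainder adds one more element
  }
}

numElements(0, 10, 3)   // 4: elements 0, 3, 6, 9 (end is exclusive)
numElements(0, 9, 3)    // 3: elements 0, 3, 6
numElements(10, 0, -2)  // 5: elements 10, 8, 6, 4, 2
```

The `statistics` override then estimates the node's size as `LongType.defaultSize * numElements`, i.e. 8 bytes per element.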
spark git commit: [SPARK-12392][CORE] Optimize a location order of broadcast blocks by considering preferred local hosts
Repository: spark Updated Branches: refs/heads/master 4883a5087 -> 935f46630 [SPARK-12392][CORE] Optimize a location order of broadcast blocks by considering preferred local hosts When multiple workers exist in a host, we can bypass unnecessary remote access for broadcasts; block managers fetch broadcast blocks from the same host instead of remote hosts. Author: Takeshi YAMAMUROCloses #10346 from maropu/OptimizeBlockLocationOrder. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/935f4663 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/935f4663 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/935f4663 Branch: refs/heads/master Commit: 935f46630685306edbdec91f71710703317fe129 Parents: 4883a50 Author: Takeshi YAMAMURO Authored: Mon Dec 21 14:02:40 2015 -0800 Committer: Andrew Or Committed: Mon Dec 21 14:03:23 2015 -0800 -- .../org/apache/spark/storage/BlockManager.scala | 12 +++- .../apache/spark/storage/BlockManagerSuite.scala | 19 ++- 2 files changed, 29 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/935f4663/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6074fc5..b5b7804 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -578,9 +578,19 @@ private[spark] class BlockManager( doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]] } + /** + * Return a list of locations for the given block, prioritizing the local machine since + * multiple block managers can share the same host. 
+ */ + private def getLocations(blockId: BlockId): Seq[BlockManagerId] = { +val locs = Random.shuffle(master.getLocations(blockId)) +val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host } +preferredLocs ++ otherLocs + } + private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { require(blockId != null, "BlockId is null") -val locations = Random.shuffle(master.getLocations(blockId)) +val locations = getLocations(blockId) var numFetchFailures = 0 for (loc <- locations) { logDebug(s"Getting remote block $blockId from $loc") http://git-wip-us.apache.org/repos/asf/spark/blob/935f4663/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 53991d8..bf49be3 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -26,6 +26,7 @@ import scala.language.implicitConversions import scala.language.postfixOps import org.mockito.Mockito.{mock, when} +import org.mockito.{Matchers => mc} import org.scalatest._ import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts._ @@ -66,7 +67,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE private def makeBlockManager( maxMem: Long, - name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { + name: String = SparkContext.DRIVER_IDENTIFIER, + master: BlockManagerMaster = this.master): BlockManager = { val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1) val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf, @@ -451,6 +453,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(list2DiskGet.get.readMethod === DataReadMethod.Disk) } + test("optimize a location order of blocks") { +val localHost = Utils.localHostName() +val otherHost = "otherHost" +val bmMaster = mock(classOf[BlockManagerMaster]) +val bmId1 = BlockManagerId("id1", localHost, 1) +val bmId2 = BlockManagerId("id2", localHost, 2) +val bmId3 = BlockManagerId("id3", otherHost, 3) +when(bmMaster.getLocations(mc.any[BlockId])).thenReturn(Seq(bmId1, bmId2, bmId3)) + +val blockManager = makeBlockManager(128, "exec", bmMaster) +val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations) +val locations = blockManager
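The new `getLocations` ordering is self-contained enough to sketch outside `BlockManager`. This stand-in uses a simplified `BlockManagerId` (the real one is a Spark-internal class) to show the shuffle-then-partition idea:

```scala
import scala.util.Random

// Simplified stand-in for org.apache.spark.storage.BlockManagerId.
case class BlockManagerId(execId: String, host: String, port: Int)

// Shuffle for load balancing, then move same-host managers to the front
// so fetches prefer local block managers over remote ones.
def sortLocations(locs: Seq[BlockManagerId], localHost: String): Seq[BlockManagerId] = {
  val shuffled = Random.shuffle(locs)
  val (preferred, others) = shuffled.partition(_.host == localHost)
  preferred ++ others
}

val locs = Seq(
  BlockManagerId("id1", "hostA", 1),
  BlockManagerId("id2", "hostB", 2),
  BlockManagerId("id3", "hostA", 3))
// Both hostA managers come before hostB, in random relative order.
sortLocations(locs, "hostA")
```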
spark git commit: [SPARK-12339][SPARK-11206][WEBUI] Added a null check that was removed in
Repository: spark
Updated Branches: refs/heads/master fc6dbcc70 -> b0849b8ae

[SPARK-12339][SPARK-11206][WEBUI] Added a null check that was removed in

Updates made in SPARK-11206 missed an edge case which causes a NullPointerException when a task is killed. In some cases when a task ends in failure, taskMetrics is initialized as null (see JobProgressListener.onTaskEnd()). To address this a null check was added. Before the changes in SPARK-11206 this null check was called at the start of the updateTaskAccumulatorValues() function.

Author: Alex Bozarth

Closes #10405 from ajbozarth/spark12339.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0849b8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0849b8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0849b8a

Branch: refs/heads/master
Commit: b0849b8aeafa801bb0561f1f6e46dc1d56c37c19
Parents: fc6dbcc
Author: Alex Bozarth
Authored: Mon Dec 21 14:06:36 2015 -0800
Committer: Andrew Or
Committed: Mon Dec 21 14:06:36 2015 -0800

--
 .../apache/spark/sql/execution/ui/SQLListener.scala | 14 --
 1 file changed, 8 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/b0849b8a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index e19a1e3..622e01c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -160,12 +160,14 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
   }

   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
-    updateTaskAccumulatorValues(
-      taskEnd.taskInfo.taskId,
-      taskEnd.stageId,
-      taskEnd.stageAttemptId,
-      taskEnd.taskMetrics.accumulatorUpdates(),
-      finishTask = true)
+    if (taskEnd.taskMetrics != null) {
+      updateTaskAccumulatorValues(
+        taskEnd.taskInfo.taskId,
+        taskEnd.stageId,
+        taskEnd.stageAttemptId,
+        taskEnd.taskMetrics.accumulatorUpdates(),
+        finishTask = true)
+    }
   }

   /**
spark git commit: [SPARK-12388] change default compression to lz4
Repository: spark Updated Branches: refs/heads/master d655d37dd -> 29cecd4a4 [SPARK-12388] change default compression to lz4 According the benchmark [1], LZ4-java could be 80% (or 30%) faster than Snappy. After changing the compressor to LZ4, I saw 20% improvement on end-to-end time for a TPCDS query (Q4). [1] https://github.com/ning/jvm-compressor-benchmark/wiki cc rxin Author: Davies LiuCloses #10342 from davies/lz4. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29cecd4a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29cecd4a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29cecd4a Branch: refs/heads/master Commit: 29cecd4a42f6969613e5b2a40f2724f99e7eec01 Parents: d655d37 Author: Davies Liu Authored: Mon Dec 21 14:21:43 2015 -0800 Committer: Davies Liu Committed: Mon Dec 21 14:21:43 2015 -0800 -- .rat-excludes | 1 + .../org/apache/spark/io/CompressionCodec.scala | 12 +- .../apache/spark/io/LZ4BlockInputStream.java| 263 +++ .../apache/spark/io/CompressionCodecSuite.scala | 8 +- docs/configuration.md | 2 +- .../execution/ExchangeCoordinatorSuite.scala| 4 +- 6 files changed, 276 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/29cecd4a/.rat-excludes -- diff --git a/.rat-excludes b/.rat-excludes index 7262c96..3544c0f 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -84,3 +84,4 @@ gen-java.* org.apache.spark.sql.sources.DataSourceRegister org.apache.spark.scheduler.SparkHistoryListenerFactory .*parquet +LZ4BlockInputStream.java http://git-wip-us.apache.org/repos/asf/spark/blob/29cecd4a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala -- diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index ca74eed..7178046 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -17,10 +17,10 @@ package org.apache.spark.io -import java.io.{IOException, InputStream, OutputStream} +import java.io._ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} -import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} +import net.jpountz.lz4.LZ4BlockOutputStream import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf @@ -49,7 +49,8 @@ private[spark] object CompressionCodec { private val configKey = "spark.io.compression.codec" private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = { -codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec] +(codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec] + || codec.isInstanceOf[LZ4CompressionCodec]) } private val shortCompressionCodecNames = Map( @@ -92,12 +93,11 @@ private[spark] object CompressionCodec { } } - val FALLBACK_COMPRESSION_CODEC = "lzf" - val DEFAULT_COMPRESSION_CODEC = "snappy" + val FALLBACK_COMPRESSION_CODEC = "snappy" + val DEFAULT_COMPRESSION_CODEC = "lz4" val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq } - /** * :: DeveloperApi :: * LZ4 implementation of [[org.apache.spark.io.CompressionCodec]]. 
http://git-wip-us.apache.org/repos/asf/spark/blob/29cecd4a/core/src/main/scala/org/apache/spark/io/LZ4BlockInputStream.java -- diff --git a/core/src/main/scala/org/apache/spark/io/LZ4BlockInputStream.java b/core/src/main/scala/org/apache/spark/io/LZ4BlockInputStream.java new file mode 100644 index 000..27b6f0d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/io/LZ4BlockInputStream.java @@ -0,0 +1,263 @@ +package org.apache.spark.io; + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.EOFException; +import java.io.FilterInputStream; +import
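For users, this change only moves a default; the codec remains configurable. A minimal sketch of pinning the codec explicitly (both keys are documented Spark settings):

```scala
import org.apache.spark.SparkConf

// After this commit, lz4 is the default; set the key only to pick a
// different codec, or to pin lz4 explicitly on older releases.
val conf = new SparkConf()
  .set("spark.io.compression.codec", "lz4")          // or "snappy", "lzf"
  .set("spark.io.compression.lz4.blockSize", "32k")  // lz4 block size (default 32k)
```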
spark git commit: [SPARK-2331] SparkContext.emptyRDD should return RDD[T] not EmptyRDD[T]
Repository: spark
Updated Branches: refs/heads/master b0849b8ae -> a820ca19d

[SPARK-2331] SparkContext.emptyRDD should return RDD[T] not EmptyRDD[T]

Author: Reynold Xin

Closes #10394 from rxin/SPARK-2331.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a820ca19
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a820ca19
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a820ca19

Branch: refs/heads/master
Commit: a820ca19de1fb4daa01939a4b8bde8d874a7f3fc
Parents: b0849b8
Author: Reynold Xin
Authored: Mon Dec 21 14:07:48 2015 -0800
Committer: Andrew Or
Committed: Mon Dec 21 14:07:48 2015 -0800

--
 core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +-
 project/MimaExcludes.scala                              | 3 +++
 2 files changed, 4 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a820ca19/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 81a4d0a..c4541aa 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1248,7 +1248,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }

   /** Get an RDD that has no partitions or elements. */
-  def emptyRDD[T: ClassTag]: EmptyRDD[T] = new EmptyRDD[T](this)
+  def emptyRDD[T: ClassTag]: RDD[T] = new EmptyRDD[T](this)

   // Methods for creating shared variables

http://git-wip-us.apache.org/repos/asf/spark/blob/a820ca19/project/MimaExcludes.scala
--
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a3cfcd2..ad878c1 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -34,6 +34,9 @@ import com.typesafe.tools.mima.core.ProblemFilters._
 object MimaExcludes {
   def excludes(version: String) = version match {
     case v if v.startsWith("2.0") =>
+      Seq(
+        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD")
+      ) ++
       // When 1.6 is officially released, update this exclusion list.
       Seq(
         MimaBuild.excludeSparkPackage("deploy"),
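Usage is unchanged; only the declared return type widens so callers no longer see the `EmptyRDD` implementation class. A quick sketch, assuming an active `SparkContext` named `sc`:

```scala
// The result is now typed as RDD[T], so it composes with any RDD API.
val empty: org.apache.spark.rdd.RDD[Int] = sc.emptyRDD[Int]
empty.count()                                    // 0
empty.union(sc.parallelize(Seq(1, 2))).count()   // 2
```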
spark git commit: [SPARK-12466] Fix harmless NPE in tests
Repository: spark Updated Branches: refs/heads/branch-1.6 c754a0879 -> ca3998512 [SPARK-12466] Fix harmless NPE in tests ``` [info] ReplayListenerSuite: [info] - Simple replay (58 milliseconds) java.lang.NullPointerException at org.apache.spark.deploy.master.Master$$anonfun$asyncRebuildSparkUI$1.applyOrElse(Master.scala:982) at org.apache.spark.deploy.master.Master$$anonfun$asyncRebuildSparkUI$1.applyOrElse(Master.scala:980) ``` https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Test/job/Spark-Master-SBT/4316/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=spark-test/consoleFull This was introduced in #10284. It's harmless because the NPE is caused by a race that occurs mainly in `local-cluster` tests (but don't actually fail the tests). Tested locally to verify that the NPE is gone. Author: Andrew OrCloses #10417 from andrewor14/fix-harmless-npe. (cherry picked from commit d655d37ddf59d7fb6db529324ac8044d53b2622a) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca399851 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca399851 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca399851 Branch: refs/heads/branch-1.6 Commit: ca3998512dd7801379c96c9399d3d053ab7472cd Parents: c754a08 Author: Andrew Or Authored: Mon Dec 21 14:09:04 2015 -0800 Committer: Andrew Or Committed: Mon Dec 21 14:09:11 2015 -0800 -- .../src/main/scala/org/apache/spark/deploy/master/Master.scala | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ca399851/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index fc42bf0..5d97c63 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -979,7 +979,11 @@ private[deploy] class Master( futureUI.onSuccess { case Some(ui) => appIdToUI.put(app.id, ui) - self.send(AttachCompletedRebuildUI(app.id)) + // `self` can be null if we are already in the process of shutting down + // This happens frequently in tests where `local-cluster` is used + if (self != null) { +self.send(AttachCompletedRebuildUI(app.id)) + } // Application UI is successfully rebuilt, so link the Master UI to it // NOTE - app.appUIUrlAtHistoryServer is volatile app.appUIUrlAtHistoryServer = Some(ui.basePath) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12466] Fix harmless NPE in tests
Repository: spark Updated Branches: refs/heads/master a820ca19d -> d655d37dd [SPARK-12466] Fix harmless NPE in tests ``` [info] ReplayListenerSuite: [info] - Simple replay (58 milliseconds) java.lang.NullPointerException at org.apache.spark.deploy.master.Master$$anonfun$asyncRebuildSparkUI$1.applyOrElse(Master.scala:982) at org.apache.spark.deploy.master.Master$$anonfun$asyncRebuildSparkUI$1.applyOrElse(Master.scala:980) ``` https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Test/job/Spark-Master-SBT/4316/AMPLAB_JENKINS_BUILD_PROFILE=hadoop2.2,label=spark-test/consoleFull This was introduced in #10284. It's harmless because the NPE is caused by a race that occurs mainly in `local-cluster` tests (but don't actually fail the tests). Tested locally to verify that the NPE is gone. Author: Andrew OrCloses #10417 from andrewor14/fix-harmless-npe. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d655d37d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d655d37d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d655d37d Branch: refs/heads/master Commit: d655d37ddf59d7fb6db529324ac8044d53b2622a Parents: a820ca1 Author: Andrew Or Authored: Mon Dec 21 14:09:04 2015 -0800 Committer: Andrew Or Committed: Mon Dec 21 14:09:04 2015 -0800 -- .../src/main/scala/org/apache/spark/deploy/master/Master.scala | 6 +- 1 file changed, 5 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d655d37d/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index fc42bf0..5d97c63 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -979,7 +979,11 @@ private[deploy] class Master( futureUI.onSuccess { case Some(ui) => appIdToUI.put(app.id, ui) - self.send(AttachCompletedRebuildUI(app.id)) + // `self` can be null if we are already in the process of shutting down + // This happens frequently in tests where `local-cluster` is used + if (self != null) { +self.send(AttachCompletedRebuildUI(app.id)) + } // Application UI is successfully rebuilt, so link the Master UI to it // NOTE - app.appUIUrlAtHistoryServer is volatile app.appUIUrlAtHistoryServer = Some(ui.basePath) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11807] Remove support for Hadoop < 2.2
Repository: spark Updated Branches: refs/heads/master 29cecd4a4 -> 0a38637d0 [SPARK-11807] Remove support for Hadoop < 2.2 i.e. Hadoop 1 and Hadoop 2.0 Author: Reynold XinCloses #10404 from rxin/SPARK-11807. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a38637d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a38637d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a38637d Branch: refs/heads/master Commit: 0a38637d05d2338503ecceacfb911a6da6d49538 Parents: 29cecd4 Author: Reynold Xin Authored: Mon Dec 21 22:15:52 2015 -0800 Committer: Reynold Xin Committed: Mon Dec 21 22:15:52 2015 -0800 -- .../spark/deploy/history/FsHistoryProvider.scala | 10 +- .../mapreduce/SparkHadoopMapReduceUtil.scala | 17 ++--- dev/create-release/release-build.sh | 3 --- dev/run-tests-jenkins.py | 4 dev/run-tests.py | 2 -- docs/building-spark.md| 18 -- make-distribution.sh | 2 +- pom.xml | 13 - sql/README.md | 2 +- 9 files changed, 9 insertions(+), 62 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0a38637d/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 718efc4..6e91d73 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -663,16 +663,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // For testing. private[history] def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = { -val hadoop1Class = "org.apache.hadoop.hdfs.protocol.FSConstants$SafeModeAction" val hadoop2Class = "org.apache.hadoop.hdfs.protocol.HdfsConstants$SafeModeAction" -val actionClass: Class[_] = - try { -getClass().getClassLoader().loadClass(hadoop2Class) - } catch { -case _: ClassNotFoundException => - getClass().getClassLoader().loadClass(hadoop1Class) - } - +val actionClass: Class[_] = getClass().getClassLoader().loadClass(hadoop2Class) val action = actionClass.getField("SAFEMODE_GET").get(null) val method = dfs.getClass().getMethod("setSafeMode", action.getClass()) method.invoke(dfs, action).asInstanceOf[Boolean] http://git-wip-us.apache.org/repos/asf/spark/blob/0a38637d/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala -- diff --git a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala index 943ebcb..82d807f 100644 --- a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala +++ b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala @@ -26,17 +26,13 @@ import org.apache.spark.util.Utils private[spark] trait SparkHadoopMapReduceUtil { def newJobContext(conf: Configuration, jobId: JobID): JobContext = { -val klass = firstAvailableClass( -"org.apache.hadoop.mapreduce.task.JobContextImpl", // hadoop2, hadoop2-yarn -"org.apache.hadoop.mapreduce.JobContext") // hadoop1 +val klass = Utils.classForName("org.apache.hadoop.mapreduce.task.JobContextImpl") val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID]) ctor.newInstance(conf, jobId).asInstanceOf[JobContext] } def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = { -val klass = firstAvailableClass( 
-"org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl", // hadoop2, hadoop2-yarn -"org.apache.hadoop.mapreduce.TaskAttemptContext") // hadoop1 +val klass = Utils.classForName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl") val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID]) ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext] } @@ -69,13 +65,4 @@ trait SparkHadoopMapReduceUtil { } } } - - private def firstAvailableClass(first: String, second: String): Class[_] = { -try { - Utils.classForName(first) -} catch { -
spark git commit: [MINOR] Fix typos in JavaStreamingContext
Repository: spark Updated Branches: refs/heads/branch-1.6 5b19e7cfd -> 309ef355f [MINOR] Fix typos in JavaStreamingContext Author: Shixiong ZhuCloses #10424 from zsxwing/typo. (cherry picked from commit 93da8565fea42d8ac978df411daced4a9ea3a9c8) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/309ef355 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/309ef355 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/309ef355 Branch: refs/heads/branch-1.6 Commit: 309ef355fc511b70765983358d5c92b5f1a26bce Parents: 5b19e7c Author: Shixiong Zhu Authored: Mon Dec 21 22:28:18 2015 -0800 Committer: Reynold Xin Committed: Mon Dec 21 22:28:26 2015 -0800 -- .../spark/streaming/api/java/JavaStreamingContext.scala | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/309ef355/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 8f21c79..7a50135 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -695,9 +695,9 @@ object JavaStreamingContext { * * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program * @param factoryJavaStreamingContextFactory object to create a new JavaStreamingContext - * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor. + * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory. */ - @deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0") + @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") def getOrCreate( checkpointPath: String, factory: JavaStreamingContextFactory @@ -718,7 +718,7 @@ object JavaStreamingContext { * @param factoryJavaStreamingContextFactory object to create a new JavaStreamingContext * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible * file system - * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor. + * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory. */ @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") def getOrCreate( @@ -744,7 +744,7 @@ object JavaStreamingContext { * file system * @param createOnError Whether to create a new JavaStreamingContext if there is an * error in reading checkpoint data. - * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor. + * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory. */ @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") def getOrCreate( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR] Fix typos in JavaStreamingContext
Repository: spark Updated Branches: refs/heads/master 0a38637d0 -> 93da8565f [MINOR] Fix typos in JavaStreamingContext Author: Shixiong ZhuCloses #10424 from zsxwing/typo. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93da8565 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93da8565 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93da8565 Branch: refs/heads/master Commit: 93da8565fea42d8ac978df411daced4a9ea3a9c8 Parents: 0a38637 Author: Shixiong Zhu Authored: Mon Dec 21 22:28:18 2015 -0800 Committer: Reynold Xin Committed: Mon Dec 21 22:28:18 2015 -0800 -- .../spark/streaming/api/java/JavaStreamingContext.scala | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/93da8565/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 8f21c79..7a50135 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -695,9 +695,9 @@ object JavaStreamingContext { * * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program * @param factoryJavaStreamingContextFactory object to create a new JavaStreamingContext - * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor. + * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory. */ - @deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0") + @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") def getOrCreate( checkpointPath: String, factory: JavaStreamingContextFactory @@ -718,7 +718,7 @@ object JavaStreamingContext { * @param factoryJavaStreamingContextFactory object to create a new JavaStreamingContext * @param hadoopConf Hadoop configuration if necessary for reading from any HDFS compatible * file system - * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor. + * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory. */ @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") def getOrCreate( @@ -744,7 +744,7 @@ object JavaStreamingContext { * file system * @param createOnError Whether to create a new JavaStreamingContext if there is an * error in reading checkpoint data. - * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor. + * @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactory. */ @deprecated("use getOrCreate without JavaStreamingContextFactory", "1.4.0") def getOrCreate( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
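For context, the non-deprecated `getOrCreate` overloads take a plain creating function rather than a factory object. A minimal sketch using the Scala API (the checkpoint path is a hypothetical placeholder):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/tmp/checkpoint"  // hypothetical path

// Build a fresh context; only invoked when no checkpoint data exists.
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("example")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
```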
[2/2] spark git commit: Preparing development version 1.6.0-SNAPSHOT
Preparing development version 1.6.0-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b19e7cf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b19e7cf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b19e7cf Branch: refs/heads/branch-1.6 Commit: 5b19e7cfded0e2e41b6f427b4c3cfc3f06f85466 Parents: 4062cda Author: Patrick WendellAuthored: Mon Dec 21 17:50:36 2015 -0800 Committer: Patrick Wendell Committed: Mon Dec 21 17:50:36 2015 -0800 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- docker-integration-tests/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tags/pom.xml| 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 35 files changed, 35 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5b19e7cf/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index fbabaa5..4b60ee0 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0 +1.6.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5b19e7cf/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 1b3e417..672e946 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0 +1.6.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5b19e7cf/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 15b8d75..61744bb 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0 +1.6.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5b19e7cf/docker-integration-tests/pom.xml -- diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml index d579879..39d3f34 100644 --- a/docker-integration-tests/pom.xml +++ b/docker-integration-tests/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 -1.6.0 +1.6.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5b19e7cf/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 37b15bb..f5ab2a7 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0 +1.6.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5b19e7cf/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 295455a..dceedcf 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0 +1.6.0-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5b19e7cf/external/flume-sink/pom.xml -- diff --git 
a/external/flume-sink/pom.xml
[spark] Git Push Summary
Repository: spark
Updated Tags: refs/tags/v1.6.0-rc4 [created] 4062cda30
[1/2] spark git commit: Preparing Spark release v1.6.0-rc4
Repository: spark Updated Branches: refs/heads/branch-1.6 ca3998512 -> 5b19e7cfd Preparing Spark release v1.6.0-rc4 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4062cda3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4062cda3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4062cda3 Branch: refs/heads/branch-1.6 Commit: 4062cda3087ae42c6c3cb24508fc1d3a931accdf Parents: ca39985 Author: Patrick WendellAuthored: Mon Dec 21 17:50:29 2015 -0800 Committer: Patrick Wendell Committed: Mon Dec 21 17:50:29 2015 -0800 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- docker-integration-tests/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tags/pom.xml| 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 35 files changed, 35 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4062cda3/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 4b60ee0..fbabaa5 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0-SNAPSHOT +1.6.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/4062cda3/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 672e946..1b3e417 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0-SNAPSHOT +1.6.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/4062cda3/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 61744bb..15b8d75 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0-SNAPSHOT +1.6.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/4062cda3/docker-integration-tests/pom.xml -- diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml index 39d3f34..d579879 100644 --- a/docker-integration-tests/pom.xml +++ b/docker-integration-tests/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 -1.6.0-SNAPSHOT +1.6.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/4062cda3/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index f5ab2a7..37b15bb 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0-SNAPSHOT +1.6.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/4062cda3/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index dceedcf..295455a 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0-SNAPSHOT +1.6.0 ../../pom.xml 
http://git-wip-us.apache.org/repos/asf/spark/blob/4062cda3/external/flume-sink/pom.xml
spark git commit: [SPARK-12296][PYSPARK][MLLIB] Feature parity for pyspark mllib standard scaler model
Repository: spark Updated Branches: refs/heads/master 2235cd444 -> 969d5665b [SPARK-12296][PYSPARK][MLLIB] Feature parity for pyspark mllib standard scaler model Some methods are missing, such as ways to access the std, mean, etc. This PR is for feature parity for pyspark.mllib.feature.StandardScaler & StandardScalerModel. Author: Holden KarauCloses #10298 from holdenk/SPARK-12296-feature-parity-pyspark-mllib-StandardScalerModel. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/969d5665 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/969d5665 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/969d5665 Branch: refs/heads/master Commit: 969d5665bb1806703f948e8e7ab6133fca38c086 Parents: 2235cd4 Author: Holden Karau Authored: Tue Dec 22 09:14:12 2015 +0200 Committer: Nick Pentreath Committed: Tue Dec 22 09:14:12 2015 +0200 -- python/pyspark/mllib/feature.py | 40 1 file changed, 40 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/969d5665/python/pyspark/mllib/feature.py -- diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index acd7ec5..6129353 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -172,6 +172,38 @@ class StandardScalerModel(JavaVectorTransformer): self.call("setWithStd", withStd) return self +@property +@since('2.0.0') +def withStd(self): +""" +Returns if the model scales the data to unit standard deviation. +""" +return self.call("withStd") + +@property +@since('2.0.0') +def withMean(self): +""" +Returns if the model centers the data before scaling. +""" +return self.call("withMean") + +@property +@since('2.0.0') +def std(self): +""" +Return the column standard deviation values. +""" +return self.call("std") + +@property +@since('2.0.0') +def mean(self): +""" +Return the column mean values. +""" +return self.call("mean") + class StandardScaler(object): """ @@ -196,6 +228,14 @@ class StandardScaler(object): >>> for r in result.collect(): r DenseVector([-0.7071, 0.7071, -0.7071]) DenseVector([0.7071, -0.7071, 0.7071]) +>>> int(model.std[0]) +4 +>>> int(model.mean[0]*10) +9 +>>> model.withStd +True +>>> model.withMean +True .. versionadded:: 1.2.0 """ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
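The Scala model has exposed these members all along; the PR mirrors them in the Python API. A short Scala sketch of the same accessors, assuming an active `SparkContext` named `sc`:

```scala
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq(
  Vectors.dense(-2.0, 2.3, 0.0),
  Vectors.dense(3.8, 0.0, 1.9)))

val model = new StandardScaler(withMean = true, withStd = true).fit(data)
model.std       // per-column standard deviations
model.mean      // per-column means
model.withStd   // true
model.transform(Vectors.dense(-2.0, 2.3, 0.0))  // scaled vector
```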
spark git commit: [SPARK-12349][ML] Make spark.ml PCAModel load backwards compatible
Repository: spark Updated Branches: refs/heads/master ce1798b3a -> d0f695089 [SPARK-12349][ML] Make spark.ml PCAModel load backwards compatible Only load explainedVariance in PCAModel if it was written with Spark > 1.6.x jkbradley is this kind of what you had in mind? Author: Sean OwenCloses #10327 from srowen/SPARK-12349. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0f69508 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0f69508 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0f69508 Branch: refs/heads/master Commit: d0f695089e4627273133c5f49ef7a83c1840c8f5 Parents: ce1798b Author: Sean Owen Authored: Mon Dec 21 10:21:22 2015 + Committer: Sean Owen Committed: Mon Dec 21 10:21:22 2015 + -- .../scala/org/apache/spark/ml/feature/PCA.scala | 33 +--- 1 file changed, 28 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d0f69508/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala index 53d33ea..759be81 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala @@ -167,14 +167,37 @@ object PCAModel extends MLReadable[PCAModel] { private val className = classOf[PCAModel].getName +/** + * Loads a [[PCAModel]] from data located at the input path. Note that the model includes an + * `explainedVariance` member that is not recorded by Spark 1.6 and earlier. A model + * can be loaded from such older data but will have an empty vector for + * `explainedVariance`. + * + * @param path path to serialized model data + * @return a [[PCAModel]] + */ override def load(path: String): PCAModel = { val metadata = DefaultParamsReader.loadMetadata(path, sc, className) + + // explainedVariance field is not present in Spark <= 1.6 + val versionRegex = "([0-9]+)\\.([0-9])+.*".r + val hasExplainedVariance = metadata.sparkVersion match { +case versionRegex(major, minor) => + (major.toInt >= 2 || (major.toInt == 1 && minor.toInt > 6)) +case _ => false + } + val dataPath = new Path(path, "data").toString - val Row(pc: DenseMatrix, explainedVariance: DenseVector) = -sqlContext.read.parquet(dataPath) -.select("pc", "explainedVariance") -.head() - val model = new PCAModel(metadata.uid, pc, explainedVariance) + val model = if (hasExplainedVariance) { +val Row(pc: DenseMatrix, explainedVariance: DenseVector) = + sqlContext.read.parquet(dataPath) +.select("pc", "explainedVariance") +.head() +new PCAModel(metadata.uid, pc, explainedVariance) + } else { +val Row(pc: DenseMatrix) = sqlContext.read.parquet(dataPath).select("pc").head() +new PCAModel(metadata.uid, pc, Vectors.dense(Array.empty[Double]).asInstanceOf[DenseVector]) + } DefaultParamsReader.getAndSetParams(model, metadata) model } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
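The version test above is compact enough to check standalone. This sketch copies the regex and predicate out of `PCAModel.load`, with a couple of illustrative inputs:

```scala
// explainedVariance is only present in model data written by Spark > 1.6.
val versionRegex = "([0-9]+)\\.([0-9])+.*".r

def hasExplainedVariance(sparkVersion: String): Boolean = sparkVersion match {
  case versionRegex(major, minor) =>
    major.toInt >= 2 || (major.toInt == 1 && minor.toInt > 6)
  case _ => false
}

hasExplainedVariance("1.6.0")           // false: old layout, empty vector used
hasExplainedVariance("2.0.0-SNAPSHOT")  // true: read explainedVariance column
```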
spark git commit: [SPARK-11823][SQL] Fix flaky JDBC cancellation test in HiveThriftBinaryServerSuite
Repository: spark
Updated Branches:
  refs/heads/master 93da8565f -> 2235cd444

[SPARK-11823][SQL] Fix flaky JDBC cancellation test in HiveThriftBinaryServerSuite

This patch fixes a flaky "test jdbc cancel" test in HiveThriftBinaryServerSuite. This test is prone to a race condition which causes it to block indefinitely while waiting for an extremely slow query to complete, which caused many Jenkins builds to time out. For more background, see my comments on #6207 (the PR which introduced this test).

Author: Josh Rosen

Closes #10425 from JoshRosen/SPARK-11823.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2235cd44
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2235cd44
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2235cd44

Branch: refs/heads/master
Commit: 2235cd44407e3b6b401fb84a2096ade042c51d36
Parents: 93da856
Author: Josh Rosen
Authored: Mon Dec 21 23:12:05 2015 -0800
Committer: Josh Rosen
Committed: Mon Dec 21 23:12:05 2015 -0800

----------------------------------------------------------------------
 .../thriftserver/HiveThriftServer2Suites.scala | 85 ++++++++++++++--------
 1 file changed, 56 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/2235cd44/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 139d8e8..ebb2575 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -23,9 +23,8 @@ import java.sql.{Date, DriverManager, SQLException, Statement}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Promise, future}
+import scala.concurrent.{Await, ExecutionContext, Promise, future}
 import scala.io.Source
 import scala.util.{Random, Try}
 
@@ -43,7 +42,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.{Logging, SparkFunSuite}
 
 object TestData {
@@ -356,31 +355,54 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
 
       queries.foreach(statement.execute)
-
-      val largeJoin = "SELECT COUNT(*) FROM test_map " +
-        List.fill(10)("join test_map").mkString(" ")
-      val f = future { Thread.sleep(100); statement.cancel(); }
-      val e = intercept[SQLException] {
-        statement.executeQuery(largeJoin)
+      implicit val ec = ExecutionContext.fromExecutorService(
+        ThreadUtils.newDaemonSingleThreadExecutor("test-jdbc-cancel"))
+      try {
+        // Start a very-long-running query that will take hours to finish, then cancel it in order
+        // to demonstrate that cancellation works.
+        val f = future {
+          statement.executeQuery(
+            "SELECT COUNT(*) FROM test_map " +
+              List.fill(10)("join test_map").mkString(" "))
+        }
+        // Note that this is slightly race-prone: if the cancel is issued before the statement
+        // begins executing then we'll fail with a timeout. As a result, this fixed delay is set
+        // slightly more conservatively than may be strictly necessary.
+        Thread.sleep(1000)
+        statement.cancel()
+        val e = intercept[SQLException] {
+          Await.result(f, 3.minute)
+        }
+        assert(e.getMessage.contains("cancelled"))
+
+        // Cancellation is a no-op if spark.sql.hive.thriftServer.async=false
+        statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
+        try {
+          val sf = future {
+            statement.executeQuery(
+              "SELECT COUNT(*) FROM test_map " +
+                List.fill(4)("join test_map").mkString(" ")
+            )
+          }
+          // Similarly, this is also slightly race-prone on fast machines where the query above
+          // might race and complete before we issue the cancel.
+          Thread.sleep(1000)
+          statement.cancel()
+          val rs1 = Await.result(sf, 3.minute)
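The shape of the fix is reusable wherever a blocking call must be cancelled from outside: run the long operation on a dedicated worker, give it a deliberately conservative head start, cancel, then assert on the resulting error. A stand-alone Python sketch of that pattern (all names are invented for illustration; no Spark or JDBC involved):

```
import concurrent.futures
import threading
import time

cancelled = threading.Event()

def long_running_query():
    # Stand-in for statement.executeQuery(...) on a huge self-join.
    while not cancelled.wait(0.1):
        pass
    raise RuntimeError("Operation cancelled")

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    f = pool.submit(long_running_query)
    time.sleep(1.0)   # fixed head start; still race-prone, as the test comments note
    cancelled.set()   # analogous to statement.cancel()
    try:
        f.result(timeout=180)   # analogous to Await.result(f, 3.minute)
    except RuntimeError as e:
        assert "cancelled" in str(e)
```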
spark git commit: [SPARK-11823][SQL] Fix flaky JDBC cancellation test in HiveThriftBinaryServerSuite
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 309ef355f -> 0f905d7df

[SPARK-11823][SQL] Fix flaky JDBC cancellation test in HiveThriftBinaryServerSuite

This patch fixes a flaky "test jdbc cancel" test in HiveThriftBinaryServerSuite. This test is prone to a race condition which causes it to block indefinitely while waiting for an extremely slow query to complete, which caused many Jenkins builds to time out. For more background, see my comments on #6207 (the PR which introduced this test).

Author: Josh Rosen

Closes #10425 from JoshRosen/SPARK-11823.

(cherry picked from commit 2235cd44407e3b6b401fb84a2096ade042c51d36)
Signed-off-by: Josh Rosen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f905d7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f905d7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f905d7d

Branch: refs/heads/branch-1.6
Commit: 0f905d7df43b20d9335ec880b134d8d4f962c297
Parents: 309ef35
Author: Josh Rosen
Authored: Mon Dec 21 23:12:05 2015 -0800
Committer: Josh Rosen
Committed: Mon Dec 21 23:12:45 2015 -0800

----------------------------------------------------------------------
 .../thriftserver/HiveThriftServer2Suites.scala | 85 ++++++++++++++--------
 1 file changed, 56 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/0f905d7d/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 139d8e8..ebb2575 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -23,9 +23,8 @@ import java.sql.{Date, DriverManager, SQLException, Statement}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Promise, future}
+import scala.concurrent.{Await, ExecutionContext, Promise, future}
 import scala.io.Source
 import scala.util.{Random, Try}
 
@@ -43,7 +42,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.{Logging, SparkFunSuite}
 
 object TestData {
@@ -356,31 +355,54 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
 
       queries.foreach(statement.execute)
-
-      val largeJoin = "SELECT COUNT(*) FROM test_map " +
-        List.fill(10)("join test_map").mkString(" ")
-      val f = future { Thread.sleep(100); statement.cancel(); }
-      val e = intercept[SQLException] {
-        statement.executeQuery(largeJoin)
+      implicit val ec = ExecutionContext.fromExecutorService(
+        ThreadUtils.newDaemonSingleThreadExecutor("test-jdbc-cancel"))
+      try {
+        // Start a very-long-running query that will take hours to finish, then cancel it in order
+        // to demonstrate that cancellation works.
+        val f = future {
+          statement.executeQuery(
+            "SELECT COUNT(*) FROM test_map " +
+              List.fill(10)("join test_map").mkString(" "))
+        }
+        // Note that this is slightly race-prone: if the cancel is issued before the statement
+        // begins executing then we'll fail with a timeout. As a result, this fixed delay is set
+        // slightly more conservatively than may be strictly necessary.
+        Thread.sleep(1000)
+        statement.cancel()
+        val e = intercept[SQLException] {
+          Await.result(f, 3.minute)
+        }
+        assert(e.getMessage.contains("cancelled"))
+
+        // Cancellation is a no-op if spark.sql.hive.thriftServer.async=false
+        statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
+        try {
+          val sf = future {
+            statement.executeQuery(
+              "SELECT COUNT(*) FROM test_map " +
+                List.fill(4)("join test_map").mkString(" ")
+            )
+          }
+          // Similarly, this is also slightly race-prone on fast machines where the query above
+          // might race and complete before we issue the cancel.
spark git commit: [PYSPARK] Pyspark typo & Add missing abstractmethod annotation
Repository: spark
Updated Branches:
  refs/heads/master d0f695089 -> 1920d72a1

[PYSPARK] Pyspark typo & Add missing abstractmethod annotation

No JIRA is created since this is a trivial change. davies, please help review it.

Author: Jeff Zhang

Closes #10143 from zjffdu/pyspark_typo.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1920d72a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1920d72a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1920d72a

Branch: refs/heads/master
Commit: 1920d72a1f7b9844323d06e8094818347f413df6
Parents: d0f6950
Author: Jeff Zhang
Authored: Mon Dec 21 08:53:46 2015 -0800
Committer: Davies Liu
Committed: Mon Dec 21 08:53:46 2015 -0800

----------------------------------------------------------------------
 python/pyspark/ml/pipeline.py | 2 +-
 python/pyspark/ml/wrapper.py  | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/1920d72a/python/pyspark/ml/pipeline.py

diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index 4475451..9f5f6ac 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -86,7 +86,7 @@ class Transformer(Params):
     @abstractmethod
     def _transform(self, dataset):
         """
-        Transforms the input dataset with optional parameters.
+        Transforms the input dataset.
 
         :param dataset: input dataset, which is an instance of
                         :py:class:`pyspark.sql.DataFrame`

http://git-wip-us.apache.org/repos/asf/spark/blob/1920d72a/python/pyspark/ml/wrapper.py

diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py
index 4bcb4aa..dd1d4b0 100644
--- a/python/pyspark/ml/wrapper.py
+++ b/python/pyspark/ml/wrapper.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from abc import ABCMeta
+from abc import ABCMeta, abstractmethod
 
 from pyspark import SparkContext
 from pyspark.sql import DataFrame
@@ -110,6 +110,7 @@ class JavaEstimator(Estimator, JavaWrapper):
 
     __metaclass__ = ABCMeta
 
+    @abstractmethod
     def _create_model(self, java_model):
         """
         Creates a model from the input Java model reference.
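The annotation matters because JavaEstimator declares __metaclass__ = ABCMeta: without @abstractmethod, a subclass that forgets _create_model fails only when the method is first called. A small Python 3 sketch of what the annotation buys (the patched class targets Python 2 via __metaclass__; class names below are ours):

```
from abc import ABC, abstractmethod

class EstimatorLike(ABC):
    @abstractmethod
    def _create_model(self, java_model):
        """Subclasses must wrap the fitted Java model."""

class Incomplete(EstimatorLike):
    pass  # forgets to implement _create_model

try:
    Incomplete()
except TypeError as e:
    print(e)  # "Can't instantiate abstract class Incomplete ..."
```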