spark git commit: [SPARK-12692][BUILD] Enforce style checking about white space before comma
Repository: spark Updated Branches: refs/heads/master cb7b864a2 -> 3d81d63f4 [SPARK-12692][BUILD] Enforce style checking about white space before comma This is the final PR about SPARK-12692. We have removed all of white spaces before comma from code so let's enforce style checking. Author: Kousuke SarutaCloses #10736 from sarutak/SPARK-12692-followup-enforce-checking. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d81d63f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d81d63f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d81d63f Branch: refs/heads/master Commit: 3d81d63f4499478ef7861bf77383c30aed14bb19 Parents: cb7b864 Author: Kousuke Saruta Authored: Wed Jan 13 00:51:24 2016 -0800 Committer: Reynold Xin Committed: Wed Jan 13 00:51:24 2016 -0800 -- scalastyle-config.xml | 13 ++--- 1 file changed, 6 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3d81d63f/scalastyle-config.xml -- diff --git a/scalastyle-config.xml b/scalastyle-config.xml index bc209ee..967a482 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -197,6 +197,12 @@ This file is divided into 3 sections: + + + COMMA + + + @@ -217,13 +223,6 @@ This file is divided into 3 sections: - - - - COMMA - - - - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
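The rule being moved is keyed on the COMMA token (the only fragment of it that survives the stripped XML above) and rejects whitespace immediately before a comma; the commit promotes it into the enforced section of scalastyle-config.xml now that all offending spaces have been removed from the codebase. An illustrative Scala snippet (not taken from the Spark sources) of what the enforced check flags:

    // Illustrative only: what the newly enforced scalastyle rule rejects vs. accepts.
    val bad  = Seq(1 , 2 , 3)   // whitespace before ',' -- now a style error
    val good = Seq(1, 2, 3)     // passes the check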
spark git commit: [SPARK-9297] [SQL] Add covar_pop and covar_samp
Repository: spark Updated Branches: refs/heads/master d6fd9b376 -> 63eee86cc [SPARK-9297] [SQL] Add covar_pop and covar_samp JIRA: https://issues.apache.org/jira/browse/SPARK-9297 Add two aggregation functions: covar_pop and covar_samp. Author: Liang-Chi HsiehAuthor: Liang-Chi Hsieh Closes #10029 from viirya/covar-funcs. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63eee86c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63eee86c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63eee86c Branch: refs/heads/master Commit: 63eee86cc652c108ca7712c8c0a73db1ca89ae90 Parents: d6fd9b3 Author: Liang-Chi Hsieh Authored: Wed Jan 13 10:26:55 2016 -0800 Committer: Davies Liu Committed: Wed Jan 13 10:26:55 2016 -0800 -- .../catalyst/analysis/FunctionRegistry.scala| 2 + .../expressions/aggregate/Covariance.scala | 198 +++ .../scala/org/apache/spark/sql/functions.scala | 40 .../hive/execution/AggregationQuerySuite.scala | 32 +++ 4 files changed, 272 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/63eee86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 5c2aa3c..d9009e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -182,6 +182,8 @@ object FunctionRegistry { expression[Average]("avg"), expression[Corr]("corr"), expression[Count]("count"), +expression[CovPopulation]("covar_pop"), +expression[CovSample]("covar_samp"), expression[First]("first"), expression[First]("first_value"), expression[Last]("last"), http://git-wip-us.apache.org/repos/asf/spark/blob/63eee86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala new file mode 100644 index 000..f53b01b --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.TypeUtils +import org.apache.spark.sql.types._ + +/** + * Compute the covariance between two expressions. + * When applied on empty data (i.e., count is zero), it returns NULL. + * + */ +abstract class Covariance(left: Expression, right: Expression) extends ImperativeAggregate +with Serializable { + override def children: Seq[Expression] = Seq(left, right) + + override def nullable: Boolean = true + + override def dataType: DataType = DoubleType + + override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType, DoubleType) + + override def checkInputDataTypes(): TypeCheckResult = { +if (left.dataType.isInstanceOf[DoubleType] && right.dataType.isInstanceOf[DoubleType]) { + TypeCheckResult.TypeCheckSuccess +} else { + TypeCheckResult.TypeCheckFailure( +s"covariance requires that both arguments are double type, " + + s"not (${left.dataType}, ${right.dataType}).") +} + } + + override def aggBufferSchema: StructType =
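Both functions are registered in FunctionRegistry and get wrappers in org.apache.spark.sql.functions, so they are reachable from the DataFrame and SQL APIs alike. A minimal usage sketch, assuming a DataFrame df with numeric columns x and y that is also registered as a temporary table tbl:

    import org.apache.spark.sql.functions.{col, covar_pop, covar_samp}

    // Population and sample covariance over the whole DataFrame; both return
    // NULL on empty input, as noted in the Covariance scaladoc above.
    df.agg(covar_pop(col("x"), col("y")), covar_samp(col("x"), col("y"))).show()

    // The same through SQL, using the names added to FunctionRegistry.
    sqlContext.sql("SELECT covar_pop(x, y), covar_samp(x, y) FROM tbl").show()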
spark git commit: [SPARK-9383][PROJECT-INFRA] PR merge script should reset back to previous branch when possible
Repository: spark Updated Branches: refs/heads/master 38148f737 -> 97e0c7c5a [SPARK-9383][PROJECT-INFRA] PR merge script should reset back to previous branch when possible This patch modifies our PR merge script to reset back to a named branch when restoring the original checkout upon exit. When the committer is originally checked out to a detached head, then they will be restored back to that same ref (the same as today's behavior). This is a slightly updated version of #7569, with an extra fix to handle the detached head corner-case. Author: Josh RosenCloses #10709 from JoshRosen/SPARK-9383. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97e0c7c5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97e0c7c5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97e0c7c5 Branch: refs/heads/master Commit: 97e0c7c5af4d002937f9ee679568bb501d8818fc Parents: 38148f7 Author: Josh Rosen Authored: Wed Jan 13 11:56:30 2016 -0800 Committer: Josh Rosen Committed: Wed Jan 13 11:56:30 2016 -0800 -- dev/merge_spark_pr.py | 19 --- 1 file changed, 16 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/97e0c7c5/dev/merge_spark_pr.py -- diff --git a/dev/merge_spark_pr.py b/dev/merge_spark_pr.py index bf1a000..5ab285e 100755 --- a/dev/merge_spark_pr.py +++ b/dev/merge_spark_pr.py @@ -355,11 +355,21 @@ def standardize_jira_ref(text): return clean_text + +def get_current_ref(): +ref = run_cmd("git rev-parse --abbrev-ref HEAD").strip() +if ref == 'HEAD': +# The current ref is a detached HEAD, so grab its SHA. +return run_cmd("git rev-parse HEAD").strip() +else: +return ref + + def main(): global original_head os.chdir(SPARK_HOME) -original_head = run_cmd("git rev-parse HEAD")[:8] +original_head = get_current_ref() branches = get_json("%s/branches" % GITHUB_API_BASE) branch_names = filter(lambda x: x.startswith("branch-"), [x['name'] for x in branches]) @@ -449,5 +459,8 @@ if __name__ == "__main__": (failure_count, test_count) = doctest.testmod() if failure_count: exit(-1) - -main() +try: +main() +except: +clean_up() +raise - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12268][PYSPARK] Make pyspark shell pythonstartup work under python3
Repository: spark Updated Branches: refs/heads/master 97e0c7c5a -> e4e0b3f7b [SPARK-12268][PYSPARK] Make pyspark shell pythonstartup work under python3 This replaces the `execfile` used for running custom python shell scripts with explicit open, compile and exec (as recommended by 2to3). The reason for this change is to make the pythonstartup option compatible with python3. Author: Erik SelinCloses #10255 from tyro89/pythonstartup-python3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4e0b3f7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4e0b3f7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4e0b3f7 Branch: refs/heads/master Commit: e4e0b3f7b2945aae5ec7c3d68296010bbc5160cf Parents: 97e0c7c Author: Erik Selin Authored: Wed Jan 13 12:21:45 2016 -0800 Committer: Josh Rosen Committed: Wed Jan 13 12:21:45 2016 -0800 -- python/pyspark/shell.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e4e0b3f7/python/pyspark/shell.py -- diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index 9933129..26cafca 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -76,4 +76,6 @@ if add_files is not None: # which allows us to execute the user's PYTHONSTARTUP file: _pythonstartup = os.environ.get('OLD_PYTHONSTARTUP') if _pythonstartup and os.path.isfile(_pythonstartup): -execfile(_pythonstartup) +with open(_pythonstartup) as f: +code = compile(f.read(), _pythonstartup, 'exec') +exec(code) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12268][PYSPARK] Make pyspark shell pythonstartup work under python3
Repository: spark Updated Branches: refs/heads/branch-1.6 364f799cf -> cf6d506c7 [SPARK-12268][PYSPARK] Make pyspark shell pythonstartup work under python3 This replaces the `execfile` used for running custom python shell scripts with explicit open, compile and exec (as recommended by 2to3). The reason for this change is to make the pythonstartup option compatible with python3. Author: Erik SelinCloses #10255 from tyro89/pythonstartup-python3. (cherry picked from commit e4e0b3f7b2945aae5ec7c3d68296010bbc5160cf) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf6d506c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf6d506c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf6d506c Branch: refs/heads/branch-1.6 Commit: cf6d506c7426dbcd19d4c9d7c2d673aa52d00d4e Parents: 364f799 Author: Erik Selin Authored: Wed Jan 13 12:21:45 2016 -0800 Committer: Josh Rosen Committed: Wed Jan 13 12:22:21 2016 -0800 -- python/pyspark/shell.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cf6d506c/python/pyspark/shell.py -- diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index 9933129..26cafca 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -76,4 +76,6 @@ if add_files is not None: # which allows us to execute the user's PYTHONSTARTUP file: _pythonstartup = os.environ.get('OLD_PYTHONSTARTUP') if _pythonstartup and os.path.isfile(_pythonstartup): -execfile(_pythonstartup) +with open(_pythonstartup) as f: +code = compile(f.read(), _pythonstartup, 'exec') +exec(code) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12685][MLLIB][BACKPORT TO 1.4] word2vec trainWordsCount gets overflow
Repository: spark Updated Branches: refs/heads/branch-1.6 f9ecd3a39 -> 364f799cf [SPARK-12685][MLLIB][BACKPORT TO 1.4] word2vec trainWordsCount gets overflow jira: https://issues.apache.org/jira/browse/SPARK-12685 master PR: https://github.com/apache/spark/pull/10627 the log of word2vec reports trainWordsCount = -785727483 during computation over a large dataset. Update the priority as it will affect the computation process. alpha = learningRate * (1 - numPartitions * wordCount.toDouble / (trainWordsCount + 1)) Author: Yuhao YangCloses #10721 from hhbyyh/branch-1.4. (cherry picked from commit 7bd2564192f51f6229cf759a2bafc22134479955) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/364f799c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/364f799c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/364f799c Branch: refs/heads/branch-1.6 Commit: 364f799cf6e23d084d7e9adb8c33f923f4130aa9 Parents: f9ecd3a Author: Yuhao Yang Authored: Wed Jan 13 11:53:25 2016 -0800 Committer: Joseph K. Bradley Committed: Wed Jan 13 11:54:02 2016 -0800 -- .../main/scala/org/apache/spark/mllib/feature/Word2Vec.scala | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/364f799c/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index 1dbedaa..30a1849 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -152,7 +152,7 @@ class Word2Vec extends Serializable with Logging { /** context words from [-window, window] */ private var window = 5 - private var trainWordsCount = 0 + private var trainWordsCount = 0L private var vocabSize = 0 @transient private var vocab: Array[VocabWord] = null @transient private var vocabHash = mutable.HashMap.empty[String, Int] @@ -160,13 +160,13 @@ class Word2Vec extends Serializable with Logging { private def learnVocab(words: RDD[String]): Unit = { vocab = words.map(w => (w, 1)) .reduceByKey(_ + _) + .filter(_._2 >= minCount) .map(x => VocabWord( x._1, x._2, new Array[Int](MAX_CODE_LENGTH), new Array[Int](MAX_CODE_LENGTH), 0)) - .filter(_.cn >= minCount) .collect() .sortWith((a, b) => a.cn > b.cn) @@ -180,7 +180,7 @@ class Word2Vec extends Serializable with Logging { trainWordsCount += vocab(a).cn a += 1 } -logInfo("trainWordsCount = " + trainWordsCount) +logInfo(s"vocabSize = $vocabSize, trainWordsCount = $trainWordsCount") } private def createExpTable(): Array[Float] = { @@ -330,7 +330,7 @@ class Word2Vec extends Serializable with Logging { val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8)) val syn0Modify = new Array[Int](vocabSize) val syn1Modify = new Array[Int](vocabSize) -val model = iter.foldLeft((syn0Global, syn1Global, 0, 0)) { +val model = iter.foldLeft((syn0Global, syn1Global, 0L, 0L)) { case ((syn0, syn1, lastWordCount, wordCount), sentence) => var lwc = lastWordCount var wc = wordCount - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
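The underlying bug: trainWordsCount was an Int, so on corpora beyond ~2.1 billion tokens the counter wraps negative (the -785727483 reported in the JIRA) and corrupts the learning-rate formula quoted above. A standalone Scala sketch of the overflow, with illustrative numbers rather than the MLlib internals:

    // Two partitions of 1.5 billion words each: each count fits in an Int,
    // but the running total does not.
    val perPartition = Seq(1500000000L, 1500000000L)

    val asInt: Int   = perPartition.map(_.toInt).sum   // wraps to -1294967296
    val asLong: Long = perPartition.sum                // 3000000000, as expected

    // With a negative trainWordsCount, the term
    //   alpha = learningRate * (1 - numPartitions * wordCount / (trainWordsCount + 1))
    // pushes alpha above the initial learning rate instead of letting it decay,
    // which is what the switch to Long (and the 0L fold seeds) fixes.
    println(s"Int accumulator: $asInt, Long accumulator: $asLong")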
spark git commit: [SPARK-12761][CORE] Remove duplicated code
Repository: spark Updated Branches: refs/heads/master cc91e2187 -> 38148f737 [SPARK-12761][CORE] Remove duplicated code Removes some duplicated code that was reintroduced during a merge. Author: Jakob OderskyCloses #10711 from jodersky/repl-2.11-duplicate. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38148f73 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38148f73 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38148f73 Branch: refs/heads/master Commit: 38148f7373ee678cd538ce5eae0a75e15c62db8a Parents: cc91e21 Author: Jakob Odersky Authored: Wed Jan 13 11:53:59 2016 -0800 Committer: Reynold Xin Committed: Wed Jan 13 11:53:59 2016 -0800 -- .../scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala | 6 +- 1 file changed, 1 insertion(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/38148f73/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala -- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala index 44650f2..bb3081d 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -30,11 +30,7 @@ object Main extends Logging { val conf = new SparkConf() val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf)) val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl") - val s = new Settings() - s.processArguments(List("-Yrepl-class-based", -"-Yrepl-outdir", s"${outputDir.getAbsolutePath}", -"-classpath", getAddedJars.mkString(File.pathSeparator)), true) - // the creation of SecurityManager has to be lazy so SPARK_YARN_MODE is set if needed + var sparkContext: SparkContext = _ var sqlContext: SQLContext = _ var interp = new SparkILoop // this is a public var because tests reset it. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12805][MESOS] Fixes documentation on Mesos run modes
Repository: spark Updated Branches: refs/heads/master 63eee86cc -> cc91e2187 [SPARK-12805][MESOS] Fixes documentation on Mesos run modes The default run has changed, but the documentation didn't fully reflect the change. Author: Luc BourlierCloses #10740 from skyluc/issue/mesos-modes-doc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc91e218 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc91e218 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc91e218 Branch: refs/heads/master Commit: cc91e21879e031bcd05316eabb856e67a51b191d Parents: 63eee86 Author: Luc Bourlier Authored: Wed Jan 13 11:45:13 2016 -0800 Committer: Reynold Xin Committed: Wed Jan 13 11:45:13 2016 -0800 -- docs/running-on-mesos.md | 12 +--- 1 file changed, 5 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cc91e218/docs/running-on-mesos.md -- diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 3193e17..ed720f1 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -202,7 +202,7 @@ where each application gets more or fewer machines as it ramps up and down, but additional overhead in launching each task. This mode may be inappropriate for low-latency requirements like interactive queries or serving web requests. -To run in coarse-grained mode, set the `spark.mesos.coarse` property to false in your +To run in fine-grained mode, set the `spark.mesos.coarse` property to false in your [SparkConf](configuration.html#spark-properties): {% highlight scala %} @@ -266,13 +266,11 @@ See the [configuration page](configuration.html) for information on Spark config Property NameDefaultMeaning spark.mesos.coarse - false + true -If set to true, runs over Mesos clusters in -"coarse-grained" sharing mode, -where Spark acquires one long-lived Mesos task on each machine instead of one Mesos task per -Spark task. This gives lower-latency scheduling for short queries, but leaves resources in use -for the whole duration of the Spark job. +If set to true, runs over Mesos clusters in "coarse-grained" sharing mode, where Spark acquires one long-lived Mesos task on each machine. +If set to false, runs over Mesos cluster in "fine-grained" sharing mode, where one Mesos task is created per Spark task. +Detailed information in 'Mesos Run Modes'. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
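A minimal SparkConf sketch matching the corrected table: coarse-grained is now the default, so fine-grained sharing must be requested explicitly (the master URL and app name below are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("mesos://mesos-master:5050")   // placeholder Mesos master URL
      .setAppName("fine-grained-example")
      .set("spark.mesos.coarse", "false")       // default is now "true" (coarse-grained)

    val sc = new SparkContext(conf)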
spark git commit: [SPARK-12791][SQL] Simplify CaseWhen by breaking "branches" into "conditions" and "values"
Repository: spark Updated Branches: refs/heads/master c2ea79f96 -> cbbcd8e42 [SPARK-12791][SQL] Simplify CaseWhen by breaking "branches" into "conditions" and "values" This pull request rewrites CaseWhen expression to break the single, monolithic "branches" field into a sequence of tuples (Seq[(condition, value)]) and an explicit optional elseValue field. Prior to this pull request, each even position in "branches" represents the condition for each branch, and each odd position represents the value for each branch. The use of them have been pretty confusing with a lot sliding windows or grouped(2) calls. Author: Reynold XinCloses #10734 from rxin/simplify-case. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbbcd8e4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbbcd8e4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbbcd8e4 Branch: refs/heads/master Commit: cbbcd8e4250aeec700f04c231f8be2f787243f1f Parents: c2ea79f Author: Reynold Xin Authored: Wed Jan 13 12:44:35 2016 -0800 Committer: Reynold Xin Committed: Wed Jan 13 12:44:35 2016 -0800 -- python/pyspark/sql/column.py| 24 ++-- .../apache/spark/sql/catalyst/CatalystQl.scala | 2 +- .../apache/spark/sql/catalyst/SqlParser.scala | 3 +- .../catalyst/analysis/HiveTypeCoercion.scala| 26 ++-- .../expressions/conditionalExpressions.scala| 137 +-- .../spark/sql/catalyst/trees/TreeNode.scala | 9 ++ .../sql/catalyst/analysis/AnalysisSuite.scala | 2 +- .../analysis/ExpressionTypeCheckingSuite.scala | 4 +- .../analysis/HiveTypeCoercionSuite.scala| 15 +- .../ConditionalExpressionSuite.scala| 51 +++ .../scala/org/apache/spark/sql/Column.scala | 19 +-- .../scala/org/apache/spark/sql/functions.scala | 2 +- 12 files changed, 156 insertions(+), 138 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cbbcd8e4/python/pyspark/sql/column.py -- diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py index 900def5..320451c 100644 --- a/python/pyspark/sql/column.py +++ b/python/pyspark/sql/column.py @@ -368,12 +368,12 @@ class Column(object): >>> from pyspark.sql import functions as F >>> df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show() -+-++ -| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0| -+-++ -|Alice| -1| -| Bob| 1| -+-++ ++-++ +| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END| ++-++ +|Alice| -1| +| Bob| 1| ++-++ """ if not isinstance(condition, Column): raise TypeError("condition should be a Column") @@ -393,12 +393,12 @@ class Column(object): >>> from pyspark.sql import functions as F >>> df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show() -+-+-+ -| name|CASE WHEN (age > 3) THEN 1 ELSE 0| -+-+-+ -|Alice|0| -| Bob|1| -+-+-+ ++-+-+ +| name|CASE WHEN (age > 3) THEN 1 ELSE 0 END| ++-+-+ +|Alice|0| +| Bob|1| ++-+-+ """ v = value._jc if isinstance(value, Column) else value jc = self._jc.otherwise(v) http://git-wip-us.apache.org/repos/asf/spark/blob/cbbcd8e4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala
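The restructuring is internal to Catalyst; the public API is unchanged apart from the pretty-printed expression name now ending in END, as the updated doctests show. A Scala usage sketch, assuming a DataFrame df with columns name and age:

    import org.apache.spark.sql.functions.{col, when}

    // Each .when(...) contributes one (condition, value) pair to the underlying
    // CaseWhen expression; .otherwise(...) supplies its optional else value.
    df.select(
      col("name"),
      when(col("age") > 4, 1).when(col("age") < 3, -1).otherwise(0).as("label")
    ).show()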
spark git commit: [SPARK-12685][MLLIB][BACKPORT TO 1.4] word2vec trainWordsCount gets overflow
Repository: spark Updated Branches: refs/heads/branch-1.4 0832530e8 -> 7bd256419 [SPARK-12685][MLLIB][BACKPORT TO 1.4] word2vec trainWordsCount gets overflow jira: https://issues.apache.org/jira/browse/SPARK-12685 master PR: https://github.com/apache/spark/pull/10627 the log of word2vec reports trainWordsCount = -785727483 during computation over a large dataset. Update the priority as it will affect the computation process. alpha = learningRate * (1 - numPartitions * wordCount.toDouble / (trainWordsCount + 1)) Author: Yuhao YangCloses #10721 from hhbyyh/branch-1.4. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bd25641 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bd25641 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bd25641 Branch: refs/heads/branch-1.4 Commit: 7bd2564192f51f6229cf759a2bafc22134479955 Parents: 0832530 Author: Yuhao Yang Authored: Wed Jan 13 11:53:25 2016 -0800 Committer: Joseph K. Bradley Committed: Wed Jan 13 11:53:25 2016 -0800 -- .../main/scala/org/apache/spark/mllib/feature/Word2Vec.scala | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7bd25641/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index 3493186..b973091 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -139,7 +139,7 @@ class Word2Vec extends Serializable with Logging { /** context words from [-window, window] */ private val window = 5 - private var trainWordsCount = 0 + private var trainWordsCount = 0L private var vocabSize = 0 @transient private var vocab: Array[VocabWord] = null @transient private var vocabHash = mutable.HashMap.empty[String, Int] @@ -147,13 +147,13 @@ class Word2Vec extends Serializable with Logging { private def learnVocab(words: RDD[String]): Unit = { vocab = words.map(w => (w, 1)) .reduceByKey(_ + _) + .filter(_._2 >= minCount) .map(x => VocabWord( x._1, x._2, new Array[Int](MAX_CODE_LENGTH), new Array[Int](MAX_CODE_LENGTH), 0)) - .filter(_.cn >= minCount) .collect() .sortWith((a, b) => a.cn > b.cn) @@ -164,7 +164,7 @@ class Word2Vec extends Serializable with Logging { trainWordsCount += vocab(a).cn a += 1 } -logInfo("trainWordsCount = " + trainWordsCount) +logInfo(s"vocabSize = $vocabSize, trainWordsCount = $trainWordsCount") } private def createExpTable(): Array[Float] = { @@ -313,7 +313,7 @@ class Word2Vec extends Serializable with Logging { val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8)) val syn0Modify = new Array[Int](vocabSize) val syn1Modify = new Array[Int](vocabSize) -val model = iter.foldLeft((syn0Global, syn1Global, 0, 0)) { +val model = iter.foldLeft((syn0Global, syn1Global, 0L, 0L)) { case ((syn0, syn1, lastWordCount, wordCount), sentence) => var lwc = lastWordCount var wc = wordCount - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12685][MLLIB][BACKPORT TO 1.4] word2vec trainWordsCount gets overflow
Repository: spark Updated Branches: refs/heads/branch-1.5 253e3eb71 -> 7fdd7cf09 [SPARK-12685][MLLIB][BACKPORT TO 1.4] word2vec trainWordsCount gets overflow jira: https://issues.apache.org/jira/browse/SPARK-12685 master PR: https://github.com/apache/spark/pull/10627 the log of word2vec reports trainWordsCount = -785727483 during computation over a large dataset. Update the priority as it will affect the computation process. alpha = learningRate * (1 - numPartitions * wordCount.toDouble / (trainWordsCount + 1)) Author: Yuhao YangCloses #10721 from hhbyyh/branch-1.4. (cherry picked from commit 7bd2564192f51f6229cf759a2bafc22134479955) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fdd7cf0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fdd7cf0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fdd7cf0 Branch: refs/heads/branch-1.5 Commit: 7fdd7cf09a48b7cbe0c0de11482a6aa6a574d9a7 Parents: 253e3eb Author: Yuhao Yang Authored: Wed Jan 13 11:53:25 2016 -0800 Committer: Joseph K. Bradley Committed: Wed Jan 13 11:53:45 2016 -0800 -- .../main/scala/org/apache/spark/mllib/feature/Word2Vec.scala | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7fdd7cf0/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index 131a862..1a9ac47 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -146,7 +146,7 @@ class Word2Vec extends Serializable with Logging { /** context words from [-window, window] */ private val window = 5 - private var trainWordsCount = 0 + private var trainWordsCount = 0L private var vocabSize = 0 @transient private var vocab: Array[VocabWord] = null @transient private var vocabHash = mutable.HashMap.empty[String, Int] @@ -154,13 +154,13 @@ class Word2Vec extends Serializable with Logging { private def learnVocab(words: RDD[String]): Unit = { vocab = words.map(w => (w, 1)) .reduceByKey(_ + _) + .filter(_._2 >= minCount) .map(x => VocabWord( x._1, x._2, new Array[Int](MAX_CODE_LENGTH), new Array[Int](MAX_CODE_LENGTH), 0)) - .filter(_.cn >= minCount) .collect() .sortWith((a, b) => a.cn > b.cn) @@ -174,7 +174,7 @@ class Word2Vec extends Serializable with Logging { trainWordsCount += vocab(a).cn a += 1 } -logInfo("trainWordsCount = " + trainWordsCount) +logInfo(s"vocabSize = $vocabSize, trainWordsCount = $trainWordsCount") } private def createExpTable(): Array[Float] = { @@ -324,7 +324,7 @@ class Word2Vec extends Serializable with Logging { val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8)) val syn0Modify = new Array[Int](vocabSize) val syn1Modify = new Array[Int](vocabSize) -val model = iter.foldLeft((syn0Global, syn1Global, 0, 0)) { +val model = iter.foldLeft((syn0Global, syn1Global, 0L, 0L)) { case ((syn0, syn1, lastWordCount, wordCount), sentence) => var lwc = lastWordCount var wc = wordCount - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12642][SQL] improve the hash expression to be decoupled from unsafe row
Repository: spark Updated Branches: refs/heads/master e4e0b3f7b -> c2ea79f96 [SPARK-12642][SQL] improve the hash expression to be decoupled from unsafe row https://issues.apache.org/jira/browse/SPARK-12642 Author: Wenchen FanCloses #10694 from cloud-fan/hash-expr. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2ea79f9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2ea79f9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2ea79f9 Branch: refs/heads/master Commit: c2ea79f96acd076351b48162644ed1cff4c8e090 Parents: e4e0b3f Author: Wenchen Fan Authored: Wed Jan 13 12:29:02 2016 -0800 Committer: Reynold Xin Committed: Wed Jan 13 12:29:02 2016 -0800 -- python/pyspark/sql/functions.py | 2 +- .../sql/catalyst/expressions/UnsafeRow.java | 4 - .../spark/sql/catalyst/expressions/misc.scala | 251 ++- .../expressions/MiscFunctionsSuite.scala| 6 +- .../spark/sql/sources/BucketedWriteSuite.scala | 26 +- .../spark/unsafe/hash/Murmur3_x86_32.java | 28 ++- 6 files changed, 288 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c2ea79f9/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index b0390cb..719eca8 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1023,7 +1023,7 @@ def hash(*cols): """Calculates the hash code of given columns, and returns the result as a int column. >>> sqlContext.createDataFrame([('ABC',)], ['a']).select(hash('a').alias('hash')).collect() -[Row(hash=1358996357)] +[Row(hash=-757602832)] """ sc = SparkContext._active_spark_context jc = sc._jvm.functions.hash(_to_seq(sc, cols, _to_java_column)) http://git-wip-us.apache.org/repos/asf/spark/blob/c2ea79f9/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index b8d3c49..1a35193 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -566,10 +566,6 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS return Murmur3_x86_32.hashUnsafeWords(baseObject, baseOffset, sizeInBytes, 42); } - public int hashCode(int seed) { -return Murmur3_x86_32.hashUnsafeWords(baseObject, baseOffset, sizeInBytes, seed); - } - @Override public boolean equals(Object other) { if (other instanceof UnsafeRow) { http://git-wip-us.apache.org/repos/asf/spark/blob/c2ea79f9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index cc406a3..4751fbe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -25,8 +25,11 @@ import org.apache.commons.codec.digest.DigestUtils import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} import org.apache.spark.sql.types._ -import 
org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.unsafe.hash.Murmur3_x86_32 +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} +import org.apache.spark.unsafe.Platform /** * A function that calculates an MD5 128-bit checksum and returns it as a hex string @@ -184,8 +187,31 @@ case class Crc32(child: Expression) extends UnaryExpression with ImplicitCastInp * A function that calculates hash value for a group of expressions. Note that the `seed` argument * is not exposed to users and should only be set inside spark SQL. * - * Internally this function will write arguments into an [[UnsafeRow]], and calculate hash code of - * the unsafe row using murmur3 hasher with a seed. + * The hash value for an expression depends on its type and seed: + * - null: seed + * - boolean:turn boolean into
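From the caller's perspective only the produced values change (hence the updated doctest and bucketing tests); the function is invoked exactly as before. A minimal sketch, assuming a DataFrame df with a column a:

    import org.apache.spark.sql.functions.{col, hash}

    // The hash expression now applies Murmur3 per field, driven by each field's
    // data type, instead of first packing the row into an UnsafeRow.
    df.select(col("a"), hash(col("a")).as("hash")).show()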
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.6.0-preview [deleted] 31db36100 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.6-snapshot0-test [deleted] 609d6e87a refs/tags/v1.6.0-preview1 [deleted] f8369412d refs/tags/v1.6.0-preview2 [deleted] 308381420 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.6.0-rc1 [deleted] bf525845c - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.6.0-rc3 [deleted] 168c89e07 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12756][SQL] use hash expression in Exchange
Repository: spark Updated Branches: refs/heads/master e2ae7bd04 -> 962e9bcf9 [SPARK-12756][SQL] use hash expression in Exchange This PR makes bucketing and exchange share one common hash algorithm, so that we can guarantee the data distribution is same between shuffle and bucketed data source, which enables us to only shuffle one side when join a bucketed table and a normal one. This PR also fixes the tests that are broken by the new hash behaviour in shuffle. Author: Wenchen FanCloses #10703 from cloud-fan/use-hash-expr-in-shuffle. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/962e9bcf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/962e9bcf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/962e9bcf Branch: refs/heads/master Commit: 962e9bcf94da6f5134983f2bf1e56c5cd84f2bf7 Parents: e2ae7bd Author: Wenchen Fan Authored: Wed Jan 13 22:43:28 2016 -0800 Committer: Reynold Xin Committed: Wed Jan 13 22:43:28 2016 -0800 -- R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 +- python/pyspark/sql/dataframe.py | 26 +++ python/pyspark/sql/group.py | 6 ++-- .../catalyst/plans/physical/partitioning.scala | 7 - .../apache/spark/sql/execution/Exchange.scala | 12 +-- .../execution/datasources/WriterContainer.scala | 20 ++-- .../apache/spark/sql/JavaDataFrameSuite.java| 4 +-- .../org/apache/spark/sql/JavaDatasetSuite.java | 33 +++- .../org/apache/spark/sql/DataFrameSuite.scala | 21 +++-- .../org/apache/spark/sql/DatasetSuite.scala | 4 +-- .../org/apache/spark/sql/SQLQuerySuite.scala| 2 +- .../spark/sql/sources/BucketedWriteSuite.scala | 11 --- 12 files changed, 84 insertions(+), 64 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/962e9bcf/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 97625b9..40d5066 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1173,7 +1173,7 @@ test_that("group by, agg functions", { expect_equal(3, count(mean(gd))) expect_equal(3, count(max(gd))) - expect_equal(30, collect(max(gd))[1, 2]) + expect_equal(30, collect(max(gd))[2, 2]) expect_equal(1, collect(count(gd))[1, 2]) mockLines2 <- c("{\"name\":\"ID1\", \"value\": \"10\"}", http://git-wip-us.apache.org/repos/asf/spark/blob/962e9bcf/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index a7bc288..90a6b5d 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -403,10 +403,10 @@ class DataFrame(object): +---+-+ |age| name| +---+-+ -| 2|Alice| -| 2|Alice| | 5| Bob| | 5| Bob| +| 2|Alice| +| 2|Alice| +---+-+ >>> data = data.repartition(7, "age") >>> data.show() @@ -552,7 +552,7 @@ class DataFrame(object): >>> df_as2 = df.alias("df_as2") >>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner') >>> joined_df.select(col("df_as1.name"), col("df_as2.name"), col("df_as2.age")).collect() -[Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', name=u'Bob', age=5)] +[Row(name=u'Bob', name=u'Bob', age=5), Row(name=u'Alice', name=u'Alice', age=2)] """ assert isinstance(alias, basestring), "alias should be a string" return DataFrame(getattr(self._jdf, "as")(alias), self.sql_ctx) @@ -573,14 +573,14 @@ class DataFrame(object): One of `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`. 
>>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect() -[Row(name=None, height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)] +[Row(name=None, height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)] >>> df.join(df2, 'name', 'outer').select('name', 'height').collect() -[Row(name=u'Tom', height=80), Row(name=u'Alice', height=None), Row(name=u'Bob', height=85)] +[Row(name=u'Tom', height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)] >>> cond = [df.name == df3.name, df.age == df3.age] >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect() -[Row(name=u'Bob', age=5), Row(name=u'Alice', age=2)] +
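The practical payoff is the one stated above: since bucketed output and shuffle exchange now share one hash expression, joining a table bucketed on the join key against a normal table only needs to shuffle the non-bucketed side. A sketch of writing such a table (table and column names are placeholders):

    // Write df bucketed by "key" into 8 buckets; the bucket id is computed with
    // the same hash expression Exchange uses for shuffle partitioning.
    df.write
      .bucketBy(8, "key")
      .sortBy("key")
      .saveAsTable("bucketed_events")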
spark git commit: [SPARK-12707][SPARK SUBMIT] Remove submit python/R scripts through py…
Repository: spark Updated Branches: refs/heads/master 962e9bcf9 -> 8f13cd4cc [SPARK-12707][SPARK SUBMIT] Remove submit python/R scripts through py… …spark/sparkR Author: Jeff Zhang Closes #10658 from zjffdu/SPARK-12707. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f13cd4c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f13cd4c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f13cd4c Branch: refs/heads/master Commit: 8f13cd4cc8dcf638b178774418669a2e247d0652 Parents: 962e9bc Author: Jeff Zhang Authored: Wed Jan 13 23:50:08 2016 -0800 Committer: Reynold Xin Committed: Wed Jan 13 23:50:08 2016 -0800 -- .../spark/launcher/SparkSubmitCommandBuilder.java | 13 ++--- 1 file changed, 6 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8f13cd4c/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java -- diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java index a95f0f1..269c89c 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java @@ -231,11 +231,9 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder { // the pyspark command line, then run it using spark-submit. if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".py")) { System.err.println( -"WARNING: Running python applications through 'pyspark' is deprecated as of Spark 1.0.\n" + +"Running python applications through 'pyspark' is not supported as of Spark 2.0.\n" + "Use ./bin/spark-submit "); - appResource = appArgs.get(0); - appArgs.remove(0); - return buildCommand(env); + System.exit(-1); } checkArgument(appArgs.isEmpty(), "pyspark does not support any application options."); @@ -258,9 +256,10 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder { private List buildSparkRCommand(Map env) throws IOException { if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".R")) { - appResource = appArgs.get(0); - appArgs.remove(0); - return buildCommand(env); + System.err.println( +"Running R applications through 'sparkR' is not supported as of Spark 2.0.\n" + +"Use ./bin/spark-submit "); + System.exit(-1); } // When launching the SparkR shell, store the spark-submit arguments in the SPARKR_SUBMIT_ARGS // env variable. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.6.0-rc2 [deleted] 23f8dfd45 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12690][CORE] Fix NPE in UnsafeInMemorySorter.free()
Repository: spark Updated Branches: refs/heads/branch-1.6 cf6d506c7 -> 26f13faa9 [SPARK-12690][CORE] Fix NPE in UnsafeInMemorySorter.free() I hit the exception below. The `UnsafeKVExternalSorter` does pass `null` as the consumer when creating an `UnsafeInMemorySorter`. Normally the NPE doesn't occur because the `inMemSorter` is set to null later and the `free()` method is not called. It happens when there is another exception like OOM thrown before setting `inMemSorter` to null. Anyway, we can add the null check to avoid it. ``` ERROR spark.TaskContextImpl: Error in TaskCompletionListener java.lang.NullPointerException at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.free(UnsafeInMemorySorter.java:110) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.cleanupResources(UnsafeExternalSorter.java:288) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$1.onTaskCompletion(UnsafeExternalSorter.java:141) at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:79) at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:77) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:77) at org.apache.spark.scheduler.Task.run(Task.scala:91) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) ``` Author: Carson WangCloses #10637 from carsonwang/FixNPE. (cherry picked from commit eabc7b8ee7e809bab05361ed154f87bff467bd88) Signed-off-by: Josh Rosen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26f13faa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26f13faa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26f13faa Branch: refs/heads/branch-1.6 Commit: 26f13faa981a51046ed1f16b9c3ee42ac5f6b6da Parents: cf6d506 Author: Carson Wang Authored: Wed Jan 13 13:28:39 2016 -0800 Committer: Josh Rosen Committed: Wed Jan 13 13:29:18 2016 -0800 -- .../util/collection/unsafe/sort/UnsafeInMemorySorter.java | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/26f13faa/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java -- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index dce1f15..98a7314 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -107,8 +107,10 @@ public final class UnsafeInMemorySorter { * Free the memory used by pointer array. */ public void free() { -consumer.freeArray(array); -array = null; +if (consumer != null) { + consumer.freeArray(array); + array = null; +} } public void reset() { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12690][CORE] Fix NPE in UnsafeInMemorySorter.free()
Repository: spark Updated Branches: refs/heads/master cbbcd8e42 -> eabc7b8ee [SPARK-12690][CORE] Fix NPE in UnsafeInMemorySorter.free() I hit the exception below. The `UnsafeKVExternalSorter` does pass `null` as the consumer when creating an `UnsafeInMemorySorter`. Normally the NPE doesn't occur because the `inMemSorter` is set to null later and the `free()` method is not called. It happens when there is another exception like OOM thrown before setting `inMemSorter` to null. Anyway, we can add the null check to avoid it. ``` ERROR spark.TaskContextImpl: Error in TaskCompletionListener java.lang.NullPointerException at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.free(UnsafeInMemorySorter.java:110) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.cleanupResources(UnsafeExternalSorter.java:288) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$1.onTaskCompletion(UnsafeExternalSorter.java:141) at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:79) at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:77) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:77) at org.apache.spark.scheduler.Task.run(Task.scala:91) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) ``` Author: Carson WangCloses #10637 from carsonwang/FixNPE. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eabc7b8e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eabc7b8e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eabc7b8e Branch: refs/heads/master Commit: eabc7b8ee7e809bab05361ed154f87bff467bd88 Parents: cbbcd8e Author: Carson Wang Authored: Wed Jan 13 13:28:39 2016 -0800 Committer: Josh Rosen Committed: Wed Jan 13 13:28:39 2016 -0800 -- .../util/collection/unsafe/sort/UnsafeInMemorySorter.java | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eabc7b8e/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java -- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index f71b8d1..d1b0bc5 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -116,8 +116,10 @@ public final class UnsafeInMemorySorter { * Free the memory used by pointer array. */ public void free() { -consumer.freeArray(array); -array = null; +if (consumer != null) { + consumer.freeArray(array); + array = null; +} } public void reset() { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-12400][SHUFFLE] Avoid generating temp shuffle files for empty partitions
Repository: spark Updated Branches: refs/heads/master eabc7b8ee -> cd81fc9e8 [SPARK-12400][SHUFFLE] Avoid generating temp shuffle files for empty partitions This problem lies in `BypassMergeSortShuffleWriter`, empty partition will also generate a temp shuffle file with several bytes. So here change to only create file when partition is not empty. This problem only lies in here, no such issue in `HashShuffleWriter`. Please help to review, thanks a lot. Author: jerryshaoCloses #10376 from jerryshao/SPARK-12400. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd81fc9e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd81fc9e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd81fc9e Branch: refs/heads/master Commit: cd81fc9e8652c07b84f0887a24d67381b4e605fa Parents: eabc7b8 Author: jerryshao Authored: Wed Jan 13 16:34:23 2016 -0800 Committer: Josh Rosen Committed: Wed Jan 13 16:34:23 2016 -0800 -- .../sort/BypassMergeSortShuffleWriter.java | 25 +++-- .../BypassMergeSortShuffleWriterSuite.scala | 38 +++- 2 files changed, 51 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cd81fc9e/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java -- diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index a1a1fb0..56cdc22 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -138,7 +138,7 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { final File file = tempShuffleBlockIdPlusFile._2(); final BlockId blockId = tempShuffleBlockIdPlusFile._1(); partitionWriters[i] = -blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open(); +blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); } // Creating the file to write to and creating a disk writer both involve interacting with // the disk, and can take a long time in aggregate when we open many files, so should be @@ -185,16 +185,19 @@ final class BypassMergeSortShuffleWriter extends ShuffleWriter { boolean threwException = true; try { for (int i = 0; i < numPartitions; i++) { -final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file()); -boolean copyThrewException = true; -try { - lengths[i] = Utils.copyStream(in, out, false, transferToEnabled); - copyThrewException = false; -} finally { - Closeables.close(in, copyThrewException); -} -if (!partitionWriters[i].fileSegment().file().delete()) { - logger.error("Unable to delete file for partition {}", i); +final File file = partitionWriters[i].fileSegment().file(); +if (file.exists()) { + final FileInputStream in = new FileInputStream(file); + boolean copyThrewException = true; + try { +lengths[i] = Utils.copyStream(in, out, false, transferToEnabled); +copyThrewException = false; + } finally { +Closeables.close(in, copyThrewException); + } + if (!file.delete()) { +logger.error("Unable to delete file for partition {}", i); + } } } threwException = false; http://git-wip-us.apache.org/repos/asf/spark/blob/cd81fc9e/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala 
b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala index e33408b..ef6ce04 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala @@ -105,7 +105,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte new Answer[(TempShuffleBlockId, File)] { override def answer(invocation: InvocationOnMock): (TempShuffleBlockId, File) = { val blockId = new TempShuffleBlockId(UUID.randomUUID) - val file = File.createTempFile(blockId.toString, null, tempDir) +
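The fix amounts to deferring file creation until a partition actually receives data: the per-partition writer is no longer .open()ed eagerly, and the merge step only copies partition files that exist. A rough standalone sketch of that lazy-creation idea (simplified, not the actual BypassMergeSortShuffleWriter code):

    import java.io.{File, FileOutputStream}

    // One writer per reduce partition; the temp file is created only when the
    // first record for that partition arrives, so empty partitions never touch disk.
    class LazyPartitionWriter(dir: File, partitionId: Int) {
      private var out: FileOutputStream = null

      def write(record: Array[Byte]): Unit = {
        if (out == null) {
          out = new FileOutputStream(new File(dir, s"temp_shuffle_$partitionId"))
        }
        out.write(record)
      }

      // Returns the file only if something was written, mirroring the
      // file.exists() check added to the merge step above.
      def commit(): Option[File] = Option(out).map { o =>
        o.close()
        new File(dir, s"temp_shuffle_$partitionId")
      }
    }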
spark git commit: [SPARK-12026][MLLIB] ChiSqTest gets slower and slower over time when number of features is large
Repository: spark Updated Branches: refs/heads/master cd81fc9e8 -> 021dafc6a [SPARK-12026][MLLIB] ChiSqTest gets slower and slower over time when number of features is large jira: https://issues.apache.org/jira/browse/SPARK-12026 The issue is valid as features.toArray.view.zipWithIndex.slice(startCol, endCol) becomes slower as startCol gets larger. I tested on local and the change can improve the performance and the running time was stable. Author: Yuhao YangCloses #10146 from hhbyyh/chiSq. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/021dafc6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/021dafc6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/021dafc6 Branch: refs/heads/master Commit: 021dafc6a05a31dc22c9f9110dedb47a1f913087 Parents: cd81fc9 Author: Yuhao Yang Authored: Wed Jan 13 17:43:27 2016 -0800 Committer: Joseph K. Bradley Committed: Wed Jan 13 17:43:27 2016 -0800 -- .../scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/021dafc6/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala index f22f2df..4a3fb064 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala @@ -109,7 +109,9 @@ private[stat] object ChiSqTest extends Logging { } i += 1 distinctLabels += label - features.toArray.view.zipWithIndex.slice(startCol, endCol).map { case (feature, col) => + val brzFeatures = features.toBreeze + (startCol until endCol).map { col => +val feature = brzFeatures(col) allDistinctFeatures(col) += feature (col, feature, label) } @@ -122,7 +124,7 @@ private[stat] object ChiSqTest extends Logging { pairCounts.keys.filter(_._1 == startCol).map(_._3).toArray.distinct.zipWithIndex.toMap } val numLabels = labels.size - pairCounts.keys.groupBy(_._1).map { case (col, keys) => + pairCounts.keys.groupBy(_._1).foreach { case (col, keys) => val features = keys.map(_._2).toArray.distinct.zipWithIndex.toMap val numRows = features.size val contingency = new BDM(numRows, numLabels, new Array[Double](numRows * numLabels)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
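As the JIRA describes, the old per-row pattern features.toArray.view.zipWithIndex.slice(startCol, endCol) gets more expensive as startCol grows, so later column batches dominate the runtime; the fix simply indexes the columns it needs. A rough sketch of the two access patterns, with illustrative data rather than the MLlib internals:

    val features = Array.tabulate(100000)(_.toDouble)

    // Old pattern: zip a view with indices and slice out the batch; per the JIRA,
    // this slows down as startCol increases.
    def oldBatch(startCol: Int, endCol: Int): Seq[(Int, Double)] =
      features.view.zipWithIndex.slice(startCol, endCol).map { case (v, col) => (col, v) }.toSeq

    // New pattern: address each column directly, so the cost of a batch no longer
    // depends on where it starts.
    def newBatch(startCol: Int, endCol: Int): Seq[(Int, Double)] =
      (startCol until endCol).map(col => (col, features(col)))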
spark git commit: [SPARK-12026][MLLIB] ChiSqTest gets slower and slower over time when number of features is large
Repository: spark
Updated Branches: refs/heads/branch-1.6 26f13faa9 -> a490787da

[SPARK-12026][MLLIB] ChiSqTest gets slower and slower over time when number of features is large

jira: https://issues.apache.org/jira/browse/SPARK-12026

The issue is valid: features.toArray.view.zipWithIndex.slice(startCol, endCol) becomes slower as startCol gets larger. I tested locally; the change improves performance and keeps the running time stable.

Author: Yuhao Yang

Closes #10146 from hhbyyh/chiSq.

(cherry picked from commit 021dafc6a05a31dc22c9f9110dedb47a1f913087)
Signed-off-by: Joseph K. Bradley

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a490787d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a490787d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a490787d

Branch: refs/heads/branch-1.6
Commit: a490787daa5ec11a5e30bc0df31f81edd54ccc6a
Parents: 26f13fa
Author: Yuhao Yang
Authored: Wed Jan 13 17:43:27 2016 -0800
Committer: Joseph K. Bradley
Committed: Wed Jan 13 17:43:38 2016 -0800

--
 .../scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a490787d/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
index 23c8d7c..1c583a4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
@@ -109,7 +109,9 @@ private[stat] object ChiSqTest extends Logging {
         }
         i += 1
         distinctLabels += label
-        features.toArray.view.zipWithIndex.slice(startCol, endCol).map { case (feature, col) =>
+        val brzFeatures = features.toBreeze
+        (startCol until endCol).map { col =>
+          val feature = brzFeatures(col)
           allDistinctFeatures(col) += feature
           (col, feature, label)
         }
@@ -122,7 +124,7 @@ private[stat] object ChiSqTest extends Logging {
       pairCounts.keys.filter(_._1 == startCol).map(_._3).toArray.distinct.zipWithIndex.toMap
     }
     val numLabels = labels.size
-    pairCounts.keys.groupBy(_._1).map { case (col, keys) =>
+    pairCounts.keys.groupBy(_._1).foreach { case (col, keys) =>
       val features = keys.map(_._2).toArray.distinct.zipWithIndex.toMap
       val numRows = features.size
       val contingency = new BDM(numRows, numLabels, new Array[Double](numRows * numLabels))
spark git commit: [SPARK-12703][MLLIB][DOC][PYTHON] Fixed pyspark.mllib.clustering.KMeans user guide example
Repository: spark
Updated Branches: refs/heads/master 021dafc6a -> 20d8ef858

[SPARK-12703][MLLIB][DOC][PYTHON] Fixed pyspark.mllib.clustering.KMeans user guide example

Fixed the WSSSE computation in the Python MLlib KMeans user guide example by using the new computeCost method of the Python API.

Author: Joseph K. Bradley

Closes #10707 from jkbradley/kmeans-doc-fix.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20d8ef85
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20d8ef85
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20d8ef85

Branch: refs/heads/master
Commit: 20d8ef858af6e13db59df118b562ea33cba5464d
Parents: 021dafc
Author: Joseph K. Bradley
Authored: Wed Jan 13 18:01:29 2016 -0800
Committer: Joseph K. Bradley
Committed: Wed Jan 13 18:01:29 2016 -0800

--
 docs/mllib-clustering.md | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/20d8ef85/docs/mllib-clustering.md
--
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 93cd0c1..d0be032 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -152,11 +152,7 @@ clusters = KMeans.train(parsedData, 2, maxIterations=10,
         runs=10, initializationMode="random")

 # Evaluate clustering by computing Within Set Sum of Squared Errors
-def error(point):
-    center = clusters.centers[clusters.predict(point)]
-    return sqrt(sum([x**2 for x in (point - center)]))
-
-WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
+WSSSE = clusters.computeCost(parsedData)
 print("Within Set Sum of Squared Error = " + str(WSSSE))

 # Save and load model
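The same simplification applies outside the Python guide: spark.mllib's KMeansModel also exposes computeCost, so the Within Set Sum of Squared Errors never needs to be recomputed by hand. A minimal Scala sketch of the equivalent flow (the input path and parameter values are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Scala analogue of the fixed user-guide example: let the model compute WSSSE.
object KMeansWSSSEExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("KMeansWSSSEExample"))

    // Load and parse whitespace-separated feature vectors (path is illustrative).
    val data = sc.textFile("data/mllib/kmeans_data.txt")
    val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()

    // Cluster the data into two classes using KMeans.
    val clusters = KMeans.train(parsedData, 2, 10)

    // Evaluate clustering by computing Within Set Sum of Squared Errors.
    val WSSSE = clusters.computeCost(parsedData)
    println("Within Set Sum of Squared Error = " + WSSSE)

    sc.stop()
  }
}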
spark git commit: [SPARK-12819] Deprecate TaskContext.isRunningLocally()
Repository: spark
Updated Branches: refs/heads/master 20d8ef858 -> e2ae7bd04

[SPARK-12819] Deprecate TaskContext.isRunningLocally()

We've already removed local execution but didn't deprecate `TaskContext.isRunningLocally()`; we should deprecate it for 2.0.

Author: Josh Rosen

Closes #10751 from JoshRosen/remove-local-exec-from-taskcontext.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2ae7bd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2ae7bd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2ae7bd0

Branch: refs/heads/master
Commit: e2ae7bd046f6d8d6a375c2e81e5a51d7d78ca984
Parents: 20d8ef8
Author: Josh Rosen
Authored: Wed Jan 13 21:02:54 2016 -0800
Committer: Reynold Xin
Committed: Wed Jan 13 21:02:54 2016 -0800

--
 core/src/main/scala/org/apache/spark/CacheManager.scala     | 5 -
 core/src/main/scala/org/apache/spark/TaskContext.scala      | 3 ++-
 core/src/main/scala/org/apache/spark/TaskContextImpl.scala  | 3 +--
 core/src/main/scala/org/apache/spark/scheduler/Task.scala   | 3 +--
 .../src/test/scala/org/apache/spark/CacheManagerSuite.scala | 9 -
 5 files changed, 4 insertions(+), 19 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/main/scala/org/apache/spark/CacheManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 4d20c73..36b536e 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -68,11 +68,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
         logInfo(s"Partition $key not found, computing it")
         val computedValues = rdd.computeOrReadCheckpoint(partition, context)

-        // If the task is running locally, do not persist the result
-        if (context.isRunningLocally) {
-          return computedValues
-        }
-
         // Otherwise, cache the values and keep track of any updates in block statuses
         val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
         val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)

http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/main/scala/org/apache/spark/TaskContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index e25ed0f..7704abc 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -97,8 +97,9 @@ abstract class TaskContext extends Serializable {

   /**
    * Returns true if the task is running locally in the driver program.
-   * @return
+   * @return false
    */
+  @deprecated("Local execution was removed, so this always returns false", "2.0.0")
   def isRunningLocally(): Boolean

   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
--
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 6c49363..94ff884 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -33,7 +33,6 @@ private[spark] class TaskContextImpl(
     override val taskMemoryManager: TaskMemoryManager,
     @transient private val metricsSystem: MetricsSystem,
     internalAccumulators: Seq[Accumulator[Long]],
-    val runningLocally: Boolean = false,
     val taskMetrics: TaskMetrics = TaskMetrics.empty)
   extends TaskContext
   with Logging {
@@ -85,7 +84,7 @@ private[spark] class TaskContextImpl(

   override def isCompleted(): Boolean = completed

-  override def isRunningLocally(): Boolean = runningLocally
+  override def isRunningLocally(): Boolean = false

   override def isInterrupted(): Boolean = interrupted

http://git-wip-us.apache.org/repos/asf/spark/blob/e2ae7bd0/core/src/main/scala/org/apache/spark/scheduler/Task.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 0379ca2..fca5792 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -74,8 +74,7 @@
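For downstream code, the practical consequence is that any branch keyed on isRunningLocally() is now dead. A hedged sketch of the migration in user code (the surrounding object and methods are illustrative, not Spark internals):

import org.apache.spark.TaskContext

// Illustrative user code. With local execution gone, isRunningLocally() always
// returns false, so a guard built on it can never fire and can simply be removed.
object TaskContextMigration {
  // Before: emits a deprecation warning on 2.0.0, and the local-execution case is dead.
  def shouldPersistOld(): Boolean = {
    val ctx = TaskContext.get()
    ctx == null || !ctx.isRunningLocally()
  }

  // After: no local-execution special case at all.
  def shouldPersist(): Boolean = true
}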
spark git commit: [SPARK-12692][BUILD][HOT-FIX] Fix the scala style of KinesisBackedBlockRDDSuite.scala.
Repository: spark
Updated Branches: refs/heads/master 3d81d63f4 -> d6fd9b376

[SPARK-12692][BUILD][HOT-FIX] Fix the scala style of KinesisBackedBlockRDDSuite.scala.

https://github.com/apache/spark/pull/10736 was merged yesterday and caused the master build to start failing because of the style issue.

Author: Yin Huai

Closes #10742 from yhuai/fixStyle.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6fd9b37
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6fd9b37
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6fd9b37

Branch: refs/heads/master
Commit: d6fd9b376b7071aecef34dc82a33eba42b183bc9
Parents: 3d81d63
Author: Yin Huai
Authored: Wed Jan 13 10:01:15 2016 -0800
Committer: Yin Huai
Committed: Wed Jan 13 10:01:15 2016 -0800

--
 .../spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d6fd9b37/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
--
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index e6f504c..e916f1e 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -158,9 +158,9 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
       testBlockRemove: Boolean = false
     ): Unit = {
     require(shardIds.size > 1, "Need at least 2 shards to test")
-    require(numPartitionsInBM <= shardIds.size ,
+    require(numPartitionsInBM <= shardIds.size,
       "Number of partitions in BlockManager cannot be more than the Kinesis test shards available")
-    require(numPartitionsInKinesis <= shardIds.size ,
+    require(numPartitionsInKinesis <= shardIds.size,
       "Number of partitions in Kinesis cannot be more than the Kinesis test shards available")
     require(numPartitionsInBM <= numPartitions,
       "Number of partitions in BlockManager cannot be more than that in RDD")