spark git commit: [SPARK-10378][SQL][Test] Remove HashJoinCompatibilitySuite.
Repository: spark Updated Branches: refs/heads/master 52ea399e6 -> d65656c45 [SPARK-10378][SQL][Test] Remove HashJoinCompatibilitySuite. They don't bring much value since we now have better unit test coverage for hash joins. This will also help reduce the test time. Author: Reynold Xin Closes #8542 from rxin/SPARK-10378. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d65656c4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d65656c4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d65656c4 Branch: refs/heads/master Commit: d65656c455d19b83c6412571873586b458aa355e Parents: 52ea399 Author: Reynold Xin Authored: Mon Aug 31 18:09:24 2015 -0700 Committer: Reynold Xin Committed: Mon Aug 31 18:09:24 2015 -0700 -- .../execution/HashJoinCompatibilitySuite.scala | 169 --- 1 file changed, 169 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d65656c4/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala -- diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala deleted file mode 100644 index 1a5ba20..000 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HashJoinCompatibilitySuite.scala +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution - -import java.io.File - -import org.apache.spark.sql.SQLConf -import org.apache.spark.sql.hive.test.TestHive - -/** - * Runs the test cases that are included in the hive distribution with hash joins. 
- */ -class HashJoinCompatibilitySuite extends HiveCompatibilitySuite { - override def beforeAll() { -super.beforeAll() -TestHive.setConf(SQLConf.SORTMERGE_JOIN, false) - } - - override def afterAll() { -TestHive.setConf(SQLConf.SORTMERGE_JOIN, true) -super.afterAll() - } - - override def whiteList = Seq( -"auto_join0", -"auto_join1", -"auto_join10", -"auto_join11", -"auto_join12", -"auto_join13", -"auto_join14", -"auto_join14_hadoop20", -"auto_join15", -"auto_join17", -"auto_join18", -"auto_join19", -"auto_join2", -"auto_join20", -"auto_join21", -"auto_join22", -"auto_join23", -"auto_join24", -"auto_join25", -"auto_join26", -"auto_join27", -"auto_join28", -"auto_join3", -"auto_join30", -"auto_join31", -"auto_join32", -"auto_join4", -"auto_join5", -"auto_join6", -"auto_join7", -"auto_join8", -"auto_join9", -"auto_join_filters", -"auto_join_nulls", -"auto_join_reordering_values", -"auto_smb_mapjoin_14", -"auto_sortmerge_join_1", -"auto_sortmerge_join_10", -"auto_sortmerge_join_11", -"auto_sortmerge_join_12", -"auto_sortmerge_join_13", -"auto_sortmerge_join_14", -"auto_sortmerge_join_15", -"auto_sortmerge_join_16", -"auto_sortmerge_join_2", -"auto_sortmerge_join_3", -"auto_sortmerge_join_4", -"auto_sortmerge_join_5", -"auto_sortmerge_join_6", -"auto_sortmerge_join_7", -"auto_sortmerge_join_8", -"auto_sortmerge_join_9", -"correlationoptimizer1", -"correlationoptimizer10", -"correlationoptimizer11", -"correlationoptimizer13", -"correlationoptimizer14", -"correlationoptimizer15", -"correlationoptimizer2", -"correlationoptimizer3", -"correlationoptimizer4", -"correlationoptimizer6", -"correlationoptimizer7", -"correlationoptimizer8", -"correlationoptimizer9", -"join0", -"join1", -"join10", -"join11", -"join12", -"join13", -"join14", -"join14_hadoop20", -"join15", -"join16", -"join17", -"join18", -"join19", -"join2", -"join20", -"joi
spark git commit: [SPARK-10355] [ML] [PySpark] Add Python API for SQLTransformer
Repository: spark Updated Branches: refs/heads/master fe16fd0b8 -> 52ea399e6 [SPARK-10355] [ML] [PySpark] Add Python API for SQLTransformer Add Python API for SQLTransformer Author: Yanbo Liang Closes #8527 from yanboliang/spark-10355. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52ea399e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52ea399e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52ea399e Branch: refs/heads/master Commit: 52ea399e6ee37b7c44aae7709863e006fca88906 Parents: fe16fd0 Author: Yanbo Liang Authored: Mon Aug 31 16:11:27 2015 -0700 Committer: Xiangrui Meng Committed: Mon Aug 31 16:11:27 2015 -0700 -- python/pyspark/ml/feature.py | 57 --- 1 file changed, 54 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/52ea399e/python/pyspark/ml/feature.py -- diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 59300a6..0626281 100644 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -28,9 +28,9 @@ from pyspark.mllib.linalg import _convert_to_vector __all__ = ['Binarizer', 'Bucketizer', 'DCT', 'ElementwiseProduct', 'HashingTF', 'IDF', 'IDFModel', 'NGram', 'Normalizer', 'OneHotEncoder', 'PolynomialExpansion', 'RegexTokenizer', - 'StandardScaler', 'StandardScalerModel', 'StringIndexer', 'StringIndexerModel', - 'Tokenizer', 'VectorAssembler', 'VectorIndexer', 'Word2Vec', 'Word2VecModel', - 'PCA', 'PCAModel', 'RFormula', 'RFormulaModel'] + 'SQLTransformer', 'StandardScaler', 'StandardScalerModel', 'StringIndexer', + 'StringIndexerModel', 'Tokenizer', 'VectorAssembler', 'VectorIndexer', 'Word2Vec', + 'Word2VecModel', 'PCA', 'PCAModel', 'RFormula', 'RFormulaModel'] @inherit_doc @@ -744,6 +744,57 @@ class RegexTokenizer(JavaTransformer, HasInputCol, HasOutputCol): @inherit_doc +class SQLTransformer(JavaTransformer): +""" +Implements the transforms which are defined by SQL statement. +Currently we only support SQL syntax like 'SELECT ... FROM __THIS__' +where '__THIS__' represents the underlying table of the input dataset. + +>>> df = sqlContext.createDataFrame([(0, 1.0, 3.0), (2, 2.0, 5.0)], ["id", "v1", "v2"]) +>>> sqlTrans = SQLTransformer( +... statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__") +>>> sqlTrans.transform(df).head() +Row(id=0, v1=1.0, v2=3.0, v3=4.0, v4=3.0) +""" + +# a placeholder to make it appear in the generated doc +statement = Param(Params._dummy(), "statement", "SQL statement") + +@keyword_only +def __init__(self, statement=None): +""" +__init__(self, statement=None) +""" +super(SQLTransformer, self).__init__() +self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.SQLTransformer", self.uid) +self.statement = Param(self, "statement", "SQL statement") +kwargs = self.__init__._input_kwargs +self.setParams(**kwargs) + +@keyword_only +def setParams(self, statement=None): +""" +setParams(self, statement=None) +Sets params for this SQLTransformer. +""" +kwargs = self.setParams._input_kwargs +return self._set(**kwargs) + +def setStatement(self, value): +""" +Sets the value of :py:attr:`statement`. +""" +self._paramMap[self.statement] = value +return self + +def getStatement(self): +""" +Gets the value of statement or its default value. 
+""" +return self.getOrDefault(self.statement) + + +@inherit_doc class StandardScaler(JavaEstimator, HasInputCol, HasOutputCol): """ Standardizes features by removing the mean and scaling to unit variance using column summary - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-10349] [ML] OneVsRest use 'when ... otherwise' not UDF to generate new label at binary reduction
Repository: spark Updated Branches: refs/heads/master 540bdee93 -> fe16fd0b8 [SPARK-10349] [ML] OneVsRest use 'when ... otherwise' not UDF to generate new label at binary reduction Currently OneVsRest uses a UDF to generate the new binary label during training. Considering that [SPARK-7321](https://issues.apache.org/jira/browse/SPARK-7321) has been merged, we can use ```when ... otherwise``` which will be more efficient. Author: Yanbo Liang Closes #8519 from yanboliang/spark-10349. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe16fd0b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe16fd0b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe16fd0b Branch: refs/heads/master Commit: fe16fd0b8b717f01151bc659ec3299dab091c97a Parents: 540bdee Author: Yanbo Liang Authored: Mon Aug 31 16:06:38 2015 -0700 Committer: Xiangrui Meng Committed: Mon Aug 31 16:06:38 2015 -0700 -- .../org/apache/spark/ml/classification/OneVsRest.scala| 10 ++ 1 file changed, 2 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fe16fd0b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index c62e132..debc164 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -91,7 +91,6 @@ final class OneVsRestModel private[ml] ( // add an accumulator column to store predictions of all the models val accColName = "mbc$acc" + UUID.randomUUID().toString val initUDF = udf { () => Map[Int, Double]() } -val mapType = MapType(IntegerType, DoubleType, valueContainsNull = false) val newDataset = dataset.withColumn(accColName, initUDF()) // persist if underlying dataset is not persistent. @@ -195,16 +194,11 @@ final class OneVsRest(override val uid: String) // create k columns, one for each binary classifier. val models = Range(0, numClasses).par.map { index => - val labelUDF = udf { (label: Double) => -if (label.toInt == index) 1.0 else 0.0 - } - // generate new label metadata for the binary problem. - // TODO: use when ... otherwise after SPARK-7321 is merged val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata() val labelColName = "mc2b$" + index - val trainingDataset = -multiclassLabeled.withColumn(labelColName, labelUDF(col($(labelCol))), newLabelMeta) + val trainingDataset = multiclassLabeled.withColumn( +labelColName, when(col($(labelCol)) === index.toDouble, 1.0).otherwise(0.0), newLabelMeta) val classifier = getClassifier val paramMap = new ParamMap() paramMap.put(classifier.labelCol -> labelColName)
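The gain comes from replacing an opaque Scala UDF with Catalyst's native `when ... otherwise` expression, which avoids a per-row function call and stays visible to the optimizer. A minimal sketch of the two styles outside OneVsRest, assuming an existing SQLContext named `sqlContext`; the column name and class index are illustrative only:

```scala
import org.apache.spark.sql.functions.{col, udf, when}

// Assumption: `sqlContext` is an existing SQLContext.
val df = sqlContext.createDataFrame(
  Seq((0.0, 1.5), (2.0, 0.3), (2.0, 4.0))).toDF("label", "feature")

val index = 2  // one binary sub-problem of the one-vs-rest reduction

// Old approach: a black-box UDF evaluated row by row.
val labelUDF = udf { (label: Double) => if (label.toInt == index) 1.0 else 0.0 }
val withUdf = df.withColumn("mc2b$" + index, labelUDF(col("label")))

// New approach: a native column expression Catalyst can evaluate and optimize directly.
val withWhen = df.withColumn(
  "mc2b$" + index, when(col("label") === index.toDouble, 1.0).otherwise(0.0))

withWhen.show()  // rows with label == 2.0 get 1.0 in the new column, others 0.0
```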
[2/2] spark git commit: Preparing development version 1.5.1-SNAPSHOT
Preparing development version 1.5.1-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b270a16 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b270a16 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b270a16 Branch: refs/heads/branch-1.5 Commit: 2b270a166d6bd5b42399400924c576c9996bfc10 Parents: 908e37b Author: Patrick Wendell Authored: Mon Aug 31 15:57:49 2015 -0700 Committer: Patrick Wendell Committed: Mon Aug 31 15:57:49 2015 -0700 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 33 files changed, 33 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 3ef7d6f..7b41ebb 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.0 +1.5.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 684e07b..16bf17c 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.0 +1.5.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 6b082ad..beb547f 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.0 +1.5.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 9ef1eda..3926b79 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.0 +1.5.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index aa7021d..5eda12d 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.0 +1.5.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 7d72f78..33f2cd7 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.0 +1.5.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/2b270a16/external/flume/pom.xml -- diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 38683e3..670c783 100644 --- 
a/external/flume/pom.xml +++ b/external/flume/pom.xml @@ -21,7 +21,7 @@ org.apache.spark s
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.5.0-rc3 [created] 908e37bcc
[1/2] spark git commit: Preparing Spark release v1.5.0-rc3
Repository: spark Updated Branches: refs/heads/branch-1.5 1c752b8b5 -> 2b270a166 Preparing Spark release v1.5.0-rc3 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/908e37bc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/908e37bc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/908e37bc Branch: refs/heads/branch-1.5 Commit: 908e37bcc10132bb2aa7f80ae694a9df6e40f31a Parents: 1c752b8 Author: Patrick Wendell Authored: Mon Aug 31 15:57:42 2015 -0700 Committer: Patrick Wendell Committed: Mon Aug 31 15:57:42 2015 -0700 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 33 files changed, 33 insertions(+), 33 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 7b41ebb..3ef7d6f 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.1-SNAPSHOT +1.5.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 16bf17c..684e07b 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.1-SNAPSHOT +1.5.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index beb547f..6b082ad 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.1-SNAPSHOT +1.5.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 3926b79..9ef1eda 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.1-SNAPSHOT +1.5.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 5eda12d..aa7021d 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.1-SNAPSHOT +1.5.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/external/flume-sink/pom.xml -- diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml index 33f2cd7..7d72f78 100644 --- a/external/flume-sink/pom.xml +++ b/external/flume-sink/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.5.1-SNAPSHOT +1.5.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/908e37bc/external/flume/pom.xml -- diff --git a/external/flume/pom.xml 
b/external/flume/pom.xml index 670c783..38683e3 100644 --- a/external/flume/pom.xml +++
spark git commit: [SPARK-10341] [SQL] fix memory starving in unsafe SMJ
Repository: spark Updated Branches: refs/heads/branch-1.5 33ce274cd -> 1c752b8b5 [SPARK-10341] [SQL] fix memory starving in unsafe SMJ In SMJ, the first ExternalSorter could consume all the memory before spilling, then the second can not even acquire the first page. Before we have a better memory allocator, SMJ should call prepare() before call any compute() of it's children. cc rxin JoshRosen Author: Davies Liu Closes #8511 from davies/smj_memory. (cherry picked from commit 540bdee93103a73736d282b95db6a8cda8f6a2b1) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c752b8b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c752b8b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c752b8b Branch: refs/heads/branch-1.5 Commit: 1c752b8b5c7090936b5c2ca94e8fb47c4f570d69 Parents: 33ce274 Author: Davies Liu Authored: Mon Aug 31 15:55:22 2015 -0700 Committer: Reynold Xin Committed: Mon Aug 31 15:55:29 2015 -0700 -- .../rdd/MapPartitionsWithPreparationRDD.scala | 21 ++-- .../apache/spark/rdd/ZippedPartitionsRDD.scala | 13 .../MapPartitionsWithPreparationRDDSuite.scala | 14 + 3 files changed, 42 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1c752b8b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala index b475bd8..1f2213d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala @@ -17,6 +17,7 @@ package org.apache.spark.rdd +import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import org.apache.spark.{Partition, Partitioner, TaskContext} @@ -38,12 +39,28 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M override def getPartitions: Array[Partition] = firstParent[T].partitions + // In certain join operations, prepare can be called on the same partition multiple times. + // In this case, we need to ensure that each call to compute gets a separate prepare argument. + private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M] + + /** + * Prepare a partition for a single call to compute. + */ + def prepare(): Unit = { +preparedArguments += preparePartition() + } + /** * Prepare a partition before computing it from its parent. 
*/ override def compute(partition: Partition, context: TaskContext): Iterator[U] = { -val preparedArgument = preparePartition() +val prepared = + if (preparedArguments.isEmpty) { +preparePartition() + } else { +preparedArguments.remove(0) + } val parentIterator = firstParent[T].iterator(partition, context) -executePartition(context, partition.index, preparedArgument, parentIterator) +executePartition(context, partition.index, prepared, parentIterator) } } http://git-wip-us.apache.org/repos/asf/spark/blob/1c752b8b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index 81f40ad..b3c6439 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -73,6 +73,16 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag]( super.clearDependencies() rdds = null } + + /** + * Call the prepare method of every parent that has one. + * This is needed for reserving execution memory in advance. + */ + protected def tryPrepareParents(): Unit = { +rdds.collect { + case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare() +} + } } private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]( @@ -84,6 +94,7 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag] extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { +tryPrepareParents() val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context)) } @@ -107,6 +118,7 @@ private[spark] class ZippedPartitionsRDD3
spark git commit: [SPARK-10341] [SQL] fix memory starving in unsafe SMJ
Repository: spark Updated Branches: refs/heads/master 5b3245d6d -> 540bdee93 [SPARK-10341] [SQL] fix memory starving in unsafe SMJ In SMJ, the first ExternalSorter could consume all the memory before spilling, then the second can not even acquire the first page. Before we have a better memory allocator, SMJ should call prepare() before call any compute() of it's children. cc rxin JoshRosen Author: Davies Liu Closes #8511 from davies/smj_memory. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/540bdee9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/540bdee9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/540bdee9 Branch: refs/heads/master Commit: 540bdee93103a73736d282b95db6a8cda8f6a2b1 Parents: 5b3245d Author: Davies Liu Authored: Mon Aug 31 15:55:22 2015 -0700 Committer: Reynold Xin Committed: Mon Aug 31 15:55:22 2015 -0700 -- .../rdd/MapPartitionsWithPreparationRDD.scala | 21 ++-- .../apache/spark/rdd/ZippedPartitionsRDD.scala | 13 .../MapPartitionsWithPreparationRDDSuite.scala | 14 + 3 files changed, 42 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/540bdee9/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala index b475bd8..1f2213d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala @@ -17,6 +17,7 @@ package org.apache.spark.rdd +import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import org.apache.spark.{Partition, Partitioner, TaskContext} @@ -38,12 +39,28 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M override def getPartitions: Array[Partition] = firstParent[T].partitions + // In certain join operations, prepare can be called on the same partition multiple times. + // In this case, we need to ensure that each call to compute gets a separate prepare argument. + private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M] + + /** + * Prepare a partition for a single call to compute. + */ + def prepare(): Unit = { +preparedArguments += preparePartition() + } + /** * Prepare a partition before computing it from its parent. 
*/ override def compute(partition: Partition, context: TaskContext): Iterator[U] = { -val preparedArgument = preparePartition() +val prepared = + if (preparedArguments.isEmpty) { +preparePartition() + } else { +preparedArguments.remove(0) + } val parentIterator = firstParent[T].iterator(partition, context) -executePartition(context, partition.index, preparedArgument, parentIterator) +executePartition(context, partition.index, prepared, parentIterator) } } http://git-wip-us.apache.org/repos/asf/spark/blob/540bdee9/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index 81f40ad..b3c6439 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -73,6 +73,16 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag]( super.clearDependencies() rdds = null } + + /** + * Call the prepare method of every parent that has one. + * This is needed for reserving execution memory in advance. + */ + protected def tryPrepareParents(): Unit = { +rdds.collect { + case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare() +} + } } private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]( @@ -84,6 +94,7 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag] extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) { override def compute(s: Partition, context: TaskContext): Iterator[V] = { +tryPrepareParents() val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context)) } @@ -107,6 +118,7 @@ private[spark] class ZippedPartitionsRDD3 extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) { override de
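The essential contract is that every MapPartitionsWithPreparationRDD parent gets a prepare() call, queuing one prepared argument per upcoming compute(), before the zipped RDD builds either child iterator; that way both sorters reserve execution memory before either starts consuming rows. A standalone toy model of that contract (not the actual Spark classes):

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of the prepare-then-compute contract shown in the diff above.
class PreparingPartition[M, T](preparePartition: () => M, execute: M => Iterator[T]) {
  private val preparedArguments = new ArrayBuffer[M]

  // Called once per upcoming compute(), before any iterator is built.
  def prepare(): Unit = preparedArguments += preparePartition()

  def compute(): Iterator[T] = {
    // Fall back to preparing lazily if prepare() was never called for this compute.
    val prepared =
      if (preparedArguments.isEmpty) preparePartition() else preparedArguments.remove(0)
    execute(prepared)
  }
}

// What ZippedPartitionsRDD2.compute now effectively does for a sort-merge join:
//   left.prepare(); right.prepare()      // both sides reserve memory first
//   f(left.compute(), right.compute())   // only then are the iterators built
```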
spark git commit: [SPARK-8472] [ML] [PySpark] Python API for DCT
Repository: spark Updated Branches: refs/heads/master 23e39cc7b -> 5b3245d6d [SPARK-8472] [ML] [PySpark] Python API for DCT Add Python API for ml.feature.DCT. Author: Yanbo Liang Closes #8485 from yanboliang/spark-8472. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b3245d6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b3245d6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b3245d6 Branch: refs/heads/master Commit: 5b3245d6dff65972fc39c73f90d5cbdf84d19129 Parents: 23e39cc Author: Yanbo Liang Authored: Mon Aug 31 15:50:41 2015 -0700 Committer: Xiangrui Meng Committed: Mon Aug 31 15:50:41 2015 -0700 -- python/pyspark/ml/feature.py | 65 ++- 1 file changed, 64 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5b3245d6/python/pyspark/ml/feature.py -- diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 04b2b2c..59300a6 100644 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -26,7 +26,7 @@ from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaTransformer from pyspark.mllib.common import inherit_doc from pyspark.mllib.linalg import _convert_to_vector -__all__ = ['Binarizer', 'Bucketizer', 'ElementwiseProduct', 'HashingTF', 'IDF', 'IDFModel', +__all__ = ['Binarizer', 'Bucketizer', 'DCT', 'ElementwiseProduct', 'HashingTF', 'IDF', 'IDFModel', 'NGram', 'Normalizer', 'OneHotEncoder', 'PolynomialExpansion', 'RegexTokenizer', 'StandardScaler', 'StandardScalerModel', 'StringIndexer', 'StringIndexerModel', 'Tokenizer', 'VectorAssembler', 'VectorIndexer', 'Word2Vec', 'Word2VecModel', @@ -167,6 +167,69 @@ class Bucketizer(JavaTransformer, HasInputCol, HasOutputCol): @inherit_doc +class DCT(JavaTransformer, HasInputCol, HasOutputCol): +""" +A feature transformer that takes the 1D discrete cosine transform +of a real vector. No zero padding is performed on the input vector. +It returns a real vector of the same length representing the DCT. +The return vector is scaled such that the transform matrix is +unitary (aka scaled DCT-II). + +More information on +`https://en.wikipedia.org/wiki/Discrete_cosine_transform#DCT-II Wikipedia`. + +>>> from pyspark.mllib.linalg import Vectors +>>> df1 = sqlContext.createDataFrame([(Vectors.dense([5.0, 8.0, 6.0]),)], ["vec"]) +>>> dct = DCT(inverse=False, inputCol="vec", outputCol="resultVec") +>>> df2 = dct.transform(df1) +>>> df2.head().resultVec +DenseVector([10.969..., -0.707..., -2.041...]) +>>> df3 = DCT(inverse=True, inputCol="resultVec", outputCol="origVec").transform(df2) +>>> df3.head().origVec +DenseVector([5.0, 8.0, 6.0]) +""" + +# a placeholder to make it appear in the generated doc +inverse = Param(Params._dummy(), "inverse", "Set transformer to perform inverse DCT, " + +"default False.") + +@keyword_only +def __init__(self, inverse=False, inputCol=None, outputCol=None): +""" +__init__(self, inverse=False, inputCol=None, outputCol=None) +""" +super(DCT, self).__init__() +self._java_obj = self._new_java_obj("org.apache.spark.ml.feature.DCT", self.uid) +self.inverse = Param(self, "inverse", "Set transformer to perform inverse DCT, " + + "default False.") +self._setDefault(inverse=False) +kwargs = self.__init__._input_kwargs +self.setParams(**kwargs) + +@keyword_only +def setParams(self, inverse=False, inputCol=None, outputCol=None): +""" +setParams(self, inverse=False, inputCol=None, outputCol=None) +Sets params for this DCT. 
+""" +kwargs = self.setParams._input_kwargs +return self._set(**kwargs) + +def setInverse(self, value): +""" +Sets the value of :py:attr:`inverse`. +""" +self._paramMap[self.inverse] = value +return self + +def getInverse(self): +""" +Gets the value of inverse or its default value. +""" +return self.getOrDefault(self.inverse) + + +@inherit_doc class ElementwiseProduct(JavaTransformer, HasInputCol, HasOutputCol): """ Outputs the Hadamard product (i.e., the element-wise product) of each input vector - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-9954] [MLLIB] use first 128 nonzeros to compute Vector.hashCode
Repository: spark Updated Branches: refs/heads/master a2d5c7209 -> 23e39cc7b [SPARK-9954] [MLLIB] use first 128 nonzeros to compute Vector.hashCode This could help reduce hash collisions, e.g., in `RDD[Vector].repartition`. jkbradley Author: Xiangrui Meng Closes #8182 from mengxr/SPARK-9954. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23e39cc7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23e39cc7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23e39cc7 Branch: refs/heads/master Commit: 23e39cc7b1bb7f1087c4706234c9b5165a571357 Parents: a2d5c72 Author: Xiangrui Meng Authored: Mon Aug 31 15:49:25 2015 -0700 Committer: Xiangrui Meng Committed: Mon Aug 31 15:49:25 2015 -0700 -- .../org/apache/spark/mllib/linalg/Vectors.scala | 38 +++- 1 file changed, 21 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/23e39cc7/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 06ebb15..3642e92 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -71,20 +71,22 @@ sealed trait Vector extends Serializable { } /** - * Returns a hash code value for the vector. The hash code is based on its size and its nonzeros - * in the first 16 entries, using a hash algorithm similar to [[java.util.Arrays.hashCode]]. + * Returns a hash code value for the vector. The hash code is based on its size and its first 128 + * nonzero entries, using a hash algorithm similar to [[java.util.Arrays.hashCode]]. */ override def hashCode(): Int = { // This is a reference implementation. It calls return in foreachActive, which is slow. // Subclasses should override it with optimized implementation. var result: Int = 31 + size +var nnz = 0 this.foreachActive { (index, value) => - if (index < 16) { + if (nnz < Vectors.MAX_HASH_NNZ) { // ignore explicit 0 for comparison between sparse and dense if (value != 0) { result = 31 * result + index val bits = java.lang.Double.doubleToLongBits(value) result = 31 * result + (bits ^ (bits >>> 32)).toInt + nnz += 1 } } else { return result @@ -536,6 +538,9 @@ object Vectors { } allEqual } + + /** Max number of nonzero entries used in computing hash code. 
*/ + private[linalg] val MAX_HASH_NNZ = 128 } /** @@ -578,13 +583,15 @@ class DenseVector @Since("1.0.0") ( override def hashCode(): Int = { var result: Int = 31 + size var i = 0 -val end = math.min(values.length, 16) -while (i < end) { +val end = values.length +var nnz = 0 +while (i < end && nnz < Vectors.MAX_HASH_NNZ) { val v = values(i) if (v != 0.0) { result = 31 * result + i val bits = java.lang.Double.doubleToLongBits(values(i)) result = 31 * result + (bits ^ (bits >>> 32)).toInt +nnz += 1 } i += 1 } @@ -707,19 +714,16 @@ class SparseVector @Since("1.0.0") ( override def hashCode(): Int = { var result: Int = 31 + size val end = values.length -var continue = true var k = 0 -while ((k < end) & continue) { - val i = indices(k) - if (i < 16) { -val v = values(k) -if (v != 0.0) { - result = 31 * result + i - val bits = java.lang.Double.doubleToLongBits(v) - result = 31 * result + (bits ^ (bits >>> 32)).toInt -} - } else { -continue = false +var nnz = 0 +while (k < end && nnz < Vectors.MAX_HASH_NNZ) { + val v = values(k) + if (v != 0.0) { +val i = indices(k) +result = 31 * result + i +val bits = java.lang.Double.doubleToLongBits(v) +result = 31 * result + (bits ^ (bits >>> 32)).toInt +nnz += 1 } k += 1 }
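The hashing scheme itself is easy to state outside of the Vector classes: seed with 31 + size, then fold in the index and the bit pattern of each nonzero value, stopping after the first 128 nonzeros so that huge vectors stay cheap to hash. A standalone sketch of the same recurrence (a model of the arithmetic, not the Spark code):

```scala
// Standalone model of the hash recurrence used by DenseVector/SparseVector above.
def vectorHash(size: Int, activeEntries: Seq[(Int, Double)]): Int = {
  val maxHashNnz = 128                    // Vectors.MAX_HASH_NNZ
  var result = 31 + size
  var nnz = 0
  val it = activeEntries.iterator
  while (it.hasNext && nnz < maxHashNnz) {
    val (index, value) = it.next()
    if (value != 0.0) {                   // explicit zeros are skipped, so sparse == dense
      result = 31 * result + index
      val bits = java.lang.Double.doubleToLongBits(value)
      result = 31 * result + (bits ^ (bits >>> 32)).toInt
      nnz += 1
    }
  }
  result
}

// A dense and a sparse representation of the same vector hash identically:
// vectorHash(3, Seq(0 -> 1.0, 1 -> 0.0, 2 -> 3.0)) == vectorHash(3, Seq(0 -> 1.0, 2 -> 3.0))
```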
spark git commit: [SPARK-10170] [SQL] Add DB2 JDBC dialect support.
Repository: spark Updated Branches: refs/heads/master 4a5fe0916 -> a2d5c7209 [SPARK-10170] [SQL] Add DB2 JDBC dialect support. Data frame write to DB2 database is failing because by default JDBC data source implementation is generating a table schema with DB2 unsupported data types TEXT for String, and BIT1(1) for Boolean. This patch registers DB2 JDBC Dialect that maps String, Boolean to valid DB2 data types. Author: sureshthalamati Closes #8393 from sureshthalamati/db2_dialect_spark-10170. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2d5c720 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2d5c720 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2d5c720 Branch: refs/heads/master Commit: a2d5c72091b1c602694dbca823a7b26f86b02864 Parents: 4a5fe09 Author: sureshthalamati Authored: Mon Aug 31 12:39:58 2015 -0700 Committer: Reynold Xin Committed: Mon Aug 31 12:39:58 2015 -0700 -- .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 18 ++ .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 7 +++ 2 files changed, 25 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a2d5c720/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 8849fc2..c6d05c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -125,6 +125,7 @@ object JdbcDialects { registerDialect(MySQLDialect) registerDialect(PostgresDialect) + registerDialect(DB2Dialect) /** * Fetch the JdbcDialect class corresponding to a given database url. @@ -222,3 +223,20 @@ case object MySQLDialect extends JdbcDialect { s"`$colName`" } } + +/** + * :: DeveloperApi :: + * Default DB2 dialect, mapping string/boolean on write to valid DB2 types. + * By default string, and boolean gets mapped to db2 invalid types TEXT, and BIT(1). 
+ */ +@DeveloperApi +case object DB2Dialect extends JdbcDialect { + + override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2") + + override def getJDBCType(dt: DataType): Option[JdbcType] = dt match { +case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB)) +case BooleanType => Some(JdbcType("CHAR(1)", java.sql.Types.CHAR)) +case _ => None + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/a2d5c720/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 0edac08..d8c9a08 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -407,6 +407,7 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext test("Default jdbc dialect registration") { assert(JdbcDialects.get("jdbc:mysql://127.0.0.1/db") == MySQLDialect) assert(JdbcDialects.get("jdbc:postgresql://127.0.0.1/db") == PostgresDialect) +assert(JdbcDialects.get("jdbc:db2://127.0.0.1/db") == DB2Dialect) assert(JdbcDialects.get("test.invalid") == NoopDialect) } @@ -443,4 +444,10 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext assert(agg.getCatalystType(0, "", 1, null) === Some(LongType)) assert(agg.getCatalystType(1, "", 1, null) === Some(StringType)) } + + test("DB2Dialect type mapping") { +val db2Dialect = JdbcDialects.get("jdbc:db2://127.0.0.1/db") + assert(db2Dialect.getJDBCType(StringType).map(_.databaseTypeDefinition).get == "CLOB") + assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "CHAR(1)") + } }
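Because the dialect is registered by default and selected purely by URL prefix, a plain DataFrame JDBC write picks up the new mappings with no extra code. A hedged usage sketch; the URL, table name, credentials, and sample data are placeholders, and running it requires a reachable DB2 instance plus the DB2 JDBC driver on the classpath:

```scala
import java.util.Properties

// Assumption: `sqlContext` is an existing SQLContext.
val df = sqlContext.createDataFrame(
  Seq(("alice", true), ("bob", false))).toDF("name", "active")

val url = "jdbc:db2://db2host:50000/SAMPLE"   // placeholder connection string
val props = new Properties()
props.setProperty("user", "db2user")          // placeholder credentials
props.setProperty("password", "secret")

// The "jdbc:db2" prefix makes DB2Dialect.canHandle match, so the CREATE TABLE issued by
// the write maps StringType -> CLOB and BooleanType -> CHAR(1) instead of TEXT / BIT(1).
df.write.jdbc(url, "PEOPLE", props)
```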
spark git commit: [SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregisterReceivering since we may reuse it later
Repository: spark Updated Branches: refs/heads/branch-1.5 bf5b2f26b -> 33ce274cd [SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregisterReceivering since we may reuse it later `deregisterReceiver` should not remove `ReceiverTrackingInfo`. Otherwise, it will throw `java.util.NoSuchElementException: key not found` when restarting it. Author: zsxwing Closes #8538 from zsxwing/SPARK-10369. (cherry picked from commit 4a5fe091658b1d06f427e404a11a84fc84f953c5) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33ce274c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33ce274c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33ce274c Branch: refs/heads/branch-1.5 Commit: 33ce274cdf7538b5816f1a400b2fad394ec2a056 Parents: bf5b2f2 Author: zsxwing Authored: Mon Aug 31 12:19:11 2015 -0700 Committer: Tathagata Das Committed: Mon Aug 31 12:19:48 2015 -0700 -- .../streaming/scheduler/ReceiverTracker.scala | 4 +- .../scheduler/ReceiverTrackerSuite.scala| 51 2 files changed, 53 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/33ce274c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 3d532a6..f86fd44 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -291,7 +291,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false ReceiverTrackingInfo( streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo)) } -receiverTrackingInfos -= streamId +receiverTrackingInfos(streamId) = newReceiverTrackingInfo listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo)) val messageWithError = if (error != null && !error.isEmpty) { s"$message - $error" @@ -483,7 +483,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false context.reply(true) // Local messages case AllReceiverIds => -context.reply(receiverTrackingInfos.keys.toSeq) +context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq) case StopAllReceivers => assert(isTrackerStopping || isTrackerStopped) stopReceivers() http://git-wip-us.apache.org/repos/asf/spark/blob/33ce274c/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index dd292ba..45138b7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -60,6 +60,26 @@ class ReceiverTrackerSuite extends TestSuiteBase { } } } + + test("should restart receiver after stopping it") { +withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc => + @volatile var startTimes = 0 + ssc.addStreamingListener(new StreamingListener { +override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = { + startTimes += 1 +} + }) + val input = ssc.receiverStream(new 
StoppableReceiver) + val output = new TestOutputStream(input) + output.register() + ssc.start() + StoppableReceiver.shouldStop = true + eventually(timeout(10 seconds), interval(10 millis)) { +// The receiver is stopped once, so if it's restarted, it should be started twice. +assert(startTimes === 2) + } +} + } } /** An input DStream with for testing rate controlling */ @@ -132,3 +152,34 @@ private[streaming] object RateTestReceiver { def getActive(): Option[RateTestReceiver] = Option(activeReceiver) } + +/** + * A custom receiver that could be stopped via StoppableReceiver.shouldStop + */ +class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) { + + var receivingThreadOption: Option[Thread] = None + + def onStart() { +val thread = new Thread() { + override def run() { +while (!StoppableReceiver.shouldStop) { +
spark git commit: [SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregisterReceivering since we may reuse it later
Repository: spark Updated Branches: refs/heads/master 72f6dbf7b -> 4a5fe0916 [SPARK-10369] [STREAMING] Don't remove ReceiverTrackingInfo when deregisterReceivering since we may reuse it later `deregisterReceiver` should not remove `ReceiverTrackingInfo`. Otherwise, it will throw `java.util.NoSuchElementException: key not found` when restarting it. Author: zsxwing Closes #8538 from zsxwing/SPARK-10369. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a5fe091 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a5fe091 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a5fe091 Branch: refs/heads/master Commit: 4a5fe091658b1d06f427e404a11a84fc84f953c5 Parents: 72f6dbf Author: zsxwing Authored: Mon Aug 31 12:19:11 2015 -0700 Committer: Tathagata Das Committed: Mon Aug 31 12:19:11 2015 -0700 -- .../streaming/scheduler/ReceiverTracker.scala | 4 +- .../scheduler/ReceiverTrackerSuite.scala| 51 2 files changed, 53 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4a5fe091/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 3d532a6..f86fd44 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -291,7 +291,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false ReceiverTrackingInfo( streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo)) } -receiverTrackingInfos -= streamId +receiverTrackingInfos(streamId) = newReceiverTrackingInfo listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo)) val messageWithError = if (error != null && !error.isEmpty) { s"$message - $error" @@ -483,7 +483,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false context.reply(true) // Local messages case AllReceiverIds => -context.reply(receiverTrackingInfos.keys.toSeq) +context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq) case StopAllReceivers => assert(isTrackerStopping || isTrackerStopped) stopReceivers() http://git-wip-us.apache.org/repos/asf/spark/blob/4a5fe091/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index dd292ba..45138b7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -60,6 +60,26 @@ class ReceiverTrackerSuite extends TestSuiteBase { } } } + + test("should restart receiver after stopping it") { +withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc => + @volatile var startTimes = 0 + ssc.addStreamingListener(new StreamingListener { +override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = { + startTimes += 1 +} + }) + val input = ssc.receiverStream(new StoppableReceiver) + val output = new TestOutputStream(input) + output.register() + ssc.start() + 
StoppableReceiver.shouldStop = true + eventually(timeout(10 seconds), interval(10 millis)) { +// The receiver is stopped once, so if it's restarted, it should be started twice. +assert(startTimes === 2) + } +} + } } /** An input DStream with for testing rate controlling */ @@ -132,3 +152,34 @@ private[streaming] object RateTestReceiver { def getActive(): Option[RateTestReceiver] = Option(activeReceiver) } + +/** + * A custom receiver that could be stopped via StoppableReceiver.shouldStop + */ +class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) { + + var receivingThreadOption: Option[Thread] = None + + def onStart() { +val thread = new Thread() { + override def run() { +while (!StoppableReceiver.shouldStop) { + Thread.sleep(10) +} +StoppableReceiver.this.stop("stop") + } +} +thread.sta
spark git commit: [SPARK-8730] Fixes - Deser objects containing a primitive class attribute
Repository: spark Updated Branches: refs/heads/master f0f563a3c -> 72f6dbf7b [SPARK-8730] Fixes - Deser objects containing a primitive class attribute Author: EugenCepoi Closes #7122 from EugenCepoi/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72f6dbf7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72f6dbf7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72f6dbf7 Branch: refs/heads/master Commit: 72f6dbf7b0c8b271f5f9c762374422c69c8ab43d Parents: f0f563a Author: EugenCepoi Authored: Mon Aug 31 13:24:35 2015 -0500 Committer: Imran Rashid Committed: Mon Aug 31 13:24:35 2015 -0500 -- .../spark/serializer/JavaSerializer.scala | 27 .../spark/serializer/JavaSerializerSuite.scala | 18 + 2 files changed, 40 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/72f6dbf7/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala -- diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index 4a5274b..b463a71 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -62,17 +62,34 @@ private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoa extends DeserializationStream { private val objIn = new ObjectInputStream(in) { -override def resolveClass(desc: ObjectStreamClass): Class[_] = { - // scalastyle:off classforname - Class.forName(desc.getName, false, loader) - // scalastyle:on classforname -} +override def resolveClass(desc: ObjectStreamClass): Class[_] = + try { +// scalastyle:off classforname +Class.forName(desc.getName, false, loader) +// scalastyle:on classforname + } catch { +case e: ClassNotFoundException => + JavaDeserializationStream.primitiveMappings.get(desc.getName).getOrElse(throw e) + } } def readObject[T: ClassTag](): T = objIn.readObject().asInstanceOf[T] def close() { objIn.close() } } +private object JavaDeserializationStream { + val primitiveMappings = Map[String, Class[_]]( +"boolean" -> classOf[Boolean], +"byte" -> classOf[Byte], +"char" -> classOf[Char], +"short" -> classOf[Short], +"int" -> classOf[Int], +"long" -> classOf[Long], +"float" -> classOf[Float], +"double" -> classOf[Double], +"void" -> classOf[Void] + ) +} private[spark] class JavaSerializerInstance( counterReset: Int, extraDebugInfo: Boolean, defaultClassLoader: ClassLoader) http://git-wip-us.apache.org/repos/asf/spark/blob/72f6dbf7/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala index 329a2b6..20f4567 100644 --- a/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/JavaSerializerSuite.scala @@ -25,4 +25,22 @@ class JavaSerializerSuite extends SparkFunSuite { val instance = serializer.newInstance() instance.deserialize[JavaSerializer](instance.serialize(serializer)) } + + test("Deserialize object containing a primitive Class as attribute") { +val serializer = new JavaSerializer(new SparkConf()) +val instance = serializer.newInstance() +instance.deserialize[JavaSerializer](instance.serialize(new ContainsPrimitiveClass())) + } +} + +private class ContainsPrimitiveClass extends Serializable { + val 
intClass = classOf[Int] + val longClass = classOf[Long] + val shortClass = classOf[Short] + val charClass = classOf[Char] + val doubleClass = classOf[Double] + val floatClass = classOf[Float] + val booleanClass = classOf[Boolean] + val byteClass = classOf[Byte] + val voidClass = classOf[Void] }
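The root cause is that the overridden resolveClass lost the JDK's built-in handling of primitive type descriptors: when a serialized object holds a field such as classOf[Int], the stream carries the class name "int", and Class.forName("int", false, loader) throws ClassNotFoundException. A standalone sketch of the fallback the patch adds (a model of the logic, not the Spark class itself):

```scala
// Name-to-class table for the descriptors Class.forName cannot resolve by name.
val primitiveMappings = Map[String, Class[_]](
  "boolean" -> classOf[Boolean],
  "byte"    -> classOf[Byte],
  "char"    -> classOf[Char],
  "short"   -> classOf[Short],
  "int"     -> classOf[Int],
  "long"    -> classOf[Long],
  "float"   -> classOf[Float],
  "double"  -> classOf[Double],
  "void"    -> classOf[Void])

def resolve(name: String, loader: ClassLoader): Class[_] =
  try {
    Class.forName(name, false, loader)                 // works for ordinary class names
  } catch {
    case e: ClassNotFoundException =>
      primitiveMappings.getOrElse(name, throw e)       // "int", "long", ... resolve here
  }

// resolve("int", getClass.getClassLoader) returns classOf[Int] instead of throwing.
```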