spark git commit: [SPARK-6752][Streaming] Allow StreamingContext to be recreated from checkpoint and existing SparkContext
Repository: spark
Updated Branches: refs/heads/master cc48e6387 -> 534f2a436

[SPARK-6752][Streaming] Allow StreamingContext to be recreated from checkpoint and existing SparkContext

Currently, if you want to create a StreamingContext from checkpoint information, the system will create a new SparkContext. This prevents a StreamingContext from being recreated from checkpoints in managed environments where the SparkContext is precreated. The solution in this PR is to introduce the following methods on StreamingContext:

1. `new StreamingContext(checkpointDirectory, sparkContext)`: recreate the StreamingContext from the checkpoint using the provided SparkContext.
2. `StreamingContext.getOrCreate(checkpointDirectory, sparkContext, createFunction: SparkContext => StreamingContext)`: if the checkpoint file exists, recreate the StreamingContext using the provided SparkContext (that is, call 1.); otherwise create the StreamingContext using the provided createFunction.

TODO: the corresponding Java and Python APIs have to be added as well.

Author: Tathagata Das tathagata.das1...@gmail.com

Closes #5428 from tdas/SPARK-6752 and squashes the following commits:

94db63c [Tathagata Das] Fix long line.
524f519 [Tathagata Das] Many changes based on PR comments.
eabd092 [Tathagata Das] Added Function0, Java API and unit tests for StreamingContext.getOrCreate
36a7823 [Tathagata Das] Minor changes.
204814e [Tathagata Das] Added StreamingContext.getOrCreate with existing SparkContext

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/534f2a43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/534f2a43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/534f2a43
Branch: refs/heads/master
Commit: 534f2a43625fbf1a3a65d09550a19875cd1dce43
Parents: cc48e63
Author: Tathagata Das tathagata.das1...@gmail.com
Authored: Thu Apr 23 11:29:34 2015 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Thu Apr 23 11:29:34 2015 -0700

--
 .../spark/api/java/function/Function0.java      |  27
 .../org/apache/spark/streaming/Checkpoint.scala |  26 ++-
 .../spark/streaming/StreamingContext.scala      |  85 --
 .../api/java/JavaStreamingContext.scala         | 119 +-
 .../apache/spark/streaming/JavaAPISuite.java    | 145 -
 .../spark/streaming/CheckpointSuite.scala       |   3 +-
 .../spark/streaming/StreamingContextSuite.scala | 159 +++
 7 files changed, 503 insertions(+), 61 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/534f2a43/core/src/main/java/org/apache/spark/api/java/function/Function0.java
--
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function0.java b/core/src/main/java/org/apache/spark/api/java/function/Function0.java
new file mode 100644
index 000..38e410c
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function0.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import java.io.Serializable;
+
+/**
+ * A zero-argument function that returns an R.
+ */
+public interface Function0<R> extends Serializable {
+  public R call() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/534f2a43/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 0a50485..7bfae25 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -77,7 +77,8 @@ object Checkpoint extends Logging {
   }

   /** Get checkpoint files present in the given directory, ordered by oldest-first */
-  def getCheckpointFiles(checkpointDir: String, fs: FileSystem): Seq[Path] = {
+  def
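A minimal usage sketch of the API described above, assuming a managed environment that hands the application a precreated SparkContext. The checkpoint path, batch interval, and createContext body are illustrative, and the argument order follows the commit description; the signature that was finally merged may order the parameters differently.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // SparkContext precreated by the hosting environment.
    val sc = new SparkContext(new SparkConf().setAppName("CheckpointedApp"))
    val checkpointDir = "/tmp/streaming-checkpoint"  // hypothetical path

    // Builds a fresh StreamingContext when no checkpoint exists yet.
    def createContext(sc: SparkContext): StreamingContext = {
      val ssc = new StreamingContext(sc, Seconds(10))
      ssc.checkpoint(checkpointDir)
      // ... wire up DStreams here ...
      ssc
    }

    // Recreates from the checkpoint if present; otherwise calls createContext.
    val ssc = StreamingContext.getOrCreate(checkpointDir, sc, createContext _)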
spark git commit: [SPARK-7044] [SQL] Fix the deadlock in script transformation
Repository: spark Updated Branches: refs/heads/master 975f53e4f - cc48e6387 [SPARK-7044] [SQL] Fix the deadlock in script transformation Author: Cheng Hao hao.ch...@intel.com Closes #5625 from chenghao-intel/transform and squashes the following commits: 5ec1dd2 [Cheng Hao] fix the deadlock issue in ScriptTransform Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc48e638 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc48e638 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc48e638 Branch: refs/heads/master Commit: cc48e6387abdd909921cb58e0588cdf226556bcd Parents: 975f53e Author: Cheng Hao hao.ch...@intel.com Authored: Thu Apr 23 10:35:22 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Thu Apr 23 10:35:22 2015 -0700 -- .../hive/execution/ScriptTransformation.scala | 33 +--- .../sql/hive/execution/SQLQuerySuite.scala | 8 + 2 files changed, 29 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cc48e638/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index cab0fdd..3eddda3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -145,20 +145,29 @@ case class ScriptTransformation( val dataOutputStream = new DataOutputStream(outputStream) val outputProjection = new InterpretedProjection(input, child.output) - iter -.map(outputProjection) -.foreach { row = - if (inputSerde == null) { -val data = row.mkString(, ioschema.inputRowFormatMap(TOK_TABLEROWFORMATFIELD), - ioschema.inputRowFormatMap(TOK_TABLEROWFORMATLINES)).getBytes(utf-8) - -outputStream.write(data) - } else { -val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi) -prepareWritable(writable).write(dataOutputStream) + // Put the write(output to the pipeline) into a single thread + // and keep the collector as remain in the main thread. + // otherwise it will causes deadlock if the data size greater than + // the pipeline / buffer capacity. 
+ new Thread(new Runnable() { +override def run(): Unit = { + iter +.map(outputProjection) +.foreach { row = +if (inputSerde == null) { + val data = row.mkString(, ioschema.inputRowFormatMap(TOK_TABLEROWFORMATFIELD), + ioschema.inputRowFormatMap(TOK_TABLEROWFORMATLINES)).getBytes(utf-8) + + outputStream.write(data) +} else { + val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi) + prepareWritable(writable).write(dataOutputStream) +} } + outputStream.close() } - outputStream.close() + }).start() + iterator } } http://git-wip-us.apache.org/repos/asf/spark/blob/cc48e638/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 47b4cb9..4f8d0ac 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -561,4 +561,12 @@ class SQLQuerySuite extends QueryTest { sql(select d from dn union all select d * 2 from dn) .queryExecution.analyzed } + + test(test script transform) { +val data = (1 to 10).map { i = (i, i, i) } +data.toDF(d1, d2, d3).registerTempTable(script_trans) +assert(10 === + sql(SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans) + .queryExecution.toRdd.count()) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
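The deadlock being fixed here is the classic pipe deadlock: one thread first writes all rows to the child process's stdin and only then reads its stdout, so once the OS pipe buffer fills, the writer and the child block each other forever. A self-contained sketch of the pattern the patch adopts, using the plain JDK process API with illustrative names rather than the ScriptTransformation code itself:

    import java.io.{BufferedReader, InputStreamReader, PrintWriter}

    val proc = new ProcessBuilder("cat").start()

    // Producer thread: feeds the child's stdin, then closes it to signal EOF.
    new Thread(new Runnable {
      override def run(): Unit = {
        val out = new PrintWriter(proc.getOutputStream)
        (1 to 1000000).foreach(i => out.println(s"row-$i"))
        out.close()
      }
    }).start()

    // The main thread is free to drain the child's stdout concurrently,
    // so the pipe buffer can never wedge both sides at once.
    val in = new BufferedReader(new InputStreamReader(proc.getInputStream))
    val count = Iterator.continually(in.readLine()).takeWhile(_ != null).length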
spark git commit: [minor][streaming] Fixed Scala string interpolation error
Repository: spark
Updated Branches: refs/heads/master a7d65d38f -> 975f53e4f

[minor][streaming] Fixed Scala string interpolation error

Author: Prabeesh K prabees...@namshi.com

Closes #5653 from prabeesh/fix and squashes the following commits:

9d7a9f5 [Prabeesh K] fixed scala string interpolation error

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/975f53e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/975f53e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/975f53e4
Branch: refs/heads/master
Commit: 975f53e4f978759db7639cd08498ad8cd0ae2a56
Parents: a7d65d3
Author: Prabeesh K prabees...@namshi.com
Authored: Thu Apr 23 10:33:13 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Thu Apr 23 10:33:13 2015 -0700

--
 .../scala/org/apache/spark/examples/streaming/MQTTWordCount.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/975f53e4/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
--
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
index f40caad..85b9a54 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
@@ -56,7 +56,7 @@ object MQTTPublisher {
     while (true) {
       try {
         msgtopic.publish(message)
-        println(s"Published data. topic: {msgtopic.getName()}; Message: {message}")
+        println(s"Published data. topic: ${msgtopic.getName()}; Message: $message")
       } catch {
         case e: MqttException if e.getReasonCode == MqttException.REASON_CODE_MAX_INFLIGHT =>
           Thread.sleep(10)
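The one-character bug above is easy to reproduce in any Scala REPL: inside an s-interpolated string, braces are literal text unless prefixed with `$`.

    val topic = "sensors"
    println(s"topic: {topic}")   // prints: topic: {topic}   (the bug)
    println(s"topic: ${topic}")  // prints: topic: sensors   (the fix)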
spark git commit: [SPARK-7085][MLlib] Fix miniBatchFraction parameter in train method called with 4 arguments
Repository: spark Updated Branches: refs/heads/master 6afde2c78 - 3e91cc273 [SPARK-7085][MLlib] Fix miniBatchFraction parameter in train method called with 4 arguments Author: wizz w...@wizz-dev01.kawasaki.flab.fujitsu.com Closes #5658 from kuromatsu-nobuyuki/SPARK-7085 and squashes the following commits: 6ec2d21 [wizz] Fix miniBatchFraction parameter in train method called with 4 arguments Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e91cc27 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e91cc27 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e91cc27 Branch: refs/heads/master Commit: 3e91cc273d281053618bfa032bc610e2cf8d8e78 Parents: 6afde2c Author: wizz w...@wizz-dev01.kawasaki.flab.fujitsu.com Authored: Thu Apr 23 14:00:07 2015 -0700 Committer: Joseph K. Bradley jos...@databricks.com Committed: Thu Apr 23 14:00:07 2015 -0700 -- .../scala/org/apache/spark/mllib/regression/RidgeRegression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3e91cc27/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala index 8838ca8..309f9af 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala @@ -171,7 +171,7 @@ object RidgeRegressionWithSGD { numIterations: Int, stepSize: Double, regParam: Double): RidgeRegressionModel = { -train(input, numIterations, stepSize, regParam, 0.01) +train(input, numIterations, stepSize, regParam, 1.0) } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
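For context, the four-argument overload shown in the diff delegates to the full train method, whose last parameter is the fraction of data sampled per SGD iteration; before this fix it silently passed 0.01 (1% of the data per iteration) instead of 1.0. A short usage sketch, assuming an existing RDD of labeled points:

    import org.apache.spark.mllib.regression.{LabeledPoint, RidgeRegressionWithSGD}
    import org.apache.spark.rdd.RDD

    def fit(trainingData: RDD[LabeledPoint]) =
      RidgeRegressionWithSGD.train(
        trainingData,
        100,    // numIterations
        1.0,    // stepSize
        0.01)   // regParam; miniBatchFraction now correctly defaults to 1.0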
spark git commit: [SPARK-7087] [BUILD] Fix path issue in change-version scripts
Repository: spark Updated Branches: refs/heads/master baa83a9a6 - 6d0749cae [SPARK-7087] [BUILD] Fix path issue change version script Author: Tijo Thomas tijopara...@gmail.com Closes #5656 from tijoparacka/FIX_PATHISSUE_CHANGE_VERSION_SCRIPT and squashes the following commits: ab4f4b1 [Tijo Thomas] removed whitespace 24478c9 [Tijo Thomas] modified to provide the spark base dir while searching for pom and also while changing the vesrion no 7b8e10b [Tijo Thomas] Modified for providing the base directories while finding the list of pom files and also while changing the version no Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d0749ca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d0749ca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d0749ca Branch: refs/heads/master Commit: 6d0749cae301ee4bf37632d657de48e75548a523 Parents: baa83a9 Author: Tijo Thomas tijopara...@gmail.com Authored: Thu Apr 23 17:23:15 2015 -0400 Committer: Sean Owen so...@cloudera.com Committed: Thu Apr 23 17:23:15 2015 -0400 -- dev/change-version-to-2.10.sh | 6 +++--- dev/change-version-to-2.11.sh | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6d0749ca/dev/change-version-to-2.10.sh -- diff --git a/dev/change-version-to-2.10.sh b/dev/change-version-to-2.10.sh index 15e0c73..c4adb1f 100755 --- a/dev/change-version-to-2.10.sh +++ b/dev/change-version-to-2.10.sh @@ -18,9 +18,9 @@ # # Note that this will not necessarily work as intended with non-GNU sed (e.g. OS X) - -find . -name 'pom.xml' | grep -v target \ +BASEDIR=$(dirname $0)/.. +find $BASEDIR -name 'pom.xml' | grep -v target \ | xargs -I {} sed -i -e 's/\(artifactId.*\)_2.11/\1_2.10/g' {} # Also update scala.binary.version in parent POM -sed -i -e '0,/scala\.binary\.version2.11/s//scala.binary.version2.10/' pom.xml +sed -i -e '0,/scala\.binary\.version2.11/s//scala.binary.version2.10/' $BASEDIR/pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/6d0749ca/dev/change-version-to-2.11.sh -- diff --git a/dev/change-version-to-2.11.sh b/dev/change-version-to-2.11.sh index c0a8cb4..d370019 100755 --- a/dev/change-version-to-2.11.sh +++ b/dev/change-version-to-2.11.sh @@ -18,9 +18,9 @@ # # Note that this will not necessarily work as intended with non-GNU sed (e.g. OS X) - -find . -name 'pom.xml' | grep -v target \ +BASEDIR=$(dirname $0)/.. +find $BASEDIR -name 'pom.xml' | grep -v target \ | xargs -I {} sed -i -e 's/\(artifactId.*\)_2.10/\1_2.11/g' {} # Also update scala.binary.version in parent POM -sed -i -e '0,/scala\.binary\.version2.10/s//scala.binary.version2.11/' pom.xml +sed -i -e '0,/scala\.binary\.version2.10/s//scala.binary.version2.11/' $BASEDIR/pom.xml - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-7070] [MLLIB] LDA.setBeta should call setTopicConcentration.
Repository: spark Updated Branches: refs/heads/master 6d0749cae - 1ed46a60a [SPARK-7070] [MLLIB] LDA.setBeta should call setTopicConcentration. jkbradley Author: Xiangrui Meng m...@databricks.com Closes #5649 from mengxr/SPARK-7070 and squashes the following commits: c66023c [Xiangrui Meng] setBeta should call setTopicConcentration Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ed46a60 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ed46a60 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ed46a60 Branch: refs/heads/master Commit: 1ed46a60adacb352b385d2331401822a5a2c55c0 Parents: 6d0749c Author: Xiangrui Meng m...@databricks.com Authored: Thu Apr 23 14:46:54 2015 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Thu Apr 23 14:46:54 2015 -0700 -- .../main/scala/org/apache/spark/mllib/clustering/LDA.scala | 2 +- .../scala/org/apache/spark/mllib/clustering/LDASuite.scala | 8 2 files changed, 9 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1ed46a60/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 9d63a08..d006b39 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -177,7 +177,7 @@ class LDA private ( def getBeta: Double = getTopicConcentration /** Alias for [[setTopicConcentration()]] */ - def setBeta(beta: Double): this.type = setBeta(beta) + def setBeta(beta: Double): this.type = setTopicConcentration(beta) /** * Maximum number of iterations for learning. http://git-wip-us.apache.org/repos/asf/spark/blob/1ed46a60/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 15de10f..cc747da 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -123,6 +123,14 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { assert(termVertexIds.map(i = LDA.index2term(i.toLong)) === termIds) assert(termVertexIds.forall(i = LDA.isTermVertex((i.toLong, 0 } + + test(setter alias) { +val lda = new LDA().setAlpha(2.0).setBeta(3.0) +assert(lda.getAlpha === 2.0) +assert(lda.getDocConcentration === 2.0) +assert(lda.getBeta === 3.0) +assert(lda.getTopicConcentration === 3.0) + } } private[clustering] object LDASuite { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
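The one-line diff hides a real failure mode: the old alias called itself, so any use of setBeta recursed until a StackOverflowError. A self-contained sketch of the bug class, with a simplified stand-in class rather than LDA itself:

    class Params {
      private var topicConcentration = 1.0

      def setTopicConcentration(v: Double): this.type = {
        topicConcentration = v
        this
      }

      // Buggy alias: compiles fine, but recurses forever at runtime.
      //   def setBeta(v: Double): this.type = setBeta(v)

      // Fixed alias: delegate to the canonical setter.
      def setBeta(v: Double): this.type = setTopicConcentration(v)
    }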
spark git commit: [SPARK-7070] [MLLIB] LDA.setBeta should call setTopicConcentration.
Repository: spark Updated Branches: refs/heads/branch-1.3 3b38cac26 - e76317b65 [SPARK-7070] [MLLIB] LDA.setBeta should call setTopicConcentration. jkbradley Author: Xiangrui Meng m...@databricks.com Closes #5649 from mengxr/SPARK-7070 and squashes the following commits: c66023c [Xiangrui Meng] setBeta should call setTopicConcentration (cherry picked from commit 1ed46a60adacb352b385d2331401822a5a2c55c0) Signed-off-by: Xiangrui Meng m...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e76317b6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e76317b6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e76317b6 Branch: refs/heads/branch-1.3 Commit: e76317b653f786ae9b7cff2588ecb00bf3b51d7f Parents: 3b38cac Author: Xiangrui Meng m...@databricks.com Authored: Thu Apr 23 14:46:54 2015 -0700 Committer: Xiangrui Meng m...@databricks.com Committed: Thu Apr 23 14:47:02 2015 -0700 -- .../main/scala/org/apache/spark/mllib/clustering/LDA.scala | 2 +- .../scala/org/apache/spark/mllib/clustering/LDASuite.scala | 8 2 files changed, 9 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e76317b6/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala index 5e17c8d..f3befa6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala @@ -177,7 +177,7 @@ class LDA private ( def getBeta: Double = getTopicConcentration /** Alias for [[setTopicConcentration()]] */ - def setBeta(beta: Double): this.type = setBeta(beta) + def setBeta(beta: Double): this.type = setTopicConcentration(beta) /** * Maximum number of iterations for learning. http://git-wip-us.apache.org/repos/asf/spark/blob/e76317b6/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 302d751..0b022f9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -123,6 +123,14 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { assert(termVertexIds.map(i = LDA.index2term(i.toLong)) === termIds) assert(termVertexIds.forall(i = LDA.isTermVertex((i.toLong, 0 } + + test(setter alias) { +val lda = new LDA().setAlpha(2.0).setBeta(3.0) +assert(lda.getAlpha === 2.0) +assert(lda.getDocConcentration === 2.0) +assert(lda.getBeta === 3.0) +assert(lda.getTopicConcentration === 3.0) + } } private[clustering] object LDASuite { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-6879] [HISTORYSERVER] check if app is completed before cleaning it up
Repository: spark Updated Branches: refs/heads/master 3e91cc273 - baa83a9a6 [SPARK-6879] [HISTORYSERVER] check if app is completed before clean it up https://issues.apache.org/jira/browse/SPARK-6879 Use `applications` to replace `FileStatus`, and check if the app is completed before clean it up. If an exception was throwed, add it to `applications` to wait for the next loop. Author: WangTaoTheTonic wangtao...@huawei.com Closes #5491 from WangTaoTheTonic/SPARK-6879 and squashes the following commits: 4a533eb [WangTaoTheTonic] treat ACE specially cb45105 [WangTaoTheTonic] rebase d4d5251 [WangTaoTheTonic] per Marcelo's comments d7455d8 [WangTaoTheTonic] slightly change when delete file b0abca5 [WangTaoTheTonic] use global var to store apps to clean 94adfe1 [WangTaoTheTonic] leave expired apps alone to be deleted 9872a9d [WangTaoTheTonic] use the right path fdef4d6 [WangTaoTheTonic] check if app is completed before clean it up Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/baa83a9a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/baa83a9a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/baa83a9a Branch: refs/heads/master Commit: baa83a9a6769c5e119438d65d7264dceb8d743d5 Parents: 3e91cc2 Author: WangTaoTheTonic wangtao...@huawei.com Authored: Thu Apr 23 17:20:17 2015 -0400 Committer: Sean Owen so...@cloudera.com Committed: Thu Apr 23 17:20:17 2015 -0400 -- .../deploy/history/FsHistoryProvider.scala | 32 1 file changed, 20 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/baa83a9a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 9847d59..a94ebf6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -35,7 +35,6 @@ import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.{Logging, SecurityManager, SparkConf} - /** * A class that provides application history from event logs stored in the file system. * This provider checks for new finished applications in the background periodically and @@ -76,6 +75,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] = new mutable.LinkedHashMap() + // List of applications to be deleted by event log cleaner. + private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo] + // Constants used to parse Spark 1.0.0 log directories. private[history] val LOG_PREFIX = EVENT_LOG_ private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + _ @@ -266,34 +268,40 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis */ private def cleanLogs(): Unit = { try { - val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) -.getOrElse(Seq[FileStatus]()) val maxAge = conf.getTimeAsSeconds(spark.history.fs.cleaner.maxAge, 7d) * 1000 val now = System.currentTimeMillis() val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() + // Scan all logs from the log directory. + // Only completed applications older than the specified max age will be deleted. 
       applications.values.foreach { info =>
-        if (now - info.lastUpdated <= maxAge) {
+        if (now - info.lastUpdated <= maxAge || !info.completed) {
           appsToRetain += (info.id -> info)
+        } else {
+          appsToClean += info
         }
       }

       applications = appsToRetain

-      // Scan all logs from the log directory.
-      // Only directories older than the specified max age will be deleted
-      statusList.foreach { dir =>
+      val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+      appsToClean.foreach { info =>
         try {
-          if (now - dir.getModificationTime() > maxAge) {
-            // if path is a directory and set to true,
-            // the directory is deleted else throws an exception
-            fs.delete(dir.getPath, true)
+          val path = new Path(logDir, info.logPath)
+          if (fs.exists(path)) {
+            fs.delete(path, true)
           }
         } catch {
-          case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
+          case e: AccessControlException =>
+
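The control flow above reduces to a small retain/queue/retry pattern; a hedged, self-contained sketch of it with hypothetical names, not the FsHistoryProvider code:

    case class AppInfo(id: String, lastUpdated: Long, completed: Boolean)

    // Returns (apps to keep listed, apps whose deletion failed and should be
    // retried on the next cleaner pass).
    def clean(
        apps: Seq[AppInfo],
        now: Long,
        maxAgeMs: Long,
        delete: AppInfo => Boolean): (Seq[AppInfo], Seq[AppInfo]) = {
      // Incomplete apps are never deleted, no matter how old they are.
      val (expired, retained) =
        apps.partition(a => now - a.lastUpdated > maxAgeMs && a.completed)
      val leftToClean = expired.filterNot(delete)
      (retained, leftToClean)
    }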
[2/2] spark git commit: [SQL] Break dataTypes.scala into multiple files.
[SQL] Break dataTypes.scala into multiple files. It was over 1000 lines of code, making it harder to find all the types. Only moved code around, and didn't change any. Author: Reynold Xin r...@databricks.com Closes #5670 from rxin/break-types and squashes the following commits: 8c59023 [Reynold Xin] Check in missing files. dcd5193 [Reynold Xin] [SQL] Break dataTypes.scala into multiple files. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6220d933 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6220d933 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6220d933 Branch: refs/heads/master Commit: 6220d933e5ce4ba890f5d6a50a69b95d319dafb4 Parents: 1ed46a6 Author: Reynold Xin r...@databricks.com Authored: Thu Apr 23 14:48:19 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Thu Apr 23 14:48:19 2015 -0700 -- .../org/apache/spark/sql/types/ArrayType.scala | 74 ++ .../org/apache/spark/sql/types/BinaryType.scala | 63 + .../apache/spark/sql/types/BooleanType.scala| 51 + .../org/apache/spark/sql/types/ByteType.scala | 54 + .../org/apache/spark/sql/types/DataType.scala | 353 + .../org/apache/spark/sql/types/DateType.scala | 54 + .../apache/spark/sql/types/DecimalType.scala| 110 ++ .../org/apache/spark/sql/types/DoubleType.scala | 53 + .../org/apache/spark/sql/types/FloatType.scala | 53 + .../apache/spark/sql/types/IntegerType.scala| 54 + .../org/apache/spark/sql/types/LongType.scala | 54 + .../org/apache/spark/sql/types/MapType.scala| 79 ++ .../org/apache/spark/sql/types/NullType.scala | 39 + .../org/apache/spark/sql/types/ShortType.scala | 53 + .../org/apache/spark/sql/types/StringType.scala | 50 + .../apache/spark/sql/types/StructField.scala| 54 + .../org/apache/spark/sql/types/StructType.scala | 263 .../apache/spark/sql/types/TimestampType.scala | 57 + .../spark/sql/types/UserDefinedType.scala | 81 ++ .../org/apache/spark/sql/types/dataTypes.scala | 1224 -- 20 files changed, 1649 insertions(+), 1224 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala new file mode 100644 index 000..b116163 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.types + +import org.json4s.JsonDSL._ + +import org.apache.spark.annotation.DeveloperApi + + +object ArrayType { + /** Construct a [[ArrayType]] object with the given element type. The `containsNull` is true. 
*/ + def apply(elementType: DataType): ArrayType = ArrayType(elementType, containsNull = true) +} + + +/** + * :: DeveloperApi :: + * The data type for collections of multiple values. + * Internally these are represented as columns that contain a ``scala.collection.Seq``. + * + * Please use [[DataTypes.createArrayType()]] to create a specific instance. + * + * An [[ArrayType]] object comprises two fields, `elementType: [[DataType]]` and + * `containsNull: Boolean`. The field of `elementType` is used to specify the type of + * array elements. The field of `containsNull` is used to specify if the array has `null` values. + * + * @param elementType The data type of values. + * @param containsNull Indicates if values have `null` values + * + * @group dataType + */ +@DeveloperApi +case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType { + + /** No-arg constructor for kryo. */ + protected def this() = this(null, false) + + private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = { +builder.append( + s$prefix-- element:
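A short usage sketch of the factory carried over unchanged by this refactoring: the single-argument form still defaults containsNull to true.

    import org.apache.spark.sql.types._

    val a1 = ArrayType(IntegerType)                        // containsNull = true
    val a2 = ArrayType(IntegerType, containsNull = false)  // explicit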
[1/2] spark git commit: [SQL] Break dataTypes.scala into multiple files.
Repository: spark Updated Branches: refs/heads/master 1ed46a60a - 6220d933e http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala deleted file mode 100644 index 87c7b75..000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala +++ /dev/null @@ -1,1224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the License); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.types - -import java.sql.Timestamp - -import scala.collection.mutable.ArrayBuffer -import scala.math._ -import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral} -import scala.reflect.ClassTag -import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag} -import scala.util.parsing.combinator.RegexParsers - -import org.json4s._ -import org.json4s.JsonAST.JValue -import org.json4s.JsonDSL._ -import org.json4s.jackson.JsonMethods._ - -import org.apache.spark.SparkException -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.catalyst.ScalaReflectionLock -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} -import org.apache.spark.util.Utils - - -object DataType { - def fromJson(json: String): DataType = parseDataType(parse(json)) - - private val nonDecimalNameToType = { -Seq(NullType, DateType, TimestampType, BinaryType, - IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType) - .map(t = t.typeName - t).toMap - } - - /** Given the string representation of a type, return its DataType */ - private def nameToType(name: String): DataType = { -val FIXED_DECIMAL = decimal\(\s*(\d+)\s*,\s*(\d+)\s*\).r -name match { - case decimal = DecimalType.Unlimited - case FIXED_DECIMAL(precision, scale) = DecimalType(precision.toInt, scale.toInt) - case other = nonDecimalNameToType(other) -} - } - - private object JSortedObject { -def unapplySeq(value: JValue): Option[List[(String, JValue)]] = value match { - case JObject(seq) = Some(seq.toList.sortBy(_._1)) - case _ = None -} - } - - // NOTE: Map fields must be sorted in alphabetical order to keep consistent with the Python side. 
- private def parseDataType(json: JValue): DataType = json match { -case JString(name) = - nameToType(name) - -case JSortedObject( -(containsNull, JBool(n)), -(elementType, t: JValue), -(type, JString(array))) = - ArrayType(parseDataType(t), n) - -case JSortedObject( -(keyType, k: JValue), -(type, JString(map)), -(valueContainsNull, JBool(n)), -(valueType, v: JValue)) = - MapType(parseDataType(k), parseDataType(v), n) - -case JSortedObject( -(fields, JArray(fields)), -(type, JString(struct))) = - StructType(fields.map(parseStructField)) - -case JSortedObject( -(class, JString(udtClass)), -(pyClass, _), -(sqlType, _), -(type, JString(udt))) = - Class.forName(udtClass).newInstance().asInstanceOf[UserDefinedType[_]] - } - - private def parseStructField(json: JValue): StructField = json match { -case JSortedObject( -(metadata, metadata: JObject), -(name, JString(name)), -(nullable, JBool(nullable)), -(type, dataType: JValue)) = - StructField(name, parseDataType(dataType), nullable, Metadata.fromJObject(metadata)) -// Support reading schema when 'metadata' is missing. -case JSortedObject( -(name, JString(name)), -(nullable, JBool(nullable)), -(type, dataType: JValue)) = - StructField(name, parseDataType(dataType), nullable) - } - - @deprecated(Use DataType.fromJson instead, 1.2.0) - def fromCaseClassString(string: String): DataType = CaseClassStringParser(string) - - private object CaseClassStringParser extends RegexParsers { -protected lazy val primitiveType: Parser[DataType] = - ( StringType ^^^ StringType
spark git commit: Update sql-programming-guide.md
Repository: spark Updated Branches: refs/heads/master 2d010f7af - 67bccbda1 Update sql-programming-guide.md fix typo Author: Ken Geis geis@gmail.com Closes #5674 from kgeis/patch-1 and squashes the following commits: 5ae67de [Ken Geis] Update sql-programming-guide.md Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67bccbda Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67bccbda Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67bccbda Branch: refs/heads/master Commit: 67bccbda1e3ed7db2753daa7e6ae8b1441356177 Parents: 2d010f7 Author: Ken Geis geis@gmail.com Authored: Thu Apr 23 20:45:33 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Thu Apr 23 20:45:33 2015 -0700 -- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/67bccbda/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index b202254..49b1e69 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1364,7 +1364,7 @@ the Data Sources API. The following options are supported: tr tdcodedriver/code/td td - The class name of the JDBC driver needed to connect to this URL. This class with be loaded + The class name of the JDBC driver needed to connect to this URL. This class will be loaded on the master and workers before running an JDBC commands to allow the driver to register itself with the JDBC subsystem. /td - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Update sql-programming-guide.md
Repository: spark Updated Branches: refs/heads/branch-1.3 2b340af0c - c4470b93f Update sql-programming-guide.md fix typo Author: Ken Geis geis@gmail.com Closes #5674 from kgeis/patch-1 and squashes the following commits: 5ae67de [Ken Geis] Update sql-programming-guide.md (cherry picked from commit 67bccbda1e3ed7db2753daa7e6ae8b1441356177) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4470b93 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4470b93 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4470b93 Branch: refs/heads/branch-1.3 Commit: c4470b93f932ef801eb12c3aeb538ec704696cb0 Parents: 2b340af Author: Ken Geis geis@gmail.com Authored: Thu Apr 23 20:45:33 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Thu Apr 23 20:45:44 2015 -0700 -- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c4470b93/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index b202254..49b1e69 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1364,7 +1364,7 @@ the Data Sources API. The following options are supported: tr tdcodedriver/code/td td - The class name of the JDBC driver needed to connect to this URL. This class with be loaded + The class name of the JDBC driver needed to connect to this URL. This class will be loaded on the master and workers before running an JDBC commands to allow the driver to register itself with the JDBC subsystem. /td - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SQL] Fixed expression data type matching.
Repository: spark Updated Branches: refs/heads/master 67bccbda1 - d3a302def [SQL] Fixed expression data type matching. Also took the chance to improve documentation for various types. Author: Reynold Xin r...@databricks.com Closes #5675 from rxin/data-type-matching-expr and squashes the following commits: 0f31856 [Reynold Xin] One more function documentation. 27c1973 [Reynold Xin] Added more documentation. 336a36d [Reynold Xin] [SQL] Fixed expression data type matching. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3a302de Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3a302de Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3a302de Branch: refs/heads/master Commit: d3a302defc45768492dec9da4c40d78d28997a65 Parents: 67bccbd Author: Reynold Xin r...@databricks.com Authored: Thu Apr 23 21:21:03 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Thu Apr 23 21:21:03 2015 -0700 -- .../expressions/codegen/CodeGenerator.scala | 2 +- .../org/apache/spark/sql/types/DataType.scala | 50 2 files changed, 42 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d3a302de/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index cbe5203..dbc92fb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -279,7 +279,7 @@ abstract class CodeGenerator[InType : AnyRef, OutType : AnyRef] extends Loggin org.apache.spark.sql.types.UTF8String(${eval.primitiveTerm}.toString) .children - case EqualTo(e1: BinaryType, e2: BinaryType) = + case EqualTo(e1 @ BinaryType(), e2 @ BinaryType()) = (e1, e2).evaluateAs (BooleanType) { case (eval1, eval2) = q http://git-wip-us.apache.org/repos/asf/spark/blob/d3a302de/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index e6bfcd9..06bff7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -40,32 +40,46 @@ import org.apache.spark.util.Utils */ @DeveloperApi abstract class DataType { - /** Matches any expression that evaluates to this DataType */ - def unapply(a: Expression): Boolean = a match { + /** + * Enables matching against NumericType for expressions: + * {{{ + * case Cast(child @ BinaryType(), StringType) = + * ... + * }}} + */ + private[sql] def unapply(a: Expression): Boolean = a match { case e: Expression if e.dataType == this = true case _ = false } - /** The default size of a value of this data type. */ + /** + * The default size of a value of this data type, used internally for size estimation. + */ def defaultSize: Int + /** Name of the type used in JSON serialization. */ def typeName: String = this.getClass.getSimpleName.stripSuffix($).dropRight(4).toLowerCase private[sql] def jsonValue: JValue = typeName + /** The compact JSON representation of this data type. */ def json: String = compact(render(jsonValue)) + /** The pretty (i.e. 
indented) JSON representation of this data type. */ def prettyJson: String = pretty(render(jsonValue)) + /** Readable string representation for the type. */ def simpleString: String = typeName - /** Check if `this` and `other` are the same data type when ignoring nullability - * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`). + /** + * Check if `this` and `other` are the same data type when ignoring nullability + * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`). */ private[spark] def sameType(other: DataType): Boolean = DataType.equalsIgnoreNullability(this, other) - /** Returns the same data type but set all nullability fields are true + /** + * Returns the same data type but set all nullability fields are true * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`). */ private[spark] def asNullable: DataType
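The unapply being documented (and narrowed to private[sql]) enables type-directed pattern matching on expressions. Since the real extractor is no longer visible outside Spark's sql packages, here is a self-contained sketch of the idiom with simplified stand-in types rather than Spark's classes:

    sealed trait DataType {
      // Matches any expression whose dataType is this type.
      def unapply(e: Expression): Boolean = e.dataType == this
    }
    case object BinaryType extends DataType
    case object StringType extends DataType
    case class Expression(dataType: DataType)

    def describe(e: Expression): String = e match {
      case BinaryType() => "binary-typed expression"
      case StringType() => "string-typed expression"
      case _            => "something else"
    }

    describe(Expression(BinaryType))  // "binary-typed expression"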
spark git commit: [SPARK-7055][SQL]Use correct ClassLoader for JDBC Driver in JDBCRDD.getConnector
Repository: spark Updated Branches: refs/heads/master 534f2a436 - c1213e6a9 [SPARK-7055][SQL]Use correct ClassLoader for JDBC Driver in JDBCRDD.getConnector Author: Vinod K C vinod...@huawei.com Closes #5633 from vinodkc/use_correct_classloader_driverload and squashes the following commits: 73c5380 [Vinod K C] Use correct ClassLoader for JDBC Driver Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1213e6a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1213e6a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1213e6a Branch: refs/heads/master Commit: c1213e6a92e126ad886d9804cedaf6db3618e602 Parents: 534f2a4 Author: Vinod K C vinod...@huawei.com Authored: Thu Apr 23 12:00:23 2015 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Thu Apr 23 12:00:23 2015 -0700 -- sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c1213e6a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index b975191..f326510 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow} import org.apache.spark.sql.types._ import org.apache.spark.sql.sources._ +import org.apache.spark.util.Utils private[sql] object JDBCRDD extends Logging { /** @@ -152,7 +153,7 @@ private[sql] object JDBCRDD extends Logging { def getConnector(driver: String, url: String, properties: Properties): () = Connection = { () = { try { -if (driver != null) Class.forName(driver) +if (driver != null) Utils.getContextOrSparkClassLoader.loadClass(driver) } catch { case e: ClassNotFoundException = { logWarning(sCouldn't find class $driver, e); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
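The distinction matters because Class.forName resolves against the caller's defining classloader, which may not see JDBC driver jars added at runtime (for example via --jars), while the thread's context classloader does. A hedged sketch of the lookup the patch switches to, with the fallback inlined instead of calling Spark's Utils helper:

    // Loads a JDBC driver class so that user-added jars are visible.
    def loadDriver(driverClassName: String): Class[_] = {
      val loader = Option(Thread.currentThread().getContextClassLoader)
        .getOrElse(getClass.getClassLoader)
      loader.loadClass(driverClassName)  // instead of Class.forName(...)
    }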
spark git commit: [SPARK-7037] [CORE] Inconsistent behavior for non-spark config properties in spark-shell and spark-submit
Repository: spark Updated Branches: refs/heads/master 73db132bf - 336f7f537 [SPARK-7037] [CORE] Inconsistent behavior for non-spark config properties in spark-shell and spark-submit When specifying non-spark properties (i.e. names don't start with spark.) in the command line and config file, spark-submit and spark-shell behave differently, causing confusion to users. Here is the summary- * spark-submit * --conf k=v = silently ignored * spark-defaults.conf = applied * spark-shell * --conf k=v = show a warning message and ignored * spark-defaults.conf = show a warning message and ignored I assume that ignoring non-spark properties is intentional. If so, it should always be ignored with a warning message in all cases. Author: Cheolsoo Park cheols...@netflix.com Closes #5617 from piaozhexiu/SPARK-7037 and squashes the following commits: 8957950 [Cheolsoo Park] Add IgnoreNonSparkProperties method fedd01c [Cheolsoo Park] Ignore non-spark properties with a warning message in all cases Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/336f7f53 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/336f7f53 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/336f7f53 Branch: refs/heads/master Commit: 336f7f5373e5f6960ecd9967d3703c8507e329ec Parents: 73db132 Author: Cheolsoo Park cheols...@netflix.com Authored: Thu Apr 23 20:10:55 2015 -0400 Committer: Sean Owen so...@cloudera.com Committed: Thu Apr 23 20:10:55 2015 -0400 -- .../spark/deploy/SparkSubmitArguments.scala | 22 ++-- 1 file changed, 16 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/336f7f53/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index faa8780..c896842 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -77,12 +77,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S if (verbose) SparkSubmit.printStream.println(sUsing properties file: $propertiesFile) Option(propertiesFile).foreach { filename = Utils.getPropertiesFromFile(filename).foreach { case (k, v) = -if (k.startsWith(spark.)) { - defaultProperties(k) = v - if (verbose) SparkSubmit.printStream.println(sAdding default property: $k=$v) -} else { - SparkSubmit.printWarning(sIgnoring non-spark config property: $k=$v) -} +defaultProperties(k) = v +if (verbose) SparkSubmit.printStream.println(sAdding default property: $k=$v) } } defaultProperties @@ -97,6 +93,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S } // Populate `sparkProperties` map from properties file mergeDefaultSparkProperties() + // Remove keys that don't start with spark. from `sparkProperties`. + ignoreNonSparkProperties() // Use `sparkProperties` map along with env vars to fill in any missing parameters loadEnvironmentArguments() @@ -118,6 +116,18 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S } /** + * Remove keys that don't start with spark. from `sparkProperties`. 
+   */
+  private def ignoreNonSparkProperties(): Unit = {
+    sparkProperties.foreach { case (k, v) =>
+      if (!k.startsWith("spark.")) {
+        sparkProperties -= k
+        SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
+      }
+    }
+  }
+
+  /**
    * Load arguments from environment variables, Spark properties etc.
    */
   private def loadEnvironmentArguments(): Unit = {
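A self-contained sketch of the now-unified behavior (illustrative names; the real method mutates SparkSubmitArguments.sparkProperties as the diff above shows):

    import scala.collection.mutable

    def ignoreNonSparkProperties(props: mutable.Map[String, String]): Unit = {
      // Snapshot first so entries can be removed safely while iterating.
      for ((k, v) <- props.toSeq if !k.startsWith("spark.")) {
        props -= k
        Console.err.println(s"Warning: Ignoring non-spark config property: $k=$v")
      }
    }

    // Both --conf and spark-defaults.conf entries flow through this, so a key
    // like "foo.bar" is dropped with a warning in every case.
    val props = mutable.Map("spark.master" -> "local", "foo.bar" -> "1")
    ignoreNonSparkProperties(props)  // warns about foo.bar, keeps spark.master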
spark git commit: [SPARK-6818] [SPARKR] Support column deletion in SparkR DataFrame API.
Repository: spark Updated Branches: refs/heads/master 6220d933e - 73db132bf [SPARK-6818] [SPARKR] Support column deletion in SparkR DataFrame API. Author: Sun Rui rui@intel.com Closes #5655 from sun-rui/SPARK-6818 and squashes the following commits: 7c66570 [Sun Rui] [SPARK-6818][SPARKR] Support column deletion in SparkR DataFrame API. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73db132b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73db132b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73db132b Branch: refs/heads/master Commit: 73db132bf503341c7a5cf9409351c282a8464175 Parents: 6220d93 Author: Sun Rui rui@intel.com Authored: Thu Apr 23 16:08:14 2015 -0700 Committer: Shivaram Venkataraman shiva...@cs.berkeley.edu Committed: Thu Apr 23 16:08:14 2015 -0700 -- R/pkg/R/DataFrame.R | 8 +++- R/pkg/inst/tests/test_sparkSQL.R | 5 + 2 files changed, 12 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/73db132b/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 861fe1c..b59b700 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -790,9 +790,12 @@ setMethod($, signature(x = DataFrame), setMethod($-, signature(x = DataFrame), function(x, name, value) { -stopifnot(class(value) == Column) +stopifnot(class(value) == Column || is.null(value)) cols - columns(x) if (name %in% cols) { + if (is.null(value)) { +cols - Filter(function(c) { c != name }, cols) + } cols - lapply(cols, function(c) { if (c == name) { alias(value, name) @@ -802,6 +805,9 @@ setMethod($-, signature(x = DataFrame), }) nx - select(x, cols) } else { + if (is.null(value)) { +return(x) + } nx - withColumn(x, name, value) } x@sdf - nx@sdf http://git-wip-us.apache.org/repos/asf/spark/blob/73db132b/R/pkg/inst/tests/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 25831ae..af7a6c5 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -449,6 +449,11 @@ test_that(select operators, { df$age2 - df$age * 2 expect_equal(columns(df), c(name, age, age2)) expect_equal(count(where(df, df$age2 == df$age * 2)), 2) + + df$age2 - NULL + expect_equal(columns(df), c(name, age)) + df$age3 - NULL + expect_equal(columns(df), c(name, age)) }) test_that(select with column, { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MLlib] Add support for BooleanType to VectorAssembler.
Repository: spark Updated Branches: refs/heads/master d9e70f331 - 2d33323ca [MLlib] Add support for BooleanType to VectorAssembler. Author: Reynold Xin r...@databricks.com Closes #5648 from rxin/vectorAssembler-boolean and squashes the following commits: 1bf3d40 [Reynold Xin] [MLlib] Add support for BooleanType to VectorAssembler. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d33323c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d33323c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d33323c Branch: refs/heads/master Commit: 2d33323cadbf58dd1d05998d18cad6a896cd Parents: d9e70f3 Author: Reynold Xin r...@databricks.com Authored: Wed Apr 22 23:54:48 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Wed Apr 22 23:54:48 2015 -0700 -- .../main/scala/org/apache/spark/ml/feature/VectorAssembler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2d33323c/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala index fd16d3d..7b2a451 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala @@ -55,7 +55,7 @@ class VectorAssembler extends Transformer with HasInputCols with HasOutputCol { schema(c).dataType match { case DoubleType = UnresolvedAttribute(c) case t if t.isInstanceOf[VectorUDT] = UnresolvedAttribute(c) -case _: NumericType = +case _: NumericType | BooleanType = Alias(Cast(UnresolvedAttribute(c), DoubleType), s${c}_double_$uid)() } } @@ -68,7 +68,7 @@ class VectorAssembler extends Transformer with HasInputCols with HasOutputCol { val outputColName = map(outputCol) val inputDataTypes = inputColNames.map(name = schema(name).dataType) inputDataTypes.foreach { - case _: NumericType = + case _: NumericType | BooleanType = case t if t.isInstanceOf[VectorUDT] = case other = throw new IllegalArgumentException(sData type $other is not supported.) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
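The two pattern-match changes above widen the same admission rule; distilled into a hedged standalone predicate (simplified: the real code also admits vector columns via VectorUDT):

    import org.apache.spark.sql.types._

    // Returns true when the column must first be cast to DoubleType.
    def needsCastToDouble(dt: DataType): Boolean = dt match {
      case DoubleType                   => false  // usable as-is
      case _: NumericType | BooleanType => true   // booleans are newly allowed
      case other =>
        throw new IllegalArgumentException(s"Data type $other is not supported.")
    }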
spark git commit: [SPARK-7068][SQL] Remove PrimitiveType
Repository: spark Updated Branches: refs/heads/master 2d33323ca -> 29163c520 [SPARK-7068][SQL] Remove PrimitiveType Author: Reynold Xin r...@databricks.com Closes #5646 from rxin/remove-primitive-type and squashes the following commits: 01b673d [Reynold Xin] [SPARK-7068][SQL] Remove PrimitiveType Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29163c52 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29163c52 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29163c52 Branch: refs/heads/master Commit: 29163c520087e89ca322521db1dd8656d86a6f0e Parents: 2d33323 Author: Reynold Xin r...@databricks.com Authored: Wed Apr 22 23:55:20 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Wed Apr 22 23:55:20 2015 -0700 -- .../org/apache/spark/sql/types/dataTypes.scala | 70 +--- .../spark/sql/parquet/ParquetConverter.scala| 11 +-- .../sql/parquet/ParquetTableOperations.scala| 2 +- .../apache/spark/sql/parquet/ParquetTypes.scala | 6 +- .../apache/spark/sql/parquet/newParquet.scala | 13 ++-- 5 files changed, 48 insertions(+), 54 deletions(-) --

http://git-wip-us.apache.org/repos/asf/spark/blob/29163c52/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
index ddf9d66..42e26e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -41,6 +41,21 @@ import org.apache.spark.util.Utils
 object DataType {
   def fromJson(json: String): DataType = parseDataType(parse(json))

+  private val nonDecimalNameToType = {
+    (Seq(NullType, DateType, TimestampType, BinaryType) ++ NativeType.all)
+      .map(t => t.typeName -> t).toMap
+  }
+
+  /** Given the string representation of a type, return its DataType */
+  private def nameToType(name: String): DataType = {
+    val FIXED_DECIMAL = """decimal\(\s*(\d+)\s*,\s*(\d+)\s*\)""".r
+    name match {
+      case "decimal" => DecimalType.Unlimited
+      case FIXED_DECIMAL(precision, scale) => DecimalType(precision.toInt, scale.toInt)
+      case other => nonDecimalNameToType(other)
+    }
+  }
+
   private object JSortedObject {
     def unapplySeq(value: JValue): Option[List[(String, JValue)]] = value match {
       case JObject(seq) => Some(seq.toList.sortBy(_._1))
@@ -51,7 +66,7 @@ object DataType {
   // NOTE: Map fields must be sorted in alphabetical order to keep consistent with the Python side.
   private def parseDataType(json: JValue): DataType = json match {
     case JString(name) =>
-      PrimitiveType.nameToType(name)
+      nameToType(name)
     case JSortedObject(
       ("containsNull", JBool(n)),
@@ -190,13 +205,11 @@ object DataType {
         equalsIgnoreNullability(leftKeyType, rightKeyType) &&
         equalsIgnoreNullability(leftValueType, rightValueType)
       case (StructType(leftFields), StructType(rightFields)) =>
-        leftFields.size == rightFields.size &&
-        leftFields.zip(rightFields)
-          .forall{
-            case (left, right) =>
-              left.name == right.name && equalsIgnoreNullability(left.dataType, right.dataType)
-          }
-      case (left, right) => left == right
+        leftFields.length == rightFields.length &&
+        leftFields.zip(rightFields).forall { case (l, r) =>
+          l.name == r.name && equalsIgnoreNullability(l.dataType, r.dataType)
+        }
+      case (l, r) => l == r
     }
   }
@@ -225,12 +238,11 @@ object DataType {
         equalsIgnoreCompatibleNullability(fromValue, toValue)

       case (StructType(fromFields), StructType(toFields)) =>
-        fromFields.size == toFields.size &&
-          fromFields.zip(toFields).forall {
-            case (fromField, toField) =>
-              fromField.name == toField.name &&
-                (toField.nullable || !fromField.nullable) &&
-                equalsIgnoreCompatibleNullability(fromField.dataType, toField.dataType)
+        fromFields.length == toFields.length &&
+          fromFields.zip(toFields).forall { case (fromField, toField) =>
+            fromField.name == toField.name &&
+              (toField.nullable || !fromField.nullable) &&
+              equalsIgnoreCompatibleNullability(fromField.dataType, toField.dataType)
           }

       case (fromDataType, toDataType) => fromDataType == toDataType
@@ -256,8 +268,6 @@ abstract class DataType {
   /** The default size of a value of this data type. */
   def defaultSize: Int

-  def isPrimitive: Boolean = false
-
   def typeName: String =
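The interesting idiom in the new nameToType is the triple-quoted regex used as a pattern extractor: its two capture groups bind directly in a case clause, so "decimal(p, s)" strings destructure without any manual parsing. A standalone sketch of the same technique (the names FixedDecimal and describeType are illustrative, not Spark's):

// Regex-as-extractor: the capture groups bind to precision and scale when
// the pattern matches, with optional whitespace tolerated around the digits.
val FixedDecimal = """decimal\(\s*(\d+)\s*,\s*(\d+)\s*\)""".r

def describeType(name: String): String = name match {
  case "decimal" => "unlimited decimal"
  case FixedDecimal(precision, scale) => s"decimal(precision=$precision, scale=$scale)"
  case other => s"non-decimal type: $other"  // would fall back to a name -> type map
}

// describeType("decimal( 10 , 2 )") == "decimal(precision=10, scale=2)"
// describeType("decimal")           == "unlimited decimal"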
spark git commit: [SPARK-7069][SQL] Rename NativeType -> AtomicType.
Repository: spark Updated Branches: refs/heads/master 29163c520 -> f60bece14 [SPARK-7069][SQL] Rename NativeType -> AtomicType. Also renamed JvmType to InternalType. Author: Reynold Xin r...@databricks.com Closes #5651 from rxin/native-to-atomic-type and squashes the following commits: cbd4028 [Reynold Xin] [SPARK-7069][SQL] Rename NativeType -> AtomicType. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f60bece1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f60bece1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f60bece1 Branch: refs/heads/master Commit: f60bece14f98450b4a71b00d7b58525f06e1f9ed Parents: 29163c5 Author: Reynold Xin r...@databricks.com Authored: Thu Apr 23 01:43:40 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Thu Apr 23 01:43:40 2015 -0700 -- .../spark/sql/catalyst/ScalaReflection.scala| 24 ++-- .../sql/catalyst/expressions/arithmetic.scala | 4 +- .../expressions/codegen/CodeGenerator.scala | 18 ++- .../codegen/GenerateProjection.scala| 4 +- .../sql/catalyst/expressions/predicates.scala | 10 +- .../spark/sql/catalyst/expressions/rows.scala | 6 +- .../org/apache/spark/sql/types/dataTypes.scala | 114 +-- .../spark/sql/columnar/ColumnAccessor.scala | 2 +- .../spark/sql/columnar/ColumnBuilder.scala | 4 +- .../apache/spark/sql/columnar/ColumnType.scala | 6 +- .../CompressibleColumnAccessor.scala| 4 +- .../compression/CompressibleColumnBuilder.scala | 4 +- .../compression/CompressionScheme.scala | 10 +- .../compression/compressionSchemes.scala| 42 +++ .../org/apache/spark/sql/json/JsonRDD.scala | 6 +- .../spark/sql/parquet/ParquetConverter.scala| 12 +- .../spark/sql/parquet/ParquetTableSupport.scala | 2 +- .../spark/sql/columnar/ColumnStatsSuite.scala | 6 +- .../spark/sql/columnar/ColumnTypeSuite.scala| 8 +- .../spark/sql/columnar/ColumnarTestUtils.scala | 6 +- .../compression/DictionaryEncodingSuite.scala | 4 +- .../compression/IntegralDeltaSuite.scala| 6 +- .../compression/RunLengthEncodingSuite.scala| 4 +- .../TestCompressibleColumnBuilder.scala | 6 +- 24 files changed, 159 insertions(+), 153 deletions(-) --

http://git-wip-us.apache.org/repos/asf/spark/blob/f60bece1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index d952195..c529655 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -17,8 +17,6 @@

 package org.apache.spark.sql.catalyst

-import java.sql.Timestamp
-
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -110,7 +108,7 @@
         StructField(p.name.toString, dataType, nullable)
       }), nullable = true)
     case t if t <:< typeOf[String] => Schema(StringType, nullable = true)
-    case t if t <:< typeOf[Timestamp] => Schema(TimestampType, nullable = true)
+    case t if t <:< typeOf[java.sql.Timestamp] => Schema(TimestampType, nullable = true)
     case t if t <:< typeOf[java.sql.Date] => Schema(DateType, nullable = true)
     case t if t <:< typeOf[BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
     case t if t <:< typeOf[java.math.BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
@@ -136,20 +134,20 @@
   def typeOfObject: PartialFunction[Any, DataType] = {
     // The data type can be determined without ambiguity.
-    case obj: BooleanType.JvmType => BooleanType
-    case obj: BinaryType.JvmType => BinaryType
+    case obj: Boolean => BooleanType
+    case obj: Array[Byte] => BinaryType
     case obj: String => StringType
-    case obj: StringType.JvmType => StringType
-    case obj: ByteType.JvmType => ByteType
-    case obj: ShortType.JvmType => ShortType
-    case obj: IntegerType.JvmType => IntegerType
-    case obj: LongType.JvmType => LongType
-    case obj: FloatType.JvmType => FloatType
-    case obj: DoubleType.JvmType => DoubleType
+    case obj: UTF8String => StringType
+    case obj: Byte => ByteType
+    case obj: Short => ShortType
+    case obj: Int => IntegerType
+    case obj: Long => LongType
+    case obj: Float => FloatType
+    case obj: Double => DoubleType
     case obj: java.sql.Date => DateType
     case obj: java.math.BigDecimal =>
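After the rename, typeOfObject matches plain JVM classes instead of the removed *.JvmType aliases. A self-contained sketch of the PartialFunction idiom it relies on (type names returned as strings here, purely for illustration):

// Boxed primitives on the JVM still match Scala's value-class patterns, so a
// PartialFunction[Any, String] can classify arbitrary runtime objects.
val typeNameOf: PartialFunction[Any, String] = {
  case _: Boolean     => "BooleanType"
  case _: Array[Byte] => "BinaryType"
  case _: String      => "StringType"
  case _: Byte        => "ByteType"
  case _: Short       => "ShortType"
  case _: Int         => "IntegerType"
  case _: Long        => "LongType"
  case _: Float       => "FloatType"
  case _: Double      => "DoubleType"
}

// typeNameOf(42L) == "LongType"; typeNameOf.isDefinedAt(Nil) == false, which
// is what lets callers chain an orElse fallback for the ambiguous cases.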
spark git commit: [HOTFIX] [SQL] Fix compilation for scala 2.11.
Repository: spark Updated Branches: refs/heads/master f60bece14 -> a7d65d38f [HOTFIX] [SQL] Fix compilation for scala 2.11. Author: Prashant Sharma prashan...@imaginea.com Closes #5652 from ScrapCodes/hf/compilation-fix-scala-2.11 and squashes the following commits: 819ff06 [Prashant Sharma] [HOTFIX] Fix compilation for scala 2.11. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a7d65d38 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a7d65d38 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a7d65d38 Branch: refs/heads/master Commit: a7d65d38f934c5c751ba32aa7ab648c6d16044ab Parents: f60bece Author: Prashant Sharma prashan...@imaginea.com Authored: Thu Apr 23 16:45:26 2015 +0530 Committer: Prashant Sharma prashan...@imaginea.com Committed: Thu Apr 23 16:45:26 2015 +0530 -- .../test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --

http://git-wip-us.apache.org/repos/asf/spark/blob/a7d65d38/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java

diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index fc3ed4a..e02c848 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -162,7 +162,7 @@ public class JavaDataFrameSuite {
     Buffer<Integer> outputBuffer = (Buffer<Integer>) first.getJavaMap(2).get("hello");
     Assert.assertArrayEquals(
       bean.getC().get("hello"),
-      Ints.toArray(JavaConversions.asJavaList(outputBuffer)));
+      Ints.toArray(JavaConversions.bufferAsJavaList(outputBuffer)));
     Seq<String> d = first.getAs(3);
     Assert.assertEquals(bean.getD().size(), d.length());
     for (int i = 0; i < d.length(); i++) {
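The fix names the Buffer-specific converter explicitly; my reading (not stated in the commit) is that the asJavaList overload the test relied on no longer resolves for a mutable Buffer under Scala 2.11, while bufferAsJavaList is available on both 2.10 and 2.11. A tiny sketch of the call:

import scala.collection.JavaConversions
import scala.collection.mutable.Buffer

val buf = Buffer(1, 2, 3)
// Explicitly pick the Buffer -> java.util.List converter rather than relying
// on overload resolution of asJavaList.
val javaList: java.util.List[Int] = JavaConversions.bufferAsJavaList(buf)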
spark git commit: Fixed a typo from the previous commit.
Repository: spark Updated Branches: refs/heads/master d3a302def - 4c722d77a Fixed a typo from the previous commit. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c722d77 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c722d77 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c722d77 Branch: refs/heads/master Commit: 4c722d77ae7e77eeaa7531687fa9bd6050344d18 Parents: d3a302d Author: Reynold Xin r...@databricks.com Authored: Thu Apr 23 22:39:00 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Thu Apr 23 22:39:00 2015 -0700 -- .../src/main/scala/org/apache/spark/sql/types/DataType.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --

http://git-wip-us.apache.org/repos/asf/spark/blob/4c722d77/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index 06bff7d..0992a7c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -41,7 +41,7 @@ import org.apache.spark.util.Utils
 @DeveloperApi
 abstract class DataType {
   /**
-   * Enables matching against NumericType for expressions:
+   * Enables matching against DataType for expressions:
    * {{{
    *   case Cast(child @ BinaryType(), StringType) =>
    *     ...
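The corrected comment documents why `case Cast(child @ BinaryType(), StringType)` compiles at all: the type object supplies a boolean-style unapply over expressions, so a singleton type can appear positionally inside a larger pattern. A standalone analogue with hypothetical names (not Spark's classes):

// Minimal analogue: a "type" object whose boolean unapply tests a value's
// type tag, letting the object be used as a nested pattern.
case class Expr(value: Any, typeName: String)

abstract class TypeTag(name: String) {
  // A zero-argument boolean extractor: `case e @ SomeTag()` matches any Expr
  // whose typeName equals this tag's name.
  def unapply(e: Expr): Boolean = e.typeName == name
}
object BinaryTag extends TypeTag("binary")

def describe(e: Expr): String = e match {
  case b @ BinaryTag() => s"binary-typed expression: ${b.value}"
  case _               => "something else"
}

// describe(Expr("0xCAFE", "binary")) == "binary-typed expression: 0xCAFE"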
spark git commit: [SPARK-7060][SQL] Add alias function to python dataframe
Repository: spark Updated Branches: refs/heads/master 336f7f537 -> 2d010f7af [SPARK-7060][SQL] Add alias function to python dataframe This PR tries to provide a way to let Python users work around https://issues.apache.org/jira/browse/SPARK-6231. Author: Yin Huai yh...@databricks.com Closes #5634 from yhuai/pythonDFAlias and squashes the following commits: 8465acd [Yin Huai] Add an alias to a Python DF. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d010f7a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d010f7a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d010f7a Branch: refs/heads/master Commit: 2d010f7afe6ac8e67e07da6bea700e9e8c9e6cc2 Parents: 336f7f5 Author: Yin Huai yh...@databricks.com Authored: Thu Apr 23 18:52:55 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Thu Apr 23 18:52:55 2015 -0700 -- python/pyspark/sql/dataframe.py | 14 ++ 1 file changed, 14 insertions(+) --

http://git-wip-us.apache.org/repos/asf/spark/blob/2d010f7a/python/pyspark/sql/dataframe.py

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index c8c30ce..4759f5f 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -453,6 +453,20 @@ class DataFrame(object):
         return [f.name for f in self.schema.fields]

     @ignore_unicode_prefix
+    def alias(self, alias):
+        """Returns a new :class:`DataFrame` with an alias set.
+
+        >>> from pyspark.sql.functions import *
+        >>> df_as1 = df.alias("df_as1")
+        >>> df_as2 = df.alias("df_as2")
+        >>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
+        >>> joined_df.select(col("df_as1.name"), col("df_as2.name"), col("df_as2.age")).collect()
+        [Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', name=u'Bob', age=5)]
+        """
+        assert isinstance(alias, basestring), "alias should be a string"
+        return DataFrame(getattr(self._jdf, "as")(alias), self.sql_ctx)
+
+    @ignore_unicode_prefix
     def join(self, other, joinExprs=None, joinType=None):
         """Joins with another :class:`DataFrame`, using the given join expression.
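Note the getattr dance: the JVM method is named `as`, which is a Python keyword, so it cannot be called as `self._jdf.as(alias)`. The Scala side of the same SPARK-6231 self-join workaround looks roughly like the sketch below (assumes a DataFrame named df with name and age columns; illustrative, not from the patch):

import org.apache.spark.sql.functions.col

// Alias the same DataFrame twice so the join condition and the projection
// can refer to each side by a qualified column name.
val dfAs1 = df.as("df_as1")
val dfAs2 = df.as("df_as2")
val joined = dfAs1.join(dfAs2, col("df_as1.name") === col("df_as2.name"), "inner")
joined.select(col("df_as1.name"), col("df_as2.age")).show()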
spark git commit: [SPARK-7044][SQL] Fix the deadlock in ScriptTransform(for Spark 1.3)
Repository: spark Updated Branches: refs/heads/branch-1.3 e76317b65 -> 2b340af0c [SPARK-7044][SQL] Fix the deadlock in ScriptTransform(for Spark 1.3) Author: Cheng Hao hao.ch...@intel.com Closes #5671 from chenghao-intel/transform2 and squashes the following commits: 2237e81 [Cheng Hao] fix the deadlock in ScriptTransform Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b340af0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b340af0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b340af0 Branch: refs/heads/branch-1.3 Commit: 2b340af0c96c39b7da21b44072b136b6fe582210 Parents: e76317b Author: Cheng Hao hao.ch...@intel.com Authored: Thu Apr 23 20:16:51 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Thu Apr 23 20:16:51 2015 -0700 -- .../hive/execution/ScriptTransformation.scala | 33 +--- .../sql/hive/execution/SQLQuerySuite.scala | 8 + 2 files changed, 29 insertions(+), 12 deletions(-) --

http://git-wip-us.apache.org/repos/asf/spark/blob/2b340af0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 8efed7f..e41dfbd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -146,20 +146,29 @@ case class ScriptTransformation(
     val dataOutputStream = new DataOutputStream(outputStream)
     val outputProjection = new InterpretedProjection(input, child.output)

-    iter
-      .map(outputProjection)
-      .foreach { row =>
-        if (inputSerde == null) {
-          val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
-            ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
-
-          outputStream.write(data)
-        } else {
-          val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
-          prepareWritable(writable).write(dataOutputStream)
+    // Put the write(output to the pipeline) into a single thread
+    // and keep the collector as remain in the main thread.
+    // otherwise it will causes deadlock if the data size greater than
+    // the pipeline / buffer capacity.
+    new Thread(new Runnable() {
+      override def run(): Unit = {
+        iter
+          .map(outputProjection)
+          .foreach { row =>
+            if (inputSerde == null) {
+              val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+                ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+              outputStream.write(data)
+            } else {
+              val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
+              prepareWritable(writable).write(dataOutputStream)
+            }
           }
+        outputStream.close()
       }
-      outputStream.close()
+    }).start()
+
+    iterator
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b340af0/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e177f29..b473810 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -538,4 +538,12 @@ class SQLQuerySuite extends QueryTest {
       sql(s"DROP TABLE $tableName")
     }
   }
+
+  test("test script transform") {
+    val data = (1 to 10).map { i => (i, i, i) }
+    data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
+    assert(10 ===
+      sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans")
+      .queryExecution.toRdd.count())
+  }
 }
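The deadlock mechanism is worth spelling out: with a single thread, the parent writes rows into the child process's stdin while nothing drains the child's stdout; once either OS pipe buffer fills, child and parent both block on write and neither makes progress. The patch breaks the cycle by moving the writer to its own thread. A self-contained sketch of the same pattern (uses a Unix `cat` binary; this is an illustration of the technique, not the Spark code):

import java.io.{BufferedReader, InputStreamReader, PrintWriter}

object PipeNoDeadlock {
  def main(args: Array[String]): Unit = {
    val proc = new ProcessBuilder("cat").start()

    // Dedicated writer thread: feeds the child's stdin, then closes it so
    // the child can terminate. Enough data to overflow a typical pipe buffer.
    val writer = new Thread(new Runnable {
      override def run(): Unit = {
        val out = new PrintWriter(proc.getOutputStream)
        (1 to 1000000).foreach(i => out.println(s"line $i"))
        out.close()
      }
    })
    writer.start()

    // Main thread drains stdout concurrently; single-threaded, this read
    // would never be reached once the pipe into `cat` filled up.
    val in = new BufferedReader(new InputStreamReader(proc.getInputStream))
    var count = 0
    while (in.readLine() != null) count += 1
    writer.join()
    println(s"read $count lines")  // prints: read 1000000 lines
  }
}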