spark git commit: [SPARK-8702] [WEBUI] Avoid massive concatenating strings in Javascript
Repository: spark
Updated Branches: refs/heads/master 660c6cec7 -> 630bd5fd8

[SPARK-8702] [WEBUI] Avoid massive concatenating strings in Javascript

When there are massive tasks, such as `sc.parallelize(1 to 10, 1).count()`, the generated JS code for the stage page contains a lot of string concatenations, nearly 40 for each task. We can generate the whole string for a task instead of executing string concatenations in the browser.

Before this patch, the load time of the page is about 21 seconds.
![screen shot 2015-06-29 at 6 44 04 pm](https://cloud.githubusercontent.com/assets/1000778/8406644/eb55ed18-1e90-11e5-9ad5-50d27ad1dff1.png)

After this patch, it reduces to about 17 seconds.
![screen shot 2015-06-29 at 6 47 34 pm](https://cloud.githubusercontent.com/assets/1000778/8406665/087003ca-1e91-11e5-80a8-3485aa9adafa.png)

One disadvantage is that the generated JS code becomes hard to read.

Author: zsxwing zsxw...@gmail.com

Closes #7082 from zsxwing/js-string and squashes the following commits:

b29231d [zsxwing] Avoid massive concatenating strings in Javascript

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/630bd5fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/630bd5fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/630bd5fd

Branch: refs/heads/master
Commit: 630bd5fd80193ab6dc6ad0e7bcc13ee0dadabd38
Parents: 660c6ce
Author: zsxwing zsxw...@gmail.com
Authored: Tue Jun 30 00:46:55 2015 +0900
Committer: Kousuke Saruta saru...@oss.nttdata.co.jp
Committed: Tue Jun 30 00:46:55 2015 +0900

--
 .../org/apache/spark/ui/jobs/StagePage.scala | 88 ++--
 1 file changed, 44 insertions(+), 44 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/630bd5fd/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index b83a49f..e96bf49 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -572,55 +572,55 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
         val attempt = taskInfo.attempt
         val timelineObject =
           s"""
-            {
-              'className': 'task task-assignment-timeline-object',
-              'group': '$executorId',
-              'content': '<div class="task-assignment-timeline-content"' +
-                'data-toggle="tooltip" data-placement="top"' +
-                'data-html="true" data-container="body"' +
-                'data-title="${s"Task " + index + " (attempt " + attempt + ")"}<br>' +
-                'Status: ${taskInfo.status}<br>' +
-                'Launch Time: ${UIUtils.formatDate(new Date(launchTime))}' +
-                '${
+            |{
+              |'className': 'task task-assignment-timeline-object',
+              |'group': '$executorId',
+              |'content': '<div class="task-assignment-timeline-content"
+                |data-toggle="tooltip" data-placement="top"
+                |data-html="true" data-container="body"
+                |data-title="${s"Task " + index + " (attempt " + attempt + ")"}<br>
+                |Status: ${taskInfo.status}<br>
+                |Launch Time: ${UIUtils.formatDate(new Date(launchTime))}
+                |${
                    if (!taskInfo.running) {
                      s"""<br>Finish Time: ${UIUtils.formatDate(new Date(finishTime))}"""
                    } else {
                      ""
                    }
-                 }' +
-                '<br>Scheduler Delay: $schedulerDelay ms' +
-                '<br>Task Deserialization Time: ${UIUtils.formatDuration(deserializationTime)}' +
-                '<br>Shuffle Read Time: ${UIUtils.formatDuration(shuffleReadTime)}' +
-                '<br>Executor Computing Time: ${UIUtils.formatDuration(executorComputingTime)}' +
-                '<br>Shuffle Write Time: ${UIUtils.formatDuration(shuffleWriteTime)}' +
-                '<br>Result Serialization Time: ${UIUtils.formatDuration(serializationTime)}' +
-                '<br>Getting Result Time: ${UIUtils.formatDuration(gettingResultTime)}' +
-                '<svg class="task-assignment-timeline-duration-bar">' +
-                '<rect class="scheduler-delay-proportion" ' +
-                  'x="$schedulerDelayProportionPos%" y="0px" height="26px"' +
-                  'width="$schedulerDelayProportion%"></rect>' +
-                '<rect class="deserialization-time-proportion" ' +
-                  'x="$deserializationTimeProportionPos%" y="0px" height="26px"' +
-                  'width="$deserializationTimeProportion%"></rect>'
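The before/after difference is easier to see in miniature. The sketch below is illustrative only (hypothetical `Task` fields, not the actual StagePage code): the first form emits JavaScript that still performs many `+` concatenations in the browser at page-load time, while the second assembles the complete string once on the server so the browser receives a single literal.

```scala
// Illustrative sketch only: hypothetical Task fields, not Spark's StagePage.
object TimelineSnippet {
  final case class Task(index: Int, attempt: Int, status: String)

  // Before: the emitted JS still performs many '+' concatenations at load time.
  def concatenatedJs(t: Task): String =
    s"'content': 'Task ' + '${t.index}' + ' (attempt ' + '${t.attempt}' + ')' +\n" +
      s"  '<br>Status: ' + '${t.status}'"

  // After: the full literal is assembled once in Scala, so the browser
  // receives one string and does no runtime concatenation.
  def singleStringJs(t: Task): String =
    s"'content': 'Task ${t.index} (attempt ${t.attempt})<br>Status: ${t.status}'"

  def main(args: Array[String]): Unit = {
    val t = Task(0, 1, "SUCCESS")
    println(concatenatedJs(t))
    println(singleStringJs(t))
  }
}
```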
spark git commit: Revert "[SPARK-8372] History server shows incorrect information for application not started"
Repository: spark
Updated Branches: refs/heads/master 715f084ca -> ea88b1a50

Revert "[SPARK-8372] History server shows incorrect information for application not started"

This reverts commit 2837e067099921dd4ab6639ac5f6e89f789d4ff4.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea88b1a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea88b1a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea88b1a5

Branch: refs/heads/master
Commit: ea88b1a5077e6ba980b0de6d3bc508c62285ba4c
Parents: 715f084
Author: Andrew Or and...@databricks.com
Authored: Mon Jun 29 10:52:05 2015 -0700
Committer: Andrew Or and...@databricks.com
Committed: Mon Jun 29 10:52:05 2015 -0700

--
 .../deploy/history/FsHistoryProvider.scala   | 38 +++--
 .../deploy/history/FsHistoryProviderSuite.scala | 43 ++--
 2 files changed, 28 insertions(+), 53 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ea88b1a5/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index db383b9..5427a88 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       replayBus.addListener(appListener)
       val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
-      appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }
+      ui.setAppName(s"${appInfo.name} ($appId)")

       val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
       ui.getSecurityManager.setAcls(uiAclsEnabled)
@@ -282,12 +282,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val newAttempts = logs.flatMap { fileStatus =>
       try {
         val res = replay(fileStatus, bus)
-        res match {
-          case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
-          case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
-            "The application may have not started.")
-        }
-        res
+        logInfo(s"Application log ${res.logPath} loaded successfully.")
+        Some(res)
       } catch {
         case e: Exception =>
           logError(
@@ -433,11 +429,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replays the events in the specified log file and returns information about the associated
-   * application. Return `None` if the application ID cannot be located.
+   * application.
    */
-  private def replay(
-      eventLog: FileStatus,
-      bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
+  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     val logInput =
@@ -451,18 +445,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted)
-      appListener.appId.map { appId =>
-        new FsApplicationAttemptInfo(
-          logPath.getName(),
-          appListener.appName.getOrElse(NOT_STARTED),
-          appId,
-          appListener.appAttemptId,
-          appListener.startTime.getOrElse(-1L),
-          appListener.endTime.getOrElse(-1L),
-          getModificationTime(eventLog).get,
-          appListener.sparkUser.getOrElse(NOT_STARTED),
-          appCompleted)
-      }
+      new FsApplicationAttemptInfo(
+        logPath.getName(),
+        appListener.appName.getOrElse(NOT_STARTED),
+        appListener.appId.getOrElse(logPath.getName()),
+        appListener.appAttemptId,
+        appListener.startTime.getOrElse(-1L),
+        appListener.endTime.getOrElse(-1L),
+        getModificationTime(eventLog).get,
+        appListener.sparkUser.getOrElse(NOT_STARTED),
+        appCompleted)
     } finally {
       logInput.close()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ea88b1a5/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
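For context, here is a simplified sketch of the two API shapes involved (hypothetical types, not the real FsHistoryProvider): the reverted change made `replay` return an Option so that a log without an application ID mapped to None, while this revert restores the variant that always produces a result, falling back to the log file name.

```scala
// Hypothetical stand-ins for FsApplicationAttemptInfo and the listener state.
final case class AttemptInfo(logPath: String, appName: String)

object ReplayShapes {
  // Reverted (Option-based) shape: a log whose application ID was never
  // recorded maps to None and every caller must handle that case.
  def replayOpt(logPath: String, appId: Option[String]): Option[AttemptInfo] =
    appId.map(id => AttemptInfo(logPath, s"app-$id"))

  // Restored shape: always produces a result, falling back to the log name,
  // which is the behavior this commit reverts to.
  def replay(logPath: String, appId: Option[String]): AttemptInfo =
    AttemptInfo(logPath, appId.getOrElse(logPath))

  def main(args: Array[String]): Unit = {
    println(replayOpt("app-1.log", None)) // None: caller must cope
    println(replay("app-1.log", None))    // AttemptInfo(app-1.log,app-1.log)
  }
}
```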
spark git commit: Revert "[SPARK-8372] History server shows incorrect information for application not started"
Repository: spark
Updated Branches: refs/heads/branch-1.4 9d9c4b476 -> f7c200e6a

Revert "[SPARK-8372] History server shows incorrect information for application not started"

This reverts commit f0513733d4f6fc34f86feffd3062600cbbd56a28.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7c200e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7c200e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7c200e6

Branch: refs/heads/branch-1.4
Commit: f7c200e6ac4eee8eff5db8e6da58b046c32ea6df
Parents: 9d9c4b4
Author: Andrew Or and...@databricks.com
Authored: Mon Jun 29 10:52:23 2015 -0700
Committer: Andrew Or and...@databricks.com
Committed: Mon Jun 29 10:52:23 2015 -0700

--
 .../deploy/history/FsHistoryProvider.scala   | 38 +++--
 .../deploy/history/FsHistoryProviderSuite.scala | 43 ++--
 2 files changed, 28 insertions(+), 53 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f7c200e6/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 5f39b4b..45c2be3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -157,7 +157,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       replayBus.addListener(appListener)
       val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
-      appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }
+      ui.setAppName(s"${appInfo.name} ($appId)")

       val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
       ui.getSecurityManager.setAcls(uiAclsEnabled)
@@ -227,12 +227,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val newAttempts = logs.flatMap { fileStatus =>
       try {
         val res = replay(fileStatus, bus)
-        res match {
-          case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
-          case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
-            "The application may have not started.")
-        }
-        res
+        logInfo(s"Application log ${res.logPath} loaded successfully.")
+        Some(res)
       } catch {
         case e: Exception =>
           logError(
@@ -378,11 +374,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replays the events in the specified log file and returns information about the associated
-   * application. Return `None` if the application ID cannot be located.
+   * application.
    */
-  private def replay(
-      eventLog: FileStatus,
-      bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
+  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     val logInput =
@@ -396,18 +390,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted)
-      appListener.appId.map { appId =>
-        new FsApplicationAttemptInfo(
-          logPath.getName(),
-          appListener.appName.getOrElse(NOT_STARTED),
-          appId,
-          appListener.appAttemptId,
-          appListener.startTime.getOrElse(-1L),
-          appListener.endTime.getOrElse(-1L),
-          getModificationTime(eventLog).get,
-          appListener.sparkUser.getOrElse(NOT_STARTED),
-          appCompleted)
-      }
+      new FsApplicationAttemptInfo(
+        logPath.getName(),
+        appListener.appName.getOrElse(NOT_STARTED),
+        appListener.appId.getOrElse(logPath.getName()),
+        appListener.appAttemptId,
+        appListener.startTime.getOrElse(-1L),
+        appListener.endTime.getOrElse(-1L),
+        getModificationTime(eventLog).get,
+        appListener.sparkUser.getOrElse(NOT_STARTED),
+        appCompleted)
     } finally {
       logInput.close()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/f7c200e6/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
spark git commit: [SPARK-8661][ML] for LinearRegressionSuite.scala, changed javadoc-style comments to regular multiline comments, to make copy-pasting R code more simple
Repository: spark
Updated Branches: refs/heads/master ed359de59 -> 4e880cf59

[SPARK-8661][ML] for LinearRegressionSuite.scala, changed javadoc-style comments to regular multiline comments, to make copy-pasting R code more simple

for mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala, changed javadoc-style comments to regular multiline comments, to make copy-pasting R code more simple

Author: Rosstin astera...@gmail.com

Closes #7098 from Rosstin/SPARK-8661 and squashes the following commits:

5a05dee [Rosstin] SPARK-8661 for LinearRegressionSuite.scala, changed javadoc-style comments to regular multiline comments to make it easier to copy-paste the R code.
bb9a4b1 [Rosstin] Merge branch 'master' of github.com:apache/spark into SPARK-8660
242aedd [Rosstin] SPARK-8660, changed comment style from JavaDoc style to normal multiline comment in order to make copypaste into R easier, in file classification/LogisticRegressionSuite.scala
2cd2985 [Rosstin] Merge branch 'master' of github.com:apache/spark into SPARK-8639
21ac1e5 [Rosstin] Merge branch 'master' of github.com:apache/spark into SPARK-8639
6c18058 [Rosstin] fixed minor typos in docs/README.md and docs/api.md

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e880cf5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e880cf5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e880cf5

Branch: refs/heads/master
Commit: 4e880cf5967c0933e1d098a1d1f7db34b23ca8f8
Parents: ed359de
Author: Rosstin astera...@gmail.com
Authored: Mon Jun 29 16:09:29 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Jun 29 16:09:29 2015 -0700

--
 .../ml/regression/LinearRegressionSuite.scala | 192 +--
 1 file changed, 96 insertions(+), 96 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4e880cf5/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala

diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
index ad1e9da..5f39d44 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
@@ -28,26 +28,26 @@ class LinearRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {

   @transient var dataset: DataFrame = _
   @transient var datasetWithoutIntercept: DataFrame = _

-  /**
-   * In `LinearRegressionSuite`, we will make sure that the model trained by SparkML
-   * is the same as the one trained by R's glmnet package. The following instruction
-   * describes how to reproduce the data in R.
-   *
-   * import org.apache.spark.mllib.util.LinearDataGenerator
-   * val data =
-   *   sc.parallelize(LinearDataGenerator.generateLinearInput(6.3, Array(4.7, 7.2),
-   *     Array(0.9, -1.3), Array(0.7, 1.2), 1, 42, 0.1), 2)
-   * data.map(x => x.label + ", " + x.features(0) + ", " + x.features(1)).coalesce(1)
-   *   .saveAsTextFile("path")
+  /*
+     In `LinearRegressionSuite`, we will make sure that the model trained by SparkML
+     is the same as the one trained by R's glmnet package. The following instruction
+     describes how to reproduce the data in R.
+
+     import org.apache.spark.mllib.util.LinearDataGenerator
+     val data =
+       sc.parallelize(LinearDataGenerator.generateLinearInput(6.3, Array(4.7, 7.2),
+         Array(0.9, -1.3), Array(0.7, 1.2), 1, 42, 0.1), 2)
+     data.map(x => x.label + ", " + x.features(0) + ", " + x.features(1)).coalesce(1)
+       .saveAsTextFile("path")
    */
   override def beforeAll(): Unit = {
     super.beforeAll()
     dataset = sqlContext.createDataFrame(
       sc.parallelize(LinearDataGenerator.generateLinearInput(
         6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 1, 42, 0.1), 2))
-    /**
-     * datasetWithoutIntercept is not needed for correctness testing but is useful for illustrating
-     * training model without intercept
+    /*
+       datasetWithoutIntercept is not needed for correctness testing but is useful for illustrating
+       training model without intercept
      */
     datasetWithoutIntercept = sqlContext.createDataFrame(
       sc.parallelize(LinearDataGenerator.generateLinearInput(
@@ -59,20 +59,20 @@ class LinearRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
     val trainer = new LinearRegression
     val model = trainer.fit(dataset)

-    /**
-     * Using the following R code to load the data and train the model using glmnet package.
-     *
-     * library(glmnet)
-     * data <- read.csv("path",
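A minimal illustration of why the comment style matters for copy-paste (plain Scala, illustrative only, no library assumptions):

```scala
object CommentStyleDemo {
  /**
   * library(glmnet)   <- Scaladoc prefixes every line with " * ", so pasting
   *                      this into an R session drags the asterisks along.
   */
  /*
    library(glmnet)       <- a plain block comment keeps the line clean,
                             which is the style this commit switches to.
  */
  def main(args: Array[String]): Unit = ()
}
```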
spark git commit: [SPARK-8710] [SQL] Change ScalaReflection.mirror from a val to a def.
Repository: spark
Updated Branches: refs/heads/master 4e880cf59 -> 4b497a724

[SPARK-8710] [SQL] Change ScalaReflection.mirror from a val to a def.

jira: https://issues.apache.org/jira/browse/SPARK-8710

Author: Yin Huai yh...@databricks.com

Closes #7094 from yhuai/SPARK-8710 and squashes the following commits:

c854baa [Yin Huai] Change ScalaReflection.mirror from a val to a def.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b497a72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b497a72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b497a72

Branch: refs/heads/master
Commit: 4b497a724a87ef24702c2df9ec6863ee57a87c1c
Parents: 4e880cf
Author: Yin Huai yh...@databricks.com
Authored: Mon Jun 29 16:26:05 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Jun 29 16:26:05 2015 -0700

--
 .../org/apache/spark/sql/catalyst/ScalaReflection.scala | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4b497a72/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 90698cd..21b1de1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -28,7 +28,11 @@ import org.apache.spark.sql.types._
  */
 object ScalaReflection extends ScalaReflection {
   val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
-  val mirror: universe.Mirror = universe.runtimeMirror(Thread.currentThread().getContextClassLoader)
+  // Since we are creating a runtime mirror using the class loader of the current thread,
+  // we need to use a def here. So, every time we call mirror, it is using the
+  // class loader of the current thread.
+  override def mirror: universe.Mirror =
+    universe.runtimeMirror(Thread.currentThread().getContextClassLoader)
 }

 /**
@@ -39,7 +43,7 @@ trait ScalaReflection {
   val universe: scala.reflect.api.Universe

   /** The mirror used to access types in the universe */
-  val mirror: universe.Mirror
+  def mirror: universe.Mirror

   import universe._
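A standalone sketch of the val-versus-def distinction the patch depends on (plain scala-reflect, not Spark code): a `val` captures the context class loader of whichever thread happens to initialize the object first, while a `def` re-reads the calling thread's loader on every access.

```scala
// Minimal sketch; requires only scala-reflect on the classpath.
import scala.reflect.runtime.{universe => ru}

object MirrorDemo {
  // Evaluated once, at first access, with whatever context class loader
  // the initializing thread carried at that moment.
  val mirrorVal: ru.Mirror =
    ru.runtimeMirror(Thread.currentThread().getContextClassLoader)

  // Re-evaluated on every call, so it always reflects the *calling*
  // thread's context class loader -- the behavior the patch wants.
  def mirrorDef: ru.Mirror =
    ru.runtimeMirror(Thread.currentThread().getContextClassLoader)

  def main(args: Array[String]): Unit = {
    // Equal here because both see the same loader on this thread; they can
    // diverge when threads carry different context class loaders.
    println(mirrorVal == mirrorDef)
  }
}
```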
spark git commit: [SPARK-8710] [SQL] Change ScalaReflection.mirror from a val to a def.
Repository: spark
Updated Branches: refs/heads/branch-1.4 0c3f7fc88 -> 6a45d86db

[SPARK-8710] [SQL] Change ScalaReflection.mirror from a val to a def.

jira: https://issues.apache.org/jira/browse/SPARK-8710

Author: Yin Huai yh...@databricks.com

Closes #7094 from yhuai/SPARK-8710 and squashes the following commits:

c854baa [Yin Huai] Change ScalaReflection.mirror from a val to a def.

(cherry picked from commit 4b497a724a87ef24702c2df9ec6863ee57a87c1c)
Signed-off-by: Reynold Xin r...@databricks.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a45d86d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a45d86d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a45d86d

Branch: refs/heads/branch-1.4
Commit: 6a45d86dbdefe8d8c50342953f30a28502cb2c3f
Parents: 0c3f7fc
Author: Yin Huai yh...@databricks.com
Authored: Mon Jun 29 16:26:05 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Jun 29 16:26:13 2015 -0700

--
 .../org/apache/spark/sql/catalyst/ScalaReflection.scala | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/6a45d86d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 6998cc8..dca5ffb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -27,7 +27,11 @@ import org.apache.spark.sql.types._
  */
 object ScalaReflection extends ScalaReflection {
   val universe: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
-  val mirror: universe.Mirror = universe.runtimeMirror(Thread.currentThread().getContextClassLoader)
+  // Since we are creating a runtime mirror using the class loader of the current thread,
+  // we need to use a def here. So, every time we call mirror, it is using the
+  // class loader of the current thread.
+  override def mirror: universe.Mirror =
+    universe.runtimeMirror(Thread.currentThread().getContextClassLoader)
 }

 /**
@@ -38,7 +42,7 @@ trait ScalaReflection {
   val universe: scala.reflect.api.Universe

   /** The mirror used to access types in the universe */
-  val mirror: universe.Mirror
+  def mirror: universe.Mirror

   import universe._
spark git commit: [SPARK-8589] [SQL] cleanup DateTimeUtils
Repository: spark
Updated Branches: refs/heads/master 4b497a724 -> 881662e9c

[SPARK-8589] [SQL] cleanup DateTimeUtils

move date/time related operations into `DateTimeUtils` and rename some methods to make them clearer.

Author: Wenchen Fan cloud0...@outlook.com

Closes #6980 from cloud-fan/datetime and squashes the following commits:

9373a9d [Wenchen Fan] cleanup DateTimeUtil

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/881662e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/881662e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/881662e9

Branch: refs/heads/master
Commit: 881662e9c93893430756320f51cef0fc6643f681
Parents: 4b497a7
Author: Wenchen Fan cloud0...@outlook.com
Authored: Mon Jun 29 16:34:50 2015 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Mon Jun 29 16:34:50 2015 -0700

--
 .../spark/sql/catalyst/expressions/Cast.scala | 43 ++--
 .../spark/sql/catalyst/util/DateTimeUtils.scala | 70 ++--
 .../spark/sql/hive/hiveWriterContainers.scala | 2 +-
 3 files changed, 58 insertions(+), 57 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/881662e9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 8d66968..d69d490 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions

 import java.math.{BigDecimal => JavaBigDecimal}
 import java.sql.{Date, Timestamp}
-import java.text.{DateFormat, SimpleDateFormat}

 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
@@ -122,9 +121,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
   // UDFToString
   private[this] def castToString(from: DataType): Any => Any = from match {
     case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
-    case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.toString(d)))
+    case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.dateToString(d)))
     case TimestampType => buildCast[Long](_,
-      t => UTF8String.fromString(timestampToString(DateTimeUtils.toJavaTimestamp(t))))
+      t => UTF8String.fromString(DateTimeUtils.timestampToString(t)))
     case _ => buildCast[Any](_, o => UTF8String.fromString(o.toString))
   }

@@ -183,7 +182,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     case ByteType =>
       buildCast[Byte](_, b => longToTimestamp(b.toLong))
     case DateType =>
-      buildCast[Int](_, d => DateTimeUtils.toMillisSinceEpoch(d) * 1)
+      buildCast[Int](_, d => DateTimeUtils.daysToMillis(d) * 1)
     // TimestampWritable.decimalToTimestamp
     case DecimalType() =>
       buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -216,18 +215,6 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
     ts / 1000.0
   }

-  // Converts Timestamp to string according to Hive TimestampWritable convention
-  private[this] def timestampToString(ts: Timestamp): String = {
-    val timestampString = ts.toString
-    val formatted = Cast.threadLocalTimestampFormat.get.format(ts)
-
-    if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
-      formatted + timestampString.substring(19)
-    } else {
-      formatted
-    }
-  }
-
   // DateConverter
   private[this] def castToDate(from: DataType): Any => Any = from match {
     case StringType =>
@@ -449,11 +436,11 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
       case (DateType, StringType) =>
         defineCodeGen(ctx, ev, c =>
           s"""${ctx.stringType}.fromString(
-            org.apache.spark.sql.catalyst.util.DateTimeUtils.toString($c))""")
-      // Special handling required for timestamps in hive test cases since the toString function
-      // does not match the expected output.
+            org.apache.spark.sql.catalyst.util.DateTimeUtils.dateToString($c))""")
       case (TimestampType, StringType) =>
-        super.genCode(ctx, ev)
+        defineCodeGen(ctx, ev, c =>
+          s"""${ctx.stringType}.fromString(
+            org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c))""")
       case (_, StringType) =>
         defineCodeGen(ctx, ev, c => s"${ctx.stringType}.fromString(String.valueOf($c))")
@@
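A hedged, self-contained sketch of the Hive TimestampWritable convention that the `timestampToString` removed above (and moved into DateTimeUtils) implements: format to second precision, then append the fractional part only when it is something other than ".0".

```scala
// Standalone sketch of the convention; only java.sql/java.text are needed.
import java.sql.Timestamp
import java.text.SimpleDateFormat

object TimestampToStringSketch {
  private val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  def timestampToString(ts: Timestamp): String = {
    val full = ts.toString          // e.g. "2015-06-29 10:52:05.123"
    val formatted = fmt.format(ts)  // second precision only
    // Keep the fractional seconds unless they are exactly ".0".
    if (full.length > 19 && full.substring(19) != ".0") formatted + full.substring(19)
    else formatted
  }

  def main(args: Array[String]): Unit = {
    println(timestampToString(Timestamp.valueOf("2015-06-29 10:52:05.1"))) // keeps ".1"
    println(timestampToString(Timestamp.valueOf("2015-06-29 10:52:05")))   // drops ".0"
  }
}
```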
spark git commit: [SPARK-8634] [STREAMING] [TESTS] Fix flaky test StreamingListenerSuite receiver info reporting
Repository: spark
Updated Branches: refs/heads/master 881662e9c -> cec98525f

[SPARK-8634] [STREAMING] [TESTS] Fix flaky test StreamingListenerSuite receiver info reporting

As per the unit test log in https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35754/

```
15/06/24 23:09:10.210 Thread-3495 INFO ReceiverTracker: Starting 1 receivers
15/06/24 23:09:10.270 Thread-3495 INFO SparkContext: Starting job: apply at Transformer.scala:22
...
15/06/24 23:09:14.259 ForkJoinPool-4-worker-29 INFO StreamingListenerSuiteReceiver: Started receiver and sleeping
15/06/24 23:09:14.270 ForkJoinPool-4-worker-29 INFO StreamingListenerSuiteReceiver: Reporting error and sleeping
```

it needs at least 4 seconds to receive all receiver events in this slow machine, but `timeout` for `eventually` is only 2 seconds. This PR increases `timeout` to make this test stable.

Author: zsxwing zsxw...@gmail.com

Closes #7017 from zsxwing/SPARK-8634 and squashes the following commits:

719cae4 [zsxwing] Fix flaky test StreamingListenerSuite receiver info reporting

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cec98525
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cec98525
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cec98525

Branch: refs/heads/master
Commit: cec98525fd2b731cb78935bf7bc6c7963411744e
Parents: 881662e
Author: zsxwing zsxw...@gmail.com
Authored: Mon Jun 29 17:19:05 2015 -0700
Committer: Andrew Or and...@databricks.com
Committed: Mon Jun 29 17:19:05 2015 -0700

--
 .../scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/cec98525/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala

diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 1dc8960..7bc7727 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -116,7 +116,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {

     ssc.start()
     try {
-      eventually(timeout(2000 millis), interval(20 millis)) {
+      eventually(timeout(30 seconds), interval(20 millis)) {
         collector.startedReceiverStreamIds.size should equal (1)
         collector.startedReceiverStreamIds(0) should equal (0)
         collector.stoppedReceiverStreamIds should have size 1
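A minimal sketch of the `eventually` pattern being tuned here (assumes ScalaTest on the classpath; the worker thread and flag are illustrative): the timeout bounds how long the assertion may keep failing while the interval sets the poll period, so a 2-second timeout cannot absorb the roughly 4 seconds the slow machine needs to deliver all listener events.

```scala
// Illustrative only; requires ScalaTest's Eventually and SpanSugar.
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallyDemo {
  def main(args: Array[String]): Unit = {
    @volatile var received = false
    val worker = new Thread(new Runnable {
      override def run(): Unit = { Thread.sleep(4000); received = true }
    })
    worker.start()
    // interval = poll period; timeout = how long the block may keep failing
    // before the test gives up. A generous timeout costs nothing when the
    // condition is met early, since eventually returns on first success.
    eventually(timeout(30.seconds), interval(20.millis)) {
      assert(received)
    }
    worker.join()
  }
}
```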
spark git commit: [SPARK-7287] [SPARK-8567] [TEST] Add sc.stop to applications in SparkSubmitSuite
Repository: spark
Updated Branches: refs/heads/master cec98525f -> fbf75738f

[SPARK-7287] [SPARK-8567] [TEST] Add sc.stop to applications in SparkSubmitSuite

Hopefully, this suite will not be flaky anymore.

Author: Yin Huai yh...@databricks.com

Closes #7027 from yhuai/SPARK-8567 and squashes the following commits:

c0167e2 [Yin Huai] Add sc.stop().

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbf75738
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbf75738
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbf75738

Branch: refs/heads/master
Commit: fbf75738feddebb352d5cedf503b573105d4b7a7
Parents: cec9852
Author: Yin Huai yh...@databricks.com
Authored: Mon Jun 29 17:20:05 2015 -0700
Committer: Andrew Or and...@databricks.com
Committed: Mon Jun 29 17:20:05 2015 -0700

--
 .../apache/spark/deploy/SparkSubmitSuite.scala | 2 ++
 .../regression-test-SPARK-8489/Main.scala    | 1 +
 .../regression-test-SPARK-8489/test.jar      | Bin 6811 -> 6828 bytes
 3 files changed, 3 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/fbf75738/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 357ed90..2e05dec 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -548,6 +548,7 @@ object JarCreationTest extends Logging {
     if (result.nonEmpty) {
       throw new Exception("Could not load user class from jar:\n" + result(0))
     }
+    sc.stop()
   }
 }

@@ -573,6 +574,7 @@ object SimpleApplicationTest {
           s"Master had $config=$masterValue but executor had $config=$executorValue")
       }
     }
+    sc.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fbf75738/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala

diff --git a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala
index e171517..0e428ba 100644
--- a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala
+++ b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala
@@ -38,6 +38,7 @@ object Main {
     val df = hc.createDataFrame(Seq(MyCoolClass(1, 2, 3)))
     df.collect()
     println("Regression test for SPARK-8489 success!")
+    sc.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fbf75738/sql/hive/src/test/resources/regression-test-SPARK-8489/test.jar

diff --git a/sql/hive/src/test/resources/regression-test-SPARK-8489/test.jar b/sql/hive/src/test/resources/regression-test-SPARK-8489/test.jar
index 4f59fba..5944aa6 100644
Binary files a/sql/hive/src/test/resources/regression-test-SPARK-8489/test.jar and b/sql/hive/src/test/resources/regression-test-SPARK-8489/test.jar differ
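The hygiene rule behind the fix, as a hedged sketch (an illustrative app, not one of the suite's actual test applications): a test application that creates its own SparkContext should stop it before exiting, otherwise a lingering context can make later suites in the same JVM flaky. Wrapping the body in try/finally is a slightly stricter variant of what the patch adds.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object StopContextDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local"))
    try {
      val n = sc.parallelize(1 to 10).count()
      println(s"counted $n elements")
    } finally {
      sc.stop() // the addition SPARK-7287/SPARK-8567 makes in each test app
    }
  }
}
```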
spark git commit: [SQL][DOCS] Remove wrong example from DataFrame.scala
Repository: spark
Updated Branches: refs/heads/master 492dca3a7 -> 94e040d05

[SQL][DOCS] Remove wrong example from DataFrame.scala

In DataFrame.scala, there are examples like the following.

```
 * // The following are equivalent:
 * peopleDf.filter($"age" > 15)
 * peopleDf.where($"age" > 15)
 * peopleDf($"age" > 15)
```

But, I think the last example doesn't work.

Author: Kousuke Saruta saru...@oss.nttdata.co.jp

Closes #6977 from sarutak/fix-dataframe-example and squashes the following commits:

46efbd7 [Kousuke Saruta] Removed wrong example

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94e040d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94e040d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94e040d0

Branch: refs/heads/master
Commit: 94e040d05996111b2b448bcdee1cda184c6d039b
Parents: 492dca3
Author: Kousuke Saruta saru...@oss.nttdata.co.jp
Authored: Mon Jun 29 12:16:12 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Jun 29 12:16:12 2015 -0700

--
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2 --
 1 file changed, 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/94e040d0/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d75d883..986e591 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -682,7 +682,6 @@ class DataFrame private[sql](
    * // The following are equivalent:
    * peopleDf.filter($"age" > 15)
    * peopleDf.where($"age" > 15)
-   * peopleDf($"age" > 15)
    * }}}
    * @group dfops
    * @since 1.3.0
@@ -707,7 +706,6 @@ class DataFrame private[sql](
    * // The following are equivalent:
    * peopleDf.filter($"age" > 15)
    * peopleDf.where($"age" > 15)
-   * peopleDf($"age" > 15)
    * }}}
    * @group dfops
    * @since 1.3.0
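To make the removed line's problem concrete, a hedged sketch against the Spark 1.x API (data and names are illustrative): `DataFrame.apply` takes a column *name* as a String, so passing it a boolean Column does not type-check, while the filter/where forms are genuinely equivalent.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

object FilterFormsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val peopleDf = sc.parallelize(Seq(Person("a", 10), Person("b", 20))).toDF()

    peopleDf.filter($"age" > 15).show() // the two forms the docs keep are
    peopleDf.where($"age" > 15).show()  // genuinely interchangeable
    // peopleDf($"age" > 15)            // the removed example: apply() expects
    //                                  // a column name String, not a Column,
    //                                  // so this line does not compile.
    sc.stop()
  }
}
```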
spark git commit: [SQL][DOCS] Remove wrong example from DataFrame.scala
Repository: spark
Updated Branches: refs/heads/branch-1.4 f7c200e6a -> da51cf58f

[SQL][DOCS] Remove wrong example from DataFrame.scala

In DataFrame.scala, there are examples like the following.

```
 * // The following are equivalent:
 * peopleDf.filter($"age" > 15)
 * peopleDf.where($"age" > 15)
 * peopleDf($"age" > 15)
```

But, I think the last example doesn't work.

Author: Kousuke Saruta saru...@oss.nttdata.co.jp

Closes #6977 from sarutak/fix-dataframe-example and squashes the following commits:

46efbd7 [Kousuke Saruta] Removed wrong example

(cherry picked from commit 94e040d05996111b2b448bcdee1cda184c6d039b)
Signed-off-by: Reynold Xin r...@databricks.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da51cf58
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da51cf58
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da51cf58

Branch: refs/heads/branch-1.4
Commit: da51cf58fcdc7a094590d267f3ce779d91fea6e6
Parents: f7c200e
Author: Kousuke Saruta saru...@oss.nttdata.co.jp
Authored: Mon Jun 29 12:16:12 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Jun 29 12:16:44 2015 -0700

--
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 2 --
 1 file changed, 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/da51cf58/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 29bba18..0b76138 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -633,7 +633,6 @@ class DataFrame private[sql](
    * // The following are equivalent:
    * peopleDf.filter($"age" > 15)
    * peopleDf.where($"age" > 15)
-   * peopleDf($"age" > 15)
    * }}}
    * @group dfops
    * @since 1.3.0
@@ -658,7 +657,6 @@ class DataFrame private[sql](
    * // The following are equivalent:
    * peopleDf.filter($"age" > 15)
    * peopleDf.where($"age" > 15)
-   * peopleDf($"age" > 15)
    * }}}
    * @group dfops
    * @since 1.3.0
spark git commit: [SPARK-8692] [SQL] re-order the case statements that handling catalyst data types
Repository: spark
Updated Branches: refs/heads/master ea88b1a50 -> ed413bcc7

[SPARK-8692] [SQL] re-order the case statements that handling catalyst data types

use same order: boolean, byte, short, int, date, long, timestamp, float, double, string, binary, decimal. Then we can easily check whether some data types are missing by just one glance, and make sure we handle date/timestamp just as int/long.

Author: Wenchen Fan cloud0...@outlook.com

Closes #7073 from cloud-fan/fix-date and squashes the following commits:

463044d [Wenchen Fan] fix style
51cd347 [Wenchen Fan] refactor handling of date and timestmap

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed413bcc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed413bcc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed413bcc

Branch: refs/heads/master
Commit: ed413bcc78d8d97a1a0cd0871d7a20f7170476d0
Parents: ea88b1a
Author: Wenchen Fan cloud0...@outlook.com
Authored: Mon Jun 29 11:41:26 2015 -0700
Committer: Cheng Lian l...@databricks.com
Committed: Mon Jun 29 11:41:26 2015 -0700

--
 .../expressions/SpecificMutableRow.scala      | 12 +--
 .../expressions/UnsafeRowConverter.scala      | 6 +-
 .../expressions/codegen/CodeGenerator.scala   | 6 +-
 .../spark/sql/columnar/ColumnAccessor.scala   | 42 +-
 .../spark/sql/columnar/ColumnBuilder.scala    | 30 +++
 .../apache/spark/sql/columnar/ColumnStats.scala | 74 +-
 .../apache/spark/sql/columnar/ColumnType.scala | 10 +--
 .../sql/execution/SparkSqlSerializer2.scala   | 82 +++-
 .../spark/sql/parquet/ParquetTableSupport.scala | 34 
 .../apache/spark/sql/parquet/ParquetTypes.scala | 4 +-
 .../spark/sql/columnar/ColumnStatsSuite.scala | 9 ++-
 .../spark/sql/columnar/ColumnTypeSuite.scala  | 54 ++---
 .../spark/sql/columnar/ColumnarTestUtils.scala | 8 +-
 .../columnar/NullableColumnAccessorSuite.scala | 6 +-
 .../columnar/NullableColumnBuilderSuite.scala | 6 +-
 15 files changed, 174 insertions(+), 209 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ed413bcc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
index 53fedb5..3928c0f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
@@ -196,15 +196,15 @@ final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableR

   def this(dataTypes: Seq[DataType]) =
     this(
       dataTypes.map {
-        case IntegerType => new MutableInt
+        case BooleanType => new MutableBoolean
         case ByteType => new MutableByte
-        case FloatType => new MutableFloat
         case ShortType => new MutableShort
+        // We use INT for DATE internally
+        case IntegerType | DateType => new MutableInt
+        // We use Long for Timestamp internally
+        case LongType | TimestampType => new MutableLong
+        case FloatType => new MutableFloat
         case DoubleType => new MutableDouble
-        case BooleanType => new MutableBoolean
-        case LongType => new MutableLong
-        case DateType => new MutableInt // We use INT for DATE internally
-        case TimestampType => new MutableLong // We use Long for Timestamp internally
         case _ => new MutableAny
       }.toArray)

http://git-wip-us.apache.org/repos/asf/spark/blob/ed413bcc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
index 89adaf0..b61d490 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
@@ -128,14 +128,12 @@ private object UnsafeColumnWriter {
       case BooleanType => BooleanUnsafeColumnWriter
       case ByteType => ByteUnsafeColumnWriter
       case ShortType => ShortUnsafeColumnWriter
-      case IntegerType => IntUnsafeColumnWriter
-      case LongType => LongUnsafeColumnWriter
+      case IntegerType | DateType => IntUnsafeColumnWriter
+      case LongType | TimestampType =>
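The pairing rule the cleanup standardizes, in miniature (stand-in types, not the real Catalyst classes): DATE is stored as an Int (days since epoch) and TIMESTAMP as a Long, so each shares a case arm with its primitive, and a single canonical ordering makes a missing type obvious at a glance.

```scala
// Stand-in types for illustration only.
sealed trait SqlType
case object BooleanT extends SqlType
case object ByteT extends SqlType
case object ShortT extends SqlType
case object IntT extends SqlType
case object DateT extends SqlType
case object LongT extends SqlType
case object TimestampT extends SqlType

object WidthOf {
  // One canonical ordering, with DATE riding along with Int and TIMESTAMP
  // with Long, mirroring how Catalyst stores them internally.
  def bytes(t: SqlType): Int = t match {
    case BooleanT | ByteT   => 1
    case ShortT             => 2
    case IntT | DateT       => 4 // DATE is an Int (days since epoch) internally
    case LongT | TimestampT => 8 // TIMESTAMP is a Long internally
  }

  def main(args: Array[String]): Unit =
    println(Seq(DateT, TimestampT).map(bytes)) // List(4, 8)
}
```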
spark git commit: [SPARK-8579] [SQL] support arbitrary object in UnsafeRow
Repository: spark
Updated Branches: refs/heads/master 931da5c8a -> ed359de59

[SPARK-8579] [SQL] support arbitrary object in UnsafeRow

This PR brings arbitrary object support to UnsafeRow (both in the grouping key and the aggregation buffer). Two object pools will be created to hold those non-primitive objects, with their indices put into the UnsafeRow. In order to compare the grouping key as bytes, the objects in the key will be stored in a unique object pool, to make sure the same objects will have the same index (used as hashCode).

For StringType and BinaryType, we still put them as var-length fields in the UnsafeRow when initializing, for better performance. But on update, they will become an object inside the object pools (there will be some garbage left in the buffer).

BTW: Will create a JIRA once issue.apache.org is available.

cc JoshRosen rxin

Author: Davies Liu dav...@databricks.com

Closes #6959 from davies/unsafe_obj and squashes the following commits:

5ce39da [Davies Liu] fix comment
5e797bf [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_obj
5803d64 [Davies Liu] fix conflict
461d304 [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_obj
2f41c90 [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_obj
b04d69c [Davies Liu] address comments
4859b80 [Davies Liu] fix comments
f38011c [Davies Liu] add a test for grouping by decimal
d2cf7ab [Davies Liu] add more tests for null checking
71983c5 [Davies Liu] add test for timestamp
e8a1649 [Davies Liu] reuse buffer for string
39f09ca [Davies Liu] Merge branch 'master' of github.com:apache/spark into unsafe_obj
035501e [Davies Liu] fix style
236d6de [Davies Liu] support arbitrary object in UnsafeRow

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed359de5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed359de5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed359de5

Branch: refs/heads/master
Commit: ed359de595d5dd67b60eddf092eaf89041c8
Parents: 931da5c
Author: Davies Liu dav...@databricks.com
Authored: Mon Jun 29 15:59:20 2015 -0700
Committer: Davies Liu dav...@databricks.com
Committed: Mon Jun 29 15:59:20 2015 -0700

--
 .../UnsafeFixedWidthAggregationMap.java       | 144 ++--
 .../sql/catalyst/expressions/UnsafeRow.java   | 218 ++-
 .../spark/sql/catalyst/util/ObjectPool.java   | 78 +++
 .../sql/catalyst/util/UniqueObjectPool.java   | 59 +
 .../apache/spark/sql/catalyst/InternalRow.scala | 5 +-
 .../expressions/UnsafeRowConverter.scala      | 94 
 .../UnsafeFixedWidthAggregationMapSuite.scala | 65 --
 .../expressions/UnsafeRowConverterSuite.scala | 190 
 .../sql/catalyst/util/ObjectPoolSuite.scala   | 57 +
 .../sql/execution/GeneratedAggregate.scala    | 16 +-
 10 files changed, 615 insertions(+), 311 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ed359de5/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
index 83f2a31..1e79f4b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
@@ -19,9 +19,11 @@ package org.apache.spark.sql.catalyst.expressions;

 import java.util.Iterator;

+import scala.Function1;
+
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.catalyst.util.ObjectPool;
+import org.apache.spark.sql.catalyst.util.UniqueObjectPool;
 import org.apache.spark.unsafe.PlatformDependent;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
 import org.apache.spark.unsafe.memory.MemoryLocation;
@@ -38,16 +40,28 @@ public final class UnsafeFixedWidthAggregationMap {
    * An empty aggregation buffer, encoded in UnsafeRow format. When inserting a new key into the
    * map, we copy this buffer and use it as the value.
    */
-  private final byte[] emptyAggregationBuffer;
+  private final byte[] emptyBuffer;

-  private final StructType aggregationBufferSchema;
+  /**
+   * An empty row used by `initProjection`
+   */
+  private static final InternalRow emptyRow = new GenericInternalRow();

-  private final StructType groupingKeySchema;
+  /**
+   * Whether can the empty aggregation buffer be reuse without calling
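A hedged sketch of the unique-object-pool idea described above (illustrative and far simpler than Spark's implementation): equal objects always receive the same index, so fixed-width rows holding only indices can still be compared as raw bytes.

```scala
// Illustrative only; Spark's ObjectPool/UniqueObjectPool are more involved.
import scala.collection.mutable

final class UniquePool {
  private val objects = mutable.ArrayBuffer.empty[AnyRef]
  private val indexOf = mutable.HashMap.empty[AnyRef, Int]

  /** Equal objects always map to the same index, so the index itself can
    * stand in for the object inside byte-comparable fixed-width rows. */
  def put(obj: AnyRef): Int =
    indexOf.getOrElseUpdate(obj, { objects += obj; objects.size - 1 })

  def get(idx: Int): AnyRef = objects(idx)
}

object PoolDemo {
  def main(args: Array[String]): Unit = {
    val pool = new UniquePool
    val a = pool.put("grouping-key")
    val b = pool.put(new String("grouping-key")) // equal but distinct object
    println(a == b)                              // true: deduplicated
    println(pool.get(a))                         // grouping-key
  }
}
```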
spark git commit: [SPARK-7810] [PYSPARK] solve python rdd socket connection problem
Repository: spark
Updated Branches: refs/heads/branch-1.4 457d07eaa -> 187015f67

[SPARK-7810] [PYSPARK] solve python rdd socket connection problem

Method _load_from_socket in rdd.py cannot load data from the JVM socket when IPv6 is used. The current method only works well with IPv4. The new modification should work with both protocols.

Author: Ai He ai...@ussuning.com
Author: AiHe ai...@ussuning.com

Closes #6338 from AiHe/pyspark-networking-issue and squashes the following commits:

d4fc9c4 [Ai He] handle code review 2
e75c5c8 [Ai He] handle code review
5644953 [AiHe] solve python rdd socket connection problem to jvm

(cherry picked from commit ecd3aacf2805bb231cfb44bab079319cfe73c3f1)
Signed-off-by: Davies Liu dav...@databricks.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/187015f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/187015f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/187015f6

Branch: refs/heads/branch-1.4
Commit: 187015f6712e13ca5f652ccdea2d084a9afecfc7
Parents: 457d07e
Author: Ai He ai...@ussuning.com
Authored: Mon Jun 29 14:36:26 2015 -0700
Committer: Davies Liu dav...@databricks.com
Committed: Mon Jun 29 14:36:48 2015 -0700

--
 python/pyspark/rdd.py | 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/187015f6/python/pyspark/rdd.py

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1b64be2..cb20bc8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -121,10 +121,22 @@ def _parse_memory(s):


 def _load_from_socket(port, serializer):
-    sock = socket.socket()
-    sock.settimeout(3)
+    sock = None
+    # Support for both IPv4 and IPv6.
+    # On most of IPv6-ready systems, IPv6 will take precedence.
+    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, canonname, sa = res
+        try:
+            sock = socket.socket(af, socktype, proto)
+            sock.settimeout(3)
+            sock.connect(sa)
+        except socket.error:
+            sock = None
+            continue
+        break
+    if not sock:
+        raise Exception("could not open socket")
     try:
-        sock.connect(("localhost", port))
         rf = sock.makefile("rb", 65536)
         for item in serializer.load_stream(rf):
             yield item
spark git commit: [SPARK-7810] [PYSPARK] solve python rdd socket connection problem
Repository: spark
Updated Branches: refs/heads/master f6fc254ec -> ecd3aacf2

[SPARK-7810] [PYSPARK] solve python rdd socket connection problem

Method _load_from_socket in rdd.py cannot load data from the JVM socket when IPv6 is used. The current method only works well with IPv4. The new modification should work with both protocols.

Author: Ai He ai...@ussuning.com
Author: AiHe ai...@ussuning.com

Closes #6338 from AiHe/pyspark-networking-issue and squashes the following commits:

d4fc9c4 [Ai He] handle code review 2
e75c5c8 [Ai He] handle code review
5644953 [AiHe] solve python rdd socket connection problem to jvm

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecd3aacf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecd3aacf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecd3aacf

Branch: refs/heads/master
Commit: ecd3aacf2805bb231cfb44bab079319cfe73c3f1
Parents: f6fc254
Author: Ai He ai...@ussuning.com
Authored: Mon Jun 29 14:36:26 2015 -0700
Committer: Davies Liu dav...@databricks.com
Committed: Mon Jun 29 14:36:26 2015 -0700

--
 python/pyspark/rdd.py | 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ecd3aacf/python/pyspark/rdd.py

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1b64be2..cb20bc8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -121,10 +121,22 @@ def _parse_memory(s):


 def _load_from_socket(port, serializer):
-    sock = socket.socket()
-    sock.settimeout(3)
+    sock = None
+    # Support for both IPv4 and IPv6.
+    # On most of IPv6-ready systems, IPv6 will take precedence.
+    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, canonname, sa = res
+        try:
+            sock = socket.socket(af, socktype, proto)
+            sock.settimeout(3)
+            sock.connect(sa)
+        except socket.error:
+            sock = None
+            continue
+        break
+    if not sock:
+        raise Exception("could not open socket")
     try:
-        sock.connect(("localhost", port))
         rf = sock.makefile("rb", 65536)
         for item in serializer.load_stream(rf):
             yield item
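A JVM-side analogue of the same resolution loop (a hypothetical helper, not part of Spark): resolve "localhost" to all of its addresses and try each in turn, so IPv6-only resolutions also connect instead of failing on a hard-coded IPv4 socket.

```scala
// Hypothetical Scala analogue of the Python fix; illustrative only.
import java.io.IOException
import java.net.{InetAddress, InetSocketAddress, Socket}

object ConnectLocalhost {
  /** Try every address "localhost" resolves to (IPv4 and/or IPv6) in order. */
  def open(port: Int): Socket = {
    val failures = new StringBuilder
    for (addr <- InetAddress.getAllByName("localhost")) {
      val sock = new Socket()
      try {
        sock.connect(new InetSocketAddress(addr, port), 3000)
        return sock // first address that accepts the connection wins
      } catch {
        case e: IOException =>
          sock.close()
          failures.append(s"$addr: ${e.getMessage}\n")
      }
    }
    throw new IOException(s"could not open socket:\n$failures")
  }

  def main(args: Array[String]): Unit = {
    val server = new java.net.ServerSocket(0) // ephemeral port for the demo
    val sock = open(server.getLocalPort)
    println(s"connected via ${sock.getInetAddress}")
    sock.close(); server.close()
  }
}
```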
spark git commit: [HOTFIX] Fix whitespace style error
Repository: spark
Updated Branches: refs/heads/branch-1.4 0de1737a8 -> 0c3f7fc88

[HOTFIX] Fix whitespace style error

Author: Michael Armbrust mich...@databricks.com

Closes #7102 from marmbrus/fixStyle and squashes the following commits:

8c08124 [Michael Armbrust] [HOTFIX] Fix whitespace style error

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c3f7fc8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c3f7fc8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c3f7fc8

Branch: refs/heads/branch-1.4
Commit: 0c3f7fc88a8e87d0581e2a4c7a73d47642dea129
Parents: 0de1737
Author: Michael Armbrust mich...@databricks.com
Authored: Mon Jun 29 15:57:36 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Jun 29 15:57:36 2015 -0700

--
 .../org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/0c3f7fc8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index e62a1f9..d5ffccf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -308,8 +308,7 @@ trait HiveTypeCoercion {
   object InConversion extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
       // Skip nodes whose children have not been resolved yet.
-      case e if !e.childrenResolved => e
-
+      case e if !e.childrenResolved => e
       case i @ In(a, b) if b.exists(_.dataType != a.dataType) =>
         i.makeCopy(Array(a, b.map(Cast(_, a.dataType))))
     }
spark git commit: [SQL] [MINOR] Skip unresolved expression for InConversion
Repository: spark
Updated Branches: refs/heads/branch-1.4 6b9f3831a -> 457d07eaa

[SQL] [MINOR] Skip unresolved expression for InConversion

Author: scwf wangf...@huawei.com

Closes #6145 from scwf/InConversion and squashes the following commits:

5c8ac6b [scwf] minor fix for InConversion

(cherry picked from commit edf09ea1bd4bf7692e0085ad9c70cb1bfc8d06d8)
Signed-off-by: Cheng Lian l...@databricks.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/457d07ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/457d07ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/457d07ea

Branch: refs/heads/branch-1.4
Commit: 457d07eaa023b44b75344110508f629925eb6247
Parents: 6b9f383
Author: scwf wangf...@huawei.com
Authored: Sun May 17 15:17:11 2015 +0800
Committer: Cheng Lian l...@databricks.com
Committed: Mon Jun 29 13:30:47 2015 -0700

--
 .../org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala | 3 +++
 1 file changed, 3 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/457d07ea/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index 6d0f4a0..e62a1f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -307,6 +307,9 @@ trait HiveTypeCoercion {
    */
   object InConversion extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+      // Skip nodes whose children have not been resolved yet.
+      case e if !e.childrenResolved => e
+
       case i @ In(a, b) if b.exists(_.dataType != a.dataType) =>
         i.makeCopy(Array(a, b.map(Cast(_, a.dataType))))
     }
spark git commit: [SPARK-8056][SQL] Design an easier way to construct schema for both Scala and Python
Repository: spark Updated Branches: refs/heads/master 27ef85451 - f6fc254ec [SPARK-8056][SQL] Design an easier way to construct schema for both Scala and Python I've added functionality to create new StructType similar to how we add parameters to a new SparkContext. I've also added tests for this type of creation. Author: Ilya Ganelin ilya.gane...@capitalone.com Closes #6686 from ilganeli/SPARK-8056B and squashes the following commits: 27c1de1 [Ilya Ganelin] Rename 467d836 [Ilya Ganelin] Removed from_string in favor of _parse_Datatype_json_value 5fef5a4 [Ilya Ganelin] Updates for type parsing 4085489 [Ilya Ganelin] Style errors 3670cf5 [Ilya Ganelin] added string to DataType conversion 8109e00 [Ilya Ganelin] Fixed error in tests 41ab686 [Ilya Ganelin] Fixed style errors e7ba7e0 [Ilya Ganelin] Moved some python tests to tests.py. Added cleaner handling of null data type and added test for correctness of input format 15868fa [Ilya Ganelin] Fixed python errors b79b992 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-8056B a3369fc [Ilya Ganelin] Fixing space errors e240040 [Ilya Ganelin] Style bab7823 [Ilya Ganelin] Constructor error 73d4677 [Ilya Ganelin] Style 4ed00d9 [Ilya Ganelin] Fixed default arg 67df57a [Ilya Ganelin] Removed Foo 04cbf0c [Ilya Ganelin] Added comments for single object 0484d7a [Ilya Ganelin] Restored second method 6aeb740 [Ilya Ganelin] Style 689e54d [Ilya Ganelin] Style f497e9e [Ilya Ganelin] Got rid of old code e3c7a88 [Ilya Ganelin] Fixed doctest failure a62ccde [Ilya Ganelin] Style 966ac06 [Ilya Ganelin] style checks dabb7e6 [Ilya Ganelin] Added Python tests a3f4152 [Ilya Ganelin] added python bindings and better comments e6e536c [Ilya Ganelin] Added extra space 7529a2e [Ilya Ganelin] Fixed formatting d388f86 [Ilya Ganelin] Fixed small bug c4e3bf5 [Ilya Ganelin] Reverted to using parse. Updated parse to support long d7634b6 [Ilya Ganelin] Reverted to fromString to properly support types 22c39d5 [Ilya Ganelin] replaced FromString with DataTypeParser.parse. Replaced empty constructor initializing a null to have it instead create a new array to allow appends to it. faca398 [Ilya Ganelin] [SPARK-8056] Replaced default argument usage. Updated usage and code for DataType.fromString 1acf76e [Ilya Ganelin] Scala style e31c674 [Ilya Ganelin] Fixed bug in test 8dc0795 [Ilya Ganelin] Added tests for creation of StructType object with new methods fdf7e9f [Ilya Ganelin] [SPARK-8056] Created add methods to facilitate building new StructType objects. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6fc254e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6fc254e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6fc254e Branch: refs/heads/master Commit: f6fc254ec4ce5f103d45da6d007b4066ce751236 Parents: 27ef854 Author: Ilya Ganelin ilya.gane...@capitalone.com Authored: Mon Jun 29 14:15:15 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Jun 29 14:15:15 2015 -0700 -- python/pyspark/sql/tests.py | 29 ++ python/pyspark/sql/types.py | 52 +- .../apache/spark/sql/types/DataTypeParser.scala | 2 +- .../org/apache/spark/sql/types/StructType.scala | 104 ++- .../apache/spark/sql/types/DataTypeSuite.scala | 31 ++ 5 files changed, 212 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f6fc254e/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index ffee43a..34f397d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -516,6 +516,35 @@ class SQLTests(ReusedPySparkTestCase): self.assertEqual([Row(a=2, b=1, c=3), Row(a=4, b=1, c=4)], df.filter(df.a.between(df.b, df.c)).collect()) +def test_struct_type(self): +from pyspark.sql.types import StructType, StringType, StructField +struct1 = StructType().add(f1, StringType(), True).add(f2, StringType(), True, None) +struct2 = StructType([StructField(f1, StringType(), True), + StructField(f2, StringType(), True, None)]) +self.assertEqual(struct1, struct2) + +struct1 = StructType().add(f1, StringType(), True).add(f2, StringType(), True, None) +struct2 = StructType([StructField(f1, StringType(), True)]) +self.assertNotEqual(struct1, struct2) + +struct1 = (StructType().add(StructField(f1, StringType(), True)) + .add(StructField(f2, StringType(), True, None))) +struct2 = StructType([StructField(f1, StringType(), True), + StructField(f2,
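The Scala half of SPARK-8056 adds the same chainable add methods directly on StructType. A brief usage sketch based on the overloads this commit introduces (the equality with the explicit-constructor form is the same property the Python tests above assert):

import org.apache.spark.sql.types._

// Builder-style construction: each add appends a StructField, so the
// result equals the schema built with an explicit field list.
val built = new StructType()
  .add("f1", StringType)                    // nullable defaults to true
  .add("f2", IntegerType, nullable = false)

val explicit = StructType(Seq(
  StructField("f1", StringType, nullable = true),
  StructField("f2", IntegerType, nullable = false)))

assert(built == explicit)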
spark git commit: [SPARK-8070] [SQL] [PYSPARK] avoid spark jobs in createDataFrame
Repository: spark Updated Branches: refs/heads/master be7ef0676 - afae9766f [SPARK-8070] [SQL] [PYSPARK] avoid spark jobs in createDataFrame Avoid the unnecessary jobs when infer schema from list. cc yhuai mengxr Author: Davies Liu dav...@databricks.com Closes #6606 from davies/improve_create and squashes the following commits: a5928bf [Davies Liu] Update MimaExcludes.scala 62da911 [Davies Liu] fix mima bab4d7d [Davies Liu] Merge branch 'improve_create' of github.com:davies/spark into improve_create eee44a8 [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_create 8d9292d [Davies Liu] Update context.py eb24531 [Davies Liu] Update context.py c969997 [Davies Liu] bug fix d5a8ab0 [Davies Liu] fix tests 8c3f10d [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_create 6ea5925 [Davies Liu] address comments 6ceaeff [Davies Liu] avoid spark jobs in createDataFrame Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/afae9766 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/afae9766 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/afae9766 Branch: refs/heads/master Commit: afae9766f28d2e58297405c39862d20a04267b62 Parents: be7ef06 Author: Davies Liu dav...@databricks.com Authored: Mon Jun 29 13:20:55 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Jun 29 13:20:55 2015 -0700 -- python/pyspark/sql/context.py | 64 -- python/pyspark/sql/types.py | 48 2 files changed, 75 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/afae9766/python/pyspark/sql/context.py -- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index dc23922..4dda3b4 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -203,7 +203,37 @@ class SQLContext(object): self._sc._javaAccumulator, returnType.json()) +def _inferSchemaFromList(self, data): + +Infer schema from list of Row or tuple. + +:param data: list of Row or tuple +:return: StructType + +if not data: +raise ValueError(can not infer schema from empty dataset) +first = data[0] +if type(first) is dict: +warnings.warn(inferring schema from dict is deprecated, + please use pyspark.sql.Row instead) +schema = _infer_schema(first) +if _has_nulltype(schema): +for r in data: +schema = _merge_type(schema, _infer_schema(r)) +if not _has_nulltype(schema): +break +else: +raise ValueError(Some of types cannot be determined after inferring) +return schema + def _inferSchema(self, rdd, samplingRatio=None): + +Infer schema from an RDD of Row or tuple. + +:param rdd: an RDD of Row or tuple +:param samplingRatio: sampling ratio, or no sampling (default) +:return: StructType + first = rdd.first() if not first: raise ValueError(The first row in RDD is empty, @@ -322,6 +352,8 @@ class SQLContext(object): data = [r.tolist() for r in data.to_records(index=False)] if not isinstance(data, RDD): +if not isinstance(data, list): +data = list(data) try: # data could be list, tuple, generator ... 
rdd = self._sc.parallelize(data) @@ -330,28 +362,26 @@ class SQLContext(object): else: rdd = data -if schema is None: -schema = self._inferSchema(rdd, samplingRatio) +if schema is None or isinstance(schema, (list, tuple)): +if isinstance(data, RDD): +struct = self._inferSchema(rdd, samplingRatio) +else: +struct = self._inferSchemaFromList(data) +if isinstance(schema, (list, tuple)): +for i, name in enumerate(schema): +struct.fields[i].name = name +schema = struct converter = _create_converter(schema) rdd = rdd.map(converter) -if isinstance(schema, (list, tuple)): -first = rdd.first() -if not isinstance(first, (list, tuple)): -raise TypeError("each row in `rdd` should be list or tuple, " -"but got %r" % type(first)) -row_cls = Row(*schema) -schema = self._inferSchema(rdd.map(lambda r: row_cls(*r)), samplingRatio)
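The heart of the change is visible in _inferSchemaFromList above: infer a schema from the first row, then merge further rows only while some field's type is still unknown, entirely on the driver and without launching a Spark job. A standalone Scala sketch of that early-exit merge, with toy type tags standing in for pyspark.sql.types:

// Toy type tags; NullT marks a field whose type is still undetermined.
sealed trait DType
case object NullT extends DType
case object LongT extends DType
case object StrT  extends DType

def infer(v: Any): DType = v match {
  case null      => NullT
  case _: Int    => LongT
  case _: Long   => LongT
  case _: String => StrT
  case _         => NullT
}

// A later row only refines fields that are still NullT.
def merge(a: DType, b: DType): DType = if (a == NullT) b else a

def inferFromList(rows: Seq[Seq[Any]]): Seq[DType] = {
  require(rows.nonEmpty, "can not infer schema from empty dataset")
  var schema = rows.head.map(infer)
  val rest = rows.iterator.drop(1)
  // Early exit: most datasets settle after the first row.
  while (schema.contains(NullT) && rest.hasNext) {
    schema = schema.zip(rest.next().map(infer)).map((merge _).tupled)
  }
  require(!schema.contains(NullT), "Some of types cannot be determined after inferring")
  schema
}

assert(inferFromList(Seq(Seq(null, "x"), Seq(1, "y"))) == Seq(LongT, StrT))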
[4/4] spark git commit: [SPARK-8478] [SQL] Harmonize UDF-related code to use uniformly UDF instead of Udf
[SPARK-8478] [SQL] Harmonize UDF-related code to use uniformly UDF instead of Udf Follow-up of #6902 for being coherent between ```Udf``` and ```UDF``` Author: BenFradet benjamin.fra...@gmail.com Closes #6920 from BenFradet/SPARK-8478 and squashes the following commits: c500f29 [BenFradet] renamed a few variables in functions to use UDF 8ab0f2d [BenFradet] renamed idUdf to idUDF in SQLQuerySuite 98696c2 [BenFradet] renamed originalUdfs in TestHive to originalUDFs 7738f74 [BenFradet] modified HiveUDFSuite to use only UDF c52608d [BenFradet] renamed HiveUdfSuite to HiveUDFSuite e51b9ac [BenFradet] renamed ExtractPythonUdfs to ExtractPythonUDFs 8c756f1 [BenFradet] renamed Hive UDF related code 2a1ca76 [BenFradet] renamed pythonUdfs to pythonUDFs 261e6fb [BenFradet] renamed ScalaUdf to ScalaUDF Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/931da5c8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/931da5c8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/931da5c8 Branch: refs/heads/master Commit: 931da5c8ab271ff2ee04419c7e3c6b0012459694 Parents: c8ae887 Author: BenFradet benjamin.fra...@gmail.com Authored: Mon Jun 29 15:27:13 2015 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Jun 29 15:27:13 2015 -0700 -- .../sql/catalyst/expressions/ScalaUDF.scala | 962 +++ .../sql/catalyst/expressions/ScalaUdf.scala | 962 --- .../scala/org/apache/spark/sql/SQLContext.scala | 4 +- .../org/apache/spark/sql/UDFRegistration.scala | 96 +- .../apache/spark/sql/UserDefinedFunction.scala | 4 +- .../apache/spark/sql/execution/pythonUDFs.scala | 292 ++ .../apache/spark/sql/execution/pythonUdfs.scala | 292 -- .../scala/org/apache/spark/sql/functions.scala | 34 +- .../org/apache/spark/sql/SQLQuerySuite.scala| 4 +- .../org/apache/spark/sql/hive/HiveContext.scala | 4 +- .../org/apache/spark/sql/hive/HiveQl.scala | 2 +- .../org/apache/spark/sql/hive/hiveUDFs.scala| 598 .../org/apache/spark/sql/hive/hiveUdfs.scala| 598 .../apache/spark/sql/hive/test/TestHive.scala | 4 +- .../resources/data/files/testUDF/part-0 | Bin 0 - 153 bytes .../resources/data/files/testUdf/part-0 | Bin 153 - 0 bytes .../spark/sql/hive/execution/HiveUDFSuite.scala | 261 + .../spark/sql/hive/execution/HiveUdfSuite.scala | 261 - 18 files changed, 2189 insertions(+), 2189 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/931da5c8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala new file mode 100644 index 000..dbb4381 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala @@ -0,0 +1,962 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.types.DataType + +/** + * User-defined function. + * @param dataType Return type of function. + */ +case class ScalaUDF(function: AnyRef, dataType: DataType, children: Seq[Expression]) + extends Expression { + + override def nullable: Boolean = true + + override def toString: String = sscalaUDF(${children.mkString(,)}) + + // scalastyle:off + + /** This method has been generated by this script + +(1 to 22).map { x = + val anys = (1 to x).map(x = Any).reduce(_ + , + _) + val childs = (0 to x - 1).map(x = sval child$x = children($x)).reduce(_ + \n + _) + val converters = (0 to x - 1).map(x = slazy val converter$x = CatalystTypeConverters.createToScalaConverter(child$x.dataType)).reduce(_ + \n + _) + val evals = (0 to x - 1).map(x =
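The point of the generated boilerplate in this file is that the UDF is stored as an untyped AnyRef and cast back to the matching FunctionN exactly once, so the per-row loop calls a monomorphic closure instead of unpacking an argument sequence. A condensed sketch of that arity dispatch, covering only arities 0 to 2 (the real file generates all 22):

// Dispatch once on arity, casting the erased function back to its real
// FunctionN shape; the returned closure is then cheap to call per row.
def specialize(function: AnyRef, arity: Int): Seq[Any] => Any = arity match {
  case 0 =>
    val f = function.asInstanceOf[() => Any]
    _ => f()
  case 1 =>
    val f = function.asInstanceOf[Any => Any]
    args => f(args(0))
  case 2 =>
    val f = function.asInstanceOf[(Any, Any) => Any]
    args => f(args(0), args(1))
  case n =>
    sys.error(s"arity $n not covered in this sketch (ScalaUDF generates up to 22)")
}

val concat = (a: Any, b: Any) => s"$a$b"
assert(specialize(concat, 2)(Seq("x", 1)) == "x1")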
[2/4] spark git commit: [SPARK-8478] [SQL] Harmonize UDF-related code to use uniformly UDF instead of Udf
http://git-wip-us.apache.org/repos/asf/spark/blob/931da5c8/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala new file mode 100644 index 000..9e1cff0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala @@ -0,0 +1,292 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the License); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an AS IS BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution + +import java.util.{List = JList, Map = JMap} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +import net.razorvine.pickle.{Pickler, Unpickler} + +import org.apache.spark.{Accumulator, Logging = SparkLogging} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.api.python.{PythonBroadcast, PythonRDD} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * A serialized version of a Python lambda function. Suitable for use in a [[PythonRDD]]. + */ +private[spark] case class PythonUDF( +name: String, +command: Array[Byte], +envVars: JMap[String, String], +pythonIncludes: JList[String], +pythonExec: String, +pythonVer: String, +broadcastVars: JList[Broadcast[PythonBroadcast]], +accumulator: Accumulator[JList[Array[Byte]]], +dataType: DataType, +children: Seq[Expression]) extends Expression with SparkLogging { + + override def toString: String = sPythonUDF#$name(${children.mkString(,)}) + + override def nullable: Boolean = true + + override def eval(input: InternalRow): Any = { +throw new UnsupportedOperationException(PythonUDFs can not be directly evaluated.) + } +} + +/** + * Extracts PythonUDFs from operators, rewriting the query plan so that the UDF can be evaluated + * alone in a batch. + * + * This has the limitation that the input to the Python UDF is not allowed include attributes from + * multiple child operators. + */ +private[spark] object ExtractPythonUDFs extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { +// Skip EvaluatePython nodes. +case plan: EvaluatePython = plan + +case plan: LogicalPlan if plan.resolved = + // Extract any PythonUDFs from the current operator. + val udfs = plan.expressions.flatMap(_.collect { case udf: PythonUDF = udf }) + if (udfs.isEmpty) { +// If there aren't any, we are done. 
+plan + } else { +// Pick the UDF we are going to evaluate (TODO: Support evaluating multiple UDFs at a time) +// If there is more than one, we will add another evaluation operator in a subsequent pass. +udfs.find(_.resolved) match { + case Some(udf) = +var evaluation: EvaluatePython = null + +// Rewrite the child that has the input required for the UDF +val newChildren = plan.children.map { child = + // Check to make sure that the UDF can be evaluated with only the input of this child. + // Other cases are disallowed as they are ambiguous or would require a cartesian + // product. + if (udf.references.subsetOf(child.outputSet)) { +evaluation = EvaluatePython(udf, child) +evaluation + } else if (udf.references.intersect(child.outputSet).nonEmpty) { +sys.error(sInvalid PythonUDF $udf, requires attributes from more than one child.) + } else { +child + } +} + +assert(evaluation != null, Unable to evaluate PythonUDF. Missing input attributes.) + +// Trim away the new UDF value if it was
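The placement check in the middle of this rule (subsetOf versus a non-empty intersect) is the part worth isolating. A toy version, with attributes as plain strings and children represented by their output sets (not the real Catalyst types):

// A UDF may be pushed into exactly the child that produces every attribute
// it references; partial overlap is the ambiguous case the real rule rejects.
def chooseChild(udfRefs: Set[String], childOutputs: Seq[Set[String]]): Option[Int] = {
  val i = childOutputs.indexWhere(out => udfRefs.subsetOf(out))
  if (i >= 0) Some(i)
  else if (childOutputs.exists(out => (udfRefs & out).nonEmpty))
    sys.error("Invalid PythonUDF: requires attributes from more than one child")
  else None
}

assert(chooseChild(Set("a"), Seq(Set("a", "b"), Set("c"))) == Some(0))
// Referencing both sides of a join has no single valid placement:
// chooseChild(Set("a", "c"), Seq(Set("a", "b"), Set("c"))) throws.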
[3/4] spark git commit: [SPARK-8478] [SQL] Harmonize UDF-related code to use uniformly UDF instead of Udf
http://git-wip-us.apache.org/repos/asf/spark/blob/931da5c8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala deleted file mode 100644 index 55df72f..000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala +++ /dev/null @@ -1,962 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the License); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.expressions - -import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.types.DataType - -/** - * User-defined function. - * @param dataType Return type of function. - */ -case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression]) - extends Expression { - - override def nullable: Boolean = true - - override def toString: String = sscalaUDF(${children.mkString(,)}) - - // scalastyle:off - - /** This method has been generated by this script - -(1 to 22).map { x = - val anys = (1 to x).map(x = Any).reduce(_ + , + _) - val childs = (0 to x - 1).map(x = sval child$x = children($x)).reduce(_ + \n + _) - val converters = (0 to x - 1).map(x = slazy val converter$x = CatalystTypeConverters.createToScalaConverter(child$x.dataType)).reduce(_ + \n + _) - val evals = (0 to x - 1).map(x = sconverter$x(child$x.eval(input))).reduce(_ + ,\n + _) - - scase $x = - val func = function.asInstanceOf[($anys) = Any] - $childs - $converters - (input: InternalRow) = { -func( - $evals) - } - -}.foreach(println) - - */ - - private[this] val f = children.size match { -case 0 = - val func = function.asInstanceOf[() = Any] - (input: InternalRow) = { -func() - } - -case 1 = - val func = function.asInstanceOf[(Any) = Any] - val child0 = children(0) - lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType) - (input: InternalRow) = { -func( - converter0(child0.eval(input))) - } - -case 2 = - val func = function.asInstanceOf[(Any, Any) = Any] - val child0 = children(0) - val child1 = children(1) - lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType) - lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType) - (input: InternalRow) = { -func( - converter0(child0.eval(input)), - converter1(child1.eval(input))) - } - -case 3 = - val func = function.asInstanceOf[(Any, Any, Any) = Any] - val child0 = children(0) - val child1 = children(1) - val child2 = children(2) - lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType) - lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType) - lazy val converter2 = 
CatalystTypeConverters.createToScalaConverter(child2.dataType) - (input: InternalRow) = { -func( - converter0(child0.eval(input)), - converter1(child1.eval(input)), - converter2(child2.eval(input))) - } - -case 4 = - val func = function.asInstanceOf[(Any, Any, Any, Any) = Any] - val child0 = children(0) - val child1 = children(1) - val child2 = children(2) - val child3 = children(3) - lazy val converter0 = CatalystTypeConverters.createToScalaConverter(child0.dataType) - lazy val converter1 = CatalystTypeConverters.createToScalaConverter(child1.dataType) - lazy val converter2 = CatalystTypeConverters.createToScalaConverter(child2.dataType) - lazy val converter3 = CatalystTypeConverters.createToScalaConverter(child3.dataType) - (input: InternalRow) = { -func( - converter0(child0.eval(input)), - converter1(child1.eval(input)), - converter2(child2.eval(input)), - converter3(child3.eval(input))) - } - -case 5 = -
[1/4] spark git commit: [SPARK-8478] [SQL] Harmonize UDF-related code to use uniformly UDF instead of Udf
Repository: spark Updated Branches: refs/heads/master c8ae887ef - 931da5c8a http://git-wip-us.apache.org/repos/asf/spark/blob/931da5c8/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala new file mode 100644 index 000..56b0bef --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import java.io.{DataInput, DataOutput} +import java.util +import java.util.Properties + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.ql.udf.generic.{GenericUDAFAverage, GenericUDF} +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} +import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats} +import org.apache.hadoop.io.Writable +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.hive.test.TestHive + +import org.apache.spark.util.Utils + +import scala.collection.JavaConversions._ + +case class Fields(f1: Int, f2: Int, f3: Int, f4: Int, f5: Int) + +// Case classes for the custom UDF's. +case class IntegerCaseClass(i: Int) +case class ListListIntCaseClass(lli: Seq[(Int, Int, Int)]) +case class StringCaseClass(s: String) +case class ListStringCaseClass(l: Seq[String]) + +/** + * A test suite for Hive custom UDFs. + */ +class HiveUDFSuite extends QueryTest { + + import TestHive.{udf, sql} + import TestHive.implicits._ + + test(spark sql udf test that returns a struct) { +udf.register(getStruct, (_: Int) = Fields(1, 2, 3, 4, 5)) +assert(sql( + +|SELECT getStruct(1).f1, +| getStruct(1).f2, +| getStruct(1).f3, +| getStruct(1).f4, +| getStruct(1).f5 FROM src LIMIT 1 + .stripMargin).head() === Row(1, 2, 3, 4, 5)) + } + + test(SPARK-4785 When called with arguments referring column fields, PMOD throws NPE) { +checkAnswer( + sql(SELECT PMOD(CAST(key as INT), 10) FROM src LIMIT 1), + Row(8) +) + } + + test(hive struct udf) { +sql( + + |CREATE EXTERNAL TABLE hiveUDFTestTable ( + | pair STRUCTid: INT, value: INT + |) + |PARTITIONED BY (partition STRING) + |ROW FORMAT SERDE '%s' + |STORED AS SEQUENCEFILE +. 
+stripMargin.format(classOf[PairSerDe].getName)) + +val location = Utils.getSparkClassLoader.getResource(data/files/testUDF).getFile +sql(s + ALTER TABLE hiveUDFTestTable + ADD IF NOT EXISTS PARTITION(partition='testUDF') + LOCATION '$location') + +sql(sCREATE TEMPORARY FUNCTION testUDF AS '${classOf[PairUDF].getName}') +sql(SELECT testUDF(pair) FROM hiveUDFTestTable) +sql(DROP TEMPORARY FUNCTION IF EXISTS testUDF) + } + + test(SPARK-6409 UDAFAverage test) { +sql(sCREATE TEMPORARY FUNCTION test_avg AS '${classOf[GenericUDAFAverage].getName}') +checkAnswer( + sql(SELECT test_avg(1), test_avg(substr(value,5)) FROM src), + Seq(Row(1.0, 260.182))) +sql(DROP TEMPORARY FUNCTION IF EXISTS test_avg) +TestHive.reset() + } + + test(SPARK-2693 udaf aggregates test) { +checkAnswer(sql(SELECT percentile(key, 1) FROM src LIMIT 1), + sql(SELECT max(key) FROM src).collect().toSeq) + +checkAnswer(sql(SELECT percentile(key, array(1, 1)) FROM src LIMIT 1), + sql(SELECT array(max(key), max(key)) FROM src).collect().toSeq) + } + + test(Generic UDAF aggregates) { +checkAnswer(sql(SELECT ceiling(percentile_approx(key, 0.9)) FROM src LIMIT 1), + sql(SELECT max(key) FROM src LIMIT 1).collect().toSeq) + +checkAnswer(sql(SELECT percentile_approx(100.0, array(0.9, 0.9)) FROM src LIMIT 1), + sql(SELECT array(100, 100) FROM src LIMIT
spark git commit: [SPARK-7810] [PYSPARK] solve python rdd socket connection problem
Repository: spark Updated Branches: refs/heads/branch-1.3 ac3591d09 - 0ce83db11 [SPARK-7810] [PYSPARK] solve python rdd socket connection problem Method _load_from_socket in rdd.py cannot load data from the JVM socket when IPv6 is in use; the old code only handled IPv4. The new logic works with both protocols. Author: Ai He ai...@ussuning.com Author: AiHe ai...@ussuning.com Closes #6338 from AiHe/pyspark-networking-issue and squashes the following commits: d4fc9c4 [Ai He] handle code review 2 e75c5c8 [Ai He] handle code review 5644953 [AiHe] solve python rdd socket connection problem to jvm (cherry picked from commit ecd3aacf2805bb231cfb44bab079319cfe73c3f1) Signed-off-by: Davies Liu dav...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ce83db1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ce83db1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ce83db1 Branch: refs/heads/branch-1.3 Commit: 0ce83db11f5f91121705a5b72f134e9ffe72d2a6 Parents: ac3591d Author: Ai He ai...@ussuning.com Authored: Mon Jun 29 14:36:26 2015 -0700 Committer: Davies Liu dav...@databricks.com Committed: Mon Jun 29 14:37:54 2015 -0700 -- python/pyspark/rdd.py | 18 +++--- 1 file changed, 15 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0ce83db1/python/pyspark/rdd.py -- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index bd18cb3..f27d7a6 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -112,10 +112,22 @@ def _parse_memory(s): def _load_from_socket(port, serializer): -sock = socket.socket() -sock.settimeout(3) +sock = None +# Support for both IPv4 and IPv6. +# On most of IPv6-ready systems, IPv6 will take precedence. +for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM): +af, socktype, proto, canonname, sa = res +try: +sock = socket.socket(af, socktype, proto) +sock.settimeout(3) +sock.connect(sa) +except socket.error: +sock = None +continue +break +if not sock: +raise Exception("could not open socket") try: -sock.connect(("localhost", port)) rf = sock.makefile("rb", 65536) for item in serializer.load_stream(rf): yield item
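For comparison, the same dual-stack connect loop transposed to JVM APIs (a sketch, not code from this patch): InetAddress.getAllByName plays the role of getaddrinfo, returning the IPv6 and IPv4 candidates for localhost, which are tried in order.

import java.io.IOException
import java.net.{InetAddress, InetSocketAddress, Socket}

// Try every address "localhost" resolves to (on most IPv6-ready systems the
// IPv6 address comes first) and keep the first socket that connects.
def openLocalSocket(port: Int, timeoutMs: Int = 3000): Socket = {
  val candidates = InetAddress.getAllByName("localhost")
  for (addr <- candidates) {
    val sock = new Socket()
    try {
      sock.connect(new InetSocketAddress(addr, port), timeoutMs)
      return sock
    } catch {
      case _: IOException => sock.close() // fall through to the next address family
    }
  }
  throw new IOException("could not open socket")
}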
spark git commit: [SPARK-7862] [SQL] Disable the error message redirect to stderr
Repository: spark Updated Branches: refs/heads/master 637b4eeda - c6ba2ea34 [SPARK-7862] [SQL] Disable the error message redirect to stderr This is a follow up of #6404, the ScriptTransformation prints the error msg into stderr directly, probably be a disaster for application log. Author: Cheng Hao hao.ch...@intel.com Closes #6882 from chenghao-intel/verbose and squashes the following commits: bfedd77 [Cheng Hao] revert the write 76ff46b [Cheng Hao] update the CircularBuffer 692b19e [Cheng Hao] check the process exitValue for ScriptTransform 47e0970 [Cheng Hao] Use the RedirectThread instead 1de771d [Cheng Hao] naming the threads in ScriptTransformation 8536e81 [Cheng Hao] disable the error message redirection for stderr Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6ba2ea3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6ba2ea3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6ba2ea3 Branch: refs/heads/master Commit: c6ba2ea341ad23de265d870669b25e6a41f461e5 Parents: 637b4ee Author: Cheng Hao hao.ch...@intel.com Authored: Mon Jun 29 12:46:33 2015 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Jun 29 12:46:33 2015 -0700 -- .../scala/org/apache/spark/util/Utils.scala | 33 + .../org/apache/spark/util/UtilsSuite.scala | 8 +++ .../spark/sql/hive/client/ClientWrapper.scala | 29 ++- .../hive/execution/ScriptTransformation.scala | 51 .../sql/hive/execution/SQLQuerySuite.scala | 2 +- 5 files changed, 77 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c6ba2ea3/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 19157af..a7fc749 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2333,3 +2333,36 @@ private[spark] class RedirectThread( } } } + +/** + * An [[OutputStream]] that will store the last 10 kilobytes (by default) written to it + * in a circular buffer. The current contents of the buffer can be accessed using + * the toString method. 
+ */ +private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream { + var pos: Int = 0 + var buffer = new Array[Int](sizeInBytes) + + def write(i: Int): Unit = { +buffer(pos) = i +pos = (pos + 1) % buffer.length + } + + override def toString: String = { +val (end, start) = buffer.splitAt(pos) +val input = new java.io.InputStream { + val iterator = (start ++ end).iterator + + def read(): Int = if (iterator.hasNext) iterator.next() else -1 +} +val reader = new BufferedReader(new InputStreamReader(input)) +val stringBuilder = new StringBuilder +var line = reader.readLine() +while (line != null) { + stringBuilder.append(line) + stringBuilder.append(\n) + line = reader.readLine() +} +stringBuilder.toString() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/c6ba2ea3/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index a61ea39..baa4c66 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -673,4 +673,12 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(!Utils.isInDirectory(nullFile, parentDir)) assert(!Utils.isInDirectory(nullFile, childFile3)) } + + test(circular buffer) { +val buffer = new CircularBuffer(25) +val stream = new java.io.PrintStream(buffer, true, UTF-8) + +stream.println(test circular test circular test circular test circular test circular) +assert(buffer.toString === t circular test circular\n) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/c6ba2ea3/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index 4c708ce..cbd2bf6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -22,6 +22,8 @@ import java.net.URI import java.util.{ArrayList =
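A short worked check of why the "circular buffer" test above expects exactly "t circular test circular\n" (a standalone sketch, independent of the Spark class):

// The PrintStream writes 70 bytes: five copies of "test circular" joined by
// spaces (69 chars) plus the trailing '\n'. In a 25-byte ring, pos ends at
// 70 % 25 = 20, and toString's splitAt(pos) stitches tail-then-head, which
// is exactly the last 25 bytes in write order.
val written = "test circular test circular test circular test circular test circular\n"
assert(written.length == 70)
assert(written.takeRight(25) == "t circular test circular\n")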
spark git commit: [SPARK-8681] fixed wrong ordering of columns in crosstab
Repository: spark Updated Branches: refs/heads/master c6ba2ea34 - be7ef0676 [SPARK-8681] fixed wrong ordering of columns in crosstab I specifically randomized the test. What crosstab does is equivalent to a countByKey, therefore if this test fails again for any reason, we will know that we hit a corner case or something. cc rxin marmbrus Author: Burak Yavuz brk...@gmail.com Closes #7060 from brkyvz/crosstab-fixes and squashes the following commits: 0a65234 [Burak Yavuz] addressed comments v1 d96da7e [Burak Yavuz] fixed wrong ordering of columns in crosstab Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be7ef067 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be7ef067 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be7ef067 Branch: refs/heads/master Commit: be7ef067620408859144e0244b0f1b8eb56faa86 Parents: c6ba2ea Author: Burak Yavuz brk...@gmail.com Authored: Mon Jun 29 13:15:04 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Jun 29 13:15:04 2015 -0700 -- .../sql/execution/stat/StatFunctions.scala | 8 +++--- .../apache/spark/sql/DataFrameStatSuite.scala | 28 +++- 2 files changed, 20 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/be7ef067/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 042e2c9..b624ef7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -111,7 +111,7 @@ private[sql] object StatFunctions extends Logging { the pairs. Please try reducing the amount of distinct items in your columns.") } // get the distinct values of column 2, so that we can make them the column names -val distinctCol2 = counts.map(_.get(1)).distinct.zipWithIndex.toMap +val distinctCol2: Map[Any, Int] = counts.map(_.get(1)).distinct.zipWithIndex.toMap val columnSize = distinctCol2.size require(columnSize < 1e4, s"The number of distinct values for $col2, can't " + s"exceed 1e4. Currently $columnSize") @@ -120,14 +120,16 @@ private[sql] object StatFunctions extends Logging { rows.foreach { (row: Row) => // row.get(0) is column 1 // row.get(1) is column 2 -// row.get(3) is the frequency +// row.get(2) is the frequency countsRow.setLong(distinctCol2.get(row.get(1)).get + 1, row.getLong(2)) } // the value of col1 is the first value, the rest are the counts countsRow.update(0, UTF8String.fromString(col1Item.toString)) countsRow }.toSeq -val headerNames = distinctCol2.map(r => StructField(r._1.toString, LongType)).toSeq +// In the map, the column names (._1) are not ordered by the index (._2). This was the bug in +// SPARK-8681. We need to explicitly sort by the column index and assign the column names.
+val headerNames = distinctCol2.toSeq.sortBy(_._2).map(r => StructField(r._1.toString, LongType)) val schema = StructType(StructField(tableName, StringType) +: headerNames) new DataFrame(df.sqlContext, LocalRelation(schema.toAttributes, table)).na.fill(0.0) http://git-wip-us.apache.org/repos/asf/spark/blob/be7ef067/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 0d3ff89..64ec1a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import java.util.Random + import org.scalatest.Matchers._ import org.apache.spark.SparkFunSuite @@ -65,22 +67,22 @@ class DataFrameStatSuite extends SparkFunSuite { } test("crosstab") { -val df = Seq((0, 0), (2, 1), (1, 0), (2, 0), (0, 0), (2, 0)).toDF("a", "b") +val rng = new Random() +val data = Seq.tabulate(25)(i => (rng.nextInt(5), rng.nextInt(10))) +val df = data.toDF("a", "b") val crosstab = df.stat.crosstab("a", "b") val columnNames = crosstab.schema.fieldNames assert(columnNames(0) === "a_b") -assert(columnNames(1) === "0") -assert(columnNames(2) === "1") -val rows: Array[Row] = crosstab.collect().sortBy(_.getString(0)) -assert(rows(0).get(0).toString === "0")
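The root cause is easy to reproduce in isolation: a Map built with zipWithIndex preserves the value-to-index association but not iteration order, so the header row must be re-sorted by the stored index. A standalone sketch:

// Five distinct values indexed in first-seen order. A Map with more than
// four entries is a HashMap, so mapping over it directly does NOT replay
// insertion order; sorting by the stored index restores it.
val distinctCol2 = Seq("e", "a", "d", "b", "c").zipWithIndex.toMap
val headerInOrder = distinctCol2.toSeq.sortBy(_._2).map(_._1)
assert(headerInOrder == Seq("e", "a", "d", "b", "c"))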
spark git commit: [SPARK-8681] fixed wrong ordering of columns in crosstab
Repository: spark Updated Branches: refs/heads/branch-1.4 da51cf58f - 6b9f3831a [SPARK-8681] fixed wrong ordering of columns in crosstab I specifically randomized the test. What crosstab does is equivalent to a countByKey, therefore if this test fails again for any reason, we will know that we hit a corner case or something. cc rxin marmbrus Author: Burak Yavuz brk...@gmail.com Closes #7060 from brkyvz/crosstab-fixes and squashes the following commits: 0a65234 [Burak Yavuz] addressed comments v1 d96da7e [Burak Yavuz] fixed wrong ordering of columns in crosstab (cherry picked from commit be7ef067620408859144e0244b0f1b8eb56faa86) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b9f3831 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b9f3831 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b9f3831 Branch: refs/heads/branch-1.4 Commit: 6b9f3831a8d417362bfb42c260613eb7da850f12 Parents: da51cf5 Author: Burak Yavuz brk...@gmail.com Authored: Mon Jun 29 13:15:04 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Jun 29 13:15:12 2015 -0700 -- .../sql/execution/stat/StatFunctions.scala | 8 +++--- .../apache/spark/sql/DataFrameStatSuite.scala | 28 +++- 2 files changed, 20 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6b9f3831/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index 93383e5..e4a525e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -110,7 +110,7 @@ private[sql] object StatFunctions extends Logging { the pairs. Please try reducing the amount of distinct items in your columns.") } // get the distinct values of column 2, so that we can make them the column names -val distinctCol2 = counts.map(_.get(1)).distinct.zipWithIndex.toMap +val distinctCol2: Map[Any, Int] = counts.map(_.get(1)).distinct.zipWithIndex.toMap val columnSize = distinctCol2.size require(columnSize < 1e4, s"The number of distinct values for $col2, can't " + s"exceed 1e4. Currently $columnSize") @@ -119,14 +119,16 @@ private[sql] object StatFunctions extends Logging { rows.foreach { (row: Row) => // row.get(0) is column 1 // row.get(1) is column 2 -// row.get(3) is the frequency +// row.get(2) is the frequency countsRow.setLong(distinctCol2.get(row.get(1)).get + 1, row.getLong(2)) } // the value of col1 is the first value, the rest are the counts countsRow.setString(0, col1Item.toString) countsRow }.toSeq -val headerNames = distinctCol2.map(r => StructField(r._1.toString, LongType)).toSeq +// In the map, the column names (._1) are not ordered by the index (._2). This was the bug in +// SPARK-8681. We need to explicitly sort by the column index and assign the column names.
+val headerNames = distinctCol2.toSeq.sortBy(_._2).map(r => StructField(r._1.toString, LongType)) val schema = StructType(StructField(tableName, StringType) +: headerNames) new DataFrame(df.sqlContext, LocalRelation(schema.toAttributes, table)).na.fill(0.0) http://git-wip-us.apache.org/repos/asf/spark/blob/6b9f3831/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 438f479..f740fb2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import java.util.Random + import org.scalatest.Matchers._ import org.apache.spark.SparkFunSuite @@ -65,22 +67,22 @@ class DataFrameStatSuite extends SparkFunSuite { } test("crosstab") { -val df = Seq((0, 0), (2, 1), (1, 0), (2, 0), (0, 0), (2, 0)).toDF("a", "b") +val rng = new Random() +val data = Seq.tabulate(25)(i => (rng.nextInt(5), rng.nextInt(10))) +val df = data.toDF("a", "b") val crosstab = df.stat.crosstab("a", "b") val columnNames = crosstab.schema.fieldNames assert(columnNames(0) === "a_b") -assert(columnNames(1) === "0") -assert(columnNames(2) === "1") -val rows:
spark git commit: [SPARK-8709] Exclude hadoop-client's mockito-all dependency
Repository: spark Updated Branches: refs/heads/master afae9766f - 27ef85451 [SPARK-8709] Exclude hadoop-client's mockito-all dependency This patch excludes `hadoop-client`'s dependency on `mockito-all`. As of #7061, Spark depends on `mockito-core` instead of `mockito-all`, so the dependency from Hadoop was leading to test compilation failures for some of the Hadoop 2 SBT builds. Author: Josh Rosen joshro...@databricks.com Closes #7090 from JoshRosen/SPARK-8709 and squashes the following commits: e190122 [Josh Rosen] [SPARK-8709] Exclude hadoop-client's mockito-all dependency. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27ef8545 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27ef8545 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27ef8545 Branch: refs/heads/master Commit: 27ef85451cd237caa7016baa69957a35ab365aa8 Parents: afae976 Author: Josh Rosen joshro...@databricks.com Authored: Mon Jun 29 14:07:55 2015 -0700 Committer: Josh Rosen joshro...@databricks.com Committed: Mon Jun 29 14:07:55 2015 -0700 -- LICENSE | 2 +- core/pom.xml | 10 -- launcher/pom.xml | 6 -- pom.xml | 8 4 files changed, 9 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/27ef8545/LICENSE -- diff --git a/LICENSE b/LICENSE index 8672be5..f9e412c 100644 --- a/LICENSE +++ b/LICENSE @@ -948,6 +948,6 @@ The following components are provided under the MIT License. See project link fo (MIT License) SLF4J LOG4J-12 Binding (org.slf4j:slf4j-log4j12:1.7.5 - http://www.slf4j.org) (MIT License) pyrolite (org.spark-project:pyrolite:2.0.1 - http://pythonhosted.org/Pyro4/) (MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt) - (The MIT License) Mockito (org.mockito:mockito-core:1.8.5 - http://www.mockito.org) + (The MIT License) Mockito (org.mockito:mockito-core:1.9.5 - http://www.mockito.org) (MIT License) jquery (https://jquery.org/license/) (MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs) http://git-wip-us.apache.org/repos/asf/spark/blob/27ef8545/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 565437c..aee0d92 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -69,16 +69,6 @@ dependency groupIdorg.apache.hadoop/groupId artifactIdhadoop-client/artifactId - exclusions -exclusion - groupIdjavax.servlet/groupId - artifactIdservlet-api/artifactId -/exclusion -exclusion - groupIdorg.codehaus.jackson/groupId - artifactIdjackson-mapper-asl/artifactId -/exclusion - /exclusions /dependency dependency groupIdorg.apache.spark/groupId http://git-wip-us.apache.org/repos/asf/spark/blob/27ef8545/launcher/pom.xml -- diff --git a/launcher/pom.xml b/launcher/pom.xml index a853e67..2fd768d 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -68,12 +68,6 @@ groupIdorg.apache.hadoop/groupId artifactIdhadoop-client/artifactId scopetest/scope - exclusions -exclusion - groupIdorg.codehaus.jackson/groupId - artifactIdjackson-mapper-asl/artifactId -/exclusion - /exclusions /dependency /dependencies http://git-wip-us.apache.org/repos/asf/spark/blob/27ef8545/pom.xml -- diff --git a/pom.xml b/pom.xml index 4c18bd5..94dd512 100644 --- a/pom.xml +++ b/pom.xml @@ -748,6 +748,10 @@ artifactIdasm/artifactId /exclusion exclusion +groupIdorg.codehaus.jackson/groupId +artifactIdjackson-mapper-asl/artifactId + /exclusion + exclusion groupIdorg.ow2.asm/groupId artifactIdasm/artifactId /exclusion @@ -760,6 +764,10 @@ artifactIdcommons-logging/artifactId /exclusion exclusion 
+groupIdorg.mockito/groupId +artifactIdmockito-all/artifactId + /exclusion + exclusion groupIdorg.mortbay.jetty/groupId artifactIdservlet-api-2.5/artifactId /exclusion - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-8567] [SQL] Add logs to record the progress of HiveSparkSubmitSuite (1.4 branch)
Repository: spark Updated Branches: refs/heads/branch-1.4 187015f67 - 0de1737a8 [SPARK-8567] [SQL] Add logs to record the progress of HiveSparkSubmitSuite (1.4 branch) Cherry-pick f9b397f54d1c491680d70aba210bb8211fd249c1 to branch 1.4. Author: Yin Huai yh...@databricks.com Closes #7092 from yhuai/SPARK-8567-1.4 and squashes the following commits: 0ae2e14 [Yin Huai] [SPARK-8567] [SQL] Add logs to record the progress of HiveSparkSubmitSuite. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0de1737a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0de1737a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0de1737a Branch: refs/heads/branch-1.4 Commit: 0de1737a8adf458aa578cf18b1bef1eb618c5783 Parents: 187015f Author: Yin Huai yh...@databricks.com Authored: Mon Jun 29 15:20:35 2015 -0700 Committer: Yin Huai yh...@databricks.com Committed: Mon Jun 29 15:20:35 2015 -0700 -- .../org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala| 9 + 1 file changed, 9 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0de1737a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 8ca7a80..dde0d9e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -107,6 +107,7 @@ object SparkSubmitClassLoaderTest extends Logging { val sc = new SparkContext(conf) val hiveContext = new TestHiveContext(sc) val df = hiveContext.createDataFrame((1 to 100).map(i = (i, i))).toDF(i, j) +logInfo(Testing load classes at the driver side.) // First, we load classes at driver side. try { Class.forName(args(0), true, Thread.currentThread().getContextClassLoader) @@ -116,6 +117,7 @@ object SparkSubmitClassLoaderTest extends Logging { throw new Exception(Could not load user class from jar:\n, t) } // Second, we load classes at the executor side. +logInfo(Testing load classes at the executor side.) val result = df.mapPartitions { x = var exception: String = null try { @@ -133,6 +135,7 @@ object SparkSubmitClassLoaderTest extends Logging { } // Load a Hive UDF from the jar. +logInfo(Registering temporary Hive UDF provided in a jar.) hiveContext.sql( |CREATE TEMPORARY FUNCTION example_max @@ -142,18 +145,23 @@ object SparkSubmitClassLoaderTest extends Logging { hiveContext.createDataFrame((1 to 10).map(i = (i, sstr$i))).toDF(key, val) source.registerTempTable(sourceTable) // Load a Hive SerDe from the jar. +logInfo(Creating a Hive table with a SerDe provided in a jar.) hiveContext.sql( |CREATE TABLE t1(key int, val string) |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' .stripMargin) // Actually use the loaded UDF and SerDe. +logInfo(Writing data into the table.) hiveContext.sql( INSERT INTO TABLE t1 SELECT example_max(key) as key, val FROM sourceTable GROUP BY val) +logInfo(Running a simple query on the table.) val count = hiveContext.table(t1).orderBy(key, val).count() if (count != 10) { throw new Exception(stable t1 should have 10 rows instead of $count rows) } +logInfo(Test finishes.) +sc.stop() } } @@ -191,5 +199,6 @@ object SparkSQLConfTest extends Logging { val hiveContext = new TestHiveContext(sc) // Run a simple command to make sure all lazy vals in hiveContext get instantiated. 
hiveContext.tables().collect() +sc.stop() } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-8355] [SQL] Python DataFrameReader/Writer should mirror Scala
Repository: spark Updated Branches: refs/heads/branch-1.4 e1bbf1a08 - 9d9c4b476 [SPARK-8355] [SQL] Python DataFrameReader/Writer should mirror Scala I compared PySpark DataFrameReader/Writer against Scala ones. `Option` function is missing in both reader and writer, but the rest seems to all match. I added `Option` to reader and writer and updated the `pyspark-sql` test. Author: Cheolsoo Park cheols...@netflix.com Closes #7078 from piaozhexiu/SPARK-8355 and squashes the following commits: c63d419 [Cheolsoo Park] Fix version 524e0aa [Cheolsoo Park] Add option function to df reader and writer (cherry picked from commit ac2e17b01c0843d928a363d2cc4faf57ec8c8b47) Signed-off-by: Reynold Xin r...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d9c4b47 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d9c4b47 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d9c4b47 Branch: refs/heads/branch-1.4 Commit: 9d9c4b476d194f8f0102c4bf0fc263de9d1fb3be Parents: e1bbf1a Author: Cheolsoo Park cheols...@netflix.com Authored: Mon Jun 29 00:13:39 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Jun 29 00:21:31 2015 -0700 -- python/pyspark/sql/readwriter.py | 14 ++ python/pyspark/sql/tests.py | 1 + 2 files changed, 15 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9d9c4b47/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 1b7bc0f..c4cc62e 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -73,6 +73,13 @@ class DataFrameReader(object): self._jreader = self._jreader.schema(jschema) return self +@since(1.5) +def option(self, key, value): +Adds an input option for the underlying data source. + +self._jreader = self._jreader.option(key, value) +return self + @since(1.4) def options(self, **options): Adds input options for the underlying data source. @@ -235,6 +242,13 @@ class DataFrameWriter(object): self._jwrite = self._jwrite.format(source) return self +@since(1.5) +def option(self, key, value): +Adds an output option for the underlying data source. + +self._jwrite = self._jwrite.option(key, value) +return self + @since(1.4) def options(self, **options): Adds output options for the underlying data source. http://git-wip-us.apache.org/repos/asf/spark/blob/9d9c4b47/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 5c25890..f902776 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -549,6 +549,7 @@ class SQLTests(ReusedPySparkTestCase): self.assertEqual(sorted(df.collect()), sorted(actual.collect())) df.write.mode(overwrite).options(noUse=this options will not be used in save.)\ +.option(noUse, this option will not be used in save.)\ .format(json).save(path=tmpPath) actual =\ self.sqlCtx.read.format(json)\ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
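The Scala methods these Python bindings mirror already exist on DataFrameReader and DataFrameWriter; usage looks like the sketch below. sqlContext, the paths, and the option keys are illustrative placeholders, and whether a given key is honored depends on the data source.

// option(key, value) sets a single source-specific string option and
// returns the builder, so it chains like format()/options().
val df = sqlContext.read
  .format("json")
  .option("samplingRatio", "0.5")
  .load("/tmp/people.json")

df.write
  .format("json")
  .option("noUse", "this option will not be used in save.") // mirrors the test above
  .mode("overwrite")
  .save("/tmp/people-out")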
spark git commit: [SPARK-8698] partitionBy in Python DataFrame reader/writer interface should not default to empty tuple.
Repository: spark Updated Branches: refs/heads/master ac2e17b01 - 660c6cec7 [SPARK-8698] partitionBy in Python DataFrame reader/writer interface should not default to empty tuple. Author: Reynold Xin r...@databricks.com Closes #7079 from rxin/SPARK-8698 and squashes the following commits: 8513e1c [Reynold Xin] [SPARK-8698] partitionBy in Python DataFrame reader/writer interface should not default to empty tuple. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/660c6cec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/660c6cec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/660c6cec Branch: refs/heads/master Commit: 660c6cec75dc165cf5d62cdc1b0951bdb93df365 Parents: ac2e17b Author: Reynold Xin r...@databricks.com Authored: Mon Jun 29 00:22:44 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Jun 29 00:22:44 2015 -0700 -- python/pyspark/sql/readwriter.py | 21 + 1 file changed, 13 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/660c6cec/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index c4cc62e..882a030 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -270,12 +270,11 @@ class DataFrameWriter(object): if len(cols) == 1 and isinstance(cols[0], (list, tuple)): cols = cols[0] -if len(cols) 0: -self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols)) +self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols)) return self @since(1.4) -def save(self, path=None, format=None, mode=None, partitionBy=(), **options): +def save(self, path=None, format=None, mode=None, partitionBy=None, **options): Saves the contents of the :class:`DataFrame` to a data source. The data source is specified by the ``format`` and a set of ``options``. @@ -295,7 +294,9 @@ class DataFrameWriter(object): df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data')) -self.partitionBy(partitionBy).mode(mode).options(**options) +self.mode(mode).options(**options) +if partitionBy is not None: +self.partitionBy(partitionBy) if format is not None: self.format(format) if path is None: @@ -315,7 +316,7 @@ class DataFrameWriter(object): self._jwrite.mode(overwrite if overwrite else append).insertInto(tableName) @since(1.4) -def saveAsTable(self, name, format=None, mode=None, partitionBy=(), **options): +def saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options): Saves the content of the :class:`DataFrame` as the specified table. In the case the table already exists, behavior of this function depends on the @@ -334,7 +335,9 @@ class DataFrameWriter(object): :param partitionBy: names of partitioning columns :param options: all other string options -self.partitionBy(partitionBy).mode(mode).options(**options) +self.mode(mode).options(**options) +if partitionBy is not None: +self.partitionBy(partitionBy) if format is not None: self.format(format) self._jwrite.saveAsTable(name) @@ -356,7 +359,7 @@ class DataFrameWriter(object): self.mode(mode)._jwrite.json(path) @since(1.4) -def parquet(self, path, mode=None, partitionBy=()): +def parquet(self, path, mode=None, partitionBy=None): Saves the content of the :class:`DataFrame` in Parquet format at the specified path. 
:param path: the path in any Hadoop supported file system @@ -370,7 +373,9 @@ class DataFrameWriter(object): df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) -self.partitionBy(partitionBy).mode(mode) +self.mode(mode) +if partitionBy is not None: +self.partitionBy(partitionBy) self._jwrite.parquet(path) @since(1.4) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
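The intent of the fix, sketched with a toy writer: treat "no partition columns" as absent rather than as an empty tuple, so the underlying JVM writer is only configured when the caller actually asked for partitioning. Writer here is a stand-in, not the real DataFrameWriter.

// partitionBy unconditionally records the given columns, so calling it with
// an empty tuple (the old Python default) would still override whatever
// the writer had.
final case class Writer(partitionCols: Option[Seq[String]] = None) {
  def partitionBy(cols: String*): Writer = copy(partitionCols = Some(cols))
}

def configure(w: Writer, partitionBy: Option[Seq[String]]): Writer =
  partitionBy.fold(w)(cols => w.partitionBy(cols: _*))

assert(configure(Writer(), None).partitionCols.isEmpty)          // default preserved
assert(configure(Writer(), Some(Seq("year"))).partitionCols == Some(Seq("year")))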
spark git commit: [SPARK-8355] [SQL] Python DataFrameReader/Writer should mirror Scala
Repository: spark Updated Branches: refs/heads/master 0b10662fe - ac2e17b01 [SPARK-8355] [SQL] Python DataFrameReader/Writer should mirror Scala I compared PySpark DataFrameReader/Writer against Scala ones. `Option` function is missing in both reader and writer, but the rest seems to all match. I added `Option` to reader and writer and updated the `pyspark-sql` test. Author: Cheolsoo Park cheols...@netflix.com Closes #7078 from piaozhexiu/SPARK-8355 and squashes the following commits: c63d419 [Cheolsoo Park] Fix version 524e0aa [Cheolsoo Park] Add option function to df reader and writer Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac2e17b0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac2e17b0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac2e17b0 Branch: refs/heads/master Commit: ac2e17b01c0843d928a363d2cc4faf57ec8c8b47 Parents: 0b10662 Author: Cheolsoo Park cheols...@netflix.com Authored: Mon Jun 29 00:13:39 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Jun 29 00:13:39 2015 -0700 -- python/pyspark/sql/readwriter.py | 14 ++ python/pyspark/sql/tests.py | 1 + 2 files changed, 15 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ac2e17b0/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 1b7bc0f..c4cc62e 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -73,6 +73,13 @@ class DataFrameReader(object): self._jreader = self._jreader.schema(jschema) return self +@since(1.5) +def option(self, key, value): +Adds an input option for the underlying data source. + +self._jreader = self._jreader.option(key, value) +return self + @since(1.4) def options(self, **options): Adds input options for the underlying data source. @@ -235,6 +242,13 @@ class DataFrameWriter(object): self._jwrite = self._jwrite.format(source) return self +@since(1.5) +def option(self, key, value): +Adds an output option for the underlying data source. + +self._jwrite = self._jwrite.option(key, value) +return self + @since(1.4) def options(self, **options): Adds output options for the underlying data source. http://git-wip-us.apache.org/repos/asf/spark/blob/ac2e17b0/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index e6a434e..ffee43a 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -564,6 +564,7 @@ class SQLTests(ReusedPySparkTestCase): self.assertEqual(sorted(df.collect()), sorted(actual.collect())) df.write.mode(overwrite).options(noUse=this options will not be used in save.)\ +.option(noUse, this option will not be used in save.)\ .format(json).save(path=tmpPath) actual =\ self.sqlCtx.read.format(json)\ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-8634] [STREAMING] [TESTS] Fix flaky test StreamingListenerSuite receiver info reporting
Repository: spark Updated Branches: refs/heads/branch-1.4 6a45d86db - f84f24769 [SPARK-8634] [STREAMING] [TESTS] Fix flaky test StreamingListenerSuite receiver info reporting As per the unit test log in https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/35754/ ``` 15/06/24 23:09:10.210 Thread-3495 INFO ReceiverTracker: Starting 1 receivers 15/06/24 23:09:10.270 Thread-3495 INFO SparkContext: Starting job: apply at Transformer.scala:22 ... 15/06/24 23:09:14.259 ForkJoinPool-4-worker-29 INFO StreamingListenerSuiteReceiver: Started receiver and sleeping 15/06/24 23:09:14.270 ForkJoinPool-4-worker-29 INFO StreamingListenerSuiteReceiver: Reporting error and sleeping ``` it needs at least 4 seconds to receive all receiver events in this slow machine, but `timeout` for `eventually` is only 2 seconds. This PR increases `timeout` to make this test stable. Author: zsxwing zsxw...@gmail.com Closes #7017 from zsxwing/SPARK-8634 and squashes the following commits: 719cae4 [zsxwing] Fix flaky test StreamingListenerSuite receiver info reporting (cherry picked from commit cec98525fd2b731cb78935bf7bc6c7963411744e) Signed-off-by: Andrew Or and...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f84f2476 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f84f2476 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f84f2476 Branch: refs/heads/branch-1.4 Commit: f84f24769a678fcee78c2bbc464d0fe2057223be Parents: 6a45d86 Author: zsxwing zsxw...@gmail.com Authored: Mon Jun 29 17:19:05 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Mon Jun 29 17:19:24 2015 -0700 -- .../scala/org/apache/spark/streaming/StreamingListenerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f84f2476/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala index 1dc8960..7bc7727 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala @@ -116,7 +116,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers { ssc.start() try { - eventually(timeout(2000 millis), interval(20 millis)) { + eventually(timeout(30 seconds), interval(20 millis)) { collector.startedReceiverStreamIds.size should equal (1) collector.startedReceiverStreamIds(0) should equal (0) collector.stoppedReceiverStreamIds should have size 1 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
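The fix is a timeout bump rather than new logic; for readers unfamiliar with ScalaTest's `eventually`, the pattern it implements is roughly the following (an illustrative Python re-implementation, not Spark or ScalaTest code):

```
import time

def eventually(assertion, timeout=30.0, interval=0.02):
    # Retry until the assertion passes or the deadline expires, so a slow
    # machine gets the full timeout to deliver all receiver events.
    deadline = time.time() + timeout
    while True:
        try:
            return assertion()
        except AssertionError:
            if time.time() >= deadline:
                raise
            time.sleep(interval)

started = time.time()

def check():
    # Stand-in for the suite's receiver-event assertions.
    assert time.time() - started > 1, "events not all observed yet"

eventually(check)
```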
spark git commit: [SPARK-8019] [SPARKR] Support SparkR spawning worker R processes with a command other than Rscript
Repository: spark Updated Branches: refs/heads/master d7f796da4 -> 4a9e03fa8 [SPARK-8019] [SPARKR] Support SparkR spawning worker R processes with a command other than Rscript This is a simple change to add a new configuration property spark.sparkr.r.command that specifies the command that SparkR will use when creating an R engine process. If this is not specified, Rscript will be used by default. I did not add any documentation, since I couldn't find any place where such properties (for example spark.sparkr.use.daemon) are documented. I also did not add a unit test. The only test that would work generally would be one starting SparkR with sparkR.init(sparkEnvir=list(spark.sparkr.r.command="Rscript")), just using the default value. I think that this is a low-risk change. Likely committers: shivaram Author: Michael Sannella x268 msann...@tibco.com Closes #6557 from msannell/altR and squashes the following commits: 7eac142 [Michael Sannella x268] add spark.sparkr.r.command config parameter Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a9e03fa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a9e03fa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a9e03fa Branch: refs/heads/master Commit: 4a9e03fa850af9e4ee56d011671faa04fb601170 Parents: d7f796d Author: Michael Sannella x268 msann...@tibco.com Authored: Mon Jun 29 17:28:28 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Mon Jun 29 17:28:28 2015 -0700 -- core/src/main/scala/org/apache/spark/api/r/RRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4a9e03fa/core/src/main/scala/org/apache/spark/api/r/RRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala index 4dfa732..5246765 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala @@ -391,7 +391,7 @@ private[r] object RRDD { } private def createRProcess(rLibDir: String, port: Int, script: String): BufferedStreamThread = { -val rCommand = "Rscript" +val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript") val rOptions = "--vanilla" val rExecScript = rLibDir + "/SparkR/worker/" + script val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
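A hedged sketch of how the new property can be supplied from application code; the key and its `Rscript` default come from the diff above, while the R interpreter path below is purely illustrative:

```
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local")
        .setAppName("sparkr-r-command-demo")
        # Read by SparkR's RRDD.createRProcess when spawning R workers;
        # falls back to "Rscript" when unset. The path is a placeholder.
        .set("spark.sparkr.r.command", "/opt/R-3.2.0/bin/Rscript"))
sc = SparkContext(conf=conf)
sc.stop()
```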
spark git commit: [SPARK-8715] ArrayOutOfBoundsException fixed for DataFrameStatSuite.crosstab
Repository: spark Updated Branches: refs/heads/branch-1.4 80d53565a -> ffc793a6c [SPARK-8715] ArrayOutOfBoundsException fixed for DataFrameStatSuite.crosstab cc yhuai Author: Burak Yavuz brk...@gmail.com Closes #7100 from brkyvz/ct-flakiness-fix and squashes the following commits: abc299a [Burak Yavuz] change 'to' to 'until' 7e96d7c [Burak Yavuz] ArrayOutOfBoundsException fixed for DataFrameStatSuite.crosstab (cherry picked from commit ecacb1e88a135c802e253793e7c863d6ca8d2408) Signed-off-by: Yin Huai yh...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffc793a6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffc793a6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffc793a6 Branch: refs/heads/branch-1.4 Commit: ffc793a6caaf8e6190651af4ff2d46492f062fc0 Parents: 80d5356 Author: Burak Yavuz brk...@gmail.com Authored: Mon Jun 29 18:48:28 2015 -0700 Committer: Yin Huai yh...@databricks.com Committed: Mon Jun 29 18:48:38 2015 -0700 -- .../src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ffc793a6/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index f740fb2..3e87eba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -78,7 +78,7 @@ class DataFrameStatSuite extends SparkFunSuite { val rows = crosstab.collect() rows.foreach { row => val i = row.getString(0).toInt - for (col <- 1 to 9) { + for (col <- 1 until columnNames.length) { val j = columnNames(col).toInt assert(row.getLong(col) === expected.getOrElse((i, j), 0).toLong) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
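For reference, a minimal sketch of the API exercised by this suite (local context; the fix above iterates columns by `columnNames.length` instead of a hard-coded 9):

```
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "crosstab-demo")
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([(1, 1), (1, 2), (2, 3)], ["key", "value"])

# One row per distinct key, one column per distinct value.
df.stat.crosstab("key", "value").show()
sc.stop()
```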
spark git commit: [SPARK-8437] [DOCS] Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles
Repository: spark Updated Branches: refs/heads/branch-1.4 cdfa388dd - b2684557f [SPARK-8437] [DOCS] Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles Note that 'dir/*' can be more efficient in some Hadoop FS implementations that 'dir/' Author: Sean Owen so...@cloudera.com Closes #7036 from srowen/SPARK-8437 and squashes the following commits: 0e813ae [Sean Owen] Note that 'dir/*' can be more efficient in some Hadoop FS implementations that 'dir/' (cherry picked from commit 5d30eae56051c563a8427f330b09ef66db0a0d21) Signed-off-by: Andrew Or and...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2684557 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2684557 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2684557 Branch: refs/heads/branch-1.4 Commit: b2684557fa0d2ec14b7529324443c8154d81c348 Parents: cdfa388 Author: Sean Owen so...@cloudera.com Authored: Mon Jun 29 17:21:35 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Mon Jun 29 17:21:47 2015 -0700 -- core/src/main/scala/org/apache/spark/SparkContext.scala | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b2684557/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b4c0d4c..f8af710 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -824,6 +824,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * }}} * * @note Small files are preferred, large file is also allowable, but may cause bad performance. + * @note On some filesystems, `.../path/*` can be a more efficient way to read all files in a directory + * rather than `.../path/` or `.../path` * * @param minPartitions A suggestion value of the minimal splitting number for input data. */ @@ -871,9 +873,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * (a-hdfs-path/part-n, its content) * }}} * - * @param minPartitions A suggestion value of the minimal splitting number for input data. - * * @note Small files are preferred; very large files may cause bad performance. + * @note On some filesystems, `.../path/*` can be a more efficient way to read all files in a directory + * rather than `.../path/` or `.../path` + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. */ @Experimental def binaryFiles( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
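In PySpark terms, the documented advice looks like this (the HDFS paths are hypothetical placeholders):

```
from pyspark import SparkContext

sc = SparkContext("local", "wildcard-demo")

# On some Hadoop filesystems the explicit glob can be listed more
# efficiently than the bare directory path. Paths are placeholders.
fast = sc.wholeTextFiles("hdfs:///data/logs/2015/*")
slow = sc.wholeTextFiles("hdfs:///data/logs/2015")

print(fast.count(), slow.count())
sc.stop()
```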
spark git commit: [SPARK-8456] [ML] Ngram featurizer python
Repository: spark Updated Branches: refs/heads/master 4c1808be4 - 620605a4a [SPARK-8456] [ML] Ngram featurizer python Python API for N-gram feature transformer Author: Feynman Liang fli...@databricks.com Closes #6960 from feynmanliang/ngram-featurizer-python and squashes the following commits: f9e37c9 [Feynman Liang] Remove debugging code 4dd81f4 [Feynman Liang] Fix typo and doctest 06c79ac [Feynman Liang] Style guide 26c1175 [Feynman Liang] Add python NGram API Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/620605a4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/620605a4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/620605a4 Branch: refs/heads/master Commit: 620605a4a1123afaab2674e38251f1231dea17ce Parents: 4c1808b Author: Feynman Liang fli...@databricks.com Authored: Mon Jun 29 18:40:30 2015 -0700 Committer: Joseph K. Bradley jos...@databricks.com Committed: Mon Jun 29 18:40:30 2015 -0700 -- python/pyspark/ml/feature.py | 71 ++- python/pyspark/ml/tests.py | 11 ++ 2 files changed, 81 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/620605a4/python/pyspark/ml/feature.py -- diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index ddb33f4..8804dac 100644 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -21,7 +21,7 @@ from pyspark.ml.util import keyword_only from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaTransformer from pyspark.mllib.common import inherit_doc -__all__ = ['Binarizer', 'HashingTF', 'IDF', 'IDFModel', 'Normalizer', 'OneHotEncoder', +__all__ = ['Binarizer', 'HashingTF', 'IDF', 'IDFModel', 'NGram', 'Normalizer', 'OneHotEncoder', 'PolynomialExpansion', 'RegexTokenizer', 'StandardScaler', 'StandardScalerModel', 'StringIndexer', 'StringIndexerModel', 'Tokenizer', 'VectorAssembler', 'VectorIndexer', 'Word2Vec', 'Word2VecModel'] @@ -266,6 +266,75 @@ class IDFModel(JavaModel): @inherit_doc +@ignore_unicode_prefix +class NGram(JavaTransformer, HasInputCol, HasOutputCol): + +A feature transformer that converts the input array of strings into an array of n-grams. Null +values in the input array are ignored. +It returns an array of n-grams where each n-gram is represented by a space-separated string of +words. +When the input is empty, an empty array is returned. +When the input array length is less than n (number of elements per n-gram), no n-grams are +returned. + + df = sqlContext.createDataFrame([Row(inputTokens=[a, b, c, d, e])]) + ngram = NGram(n=2, inputCol=inputTokens, outputCol=nGrams) + ngram.transform(df).head() +Row(inputTokens=[u'a', u'b', u'c', u'd', u'e'], nGrams=[u'a b', u'b c', u'c d', u'd e']) + # Change n-gram length + ngram.setParams(n=4).transform(df).head() +Row(inputTokens=[u'a', u'b', u'c', u'd', u'e'], nGrams=[u'a b c d', u'b c d e']) + # Temporarily modify output column. + ngram.transform(df, {ngram.outputCol: output}).head() +Row(inputTokens=[u'a', u'b', u'c', u'd', u'e'], output=[u'a b c d', u'b c d e']) + ngram.transform(df).head() +Row(inputTokens=[u'a', u'b', u'c', u'd', u'e'], nGrams=[u'a b c d', u'b c d e']) + # Must use keyword arguments to specify params. + ngram.setParams(text) +Traceback (most recent call last): +... +TypeError: Method setParams forces keyword arguments. 
+ + +# a placeholder to make it appear in the generated doc +n = Param(Params._dummy(), n, number of elements per n-gram (=1)) + +@keyword_only +def __init__(self, n=2, inputCol=None, outputCol=None): + +__init__(self, n=2, inputCol=None, outputCol=None) + +super(NGram, self).__init__() +self._java_obj = self._new_java_obj(org.apache.spark.ml.feature.NGram, self.uid) +self.n = Param(self, n, number of elements per n-gram (=1)) +self._setDefault(n=2) +kwargs = self.__init__._input_kwargs +self.setParams(**kwargs) + +@keyword_only +def setParams(self, n=2, inputCol=None, outputCol=None): + +setParams(self, n=2, inputCol=None, outputCol=None) +Sets params for this NGram. + +kwargs = self.setParams._input_kwargs +return self._set(**kwargs) + +def setN(self, value): + +Sets the value of :py:attr:`n`. + +self._paramMap[self.n] = value +return self + +def getN(self): + +Gets the value of n or its default value. + +return self.getOrDefault(self.n) + + +@inherit_doc class
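Condensed from the doctest above, typical NGram usage looks like this (a minimal sketch with a local context):

```
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.ml.feature import NGram

sc = SparkContext("local", "ngram-demo")
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([Row(inputTokens=["a", "b", "c", "d", "e"])])
ngram = NGram(n=2, inputCol="inputTokens", outputCol="nGrams")
ngram.transform(df).head()
# Row(inputTokens=[u'a', u'b', u'c', u'd', u'e'],
#     nGrams=[u'a b', u'b c', u'c d', u'd e'])

ngram.setParams(n=4)  # n is mutable; the next transform emits 4-grams
sc.stop()
```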
spark git commit: [SPARK-8410] [SPARK-8475] remove previous ivy resolution when using spark-submit
Repository: spark Updated Branches: refs/heads/master 5d30eae56 - d7f796da4 [SPARK-8410] [SPARK-8475] remove previous ivy resolution when using spark-submit This PR also includes re-ordering the order that repositories are used when resolving packages. User provided repositories will be prioritized. cc andrewor14 Author: Burak Yavuz brk...@gmail.com Closes #7089 from brkyvz/delete-prev-ivy-resolution and squashes the following commits: a21f95a [Burak Yavuz] remove previous ivy resolution when using spark-submit Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7f796da Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7f796da Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7f796da Branch: refs/heads/master Commit: d7f796da45d9a7c76ee4c29a9e0661ef76d8028a Parents: 5d30eae Author: Burak Yavuz brk...@gmail.com Authored: Mon Jun 29 17:27:02 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Mon Jun 29 17:27:02 2015 -0700 -- .../org/apache/spark/deploy/SparkSubmit.scala | 37 .../spark/deploy/SparkSubmitUtilsSuite.scala| 6 ++-- 2 files changed, 26 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d7f796da/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index abf2227..b1d6ec2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -756,6 +756,20 @@ private[spark] object SparkSubmitUtils { val cr = new ChainResolver cr.setName(list) +val repositoryList = remoteRepos.getOrElse() +// add any other remote repositories other than maven central +if (repositoryList.trim.nonEmpty) { + repositoryList.split(,).zipWithIndex.foreach { case (repo, i) = +val brr: IBiblioResolver = new IBiblioResolver +brr.setM2compatible(true) +brr.setUsepoms(true) +brr.setRoot(repo) +brr.setName(srepo-${i + 1}) +cr.add(brr) +printStream.println(s$repo added as a remote repository with the name: ${brr.getName}) + } +} + val localM2 = new IBiblioResolver localM2.setM2compatible(true) localM2.setRoot(m2Path.toURI.toString) @@ -786,20 +800,6 @@ private[spark] object SparkSubmitUtils { sp.setRoot(http://dl.bintray.com/spark-packages/maven;) sp.setName(spark-packages) cr.add(sp) - -val repositoryList = remoteRepos.getOrElse() -// add any other remote repositories other than maven central -if (repositoryList.trim.nonEmpty) { - repositoryList.split(,).zipWithIndex.foreach { case (repo, i) = -val brr: IBiblioResolver = new IBiblioResolver -brr.setM2compatible(true) -brr.setUsepoms(true) -brr.setRoot(repo) -brr.setName(srepo-${i + 1}) -cr.add(brr) -printStream.println(s$repo added as a remote repository with the name: ${brr.getName}) - } -} cr } @@ -922,6 +922,15 @@ private[spark] object SparkSubmitUtils { // A Module descriptor must be specified. Entries are dummy strings val md = getModuleDescriptor +// clear ivy resolution from previous launches. The resolution file is usually at +// ~/.ivy2/org.apache.spark-spark-submit-parent-default.xml. 
In between runs, this file +// leads to confusion with Ivy when the files can no longer be found at the repository +// declared in that file/ +val mdId = md.getModuleRevisionId +val previousResolution = new File(ivySettings.getDefaultCache, + s${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml) +if (previousResolution.exists) previousResolution.delete + md.setDefaultConf(ivyConfName) // Add exclusion rules for Spark and Scala Library http://git-wip-us.apache.org/repos/asf/spark/blob/d7f796da/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 12c40f0..c9b435a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -77,9 +77,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { assert(resolver2.getResolvers.size() === 7) val expected = repos.split(,).map(r = s$r/)
spark git commit: [SPARK-8410] [SPARK-8475] remove previous ivy resolution when using spark-submit
Repository: spark Updated Branches: refs/heads/branch-1.4 b2684557f - c0fbd6781 [SPARK-8410] [SPARK-8475] remove previous ivy resolution when using spark-submit This PR also includes re-ordering the order that repositories are used when resolving packages. User provided repositories will be prioritized. cc andrewor14 Author: Burak Yavuz brk...@gmail.com Closes #7089 from brkyvz/delete-prev-ivy-resolution and squashes the following commits: a21f95a [Burak Yavuz] remove previous ivy resolution when using spark-submit (cherry picked from commit d7f796da45d9a7c76ee4c29a9e0661ef76d8028a) Signed-off-by: Andrew Or and...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c0fbd678 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c0fbd678 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c0fbd678 Branch: refs/heads/branch-1.4 Commit: c0fbd6781eb47e930af0f1009779e36da85a6b65 Parents: b268455 Author: Burak Yavuz brk...@gmail.com Authored: Mon Jun 29 17:27:02 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Mon Jun 29 17:27:08 2015 -0700 -- .../org/apache/spark/deploy/SparkSubmit.scala | 37 .../spark/deploy/SparkSubmitUtilsSuite.scala| 6 ++-- 2 files changed, 26 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c0fbd678/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index fe73459..43631ee 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -800,6 +800,20 @@ private[spark] object SparkSubmitUtils { val cr = new ChainResolver cr.setName(list) +val repositoryList = remoteRepos.getOrElse() +// add any other remote repositories other than maven central +if (repositoryList.trim.nonEmpty) { + repositoryList.split(,).zipWithIndex.foreach { case (repo, i) = +val brr: IBiblioResolver = new IBiblioResolver +brr.setM2compatible(true) +brr.setUsepoms(true) +brr.setRoot(repo) +brr.setName(srepo-${i + 1}) +cr.add(brr) +printStream.println(s$repo added as a remote repository with the name: ${brr.getName}) + } +} + val localM2 = new IBiblioResolver localM2.setM2compatible(true) localM2.setRoot(m2Path.toURI.toString) @@ -830,20 +844,6 @@ private[spark] object SparkSubmitUtils { sp.setRoot(http://dl.bintray.com/spark-packages/maven;) sp.setName(spark-packages) cr.add(sp) - -val repositoryList = remoteRepos.getOrElse() -// add any other remote repositories other than maven central -if (repositoryList.trim.nonEmpty) { - repositoryList.split(,).zipWithIndex.foreach { case (repo, i) = -val brr: IBiblioResolver = new IBiblioResolver -brr.setM2compatible(true) -brr.setUsepoms(true) -brr.setRoot(repo) -brr.setName(srepo-${i + 1}) -cr.add(brr) -printStream.println(s$repo added as a remote repository with the name: ${brr.getName}) - } -} cr } @@ -973,6 +973,15 @@ private[spark] object SparkSubmitUtils { // A Module descriptor must be specified. Entries are dummy strings val md = getModuleDescriptor +// clear ivy resolution from previous launches. The resolution file is usually at +// ~/.ivy2/org.apache.spark-spark-submit-parent-default.xml. 
In between runs, this file +// leads to confusion with Ivy when the files can no longer be found at the repository +// declared in that file/ +val mdId = md.getModuleRevisionId +val previousResolution = new File(ivySettings.getDefaultCache, + s${mdId.getOrganisation}-${mdId.getName}-$ivyConfName.xml) +if (previousResolution.exists) previousResolution.delete + md.setDefaultConf(ivyConfName) // Add exclusion rules for Spark and Scala Library http://git-wip-us.apache.org/repos/asf/spark/blob/c0fbd678/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 8003b71..5559470 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -77,9 +77,9 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with
spark git commit: Revert "[SPARK-8437] [DOCS] Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles"
Repository: spark Updated Branches: refs/heads/master 4a9e03fa8 - 4c1808be4 Revert [SPARK-8437] [DOCS] Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles This reverts commit 5d30eae56051c563a8427f330b09ef66db0a0d21. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c1808be Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c1808be Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c1808be Branch: refs/heads/master Commit: 4c1808be4d3aaa37a5a878892e91ca73ea405ffa Parents: 4a9e03f Author: Andrew Or and...@databricks.com Authored: Mon Jun 29 18:32:31 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Mon Jun 29 18:32:31 2015 -0700 -- core/src/main/scala/org/apache/spark/SparkContext.scala | 8 ++-- 1 file changed, 2 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4c1808be/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cb7e24c..b3c3bf3 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -831,8 +831,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * }}} * * @note Small files are preferred, large file is also allowable, but may cause bad performance. - * @note On some filesystems, `.../path/*` can be a more efficient way to read all files in a directory - * rather than `.../path/` or `.../path` * * @param minPartitions A suggestion value of the minimal splitting number for input data. */ @@ -880,11 +878,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * (a-hdfs-path/part-n, its content) * }}} * - * @note Small files are preferred; very large files may cause bad performance. - * @note On some filesystems, `.../path/*` can be a more efficient way to read all files in a directory - * rather than `.../path/` or `.../path` - * * @param minPartitions A suggestion value of the minimal splitting number for input data. + * + * @note Small files are preferred; very large files may cause bad performance. */ @Experimental def binaryFiles( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Revert "[SPARK-8437] [DOCS] Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles"
Repository: spark Updated Branches: refs/heads/branch-1.4 c0fbd6781 - 80d53565a Revert [SPARK-8437] [DOCS] Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles This reverts commit b2684557fa0d2ec14b7529324443c8154d81c348. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80d53565 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80d53565 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80d53565 Branch: refs/heads/branch-1.4 Commit: 80d53565ae951f7ef0626ad2ad0e2ecb55993a21 Parents: c0fbd67 Author: Andrew Or and...@databricks.com Authored: Mon Jun 29 18:32:59 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Mon Jun 29 18:32:59 2015 -0700 -- core/src/main/scala/org/apache/spark/SparkContext.scala | 8 ++-- 1 file changed, 2 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/80d53565/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f8af710..b4c0d4c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -824,8 +824,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * }}} * * @note Small files are preferred, large file is also allowable, but may cause bad performance. - * @note On some filesystems, `.../path/*` can be a more efficient way to read all files in a directory - * rather than `.../path/` or `.../path` * * @param minPartitions A suggestion value of the minimal splitting number for input data. */ @@ -873,11 +871,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * (a-hdfs-path/part-n, its content) * }}} * - * @note Small files are preferred; very large files may cause bad performance. - * @note On some filesystems, `.../path/*` can be a more efficient way to read all files in a directory - * rather than `.../path/` or `.../path` - * * @param minPartitions A suggestion value of the minimal splitting number for input data. + * + * @note Small files are preferred; very large files may cause bad performance. */ @Experimental def binaryFiles( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-8437] [DOCS] Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles
Repository: spark Updated Branches: refs/heads/master fbf75738f - 5d30eae56 [SPARK-8437] [DOCS] Using directory path without wildcard for filename slow for large number of files with wholeTextFiles and binaryFiles Note that 'dir/*' can be more efficient in some Hadoop FS implementations that 'dir/' Author: Sean Owen so...@cloudera.com Closes #7036 from srowen/SPARK-8437 and squashes the following commits: 0e813ae [Sean Owen] Note that 'dir/*' can be more efficient in some Hadoop FS implementations that 'dir/' Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d30eae5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d30eae5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d30eae5 Branch: refs/heads/master Commit: 5d30eae56051c563a8427f330b09ef66db0a0d21 Parents: fbf7573 Author: Sean Owen so...@cloudera.com Authored: Mon Jun 29 17:21:35 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Mon Jun 29 17:21:35 2015 -0700 -- core/src/main/scala/org/apache/spark/SparkContext.scala | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5d30eae5/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b3c3bf3..cb7e24c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -831,6 +831,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * }}} * * @note Small files are preferred, large file is also allowable, but may cause bad performance. + * @note On some filesystems, `.../path/*` can be a more efficient way to read all files in a directory + * rather than `.../path/` or `.../path` * * @param minPartitions A suggestion value of the minimal splitting number for input data. */ @@ -878,9 +880,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * (a-hdfs-path/part-n, its content) * }}} * - * @param minPartitions A suggestion value of the minimal splitting number for input data. - * * @note Small files are preferred; very large files may cause bad performance. + * @note On some filesystems, `.../path/*` can be a more efficient way to read all files in a directory + * rather than `.../path/` or `.../path` + * + * @param minPartitions A suggestion value of the minimal splitting number for input data. */ @Experimental def binaryFiles( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-7287] [SPARK-8567] [TEST] Add sc.stop to applications in SparkSubmitSuite
Repository: spark Updated Branches: refs/heads/branch-1.4 f84f24769 - cdfa388dd [SPARK-7287] [SPARK-8567] [TEST] Add sc.stop to applications in SparkSubmitSuite Hopefully, this suite will not be flaky anymore. Author: Yin Huai yh...@databricks.com Closes #7027 from yhuai/SPARK-8567 and squashes the following commits: c0167e2 [Yin Huai] Add sc.stop(). (cherry picked from commit fbf75738feddebb352d5cedf503b573105d4b7a7) Signed-off-by: Andrew Or and...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cdfa388d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cdfa388d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cdfa388d Branch: refs/heads/branch-1.4 Commit: cdfa388dd0c9e10be24184be30e4d0a73207fe62 Parents: f84f247 Author: Yin Huai yh...@databricks.com Authored: Mon Jun 29 17:20:05 2015 -0700 Committer: Andrew Or and...@databricks.com Committed: Mon Jun 29 17:20:14 2015 -0700 -- .../apache/spark/deploy/SparkSubmitSuite.scala | 2 ++ .../regression-test-SPARK-8489/Main.scala| 1 + .../regression-test-SPARK-8489/test.jar | Bin 6811 - 6828 bytes 3 files changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cdfa388d/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 14007e3..b10f10d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -548,6 +548,7 @@ object JarCreationTest extends Logging { if (result.nonEmpty) { throw new Exception(Could not load user class from jar:\n + result(0)) } +sc.stop() } } @@ -573,6 +574,7 @@ object SimpleApplicationTest { sMaster had $config=$masterValue but executor had $config=$executorValue) } } +sc.stop() } } http://git-wip-us.apache.org/repos/asf/spark/blob/cdfa388d/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala -- diff --git a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala index e171517..0e428ba 100644 --- a/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala +++ b/sql/hive/src/test/resources/regression-test-SPARK-8489/Main.scala @@ -38,6 +38,7 @@ object Main { val df = hc.createDataFrame(Seq(MyCoolClass(1, 2, 3))) df.collect() println(Regression test for SPARK-8489 success!) +sc.stop() } } http://git-wip-us.apache.org/repos/asf/spark/blob/cdfa388d/sql/hive/src/test/resources/regression-test-SPARK-8489/test.jar -- diff --git a/sql/hive/src/test/resources/regression-test-SPARK-8489/test.jar b/sql/hive/src/test/resources/regression-test-SPARK-8489/test.jar index 4f59fba..5944aa6 100644 Binary files a/sql/hive/src/test/resources/regression-test-SPARK-8489/test.jar and b/sql/hive/src/test/resources/regression-test-SPARK-8489/test.jar differ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
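The pattern the fix enforces, in miniature (an illustrative sketch, not the suite code itself):

```
from pyspark import SparkContext

sc = SparkContext("local", "stop-demo")
try:
    assert sc.parallelize(range(10)).count() == 10
finally:
    # Always release the context; a leaked SparkContext makes the
    # next suite launched in the same process flaky.
    sc.stop()
```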
spark git commit: [SPARK-8669] [SQL] Fix crash with BINARY (ENUM) fields with Parquet 1.7
Repository: spark Updated Branches: refs/heads/master ecacb1e88 - 4915e9e3b [SPARK-8669] [SQL] Fix crash with BINARY (ENUM) fields with Parquet 1.7 Patch to fix crash with BINARY fields with ENUM original types. Author: Steven She ste...@canopylabs.com Closes #7048 from stevencanopy/SPARK-8669 and squashes the following commits: 2e72979 [Steven She] [SPARK-8669] [SQL] Fix crash with BINARY (ENUM) fields with Parquet 1.7 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4915e9e3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4915e9e3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4915e9e3 Branch: refs/heads/master Commit: 4915e9e3bffb57eac319ef2173b4a6ae4073d25e Parents: ecacb1e Author: Steven She ste...@canopylabs.com Authored: Mon Jun 29 18:50:09 2015 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Mon Jun 29 18:50:09 2015 -0700 -- .../apache/spark/sql/parquet/CatalystSchemaConverter.scala | 2 +- .../org/apache/spark/sql/parquet/ParquetSchemaSuite.scala| 8 2 files changed, 9 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4915e9e3/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala index 4fd3e93..2be7c64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystSchemaConverter.scala @@ -177,7 +177,7 @@ private[parquet] class CatalystSchemaConverter( case BINARY = field.getOriginalType match { - case UTF8 = StringType + case UTF8 | ENUM = StringType case null if assumeBinaryIsString = StringType case null = BinaryType case DECIMAL = makeDecimalType() http://git-wip-us.apache.org/repos/asf/spark/blob/4915e9e3/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala index d0bfcde..35d3c33 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala @@ -161,6 +161,14 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest { .stripMargin, binaryAsString = true) + testSchemaInference[Tuple1[String]]( +binary enum as string, + + |message root { + | optional binary _1 (ENUM); + |} +.stripMargin) + testSchemaInference[Tuple1[Seq[Int]]]( non-nullable array - non-standard, - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-7667] [MLLIB] MLlib Python API consistency check
Repository: spark Updated Branches: refs/heads/master 4915e9e3b - f9b6bf2f8 [SPARK-7667] [MLLIB] MLlib Python API consistency check MLlib Python API consistency check Author: Yanbo Liang yblia...@gmail.com Closes #6856 from yanboliang/spark-7667 and squashes the following commits: 21bae35 [Yanbo Liang] remove duplicate code eb12f95 [Yanbo Liang] fix doc inherit problem 9e7ec3c [Yanbo Liang] address comments e763d32 [Yanbo Liang] MLlib Python API consistency check Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9b6bf2f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9b6bf2f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9b6bf2f Branch: refs/heads/master Commit: f9b6bf2f83d9dad273aa36d65d0560d35b941cc2 Parents: 4915e9e Author: Yanbo Liang yblia...@gmail.com Authored: Mon Jun 29 18:50:23 2015 -0700 Committer: Joseph K. Bradley jos...@databricks.com Committed: Mon Jun 29 18:50:23 2015 -0700 -- python/pyspark/mllib/feature.py | 15 ++- 1 file changed, 10 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f9b6bf2f/python/pyspark/mllib/feature.py -- diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index f00bb93..b513877 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -111,6 +111,15 @@ class JavaVectorTransformer(JavaModelWrapper, VectorTransformer): def transform(self, vector): + +Applies transformation on a vector or an RDD[Vector]. + +Note: In Python, transform cannot currently be used within + an RDD transformation or action. + Call transform directly on the RDD instead. + +:param vector: Vector or RDD of Vector to be transformed. + if isinstance(vector, RDD): vector = vector.map(_convert_to_vector) else: @@ -191,7 +200,7 @@ class StandardScaler(object): Computes the mean and variance and stores as a model to be used for later scaling. -:param data: The data used to compute the mean and variance +:param dataset: The data used to compute the mean and variance to build the transformation model. :return: a StandardScalarModel @@ -346,10 +355,6 @@ class IDFModel(JavaVectorTransformer): vector :return: an RDD of TF-IDF vectors or a TF-IDF vector -if isinstance(x, RDD): -return JavaVectorTransformer.transform(self, x) - -x = _convert_to_vector(x) return JavaVectorTransformer.transform(self, x) def idf(self): - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
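The clarified `transform` contract accepts either a single Vector or an RDD[Vector]; a minimal sketch:

```
from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF, IDF

sc = SparkContext("local", "idf-demo")
docs = sc.parallelize([["a", "b", "b"], ["b", "c"]])

tf = HashingTF().transform(docs)
tf.cache()
model = IDF().fit(tf)

tfidf_rdd = model.transform(tf)          # RDD[Vector] in, RDD out
tfidf_one = model.transform(tf.first())  # a single Vector also works
# Per the docstring added above: transform cannot be called from inside
# another RDD transformation or action.
sc.stop()
```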
Git Push Summary
Repository: spark Updated Branches: refs/heads/scala-2.9 [deleted] d2efe1357
spark git commit: MAINTENANCE: Automated closing of pull requests.
Repository: spark Updated Branches: refs/heads/master 7bbbe380c - ea775b066 MAINTENANCE: Automated closing of pull requests. This commit exists to close the following pull requests on Github: Closes #1767 (close requested by 'andrewor14') Closes #6952 (close requested by 'andrewor14') Closes #7051 (close requested by 'andrewor14') Closes #5357 (close requested by 'marmbrus') Closes #5233 (close requested by 'andrewor14') Closes #6930 (close requested by 'JoshRosen') Closes #5502 (close requested by 'andrewor14') Closes #6778 (close requested by 'andrewor14') Closes #7006 (close requested by 'andrewor14') Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea775b06 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea775b06 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea775b06 Branch: refs/heads/master Commit: ea775b0662b952849ac7fe2026fc3fd4714c37e3 Parents: 7bbbe38 Author: Patrick Wendell patr...@databricks.com Authored: Mon Jun 29 21:41:59 2015 -0700 Committer: Patrick Wendell patr...@databricks.com Committed: Mon Jun 29 21:41:59 2015 -0700 -- -- - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5161] Parallelize Python test execution
Repository: spark Updated Branches: refs/heads/master f9b6bf2f8 - 7bbbe380c [SPARK-5161] Parallelize Python test execution This commit parallelizes the Python unit test execution, significantly reducing Jenkins build times. Parallelism is now configurable by passing the `-p` or `--parallelism` flags to either `dev/run-tests` or `python/run-tests` (the default parallelism is 4, but I've successfully tested with higher parallelism). To avoid flakiness, I've disabled the Spark Web UI for the Python tests, similar to what we've done for the JVM tests. Author: Josh Rosen joshro...@databricks.com Closes #7031 from JoshRosen/parallelize-python-tests and squashes the following commits: feb3763 [Josh Rosen] Re-enable other tests f87ea81 [Josh Rosen] Only log output from failed tests d4ded73 [Josh Rosen] Logging improvements a2717e1 [Josh Rosen] Make parallelism configurable via dev/run-tests 1bacf1b [Josh Rosen] Merge remote-tracking branch 'origin/master' into parallelize-python-tests 110cd9d [Josh Rosen] Fix universal_newlines for Python 3 cd13db8 [Josh Rosen] Also log python_implementation 9e31127 [Josh Rosen] Log Python --version output for each executable. a2b9094 [Josh Rosen] Bump up parallelism. 5552380 [Josh Rosen] Python 3 fix 866b5b9 [Josh Rosen] Fix lazy logging warnings in Prospector checks 87cb988 [Josh Rosen] Skip MLLib tests for PyPy 8309bfe [Josh Rosen] Temporarily disable parallelism to debug a failure 9129027 [Josh Rosen] Disable Spark UI in Python tests 037b686 [Josh Rosen] Temporarily disable JVM tests so we can test Python speedup in Jenkins. af4cef4 [Josh Rosen] Initial attempt at parallelizing Python test execution Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bbbe380 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bbbe380 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bbbe380 Branch: refs/heads/master Commit: 7bbbe380c52419cd580d1c99c10131184e4ad440 Parents: f9b6bf2 Author: Josh Rosen joshro...@databricks.com Authored: Mon Jun 29 21:32:40 2015 -0700 Committer: Davies Liu dav...@databricks.com Committed: Mon Jun 29 21:32:40 2015 -0700 -- dev/run-tests | 2 +- dev/run-tests.py | 24 +++- dev/sparktestsupport/shellutils.py | 1 + python/pyspark/java_gateway.py | 2 + python/run-tests.py| 97 + 5 files changed, 101 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7bbbe380/dev/run-tests -- diff --git a/dev/run-tests b/dev/run-tests index a00d9f0..257d1e8 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -20,4 +20,4 @@ FWDIR=$(cd `dirname $0`/..; pwd) cd $FWDIR -exec python -u ./dev/run-tests.py +exec python -u ./dev/run-tests.py $@ http://git-wip-us.apache.org/repos/asf/spark/blob/7bbbe380/dev/run-tests.py -- diff --git a/dev/run-tests.py b/dev/run-tests.py index e5c897b..4596e07 100755 --- a/dev/run-tests.py +++ b/dev/run-tests.py @@ -19,6 +19,7 @@ from __future__ import print_function import itertools +from optparse import OptionParser import os import re import sys @@ -360,12 +361,13 @@ def run_scala_tests(build_tool, hadoop_version, test_modules): run_scala_tests_sbt(test_modules, test_profiles) -def run_python_tests(test_modules): +def run_python_tests(test_modules, parallelism): set_title_and_block(Running PySpark tests, BLOCK_PYSPARK_UNIT_TESTS) command = [os.path.join(SPARK_HOME, python, run-tests)] if test_modules != [modules.root]: command.append(--modules=%s % ','.join(m.name for m in test_modules)) +command.append(--parallelism=%i % parallelism) 
run_cmd(command) @@ -379,7 +381,25 @@ def run_sparkr_tests(): print(Ignoring SparkR tests as R was not found in PATH) +def parse_opts(): +parser = OptionParser( +prog=run-tests +) +parser.add_option( +-p, --parallelism, type=int, default=4, +help=The number of suites to test in parallel (default %default) +) + +(opts, args) = parser.parse_args() +if args: +parser.error(Unsupported arguments: %s % ' '.join(args)) +if opts.parallelism 1: +parser.error(Parallelism cannot be less than 1) +return opts + + def main(): +opts = parse_opts() # Ensure the user home directory (HOME) is valid and is an absolute directory if not USER_HOME or not os.path.isabs(USER_HOME): print([error] Cannot determine your home directory as an absolute path;, @@ -461,7 +481,7 @@ def main(): modules_with_python_tests = [m for m in test_modules if m.python_test_goals] if
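The core idea, stripped of Spark specifics, is a bounded worker pool over independent test suites; below is an illustrative sketch, not the actual `run-tests.py`, and the module names are placeholders:

```
import subprocess
from concurrent.futures import ThreadPoolExecutor

SUITES = ["pyspark.tests", "pyspark.sql.tests", "pyspark.streaming.tests"]

def run_suite(name):
    # Each suite gets its own interpreter; threads only babysit the child
    # processes, so a thread pool is enough for real parallelism.
    return name, subprocess.call(["python", "-m", name])

with ThreadPoolExecutor(max_workers=4) as pool:  # cf. the -p/--parallelism flag
    for name, code in pool.map(run_suite, SUITES):
        print("%s: %s" % (name, "OK" if code == 0 else "FAILED"))
```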
spark git commit: [SPARK-8650] [SQL] Use the user-specified app name priority in SparkSQLCLIDriver or HiveThriftServer2
Repository: spark Updated Branches: refs/heads/master f79410c49 - e6c3f7462 [SPARK-8650] [SQL] Use the user-specified app name priority in SparkSQLCLIDriver or HiveThriftServer2 When run `./bin/spark-sql --name query1.sql` [Before] ![before](https://cloud.githubusercontent.com/assets/1400819/8370336/fa20b75a-1bf8-11e5-9171-040049a53240.png) [After] ![after](https://cloud.githubusercontent.com/assets/1400819/8370189/dcc35cb4-1bf6-11e5-8796-a0694140bffb.png) Author: Yadong Qi qiyadong2...@gmail.com Closes #7030 from watermen/SPARK-8650 and squashes the following commits: 51b5134 [Yadong Qi] Improve code and add comment. e3d7647 [Yadong Qi] use spark.app.name priority. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e6c3f746 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e6c3f746 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e6c3f746 Branch: refs/heads/master Commit: e6c3f7462b3fde220ec0084b52388dd4dabb75b9 Parents: f79410c Author: Yadong Qi qiyadong2...@gmail.com Authored: Mon Jun 29 22:34:38 2015 -0700 Committer: Yin Huai yh...@databricks.com Committed: Mon Jun 29 22:34:38 2015 -0700 -- .../org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e6c3f746/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 79eda1f..1d41c46 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -38,9 +38,14 @@ private[hive] object SparkSQLEnv extends Logging { val sparkConf = new SparkConf(loadDefaults = true) val maybeSerializer = sparkConf.getOption(spark.serializer) val maybeKryoReferenceTracking = sparkConf.getOption(spark.kryo.referenceTracking) + // If user doesn't specify the appName, we want to get [SparkSQL::localHostName] instead of + // the default appName [SparkSQLCLIDriver] in cli or beeline. + val maybeAppName = sparkConf +.getOption(spark.app.name) +.filterNot(_ == classOf[SparkSQLCLIDriver].getName) sparkConf -.setAppName(sSparkSQL::${Utils.localHostName()}) + .setAppName(maybeAppName.getOrElse(sSparkSQL::${Utils.localHostName()})) .set( spark.serializer, maybeSerializer.getOrElse(org.apache.spark.serializer.KryoSerializer)) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-8721][SQL] Rename ExpectsInputTypes => AutoCastInputTypes.
Repository: spark Updated Branches: refs/heads/master ea775b066 - f79410c49 [SPARK-8721][SQL] Rename ExpectsInputTypes = AutoCastInputTypes. Author: Reynold Xin r...@databricks.com Closes #7109 from rxin/auto-cast and squashes the following commits: a914cc3 [Reynold Xin] [SPARK-8721][SQL] Rename ExpectsInputTypes = AutoCastInputTypes. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f79410c4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f79410c4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f79410c4 Branch: refs/heads/master Commit: f79410c49b2225b2acdc58293574860230987775 Parents: ea775b0 Author: Reynold Xin r...@databricks.com Authored: Mon Jun 29 22:32:43 2015 -0700 Committer: Reynold Xin r...@databricks.com Committed: Mon Jun 29 22:32:43 2015 -0700 -- .../catalyst/analysis/HiveTypeCoercion.scala| 8 +- .../sql/catalyst/expressions/Expression.scala | 2 +- .../spark/sql/catalyst/expressions/math.scala | 118 +-- .../spark/sql/catalyst/expressions/misc.scala | 6 +- .../sql/catalyst/expressions/predicates.scala | 6 +- .../catalyst/expressions/stringOperations.scala | 10 +- 6 files changed, 71 insertions(+), 79 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f79410c4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index 976fa57..c3d6819 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -116,7 +116,7 @@ trait HiveTypeCoercion { IfCoercion :: Division :: PropagateTypes :: -ExpectedInputConversion :: +AddCastForAutoCastInputTypes :: Nil /** @@ -709,15 +709,15 @@ trait HiveTypeCoercion { /** * Casts types according to the expected input types for Expressions that have the trait - * `ExpectsInputTypes`. + * [[AutoCastInputTypes]]. */ - object ExpectedInputConversion extends Rule[LogicalPlan] { + object AddCastForAutoCastInputTypes extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { // Skip nodes who's children have not been resolved yet. 
case e if !e.childrenResolved = e - case e: ExpectsInputTypes if e.children.map(_.dataType) != e.expectedChildTypes = + case e: AutoCastInputTypes if e.children.map(_.dataType) != e.expectedChildTypes = val newC = (e.children, e.children.map(_.dataType), e.expectedChildTypes).zipped.map { case (child, actual, expected) = if (actual == expected) child else Cast(child, expected) http://git-wip-us.apache.org/repos/asf/spark/blob/f79410c4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index f59db3d..e5dc7b9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -261,7 +261,7 @@ abstract class UnaryExpression extends Expression with trees.UnaryNode[Expressio * Expressions that require a specific `DataType` as input should implement this trait * so that the proper type conversions can be performed in the analyzer. */ -trait ExpectsInputTypes { +trait AutoCastInputTypes { self: Expression = def expectedChildTypes: Seq[DataType] http://git-wip-us.apache.org/repos/asf/spark/blob/f79410c4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala index 4b57ddd..a022f37 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala @@ -56,7 +56,7 @@ abstract class LeafMathExpression(c: Double, name: String) * @param name The short name of the function */ abstract class UnaryMathExpression(f: Double = Double, name: String) -
spark git commit: [SPARK-8214] [SQL] Add function hex
Repository: spark Updated Branches: refs/heads/master 94e040d05 - 637b4eeda [SPARK-8214] [SQL] Add function hex cc chenghao-intel adrian-wang Author: zhichao.li zhichao...@intel.com Closes #6976 from zhichao-li/hex and squashes the following commits: e218d1b [zhichao.li] turn off scalastyle for non-ascii de3f5ea [zhichao.li] non-ascii char cf9c936 [zhichao.li] give separated buffer for each hex method 967ec90 [zhichao.li] Make 'value' as a feild of Hex 3b2fa13 [zhichao.li] tiny fix a647641 [zhichao.li] remove duplicate null check 7cab020 [zhichao.li] tiny refactoring 35ecfe5 [zhichao.li] add function hex Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/637b4eed Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/637b4eed Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/637b4eed Branch: refs/heads/master Commit: 637b4eedad84dcff1769454137a64ac70c7f2397 Parents: 94e040d Author: zhichao.li zhichao...@intel.com Authored: Mon Jun 29 12:25:16 2015 -0700 Committer: Davies Liu dav...@databricks.com Committed: Mon Jun 29 12:25:16 2015 -0700 -- .../catalyst/analysis/FunctionRegistry.scala| 1 + .../spark/sql/catalyst/expressions/math.scala | 86 +++- .../expressions/MathFunctionsSuite.scala| 14 +++- .../scala/org/apache/spark/sql/functions.scala | 16 .../apache/spark/sql/MathExpressionsSuite.scala | 13 +++ 5 files changed, 125 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/637b4eed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index b24064d..b17457d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -113,6 +113,7 @@ object FunctionRegistry { expression[Expm1](expm1), expression[Floor](floor), expression[Hypot](hypot), +expression[Hex](hex), expression[Logarithm](log), expression[Log](ln), expression[Log10](log10), http://git-wip-us.apache.org/repos/asf/spark/blob/637b4eed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala index 5694afc..4b57ddd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala @@ -18,9 +18,11 @@ package org.apache.spark.sql.catalyst.expressions import java.lang.{Long = JLong} +import java.util.Arrays +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen._ -import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StringType} +import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String /** @@ -273,9 +275,6 @@ case class Atan2(left: Expression, right: Expression) } } -case class Hypot(left: Expression, right: Expression) - extends BinaryMathExpression(math.hypot, HYPOT) - case class Pow(left: Expression, right: Expression) extends BinaryMathExpression(math.pow, POWER) { override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { @@ 
-287,6 +286,85 @@ case class Pow(left: Expression, right: Expression) } } +/** + * If the argument is an INT or binary, hex returns the number as a STRING in hexadecimal format. + * Otherwise if the number is a STRING, + * it converts each character into its hexadecimal representation and returns the resulting STRING. + * Negative numbers would be treated as two's complement. + */ +case class Hex(child: Expression) + extends UnaryExpression with Serializable { + + override def dataType: DataType = StringType + + override def checkInputDataTypes(): TypeCheckResult = { +if (child.dataType.isInstanceOf[StringType] + || child.dataType.isInstanceOf[IntegerType] + || child.dataType.isInstanceOf[LongType] + || child.dataType.isInstanceOf[BinaryType] + || child.dataType == NullType) { + TypeCheckResult.TypeCheckSuccess +} else { + TypeCheckResult.TypeCheckFailure(shex doesn't accepts ${child.dataType} type) +} + } + + override def eval(input: InternalRow): Any = { +val num =
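Since the expression is registered under the SQL name `hex`, it can be exercised through a SQL expression; a minimal sketch, with expected output per the semantics described above (integers render in hexadecimal, strings character by character):

```
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "hex-demo")
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([(17, "Spark")], ["n", "s"])
# hex(17) -> "11"; hex("Spark") -> "537061726B"
df.selectExpr("hex(n)", "hex(s)").show()
sc.stop()
```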