spark git commit: [DOC][MINOR][SQL] Fix internal link
Repository: spark
Updated Branches: refs/heads/master 9e48cdfbd -> b541b3163

[DOC][MINOR][SQL] Fix internal link

It doesn't show up as a hyperlink currently. It will show up as a hyperlink after this change.

Author: Rohit Agarwal

Closes #9544 from mindprince/patch-2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b541b316
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b541b316
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b541b316

Branch: refs/heads/master
Commit: b541b31630b1b85b48d6096079d073ccf46a62e8
Parents: 9e48cdf
Author: Rohit Agarwal
Authored: Mon Nov 9 13:28:00 2015 +0100
Committer: Sean Owen
Committed: Mon Nov 9 13:28:00 2015 +0100

--
 docs/sql-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/b541b316/docs/sql-programming-guide.md

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 52e03b9..ccd2690 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -2287,7 +2287,7 @@ Several caching related features are not supported yet:
 Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs. Currently Hive SerDes and UDFs are based on Hive 1.2.1, and Spark SQL can be connected to different versions of Hive Metastore
-(from 0.12.0 to 1.2.1. Also see http://spark.apache.org/docs/latest/sql-programming-guide.html#interacting-with-different-versions-of-hive-metastore).
+(from 0.12.0 to 1.2.1. Also see [Interacting with Different Versions of Hive Metastore](#interacting-with-different-versions-of-hive-metastore)).

 Deploying in Existing Hive Warehouses
spark git commit: [DOC][MINOR][SQL] Fix internal link
Repository: spark
Updated Branches: refs/heads/branch-1.6 f53c9fb18 -> 0f03bd13e

[DOC][MINOR][SQL] Fix internal link

It doesn't show up as a hyperlink currently. It will show up as a hyperlink after this change.

Author: Rohit Agarwal

Closes #9544 from mindprince/patch-2.

(cherry picked from commit b541b31630b1b85b48d6096079d073ccf46a62e8)
Signed-off-by: Sean Owen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f03bd13
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f03bd13
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f03bd13

Branch: refs/heads/branch-1.6
Commit: 0f03bd13ed17dba5410e3a34011282c0d417f297
Parents: f53c9fb
Author: Rohit Agarwal
Authored: Mon Nov 9 13:28:00 2015 +0100
Committer: Sean Owen
Committed: Mon Nov 9 13:28:09 2015 +0100

--
 docs/sql-programming-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/0f03bd13/docs/sql-programming-guide.md

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 52e03b9..ccd2690 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -2287,7 +2287,7 @@ Several caching related features are not supported yet:
 Spark SQL is designed to be compatible with the Hive Metastore, SerDes and UDFs. Currently Hive SerDes and UDFs are based on Hive 1.2.1, and Spark SQL can be connected to different versions of Hive Metastore
-(from 0.12.0 to 1.2.1. Also see http://spark.apache.org/docs/latest/sql-programming-guide.html#interacting-with-different-versions-of-hive-metastore).
+(from 0.12.0 to 1.2.1. Also see [Interacting with Different Versions of Hive Metastore](#interacting-with-different-versions-of-hive-metastore)).

 Deploying in Existing Hive Warehouses
spark git commit: [SPARK-11218][CORE] show help messages for start-slave and start-master
Repository: spark
Updated Branches: refs/heads/branch-1.6 7eaf48eeb -> f53c9fb18

[SPARK-11218][CORE] show help messages for start-slave and start-master

Addressing https://issues.apache.org/jira/browse/SPARK-11218, mostly copied from start-thriftserver.sh.

```
charlesyeh-mbp:spark charlesyeh$ ./sbin/start-master.sh --help
Usage: Master [options]

Options:
  -i HOST, --ip HOST     Hostname to listen on (deprecated, please use --host or -h)
  -h HOST, --host HOST   Hostname to listen on
  -p PORT, --port PORT   Port to listen on (default: 7077)
  --webui-port PORT      Port for web UI (default: 8080)
  --properties-file FILE Path to a custom Spark properties file.
                         Default is conf/spark-defaults.conf.
```

```
charlesyeh-mbp:spark charlesyeh$ ./sbin/start-slave.sh
Usage: Worker [options] <master>

Master must be a URL of the form spark://hostname:port

Options:
  -c CORES, --cores CORES  Number of cores to use
  -m MEM, --memory MEM     Amount of memory to use (e.g. 1000M, 2G)
  -d DIR, --work-dir DIR   Directory to run apps in (default: SPARK_HOME/work)
  -i HOST, --ip IP         Hostname to listen on (deprecated, please use --host or -h)
  -h HOST, --host HOST     Hostname to listen on
  -p PORT, --port PORT     Port to listen on (default: random)
  --webui-port PORT        Port for web UI (default: 8081)
  --properties-file FILE   Path to a custom Spark properties file.
                           Default is conf/spark-defaults.conf.
```

Author: Charles Yeh

Closes #9432 from CharlesYeh/helpmsg.

(cherry picked from commit 9e48cdfbdecc9554a425ba35c0252910fd1e8faa)
Signed-off-by: Sean Owen

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f53c9fb1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f53c9fb1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f53c9fb1

Branch: refs/heads/branch-1.6
Commit: f53c9fb18b346b0d0dfabb736f1db20471d0a0d3
Parents: 7eaf48e
Author: Charles Yeh
Authored: Mon Nov 9 13:22:05 2015 +0100
Committer: Sean Owen
Committed: Mon Nov 9 13:24:17 2015 +0100

--
 sbin/start-master.sh | 24 +++-
 sbin/start-slave.sh  | 24 +++-
 2 files changed, 34 insertions(+), 14 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f53c9fb1/sbin/start-master.sh

diff --git a/sbin/start-master.sh b/sbin/start-master.sh
index c20e19a..9f2e14d 100755
--- a/sbin/start-master.sh
+++ b/sbin/start-master.sh
@@ -23,6 +23,20 @@ if [ -z "${SPARK_HOME}" ]; then
   export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
 fi

+# NOTE: This exact class name is matched downstream by SparkSubmit.
+# Any changes need to be reflected there.
+CLASS="org.apache.spark.deploy.master.Master"
+
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+  echo "Usage: ./sbin/start-master.sh [options]"
+  pattern="Usage:"
+  pattern+="\|Using Spark's default log4j profile:"
+  pattern+="\|Registered signal handlers for"
+
+  "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
+  exit 1
+fi
+
 ORIGINAL_ARGS="$@"

 START_TACHYON=false
@@ -30,7 +44,7 @@ START_TACHYON=false
 while (( "$#" )); do
   case $1 in
     --with-tachyon)
-      if [ ! -e "$sbin"/../tachyon/bin/tachyon ]; then
+      if [ ! -e "${SPARK_HOME}"/tachyon/bin/tachyon ]; then
         echo "Error: --with-tachyon specified, but tachyon not found."
         exit -1
       fi
@@ -56,12 +70,12 @@ if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
   SPARK_MASTER_WEBUI_PORT=8080
 fi

-"${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 \
+"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
   --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
   $ORIGINAL_ARGS

 if [ "$START_TACHYON" == "true" ]; then
-  "${SPARK_HOME}/sbin"/../tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP
-  "${SPARK_HOME}/sbin"/../tachyon/bin/tachyon format -s
-  "${SPARK_HOME}/sbin"/../tachyon/bin/tachyon-start.sh master
+  "${SPARK_HOME}"/tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP
+  "${SPARK_HOME}"/tachyon/bin/tachyon format -s
+  "${SPARK_HOME}"/tachyon/bin/tachyon-start.sh master
 fi

http://git-wip-us.apache.org/repos/asf/spark/blob/f53c9fb1/sbin/start-slave.sh

diff --git a/sbin/start-slave.sh b/sbin/start-slave.sh
index 2145564..8c268b8 100755
--- a/sbin/start-slave.sh
+++ b/sbin/start-slave.sh
@@ -31,18 +31,24 @@
 #   worker. Subsequent workers will increment this
 #   number. Default is 8081.

-usage="Usage: start-slave.sh where
spark git commit: [SPARK-10471][CORE][MESOS] prevent getting offers for unmet constraints
Repository: spark
Updated Branches: refs/heads/master 88a3fdcc7 -> 5039a49b6

[SPARK-10471][CORE][MESOS] prevent getting offers for unmet constraints

This change rejects offers for slaves with unmet constraints for 120s, to mitigate offer starvation. It prevents Mesos from sending us these offers again and again; in return, we get more offers for slaves which might meet our constraints, and it enables Mesos to send the rejected offers to other frameworks.

Author: Felix Bechstein

Closes #8639 from felixb/decline_offers_constraint_mismatch.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5039a49b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5039a49b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5039a49b

Branch: refs/heads/master
Commit: 5039a49b636325f321daa089971107003fae9d4b
Parents: 88a3fdc
Author: Felix Bechstein
Authored: Mon Nov 9 13:36:14 2015 -0800
Committer: Andrew Or
Committed: Mon Nov 9 13:36:14 2015 -0800

--
 .../mesos/CoarseMesosSchedulerBackend.scala     | 92 +++-
 .../cluster/mesos/MesosSchedulerBackend.scala   | 48 +++---
 .../cluster/mesos/MesosSchedulerUtils.scala     |  4 +
 3 files changed, 91 insertions(+), 53 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5039a49b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index d10a77f..2de9b6a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -101,6 +101,10 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))

+  // reject offers with mismatched constraints in seconds
+  private val rejectOfferDurationForUnmetConstraints =
+    getRejectOfferDurationForUnmetConstraints(sc)
+
   // A client for talking to the external shuffle service, if it is a
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
@@ -249,48 +253,56 @@
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
         val id = offer.getId.getValue
-        if (taskIdToSlaveId.size < executorLimit &&
-            totalCoresAcquired < maxCores &&
-            meetsConstraints &&
-            mem >= calculateTotalMemory(sc) &&
-            cpus >= 1 &&
-            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
-            !slaveIdsWithExecutors.contains(slaveId)) {
-          // Launch an executor on the slave
-          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-          totalCoresAcquired += cpusToUse
-          val taskId = newMesosTaskId()
-          taskIdToSlaveId.put(taskId, slaveId)
-          slaveIdsWithExecutors += slaveId
-          coresByTaskId(taskId) = cpusToUse
-          // Gather cpu resources from the available resources and use them in the task.
-          val (remainingResources, cpuResourcesToUse) =
-            partitionResources(offer.getResourcesList, "cpus", cpusToUse)
-          val (_, memResourcesToUse) =
-            partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
-          val taskBuilder = MesosTaskInfo.newBuilder()
-            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
-            .setSlaveId(offer.getSlaveId)
-            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
-            .setName("Task " + taskId)
-            .addAllResources(cpuResourcesToUse.asJava)
-            .addAllResources(memResourcesToUse.asJava)
-
-          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-            MesosSchedulerBackendUtil
-              .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
+        if (meetsConstraints) {
+          if (taskIdToSlaveId.size < executorLimit &&
+              totalCoresAcquired < maxCores &&
+              mem >= calculateTotalMemory(sc) &&
+              cpus >= 1 &&
+              failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
+              !slaveIdsWithExecutors.contains(slaveId)) {
+            //
spark git commit: [SPARK-10471][CORE][MESOS] prevent getting offers for unmet constraints
Repository: spark
Updated Branches: refs/heads/branch-1.6 2459b3432 -> 74f50275e

[SPARK-10471][CORE][MESOS] prevent getting offers for unmet constraints

This change rejects offers for slaves with unmet constraints for 120s, to mitigate offer starvation. It prevents Mesos from sending us these offers again and again; in return, we get more offers for slaves which might meet our constraints, and it enables Mesos to send the rejected offers to other frameworks.

Author: Felix Bechstein

Closes #8639 from felixb/decline_offers_constraint_mismatch.

(cherry picked from commit 5039a49b636325f321daa089971107003fae9d4b)
Signed-off-by: Andrew Or

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74f50275
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74f50275
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74f50275

Branch: refs/heads/branch-1.6
Commit: 74f50275e429e649212928a9f36552941b862edc
Parents: 2459b34
Author: Felix Bechstein
Authored: Mon Nov 9 13:36:14 2015 -0800
Committer: Andrew Or
Committed: Mon Nov 9 13:36:28 2015 -0800

--
 .../mesos/CoarseMesosSchedulerBackend.scala     | 92 +++-
 .../cluster/mesos/MesosSchedulerBackend.scala   | 48 +++---
 .../cluster/mesos/MesosSchedulerUtils.scala     |  4 +
 3 files changed, 91 insertions(+), 53 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/74f50275/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index d10a77f..2de9b6a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -101,6 +101,10 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))

+  // reject offers with mismatched constraints in seconds
+  private val rejectOfferDurationForUnmetConstraints =
+    getRejectOfferDurationForUnmetConstraints(sc)
+
   // A client for talking to the external shuffle service, if it is a
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
@@ -249,48 +253,56 @@
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
         val id = offer.getId.getValue
-        if (taskIdToSlaveId.size < executorLimit &&
-            totalCoresAcquired < maxCores &&
-            meetsConstraints &&
-            mem >= calculateTotalMemory(sc) &&
-            cpus >= 1 &&
-            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
-            !slaveIdsWithExecutors.contains(slaveId)) {
-          // Launch an executor on the slave
-          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-          totalCoresAcquired += cpusToUse
-          val taskId = newMesosTaskId()
-          taskIdToSlaveId.put(taskId, slaveId)
-          slaveIdsWithExecutors += slaveId
-          coresByTaskId(taskId) = cpusToUse
-          // Gather cpu resources from the available resources and use them in the task.
-          val (remainingResources, cpuResourcesToUse) =
-            partitionResources(offer.getResourcesList, "cpus", cpusToUse)
-          val (_, memResourcesToUse) =
-            partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
-          val taskBuilder = MesosTaskInfo.newBuilder()
-            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
-            .setSlaveId(offer.getSlaveId)
-            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
-            .setName("Task " + taskId)
-            .addAllResources(cpuResourcesToUse.asJava)
-            .addAllResources(memResourcesToUse.asJava)
-
-          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-            MesosSchedulerBackendUtil
-              .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
+        if (meetsConstraints) {
+          if (taskIdToSlaveId.size < executorLimit &&
+              totalCoresAcquired < maxCores &&
+              mem >= calculateTotalMemory(sc) &&
+              cpus >= 1 &&
+
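For readers following the scheduler change above: in the Mesos scheduler API, an offer is declined together with a `Filters` message whose `refuse_seconds` field tells the master how long to withhold those resources from this framework. A minimal sketch against the Mesos Java API — the object and helper names are illustrative, not Spark's actual code; only the 120s duration mirrors the commit description:

```
import org.apache.mesos.Protos.{Filters, OfferID}
import org.apache.mesos.SchedulerDriver

object DeclineOfferSketch {
  // Illustrative helper, not Spark's code: decline an offer whose slave
  // does not satisfy the configured constraints.
  def declineForUnmetConstraints(driver: SchedulerDriver, offerId: OfferID): Unit = {
    // refuse_seconds asks Mesos not to re-offer these resources to this
    // framework for 120s, so they can go to other slaves and frameworks.
    val filters = Filters.newBuilder().setRefuseSeconds(120.0).build()
    driver.declineOffer(offerId, filters)
  }
}
```

Declining without a filter puts the offer back into rotation almost immediately, which is exactly the starvation pattern the commit is trying to avoid.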
spark git commit: [SPARK-10280][MLLIB][PYSPARK][DOCS] Add @since annotation to pyspark.ml.classification
Repository: spark
Updated Branches: refs/heads/branch-1.6 62f664c5a -> 2459b3432

[SPARK-10280][MLLIB][PYSPARK][DOCS] Add @since annotation to pyspark.ml.classification

Author: Yu ISHIKAWA

Closes #8690 from yu-iskw/SPARK-10280.

(cherry picked from commit 88a3fdcc783f880a8d01c7e194ec42fc114bdf8a)
Signed-off-by: Xiangrui Meng

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2459b343
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2459b343
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2459b343

Branch: refs/heads/branch-1.6
Commit: 2459b3432bad48da9f5c72763e2088bea7e26308
Parents: 62f664c
Author: Yu ISHIKAWA
Authored: Mon Nov 9 13:16:04 2015 -0800
Committer: Xiangrui Meng
Committed: Mon Nov 9 13:16:13 2015 -0800

--
 python/pyspark/ml/classification.py | 56 
 1 file changed, 56 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/2459b343/python/pyspark/ml/classification.py

diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 2e468f6..603f2c7 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -67,6 +67,8 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
     Traceback (most recent call last):
         ...
     TypeError: Method setParams forces keyword arguments.
+
+    .. versionadded:: 1.3.0
     """

     # a placeholder to make it appear in the generated doc
@@ -99,6 +101,7 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
         self._checkThresholdConsistency()

     @keyword_only
+    @since("1.3.0")
     def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                   maxIter=100, regParam=0.1, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
                   threshold=0.5, thresholds=None, probabilityCol="probability",
@@ -119,6 +122,7 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
     def _create_model(self, java_model):
         return LogisticRegressionModel(java_model)

+    @since("1.4.0")
     def setThreshold(self, value):
         """
         Sets the value of :py:attr:`threshold`.
@@ -129,6 +133,7 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
             del self._paramMap[self.thresholds]
         return self

+    @since("1.4.0")
     def getThreshold(self):
         """
         Gets the value of threshold or its default value.
@@ -144,6 +149,7 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
         else:
             return self.getOrDefault(self.threshold)

+    @since("1.5.0")
     def setThresholds(self, value):
         """
         Sets the value of :py:attr:`thresholds`.
@@ -154,6 +160,7 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
             del self._paramMap[self.threshold]
         return self

+    @since("1.5.0")
     def getThresholds(self):
         """
         If :py:attr:`thresholds` is set, return its value.
@@ -185,9 +192,12 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
 class LogisticRegressionModel(JavaModel):
     """
     Model fitted by LogisticRegression.
+
+    .. versionadded:: 1.3.0
     """

     @property
+    @since("1.4.0")
     def weights(self):
         """
         Model weights.
@@ -205,6 +215,7 @@ class LogisticRegressionModel(JavaModel):
         return self._call_java("coefficients")

     @property
+    @since("1.4.0")
     def intercept(self):
         """
         Model intercept.
@@ -215,6 +226,8 @@ class LogisticRegressionModel(JavaModel):
 class TreeClassifierParams(object):
     """
     Private class to track supported impurity measures.
+
+    .. versionadded:: 1.4.0
     """
     supportedImpurities = ["entropy", "gini"]

@@ -231,6 +244,7 @@ class TreeClassifierParams(object):
                      "gain calculation (case-insensitive). Supported options: " +
                      ", ".join(self.supportedImpurities))

+    @since("1.6.0")
     def setImpurity(self, value):
         """
         Sets the value of :py:attr:`impurity`.
@@ -238,6 +252,7 @@ class TreeClassifierParams(object):
         self._paramMap[self.impurity] = value
         return self

+    @since("1.6.0")
     def getImpurity(self):
         """
         Gets the value of impurity or its default value.
@@ -248,6 +263,8 @@ class TreeClassifierParams(object):
 class GBTParams(TreeEnsembleParams):
     """
spark git commit: [SPARK-10280][MLLIB][PYSPARK][DOCS] Add @since annotation to pyspark.ml.classification
Repository: spark
Updated Branches: refs/heads/master 860ea0d38 -> 88a3fdcc7

[SPARK-10280][MLLIB][PYSPARK][DOCS] Add @since annotation to pyspark.ml.classification

Author: Yu ISHIKAWA

Closes #8690 from yu-iskw/SPARK-10280.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88a3fdcc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88a3fdcc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88a3fdcc

Branch: refs/heads/master
Commit: 88a3fdcc783f880a8d01c7e194ec42fc114bdf8a
Parents: 860ea0d
Author: Yu ISHIKAWA
Authored: Mon Nov 9 13:16:04 2015 -0800
Committer: Xiangrui Meng
Committed: Mon Nov 9 13:16:04 2015 -0800

--
 python/pyspark/ml/classification.py | 56 
 1 file changed, 56 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/88a3fdcc/python/pyspark/ml/classification.py

diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 2e468f6..603f2c7 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -67,6 +67,8 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
     Traceback (most recent call last):
         ...
     TypeError: Method setParams forces keyword arguments.
+
+    .. versionadded:: 1.3.0
     """

     # a placeholder to make it appear in the generated doc
@@ -99,6 +101,7 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
         self._checkThresholdConsistency()

     @keyword_only
+    @since("1.3.0")
     def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                   maxIter=100, regParam=0.1, elasticNetParam=0.0, tol=1e-6, fitIntercept=True,
                   threshold=0.5, thresholds=None, probabilityCol="probability",
@@ -119,6 +122,7 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
     def _create_model(self, java_model):
         return LogisticRegressionModel(java_model)

+    @since("1.4.0")
     def setThreshold(self, value):
         """
         Sets the value of :py:attr:`threshold`.
@@ -129,6 +133,7 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
             del self._paramMap[self.thresholds]
         return self

+    @since("1.4.0")
     def getThreshold(self):
         """
         Gets the value of threshold or its default value.
@@ -144,6 +149,7 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
         else:
             return self.getOrDefault(self.threshold)

+    @since("1.5.0")
     def setThresholds(self, value):
         """
         Sets the value of :py:attr:`thresholds`.
@@ -154,6 +160,7 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
             del self._paramMap[self.threshold]
         return self

+    @since("1.5.0")
     def getThresholds(self):
         """
         If :py:attr:`thresholds` is set, return its value.
@@ -185,9 +192,12 @@ class LogisticRegression(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredicti
 class LogisticRegressionModel(JavaModel):
     """
     Model fitted by LogisticRegression.
+
+    .. versionadded:: 1.3.0
     """

     @property
+    @since("1.4.0")
     def weights(self):
         """
         Model weights.
@@ -205,6 +215,7 @@ class LogisticRegressionModel(JavaModel):
         return self._call_java("coefficients")

     @property
+    @since("1.4.0")
     def intercept(self):
         """
         Model intercept.
@@ -215,6 +226,8 @@ class LogisticRegressionModel(JavaModel):
 class TreeClassifierParams(object):
     """
     Private class to track supported impurity measures.
+
+    .. versionadded:: 1.4.0
     """
     supportedImpurities = ["entropy", "gini"]

@@ -231,6 +244,7 @@ class TreeClassifierParams(object):
                      "gain calculation (case-insensitive). Supported options: " +
                      ", ".join(self.supportedImpurities))

+    @since("1.6.0")
     def setImpurity(self, value):
         """
         Sets the value of :py:attr:`impurity`.
@@ -238,6 +252,7 @@ class TreeClassifierParams(object):
         self._paramMap[self.impurity] = value
         return self

+    @since("1.6.0")
     def getImpurity(self):
         """
         Gets the value of impurity or its default value.
@@ -248,6 +263,8 @@ class TreeClassifierParams(object):
 class GBTParams(TreeEnsembleParams):
     """
     Private class to track supported GBT params.
+
+    .. versionadded:: 1.4.0
     """
     supportedLossTypes = ["logistic"]
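For comparison, Scala MLlib records the same version information with the `org.apache.spark.annotation.Since` annotation rather than a decorator. A minimal sketch — the class and method below are invented for illustration; only the annotation usage is real:

```
import org.apache.spark.annotation.Since

// Illustrative class, not part of Spark: @Since records the version in
// which a class or method first appeared, and the docs render it.
@Since("1.3.0")
class ExampleClassifier {

  @Since("1.4.0")
  def setThreshold(value: Double): this.type = {
    // ... store the parameter value ...
    this
  }
}
```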
spark git commit: [SPARK-11552][DOCS][Replaced example code in ml-decision-tree.md using include_example]
Repository: spark
Updated Branches: refs/heads/branch-1.6 74f50275e -> 129cfab4f

[SPARK-11552][DOCS][Replaced example code in ml-decision-tree.md using include_example]

I have tested it locally and it is working fine; please review.

Author: sachin aggarwal

Closes #9539 from agsachin/SPARK-11552-real.

(cherry picked from commit 51d41e4b1a3a25a3fde3a4345afcfe4766023d23)
Signed-off-by: Xiangrui Meng

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/129cfab4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/129cfab4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/129cfab4

Branch: refs/heads/branch-1.6
Commit: 129cfab4f8577e6f6ab29b5f1cb82d00ced56124
Parents: 74f5027
Author: sachin aggarwal
Authored: Mon Nov 9 14:25:42 2015 -0800
Committer: Xiangrui Meng
Committed: Mon Nov 9 14:25:56 2015 -0800

--
 docs/ml-decision-tree.md                        | 338 +--
 .../JavaDecisionTreeClassificationExample.java  | 103 ++
 .../ml/JavaDecisionTreeRegressionExample.java   |  90 +
 .../ml/decision_tree_classification_example.py  |  77 +
 .../ml/decision_tree_regression_example.py      |  74 
 .../ml/DecisionTreeClassificationExample.scala  |  94 ++
 .../ml/DecisionTreeRegressionExample.scala      |  81 +
 7 files changed, 527 insertions(+), 330 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/129cfab4/docs/ml-decision-tree.md

diff --git a/docs/ml-decision-tree.md b/docs/ml-decision-tree.md
index 542819e..2bfac6f 100644
--- a/docs/ml-decision-tree.md
+++ b/docs/ml-decision-tree.md
@@ -118,196 +118,24 @@ We use two feature transformers to prepare the data; these help index categories

 More details on parameters can be found in the [Scala API documentation](api/scala/index.html#org.apache.spark.ml.classification.DecisionTreeClassifier).

-{% highlight scala %}
-import org.apache.spark.ml.Pipeline
-import org.apache.spark.ml.classification.DecisionTreeClassifier
-import org.apache.spark.ml.classification.DecisionTreeClassificationModel
-import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer}
-import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
-import org.apache.spark.mllib.util.MLUtils
-
-// Load and parse the data file, converting it to a DataFrame.
-val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
-
-// Index labels, adding metadata to the label column.
-// Fit on whole dataset to include all labels in index.
-val labelIndexer = new StringIndexer()
-  .setInputCol("label")
-  .setOutputCol("indexedLabel")
-  .fit(data)
-// Automatically identify categorical features, and index them.
-val featureIndexer = new VectorIndexer()
-  .setInputCol("features")
-  .setOutputCol("indexedFeatures")
-  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous
-  .fit(data)
-
-// Split the data into training and test sets (30% held out for testing)
-val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
-
-// Train a DecisionTree model.
-val dt = new DecisionTreeClassifier()
-  .setLabelCol("indexedLabel")
-  .setFeaturesCol("indexedFeatures")
-
-// Convert indexed labels back to original labels.
-val labelConverter = new IndexToString()
-  .setInputCol("prediction")
-  .setOutputCol("predictedLabel")
-  .setLabels(labelIndexer.labels)
-
-// Chain indexers and tree in a Pipeline
-val pipeline = new Pipeline()
-  .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
-
-// Train model. This also runs the indexers.
-val model = pipeline.fit(trainingData)
-
-// Make predictions.
-val predictions = model.transform(testData)
-
-// Select example rows to display.
-predictions.select("predictedLabel", "label", "features").show(5)
-
-// Select (prediction, true label) and compute test error
-val evaluator = new MulticlassClassificationEvaluator()
-  .setLabelCol("indexedLabel")
-  .setPredictionCol("prediction")
-  .setMetricName("precision")
-val accuracy = evaluator.evaluate(predictions)
-println("Test Error = " + (1.0 - accuracy))
-
-val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
-println("Learned classification tree model:\n" + treeModel.toDebugString)
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/ml/DecisionTreeClassificationExample.scala %}

 More details on parameters can be found in the [Java API documentation](api/java/org/apache/spark/ml/classification/DecisionTreeClassifier.html).

-{% highlight java %}
-import org.apache.spark.ml.Pipeline;
-import
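The `include_example` tag used above pulls the snippet out of a real, compiled example file instead of duplicating code in the markdown, so the docs can no longer drift out of sync with code that actually builds. A sketch of how such an example file is laid out — assuming the `// $example on$` / `// $example off$` comment markers that the docs build scans for to delimit the extracted region:

```
// A sketch of an example source file wired up for include_example; the
// marker comments (an assumption here) bound the snippet shown in the docs.
package org.apache.spark.examples.ml

object DecisionTreeClassificationExample {
  def main(args: Array[String]): Unit = {
    // $example on$
    // ... load data, build the indexers, DecisionTreeClassifier and
    // Pipeline exactly as in the removed markdown snippet above ...
    // $example off$
  }
}
```

The same mechanism is used by the collaborative-filtering commits below.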
spark git commit: [SPARK-11548][DOCS] Replaced example code in mllib-collaborative-filtering.md using include_example
Repository: spark
Updated Branches: refs/heads/branch-1.6 129cfab4f -> 85bb319a2

[SPARK-11548][DOCS] Replaced example code in mllib-collaborative-filtering.md using include_example

Kindly review the changes.

Author: Rishabh Bhardwaj

Closes #9519 from rishabhbhardwaj/SPARK-11337.

(cherry picked from commit b7720fa45525cff6e812fa448d0841cb41f6c8a5)
Signed-off-by: Xiangrui Meng

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85bb319a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85bb319a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85bb319a

Branch: refs/heads/branch-1.6
Commit: 85bb319a20d0b1b4255e6b568ebffa272eba9e34
Parents: 129cfab
Author: Rishabh Bhardwaj
Authored: Mon Nov 9 14:27:36 2015 -0800
Committer: Xiangrui Meng
Committed: Mon Nov 9 14:27:46 2015 -0800

--
 docs/mllib-collaborative-filtering.md           | 138 +--
 .../mllib/JavaRecommendationExample.java        |  97 +
 .../main/python/mllib/recommendation_example.py |  54 
 .../examples/mllib/RecommendationExample.scala  |  67 +
 4 files changed, 221 insertions(+), 135 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/85bb319a/docs/mllib-collaborative-filtering.md

diff --git a/docs/mllib-collaborative-filtering.md b/docs/mllib-collaborative-filtering.md
index 1ad5212..7cd1b89 100644
--- a/docs/mllib-collaborative-filtering.md
+++ b/docs/mllib-collaborative-filtering.md
@@ -66,43 +66,7 @@ recommendation model by measuring the Mean Squared Error of rating prediction.

 Refer to the [`ALS` Scala docs](api/scala/index.html#org.apache.spark.mllib.recommendation.ALS) for details on the API.

-{% highlight scala %}
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
-import org.apache.spark.mllib.recommendation.Rating
-
-// Load and parse the data
-val data = sc.textFile("data/mllib/als/test.data")
-val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
-    Rating(user.toInt, item.toInt, rate.toDouble)
-  })
-
-// Build the recommendation model using ALS
-val rank = 10
-val numIterations = 10
-val model = ALS.train(ratings, rank, numIterations, 0.01)
-
-// Evaluate the model on rating data
-val usersProducts = ratings.map { case Rating(user, product, rate) =>
-  (user, product)
-}
-val predictions =
-  model.predict(usersProducts).map { case Rating(user, product, rate) =>
-    ((user, product), rate)
-  }
-val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
-  ((user, product), rate)
-}.join(predictions)
-val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
-  val err = (r1 - r2)
-  err * err
-}.mean()
-println("Mean Squared Error = " + MSE)
-
-// Save and load model
-model.save(sc, "myModelPath")
-val sameModel = MatrixFactorizationModel.load(sc, "myModelPath")
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/mllib/RecommendationExample.scala %}

 If the rating matrix is derived from another source of information (e.g., it is inferred from
 other signals), you can use the `trainImplicit` method to get better results.
@@ -123,81 +87,7 @@ that is equivalent to the provided example in Scala is given below:

 Refer to the [`ALS` Java docs](api/java/org/apache/spark/mllib/recommendation/ALS.html) for details on the API.

-{% highlight java %}
-import scala.Tuple2;
-
-import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.mllib.recommendation.ALS;
-import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
-import org.apache.spark.mllib.recommendation.Rating;
-import org.apache.spark.SparkConf;
-
-public class CollaborativeFiltering {
-  public static void main(String[] args) {
-    SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example");
-    JavaSparkContext sc = new JavaSparkContext(conf);
-
-    // Load and parse the data
-    String path = "data/mllib/als/test.data";
-    JavaRDD<String> data = sc.textFile(path);
-    JavaRDD<Rating> ratings = data.map(
-      new Function<String, Rating>() {
-        public Rating call(String s) {
-          String[] sarray = s.split(",");
-          return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
-            Double.parseDouble(sarray[2]));
-        }
-      }
-    );
-
-    // Build the recommendation model using ALS
-    int rank = 10;
-    int numIterations = 10;
-    MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
-
-    // Evaluate
spark git commit: [SPARK-11548][DOCS] Replaced example code in mllib-collaborative-filtering.md using include_example
Repository: spark
Updated Branches: refs/heads/master 51d41e4b1 -> b7720fa45

[SPARK-11548][DOCS] Replaced example code in mllib-collaborative-filtering.md using include_example

Kindly review the changes.

Author: Rishabh Bhardwaj

Closes #9519 from rishabhbhardwaj/SPARK-11337.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7720fa4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7720fa4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7720fa4

Branch: refs/heads/master
Commit: b7720fa45525cff6e812fa448d0841cb41f6c8a5
Parents: 51d41e4
Author: Rishabh Bhardwaj
Authored: Mon Nov 9 14:27:36 2015 -0800
Committer: Xiangrui Meng
Committed: Mon Nov 9 14:27:36 2015 -0800

--
 docs/mllib-collaborative-filtering.md           | 138 +--
 .../mllib/JavaRecommendationExample.java        |  97 +
 .../main/python/mllib/recommendation_example.py |  54 
 .../examples/mllib/RecommendationExample.scala  |  67 +
 4 files changed, 221 insertions(+), 135 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/b7720fa4/docs/mllib-collaborative-filtering.md

diff --git a/docs/mllib-collaborative-filtering.md b/docs/mllib-collaborative-filtering.md
index 1ad5212..7cd1b89 100644
--- a/docs/mllib-collaborative-filtering.md
+++ b/docs/mllib-collaborative-filtering.md
@@ -66,43 +66,7 @@ recommendation model by measuring the Mean Squared Error of rating prediction.

 Refer to the [`ALS` Scala docs](api/scala/index.html#org.apache.spark.mllib.recommendation.ALS) for details on the API.

-{% highlight scala %}
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
-import org.apache.spark.mllib.recommendation.Rating
-
-// Load and parse the data
-val data = sc.textFile("data/mllib/als/test.data")
-val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
-    Rating(user.toInt, item.toInt, rate.toDouble)
-  })
-
-// Build the recommendation model using ALS
-val rank = 10
-val numIterations = 10
-val model = ALS.train(ratings, rank, numIterations, 0.01)
-
-// Evaluate the model on rating data
-val usersProducts = ratings.map { case Rating(user, product, rate) =>
-  (user, product)
-}
-val predictions =
-  model.predict(usersProducts).map { case Rating(user, product, rate) =>
-    ((user, product), rate)
-  }
-val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
-  ((user, product), rate)
-}.join(predictions)
-val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
-  val err = (r1 - r2)
-  err * err
-}.mean()
-println("Mean Squared Error = " + MSE)
-
-// Save and load model
-model.save(sc, "myModelPath")
-val sameModel = MatrixFactorizationModel.load(sc, "myModelPath")
-{% endhighlight %}
+{% include_example scala/org/apache/spark/examples/mllib/RecommendationExample.scala %}

 If the rating matrix is derived from another source of information (e.g., it is inferred from
 other signals), you can use the `trainImplicit` method to get better results.
@@ -123,81 +87,7 @@ that is equivalent to the provided example in Scala is given below:

 Refer to the [`ALS` Java docs](api/java/org/apache/spark/mllib/recommendation/ALS.html) for details on the API.

-{% highlight java %}
-import scala.Tuple2;
-
-import org.apache.spark.api.java.*;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.mllib.recommendation.ALS;
-import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
-import org.apache.spark.mllib.recommendation.Rating;
-import org.apache.spark.SparkConf;
-
-public class CollaborativeFiltering {
-  public static void main(String[] args) {
-    SparkConf conf = new SparkConf().setAppName("Collaborative Filtering Example");
-    JavaSparkContext sc = new JavaSparkContext(conf);
-
-    // Load and parse the data
-    String path = "data/mllib/als/test.data";
-    JavaRDD<String> data = sc.textFile(path);
-    JavaRDD<Rating> ratings = data.map(
-      new Function<String, Rating>() {
-        public Rating call(String s) {
-          String[] sarray = s.split(",");
-          return new Rating(Integer.parseInt(sarray[0]), Integer.parseInt(sarray[1]),
-            Double.parseDouble(sarray[2]));
-        }
-      }
-    );
-
-    // Build the recommendation model using ALS
-    int rank = 10;
-    int numIterations = 10;
-    MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);
-
-    // Evaluate the model on rating data
-    JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
-      new Function
spark git commit: [SPARK-11462][STREAMING] Add JavaStreamingListener
Repository: spark
Updated Branches: refs/heads/branch-1.6 dccc4645d -> ab7da0eae

[SPARK-11462][STREAMING] Add JavaStreamingListener

Currently, StreamingListener is not Java friendly because it exposes some Scala collections to Java users directly, such as Option and Map. This PR adds a Java version of StreamingListener and a set of Java-friendly classes for Java users.

Author: zsxwing
Author: Shixiong Zhu

Closes #9420 from zsxwing/java-streaming-listener.

(cherry picked from commit 1f0f14efe35f986e338ee2cbc1ef2a9ce7395c00)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab7da0ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab7da0ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab7da0ea

Branch: refs/heads/branch-1.6
Commit: ab7da0eae4ed9ae23e5fd6623d1fb4dcc1979976
Parents: dccc464
Author: zsxwing
Authored: Mon Nov 9 17:38:19 2015 -0800
Committer: Tathagata Das
Committed: Mon Nov 9 17:38:37 2015 -0800

--
 .../api/java/JavaStreamingListener.scala        | 168 +++
 .../api/java/JavaStreamingListenerWrapper.scala | 122 
 .../JavaStreamingListenerAPISuite.java          |  85 ++
 .../JavaStreamingListenerWrapperSuite.scala     | 290 +++
 4 files changed, 665 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ab7da0ea/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
new file mode 100644
index 000..c86c710
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.api.java
+
+import org.apache.spark.streaming.Time
+
+/**
+ * A listener interface for receiving information about an ongoing streaming computation.
+ */
+private[streaming] class JavaStreamingListener {
+
+  /** Called when a receiver has been started */
+  def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { }
+
+  /** Called when a receiver has reported an error */
+  def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = { }
+
+  /** Called when a receiver has been stopped */
+  def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = { }
+
+  /** Called when a batch of jobs has been submitted for processing. */
+  def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = { }
+
+  /** Called when processing of a batch of jobs has started. */
+  def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = { }
+
+  /** Called when processing of a batch of jobs has completed. */
+  def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = { }
+
+  /** Called when processing of a job of a batch has started. */
+  def onOutputOperationStarted(
+      outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = { }
+
+  /** Called when processing of a job of a batch has completed. */
+  def onOutputOperationCompleted(
+      outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = { }
+}
+
+/**
+ * Base trait for events related to JavaStreamingListener
+ */
+private[streaming] sealed trait JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerBatchSubmitted(val batchInfo: JavaBatchInfo)
+  extends JavaStreamingListenerEvent
+
+private[streaming] class JavaStreamingListenerBatchCompleted(val batchInfo: JavaBatchInfo)
+  extends JavaStreamingListenerEvent
+
+private[streaming] class
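As a sketch of what this buys Java and Scala users once the class is exposed publicly (in this commit it is still `private[streaming]`), a listener only overrides the callbacks it cares about; the class and method names below come straight from the new file in the diff, the listener name is mine:

```
import org.apache.spark.streaming.api.java.{JavaStreamingListener, JavaStreamingListenerBatchCompleted}

// Sketch only: relies on JavaStreamingListener becoming publicly visible.
class BatchReportingListener extends JavaStreamingListener {
  override def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = {
    // batchInfo is a plain JavaBatchInfo, with no Scala Option/Map exposed.
    println("Batch completed: " + batchCompleted.batchInfo)
  }
}
```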
spark git commit: [SPARK-11333][STREAMING] Add executorId to ReceiverInfo and display it in UI
Repository: spark
Updated Branches: refs/heads/master 1f0f14efe -> 6502944f3

[SPARK-11333][STREAMING] Add executorId to ReceiverInfo and display it in UI

Expose executorId to `ReceiverInfo` and UI since it's helpful when there are multiple executors running on the same host.

Screenshot: https://cloud.githubusercontent.com/assets/1000778/10890968/2e2f5512-8150-11e5-8d9d-746e826b69e8.png

Author: Shixiong Zhu
Author: zsxwing

Closes #9418 from zsxwing/SPARK-11333.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6502944f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6502944f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6502944f

Branch: refs/heads/master
Commit: 6502944f39893b9dfb472f8406d5f3a02a316eff
Parents: 1f0f14e
Author: Shixiong Zhu
Authored: Mon Nov 9 18:13:37 2015 -0800
Committer: Tathagata Das
Committed: Mon Nov 9 18:13:37 2015 -0800

--
 .../spark/streaming/api/java/JavaStreamingListener.scala     | 1 +
 .../streaming/api/java/JavaStreamingListenerWrapper.scala    | 1 +
 .../org/apache/spark/streaming/scheduler/ReceiverInfo.scala  | 1 +
 .../spark/streaming/scheduler/ReceiverTrackingInfo.scala     | 1 +
 .../scala/org/apache/spark/streaming/ui/StreamingPage.scala  | 8 ++--
 .../spark/streaming/JavaStreamingListenerAPISuite.java       | 3 +++
 .../api/java/JavaStreamingListenerWrapperSuite.scala         | 8 ++--
 .../streaming/ui/StreamingJobProgressListenerSuite.scala     | 6 +++---
 8 files changed, 22 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/6502944f/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
index c86c710..3442907 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -140,6 +140,7 @@ private[streaming] case class JavaReceiverInfo(
     name: String,
     active: Boolean,
     location: String,
+    executorId: String,
     lastErrorMessage: String,
     lastError: String,
     lastErrorTime: Long)

http://git-wip-us.apache.org/repos/asf/spark/blob/6502944f/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
index 2c60b39..b109b9f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
@@ -33,6 +33,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav
       receiverInfo.name,
       receiverInfo.active,
       receiverInfo.location,
+      receiverInfo.executorId,
       receiverInfo.lastErrorMessage,
       receiverInfo.lastError,
       receiverInfo.lastErrorTime

http://git-wip-us.apache.org/repos/asf/spark/blob/6502944f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
index 59df892..3b35964 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
@@ -30,6 +30,7 @@ case class ReceiverInfo(
     name: String,
     active: Boolean,
     location: String,
+    executorId: String,
     lastErrorMessage: String = "",
     lastError: String = "",
     lastErrorTime: Long = -1L

http://git-wip-us.apache.org/repos/asf/spark/blob/6502944f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
index ab0a84f..4dc5bb9 100644
---
spark git commit: [SPARK-11333][STREAMING] Add executorId to ReceiverInfo and display it in UI
Repository: spark
Updated Branches: refs/heads/branch-1.6 ab7da0eae -> d33f18c42

[SPARK-11333][STREAMING] Add executorId to ReceiverInfo and display it in UI

Expose executorId to `ReceiverInfo` and UI since it's helpful when there are multiple executors running on the same host.

Screenshot: https://cloud.githubusercontent.com/assets/1000778/10890968/2e2f5512-8150-11e5-8d9d-746e826b69e8.png

Author: Shixiong Zhu
Author: zsxwing

Closes #9418 from zsxwing/SPARK-11333.

(cherry picked from commit 6502944f39893b9dfb472f8406d5f3a02a316eff)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d33f18c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d33f18c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d33f18c4

Branch: refs/heads/branch-1.6
Commit: d33f18c4223912b44d9168ef0aff10271286aa8f
Parents: ab7da0e
Author: Shixiong Zhu
Authored: Mon Nov 9 18:13:37 2015 -0800
Committer: Tathagata Das
Committed: Mon Nov 9 18:13:53 2015 -0800

--
 .../spark/streaming/api/java/JavaStreamingListener.scala     | 1 +
 .../streaming/api/java/JavaStreamingListenerWrapper.scala    | 1 +
 .../org/apache/spark/streaming/scheduler/ReceiverInfo.scala  | 1 +
 .../spark/streaming/scheduler/ReceiverTrackingInfo.scala     | 1 +
 .../scala/org/apache/spark/streaming/ui/StreamingPage.scala  | 8 ++--
 .../spark/streaming/JavaStreamingListenerAPISuite.java       | 3 +++
 .../api/java/JavaStreamingListenerWrapperSuite.scala         | 8 ++--
 .../streaming/ui/StreamingJobProgressListenerSuite.scala     | 6 +++---
 8 files changed, 22 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d33f18c4/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
index c86c710..3442907 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -140,6 +140,7 @@ private[streaming] case class JavaReceiverInfo(
     name: String,
     active: Boolean,
     location: String,
+    executorId: String,
     lastErrorMessage: String,
     lastError: String,
     lastErrorTime: Long)

http://git-wip-us.apache.org/repos/asf/spark/blob/d33f18c4/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
index 2c60b39..b109b9f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
@@ -33,6 +33,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav
       receiverInfo.name,
       receiverInfo.active,
       receiverInfo.location,
+      receiverInfo.executorId,
       receiverInfo.lastErrorMessage,
       receiverInfo.lastError,
       receiverInfo.lastErrorTime

http://git-wip-us.apache.org/repos/asf/spark/blob/d33f18c4/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
index 59df892..3b35964 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala
@@ -30,6 +30,7 @@ case class ReceiverInfo(
     name: String,
     active: Boolean,
     location: String,
+    executorId: String,
     lastErrorMessage: String = "",
     lastError: String = "",
     lastErrorTime: Long = -1L

http://git-wip-us.apache.org/repos/asf/spark/blob/d33f18c4/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
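With `executorId` now carried on `ReceiverInfo`, a listener can tell receivers apart even when several executors share one host. A minimal sketch using the public `StreamingListener` API — the listener class name is mine, and registration would go through `StreamingContext.addStreamingListener`:

```
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerReceiverStarted}

// Sketch of a listener that logs where each receiver was placed.
class ReceiverPlacementListener extends StreamingListener {
  override def onReceiverStarted(started: StreamingListenerReceiverStarted): Unit = {
    val info = started.receiverInfo
    // location alone is ambiguous with multiple executors per host;
    // executorId (added by this commit) pins down the exact executor.
    println(s"Receiver ${info.name} started at ${info.location} (executor ${info.executorId})")
  }
}
```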
spark git commit: Add mockito as an explicit test dependency to spark-streaming
Repository: spark
Updated Branches: refs/heads/master 6502944f3 -> 1431319e5

Add mockito as an explicit test dependency to spark-streaming

While sbt compiles successfully because it pulls in the mockito dependency, maven builds have broken. We need this ASAP. tdas

Author: Burak Yavuz

Closes #9584 from brkyvz/fix-master.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1431319e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1431319e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1431319e

Branch: refs/heads/master
Commit: 1431319e5bc46c7225a8edeeec482816d14a83b8
Parents: 6502944
Author: Burak Yavuz
Authored: Mon Nov 9 18:53:57 2015 -0800
Committer: Tathagata Das
Committed: Mon Nov 9 18:53:57 2015 -0800

--
 streaming/pom.xml | 5 +
 1 file changed, 5 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/1431319e/streaming/pom.xml

diff --git a/streaming/pom.xml b/streaming/pom.xml
index 145c8a7..435e16d 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -93,6 +93,11 @@
       <artifactId>selenium-java</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
spark git commit: Add mockito as an explicit test dependency to spark-streaming
Repository: spark
Updated Branches: refs/heads/branch-1.6 d33f18c42 -> d6f4b56a6

Add mockito as an explicit test dependency to spark-streaming

While sbt compiles successfully because it pulls in the mockito dependency, maven builds have broken. We need this ASAP. tdas

Author: Burak Yavuz

Closes #9584 from brkyvz/fix-master.

(cherry picked from commit 1431319e5bc46c7225a8edeeec482816d14a83b8)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6f4b56a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6f4b56a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6f4b56a

Branch: refs/heads/branch-1.6
Commit: d6f4b56a67a388346e4a7a9d7af6024e61e188ce
Parents: d33f18c
Author: Burak Yavuz
Authored: Mon Nov 9 18:53:57 2015 -0800
Committer: Tathagata Das
Committed: Mon Nov 9 18:54:11 2015 -0800

--
 streaming/pom.xml | 5 +
 1 file changed, 5 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f4b56a/streaming/pom.xml

diff --git a/streaming/pom.xml b/streaming/pom.xml
index 145c8a7..435e16d 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -93,6 +93,11 @@
       <artifactId>selenium-java</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
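For context on why the streaming module needs `mockito-core` on its test classpath at all, here is a minimal sketch of basic Mockito stubbing — the `Clock` trait is invented for illustration, and which streaming suite actually pulled in the dependency isn't shown in this email:

```
import org.mockito.Mockito.{mock, when}

// Illustrative trait, not Spark code.
trait Clock {
  def nowMillis(): Long
}

object MockitoSketch {
  def main(args: Array[String]): Unit = {
    // Create a mock and stub its behaviour so the test controls time.
    val clock = mock(classOf[Clock])
    when(clock.nowMillis()).thenReturn(42L)
    assert(clock.nowMillis() == 42L)
  }
}
```

Sbt resolved this class transitively, which is why only the maven build broke before the explicit `<dependency>` entry above.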
spark git commit: [SPARK-11587][SPARKR] Fix the summary generic to match base R
Repository: spark
Updated Branches: refs/heads/master 1431319e5 -> c4e19b381

[SPARK-11587][SPARKR] Fix the summary generic to match base R

The signature is summary(object, ...) as defined in
https://stat.ethz.ch/R-manual/R-devel/library/base/html/summary.html

Author: Shivaram Venkataraman

Closes #9582 from shivaram/summary-fix.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4e19b38
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4e19b38
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4e19b38

Branch: refs/heads/master
Commit: c4e19b3819df4cd7a1c495a00bd2844cf55f4dbd
Parents: 1431319
Author: Shivaram Venkataraman
Authored: Mon Nov 9 21:06:01 2015 -0800
Committer: Shivaram Venkataraman
Committed: Mon Nov 9 21:06:01 2015 -0800

--
 R/pkg/R/DataFrame.R           |  6 +++---
 R/pkg/R/generics.R            |  2 +-
 R/pkg/R/mllib.R               | 12 ++--
 R/pkg/inst/tests/test_mllib.R |  6 ++
 4 files changed, 16 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/c4e19b38/R/pkg/R/DataFrame.R

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 44ce941..e9013aa 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1944,9 +1944,9 @@ setMethod("describe",
 #' @rdname summary
 #' @name summary
 setMethod("summary",
-          signature(x = "DataFrame"),
-          function(x) {
-            describe(x)
+          signature(object = "DataFrame"),
+          function(object, ...) {
+            describe(object)
           })

http://git-wip-us.apache.org/repos/asf/spark/blob/c4e19b38/R/pkg/R/generics.R

diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 083d37f..efef7d6 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -561,7 +561,7 @@ setGeneric("summarize", function(x,...) { standardGeneric("summarize") })

 #' @rdname summary
 #' @export
-setGeneric("summary", function(x, ...) { standardGeneric("summary") })
+setGeneric("summary", function(object, ...) { standardGeneric("summary") })

 # @rdname tojson
 # @export

http://git-wip-us.apache.org/repos/asf/spark/blob/c4e19b38/R/pkg/R/mllib.R

diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 7ff8597..7126b7c 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -89,17 +89,17 @@ setMethod("predict", signature(object = "PipelineModel"),
 #' model <- glm(y ~ x, trainingData)
 #' summary(model)
 #'}
-setMethod("summary", signature(x = "PipelineModel"),
-          function(x, ...) {
+setMethod("summary", signature(object = "PipelineModel"),
+          function(object, ...) {
             modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                     "getModelName", x@model)
+                                     "getModelName", object@model)
             features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                    "getModelFeatures", x@model)
+                                    "getModelFeatures", object@model)
             coefficients <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                        "getModelCoefficients", x@model)
+                                        "getModelCoefficients", object@model)
             if (modelName == "LinearRegressionModel") {
               devianceResiduals <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
-                                               "getModelDevianceResiduals", x@model)
+                                               "getModelDevianceResiduals", object@model)
               devianceResiduals <- matrix(devianceResiduals, nrow = 1)
               colnames(devianceResiduals) <- c("Min", "Max")
               rownames(devianceResiduals) <- rep("", times = 1)

http://git-wip-us.apache.org/repos/asf/spark/blob/c4e19b38/R/pkg/inst/tests/test_mllib.R

diff --git a/R/pkg/inst/tests/test_mllib.R b/R/pkg/inst/tests/test_mllib.R
index 2606407..42287ea 100644
--- a/R/pkg/inst/tests/test_mllib.R
+++ b/R/pkg/inst/tests/test_mllib.R
@@ -113,3 +113,9 @@ test_that("summary coefficients match with native glm of family 'binomial'", {
                 rownames(stats$Coefficients) ==
                 c("(Intercept)", "Sepal_Length", "Sepal_Width")))
 })
+
+test_that("summary works on base GLM models", {
+  baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
+  baseSummary <- summary(baseModel)
+  expect_true(abs(baseSummary$deviance -
spark git commit: [SPARK-11587][SPARKR] Fix the summary generic to match base R
Repository: spark Updated Branches: refs/heads/branch-1.6 d6f4b56a6 -> a5651f0a5 [SPARK-11587][SPARKR] Fix the summary generic to match base R The signature is summary(object, ...) as defined in https://stat.ethz.ch/R-manual/R-devel/library/base/html/summary.html Author: Shivaram VenkataramanCloses #9582 from shivaram/summary-fix. (cherry picked from commit c4e19b3819df4cd7a1c495a00bd2844cf55f4dbd) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5651f0a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5651f0a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5651f0a Branch: refs/heads/branch-1.6 Commit: a5651f0a57a60d642b6b8cd420f0bb194e8a471e Parents: d6f4b56 Author: Shivaram Venkataraman Authored: Mon Nov 9 21:06:01 2015 -0800 Committer: Shivaram Venkataraman Committed: Mon Nov 9 21:06:09 2015 -0800 -- R/pkg/R/DataFrame.R | 6 +++--- R/pkg/R/generics.R| 2 +- R/pkg/R/mllib.R | 12 ++-- R/pkg/inst/tests/test_mllib.R | 6 ++ 4 files changed, 16 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a5651f0a/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 44ce941..e9013aa 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1944,9 +1944,9 @@ setMethod("describe", #' @rdname summary #' @name summary setMethod("summary", - signature(x = "DataFrame"), - function(x) { -describe(x) + signature(object = "DataFrame"), + function(object, ...) { +describe(object) }) http://git-wip-us.apache.org/repos/asf/spark/blob/a5651f0a/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 083d37f..efef7d6 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -561,7 +561,7 @@ setGeneric("summarize", function(x,...) { standardGeneric("summarize") }) #' @rdname summary #' @export -setGeneric("summary", function(x, ...) { standardGeneric("summary") }) +setGeneric("summary", function(object, ...) { standardGeneric("summary") }) # @rdname tojson # @export http://git-wip-us.apache.org/repos/asf/spark/blob/a5651f0a/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 7ff8597..7126b7c 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -89,17 +89,17 @@ setMethod("predict", signature(object = "PipelineModel"), #' model <- glm(y ~ x, trainingData) #' summary(model) #'} -setMethod("summary", signature(x = "PipelineModel"), - function(x, ...) { +setMethod("summary", signature(object = "PipelineModel"), + function(object, ...) 
{ modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "getModelName", x@model) + "getModelName", object@model) features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "getModelFeatures", x@model) + "getModelFeatures", object@model) coefficients <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "getModelCoefficients", x@model) + "getModelCoefficients", object@model) if (modelName == "LinearRegressionModel") { devianceResiduals <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", - "getModelDevianceResiduals", x@model) + "getModelDevianceResiduals", object@model) devianceResiduals <- matrix(devianceResiduals, nrow = 1) colnames(devianceResiduals) <- c("Min", "Max") rownames(devianceResiduals) <- rep("", times = 1) http://git-wip-us.apache.org/repos/asf/spark/blob/a5651f0a/R/pkg/inst/tests/test_mllib.R -- diff --git a/R/pkg/inst/tests/test_mllib.R b/R/pkg/inst/tests/test_mllib.R index 2606407..42287ea 100644 --- a/R/pkg/inst/tests/test_mllib.R +++ b/R/pkg/inst/tests/test_mllib.R @@ -113,3 +113,9 @@ test_that("summary coefficients match with native glm of family 'binomial'", { rownames(stats$Coefficients) == c("(Intercept)", "Sepal_Length", "Sepal_Width"))) }) + +test_that("summary works on base GLM models", { + baseModel <-
spark git commit: Typo fixes + code readability improvements
Repository: spark Updated Branches: refs/heads/branch-1.6 a91d21314 -> c859be2dd Typo fixes + code readability improvements Author: Jacek LaskowskiCloses #9501 from jaceklaskowski/typos-with-style. (cherry picked from commit 62bb290773c9f9fa53cbe6d4eedc6e153761a763) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c859be2d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c859be2d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c859be2d Branch: refs/heads/branch-1.6 Commit: c859be2dd1464737cb4370fb14b8cacba79767b0 Parents: a91d213 Author: Jacek Laskowski Authored: Fri Nov 6 20:05:18 2015 + Committer: Reynold Xin Committed: Mon Nov 9 09:58:45 2015 -0800 -- .../main/scala/org/apache/spark/rdd/HadoopRDD.scala | 14 ++ .../org/apache/spark/scheduler/DAGScheduler.scala | 12 +--- .../org/apache/spark/scheduler/ShuffleMapTask.scala | 10 +- .../scala/org/apache/spark/scheduler/TaskSet.scala| 2 +- 4 files changed, 21 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c859be2d/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index d841f05..0453614 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -88,8 +88,8 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, s: InputSplit) * * @param sc The SparkContext to associate the RDD with. * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed - * variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job. - * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration. + * variable references an instance of JobConf, then that JobConf will be used for the Hadoop job. + * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration. * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD * creates. * @param inputFormatClass Storage format of the data to be read. 
@@ -123,7 +123,7 @@ class HadoopRDD[K, V]( sc, sc.broadcast(new SerializableConfiguration(conf)) .asInstanceOf[Broadcast[SerializableConfiguration]], - None /* initLocalJobConfFuncOpt */, + initLocalJobConfFuncOpt = None, inputFormatClass, keyClass, valueClass, @@ -184,8 +184,9 @@ class HadoopRDD[K, V]( protected def getInputFormat(conf: JobConf): InputFormat[K, V] = { val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) .asInstanceOf[InputFormat[K, V]] -if (newInputFormat.isInstanceOf[Configurable]) { - newInputFormat.asInstanceOf[Configurable].setConf(conf) +newInputFormat match { + case c: Configurable => c.setConf(conf) + case _ => } newInputFormat } @@ -195,9 +196,6 @@ class HadoopRDD[K, V]( // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) val inputFormat = getInputFormat(jobConf) -if (inputFormat.isInstanceOf[Configurable]) { - inputFormat.asInstanceOf[Configurable].setConf(jobConf) -} val inputSplits = inputFormat.getSplits(jobConf, minPartitions) val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { http://git-wip-us.apache.org/repos/asf/spark/blob/c859be2d/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a1f0fd0..4a9518f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -541,8 +541,7 @@ class DAGScheduler( } /** - * Submit an action job to the scheduler and get a JobWaiter object back. The JobWaiter object - * can be used to block until the the job finishes executing or can be used to cancel the job. + * Submit an action job to the scheduler. * * @param rdd target RDD to run tasks on * @param func a function to run on each partition of the RDD @@ -551,6 +550,11 @@ class DAGScheduler( * @param
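The main readability change in HadoopRDD.scala is replacing test-then-cast pairs with a pattern match. A standalone sketch of the two styles, under the assumption that `Configurable` is any trait with a `setConf` method (standing in for the Hadoop interface):

```scala
trait Configurable { def setConf(conf: String): Unit }

def configureOld(x: Any, conf: String): Unit =
  if (x.isInstanceOf[Configurable]) {
    x.asInstanceOf[Configurable].setConf(conf) // two steps: test, then cast
  }

def configureNew(x: Any, conf: String): Unit =
  x match {
    case c: Configurable => c.setConf(conf)    // one step: the match binds the cast value
    case _ =>                                  // explicitly do nothing otherwise
  }
```

The diff also drops a second `setConf` call in `getPartitions`, since `getInputFormat` already configures the instance it returns.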
spark git commit: [SPARK-11112] DAG visualization: display RDD callsite
Repository: spark Updated Branches: refs/heads/branch-1.6 c859be2dd -> 42d933fbb [SPARK-11112] DAG visualization: display RDD callsite https://cloud.githubusercontent.com/assets/2133137/10870343/2a8cd070-807d-11e5-857a-4ebcace77b5b.png mateiz sarutak Author: Andrew Or Closes #9398 from andrewor14/rdd-callsite. (cherry picked from commit 7f741905b06ed6d3dfbff6db41a3355dab71aa3c) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42d933fb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42d933fb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42d933fb Branch: refs/heads/branch-1.6 Commit: 42d933fbba0584b39bd8218eafc44fb03aeb157d Parents: c859be2 Author: Andrew Or Authored: Sat Nov 7 05:35:53 2015 +0100 Committer: Reynold Xin Committed: Mon Nov 9 09:59:20 2015 -0800 -- .../apache/spark/ui/static/spark-dag-viz.css| 4 +++ .../org/apache/spark/storage/RDDInfo.scala | 16 +++-- .../spark/ui/scope/RDDOperationGraph.scala | 10 +++--- .../org/apache/spark/util/JsonProtocol.scala| 17 - .../scala/org/apache/spark/util/Utils.scala | 1 + .../org/apache/spark/ui/UISeleniumSuite.scala | 14 .../apache/spark/util/JsonProtocolSuite.scala | 37 7 files changed, 79 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css index 3b4ae2e..9cc5c79 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css @@ -122,3 +122,7 @@ stroke: #52C366; stroke-width: 2px; } + +.tooltip-inner { + white-space: pre-wrap; +} http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index 9606262..3fa209b 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -19,7 +19,7 @@ package org.apache.spark.storage import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.{RDDOperationScope, RDD} -import org.apache.spark.util.Utils +import org.apache.spark.util.{CallSite, Utils} @DeveloperApi class RDDInfo( @@ -28,9 +28,20 @@ class RDDInfo( val numPartitions: Int, var storageLevel: StorageLevel, val parentIds: Seq[Int], +val callSite: CallSite, val scope: Option[RDDOperationScope] = None) extends Ordered[RDDInfo] { + def this( + id: Int, + name: String, + numPartitions: Int, + storageLevel: StorageLevel, + parentIds: Seq[Int], + scope: Option[RDDOperationScope] = None) { +this(id, name, numPartitions, storageLevel, parentIds, CallSite.empty, scope) + } + var numCachedPartitions = 0 var memSize = 0L var diskSize = 0L @@ -56,6 +67,7 @@ private[spark] object RDDInfo { def fromRdd(rdd: RDD[_]): RDDInfo = { val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd)) val parentIds = rdd.dependencies.map(_.rdd.id) -new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, parentIds, rdd.scope) +new RDDInfo(rdd.id, rddName, rdd.partitions.length, + rdd.getStorageLevel, parentIds, rdd.creationSite, rdd.scope) } } 
http://git-wip-us.apache.org/repos/asf/spark/blob/42d933fb/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index 81f168a..2427456 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.{StringBuilder, ListBuffer} import org.apache.spark.Logging import org.apache.spark.scheduler.StageInfo import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.CallSite /** * A representation of a generic cluster graph used for storing information on RDD operations. @@ -38,7 +39,7 @@ private[ui]
spark git commit: [SPARK-9865][SPARKR] Flaky SparkR test: test_sparkSQL.R: sample on a DataFrame
Repository: spark Updated Branches: refs/heads/branch-1.6 fb469e76a -> 7b4d7abfc [SPARK-9865][SPARKR] Flaky SparkR test: test_sparkSQL.R: sample on a DataFrame Make sample test less flaky by setting the seed Tested with ``` repeat { if (count(sample(df, FALSE, 0.1)) == 3) { break } } ``` Author: felixcheungCloses #9549 from felixcheung/rsample. (cherry picked from commit cd174882a5a211298d6e173fe989d567d08ebc0d) Signed-off-by: Shivaram Venkataraman Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b4d7abf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b4d7abf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b4d7abf Branch: refs/heads/branch-1.6 Commit: 7b4d7abfc8ec6a6ad6fdc80b8cf03e9729b7ae90 Parents: fb469e7 Author: felixcheung Authored: Mon Nov 9 10:26:09 2015 -0800 Committer: Shivaram Venkataraman Committed: Mon Nov 9 10:26:17 2015 -0800 -- R/pkg/inst/tests/test_sparkSQL.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7b4d7abf/R/pkg/inst/tests/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 92cff1f..fbdb9a8 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -647,11 +647,11 @@ test_that("sample on a DataFrame", { sampled <- sample(df, FALSE, 1.0) expect_equal(nrow(collect(sampled)), count(df)) expect_is(sampled, "DataFrame") - sampled2 <- sample(df, FALSE, 0.1) + sampled2 <- sample(df, FALSE, 0.1, 0) # set seed for predictable result expect_true(count(sampled2) < 3) # Also test sample_frac - sampled3 <- sample_frac(df, FALSE, 0.1) + sampled3 <- sample_frac(df, FALSE, 0.1, 0) # set seed for predictable result expect_true(count(sampled3) < 3) }) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
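The same determinism argument applies on the Scala side; a sketch assuming an existing DataFrame `df` and the 1.6-era API:

```scala
// Passing an explicit seed makes sample() reproducible across runs, which is
// what removes the flakiness. Assumes `df` is an already-constructed DataFrame.
val sampled1 = df.sample(withReplacement = false, fraction = 0.1, seed = 0L)
val sampled2 = df.sample(withReplacement = false, fraction = 0.1, seed = 0L)
assert(sampled1.count() == sampled2.count())  // identical with the same seed

// The seedless overload draws a random seed each call, so counts can vary:
val flaky = df.sample(withReplacement = false, fraction = 0.1)
```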
spark git commit: [SPARK-11112] Fix Scala 2.11 compilation error in RDDInfo.scala
Repository: spark Updated Branches: refs/heads/branch-1.6 2946c85f5 -> fb469e76a [SPARK-11112] Fix Scala 2.11 compilation error in RDDInfo.scala As shown in https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Compile/job/Spark-Master-Scala211-Compile/1946/console , compilation fails with: ``` [error] /home/jenkins/workspace/Spark-Master-Scala211-Compile/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala:25: in class RDDInfo, multiple overloaded alternatives of constructor RDDInfo define default arguments. [error] class RDDInfo( [error] ``` This PR tries to fix the compilation error. Author: tedyu Closes #9538 from tedyu/master. (cherry picked from commit 404a28f4edd09cf17361dcbd770e4cafde51bf6d) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb469e76 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb469e76 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb469e76 Branch: refs/heads/branch-1.6 Commit: fb469e76ae28d50f77ebf38cfc2d96237a07c79e Parents: 2946c85 Author: tedyu Authored: Mon Nov 9 10:07:58 2015 -0800 Committer: Andrew Or Committed: Mon Nov 9 10:08:04 2015 -0800 -- .../main/scala/org/apache/spark/storage/RDDInfo.scala | 12 +--- 1 file changed, 1 insertion(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fb469e76/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index 3fa209b..87c1b98 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -28,20 +28,10 @@ class RDDInfo( val numPartitions: Int, var storageLevel: StorageLevel, val parentIds: Seq[Int], -val callSite: CallSite, +val callSite: CallSite = CallSite.empty, val scope: Option[RDDOperationScope] = None) extends Ordered[RDDInfo] { - def this( - id: Int, - name: String, - numPartitions: Int, - storageLevel: StorageLevel, - parentIds: Seq[Int], - scope: Option[RDDOperationScope] = None) { -this(id, name, numPartitions, storageLevel, parentIds, CallSite.empty, scope) - } - var numCachedPartitions = 0 var memSize = 0L var diskSize = 0L - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
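The rule behind the error: Scala forbids two overloaded alternatives of a constructor (or method) from both defining default arguments, which is exactly what the auxiliary constructor added in the previous commit introduced. A minimal reproduction and the shape of the fix — class and field names here are illustrative, not Spark's:

```scala
// Rejected: both the primary and the auxiliary constructor define defaults.
// class Info(val id: Int, val site: String, val scope: Option[String] = None) {
//   def this(id: Int, scope: Option[String] = None) = this(id, "", scope)
// }
// error: in class Info, multiple overloaded alternatives of constructor Info
//        define default arguments.

// Accepted (the shape of this commit's fix): a single constructor where the
// new field itself carries a default value.
class Info(
    val id: Int,
    val site: String = "",
    val scope: Option[String] = None)

val a = new Info(1)                      // site and scope default
val b = new Info(2, "RDDInfo.scala:25")  // callers may still supply the new field
```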
spark git commit: [SPARK-9865][SPARKR] Flaky SparkR test: test_sparkSQL.R: sample on a DataFrame
Repository: spark Updated Branches: refs/heads/master 404a28f4e -> cd174882a [SPARK-9865][SPARKR] Flaky SparkR test: test_sparkSQL.R: sample on a DataFrame Make sample test less flaky by setting the seed Tested with ``` repeat { if (count(sample(df, FALSE, 0.1)) == 3) { break } } ``` Author: felixcheungCloses #9549 from felixcheung/rsample. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd174882 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd174882 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd174882 Branch: refs/heads/master Commit: cd174882a5a211298d6e173fe989d567d08ebc0d Parents: 404a28f Author: felixcheung Authored: Mon Nov 9 10:26:09 2015 -0800 Committer: Shivaram Venkataraman Committed: Mon Nov 9 10:26:09 2015 -0800 -- R/pkg/inst/tests/test_sparkSQL.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cd174882/R/pkg/inst/tests/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 92cff1f..fbdb9a8 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -647,11 +647,11 @@ test_that("sample on a DataFrame", { sampled <- sample(df, FALSE, 1.0) expect_equal(nrow(collect(sampled)), count(df)) expect_is(sampled, "DataFrame") - sampled2 <- sample(df, FALSE, 0.1) + sampled2 <- sample(df, FALSE, 0.1, 0) # set seed for predictable result expect_true(count(sampled2) < 3) # Also test sample_frac - sampled3 <- sample_frac(df, FALSE, 0.1) + sampled3 <- sample_frac(df, FALSE, 0.1, 0) # set seed for predictable result expect_true(count(sampled3) < 3) }) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [DOCS] Fix typo for Python section on unifying Kafka streams
Repository: spark Updated Branches: refs/heads/branch-1.6 7b4d7abfc -> 006d73a74 [DOCS] Fix typo for Python section on unifying Kafka streams 1) kafkaStreams is a list. The list should be unpacked when passing it into the streaming context union method, which accepts a variable number of streams. 2) print() should be pprint() for pyspark. This contribution is my original work, and I license the work to the project under the project's open source license. Author: chriskang90Closes #9545 from c-kang/streaming_python_typo. (cherry picked from commit 874cd66d4b6d156d0ef112a3d0f3bc5683c6a0ec) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/006d73a7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/006d73a7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/006d73a7 Branch: refs/heads/branch-1.6 Commit: 006d73a741f92840f747a80c372f2d3f49fe7a1f Parents: 7b4d7ab Author: chriskang90 Authored: Mon Nov 9 19:39:22 2015 +0100 Committer: Sean Owen Committed: Mon Nov 9 19:39:33 2015 +0100 -- docs/streaming-programming-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/006d73a7/docs/streaming-programming-guide.md -- diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index c751dbb..e9a27f4 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1948,8 +1948,8 @@ unifiedStream.print(); {% highlight python %} numStreams = 5 kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)] -unifiedStream = streamingContext.union(kafkaStreams) -unifiedStream.print() +unifiedStream = streamingContext.union(*kafkaStreams) +unifiedStream.pprint() {% endhighlight %} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
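For readers comparing APIs, the Scala analogue of the Python fix: a variadic parameter needs the sequence expanded explicitly, much as Python's union requires `*kafkaStreams`. The `union` function below is a stand-in for illustration, not Spark's:

```scala
// Stand-in variadic method to illustrate sequence expansion.
def union(streams: String*): String = streams.mkString(" + ")

val kafkaStreams = Seq("stream-1", "stream-2", "stream-3")

union(kafkaStreams: _*)  // correct: expands the list into three arguments
// union(kafkaStreams)   // does not compile: a Seq[String] is not a String
```

Python accepts the unexpanded list at call time and misbehaves later, which is why the typo survived; Scala rejects the equivalent mistake at compile time.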
spark git commit: [SPARK-11577][SQL] Handle code review comments for SPARK-11188
Repository: spark Updated Branches: refs/heads/branch-1.5 78a5cf198 -> 6b314fe9e [SPARK-11577][SQL] Handle code review comments for SPARK-11188 Handle the code review comments from Michael for SPARK-11188 Author: Dilip BiswalCloses #9551 from dilipbiswal/spark-11577-codereview-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b314fe9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b314fe9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b314fe9 Branch: refs/heads/branch-1.5 Commit: 6b314fe9ebda8837e25aa8d8249b57fe8f11751d Parents: 78a5cf1 Author: Dilip Biswal Authored: Mon Nov 9 09:55:14 2015 -0800 Committer: Reynold Xin Committed: Mon Nov 9 09:55:14 2015 -0800 -- .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 12 +--- .../spark/sql/hive/thriftserver/SparkSQLDriver.scala| 6 +++--- .../apache/spark/sql/hive/thriftserver/CliSuite.scala | 2 +- 3 files changed, 9 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6b314fe9/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 212bd2c..ea43e6f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -17,16 +17,13 @@ package org.apache.spark.sql.hive.thriftserver -import scala.collection.JavaConversions._ - import java.io._ import java.util.{ArrayList => JArrayList, Locale} -import org.apache.spark.sql.AnalysisException +import scala.collection.JavaConversions._ import jline.console.ConsoleReader import jline.console.history.FileHistory - import org.apache.commons.lang3.StringUtils import org.apache.commons.logging.LogFactory import org.apache.hadoop.conf.Configuration @@ -35,13 +32,14 @@ import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.exec.Utilities -import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, SetProcessor, CommandProcessor, CommandProcessorFactory} +import org.apache.hadoop.hive.ql.processors.{AddResourceProcessor, CommandProcessor, CommandProcessorFactory, SetProcessor} import org.apache.hadoop.hive.ql.session.SessionState import org.apache.thrift.transport.TSocket import org.apache.spark.Logging +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.util.{ShutdownHookManager, Utils} +import org.apache.spark.util.ShutdownHookManager /** * This code doesn't support remote connections in Hive 1.2+, as the underlying CliDriver @@ -313,7 +311,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { if (ret != 0) { // For analysis exception, only the error is printed out to the console. 
rc.getException() match { - case e : AnalysisException => + case e: AnalysisException => err.println(s"""Error in query: ${e.getMessage}""") case _ => err.println(rc.getErrorMessage()) } http://git-wip-us.apache.org/repos/asf/spark/blob/6b314fe9/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index e44fa5e..33fcf12 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.hive.thriftserver import java.util.{ArrayList => JArrayList, List => JList} -import org.apache.spark.sql.AnalysisException + +import scala.collection.JavaConversions._ import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema} @@ -26,10 +27,9 @@ import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse import
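Most of this diff is bringing imports into Spark's grouping convention; the ordering it appears to enforce is sketched below, with one representative import per group (grouping inferred from the diff, not from a style document):

```scala
import java.io._                              // 1. java / javax

import scala.collection.JavaConversions._     // 2. scala.*

import org.apache.commons.lang3.StringUtils   // 3. other third-party libraries

import org.apache.spark.Logging               // 4. org.apache.spark.*
import org.apache.spark.sql.AnalysisException
```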
spark git commit: [SPARK-10565][CORE] add missing web UI stats to /api/v1/applications JSON
Repository: spark Updated Branches: refs/heads/master 9b88e1dca -> 08a7a836c [SPARK-10565][CORE] add missing web UI stats to /api/v1/applications JSON I looked at the other endpoints, and they don't seem to be missing any fields. Added fields: ![image](https://cloud.githubusercontent.com/assets/613879/10948801/58159982-82e4-11e5-86dc-62da201af910.png) Author: Charles YehCloses #9472 from CharlesYeh/api_vars. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08a7a836 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08a7a836 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08a7a836 Branch: refs/heads/master Commit: 08a7a836c393d6a62b9b216eeb01fad0b90b6c52 Parents: 9b88e1d Author: Charles Yeh Authored: Mon Nov 9 11:59:32 2015 -0600 Committer: Imran Rashid Committed: Mon Nov 9 11:59:32 2015 -0600 -- .../spark/deploy/master/ui/MasterWebUI.scala| 7 +- .../status/api/v1/ApplicationListResource.scala | 8 ++ .../org/apache/spark/status/api/v1/api.scala| 4 + .../scala/org/apache/spark/ui/SparkUI.scala | 4 + .../deploy/master/ui/MasterWebUISuite.scala | 90 project/MimaExcludes.scala | 3 + 6 files changed, 114 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/08a7a836/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 6174fc1..e41554a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -28,14 +28,17 @@ import org.apache.spark.ui.JettyUtils._ * Web UI server for the standalone master. 
*/ private[master] -class MasterWebUI(val master: Master, requestedPort: Int) +class MasterWebUI( +val master: Master, +requestedPort: Int, +customMasterPage: Option[MasterPage] = None) extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot { val masterEndpointRef = master.self val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true) - val masterPage = new MasterPage(this) + val masterPage = customMasterPage.getOrElse(new MasterPage(this)) initialize() http://git-wip-us.apache.org/repos/asf/spark/blob/08a7a836/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index 17b521f..0fc0fb5 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -62,6 +62,10 @@ private[spark] object ApplicationsListResource { new ApplicationInfo( id = app.id, name = app.name, + coresGranted = None, + maxCores = None, + coresPerExecutor = None, + memoryPerExecutorMB = None, attempts = app.attempts.map { internalAttemptInfo => new ApplicationAttemptInfo( attemptId = internalAttemptInfo.attemptId, @@ -81,6 +85,10 @@ private[spark] object ApplicationsListResource { new ApplicationInfo( id = internal.id, name = internal.desc.name, + coresGranted = Some(internal.coresGranted), + maxCores = internal.desc.maxCores, + coresPerExecutor = internal.desc.coresPerExecutor, + memoryPerExecutorMB = Some(internal.desc.memoryPerExecutorMB), attempts = Seq(new ApplicationAttemptInfo( attemptId = None, startTime = new Date(internal.startTime), http://git-wip-us.apache.org/repos/asf/spark/blob/08a7a836/core/src/main/scala/org/apache/spark/status/api/v1/api.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 2bec64f..baddfc5 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -25,6 +25,10 @@ import org.apache.spark.JobExecutionStatus class ApplicationInfo private[spark]( val id: String, val name: String, +val coresGranted: Option[Int], +val maxCores: Option[Int], +val coresPerExecutor: Option[Int], +
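The new fields are Option-typed because only a live standalone master knows them; applications rebuilt from event logs report None, as the two branches of ApplicationsListResource above show. A sketch of the consuming side — the case class mirrors the fields added in this diff but is not Spark's actual class:

```scala
case class AppInfoSketch(
    id: String,
    name: String,
    coresGranted: Option[Int],     // Some(...) from a live master, None from history
    maxCores: Option[Int],
    coresPerExecutor: Option[Int],
    memoryPerExecutorMB: Option[Int])

def describe(app: AppInfoSketch): String =
  app.coresGranted match {
    case Some(cores) => s"${app.name}: $cores cores granted"
    case None        => s"${app.name}: core usage unavailable (history server)"
  }
```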
spark git commit: [SPARK-11112] Fix Scala 2.11 compilation error in RDDInfo.scala
Repository: spark Updated Branches: refs/heads/master 08a7a836c -> 404a28f4e [SPARK-11112] Fix Scala 2.11 compilation error in RDDInfo.scala As shown in https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Compile/job/Spark-Master-Scala211-Compile/1946/console , compilation fails with: ``` [error] /home/jenkins/workspace/Spark-Master-Scala211-Compile/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala:25: in class RDDInfo, multiple overloaded alternatives of constructor RDDInfo define default arguments. [error] class RDDInfo( [error] ``` This PR tries to fix the compilation error. Author: tedyu Closes #9538 from tedyu/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/404a28f4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/404a28f4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/404a28f4 Branch: refs/heads/master Commit: 404a28f4edd09cf17361dcbd770e4cafde51bf6d Parents: 08a7a83 Author: tedyu Authored: Mon Nov 9 10:07:58 2015 -0800 Committer: Andrew Or Committed: Mon Nov 9 10:07:58 2015 -0800 -- .../main/scala/org/apache/spark/storage/RDDInfo.scala | 12 +--- 1 file changed, 1 insertion(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/404a28f4/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index 3fa209b..87c1b98 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -28,20 +28,10 @@ class RDDInfo( val numPartitions: Int, var storageLevel: StorageLevel, val parentIds: Seq[Int], -val callSite: CallSite, +val callSite: CallSite = CallSite.empty, val scope: Option[RDDOperationScope] = None) extends Ordered[RDDInfo] { - def this( - id: Int, - name: String, - numPartitions: Int, - storageLevel: StorageLevel, - parentIds: Seq[Int], - scope: Option[RDDOperationScope] = None) { -this(id, name, numPartitions, storageLevel, parentIds, CallSite.empty, scope) - } - var numCachedPartitions = 0 var memSize = 0L var diskSize = 0L - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11362] [SQL] Use Spark BitSet in BroadcastNestedLoopJoin
Repository: spark Updated Branches: refs/heads/branch-1.6 fc2942d12 -> 2946c85f5 [SPARK-11362] [SQL] Use Spark BitSet in BroadcastNestedLoopJoin JIRA: https://issues.apache.org/jira/browse/SPARK-11362 We use scala.collection.mutable.BitSet in BroadcastNestedLoopJoin now. We should use Spark's BitSet. Author: Liang-Chi HsiehCloses #9316 from viirya/use-spark-bitset. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2946c85f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2946c85f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2946c85f Branch: refs/heads/branch-1.6 Commit: 2946c85f5f48516637a6ce52ba9e31caf3c8ee3a Parents: fc2942d Author: Liang-Chi Hsieh Authored: Sat Nov 7 19:44:45 2015 -0800 Committer: Andrew Or Committed: Mon Nov 9 10:02:46 2015 -0800 -- .../execution/joins/BroadcastNestedLoopJoin.scala | 18 -- 1 file changed, 8 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2946c85f/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala index 05d20f5..aab177b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.util.collection.CompactBuffer +import org.apache.spark.util.collection.{BitSet, CompactBuffer} case class BroadcastNestedLoopJoin( @@ -95,9 +95,7 @@ case class BroadcastNestedLoopJoin( /** All rows that either match both-way, or rows from streamed joined with nulls. */ val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter => val matchedRows = new CompactBuffer[InternalRow] - // TODO: Use Spark's BitSet. 
- val includedBroadcastTuples = -new scala.collection.mutable.BitSet(broadcastedRelation.value.size) + val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size) val joinedRow = new JoinedRow val leftNulls = new GenericMutableRow(left.output.size) @@ -115,11 +113,11 @@ case class BroadcastNestedLoopJoin( case BuildRight if boundCondition(joinedRow(streamedRow, broadcastedRow)) => matchedRows += resultProj(joinedRow(streamedRow, broadcastedRow)).copy() streamRowMatched = true - includedBroadcastTuples += i + includedBroadcastTuples.set(i) case BuildLeft if boundCondition(joinedRow(broadcastedRow, streamedRow)) => matchedRows += resultProj(joinedRow(broadcastedRow, streamedRow)).copy() streamRowMatched = true - includedBroadcastTuples += i + includedBroadcastTuples.set(i) case _ => } i += 1 @@ -138,8 +136,8 @@ case class BroadcastNestedLoopJoin( val includedBroadcastTuples = matchesOrStreamedRowsWithNulls.map(_._2) val allIncludedBroadcastTuples = includedBroadcastTuples.fold( - new scala.collection.mutable.BitSet(broadcastedRelation.value.size) -)(_ ++ _) + new BitSet(broadcastedRelation.value.size) +)(_ | _) val leftNulls = new GenericMutableRow(left.output.size) val rightNulls = new GenericMutableRow(right.output.size) @@ -155,7 +153,7 @@ case class BroadcastNestedLoopJoin( val joinedRow = new JoinedRow joinedRow.withLeft(leftNulls) while (i < rel.length) { -if (!allIncludedBroadcastTuples.contains(i)) { +if (!allIncludedBroadcastTuples.get(i)) { buf += resultProj(joinedRow.withRight(rel(i))).copy() } i += 1 @@ -164,7 +162,7 @@ case class BroadcastNestedLoopJoin( val joinedRow = new JoinedRow joinedRow.withRight(rightNulls) while (i < rel.length) { -if (!allIncludedBroadcastTuples.contains(i)) { +if (!allIncludedBroadcastTuples.get(i)) { buf += resultProj(joinedRow.withLeft(rel(i))).copy() } i += 1 - To unsubscribe, e-mail:
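The substitution is mechanical once the API mapping is clear; a sketch using Spark's fixed-size BitSet, assuming spark-core on the classpath:

```scala
import org.apache.spark.util.collection.BitSet

// Spark's BitSet is allocated with a fixed capacity up front, unlike the
// growable scala.collection.mutable.BitSet it replaces in this join.
val matched = new BitSet(64)
matched.set(3)                 // was: bits += 3
assert(matched.get(3))         // was: bits contains 3

val other = new BitSet(64)
other.set(5)
val combined = matched | other // was: a ++ b (the fold step over partitions)
assert(combined.get(3) && combined.get(5))
```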
spark git commit: [DOCS] Fix typo for Python section on unifying Kafka streams
Repository: spark Updated Branches: refs/heads/master cd174882a -> 874cd66d4 [DOCS] Fix typo for Python section on unifying Kafka streams 1) kafkaStreams is a list. The list should be unpacked when passing it into the streaming context union method, which accepts a variable number of streams. 2) print() should be pprint() for pyspark. This contribution is my original work, and I license the work to the project under the project's open source license. Author: chriskang90Closes #9545 from c-kang/streaming_python_typo. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/874cd66d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/874cd66d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/874cd66d Branch: refs/heads/master Commit: 874cd66d4b6d156d0ef112a3d0f3bc5683c6a0ec Parents: cd17488 Author: chriskang90 Authored: Mon Nov 9 19:39:22 2015 +0100 Committer: Sean Owen Committed: Mon Nov 9 19:39:22 2015 +0100 -- docs/streaming-programming-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/874cd66d/docs/streaming-programming-guide.md -- diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index c751dbb..e9a27f4 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1948,8 +1948,8 @@ unifiedStream.print(); {% highlight python %} numStreams = 5 kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)] -unifiedStream = streamingContext.union(kafkaStreams) -unifiedStream.print() +unifiedStream = streamingContext.union(*kafkaStreams) +unifiedStream.pprint() {% endhighlight %} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [DOCS] Fix typo for Python section on unifying Kafka streams
Repository: spark Updated Branches: refs/heads/branch-1.5 6b314fe9e -> a33fd737c [DOCS] Fix typo for Python section on unifying Kafka streams 1) kafkaStreams is a list. The list should be unpacked when passing it into the streaming context union method, which accepts a variable number of streams. 2) print() should be pprint() for pyspark. This contribution is my original work, and I license the work to the project under the project's open source license. Author: chriskang90Closes #9545 from c-kang/streaming_python_typo. (cherry picked from commit 874cd66d4b6d156d0ef112a3d0f3bc5683c6a0ec) Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a33fd737 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a33fd737 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a33fd737 Branch: refs/heads/branch-1.5 Commit: a33fd737cb5db7200bb8ebb080f729f85fcc7c47 Parents: 6b314fe Author: chriskang90 Authored: Mon Nov 9 19:39:22 2015 +0100 Committer: Sean Owen Committed: Mon Nov 9 19:39:45 2015 +0100 -- docs/streaming-programming-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a33fd737/docs/streaming-programming-guide.md -- diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index c751dbb..e9a27f4 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1948,8 +1948,8 @@ unifiedStream.print(); {% highlight python %} numStreams = 5 kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)] -unifiedStream = streamingContext.union(kafkaStreams) -unifiedStream.print() +unifiedStream = streamingContext.union(*kafkaStreams) +unifiedStream.pprint() {% endhighlight %} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11494][ML][R] Expose R-like summary statistics in SparkR::glm for linear regression
Repository: spark Updated Branches: refs/heads/master b541b3163 -> 8c0e1b50e [SPARK-11494][ML][R] Expose R-like summary statistics in SparkR::glm for linear regression Expose R-like summary statistics in SparkR::glm for linear regression, the output of ```summary``` like ```Java $DevianceResiduals MinMax -0.9509607 0.7291832 $Coefficients Estimate Std. Error t value Pr(>|t|) (Intercept)1.6765 0.2353597 7.123139 4.456124e-11 Sepal_Length 0.3498801 0.04630128 7.556598 4.187317e-12 Species_versicolor -0.9833885 0.07207471 -13.64402 0 Species_virginica -1.00751 0.09330565 -10.79796 0 ``` Author: Yanbo LiangCloses #9561 from yanboliang/spark-11494. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c0e1b50 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c0e1b50 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c0e1b50 Branch: refs/heads/master Commit: 8c0e1b50e960d3e8e51d0618c462eed2bb4936f0 Parents: b541b31 Author: Yanbo Liang Authored: Mon Nov 9 08:56:22 2015 -0800 Committer: Xiangrui Meng Committed: Mon Nov 9 08:56:22 2015 -0800 -- R/pkg/R/mllib.R | 22 +++-- R/pkg/inst/tests/test_mllib.R | 31 +--- .../org/apache/spark/ml/r/SparkRWrappers.scala | 50 ++-- 3 files changed, 88 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8c0e1b50/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index b0d73dd..7ff8597 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -91,12 +91,26 @@ setMethod("predict", signature(object = "PipelineModel"), #'} setMethod("summary", signature(x = "PipelineModel"), function(x, ...) { +modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", + "getModelName", x@model) features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "getModelFeatures", x@model) coefficients <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "getModelCoefficients", x@model) -coefficients <- as.matrix(unlist(coefficients)) -colnames(coefficients) <- c("Estimate") -rownames(coefficients) <- unlist(features) -return(list(coefficients = coefficients)) +if (modelName == "LinearRegressionModel") { + devianceResiduals <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", + "getModelDevianceResiduals", x@model) + devianceResiduals <- matrix(devianceResiduals, nrow = 1) + colnames(devianceResiduals) <- c("Min", "Max") + rownames(devianceResiduals) <- rep("", times = 1) + coefficients <- matrix(coefficients, ncol = 4) + colnames(coefficients) <- c("Estimate", "Std. 
Error", "t value", "Pr(>|t|)") + rownames(coefficients) <- unlist(features) + return(list(DevianceResiduals = devianceResiduals, Coefficients = coefficients)) +} else { + coefficients <- as.matrix(unlist(coefficients)) + colnames(coefficients) <- c("Estimate") + rownames(coefficients) <- unlist(features) + return(list(coefficients = coefficients)) +} }) http://git-wip-us.apache.org/repos/asf/spark/blob/8c0e1b50/R/pkg/inst/tests/test_mllib.R -- diff --git a/R/pkg/inst/tests/test_mllib.R b/R/pkg/inst/tests/test_mllib.R index 4761e28..2606407 100644 --- a/R/pkg/inst/tests/test_mllib.R +++ b/R/pkg/inst/tests/test_mllib.R @@ -71,12 +71,23 @@ test_that("feature interaction vs native glm", { test_that("summary coefficients match with native glm", { training <- createDataFrame(sqlContext, iris) - stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "l-bfgs")) - coefs <- as.vector(stats$coefficients) + stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "normal")) + coefs <- unlist(stats$Coefficients) + devianceResiduals <- unlist(stats$DevianceResiduals) + rCoefs <- as.vector(coef(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))) - expect_true(all(abs(rCoefs - coefs) < 1e-6)) + rStdError <- c(0.23536, 0.04630, 0.07207, 0.09331) + rTValue <- c(7.123, 7.557, -13.644, -10.798) + rPValue <- c(0.0, 0.0, 0.0, 0.0) + rDevianceResiduals <- c(-0.95096, 0.72918) + +
spark git commit: [SPARK-11494][ML][R] Expose R-like summary statistics in SparkR::glm for linear regression
Repository: spark Updated Branches: refs/heads/branch-1.6 0f03bd13e -> 029e931da [SPARK-11494][ML][R] Expose R-like summary statistics in SparkR::glm for linear regression Expose R-like summary statistics in SparkR::glm for linear regression, the output of ```summary``` like ```Java $DevianceResiduals MinMax -0.9509607 0.7291832 $Coefficients Estimate Std. Error t value Pr(>|t|) (Intercept)1.6765 0.2353597 7.123139 4.456124e-11 Sepal_Length 0.3498801 0.04630128 7.556598 4.187317e-12 Species_versicolor -0.9833885 0.07207471 -13.64402 0 Species_virginica -1.00751 0.09330565 -10.79796 0 ``` Author: Yanbo LiangCloses #9561 from yanboliang/spark-11494. (cherry picked from commit 8c0e1b50e960d3e8e51d0618c462eed2bb4936f0) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/029e931d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/029e931d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/029e931d Branch: refs/heads/branch-1.6 Commit: 029e931dae82b9843ac0fe9348fe6f64ae6556db Parents: 0f03bd1 Author: Yanbo Liang Authored: Mon Nov 9 08:56:22 2015 -0800 Committer: Xiangrui Meng Committed: Mon Nov 9 08:56:32 2015 -0800 -- R/pkg/R/mllib.R | 22 +++-- R/pkg/inst/tests/test_mllib.R | 31 +--- .../org/apache/spark/ml/r/SparkRWrappers.scala | 50 ++-- 3 files changed, 88 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/029e931d/R/pkg/R/mllib.R -- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index b0d73dd..7ff8597 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -91,12 +91,26 @@ setMethod("predict", signature(object = "PipelineModel"), #'} setMethod("summary", signature(x = "PipelineModel"), function(x, ...) { +modelName <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", + "getModelName", x@model) features <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "getModelFeatures", x@model) coefficients <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "getModelCoefficients", x@model) -coefficients <- as.matrix(unlist(coefficients)) -colnames(coefficients) <- c("Estimate") -rownames(coefficients) <- unlist(features) -return(list(coefficients = coefficients)) +if (modelName == "LinearRegressionModel") { + devianceResiduals <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", + "getModelDevianceResiduals", x@model) + devianceResiduals <- matrix(devianceResiduals, nrow = 1) + colnames(devianceResiduals) <- c("Min", "Max") + rownames(devianceResiduals) <- rep("", times = 1) + coefficients <- matrix(coefficients, ncol = 4) + colnames(coefficients) <- c("Estimate", "Std. 
Error", "t value", "Pr(>|t|)") + rownames(coefficients) <- unlist(features) + return(list(DevianceResiduals = devianceResiduals, Coefficients = coefficients)) +} else { + coefficients <- as.matrix(unlist(coefficients)) + colnames(coefficients) <- c("Estimate") + rownames(coefficients) <- unlist(features) + return(list(coefficients = coefficients)) +} }) http://git-wip-us.apache.org/repos/asf/spark/blob/029e931d/R/pkg/inst/tests/test_mllib.R -- diff --git a/R/pkg/inst/tests/test_mllib.R b/R/pkg/inst/tests/test_mllib.R index 4761e28..2606407 100644 --- a/R/pkg/inst/tests/test_mllib.R +++ b/R/pkg/inst/tests/test_mllib.R @@ -71,12 +71,23 @@ test_that("feature interaction vs native glm", { test_that("summary coefficients match with native glm", { training <- createDataFrame(sqlContext, iris) - stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "l-bfgs")) - coefs <- as.vector(stats$coefficients) + stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training, solver = "normal")) + coefs <- unlist(stats$Coefficients) + devianceResiduals <- unlist(stats$DevianceResiduals) + rCoefs <- as.vector(coef(glm(Sepal.Width ~ Sepal.Length + Species, data = iris))) - expect_true(all(abs(rCoefs - coefs) < 1e-6)) + rStdError <- c(0.23536, 0.04630, 0.07207, 0.09331) + rTValue <- c(7.123,
spark git commit: [SPARK-10689][ML][DOC] User guide and example code for AFTSurvivalRegression
Repository: spark Updated Branches: refs/heads/branch-1.6 029e931da -> a85a9122f [SPARK-10689][ML][DOC] User guide and example code for AFTSurvivalRegression Add user guide and example code for ```AFTSurvivalRegression```. Author: Yanbo LiangCloses #9491 from yanboliang/spark-10689. (cherry picked from commit d50a66cc04bfa1c483f04daffe465322316c745e) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a85a9122 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a85a9122 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a85a9122 Branch: refs/heads/branch-1.6 Commit: a85a9122fb9390aa589da0eb7eacdbf949662600 Parents: 029e931 Author: Yanbo Liang Authored: Mon Nov 9 08:57:29 2015 -0800 Committer: Xiangrui Meng Committed: Mon Nov 9 08:57:36 2015 -0800 -- docs/ml-guide.md| 1 + docs/ml-survival-regression.md | 96 .../ml/JavaAFTSurvivalRegressionExample.java| 71 +++ .../main/python/ml/aft_survival_regression.py | 51 +++ .../ml/AFTSurvivalRegressionExample.scala | 62 + 5 files changed, 281 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a85a9122/docs/ml-guide.md -- diff --git a/docs/ml-guide.md b/docs/ml-guide.md index fd3a616..c293e71 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -44,6 +44,7 @@ provide class probabilities, and linear models provide model summaries. * [Ensembles](ml-ensembles.html) * [Linear methods with elastic net regularization](ml-linear-methods.html) * [Multilayer perceptron classifier](ml-ann.html) +* [Survival Regression](ml-survival-regression.html) # Main concepts in Pipelines http://git-wip-us.apache.org/repos/asf/spark/blob/a85a9122/docs/ml-survival-regression.md -- diff --git a/docs/ml-survival-regression.md b/docs/ml-survival-regression.md new file mode 100644 index 000..ab27521 --- /dev/null +++ b/docs/ml-survival-regression.md @@ -0,0 +1,96 @@ +--- +layout: global +title: Survival Regression - ML +displayTitle: ML - Survival Regression +--- + + +`\[ +\newcommand{\R}{\mathbb{R}} +\newcommand{\E}{\mathbb{E}} +\newcommand{\x}{\mathbf{x}} +\newcommand{\y}{\mathbf{y}} +\newcommand{\wv}{\mathbf{w}} +\newcommand{\av}{\mathbf{\alpha}} +\newcommand{\bv}{\mathbf{b}} +\newcommand{\N}{\mathbb{N}} +\newcommand{\id}{\mathbf{I}} +\newcommand{\ind}{\mathbf{1}} +\newcommand{\0}{\mathbf{0}} +\newcommand{\unit}{\mathbf{e}} +\newcommand{\one}{\mathbf{1}} +\newcommand{\zero}{\mathbf{0}} +\]` + + +In `spark.ml`, we implement the [Accelerated failure time (AFT)](https://en.wikipedia.org/wiki/Accelerated_failure_time_model) +model which is a parametric survival regression model for censored data. +It describes a model for the log of survival time, so it's often called +log-linear model for survival analysis. Different from +[Proportional hazards](https://en.wikipedia.org/wiki/Proportional_hazards_model) model +designed for the same purpose, the AFT model is more easily to parallelize +because each instance contribute to the objective function independently. + +Given the values of the covariates $x^{'}$, for random lifetime $t_{i}$ of +subjects i = 1, ..., n, with possible right-censoring, +the likelihood function under the AFT model is given as: +`\[ +L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}} +\]` +Where $\delta_{i}$ is the indicator of the event has occurred i.e. uncensored or not. 
+Using $\epsilon_{i}=\frac{\log{t_{i}}-x^{'}\beta}{\sigma}$, the log-likelihood function +assumes the form: +`\[ +\iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+\delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}] +\]` +Where $S_{0}(\epsilon_{i})$ is the baseline survivor function, +and $f_{0}(\epsilon_{i})$ is corresponding density function. + +The most commonly used AFT model is based on the Weibull distribution of the survival time. +The Weibull distribution for lifetime corresponding to extreme value distribution for +log of the lifetime, and the $S_{0}(\epsilon)$ function is: +`\[ +S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}}) +\]` +the $f_{0}(\epsilon_{i})$ function is: +`\[ +f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}}) +\]` +The log-likelihood function for AFT model with Weibull distribution of lifetime is: +`\[ +\iota(\beta,\sigma)=
spark git commit: [SPARK-10689][ML][DOC] User guide and example code for AFTSurvivalRegression
Repository: spark Updated Branches: refs/heads/master 8c0e1b50e -> d50a66cc0 [SPARK-10689][ML][DOC] User guide and example code for AFTSurvivalRegression Add user guide and example code for ```AFTSurvivalRegression```. Author: Yanbo LiangCloses #9491 from yanboliang/spark-10689. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d50a66cc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d50a66cc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d50a66cc Branch: refs/heads/master Commit: d50a66cc04bfa1c483f04daffe465322316c745e Parents: 8c0e1b5 Author: Yanbo Liang Authored: Mon Nov 9 08:57:29 2015 -0800 Committer: Xiangrui Meng Committed: Mon Nov 9 08:57:29 2015 -0800 -- docs/ml-guide.md| 1 + docs/ml-survival-regression.md | 96 .../ml/JavaAFTSurvivalRegressionExample.java| 71 +++ .../main/python/ml/aft_survival_regression.py | 51 +++ .../ml/AFTSurvivalRegressionExample.scala | 62 + 5 files changed, 281 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d50a66cc/docs/ml-guide.md -- diff --git a/docs/ml-guide.md b/docs/ml-guide.md index fd3a616..c293e71 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -44,6 +44,7 @@ provide class probabilities, and linear models provide model summaries. * [Ensembles](ml-ensembles.html) * [Linear methods with elastic net regularization](ml-linear-methods.html) * [Multilayer perceptron classifier](ml-ann.html) +* [Survival Regression](ml-survival-regression.html) # Main concepts in Pipelines http://git-wip-us.apache.org/repos/asf/spark/blob/d50a66cc/docs/ml-survival-regression.md -- diff --git a/docs/ml-survival-regression.md b/docs/ml-survival-regression.md new file mode 100644 index 000..ab27521 --- /dev/null +++ b/docs/ml-survival-regression.md @@ -0,0 +1,96 @@ +--- +layout: global +title: Survival Regression - ML +displayTitle: ML - Survival Regression +--- + + +`\[ +\newcommand{\R}{\mathbb{R}} +\newcommand{\E}{\mathbb{E}} +\newcommand{\x}{\mathbf{x}} +\newcommand{\y}{\mathbf{y}} +\newcommand{\wv}{\mathbf{w}} +\newcommand{\av}{\mathbf{\alpha}} +\newcommand{\bv}{\mathbf{b}} +\newcommand{\N}{\mathbb{N}} +\newcommand{\id}{\mathbf{I}} +\newcommand{\ind}{\mathbf{1}} +\newcommand{\0}{\mathbf{0}} +\newcommand{\unit}{\mathbf{e}} +\newcommand{\one}{\mathbf{1}} +\newcommand{\zero}{\mathbf{0}} +\]` + + +In `spark.ml`, we implement the [Accelerated failure time (AFT)](https://en.wikipedia.org/wiki/Accelerated_failure_time_model) +model which is a parametric survival regression model for censored data. +It describes a model for the log of survival time, so it's often called +log-linear model for survival analysis. Different from +[Proportional hazards](https://en.wikipedia.org/wiki/Proportional_hazards_model) model +designed for the same purpose, the AFT model is more easily to parallelize +because each instance contribute to the objective function independently. + +Given the values of the covariates $x^{'}$, for random lifetime $t_{i}$ of +subjects i = 1, ..., n, with possible right-censoring, +the likelihood function under the AFT model is given as: +`\[ +L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}} +\]` +Where $\delta_{i}$ is the indicator of the event has occurred i.e. uncensored or not. 
+Using $\epsilon_{i}=\frac{\log{t_{i}}-x^{'}\beta}{\sigma}$, the log-likelihood function +assumes the form: +`\[ +\iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+\delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}] +\]` +where $S_{0}(\epsilon_{i})$ is the baseline survivor function, +and $f_{0}(\epsilon_{i})$ is the corresponding density function. + +The most commonly used AFT model is based on the Weibull distribution of the survival time. +A Weibull distribution for the lifetime corresponds to an extreme value distribution for the +log of the lifetime, and the $S_{0}(\epsilon)$ function is: +`\[ +S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}}) +\]` +and the $f_{0}(\epsilon_{i})$ function is: +`\[ +f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}}) +\]` +The log-likelihood function for the AFT model with a Weibull distribution of lifetime is: +`\[ +\iota(\beta,\sigma)= -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}] +\]` +Since minimizing the negative log-likelihood is equivalent to maximum a posteriori estimation, +the loss function we use to optimize is $-\iota(\beta,\sigma)$.
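A minimal usage sketch of the estimator this guide documents (a sketch only: it assumes a `SQLContext` bound to `sqlContext` and uses the estimator's default input columns `label`, `censor`, and `features`; the `censor` column carries the $\delta_{i}$ indicator, 1.0 for uncensored and 0.0 for right-censored instances):

```scala
import org.apache.spark.ml.regression.AFTSurvivalRegression
import org.apache.spark.mllib.linalg.Vectors

// Toy data: (survival time, censor indicator, covariates).
val training = sqlContext.createDataFrame(Seq(
  (1.218, 1.0, Vectors.dense(1.560, -0.605)),
  (2.949, 0.0, Vectors.dense(0.346, 2.158)),
  (3.627, 0.0, Vectors.dense(1.380, 0.231)),
  (0.273, 1.0, Vectors.dense(0.520, 1.151)),
  (4.199, 0.0, Vectors.dense(0.795, -0.226))
)).toDF("label", "censor", "features")

val aft = new AFTSurvivalRegression()
  .setQuantileProbabilities(Array(0.3, 0.6))
  .setQuantilesCol("quantiles")

val model = aft.fit(training)
println(s"Coefficients: ${model.coefficients} Intercept: ${model.intercept} Scale: ${model.scale}")
model.transform(training).show(false)
```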
spark git commit: [SPARK-11582][MLLIB] specifying pmml version attribute =4.2 in the root node of pmml model
Repository: spark Updated Branches: refs/heads/master d50a66cc0 -> 9b88e1dca [SPARK-11582][MLLIB] specifying pmml version attribute =4.2 in the root node of pmml model The PMML models currently generated do not specify the PMML version in their root node. This is a problem when using these models in other tools, because the tools expect the version attribute to be set explicitly. This fix adds the PMML version attribute to the generated models and sets its value to 4.2. Author: fazlan-nazeemCloses #9558 from fazlan-nazeem/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b88e1dc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b88e1dc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b88e1dc Branch: refs/heads/master Commit: 9b88e1dcad6b5b14a22cf64a1055ad9870507b5a Parents: d50a66c Author: fazlan-nazeem Authored: Mon Nov 9 08:58:55 2015 -0800 Committer: Xiangrui Meng Committed: Mon Nov 9 08:58:55 2015 -0800 -- .../scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9b88e1dc/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala b/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala index c5fdecd..9267e6d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala @@ -32,6 +32,7 @@ private[mllib] trait PMMLModelExport { @BeanProperty val pmml: PMML = new PMML + pmml.setVersion("4.2") setHeader(pmml) private def setHeader(pmml: PMML): Unit = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
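From user code, the effect of this change can be checked roughly as follows (a sketch assuming a `SparkContext` named `sc`; `toPMML` is the existing export entry point from the `PMMLExportable` trait mixed into models such as `KMeansModel`):

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Train a tiny model and export it; with this fix the root <PMML>
// element now carries an explicit version="4.2" attribute.
val points = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(1.0, 1.0),
  Vectors.dense(9.0, 8.0), Vectors.dense(8.0, 9.0)))
val model = KMeans.train(points, k = 2, maxIterations = 10)

val pmml = model.toPMML() // the PMML document as a String
assert(pmml.contains("version=\"4.2\""))
```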
spark git commit: [SPARK-11582][MLLIB] specifying pmml version attribute =4.2 in the root node of pmml model
Repository: spark Updated Branches: refs/heads/branch-1.6 a85a9122f -> a91d21314 [SPARK-11582][MLLIB] specifying pmml version attribute =4.2 in the root node of pmml model The PMML models currently generated do not specify the PMML version in their root node. This is a problem when using these models in other tools, because the tools expect the version attribute to be set explicitly. This fix adds the PMML version attribute to the generated models and sets its value to 4.2. Author: fazlan-nazeemCloses #9558 from fazlan-nazeem/master. (cherry picked from commit 9b88e1dcad6b5b14a22cf64a1055ad9870507b5a) Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a91d2131 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a91d2131 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a91d2131 Branch: refs/heads/branch-1.6 Commit: a91d213146d2dde82710c55a630a3c79f6b7af41 Parents: a85a912 Author: fazlan-nazeem Authored: Mon Nov 9 08:58:55 2015 -0800 Committer: Xiangrui Meng Committed: Mon Nov 9 08:59:02 2015 -0800 -- .../scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a91d2131/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala b/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala index c5fdecd..9267e6d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/pmml/export/PMMLModelExport.scala @@ -32,6 +32,7 @@ private[mllib] trait PMMLModelExport { @BeanProperty val pmml: PMML = new PMML + pmml.setVersion("4.2") setHeader(pmml) private def setHeader(pmml: PMML): Unit = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11581][DOCS] Example mllib code in documentation incorrectly computes MSE
Repository: spark Updated Branches: refs/heads/branch-1.5 a33fd737c -> 0512960fc [SPARK-11581][DOCS] Example mllib code in documentation incorrectly computes MSE Author: Bharat LalCloses #9560 from bharatl/SPARK-11581. (cherry picked from commit 860ea0d386b5fbbe26bf2954f402a9a73ad37edc) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0512960f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0512960f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0512960f Branch: refs/heads/branch-1.5 Commit: 0512960fc7e9a5b6c2e40e17ce50aaba07f31125 Parents: a33fd73 Author: Bharat Lal Authored: Mon Nov 9 11:33:01 2015 -0800 Committer: Joseph K. Bradley Committed: Mon Nov 9 11:34:13 2015 -0800 -- docs/mllib-decision-tree.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0512960f/docs/mllib-decision-tree.md -- diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index c1d0f8a..dfcc413 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -430,7 +430,7 @@ Double testMSE = public Double call(Double a, Double b) { return a + b; } - }) / data.count(); + }) / testData.count(); System.out.println("Test Mean Squared Error: " + testMSE); System.out.println("Learned regression tree model:\n" + model.toDebugString()); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11581][DOCS] Example mllib code in documentation incorrectly computes MSE
Repository: spark Updated Branches: refs/heads/branch-1.6 006d73a74 -> 62f664c5a [SPARK-11581][DOCS] Example mllib code in documentation incorrectly computes MSE Author: Bharat LalCloses #9560 from bharatl/SPARK-11581. (cherry picked from commit 860ea0d386b5fbbe26bf2954f402a9a73ad37edc) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62f664c5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62f664c5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62f664c5 Branch: refs/heads/branch-1.6 Commit: 62f664c5a647786af5ff2d83f99d48a4bf0b8665 Parents: 006d73a Author: Bharat Lal Authored: Mon Nov 9 11:33:01 2015 -0800 Committer: Joseph K. Bradley Committed: Mon Nov 9 11:33:29 2015 -0800 -- docs/mllib-decision-tree.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/62f664c5/docs/mllib-decision-tree.md -- diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index f31c4f8..b5b454b 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -439,7 +439,7 @@ Double testMSE = public Double call(Double a, Double b) { return a + b; } - }) / data.count(); + }) / testData.count(); System.out.println("Test Mean Squared Error: " + testMSE); System.out.println("Learned regression tree model:\n" + model.toDebugString()); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11581][DOCS] Example mllib code in documentation incorrectly computes MSE
Repository: spark Updated Branches: refs/heads/branch-1.4 4f98014b9 -> 72ab06e8a [SPARK-11581][DOCS] Example mllib code in documentation incorrectly computes MSE Author: Bharat LalCloses #9560 from bharatl/SPARK-11581. (cherry picked from commit 860ea0d386b5fbbe26bf2954f402a9a73ad37edc) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72ab06e8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72ab06e8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72ab06e8 Branch: refs/heads/branch-1.4 Commit: 72ab06e8a512a267f5779c6759b6e370c15b9a87 Parents: 4f98014 Author: Bharat Lal Authored: Mon Nov 9 11:33:01 2015 -0800 Committer: Joseph K. Bradley Committed: Mon Nov 9 11:34:36 2015 -0800 -- docs/mllib-decision-tree.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/72ab06e8/docs/mllib-decision-tree.md -- diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index c1d0f8a..dfcc413 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -430,7 +430,7 @@ Double testMSE = public Double call(Double a, Double b) { return a + b; } - }) / data.count(); + }) / testData.count(); System.out.println("Test Mean Squared Error: " + testMSE); System.out.println("Learned regression tree model:\n" + model.toDebugString()); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11581][DOCS] Example mllib code in documentation incorrectly computes MSE
Repository: spark Updated Branches: refs/heads/master 874cd66d4 -> 860ea0d38 [SPARK-11581][DOCS] Example mllib code in documentation incorrectly computes MSE Author: Bharat LalCloses #9560 from bharatl/SPARK-11581. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/860ea0d3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/860ea0d3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/860ea0d3 Branch: refs/heads/master Commit: 860ea0d386b5fbbe26bf2954f402a9a73ad37edc Parents: 874cd66 Author: Bharat Lal Authored: Mon Nov 9 11:33:01 2015 -0800 Committer: Joseph K. Bradley Committed: Mon Nov 9 11:33:01 2015 -0800 -- docs/mllib-decision-tree.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/860ea0d3/docs/mllib-decision-tree.md -- diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index f31c4f8..b5b454b 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -439,7 +439,7 @@ Double testMSE = public Double call(Double a, Double b) { return a + b; } - }) / data.count(); + }) / testData.count(); System.out.println("Test Mean Squared Error: " + testMSE); System.out.println("Learned regression tree model:\n" + model.toDebugString()); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
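The same computation in Scala sidesteps this class of bug by using `mean()` instead of a manual sum-and-divide (a sketch; it assumes `model` is a trained `DecisionTreeModel` and `testData` an `RDD[LabeledPoint]`, as in the surrounding guide):

```scala
// Evaluate on the held-out test set. The fix above matters because the
// summed squared error over testData must be divided by testData.count(),
// not by the size of the full dataset; mean() does the right thing.
val labelsAndPredictions = testData.map { point =>
  (point.label, model.predict(point.features))
}
val testMSE = labelsAndPredictions.map { case (v, p) => math.pow(v - p, 2) }.mean()
println("Test Mean Squared Error: " + testMSE)
```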
spark git commit: [SPARK-11599] [SQL] fix NPE when resolve Hive UDF in SQLParser
Repository: spark Updated Branches: refs/heads/master c4e19b381 -> d6cd3a18e [SPARK-11599] [SQL] fix NPE when resolve Hive UDF in SQLParser The DataFrame APIs that take a SQL expression always use SQLParser, so the HiveFunctionRegistry gets called outside of Hive state, causing an NPE if there is no active Session State for the current thread (in PySpark). cc rxin yhuai Author: Davies LiuCloses #9576 from davies/hive_udf. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6cd3a18 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6cd3a18 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6cd3a18 Branch: refs/heads/master Commit: d6cd3a18e720e8f6f1f307e0dffad3512952d997 Parents: c4e19b3 Author: Davies Liu Authored: Mon Nov 9 23:27:36 2015 -0800 Committer: Davies Liu Committed: Mon Nov 9 23:27:36 2015 -0800 -- .../org/apache/spark/sql/hive/HiveContext.scala | 10 +- .../sql/hive/execution/HiveQuerySuite.scala | 33 +++- 2 files changed, 34 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d6cd3a18/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 2d72b95..c5f6965 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -454,7 +454,15 @@ class HiveContext private[hive]( // Note that HiveUDFs will be overridden by functions registered in this context. @transient override protected[sql] lazy val functionRegistry: FunctionRegistry = -new HiveFunctionRegistry(FunctionRegistry.builtin.copy()) +new HiveFunctionRegistry(FunctionRegistry.builtin.copy()) { + override def lookupFunction(name: String, children: Seq[Expression]): Expression = { +// Hive Registry need current database to lookup function +// TODO: the current database of executionHive should be consistent with metadataHive +executionHive.withHiveState { + super.lookupFunction(name, children) +} + } +} // The Hive UDF current_database() is foldable, will be evaluated by optimizer, but the optimizer // can't access the SessionState of metadataHive.
http://git-wip-us.apache.org/repos/asf/spark/blob/d6cd3a18/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 78378c8..f0a7a6c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -20,22 +20,19 @@ package org.apache.spark.sql.hive.execution import java.io.File import java.util.{Locale, TimeZone} -import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin - import scala.util.Try -import org.scalatest.BeforeAndAfter - import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.scalatest.BeforeAndAfter -import org.apache.spark.{SparkFiles, SparkException} -import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin import org.apache.spark.sql.hive._ -import org.apache.spark.sql.hive.test.TestHiveContext -import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row} +import org.apache.spark.{SparkException, SparkFiles} case class TestData(a: Int, b: String) @@ -1237,6 +1234,26 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } + test("lookup hive UDF in another thread") { +val e = intercept[AnalysisException] { + range(1).selectExpr("not_a_udf()") +} +assert(e.getMessage.contains("undefined function not_a_udf")) +var success = false +val t = new Thread("test") { + override def run(): Unit = { +val e = intercept[AnalysisException] { + range(1).selectExpr("not_a_udf()") +} +assert(e.getMessage.contains("undefined function not_a_udf")) +success =
spark git commit: [SPARK-11599] [SQL] fix NPE when resolve Hive UDF in SQLParser
Repository: spark Updated Branches: refs/heads/branch-1.6 a5651f0a5 -> b426d24db [SPARK-11599] [SQL] fix NPE when resolve Hive UDF in SQLParser The DataFrame APIs that take a SQL expression always use SQLParser, so the HiveFunctionRegistry gets called outside of Hive state, causing an NPE if there is no active Session State for the current thread (in PySpark). cc rxin yhuai Author: Davies LiuCloses #9576 from davies/hive_udf. (cherry picked from commit d6cd3a18e720e8f6f1f307e0dffad3512952d997) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b426d24d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b426d24d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b426d24d Branch: refs/heads/branch-1.6 Commit: b426d24dbabd526fdd9982ed7e3239079923549f Parents: a5651f0 Author: Davies Liu Authored: Mon Nov 9 23:27:36 2015 -0800 Committer: Davies Liu Committed: Mon Nov 9 23:28:11 2015 -0800 -- .../org/apache/spark/sql/hive/HiveContext.scala | 10 +- .../sql/hive/execution/HiveQuerySuite.scala | 33 +++- 2 files changed, 34 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b426d24d/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 2d72b95..c5f6965 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -454,7 +454,15 @@ class HiveContext private[hive]( // Note that HiveUDFs will be overridden by functions registered in this context. @transient override protected[sql] lazy val functionRegistry: FunctionRegistry = -new HiveFunctionRegistry(FunctionRegistry.builtin.copy()) +new HiveFunctionRegistry(FunctionRegistry.builtin.copy()) { + override def lookupFunction(name: String, children: Seq[Expression]): Expression = { +// Hive Registry need current database to lookup function +// TODO: the current database of executionHive should be consistent with metadataHive +executionHive.withHiveState { + super.lookupFunction(name, children) +} + } +} // The Hive UDF current_database() is foldable, will be evaluated by optimizer, but the optimizer // can't access the SessionState of metadataHive.
http://git-wip-us.apache.org/repos/asf/spark/blob/b426d24d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 78378c8..f0a7a6c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -20,22 +20,19 @@ package org.apache.spark.sql.hive.execution import java.io.File import java.util.{Locale, TimeZone} -import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin - import scala.util.Try -import org.scalatest.BeforeAndAfter - import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.scalatest.BeforeAndAfter -import org.apache.spark.{SparkFiles, SparkException} -import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin import org.apache.spark.sql.hive._ -import org.apache.spark.sql.hive.test.TestHiveContext -import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext} +import org.apache.spark.sql.{AnalysisException, DataFrame, Row} +import org.apache.spark.{SparkException, SparkFiles} case class TestData(a: Int, b: String) @@ -1237,6 +1234,26 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } + test("lookup hive UDF in another thread") { +val e = intercept[AnalysisException] { + range(1).selectExpr("not_a_udf()") +} +assert(e.getMessage.contains("undefined function not_a_udf")) +var success = false +val t = new Thread("test") { + override def run(): Unit = { +val e = intercept[AnalysisException] { +
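The failure mode is easy to sketch (assuming a `HiveContext` bound to `sqlContext`): any DataFrame API that parses an expression string goes through SQLParser and hence the function registry, even from a thread with no Hive session state. Before the fix this could surface as a NullPointerException; with it, the lookup runs under `withHiveState` and fails cleanly:

```scala
import scala.util.Try

val t = new Thread("udf-lookup") {
  override def run(): Unit = {
    // With the fix this yields an AnalysisException
    // ("undefined function not_a_udf") rather than an NPE.
    val result = Try(sqlContext.range(1).selectExpr("not_a_udf()"))
    println(result.failed.map(_.getClass.getSimpleName).getOrElse("no failure"))
  }
}
t.start()
t.join()
```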
spark git commit: [SPARK-11598] [SQL] enable tests for ShuffledHashOuterJoin
Repository: spark Updated Branches: refs/heads/master d6cd3a18e -> 521b3cae1 [SPARK-11598] [SQL] enable tests for ShuffledHashOuterJoin Author: Davies LiuCloses #9573 from davies/join_condition. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/521b3cae Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/521b3cae Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/521b3cae Branch: refs/heads/master Commit: 521b3cae118d1e22c170e2aad43f9baa162db55e Parents: d6cd3a1 Author: Davies Liu Authored: Mon Nov 9 23:28:32 2015 -0800 Committer: Davies Liu Committed: Mon Nov 9 23:28:32 2015 -0800 -- .../scala/org/apache/spark/sql/JoinSuite.scala | 435 ++- 1 file changed, 231 insertions(+), 204 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/521b3cae/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index a9ca46c..3f3b837 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -237,214 +237,241 @@ class JoinSuite extends QueryTest with SharedSQLContext { Row(2, 2, 2, 2) :: Nil) } - test("left outer join") { -checkAnswer( - upperCaseData.join(lowerCaseData, $"n" === $"N", "left"), - Row(1, "A", 1, "a") :: -Row(2, "B", 2, "b") :: -Row(3, "C", 3, "c") :: -Row(4, "D", 4, "d") :: -Row(5, "E", null, null) :: -Row(6, "F", null, null) :: Nil) - -checkAnswer( - upperCaseData.join(lowerCaseData, $"n" === $"N" && $"n" > 1, "left"), - Row(1, "A", null, null) :: -Row(2, "B", 2, "b") :: -Row(3, "C", 3, "c") :: -Row(4, "D", 4, "d") :: -Row(5, "E", null, null) :: -Row(6, "F", null, null) :: Nil) - -checkAnswer( - upperCaseData.join(lowerCaseData, $"n" === $"N" && $"N" > 1, "left"), - Row(1, "A", null, null) :: -Row(2, "B", 2, "b") :: -Row(3, "C", 3, "c") :: -Row(4, "D", 4, "d") :: -Row(5, "E", null, null) :: -Row(6, "F", null, null) :: Nil) - -checkAnswer( - upperCaseData.join(lowerCaseData, $"n" === $"N" && $"l" > $"L", "left"), - Row(1, "A", 1, "a") :: -Row(2, "B", 2, "b") :: -Row(3, "C", 3, "c") :: -Row(4, "D", 4, "d") :: -Row(5, "E", null, null) :: -Row(6, "F", null, null) :: Nil) - -// Make sure we are choosing left.outputPartitioning as the -// outputPartitioning for the outer join operator. 
-checkAnswer( - sql( -""" - |SELECT l.N, count(*) - |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a) - |GROUP BY l.N -""".stripMargin), - Row(1, 1) :: -Row(2, 1) :: -Row(3, 1) :: -Row(4, 1) :: -Row(5, 1) :: -Row(6, 1) :: Nil) - -checkAnswer( - sql( -""" - |SELECT r.a, count(*) - |FROM upperCaseData l LEFT OUTER JOIN allNulls r ON (l.N = r.a) - |GROUP BY r.a -""".stripMargin), - Row(null, 6) :: Nil) - } + def test_outer_join(useSMJ: Boolean): Unit = { + +val algo = if (useSMJ) "SortMergeOuterJoin" else "ShuffledHashOuterJoin" + +test("left outer join: " + algo) { + withSQLConf(SQLConf.SORTMERGE_JOIN.key -> useSMJ.toString) { + +checkAnswer( + upperCaseData.join(lowerCaseData, $"n" === $"N", "left"), + Row(1, "A", 1, "a") :: +Row(2, "B", 2, "b") :: +Row(3, "C", 3, "c") :: +Row(4, "D", 4, "d") :: +Row(5, "E", null, null) :: +Row(6, "F", null, null) :: Nil) + +checkAnswer( + upperCaseData.join(lowerCaseData, $"n" === $"N" && $"n" > 1, "left"), + Row(1, "A", null, null) :: +Row(2, "B", 2, "b") :: +Row(3, "C", 3, "c") :: +Row(4, "D", 4, "d") :: +Row(5, "E", null, null) :: +Row(6, "F", null, null) :: Nil) + +checkAnswer( + upperCaseData.join(lowerCaseData, $"n" === $"N" && $"N" > 1, "left"), + Row(1, "A", null, null) :: +Row(2, "B", 2, "b") :: +Row(3, "C", 3, "c") :: +Row(4, "D", 4, "d") :: +Row(5, "E", null, null) :: +Row(6, "F", null, null) :: Nil) + +checkAnswer( + upperCaseData.join(lowerCaseData, $"n" === $"N" && $"l" > $"L", "left"), + Row(1, "A", 1, "a") :: +Row(2, "B", 2, "b") :: +Row(3, "C",
spark git commit: [SPARK-11141][STREAMING] Batch ReceivedBlockTrackerLogEvents for WAL writes
Repository: spark Updated Branches: refs/heads/master 26062d226 -> 0ce6f9b2d [SPARK-11141][STREAMING] Batch ReceivedBlockTrackerLogEvents for WAL writes When using S3 as a directory for WALs, the writes take too long. The driver gets very easily bottlenecked when multiple receivers send AddBlock events to the ReceiverTracker. This PR adds batching of events in the ReceivedBlockTracker so that receivers don't get blocked by the driver for too long. cc zsxwing tdas Author: Burak YavuzCloses #9143 from brkyvz/batch-wal-writes. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ce6f9b2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ce6f9b2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ce6f9b2 Branch: refs/heads/master Commit: 0ce6f9b2d203ce67aeb4d3aedf19bbd997fe01b9 Parents: 26062d2 Author: Burak Yavuz Authored: Mon Nov 9 17:35:12 2015 -0800 Committer: Tathagata Das Committed: Mon Nov 9 17:35:12 2015 -0800 -- .../scheduler/ReceivedBlockTracker.scala| 62 ++- .../streaming/scheduler/ReceiverTracker.scala | 25 +- .../streaming/util/BatchedWriteAheadLog.scala | 223 .../streaming/util/WriteAheadLogUtils.scala | 21 +- .../streaming/util/WriteAheadLogSuite.scala | 506 +-- .../util/WriteAheadLogUtilsSuite.scala | 122 + 6 files changed, 767 insertions(+), 192 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0ce6f9b2/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index f2711d1..500dc70 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -22,12 +22,13 @@ import java.nio.ByteBuffer import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.implicitConversions +import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.streaming.Time -import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogUtils} +import org.apache.spark.streaming.util.{BatchedWriteAheadLog, WriteAheadLog, WriteAheadLogUtils} import org.apache.spark.util.{Clock, Utils} import org.apache.spark.{Logging, SparkConf} @@ -41,7 +42,6 @@ private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: private[streaming] case class BatchCleanupEvent(times: Seq[Time]) extends ReceivedBlockTrackerLogEvent - /** Class representing the blocks of all the streams allocated to a batch */ private[streaming] case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) { @@ -82,15 +82,22 @@ private[streaming] class ReceivedBlockTracker( } /** Add received block. This event will get written to the write ahead log (if enabled). 
*/ - def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized { + def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { try { - writeToLog(BlockAdditionEvent(receivedBlockInfo)) - getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo - logDebug(s"Stream ${receivedBlockInfo.streamId} received " + -s"block ${receivedBlockInfo.blockStoreResult.blockId}") - true + val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) + if (writeResult) { +synchronized { + getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo +} +logDebug(s"Stream ${receivedBlockInfo.streamId} received " + + s"block ${receivedBlockInfo.blockStoreResult.blockId}") + } else { +logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " + + s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.") + } + writeResult } catch { - case e: Exception => + case NonFatal(e) => logError(s"Error adding block $receivedBlockInfo", e) false } @@ -106,10 +113,12 @@ private[streaming] class ReceivedBlockTracker( (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) }.toMap val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) - writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks)) - timeToAllocatedBlocks(batchTime) = allocatedBlocks -
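The core idea behind the change can be illustrated with a simplified sketch (illustrative only, with invented names, not the actual `BatchedWriteAheadLog` code): callers enqueue a record together with a promise, and a single writer thread drains whatever has accumulated and performs one aggregated write per batch, so slow storage such as S3 costs one write per batch instead of one per record:

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.JavaConverters._
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

case class RecordWithPromise(data: Array[Byte], done: Promise[Boolean])

val queue = new LinkedBlockingQueue[RecordWithPromise]()

// Single writer thread: block for the first record, then drain the rest.
val writer = new Thread {
  override def run(): Unit = while (true) {
    val batch = new java.util.ArrayList[RecordWithPromise]()
    batch.add(queue.take())
    queue.drainTo(batch)
    // A single aggregated WAL write for the whole batch would go here.
    batch.asScala.foreach(_.done.success(true))
  }
}
writer.setDaemon(true)
writer.start()

// Callers (e.g. the handler for a receiver's AddBlock event) block only
// until their batch has been flushed:
def write(data: Array[Byte]): Boolean = {
  val p = Promise[Boolean]()
  queue.put(RecordWithPromise(data, p))
  Await.result(p.future, 30.seconds)
}
```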
spark git commit: [SPARK-11141][STREAMING] Batch ReceivedBlockTrackerLogEvents for WAL writes
Repository: spark Updated Branches: refs/heads/branch-1.6 116b7158f -> dccc4645d [SPARK-11141][STREAMING] Batch ReceivedBlockTrackerLogEvents for WAL writes When using S3 as a directory for WALs, the writes take too long. The driver gets very easily bottlenecked when multiple receivers send AddBlock events to the ReceiverTracker. This PR adds batching of events in the ReceivedBlockTracker so that receivers don't get blocked by the driver for too long. cc zsxwing tdas Author: Burak YavuzCloses #9143 from brkyvz/batch-wal-writes. (cherry picked from commit 0ce6f9b2d203ce67aeb4d3aedf19bbd997fe01b9) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dccc4645 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dccc4645 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dccc4645 Branch: refs/heads/branch-1.6 Commit: dccc4645df629f35c4788d50b2c0a6ab381db4b7 Parents: 116b715 Author: Burak Yavuz Authored: Mon Nov 9 17:35:12 2015 -0800 Committer: Tathagata Das Committed: Mon Nov 9 17:35:22 2015 -0800 -- .../scheduler/ReceivedBlockTracker.scala| 62 ++- .../streaming/scheduler/ReceiverTracker.scala | 25 +- .../streaming/util/BatchedWriteAheadLog.scala | 223 .../streaming/util/WriteAheadLogUtils.scala | 21 +- .../streaming/util/WriteAheadLogSuite.scala | 506 +-- .../util/WriteAheadLogUtilsSuite.scala | 122 + 6 files changed, 767 insertions(+), 192 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dccc4645/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index f2711d1..500dc70 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -22,12 +22,13 @@ import java.nio.ByteBuffer import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.implicitConversions +import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.streaming.Time -import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogUtils} +import org.apache.spark.streaming.util.{BatchedWriteAheadLog, WriteAheadLog, WriteAheadLogUtils} import org.apache.spark.util.{Clock, Utils} import org.apache.spark.{Logging, SparkConf} @@ -41,7 +42,6 @@ private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: private[streaming] case class BatchCleanupEvent(times: Seq[Time]) extends ReceivedBlockTrackerLogEvent - /** Class representing the blocks of all the streams allocated to a batch */ private[streaming] case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) { @@ -82,15 +82,22 @@ private[streaming] class ReceivedBlockTracker( } /** Add received block. This event will get written to the write ahead log (if enabled). 
*/ - def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized { + def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { try { - writeToLog(BlockAdditionEvent(receivedBlockInfo)) - getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo - logDebug(s"Stream ${receivedBlockInfo.streamId} received " + -s"block ${receivedBlockInfo.blockStoreResult.blockId}") - true + val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) + if (writeResult) { +synchronized { + getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo +} +logDebug(s"Stream ${receivedBlockInfo.streamId} received " + + s"block ${receivedBlockInfo.blockStoreResult.blockId}") + } else { +logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " + + s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.") + } + writeResult } catch { - case e: Exception => + case NonFatal(e) => logError(s"Error adding block $receivedBlockInfo", e) false } @@ -106,10 +113,12 @@ private[streaming] class ReceivedBlockTracker( (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) }.toMap val allocatedBlocks =
spark git commit: [SPARK-11462][STREAMING] Add JavaStreamingListener
Repository: spark Updated Branches: refs/heads/master 0ce6f9b2d -> 1f0f14efe [SPARK-11462][STREAMING] Add JavaStreamingListener Currently, StreamingListener is not Java friendly because it exposes some Scala collections to Java users directly, such as Option, Map. This PR added a Java version of StreamingListener and a bunch of Java friendly classes for Java users. Author: zsxwingAuthor: Shixiong Zhu Closes #9420 from zsxwing/java-streaming-listener. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f0f14ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f0f14ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f0f14ef Branch: refs/heads/master Commit: 1f0f14efe35f986e338ee2cbc1ef2a9ce7395c00 Parents: 0ce6f9b Author: zsxwing Authored: Mon Nov 9 17:38:19 2015 -0800 Committer: Tathagata Das Committed: Mon Nov 9 17:38:19 2015 -0800 -- .../api/java/JavaStreamingListener.scala| 168 +++ .../api/java/JavaStreamingListenerWrapper.scala | 122 .../JavaStreamingListenerAPISuite.java | 85 ++ .../JavaStreamingListenerWrapperSuite.scala | 290 +++ 4 files changed, 665 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1f0f14ef/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala new file mode 100644 index 000..c86c710 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.api.java + +import org.apache.spark.streaming.Time + +/** + * A listener interface for receiving information about an ongoing streaming computation. + */ +private[streaming] class JavaStreamingListener { + + /** Called when a receiver has been started */ + def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { } + + /** Called when a receiver has reported an error */ + def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = { } + + /** Called when a receiver has been stopped */ + def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = { } + + /** Called when a batch of jobs has been submitted for processing. */ + def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = { } + + /** Called when processing of a batch of jobs has started. */ + def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = { } + + /** Called when processing of a batch of jobs has completed. 
*/ + def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = { } + + /** Called when processing of a job of a batch has started. */ + def onOutputOperationStarted( + outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = { } + + /** Called when processing of a job of a batch has completed. */ + def onOutputOperationCompleted( + outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = { } +} + +/** + * Base trait for events related to JavaStreamingListener + */ +private[streaming] sealed trait JavaStreamingListenerEvent + +private[streaming] class JavaStreamingListenerBatchSubmitted(val batchInfo: JavaBatchInfo) + extends JavaStreamingListenerEvent + +private[streaming] class JavaStreamingListenerBatchCompleted(val batchInfo: JavaBatchInfo) + extends JavaStreamingListenerEvent + +private[streaming] class JavaStreamingListenerBatchStarted(val batchInfo: JavaBatchInfo) + extends JavaStreamingListenerEvent + +private[streaming] class
spark git commit: [SPARK-11359][STREAMING][KINESIS] Checkpoint to DynamoDB even when new data doesn't come in
Repository: spark Updated Branches: refs/heads/branch-1.6 bdd8a6bd4 -> 9e80db7c7 [SPARK-11359][STREAMING][KINESIS] Checkpoint to DynamoDB even when new data doesn't come in Currently, the checkpoints to DynamoDB occur only when new data comes in, as we update the clock for the checkpointState. This PR makes the checkpoint a scheduled execution based on the `checkpointInterval`. Author: Burak YavuzCloses #9421 from brkyvz/kinesis-checkpoint. (cherry picked from commit a3a7c9103e136035d65a5564f9eb0fa04727c4f3) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e80db7c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e80db7c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e80db7c Branch: refs/heads/branch-1.6 Commit: 9e80db7c7d1600691a5c012610e3f28f35210d46 Parents: bdd8a6b Author: Burak Yavuz Authored: Mon Nov 9 14:39:18 2015 -0800 Committer: Tathagata Das Committed: Mon Nov 9 14:39:30 2015 -0800 -- .../kinesis/KinesisCheckpointState.scala| 54 --- .../streaming/kinesis/KinesisCheckpointer.scala | 133 .../streaming/kinesis/KinesisReceiver.scala | 38 - .../kinesis/KinesisRecordProcessor.scala| 59 ++- .../kinesis/KinesisCheckpointerSuite.scala | 152 +++ .../kinesis/KinesisReceiverSuite.scala | 96 +++- 6 files changed, 349 insertions(+), 183 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9e80db7c/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala -- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala deleted file mode 100644 index 83a4537..000 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.streaming.kinesis - -import org.apache.spark.Logging -import org.apache.spark.streaming.Duration -import org.apache.spark.util.{Clock, ManualClock, SystemClock} - -/** - * This is a helper class for managing checkpoint clocks. - * - * @param checkpointInterval - * @param currentClock. 
Default to current SystemClock if none is passed in (mocking purposes) - */ -private[kinesis] class KinesisCheckpointState( -checkpointInterval: Duration, -currentClock: Clock = new SystemClock()) - extends Logging { - - /* Initialize the checkpoint clock using the given currentClock + checkpointInterval millis */ - val checkpointClock = new ManualClock() - checkpointClock.setTime(currentClock.getTimeMillis() + checkpointInterval.milliseconds) - - /** - * Check if it's time to checkpoint based on the current time and the derived time - * for the next checkpoint - * - * @return true if it's time to checkpoint - */ - def shouldCheckpoint(): Boolean = { -new SystemClock().getTimeMillis() > checkpointClock.getTimeMillis() - } - - /** - * Advance the checkpoint clock by the checkpoint interval. - */ - def advanceCheckpoint(): Unit = { -checkpointClock.advance(checkpointInterval.milliseconds) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/9e80db7c/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala -- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointer.scala new file mode 100644 index 000..1ca6d43 --- /dev/null +++
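A simplified sketch of the new scheduled behaviour (illustrative only; `checkpoint` below stands in for the KCL checkpointer call, and none of these names are the real `KinesisCheckpointer` API):

```scala
import java.util.concurrent.{Executors, TimeUnit}

object ScheduledCheckpointSketch {
  val checkpointIntervalMs = 10000L
  @volatile var latestSeqNumber: Option[String] = None // updated as records are processed

  // Stand-in for the KCL call that persists shard progress to DynamoDB.
  def checkpoint(seqNumber: String): Unit =
    println(s"checkpointing shard at sequence number $seqNumber")

  def start(): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // Fire every interval whether or not new records arrived, so shard
    // progress reaches DynamoDB even while the stream is idle.
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = latestSeqNumber.foreach(checkpoint)
    }, checkpointIntervalMs, checkpointIntervalMs, TimeUnit.MILLISECONDS)
  }
}
```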
spark git commit: [SPARK-6517][MLLIB] Implement the Algorithm of Hierarchical Clustering
Repository: spark Updated Branches: refs/heads/master a3a7c9103 -> 8a2336893 [SPARK-6517][MLLIB] Implement the Algorithm of Hierarchical Clustering I implemented a hierarchical clustering algorithm again. This PR doesn't include examples, documentation, or spark.ml APIs; I am going to send more PRs later. https://issues.apache.org/jira/browse/SPARK-6517 - This implementation is based on bisecting k-means clustering. - It derives from freeman-lab's implementation. - The basic idea is not changed from the previous version. (#2906) - However, it is 1000x faster than the previous version through parallel processing. Thank you for your great cooperation, RJ Nowling(rnowling), Jeremy Freeman(freeman-lab), Xiangrui Meng(mengxr) and Sean Owen(srowen). Author: Yu ISHIKAWAAuthor: Xiangrui Meng Author: Yu ISHIKAWA Closes #5267 from yu-iskw/new-hierarchical-clustering. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a233689 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a233689 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a233689 Branch: refs/heads/master Commit: 8a2336893a7ff610a6c4629dd567b85078730616 Parents: a3a7c91 Author: Yu ISHIKAWA Authored: Mon Nov 9 14:56:36 2015 -0800 Committer: Xiangrui Meng Committed: Mon Nov 9 14:56:36 2015 -0800 -- .../mllib/clustering/BisectingKMeans.scala | 491 +++ .../mllib/clustering/BisectingKMeansModel.scala | 95 .../clustering/JavaBisectingKMeansSuite.java| 73 +++ .../mllib/clustering/BisectingKMeansSuite.scala | 182 +++ 4 files changed, 841 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8a233689/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala new file mode 100644 index 000..29a7aa0 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import java.util.Random + +import scala.collection.mutable + +import org.apache.spark.Logging +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors} +import org.apache.spark.mllib.util.MLUtils +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel + +/** + * A bisecting k-means algorithm based on the paper "A comparison of document clustering techniques" + * by Steinbach, Karypis, and Kumar, with modification to fit Spark.
+ * The algorithm starts from a single cluster that contains all points. + * Iteratively it finds divisible clusters on the bottom level and bisects each of them using + * k-means, until there are `k` leaf clusters in total or no leaf clusters are divisible. + * The bisecting steps of clusters on the same level are grouped together to increase parallelism. + * If bisecting all divisible clusters on the bottom level would result in more than `k` leaf clusters, + * larger clusters get higher priority. + * + * @param k the desired number of leaf clusters (default: 4). The actual number could be smaller if + * there are no divisible leaf clusters. + * @param maxIterations the max number of k-means iterations to split clusters (default: 20) + * @param minDivisibleClusterSize the minimum number of points (if >= 1.0) or the minimum proportion + *of points (if < 1.0) of a divisible cluster (default: 1) + * @param seed a random seed (default: hash value of the class name) + * + * @see
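A minimal usage sketch of the new API (assumes a `SparkContext` named `sc`):

```scala
import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.Vectors

// Two well-separated groups; bisecting k-means splits the root cluster
// until the requested number of leaf clusters is reached.
val data = sc.parallelize(Seq(
  Vectors.dense(0.1, 0.1), Vectors.dense(0.3, 0.3),
  Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3)))

val bkm = new BisectingKMeans().setK(2)
val model = bkm.run(data)

println(s"Compute cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
  println(s"Cluster center $idx: $center")
}
```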
spark git commit: [SPARK-11564][SQL][FOLLOW-UP] improve java api for GroupedDataset
Repository: spark Updated Branches: refs/heads/branch-1.6 1585f559d -> b9adfdf9c [SPARK-11564][SQL][FOLLOW-UP] improve java api for GroupedDataset created `MapGroupFunction`, `FlatMapGroupFunction`, `CoGroupFunction` Author: Wenchen FanCloses #9564 from cloud-fan/map. (cherry picked from commit fcb57e9c7323e24b8563800deb035f94f616474e) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9adfdf9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9adfdf9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9adfdf9 Branch: refs/heads/branch-1.6 Commit: b9adfdf9ca18292799e684c8510028c75fbf2808 Parents: 1585f55 Author: Wenchen Fan Authored: Mon Nov 9 15:16:47 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 9 15:17:08 2015 -0800 -- .../api/java/function/CoGroupFunction.java | 29 .../api/java/function/FlatMapFunction.java | 2 +- .../api/java/function/FlatMapFunction2.java | 2 +- .../api/java/function/FlatMapGroupFunction.java | 28 +++ .../api/java/function/MapGroupFunction.java | 28 +++ .../catalyst/plans/logical/basicOperators.scala | 4 +-- .../org/apache/spark/sql/GroupedDataset.scala | 12 +++ .../spark/sql/execution/basicOperators.scala| 2 +- .../org/apache/spark/sql/JavaDatasetSuite.java | 36 +--- 9 files changed, 118 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b9adfdf9/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java -- diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java new file mode 100644 index 000..279639a --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.java.function; + +import java.io.Serializable; +import java.util.Iterator; + +/** + * A function that returns zero or more output records from each grouping key and its values from 2 + * Datasets. 
+ */ +public interface CoGroupFunction extends Serializable { + Iterable call(K key, Iterator left, Iterator right) throws Exception; +} http://git-wip-us.apache.org/repos/asf/spark/blob/b9adfdf9/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java -- diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java index 23f5fdd..ef0d182 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java @@ -23,5 +23,5 @@ import java.io.Serializable; * A function that returns zero or more output records from each input record. */ public interface FlatMapFunction extends Serializable { - public Iterable call(T t) throws Exception; + Iterable call(T t) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/b9adfdf9/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java -- diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java index c48e92f..14a98a3 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java @@ -23,5 +23,5 @@ import java.io.Serializable; * A function that takes two inputs and returns zero or more
spark git commit: [SPARK-9557][SQL] Refactor ParquetFilterSuite and remove old ParquetFilters code
Repository: spark Updated Branches: refs/heads/branch-1.6 b9adfdf9c -> c42433d02 [SPARK-9557][SQL] Refactor ParquetFilterSuite and remove old ParquetFilters code This was actually resolved by https://github.com/apache/spark/pull/8275, but the JIRA issue was not marked as resolved, since that PR was opened for another issue and happened to resolve both. I commented that it is resolved by the PR above; however, I opened this PR because I would like to add a small correction. In the previous PR, I refactored the test to collect filters without reducing them; however, that does not properly test the `And` filter (which is never given to the tests). I unintentionally changed this from the original approach (before the refactoring). In this PR, I follow the original way of collecting filters by reducing them. I am happy to close this if the PR is inappropriate and somebody would rather deal with it in a separate related PR. Author: hyukjinkwonCloses #9554 from HyukjinKwon/SPARK-9557. (cherry picked from commit 9565c246eadecf4836d247d0067f2200f061d25f) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c42433d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c42433d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c42433d0 Branch: refs/heads/branch-1.6 Commit: c42433d0272d43217c0247b03bc6684df1eabfec Parents: b9adfdf Author: hyukjinkwon Authored: Mon Nov 9 15:20:50 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 9 15:21:06 2015 -0800 -- .../execution/datasources/parquet/ParquetFilterSuite.scala | 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c42433d0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index c24c9f0..579dabf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -54,12 +54,12 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex .select(output.map(e => Column(e)): _*) .where(Column(predicate)) - val analyzedPredicate = query.queryExecution.optimizedPlan.collect { + val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation, _)) => filters - }.flatten - assert(analyzedPredicate.nonEmpty) + }.flatten.reduceLeftOption(_ && _) + assert(maybeAnalyzedPredicate.isDefined) - val selectedFilters = analyzedPredicate.flatMap(DataSourceStrategy.translateFilter) + val selectedFilters = maybeAnalyzedPredicate.flatMap(DataSourceStrategy.translateFilter) assert(selectedFilters.nonEmpty) selectedFilters.foreach { pred => - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11552][DOCS][Replaced example code in ml-decision-tree.md using include_example]
Repository: spark Updated Branches: refs/heads/master 5039a49b6 -> 51d41e4b1 [SPARK-11552][DOCS][Replaced example code in ml-decision-tree.md using include_example] I have tested it on my local, it is working fine, please review Author: sachin aggarwalCloses #9539 from agsachin/SPARK-11552-real. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51d41e4b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51d41e4b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51d41e4b Branch: refs/heads/master Commit: 51d41e4b1a3a25a3fde3a4345afcfe4766023d23 Parents: 5039a49 Author: sachin aggarwal Authored: Mon Nov 9 14:25:42 2015 -0800 Committer: Xiangrui Meng Committed: Mon Nov 9 14:25:42 2015 -0800 -- docs/ml-decision-tree.md| 338 +-- .../JavaDecisionTreeClassificationExample.java | 103 ++ .../ml/JavaDecisionTreeRegressionExample.java | 90 + .../ml/decision_tree_classification_example.py | 77 + .../ml/decision_tree_regression_example.py | 74 .../ml/DecisionTreeClassificationExample.scala | 94 ++ .../ml/DecisionTreeRegressionExample.scala | 81 + 7 files changed, 527 insertions(+), 330 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/51d41e4b/docs/ml-decision-tree.md -- diff --git a/docs/ml-decision-tree.md b/docs/ml-decision-tree.md index 542819e..2bfac6f 100644 --- a/docs/ml-decision-tree.md +++ b/docs/ml-decision-tree.md @@ -118,196 +118,24 @@ We use two feature transformers to prepare the data; these help index categories More details on parameters can be found in the [Scala API documentation](api/scala/index.html#org.apache.spark.ml.classification.DecisionTreeClassifier). -{% highlight scala %} -import org.apache.spark.ml.Pipeline -import org.apache.spark.ml.classification.DecisionTreeClassifier -import org.apache.spark.ml.classification.DecisionTreeClassificationModel -import org.apache.spark.ml.feature.{StringIndexer, IndexToString, VectorIndexer} -import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator -import org.apache.spark.mllib.util.MLUtils - -// Load and parse the data file, converting it to a DataFrame. -val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF() - -// Index labels, adding metadata to the label column. -// Fit on whole dataset to include all labels in index. -val labelIndexer = new StringIndexer() - .setInputCol("label") - .setOutputCol("indexedLabel") - .fit(data) -// Automatically identify categorical features, and index them. -val featureIndexer = new VectorIndexer() - .setInputCol("features") - .setOutputCol("indexedFeatures") - .setMaxCategories(4) // features with > 4 distinct values are treated as continuous - .fit(data) - -// Split the data into training and test sets (30% held out for testing) -val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3)) - -// Train a DecisionTree model. -val dt = new DecisionTreeClassifier() - .setLabelCol("indexedLabel") - .setFeaturesCol("indexedFeatures") - -// Convert indexed labels back to original labels. -val labelConverter = new IndexToString() - .setInputCol("prediction") - .setOutputCol("predictedLabel") - .setLabels(labelIndexer.labels) - -// Chain indexers and tree in a Pipeline -val pipeline = new Pipeline() - .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter)) - -// Train model. This also runs the indexers. -val model = pipeline.fit(trainingData) - -// Make predictions. -val predictions = model.transform(testData) - -// Select example rows to display. 
-predictions.select("predictedLabel", "label", "features").show(5) - -// Select (prediction, true label) and compute test error -val evaluator = new MulticlassClassificationEvaluator() - .setLabelCol("indexedLabel") - .setPredictionCol("prediction") - .setMetricName("precision") -val accuracy = evaluator.evaluate(predictions) -println("Test Error = " + (1.0 - accuracy)) - -val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel] -println("Learned classification tree model:\n" + treeModel.toDebugString) -{% endhighlight %} +{% include_example scala/org/apache/spark/examples/ml/DecisionTreeClassificationExample.scala %} + More details on parameters can be found in the [Java API documentation](api/java/org/apache/spark/ml/classification/DecisionTreeClassifier.html). -{% highlight java %} -import org.apache.spark.ml.Pipeline; -import org.apache.spark.ml.PipelineModel; -import org.apache.spark.ml.PipelineStage; -import org.apache.spark.ml.classification.DecisionTreeClassifier; -import
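[Editor's note] The `include_example` tag above pulls the snippet out of a real, compilable file under `examples/` instead of duplicating code in the Markdown. A minimal sketch of how such a file is typically marked up; the object name and body here are hypothetical, and the `$example on$`/`$example off$` marker convention is assumed from the docs build's include_example plugin:

```scala
package org.apache.spark.examples.ml

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.SQLContext

object MyIncludeExampleSketch {  // hypothetical name, not one of the committed files
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MyIncludeExampleSketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // $example on$
    // Only the lines between the markers end up in the rendered docs page,
    // so the context setup above stays out of the documentation.
    val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
    data.show(5)
    // $example off$

    sc.stop()
  }
}
```

Because the examples are real files, they compile with the build, which keeps the docs from drifting out of date the way inline `{% highlight %}` blocks can.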
spark git commit: [SPARK-9301][SQL] Add collect_set and collect_list aggregate functions
Repository: spark Updated Branches: refs/heads/master b7720fa45 -> f138cb873 [SPARK-9301][SQL] Add collect_set and collect_list aggregate functions For now they are thin wrappers around the corresponding Hive UDAFs. One limitation with these in Hive 0.13.0 is they only support aggregating primitive types. I chose snake_case here instead of camelCase because it seems to be used in the majority of the multi-word fns. Do we also want to add these to `functions.py`? This approach was recommended here: https://github.com/apache/spark/pull/8592#issuecomment-154247089 marmbrus rxin Author: Nick BuroojyCloses #9526 from nburoojy/nick/udaf-alias. (cherry picked from commit a6ee4f989d020420dd08b97abb24802200ff23b2) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f138cb87 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f138cb87 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f138cb87 Branch: refs/heads/master Commit: f138cb873335654476d1cd1070900b552dd8b21a Parents: b7720fa Author: Nick Buroojy Authored: Mon Nov 9 14:30:37 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 9 14:30:52 2015 -0800 -- python/pyspark/sql/functions.py | 25 +++- python/pyspark/sql/tests.py | 17 + .../scala/org/apache/spark/sql/functions.scala | 20 .../sql/hive/HiveDataFrameAnalyticsSuite.scala | 15 ++-- 4 files changed, 64 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f138cb87/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 2f7c2f4..962f676 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -124,17 +124,20 @@ _functions_1_4 = { _functions_1_6 = { # unary math functions -"stddev": "Aggregate function: returns the unbiased sample standard deviation of" + - " the expression in a group.", -"stddev_samp": "Aggregate function: returns the unbiased sample standard deviation of" + - " the expression in a group.", -"stddev_pop": "Aggregate function: returns population standard deviation of" + - " the expression in a group.", -"variance": "Aggregate function: returns the population variance of the values in a group.", -"var_samp": "Aggregate function: returns the unbiased variance of the values in a group.", -"var_pop": "Aggregate function: returns the population variance of the values in a group.", -"skewness": "Aggregate function: returns the skewness of the values in a group.", -"kurtosis": "Aggregate function: returns the kurtosis of the values in a group." 
+'stddev': 'Aggregate function: returns the unbiased sample standard deviation of' + + ' the expression in a group.', +'stddev_samp': 'Aggregate function: returns the unbiased sample standard deviation of' + + ' the expression in a group.', +'stddev_pop': 'Aggregate function: returns population standard deviation of' + + ' the expression in a group.', +'variance': 'Aggregate function: returns the population variance of the values in a group.', +'var_samp': 'Aggregate function: returns the unbiased variance of the values in a group.', +'var_pop': 'Aggregate function: returns the population variance of the values in a group.', +'skewness': 'Aggregate function: returns the skewness of the values in a group.', +'kurtosis': 'Aggregate function: returns the kurtosis of the values in a group.', +'collect_list': 'Aggregate function: returns a list of objects with duplicates.', +'collect_set': 'Aggregate function: returns a set of objects with duplicate elements' + + ' eliminated.' } # math functions that take two arguments as input http://git-wip-us.apache.org/repos/asf/spark/blob/f138cb87/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 4c03a0d..e224574 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1230,6 +1230,23 @@ class HiveContextSQLTests(ReusedPySparkTestCase): for r, ex in zip(rs, expected): self.assertEqual(tuple(r), ex[:len(r)]) +def test_collect_functions(self): +df = self.sqlCtx.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"]) +from pyspark.sql import
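[Editor's note] The commit also adds `collect_list` and `collect_set` to `org.apache.spark.sql.functions` (see the file list above). A minimal Scala sketch of their behavior, assuming a `HiveContext` since they are thin wrappers around Hive UDAFs:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.{collect_list, collect_set}

object CollectFunctionsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CollectFunctionsSketch"))
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    // Same data as the Python test below.
    val df = Seq((1, "1"), (2, "2"), (1, "2"), (1, "2")).toDF("key", "value")

    df.groupBy($"key")
      .agg(collect_list($"value"), collect_set($"value"))
      .show()
    // collect_list keeps duplicates, e.g. key 1 -> ["1", "2", "2"];
    // collect_set eliminates them,  e.g. key 1 -> ["1", "2"].

    sc.stop()
  }
}
```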
spark git commit: [SPARK-9301][SQL] Add collect_set and collect_list aggregate functions
Repository: spark Updated Branches: refs/heads/branch-1.6 85bb319a2 -> a6ee4f989 [SPARK-9301][SQL] Add collect_set and collect_list aggregate functions For now they are thin wrappers around the corresponding Hive UDAFs. One limitation with these in Hive 0.13.0 is they only support aggregating primitive types. I chose snake_case here instead of camelCase because it seems to be used in the majority of the multi-word fns. Do we also want to add these to `functions.py`? This approach was recommended here: https://github.com/apache/spark/pull/8592#issuecomment-154247089 marmbrus rxin Author: Nick BuroojyCloses #9526 from nburoojy/nick/udaf-alias. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6ee4f98 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6ee4f98 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6ee4f98 Branch: refs/heads/branch-1.6 Commit: a6ee4f989d020420dd08b97abb24802200ff23b2 Parents: 85bb319 Author: Nick Buroojy Authored: Mon Nov 9 14:30:37 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 9 14:30:37 2015 -0800 -- python/pyspark/sql/functions.py | 25 +++- python/pyspark/sql/tests.py | 17 + .../scala/org/apache/spark/sql/functions.scala | 20 .../sql/hive/HiveDataFrameAnalyticsSuite.scala | 15 ++-- 4 files changed, 64 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a6ee4f98/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 2f7c2f4..962f676 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -124,17 +124,20 @@ _functions_1_4 = { _functions_1_6 = { # unary math functions -"stddev": "Aggregate function: returns the unbiased sample standard deviation of" + - " the expression in a group.", -"stddev_samp": "Aggregate function: returns the unbiased sample standard deviation of" + - " the expression in a group.", -"stddev_pop": "Aggregate function: returns population standard deviation of" + - " the expression in a group.", -"variance": "Aggregate function: returns the population variance of the values in a group.", -"var_samp": "Aggregate function: returns the unbiased variance of the values in a group.", -"var_pop": "Aggregate function: returns the population variance of the values in a group.", -"skewness": "Aggregate function: returns the skewness of the values in a group.", -"kurtosis": "Aggregate function: returns the kurtosis of the values in a group." +'stddev': 'Aggregate function: returns the unbiased sample standard deviation of' + + ' the expression in a group.', +'stddev_samp': 'Aggregate function: returns the unbiased sample standard deviation of' + + ' the expression in a group.', +'stddev_pop': 'Aggregate function: returns population standard deviation of' + + ' the expression in a group.', +'variance': 'Aggregate function: returns the population variance of the values in a group.', +'var_samp': 'Aggregate function: returns the unbiased variance of the values in a group.', +'var_pop': 'Aggregate function: returns the population variance of the values in a group.', +'skewness': 'Aggregate function: returns the skewness of the values in a group.', +'kurtosis': 'Aggregate function: returns the kurtosis of the values in a group.', +'collect_list': 'Aggregate function: returns a list of objects with duplicates.', +'collect_set': 'Aggregate function: returns a set of objects with duplicate elements' + + ' eliminated.' 
} # math functions that take two arguments as input http://git-wip-us.apache.org/repos/asf/spark/blob/a6ee4f98/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 4c03a0d..e224574 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1230,6 +1230,23 @@ class HiveContextSQLTests(ReusedPySparkTestCase): for r, ex in zip(rs, expected): self.assertEqual(tuple(r), ex[:len(r)]) +def test_collect_functions(self): +df = self.sqlCtx.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"]) +from pyspark.sql import functions + +self.assertEqual( +
spark git commit: [SPARK-11069][ML] Add RegexTokenizer option to convert to lowercase
Repository: spark Updated Branches: refs/heads/master 7dc9d8dba -> 61f9c8711 [SPARK-11069][ML] Add RegexTokenizer option to convert to lowercase jira: https://issues.apache.org/jira/browse/SPARK-11069 quotes from jira: Tokenizer converts strings to lowercase automatically, but RegexTokenizer does not. It would be nice to add an option to RegexTokenizer to convert to lowercase. Proposal: call the Boolean Param "toLowercase" set default to false (so behavior does not change) Actually sklearn converts to lowercase before tokenizing too Author: Yuhao YangCloses #9092 from hhbyyh/tokenLower. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61f9c871 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61f9c871 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61f9c871 Branch: refs/heads/master Commit: 61f9c8711c79f35d67b0456155866da316b131d9 Parents: 7dc9d8d Author: Yuhao Yang Authored: Mon Nov 9 16:55:23 2015 -0800 Committer: Joseph K. Bradley Committed: Mon Nov 9 16:55:23 2015 -0800 -- .../org/apache/spark/ml/feature/Tokenizer.scala | 19 +++-- .../spark/ml/feature/JavaTokenizerSuite.java| 1 + .../spark/ml/feature/TokenizerSuite.scala | 22 +++- 3 files changed, 35 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/61f9c871/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala index 248288c..1b82b40 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala @@ -100,10 +100,25 @@ class RegexTokenizer(override val uid: String) /** @group getParam */ def getPattern: String = $(pattern) - setDefault(minTokenLength -> 1, gaps -> true, pattern -> "\\s+") + /** + * Indicates whether to convert all characters to lowercase before tokenizing. 
+ * Default: true + * @group param + */ + final val toLowercase: BooleanParam = new BooleanParam(this, "toLowercase", +"whether to convert all characters to lowercase before tokenizing.") + + /** @group setParam */ + def setToLowercase(value: Boolean): this.type = set(toLowercase, value) + + /** @group getParam */ + def getToLowercase: Boolean = $(toLowercase) + + setDefault(minTokenLength -> 1, gaps -> true, pattern -> "\\s+", toLowercase -> true) - override protected def createTransformFunc: String => Seq[String] = { str => + override protected def createTransformFunc: String => Seq[String] = { originStr => val re = $(pattern).r +val str = if ($(toLowercase)) originStr.toLowerCase() else originStr val tokens = if ($(gaps)) re.split(str).toSeq else re.findAllIn(str).toSeq val minLength = $(minTokenLength) tokens.filter(_.length >= minLength) http://git-wip-us.apache.org/repos/asf/spark/blob/61f9c871/mllib/src/test/java/org/apache/spark/ml/feature/JavaTokenizerSuite.java -- diff --git a/mllib/src/test/java/org/apache/spark/ml/feature/JavaTokenizerSuite.java b/mllib/src/test/java/org/apache/spark/ml/feature/JavaTokenizerSuite.java index 02309ce..c407d98 100644 --- a/mllib/src/test/java/org/apache/spark/ml/feature/JavaTokenizerSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/feature/JavaTokenizerSuite.java @@ -53,6 +53,7 @@ public class JavaTokenizerSuite { .setOutputCol("tokens") .setPattern("\\s") .setGaps(true) + .setToLowercase(false) .setMinTokenLength(3); http://git-wip-us.apache.org/repos/asf/spark/blob/61f9c871/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala index e5fd21c..a02992a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala @@ -48,13 +48,13 @@ class RegexTokenizerSuite extends SparkFunSuite with MLlibTestSparkContext { .setInputCol("rawText") .setOutputCol("tokens") val dataset0 = sqlContext.createDataFrame(Seq( - TokenizerTestData("Test for tokenization.", Array("Test", "for", "tokenization", ".")), - TokenizerTestData("Te,st. punct", Array("Te", ",", "st", ".", "punct")) + TokenizerTestData("Test for tokenization.", Array("test",
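[Editor's note] Note that the merged change defaults `toLowercase` to true (despite the quoted JIRA proposal of false), so callers who relied on case being preserved must now opt out explicitly. A minimal shell-style sketch against the 1.6 `ml.feature` API shown in the diff:

```scala
import org.apache.spark.ml.feature.RegexTokenizer

val tokenizer = new RegexTokenizer()
  .setInputCol("rawText")
  .setOutputCol("tokens")
  .setPattern("\\s+")       // gaps is true by default, so the pattern splits on whitespace
  .setToLowercase(false)    // restore the pre-1.6 behavior of preserving case

// tokenizer.transform(df) would then emit "Test" rather than "test"
// for an input starting with "Test ...".
```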
spark git commit: [SPARK-11069][ML] Add RegexTokenizer option to convert to lowercase
Repository: spark Updated Branches: refs/heads/branch-1.6 08253874a -> 34e824d90 [SPARK-11069][ML] Add RegexTokenizer option to convert to lowercase jira: https://issues.apache.org/jira/browse/SPARK-11069 quotes from jira: Tokenizer converts strings to lowercase automatically, but RegexTokenizer does not. It would be nice to add an option to RegexTokenizer to convert to lowercase. Proposal: call the Boolean Param "toLowercase" set default to false (so behavior does not change) Actually sklearn converts to lowercase before tokenizing too Author: Yuhao YangCloses #9092 from hhbyyh/tokenLower. (cherry picked from commit 61f9c8711c79f35d67b0456155866da316b131d9) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34e824d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34e824d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34e824d9 Branch: refs/heads/branch-1.6 Commit: 34e824d906b90783013021029e0e483ff20c78d5 Parents: 0825387 Author: Yuhao Yang Authored: Mon Nov 9 16:55:23 2015 -0800 Committer: Joseph K. Bradley Committed: Mon Nov 9 16:57:19 2015 -0800 -- .../org/apache/spark/ml/feature/Tokenizer.scala | 19 +++-- .../spark/ml/feature/JavaTokenizerSuite.java| 1 + .../spark/ml/feature/TokenizerSuite.scala | 22 +++- 3 files changed, 35 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/34e824d9/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala index 248288c..1b82b40 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala @@ -100,10 +100,25 @@ class RegexTokenizer(override val uid: String) /** @group getParam */ def getPattern: String = $(pattern) - setDefault(minTokenLength -> 1, gaps -> true, pattern -> "\\s+") + /** + * Indicates whether to convert all characters to lowercase before tokenizing. 
+ * Default: true + * @group param + */ + final val toLowercase: BooleanParam = new BooleanParam(this, "toLowercase", +"whether to convert all characters to lowercase before tokenizing.") + + /** @group setParam */ + def setToLowercase(value: Boolean): this.type = set(toLowercase, value) + + /** @group getParam */ + def getToLowercase: Boolean = $(toLowercase) + + setDefault(minTokenLength -> 1, gaps -> true, pattern -> "\\s+", toLowercase -> true) - override protected def createTransformFunc: String => Seq[String] = { str => + override protected def createTransformFunc: String => Seq[String] = { originStr => val re = $(pattern).r +val str = if ($(toLowercase)) originStr.toLowerCase() else originStr val tokens = if ($(gaps)) re.split(str).toSeq else re.findAllIn(str).toSeq val minLength = $(minTokenLength) tokens.filter(_.length >= minLength) http://git-wip-us.apache.org/repos/asf/spark/blob/34e824d9/mllib/src/test/java/org/apache/spark/ml/feature/JavaTokenizerSuite.java -- diff --git a/mllib/src/test/java/org/apache/spark/ml/feature/JavaTokenizerSuite.java b/mllib/src/test/java/org/apache/spark/ml/feature/JavaTokenizerSuite.java index 02309ce..c407d98 100644 --- a/mllib/src/test/java/org/apache/spark/ml/feature/JavaTokenizerSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/feature/JavaTokenizerSuite.java @@ -53,6 +53,7 @@ public class JavaTokenizerSuite { .setOutputCol("tokens") .setPattern("\\s") .setGaps(true) + .setToLowercase(false) .setMinTokenLength(3); http://git-wip-us.apache.org/repos/asf/spark/blob/34e824d9/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala index e5fd21c..a02992a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala @@ -48,13 +48,13 @@ class RegexTokenizerSuite extends SparkFunSuite with MLlibTestSparkContext { .setInputCol("rawText") .setOutputCol("tokens") val dataset0 = sqlContext.createDataFrame(Seq( - TokenizerTestData("Test for tokenization.", Array("Test", "for", "tokenization", ".")), -
svn commit: r1713570 - in /spark/site/docs/1.5.2: ./ api/ api/R/ api/java/ api/java/lib/ api/java/org/ api/java/org/apache/ api/java/org/apache/spark/ api/java/org/apache/spark/annotation/ api/java/or
Author: rxin Date: Mon Nov 9 23:54:32 2015 New Revision: 1713570 URL: http://svn.apache.org/viewvc?rev=1713570&view=rev Log: Add Spark 1.5.2 doc [This commit notification would consist of 805 parts, which exceeds the limit of 50, so it was shortened to the summary.]
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.4.2-rc1 [deleted] 0b22a3c7a
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.5.2-rc1 [deleted] ad6ade124
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.3.2-rc1 [deleted] 5a139750b
spark git commit: [SPARK-11564][SQL] Fix documentation for DataFrame.take/collect
Repository: spark Updated Branches: refs/heads/master 9c740a9dd -> 675c7e723

[SPARK-11564][SQL] Fix documentation for DataFrame.take/collect

Author: Reynold Xin

Closes #9557 from rxin/SPARK-11564-1.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/675c7e72 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/675c7e72 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/675c7e72 Branch: refs/heads/master Commit: 675c7e723cadff588405c23826a00686587728b8 Parents: 9c740a9 Author: Reynold Xin Authored: Mon Nov 9 16:22:15 2015 -0800 Committer: Reynold Xin Committed: Mon Nov 9 16:22:15 2015 -0800
--
 sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/675c7e72/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 8ab958a..d25807c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1479,8 +1479,8 @@ class DataFrame private[sql](
   /**
    * Returns the first `n` rows in the [[DataFrame]].
    *
-   * Running take requires moving data into the application's driver process, and doing so on a
-   * very large dataset can crash the driver process with OutOfMemoryError.
+   * Running take requires moving data into the application's driver process, and doing so with
+   * a very large `n` can crash the driver process with OutOfMemoryError.
    *
    * @group action
    * @since 1.3.0
@@ -1501,8 +1501,8 @@ class DataFrame private[sql](
   /**
    * Returns an array that contains all of [[Row]]s in this [[DataFrame]].
    *
-   * Running take requires moving data into the application's driver process, and doing so with
-   * a very large `n` can crash the driver process with OutOfMemoryError.
+   * Running collect requires moving all the data into the application's driver process, and
+   * doing so on a very large dataset can crash the driver process with OutOfMemoryError.
    *
    * For Java API, use [[collectAsList]].
    *
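[Editor's note] The distinction the corrected Scaladoc draws is that the risk of `take` scales with its argument, while the risk of `collect` scales with the whole dataset. A minimal shell-style sketch, with illustrative sizes only and assuming a `sqlContext` in scope:

```scala
val df = sqlContext.range(0, 1000000000L)  // a billion-row DataFrame with one "id" column

val firstFive = df.take(5)       // fine: only 5 rows cross over to the driver
// val everything = df.collect() // risky: every row moves to the driver and
//                               // can crash it with OutOfMemoryError
```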
spark git commit: [SPARK-11610][MLLIB][PYTHON][DOCS] Make the docs of LDAModel.describeTopics in Python more specific
Repository: spark Updated Branches: refs/heads/branch-1.6 5616282ce -> 08253874a

[SPARK-11610][MLLIB][PYTHON][DOCS] Make the docs of LDAModel.describeTopics in Python more specific

cc jkbradley

Author: Yu ISHIKAWA

Closes #9577 from yu-iskw/SPARK-11610.

(cherry picked from commit 7dc9d8dba6c4bc655896b137062d896dec4ef64a) Signed-off-by: Joseph K. Bradley

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08253874 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08253874 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08253874 Branch: refs/heads/branch-1.6 Commit: 08253874a50eda4d8a4bb974fbcf100fd0e4631e Parents: 5616282 Author: Yu ISHIKAWA Authored: Mon Nov 9 16:25:29 2015 -0800 Committer: Joseph K. Bradley Committed: Mon Nov 9 16:25:37 2015 -0800
--
 python/pyspark/mllib/clustering.py | 6 ++++++
 1 file changed, 6 insertions(+)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/08253874/python/pyspark/mllib/clustering.py
--
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 12081f8..1fa061d 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -734,6 +734,12 @@ class LDAModel(JavaModelWrapper, JavaSaveable, Loader):
         """Return the topics described by weighted terms.

         WARNING: If vocabSize and k are large, this can return a large object!
+
+        :param maxTermsPerTopic: Maximum number of terms to collect for each topic.
+            (default: vocabulary size)
+        :return: Array over topics. Each topic is represented as a pair of matching arrays:
+            (term indices, term weights in topic).
+            Each topic's terms are sorted in order of decreasing weight.
         """
         if maxTermsPerTopic is None:
             topics = self.call("describeTopics")
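[Editor's note] The clarified docstring mirrors the Scala `mllib` API. A shell-style sketch of consuming the return value, assuming a trained `org.apache.spark.mllib.clustering.LDAModel` named `ldaModel` is in scope:

```scala
// Each topic is a pair of parallel arrays (term indices, term weights),
// sorted by decreasing weight and truncated to maxTermsPerTopic entries.
val topics: Array[(Array[Int], Array[Double])] = ldaModel.describeTopics(maxTermsPerTopic = 5)

topics.zipWithIndex.foreach { case ((termIndices, termWeights), topicId) =>
  println(s"Topic $topicId:")
  termIndices.zip(termWeights).foreach { case (term, weight) =>
    println(f"  term $term%d weight $weight%1.4f")
  }
}
```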
svn commit: r11096 - /dev/spark/spark-1.5.2-rc2/
Author: rxin Date: Mon Nov 9 23:42:48 2015 New Revision: 11096 Log: Add spark-1.5.2-rc2 Added: dev/spark/spark-1.5.2-rc2/ dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz (with props) dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz.asc dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz.md5 dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz.sha dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop1-scala2.11.tgz (with props) dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop1-scala2.11.tgz.asc dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop1-scala2.11.tgz.md5 dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop1-scala2.11.tgz.sha dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop2.3.tgz (with props) dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop2.3.tgz.asc dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop2.3.tgz.md5 dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop2.3.tgz.sha dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop2.4.tgz (with props) dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop2.4.tgz.asc dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop2.4.tgz.md5 dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop2.4.tgz.sha dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop2.6.tgz (with props) dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop2.6.tgz.asc dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop2.6.tgz.md5 dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop2.6.tgz.sha dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-without-hadoop.tgz (with props) dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-without-hadoop.tgz.asc dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-without-hadoop.tgz.md5 dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-without-hadoop.tgz.sha dev/spark/spark-1.5.2-rc2/spark-1.5.2.tgz (with props) dev/spark/spark-1.5.2-rc2/spark-1.5.2.tgz.asc dev/spark/spark-1.5.2-rc2/spark-1.5.2.tgz.md5 dev/spark/spark-1.5.2-rc2/spark-1.5.2.tgz.sha Added: dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz == Binary file - no diff available. 
Propchange: dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz -- svn:mime-type = application/octet-stream Added: dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz.asc == --- dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz.asc (added) +++ dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz.asc Mon Nov 9 23:42:48 2015 @@ -0,0 +1,11 @@ +-BEGIN PGP SIGNATURE- +Version: GnuPG v2.0.14 (GNU/Linux) + +iQEcBAABAgAGBQJWOPdRAAoJEHxsEF/8jtCJ414IAIXt7hmLcKTnnoEn4DStvFvZ +5Kd2LovP6XQrOVhgPQlFtQSlv+9ceV9GfNHsv0zKfnZGrpWyq7RmAGW1QZTwhDMR +pzrCrW9cP/d7mkviUeM6c7mlG1tUh/IZl6xX5H6wcuYmTFNlx4zR5dChJZPrymSI +QOABeY9e+rVfAyucloeH9A9T4tmYh67TSo83kNyKZe5zJSXjy7M66vEIJy1T/EhZ +sP5RrlseAAGeVyLCuJD12dFjL0ad7MjDMNPq5cL5p+5fge8kCQFy40F0uIXrYiRg +6kQzPQEM0AD5EWcHGWq7fPS5A10IsUZwG1svDk081FkkmNMnx3dWz6UQeirf4dI= +=GUAf +-END PGP SIGNATURE- Added: dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz.md5 == --- dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz.md5 (added) +++ dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz.md5 Mon Nov 9 23:42:48 2015 @@ -0,0 +1 @@ +spark-1.5.2-bin-cdh4.tgz: 8E 3F 5D 15 D5 44 06 89 F3 39 A2 A3 C5 19 20 02 Added: dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz.sha == --- dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz.sha (added) +++ dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-cdh4.tgz.sha Mon Nov 9 23:42:48 2015 @@ -0,0 +1,3 @@ +spark-1.5.2-bin-cdh4.tgz: 5E359FBB 99FDBAF5 53606079 72050BCE AFF1235A CF21F47A + 5281FD29 BBF173FC 7E8AD31A 84EBF825 C6CBCB0E 05CA7733 + 22664490 B23C32C0 8A6FE380 5A936C1D Added: dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop1-scala2.11.tgz == Binary file - no diff available. Propchange: dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop1-scala2.11.tgz -- svn:mime-type = application/octet-stream Added: dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop1-scala2.11.tgz.asc == --- dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop1-scala2.11.tgz.asc (added) +++ dev/spark/spark-1.5.2-rc2/spark-1.5.2-bin-hadoop1-scala2.11.tgz.asc Mon Nov 9 23:42:48 2015 @@ -0,0 +1,11 @@ +-BEGIN PGP SIGNATURE- +Version: GnuPG v2.0.14 (GNU/Linux) + +iQEcBAABAgAGBQJWOPcHAAoJEHxsEF/8jtCJBZgIAIUcMQPIfMNgtX2CFdvNTiHE
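[Editor's note] Each tarball above is published with `.asc`, `.md5`, and `.sha` companions. A minimal sketch of checking a download against the published `.md5` fingerprint; file names are taken from the listing, and the whole artifact is read into memory for brevity (the `.asc` signatures would instead be verified with GPG against the release keys):

```scala
import java.nio.file.{Files, Paths}
import java.security.MessageDigest

// Compute the MD5 of the downloaded artifact as space-separated hex pairs,
// matching the format of the published .md5 file.
val bytes = Files.readAllBytes(Paths.get("spark-1.5.2-bin-cdh4.tgz"))
val digest = MessageDigest.getInstance("MD5").digest(bytes)
val hex = digest.map(b => f"${b & 0xff}%02X").mkString(" ")

// Compare against the fingerprint in spark-1.5.2-bin-cdh4.tgz.md5, e.g.
// "8E 3F 5D 15 D5 44 06 89 F3 39 A2 A3 C5 19 20 02".
println(hex)
```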
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.5.2 [created] 5cf17f954
spark git commit: [SPARK-11578][SQL] User API for Typed Aggregation
Repository: spark Updated Branches: refs/heads/master 2f3837885 -> 9c740a9dd

[SPARK-11578][SQL] User API for Typed Aggregation

This PR adds a new interface for user-defined aggregations that can be used in `DataFrame` and `Dataset` operations to take all of the elements of a group and reduce them to a single value. For example, the following aggregator extracts an `int` from a specific class and adds them up:

```scala
case class Data(i: Int)

val customSummer = new Aggregator[Data, Int, Int] {
  def prepare(d: Data) = d.i
  def reduce(l: Int, r: Int) = l + r
  def present(r: Int) = r
}.toColumn()

val ds: Dataset[Data] = ...
val aggregated = ds.select(customSummer)
```

By using helper functions, users can make a generic `Aggregator` that works on any input type:

```scala
/** An `Aggregator` that adds up any numeric type returned by the given function. */
class SumOf[I, N : Numeric](f: I => N) extends Aggregator[I, N, N] with Serializable {
  val numeric = implicitly[Numeric[N]]
  override def zero: N = numeric.zero
  override def reduce(b: N, a: I): N = numeric.plus(b, f(a))
  override def present(reduction: N): N = reduction
}

def sum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] = new SumOf(f).toColumn
```

These aggregators can then be used alongside other built-in SQL aggregations.

```scala
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
ds
  .groupBy(_._1)
  .agg(
    sum(_._2),                // The aggregator defined above.
    expr("sum(_2)").as[Int],  // A built-in dynamically typed aggregation.
    count("*"))               // A built-in statically typed aggregation.
  .collect()

res0: ("a", 30, 30, 2L), ("b", 3, 3, 2L), ("c", 1, 1, 1L)
```

The current implementation focuses on integrating this into the typed API, but currently only supports running aggregations that return a single long value, as explained in `TypedAggregateExpression`. This will be improved in a follow-up PR.

Author: Michael Armbrust

Closes #9555 from marmbrus/dataset-useragg.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c740a9d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c740a9d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c740a9d Branch: refs/heads/master Commit: 9c740a9ddf6344a03b4b45380eaf0cfc6e2299b5 Parents: 2f38378 Author: Michael Armbrust Authored: Mon Nov 9 16:11:00 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 9 16:11:00 2015 -0800 -- .../scala/org/apache/spark/sql/Column.scala | 11 +- .../scala/org/apache/spark/sql/Dataset.scala| 30 ++--- .../org/apache/spark/sql/GroupedDataset.scala | 51 +--- .../scala/org/apache/spark/sql/SQLContext.scala | 1 - .../aggregate/TypedAggregateExpression.scala| 129 +++ .../spark/sql/expressions/Aggregator.scala | 81 .../scala/org/apache/spark/sql/functions.scala | 30 - .../org/apache/spark/sql/JavaDatasetSuite.java | 4 +- .../spark/sql/DatasetAggregatorSuite.scala | 65 ++ 9 files changed, 360 insertions(+), 42 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9c740a9d/sql/core/src/main/scala/org/apache/spark/sql/Column.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index c32c938..d26b6c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -23,7 +23,7 @@ import org.apache.spark.annotation.Experimental import org.apache.spark.Logging import org.apache.spark.sql.functions.lit import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.encoders.Encoder +import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.DataTypeParser import org.apache.spark.sql.types._ @@ -39,10 +39,13 @@ private[sql] object Column { } /** - * A [[Column]] where an [[Encoder]] has been given for the expected return type. + * A [[Column]] where an [[Encoder]] has been given for the expected input and return type. * @since 1.6.0 + * @tparam T The input type expected for this expression. Can be `Any` if the expression is type + * checked by the analyzer instead of the compiler (i.e. `expr("sum(...)")`). + * @tparam U The output type of this column. */ -class TypedColumn[T](expr: Expression)(implicit val encoder: Encoder[T]) extends Column(expr) +class TypedColumn[-T, U](expr: Expression, val encoder: Encoder[U])
spark git commit: [SPARK-11578][SQL] User API for Typed Aggregation
Repository: spark Updated Branches: refs/heads/branch-1.6 523db0df5 -> a9f58b445

[SPARK-11578][SQL] User API for Typed Aggregation

This PR adds a new interface for user-defined aggregations that can be used in `DataFrame` and `Dataset` operations to take all of the elements of a group and reduce them to a single value. For example, the following aggregator extracts an `int` from a specific class and adds them up:

```scala
case class Data(i: Int)

val customSummer = new Aggregator[Data, Int, Int] {
  def prepare(d: Data) = d.i
  def reduce(l: Int, r: Int) = l + r
  def present(r: Int) = r
}.toColumn()

val ds: Dataset[Data] = ...
val aggregated = ds.select(customSummer)
```

By using helper functions, users can make a generic `Aggregator` that works on any input type:

```scala
/** An `Aggregator` that adds up any numeric type returned by the given function. */
class SumOf[I, N : Numeric](f: I => N) extends Aggregator[I, N, N] with Serializable {
  val numeric = implicitly[Numeric[N]]
  override def zero: N = numeric.zero
  override def reduce(b: N, a: I): N = numeric.plus(b, f(a))
  override def present(reduction: N): N = reduction
}

def sum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] = new SumOf(f).toColumn
```

These aggregators can then be used alongside other built-in SQL aggregations.

```scala
val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
ds
  .groupBy(_._1)
  .agg(
    sum(_._2),                // The aggregator defined above.
    expr("sum(_2)").as[Int],  // A built-in dynamically typed aggregation.
    count("*"))               // A built-in statically typed aggregation.
  .collect()

res0: ("a", 30, 30, 2L), ("b", 3, 3, 2L), ("c", 1, 1, 1L)
```

The current implementation focuses on integrating this into the typed API, but currently only supports running aggregations that return a single long value, as explained in `TypedAggregateExpression`. This will be improved in a follow-up PR.

Author: Michael Armbrust

Closes #9555 from marmbrus/dataset-useragg.
(cherry picked from commit 9c740a9ddf6344a03b4b45380eaf0cfc6e2299b5) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9f58b44 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9f58b44 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9f58b44 Branch: refs/heads/branch-1.6 Commit: a9f58b445b38138a371d178bb8674a8dc2ad9e9e Parents: 523db0d Author: Michael Armbrust Authored: Mon Nov 9 16:11:00 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 9 16:11:14 2015 -0800 -- .../scala/org/apache/spark/sql/Column.scala | 11 +- .../scala/org/apache/spark/sql/Dataset.scala| 30 ++--- .../org/apache/spark/sql/GroupedDataset.scala | 51 +--- .../scala/org/apache/spark/sql/SQLContext.scala | 1 - .../aggregate/TypedAggregateExpression.scala| 129 +++ .../spark/sql/expressions/Aggregator.scala | 81 .../scala/org/apache/spark/sql/functions.scala | 30 - .../org/apache/spark/sql/JavaDatasetSuite.java | 4 +- .../spark/sql/DatasetAggregatorSuite.scala | 65 ++ 9 files changed, 360 insertions(+), 42 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a9f58b44/sql/core/src/main/scala/org/apache/spark/sql/Column.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index c32c938..d26b6c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -23,7 +23,7 @@ import org.apache.spark.annotation.Experimental import org.apache.spark.Logging import org.apache.spark.sql.functions.lit import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.encoders.Encoder +import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.DataTypeParser import org.apache.spark.sql.types._ @@ -39,10 +39,13 @@ private[sql] object Column { } /** - * A [[Column]] where an [[Encoder]] has been given for the expected return type. + * A [[Column]] where an [[Encoder]] has been given for the expected input and return type. * @since 1.6.0 + * @tparam T The input type expected for this expression. Can be `Any` if the expression is type + * checked by the analyzer instead of the compiler (i.e. `expr("sum(...)")`). + * @tparam U The output type of this column. */ -class TypedColumn[T](expr:
spark git commit: [SPARK-11595] [SQL] Fixes ADD JAR when the input path contains URL scheme
Repository: spark Updated Branches: refs/heads/master f138cb873 -> 150f6a89b [SPARK-11595] [SQL] Fixes ADD JAR when the input path contains URL scheme Author: Cheng LianCloses #9569 from liancheng/spark-11595.fix-add-jar. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/150f6a89 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/150f6a89 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/150f6a89 Branch: refs/heads/master Commit: 150f6a89b79f0e5bc31aa83731429dc7ac5ea76b Parents: f138cb8 Author: Cheng Lian Authored: Mon Nov 9 14:32:52 2015 -0800 Committer: Davies Liu Committed: Mon Nov 9 14:32:52 2015 -0800 -- .../sql/hive/thriftserver/HiveThriftServer2Suites.scala | 1 + .../org/apache/spark/sql/hive/client/ClientWrapper.scala | 11 +-- .../spark/sql/hive/client/IsolatedClientLoader.scala | 9 +++-- .../apache/spark/sql/hive/execution/HiveQuerySuite.scala | 8 +--- 4 files changed, 18 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/150f6a89/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala -- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index ff8ca01..5903b9e 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -41,6 +41,7 @@ import org.apache.thrift.transport.TSocket import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.util.Utils import org.apache.spark.{Logging, SparkFunSuite} http://git-wip-us.apache.org/repos/asf/spark/blob/150f6a89/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index 3dce86c..f1c2489 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.client import java.io.{File, PrintStream} import java.util.{Map => JMap} -import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.language.reflectiveCalls @@ -548,7 +547,15 @@ private[hive] class ClientWrapper( } def addJar(path: String): Unit = { -clientLoader.addJar(path) +val uri = new Path(path).toUri +val jarURL = if (uri.getScheme == null) { + // `path` is a local file path without a URL scheme + new File(path).toURI.toURL +} else { + // `path` is a URL with a scheme + uri.toURL +} +clientLoader.addJar(jarURL) runSqlHive(s"ADD JAR $path") } http://git-wip-us.apache.org/repos/asf/spark/blob/150f6a89/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index f99c3ed..e041e0d 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -22,7 +22,6 @@ import java.lang.reflect.InvocationTargetException import java.net.{URL, URLClassLoader} import java.util -import scala.collection.mutable import scala.language.reflectiveCalls import scala.util.Try @@ -30,10 +29,9 @@ import org.apache.commons.io.{FileUtils, IOUtils} import org.apache.spark.Logging import org.apache.spark.deploy.SparkSubmitUtils -import org.apache.spark.util.{MutableURLClassLoader, Utils} - import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.util.{MutableURLClassLoader, Utils} /** Factory for `IsolatedClientLoader` with specific versions of hive. */ private[hive] object IsolatedClientLoader { @@ -190,9 +188,8 @@ private[hive]
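[Editor's note] The core of the fix is the scheme check in `addJar` above. A minimal sketch of the same pattern, using plain `java.net.URI` instead of Hadoop's `Path` for illustration; the helper name is hypothetical:

```scala
import java.io.File
import java.net.{URI, URL}

// A bare local path has no scheme, so it must go through File to become a
// well-formed file: URL; a path that already carries a scheme is converted
// directly. (Non-standard schemes such as hdfs: additionally require a URL
// stream handler to be registered before toURL will accept them.)
def toJarUrl(path: String): URL = {
  val uri = new URI(path)
  if (uri.getScheme == null) new File(path).toURI.toURL
  else uri.toURL
}

// toJarUrl("/tmp/udfs.jar")        -> file:/tmp/udfs.jar
// toJarUrl("file:///tmp/udfs.jar") -> file:/tmp/udfs.jar
```

Both forms of ADD JAR input now yield a usable URL instead of the scheme prefix being misread as part of the file path.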
spark git commit: [SPARK-11595] [SQL] Fixes ADD JAR when the input path contains URL scheme
Repository: spark Updated Branches: refs/heads/branch-1.6 a6ee4f989 -> bdd8a6bd4 [SPARK-11595] [SQL] Fixes ADD JAR when the input path contains URL scheme Author: Cheng LianCloses #9569 from liancheng/spark-11595.fix-add-jar. (cherry picked from commit 150f6a89b79f0e5bc31aa83731429dc7ac5ea76b) Signed-off-by: Davies Liu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bdd8a6bd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bdd8a6bd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bdd8a6bd Branch: refs/heads/branch-1.6 Commit: bdd8a6bd4ed06c1381fb96bf2e35b2b55fc701e5 Parents: a6ee4f9 Author: Cheng Lian Authored: Mon Nov 9 14:32:52 2015 -0800 Committer: Davies Liu Committed: Mon Nov 9 14:33:02 2015 -0800 -- .../sql/hive/thriftserver/HiveThriftServer2Suites.scala | 1 + .../org/apache/spark/sql/hive/client/ClientWrapper.scala | 11 +-- .../spark/sql/hive/client/IsolatedClientLoader.scala | 9 +++-- .../apache/spark/sql/hive/execution/HiveQuerySuite.scala | 8 +--- 4 files changed, 18 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bdd8a6bd/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala -- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index ff8ca01..5903b9e 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -41,6 +41,7 @@ import org.apache.thrift.transport.TSocket import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.util.Utils import org.apache.spark.{Logging, SparkFunSuite} http://git-wip-us.apache.org/repos/asf/spark/blob/bdd8a6bd/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index 3dce86c..f1c2489 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.client import java.io.{File, PrintStream} import java.util.{Map => JMap} -import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.language.reflectiveCalls @@ -548,7 +547,15 @@ private[hive] class ClientWrapper( } def addJar(path: String): Unit = { -clientLoader.addJar(path) +val uri = new Path(path).toUri +val jarURL = if (uri.getScheme == null) { + // `path` is a local file path without a URL scheme + new File(path).toURI.toURL +} else { + // `path` is a URL with a scheme + uri.toURL +} +clientLoader.addJar(jarURL) runSqlHive(s"ADD JAR $path") } http://git-wip-us.apache.org/repos/asf/spark/blob/bdd8a6bd/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index f99c3ed..e041e0d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -22,7 +22,6 @@ import java.lang.reflect.InvocationTargetException import java.net.{URL, URLClassLoader} import java.util -import scala.collection.mutable import scala.language.reflectiveCalls import scala.util.Try @@ -30,10 +29,9 @@ import org.apache.commons.io.{FileUtils, IOUtils} import org.apache.spark.Logging import org.apache.spark.deploy.SparkSubmitUtils -import org.apache.spark.util.{MutableURLClassLoader, Utils} - import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.util.{MutableURLClassLoader, Utils} /** Factory for
spark git commit: [SPARK-11198][STREAMING][KINESIS] Support de-aggregation of records during recovery
Repository: spark Updated Branches: refs/heads/master 61f9c8711 -> 26062d226 [SPARK-11198][STREAMING][KINESIS] Support de-aggregation of records during recovery While the KCL handles de-aggregation during the regular operation, during recovery we use the lower level api, and therefore need to de-aggregate the records. tdas Testing is an issue, we need protobuf magic to do the aggregated records. Maybe we could depend on KPL for tests? Author: Burak YavuzCloses #9403 from brkyvz/kinesis-deaggregation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26062d22 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26062d22 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26062d22 Branch: refs/heads/master Commit: 26062d22607e1f9854bc2588ba22a4e0f8bba48c Parents: 61f9c87 Author: Burak Yavuz Authored: Mon Nov 9 17:18:49 2015 -0800 Committer: Tathagata Das Committed: Mon Nov 9 17:18:49 2015 -0800 -- extras/kinesis-asl/pom.xml | 6 + .../kinesis/KinesisBackedBlockRDD.scala | 6 +- .../streaming/kinesis/KinesisReceiver.scala | 1 - .../kinesis/KinesisRecordProcessor.scala| 2 +- .../streaming/kinesis/KinesisTestUtils.scala| 235 .../kinesis/KinesisBackedBlockRDDSuite.scala| 12 +- .../streaming/kinesis/KinesisStreamSuite.scala | 17 +- .../streaming/kinesis/KinesisTestUtils.scala| 266 +++ pom.xml | 2 + 9 files changed, 299 insertions(+), 248 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/26062d22/extras/kinesis-asl/pom.xml -- diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml index ef72d97..519a920 100644 --- a/extras/kinesis-asl/pom.xml +++ b/extras/kinesis-asl/pom.xml @@ -65,6 +65,12 @@ ${aws.java.sdk.version} + com.amazonaws + amazon-kinesis-producer + ${aws.kinesis.producer.version} + test + + org.mockito mockito-core test http://git-wip-us.apache.org/repos/asf/spark/blob/26062d22/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala -- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 000897a..691c179 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -23,6 +23,7 @@ import scala.util.control.NonFatal import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord import com.amazonaws.services.kinesis.model._ import org.apache.spark._ @@ -210,7 +211,10 @@ class KinesisSequenceRangeIterator( s"getting records using shard iterator") { client.getRecords(getRecordsRequest) } -(getRecordsResult.getRecords.iterator().asScala, getRecordsResult.getNextShardIterator) +// De-aggregate records, if KPL was used in producing the records. The KCL automatically +// handles de-aggregation during regular operation. 
This code path is used during recovery +val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords) +(recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/26062d22/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala -- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 50993f1..97dbb91 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -216,7 +216,6 @@ private[kinesis] class KinesisReceiver[T]( val metadata = SequenceNumberRange(streamName, shardId, records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber()) blockGenerator.addMultipleDataWithCallback(dataIterator, metadata) - } }
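[Editor's note] A minimal sketch of the recovery-path call pattern the diff introduces: records fetched with the low-level GetRecords API may be KPL aggregates, so they are expanded with the KCL's `UserRecord.deaggregate` before use. Client setup is elided, and the helper name is hypothetical:

```scala
import scala.collection.JavaConverters._
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
import com.amazonaws.services.kinesis.model.GetRecordsRequest

def fetchDeaggregated(client: AmazonKinesisClient, shardIterator: String): Seq[UserRecord] = {
  val request = new GetRecordsRequest().withShardIterator(shardIterator)
  val result = client.getRecords(request)
  // Each KPL-aggregated Record can fan out into many UserRecords; plain,
  // non-aggregated records pass through unchanged, so the call is safe to
  // apply unconditionally.
  UserRecord.deaggregate(result.getRecords).asScala
}
```

During regular operation the KCL performs this step itself, which is why only the recovery path needed the change.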
spark git commit: [SPARK-11198][STREAMING][KINESIS] Support de-aggregation of records during recovery
Repository: spark Updated Branches: refs/heads/branch-1.6 34e824d90 -> 116b7158f [SPARK-11198][STREAMING][KINESIS] Support de-aggregation of records during recovery While the KCL handles de-aggregation during the regular operation, during recovery we use the lower level api, and therefore need to de-aggregate the records. tdas Testing is an issue, we need protobuf magic to do the aggregated records. Maybe we could depend on KPL for tests? Author: Burak YavuzCloses #9403 from brkyvz/kinesis-deaggregation. (cherry picked from commit 26062d22607e1f9854bc2588ba22a4e0f8bba48c) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/116b7158 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/116b7158 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/116b7158 Branch: refs/heads/branch-1.6 Commit: 116b7158fa27cf9dbd935be1f395c68d2f8928ec Parents: 34e824d Author: Burak Yavuz Authored: Mon Nov 9 17:18:49 2015 -0800 Committer: Tathagata Das Committed: Mon Nov 9 17:18:59 2015 -0800 -- extras/kinesis-asl/pom.xml | 6 + .../kinesis/KinesisBackedBlockRDD.scala | 6 +- .../streaming/kinesis/KinesisReceiver.scala | 1 - .../kinesis/KinesisRecordProcessor.scala| 2 +- .../streaming/kinesis/KinesisTestUtils.scala| 235 .../kinesis/KinesisBackedBlockRDDSuite.scala| 12 +- .../streaming/kinesis/KinesisStreamSuite.scala | 17 +- .../streaming/kinesis/KinesisTestUtils.scala| 266 +++ pom.xml | 2 + 9 files changed, 299 insertions(+), 248 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/116b7158/extras/kinesis-asl/pom.xml -- diff --git a/extras/kinesis-asl/pom.xml b/extras/kinesis-asl/pom.xml index ef72d97..519a920 100644 --- a/extras/kinesis-asl/pom.xml +++ b/extras/kinesis-asl/pom.xml @@ -65,6 +65,12 @@ ${aws.java.sdk.version} + com.amazonaws + amazon-kinesis-producer + ${aws.kinesis.producer.version} + test + + org.mockito mockito-core test http://git-wip-us.apache.org/repos/asf/spark/blob/116b7158/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala -- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 000897a..691c179 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -23,6 +23,7 @@ import scala.util.control.NonFatal import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} import com.amazonaws.services.kinesis.AmazonKinesisClient +import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord import com.amazonaws.services.kinesis.model._ import org.apache.spark._ @@ -210,7 +211,10 @@ class KinesisSequenceRangeIterator( s"getting records using shard iterator") { client.getRecords(getRecordsRequest) } -(getRecordsResult.getRecords.iterator().asScala, getRecordsResult.getNextShardIterator) +// De-aggregate records, if KPL was used in producing the records. The KCL automatically +// handles de-aggregation during regular operation. 
This code path is used during recovery +val recordIterator = UserRecord.deaggregate(getRecordsResult.getRecords) +(recordIterator.iterator().asScala, getRecordsResult.getNextShardIterator) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/116b7158/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala -- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 50993f1..97dbb91 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -216,7 +216,6 @@ private[kinesis] class KinesisReceiver[T]( val metadata = SequenceNumberRange(streamName, shardId, records.get(0).getSequenceNumber(), records.get(records.size() -