spark git commit: [SPARK-19634][ML] Multivariate summarizer - dataframes API
Repository: spark
Updated Branches: refs/heads/master 966083105 -> 07549b20a

[SPARK-19634][ML] Multivariate summarizer - dataframes API

## What changes were proposed in this pull request?

This patch adds the DataFrames API to the multivariate summarizer (mean, variance, etc.). In addition to all the features of MultivariateOnlineSummarizer, it also allows the user to select a subset of the metrics.

## How was this patch tested?

Test cases added.

## Performance

Resolves several performance issues found in #17419; further optimization is pending on the SQL team's work. One of the SQL-layer performance issues related to this feature has been resolved in #18712; thanks liancheng and cloud-fan.

### Performance data

(Tested on my laptop with 2 partitions; tries = 20, warm-up = 10.) The unit of the test results is records/millisecond (higher is better).

Vector size/records number | 1/1000 | 10/100 | 100/100 | 1000/10 | 1/1
---|---|---|---|---|---
Dataframe | 15149 | 7441 | 2118 | 224 | 21
RDD from Dataframe | 4992 | 4440 | 2328 | 320 | 33
raw RDD | 53931 | 20683 | 3966 | 528 | 53

Author: WeichenXu

Closes #18798 from WeichenXu123/SPARK-19634-dataframe-summarizer.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07549b20
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07549b20
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07549b20

Branch: refs/heads/master
Commit: 07549b20a3fc2a282e080f76a2be075e4dd5ebc7
Parents: 9660831
Author: WeichenXu
Authored: Wed Aug 16 10:41:05 2017 +0800
Committer: Yanbo Liang
Committed: Wed Aug 16 10:41:05 2017 +0800

--
 .../org/apache/spark/ml/linalg/VectorUDT.scala |  24 +-
 .../org/apache/spark/ml/stat/Summarizer.scala  | 596 +++
 .../apache/spark/ml/stat/SummarizerSuite.scala | 582 ++
 .../sql/catalyst/expressions/Projection.scala  |   6 +
 .../expressions/aggregate/interfaces.scala     |   6 +
 5 files changed, 1203 insertions(+), 11 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/07549b20/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
index 9178613..37f173b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
@@ -27,17 +27,7 @@ import org.apache.spark.sql.types._
  */
 private[spark] class VectorUDT extends UserDefinedType[Vector] {

-  override def sqlType: StructType = {
-    // type: 0 = sparse, 1 = dense
-    // We only use "values" for dense vectors, and "size", "indices", and "values" for sparse
-    // vectors. The "values" field is nullable because we might want to add binary vectors later,
-    // which uses "size" and "indices", but not "values".
-StructType(Seq( - StructField("type", ByteType, nullable = false), - StructField("size", IntegerType, nullable = true), - StructField("indices", ArrayType(IntegerType, containsNull = false), nullable = true), - StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true))) - } + override final def sqlType: StructType = _sqlType override def serialize(obj: Vector): InternalRow = { obj match { @@ -94,4 +84,16 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] { override def typeName: String = "vector" private[spark] override def asNullable: VectorUDT = this + + private[this] val _sqlType = { +// type: 0 = sparse, 1 = dense +// We only use "values" for dense vectors, and "size", "indices", and "values" for sparse +// vectors. The "values" field is nullable because we might want to add binary vectors later, +// which uses "size" and "indices", but not "values". +StructType(Seq( + StructField("type", ByteType, nullable = false), + StructField("size", IntegerType, nullable = true), + StructField("indices", ArrayType(IntegerType, containsNull = false), nullable = true), + StructField("values", ArrayType(DoubleType, containsNull = false), nullable = true))) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/07549b20/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala new file mode 100644 index 000..7e408b9 --- /dev/null +++
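The Summarizer.scala hunk above is truncated in the archive. For orientation, here is a minimal usage sketch of the DataFrames summarizer API this commit adds, following the description in the PR (select a subset of metrics, get one struct column back); the metric names and the optional weight column are taken from the PR description, so treat the details as indicative rather than authoritative:

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.stat.Summarizer
import org.apache.spark.sql.SparkSession

object SummarizerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SummarizerExample").getOrCreate()
    import spark.implicits._
    import Summarizer._

    val data = Seq(
      (Vectors.dense(2.0, 3.0, 5.0), 1.0),
      (Vectors.dense(4.0, 6.0, 7.0), 2.0)
    ).toDF("features", "weight")

    // Request only the metrics you need; the result is a single struct column
    // whose fields can be selected individually.
    val (meanVec, varVec) = data
      .select(metrics("mean", "variance").summary($"features", $"weight").as("summary"))
      .select("summary.mean", "summary.variance")
      .as[(Vector, Vector)]
      .first()

    println(s"mean = $meanVec, variance = $varVec")
    spark.stop()
  }
}
```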
spark git commit: [SPARK-21712][PYSPARK] Clarify type error for Column.substr()
Repository: spark
Updated Branches: refs/heads/master 42b9eda80 -> 966083105

[SPARK-21712][PYSPARK] Clarify type error for Column.substr()

Proposed changes:

* Clarify the type error that `Column.substr()` gives.

Test plan:

* Tested this manually.
* Test code:

```python
from pyspark.sql.functions import col, lit
spark.createDataFrame([['nick']], schema=['name']).select(col('name').substr(0, lit(1)))
```

* Before:

```
TypeError: Can not mix the type
```

* After (the archive stripped the angle-bracketed type names; they are reconstructed here from the format string in the diff below):

```
TypeError: startPos and length must be the same type. Got <type 'int'> and <class 'pyspark.sql.column.Column'>, respectively.
```

Author: Nicholas Chammas

Closes #18926 from nchammas/SPARK-21712-substr-type-error.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96608310
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96608310
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96608310

Branch: refs/heads/master
Commit: 96608310501a43fa4ab9f2697f202d655dba98c5
Parents: 42b9eda
Author: Nicholas Chammas
Authored: Wed Aug 16 11:19:15 2017 +0900
Committer: hyukjinkwon
Committed: Wed Aug 16 11:19:15 2017 +0900

--
 python/pyspark/sql/column.py | 10 --
 python/pyspark/sql/tests.py  | 12 
 2 files changed, 20 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/96608310/python/pyspark/sql/column.py
--
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index e753ed4..b172f38 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -406,8 +406,14 @@ class Column(object):
         [Row(col=u'Ali'), Row(col=u'Bob')]
         """
         if type(startPos) != type(length):
-            raise TypeError("Can not mix the type")
-        if isinstance(startPos, (int, long)):
+            raise TypeError(
+                "startPos and length must be the same type. "
+                "Got {startPos_t} and {length_t}, respectively."
+                .format(
+                    startPos_t=type(startPos),
+                    length_t=type(length),
+                ))
+        if isinstance(startPos, int):
             jc = self._jc.substr(startPos, length)
         elif isinstance(startPos, Column):
             jc = self._jc.substr(startPos._jc, length._jc)

http://git-wip-us.apache.org/repos/asf/spark/blob/96608310/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index cf2c473..45a3f9e 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1220,6 +1220,18 @@ class SQLTests(ReusedPySparkTestCase):
         rndn2 = df.select('key', functions.randn(0)).collect()
         self.assertEqual(sorted(rndn1), sorted(rndn2))

+    def test_string_functions(self):
+        from pyspark.sql.functions import col, lit
+        df = self.spark.createDataFrame([['nick']], schema=['name'])
+        self.assertRaisesRegexp(
+            TypeError,
+            "must be the same type",
+            lambda: df.select(col('name').substr(0, lit(1))))
+        if sys.version_info.major == 2:
+            self.assertRaises(
+                TypeError,
+                lambda: df.select(col('name').substr(long(0), long(1))))
+
     def test_array_contains_function(self):
         from pyspark.sql.functions import array_contains
spark git commit: [MINOR] Fix a typo in the method name `UserDefinedFunction.asNonNullabe`
Repository: spark
Updated Branches: refs/heads/master 3f958a999 -> 42b9eda80

[MINOR] Fix a typo in the method name `UserDefinedFunction.asNonNullabe`

## What changes were proposed in this pull request?

The method name `asNonNullabe` should be `asNonNullable`.

## How was this patch tested?

N/A

Author: Xingbo Jiang

Closes #18952 from jiangxb1987/typo.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42b9eda8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42b9eda8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42b9eda8

Branch: refs/heads/master
Commit: 42b9eda80e975d970c3e8da4047b318b83dd269f
Parents: 3f958a9
Author: Xingbo Jiang
Authored: Tue Aug 15 16:40:01 2017 -0700
Committer: gatorsmile
Committed: Tue Aug 15 16:40:01 2017 -0700

--
 .../org/apache/spark/sql/UDFRegistration.scala | 52 ++--
 .../sql/expressions/UserDefinedFunction.scala  |  2 +-
 .../scala/org/apache/spark/sql/functions.scala | 24 -
 3 files changed, 39 insertions(+), 39 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/42b9eda8/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 737afb4..3ff4761 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -82,7 +82,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    * Registers a user-defined function (UDF), for a UDF that's already defined using the Dataset
    * API (i.e. of type UserDefinedFunction). To change a UDF to nondeterministic, call the API
    * `UserDefinedFunction.asNondeterministic()`. To change a UDF to nonNullable, call the API
-   * `UserDefinedFunction.asNonNullabe()`.
+   * `UserDefinedFunction.asNonNullable()`.
    *
    * Example:
    * {{{
@@ -90,7 +90,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    *   spark.udf.register("random", foo.asNondeterministic())
    *
    *   val bar = udf(() => "bar")
-   *   spark.udf.register("stringLit", bar.asNonNullabe())
+   *   spark.udf.register("stringLit", bar.asNonNullable())
    * }}}
    *
    * @param name the name of the UDF.
@@ -130,7 +130,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } functionRegistry.createOrReplaceTempFunction(name, builder) val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name) - if (nullable) udf else udf.asNonNullabe() + if (nullable) udf else udf.asNonNullable() }""") } @@ -175,7 +175,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } functionRegistry.createOrReplaceTempFunction(name, builder) val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name) -if (nullable) udf else udf.asNonNullabe() +if (nullable) udf else udf.asNonNullable() } /** @@ -194,7 +194,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } functionRegistry.createOrReplaceTempFunction(name, builder) val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name) -if (nullable) udf else udf.asNonNullabe() +if (nullable) udf else udf.asNonNullable() } /** @@ -213,7 +213,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } functionRegistry.createOrReplaceTempFunction(name, builder) val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name) -if (nullable) udf else udf.asNonNullabe() +if (nullable) udf else udf.asNonNullable() } /** @@ -232,7 +232,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } functionRegistry.createOrReplaceTempFunction(name, builder) val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name) -if (nullable) udf else udf.asNonNullabe() +if (nullable) udf else udf.asNonNullable() } /** @@ -251,7 +251,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends } functionRegistry.createOrReplaceTempFunction(name, builder) val udf = UserDefinedFunction(func, dataType, inputTypes).withName(name) -if (nullable) udf else udf.asNonNullabe() +if (nullable) udf else udf.asNonNullable() } /** @@ -270,7 +270,7 @@ class UDFRegistration private[sql]
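The remaining hunks of this diff apply the same one-character rename. As a quick reference, here is a sketch of how the two modifiers mentioned in the doc comment above are used together, following the example in that comment; the `Math.random` and string-literal UDFs are illustrative stand-ins:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object UdfModifierExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UdfModifierExample").getOrCreate()

    // Nondeterministic: tells the optimizer not to cache, reorder, or
    // deduplicate calls to this UDF.
    val foo = udf(() => Math.random())
    spark.udf.register("random", foo.asNondeterministic())

    // Non-nullable: tells the optimizer this UDF never returns null, so
    // null handling on its result can be skipped.
    val bar = udf(() => "bar")
    spark.udf.register("stringLit", bar.asNonNullable())

    spark.sql("SELECT random(), stringLit()").show()
    spark.stop()
  }
}
```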
[1/3] spark git commit: [SPARK-21731][BUILD] Upgrade scalastyle to 0.9.
Repository: spark Updated Branches: refs/heads/master cba826d00 -> 3f958a999 http://git-wip-us.apache.org/repos/asf/spark/blob/3f958a99/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala index 5f8973f..43cf0ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -*http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.sql.execution.streaming http://git-wip-us.apache.org/repos/asf/spark/blob/3f958a99/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index dbe652b..258a642 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -1,19 +1,19 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. 
You may obtain a copy of the License at -* -*http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.spark.sql.execution.streaming http://git-wip-us.apache.org/repos/asf/spark/blob/3f958a99/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala -- diff --git
[2/3] spark git commit: [SPARK-21731][BUILD] Upgrade scalastyle to 0.9.
http://git-wip-us.apache.org/repos/asf/spark/blob/3f958a99/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala -- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 28780d3..9ee9cb1 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -30,8 +30,8 @@ import org.apache.mesos.Protos.Environment.Variable import org.apache.mesos.Protos.TaskStatus.Reason import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState} -import org.apache.spark.deploy.mesos.config import org.apache.spark.deploy.mesos.MesosDriverDescription +import org.apache.spark.deploy.mesos.config import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse} import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/3f958a99/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala -- diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index d9ff4a4..a8175e2 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -28,17 +28,17 @@ import org.apache.mesos.Protos._ import org.mockito.Matchers import org.mockito.Matchers._ import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfter import org.scalatest.concurrent.ScalaFutures import org.scalatest.mock.MockitoSugar -import org.scalatest.BeforeAndAfter import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.deploy.mesos.config._ import org.apache.spark.internal.config._ import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor} import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor} import org.apache.spark.scheduler.cluster.mesos.Utils._ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite http://git-wip-us.apache.org/repos/asf/spark/blob/3f958a99/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala -- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala index 257dc83..0a7a16f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala +++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala @@ -17,8 +17,8 @@ package org.apache.spark.deploy.yarn -import scala.collection.mutable.{ArrayBuffer, HashMap, Set} import scala.collection.JavaConverters._ +import scala.collection.mutable.{ArrayBuffer, HashMap, Set} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records.{ContainerId, Resource} http://git-wip-us.apache.org/repos/asf/spark/blob/3f958a99/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala -- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index cc571c3..f73e7dc 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -22,9 +22,9 @@ import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger import
[3/3] spark git commit: [SPARK-21731][BUILD] Upgrade scalastyle to 0.9.
[SPARK-21731][BUILD] Upgrade scalastyle to 0.9.

This version fixes a few issues in the import order checker; it provides better error messages, and detects more improper ordering (thus the need to change a lot of files in this patch). The main fix is that it correctly complains about the order of packages vs. classes.

As part of the above, I moved some "SparkSession" imports in ML examples inside the "$example on$" blocks; that didn't seem consistent across different source files to start with, and this avoids having to add more on/off blocks around specific imports.

The new scalastyle also seems to have a better header detector, so a few license headers had to be updated to match the expected indentation.

Author: Marcelo Vanzin

Closes #18943 from vanzin/SPARK-21731.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f958a99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f958a99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f958a99

Branch: refs/heads/master
Commit: 3f958a99921d149fb9fdf7ba7e78957afdad1405
Parents: cba826d
Author: Marcelo Vanzin
Authored: Tue Aug 15 13:59:00 2017 -0700
Committer: Marcelo Vanzin
Committed: Tue Aug 15 13:59:00 2017 -0700

--
 .../apache/spark/api/java/JavaDoubleRDD.scala   | 2 +-
 .../spark/api/python/PythonWorkerFactory.scala  | 2 +-
 .../apache/spark/api/r/JVMObjectTracker.scala   | 2 +-
 .../apache/spark/api/r/RBackendHandler.scala    | 2 +-
 .../org/apache/spark/deploy/PythonRunner.scala  | 2 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 2 +-
 .../deploy/rest/RestSubmissionServer.scala      | 2 +-
 .../deploy/rest/StandaloneRestServer.scala      | 2 +-
 .../spark/deploy/worker/WorkerArguments.scala   | 2 +-
 .../apache/spark/input/PortableDataStream.scala | 2 +-
 .../spark/input/WholeTextFileInputFormat.scala  | 5 +--
 .../apache/spark/metrics/MetricsSystem.scala    | 2 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     | 2 +-
 .../apache/spark/rdd/DoubleRDDFunctions.scala   | 2 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 2 +-
 .../spark/rdd/PartitionwiseSampledRDD.scala     | 2 +-
 .../org/apache/spark/rdd/coalesce-public.scala  | 2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   | 2 +-
 .../apache/spark/scheduler/TaskSetManager.scala | 2 +-
 .../status/api/v1/AllExecutorListResource.scala | 30 -
 .../status/api/v1/ExecutorListResource.scala    | 30 -
 .../storage/BlockManagerMasterEndpoint.scala    | 2 +-
 .../scala/org/apache/spark/ui/JettyUtils.scala  | 2 +-
 .../org/apache/spark/ui/exec/ExecutorsTab.scala | 2 +-
 .../org/apache/spark/DistributedSuite.scala     | 2 +-
 .../org/apache/spark/JobCancellationSuite.scala | 2 +-
 .../apache/spark/MapOutputTrackerSuite.scala    | 2 +-
 .../org/apache/spark/SparkContextSuite.scala    | 2 +-
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 4 +--
 .../deploy/rest/StandaloneRestSubmitSuite.scala | 2 +-
 .../netty/NettyBlockTransferSecuritySuite.scala | 2 +-
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala | 2 +-
 .../org/apache/spark/rpc/RpcEnvSuite.scala      | 2 +-
 .../spark/scheduler/BlacklistTrackerSuite.scala | 2 +-
 .../apache/spark/scheduler/MapStatusSuite.scala | 2 +-
 .../spark/scheduler/SparkListenerSuite.scala    | 2 +-
 .../spark/scheduler/TaskResultGetterSuite.scala | 2 +-
 .../scheduler/TaskSchedulerImplSuite.scala      | 2 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 2 +-
 .../spark/storage/BlockManagerSuite.scala       | 2 +-
 .../apache/spark/storage/DiskStoreSuite.scala   | 2 +-
 .../apache/spark/util/JsonProtocolSuite.scala   | 2 +-
 .../ml/BucketedRandomProjectionLSHExample.scala | 2 +-
.../spark/examples/ml/MinHashLSHExample.scala | 2 +- .../spark/examples/ml/TokenizerExample.scala| 2 +- .../examples/ml/UnaryTransformerExample.scala | 4 +-- .../spark/examples/ml/VectorSlicerExample.scala | 3 +- .../spark/examples/mllib/TallSkinnyPCA.scala| 2 +- .../spark/examples/mllib/TallSkinnySVD.scala| 2 +- .../pythonconverters/AvroConverters.scala | 6 ++-- .../sql/UserDefinedTypedAggregation.scala | 4 +-- .../sql/UserDefinedUntypedAggregation.scala | 3 +- .../streaming/DirectKafkaWordCount.scala| 2 +- .../spark/streaming/flume/FlumeTestUtils.scala | 2 +- .../streaming/flume/PollingFlumeTestUtils.scala | 2 +- .../spark/sql/kafka010/KafkaTestUtils.scala | 2 +- .../kafka010/DirectKafkaStreamSuite.scala | 2 +- .../kafka/DirectKafkaStreamSuite.scala | 2 +- .../streaming/kinesis/KinesisInputDStream.scala | 3 +- .../org/apache/spark/graphx/GraphOps.scala | 2 +- .../org/apache/spark/ml/impl/UtilsSuite.scala | 2 +- .../apache/spark/ml/feature/Interaction.scala | 4 +--
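The reordered hunks in parts 1/3 and 2/3 of this patch all follow the same pattern: plain ASCII-lexicographic ordering within each import group, which places capitalized class names before lower-case subpackages. That is the "packages vs. classes" fix the commit message refers to. A hedged sketch of the layout the upgraded checker accepts, inferred from the hunks shown above (grouping per Spark's style guide: java, scala, third-party, then org.apache.spark, separated by blank lines):

```scala
// Group 1: java/javax imports.
import java.util.concurrent.TimeUnit

// Group 2: scala imports.
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Group 3: third-party libraries. Note "BeforeAndAfter" (a class, capital B)
// sorts before the "concurrent" subpackage in ASCII order.
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.ScalaFutures

// Group 4: Spark imports. "TaskSchedulerImpl" likewise sorts before the
// "cluster" subpackage.
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
```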
spark git commit: [SPARK-17742][CORE] Handle child process exit in SparkLauncher.
Repository: spark
Updated Branches: refs/heads/master 14bdb25fd -> cba826d00

[SPARK-17742][CORE] Handle child process exit in SparkLauncher.

Currently the launcher handle does not monitor the child spark-submit process it launches; this means that if the child exits with an error, the handle's state will never change, and the caller will never know that the application has failed.

This change adds code to monitor the child process, and changes the handle state appropriately when the child process exits.

Tested with added unit tests.

Author: Marcelo Vanzin

Closes #18877 from vanzin/SPARK-17742.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cba826d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cba826d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cba826d0

Branch: refs/heads/master
Commit: cba826d00173a945b0c9a7629c66e36fa73b723e
Parents: 14bdb25
Author: Marcelo Vanzin
Authored: Tue Aug 15 11:26:29 2017 -0700
Committer: Marcelo Vanzin
Committed: Tue Aug 15 11:26:29 2017 -0700

--
 .../spark/launcher/SparkLauncherSuite.java      |   4 +-
 .../spark/launcher/ChildProcAppHandle.java      |  65 +++--
 .../apache/spark/launcher/OutputRedirector.java |  16 +-
 .../spark/launcher/ChildProcAppHandleSuite.java | 248 +++
 .../spark/launcher/OutputRedirectionSuite.java  | 226 -
 launcher/src/test/resources/log4j.properties    |   2 +-
 6 files changed, 313 insertions(+), 248 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/cba826d0/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
--
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index 19861b8..db4fc26 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -116,11 +116,11 @@ public class SparkLauncherSuite {
       .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
       .addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
       .setMainClass(SparkLauncherTestApp.class.getName())
+      .redirectError()
       .addAppArgs("proc");
     final Process app = launcher.launch();

-    new OutputRedirector(app.getInputStream(), TF);
-    new OutputRedirector(app.getErrorStream(), TF);
+    new OutputRedirector(app.getInputStream(), getClass().getName() + ".child", TF);
     assertEquals(0, app.waitFor());
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cba826d0/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
--
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index 3ce4b79..bf91640 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -34,7 +34,7 @@ class ChildProcAppHandle implements SparkAppHandle {

   private final String secret;
   private final LauncherServer server;

-  private Process childProc;
+  private volatile Process childProc;
   private boolean disposed;
   private LauncherConnection connection;
   private List<Listener> listeners;

@@ -96,18 +96,14 @@ class ChildProcAppHandle implements SparkAppHandle {

   @Override
   public synchronized void kill() {
-    if (!disposed) {
-      disconnect();
-    }
+    disconnect();
     if (childProc != null) {
-      try {
-        childProc.exitValue();
-      } catch (IllegalThreadStateException e) {
+      if (childProc.isAlive()) {
childProc.destroyForcibly(); - } finally { -childProc = null; } + childProc = null; } +setState(State.KILLED); } String getSecret() { @@ -118,7 +114,13 @@ class ChildProcAppHandle implements SparkAppHandle { this.childProc = childProc; if (logStream != null) { this.redirector = new OutputRedirector(logStream, loggerName, -SparkLauncher.REDIRECTOR_FACTORY); +SparkLauncher.REDIRECTOR_FACTORY, this); +} else { + // If there is no log redirection, spawn a thread that will wait for the child process + // to finish. + Thread waiter = SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild); + waiter.setDaemon(true); + waiter.start(); } } @@ -134,7 +136,7 @@ class ChildProcAppHandle implements SparkAppHandle { return connection; } - void setState(State
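From the application side, the effect of this change is observable through the public launcher API: a spark-submit child that dies early now drives the handle to a final state instead of leaving it stuck. A minimal Scala sketch of monitoring those state changes; the jar path and main class are placeholders:

```scala
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LauncherMonitorExample {
  def main(args: Array[String]): Unit = {
    val handle = new SparkLauncher()
      .setAppResource("/path/to/app.jar")   // placeholder path
      .setMainClass("com.example.MyApp")    // placeholder class
      .setMaster("local[*]")
      .startApplication(new SparkAppHandle.Listener {
        override def stateChanged(h: SparkAppHandle): Unit =
          // With this fix, a crashing child process moves the handle into a
          // final state (e.g. FAILED or LOST) rather than leaving it UNKNOWN.
          println(s"application state: ${h.getState}")
        override def infoChanged(h: SparkAppHandle): Unit = ()
      })

    // Poll until the handle reaches a terminal state.
    while (!handle.getState.isFinal) Thread.sleep(500)
  }
}
```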
spark git commit: [SPARK-18464][SQL][FOLLOWUP] support old table which doesn't store schema in table properties
Repository: spark
Updated Branches: refs/heads/master bc9902587 -> 14bdb25fd

[SPARK-18464][SQL][FOLLOWUP] support old table which doesn't store schema in table properties

## What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/15900, to fix one more bug: when the table schema is empty and needs to be inferred at runtime, we should not resolve parent plans before the schema has been inferred; otherwise the parent plans will be resolved against an empty schema and may produce wrong results for something like `select *`.

The fix: introduce `UnresolvedCatalogRelation` as a placeholder, then replace it with `LogicalRelation` or `HiveTableRelation` during analysis, so that it is guaranteed that parent plans are not resolved until the schema has been inferred.

## How was this patch tested?

Regression test.

Author: Wenchen Fan

Closes #18907 from cloud-fan/bug.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14bdb25f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14bdb25f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14bdb25f

Branch: refs/heads/master
Commit: 14bdb25fd76cf1da8f590231833f47b5b7059f11
Parents: bc99025
Author: Wenchen Fan
Authored: Tue Aug 15 09:04:56 2017 -0700
Committer: gatorsmile
Committed: Tue Aug 15 09:04:56 2017 -0700

--
 .../sql/catalyst/catalog/SessionCatalog.scala   |  7 +--
 .../spark/sql/catalyst/catalog/interface.scala  | 25 ++
 .../catalyst/catalog/SessionCatalogSuite.scala  |  5 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  | 11 ++---
 .../scala/org/apache/spark/sql/Dataset.scala    |  4 +-
 .../execution/OptimizeMetadataOnlyQuery.scala   |  6 +--
 .../datasources/DataSourceStrategy.scala        | 49 +++-
 .../spark/sql/execution/datasources/rules.scala |  4 +-
 .../sql/StatisticsCollectionTestBase.scala      |  4 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  6 +--
 .../apache/spark/sql/hive/HiveStrategies.scala  | 17 ---
 .../sql/hive/execution/HiveTableScanExec.scala  |  6 +--
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  1 +
 .../apache/spark/sql/hive/StatisticsSuite.scala | 10 ++--
 .../sql/hive/execution/SQLQuerySuite.scala      | 10 ++--
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |  4 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  4 +-
 17 files changed, 90 insertions(+), 83 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index e3237a8..6030d90 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -678,12 +678,7 @@ class SessionCatalog(
             child = parser.parsePlan(viewText))
           SubqueryAlias(table, child)
         } else {
-          val tableRelation = CatalogRelation(
-            metadata,
-            // we assume all the columns are nullable.
-metadata.dataSchema.asNullable.toAttributes, -metadata.partitionSchema.asNullable.toAttributes) - SubqueryAlias(table, tableRelation) + SubqueryAlias(table, UnresolvedCatalogRelation(metadata)) } } else { SubqueryAlias(table, tempTables(table)) http://git-wip-us.apache.org/repos/asf/spark/blob/14bdb25f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index f865106..5a8c4e7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -28,7 +28,6 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal} -import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical._ import
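The interface.scala diff is truncated here, so the following is a simplified editorial sketch (not Spark's exact code) of the placeholder pattern the commit message describes: a leaf node that reports `resolved = false` and exposes no output, so the analyzer cannot resolve a parent plan such as `select *` against it, plus a rule that later swaps in the concrete relation. The class and rule names are illustrative:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// The placeholder: unresolvable by construction, so parent plans cannot be
// resolved against an empty schema before inference has run.
case class PlaceholderRelation(tableMeta: CatalogTable) extends LeafNode {
  override lazy val resolved: Boolean = false
  override def output: Seq[Attribute] = Nil
}

// An analysis-time rule replaces the placeholder with a concrete relation
// (in Spark: LogicalRelation or HiveTableRelation) once the schema is known,
// possibly inferred at runtime for old tables.
class ResolvePlaceholder(build: CatalogTable => LogicalPlan) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case p: PlaceholderRelation => build(p.tableMeta)
  }
}
```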
spark git commit: [SPARK-19471][SQL] AggregationIterator does not initialize the generated result projection before using it
Repository: spark
Updated Branches: refs/heads/master 12411b5ed -> bc9902587

[SPARK-19471][SQL] AggregationIterator does not initialize the generated result projection before using it

## What changes were proposed in this pull request?

This is a follow-up PR that moves the test case in PR-18920 (https://github.com/apache/spark/pull/18920) to DataFrameAggregateSuite.

## How was this patch tested?

Unit test.

Author: donnyzone

Closes #18946 from DonnyZone/branch-19471-followingPR.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc990258
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc990258
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc990258

Branch: refs/heads/master
Commit: bc9902587a3a3fc6a835ec485c32c047f89100f2
Parents: 12411b5
Author: donnyzone
Authored: Tue Aug 15 08:51:18 2017 -0700
Committer: gatorsmile
Committed: Tue Aug 15 08:51:18 2017 -0700

--
 .../spark/sql/DataFrameAggregateSuite.scala | 47 
 .../spark/sql/DataFrameFunctionsSuite.scala | 45 ---
 2 files changed, 47 insertions(+), 45 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/bc990258/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 69ea62e..affe971 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -17,6 +17,10 @@

 package org.apache.spark.sql

+import scala.util.Random
+
+import org.apache.spark.sql.execution.WholeStageCodegenExec
+import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -558,6 +562,49 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
     assert(e.message.contains("aggregate functions are not allowed in GROUP BY"))
   }

+  private def assertNoExceptions(c: Column): Unit = {
+    for ((wholeStage, useObjectHashAgg) <-
+        Seq((true, true), (true, false), (false, true), (false, false))) {
+      withSQLConf(
+        (SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
+        (SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
+
+        val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
+
+        // test case for HashAggregate
+        val hashAggDF = df.groupBy("x").agg(c, sum("y"))
+        val hashAggPlan = hashAggDF.queryExecution.executedPlan
+        if (wholeStage) {
+          assert(hashAggPlan.find {
+            case WholeStageCodegenExec(_: HashAggregateExec) => true
+            case _ => false
+          }.isDefined)
+        } else {
+          assert(hashAggPlan.isInstanceOf[HashAggregateExec])
+        }
+        hashAggDF.collect()
+
+        // test case for ObjectHashAggregate and SortAggregate
+        val objHashAggOrSortAggDF = df.groupBy("x").agg(c, collect_list("y"))
+        val objHashAggOrSortAggPlan = objHashAggOrSortAggDF.queryExecution.executedPlan
+        if (useObjectHashAgg) {
+          assert(objHashAggOrSortAggPlan.isInstanceOf[ObjectHashAggregateExec])
+        } else {
+          assert(objHashAggOrSortAggPlan.isInstanceOf[SortAggregateExec])
+        }
+        objHashAggOrSortAggDF.collect()
+      }
+    }
+  }
+
+  test("SPARK-19471: AggregationIterator does not initialize the generated result projection" +
+    " before using it") {
+    Seq(
+      monotonically_increasing_id(), spark_partition_id(),
+      rand(Random.nextLong()),
randn(Random.nextLong()) +).foreach(assertNoExceptions) + } + test("SPARK-21580 ints in aggregation expressions are taken as group-by ordinal.") { checkAnswer( testData2.groupBy(lit(3), lit(4)).agg(lit(6), lit(7), sum("b")), http://git-wip-us.apache.org/repos/asf/spark/blob/bc990258/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index fdb9f1d..0681b9c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -24,8 +24,6 @@ import
spark git commit: [SPARK-21721][SQL][BACKPORT-2.1] Clear FileSystem deleteOnExit cache when paths are successfully removed
Repository: spark
Updated Branches: refs/heads/branch-2.1 9b749b6ce -> 6f366fbbf

[SPARK-21721][SQL][BACKPORT-2.1] Clear FileSystem deleteOnExit cache when paths are successfully removed

## What changes were proposed in this pull request?

Backport SPARK-21721 to branch 2.1:

We put the staging path to be deleted into FileSystem's deleteOnExit cache in case the path can't be removed successfully. But when we do remove the path successfully, we don't remove it from the cache. We should, to keep the cache from growing continually.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh

Closes #18947 from viirya/SPARK-21721-backport-2.1.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f366fbb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f366fbb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f366fbb

Branch: refs/heads/branch-2.1
Commit: 6f366fbbf8dc0a00050040891635e1caae8a4faa
Parents: 9b749b6
Author: Liang-Chi Hsieh
Authored: Tue Aug 15 08:48:00 2017 -0700
Committer: gatorsmile
Committed: Tue Aug 15 08:48:00 2017 -0700

--
 .../hive/execution/InsertIntoHiveTable.scala |  8 +++-
 .../sql/hive/execution/SQLQuerySuite.scala   | 21 +++-
 2 files changed, 27 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/6f366fbb/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3b9c2fc..3567819 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -382,7 +382,13 @@ case class InsertIntoHiveTable(
     // Attempt to delete the staging directory and the inclusive files. If failed, the files are
     // expected to be dropped at the normal termination of VM since deleteOnExit is used.
     try {
-      createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
+      createdTempDir.foreach { path =>
+        val fs = path.getFileSystem(hadoopConf)
+        if (fs.delete(path, true)) {
+          // If we successfully delete the staging directory, remove it from FileSystem's cache.
+          fs.cancelDeleteOnExit(path)
+        }
+      }
     } catch {
       case NonFatal(e) =>
         logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)

http://git-wip-us.apache.org/repos/asf/spark/blob/6f366fbb/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
--
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 1619115..73ceaf8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -20,12 +20,13 @@ package org.apache.spark.sql.hive.execution

 import java.io.{File, PrintWriter}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
+import java.util.Set

 import scala.sys.process.{Process, ProcessLogger}
 import scala.util.Try

 import com.google.common.io.Files
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}

 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -2031,4 +2032,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       checkAnswer(table.filter($"p" === "p1\" and q=\"q1").select($"a"), Row(4))
     }
   }
+
+  test("SPARK-21721: Clear FileSystem deleterOnExit cache if path is successfully removed") {
+    withTable("test21721") {
+      val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit")
+      deleteOnExitField.setAccessible(true)
+
+      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+      val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]]
+
+      val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
+      sql("CREATE TABLE test21721 (key INT, value STRING)")
+      val pathSizeToDeleteOnExit = setOfPath.size()
+
+      (0 to 10).foreach(_ => testData.write.mode(SaveMode.Append).insertInto("test21721"))
+
+      assert(setOfPath.size() == pathSizeToDeleteOnExit)
+    }
+  }
 }
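For reference, a condensed sketch of the Hadoop FileSystem pattern this fix relies on. The helper name is illustrative; the FileSystem calls (`deleteOnExit`, `delete`, `cancelDeleteOnExit`) are standard Hadoop API:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object StagingDirCleanup {
  def cleanup(stagingDir: String, conf: Configuration): Unit = {
    val path = new Path(stagingDir)
    val fs = path.getFileSystem(conf)

    // Safety net: if the JVM dies before we delete the directory ourselves,
    // Hadoop removes every registered path at shutdown.
    fs.deleteOnExit(path)

    // ... work with the staging directory ...

    if (fs.delete(path, true)) { // recursive delete
      // We removed it ourselves, so drop it from the deleteOnExit cache;
      // otherwise the cached set grows with every successful cleanup.
      fs.cancelDeleteOnExit(path)
    }
  }
}
```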
spark git commit: [SPARK-21732][SQL] Lazily init hive metastore client
Repository: spark
Updated Branches: refs/heads/master 0422ce06d -> 12411b5ed

[SPARK-21732][SQL] Lazily init hive metastore client

## What changes were proposed in this pull request?

This PR changes the code to lazily initialize the Hive metastore client so that we can create a SparkSession without talking to the Hive metastore server. This is pretty helpful when you have configured a Hive metastore server but it is down: you can still start the Spark shell to debug.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu

Closes #18944 from zsxwing/hive-lazy-init.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12411b5e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12411b5e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12411b5e

Branch: refs/heads/master
Commit: 12411b5edf85ecbae6bf75c70635f5493c86fe09
Parents: 0422ce0
Author: Shixiong Zhu
Authored: Mon Aug 14 23:46:52 2017 -0700
Committer: gatorsmile
Committed: Mon Aug 14 23:46:52 2017 -0700

--
 .../sql/internal/BaseSessionStateBuilder.scala |  8 +--
 .../spark/sql/internal/SessionState.scala      | 28 +++---
 .../HiveMetastoreLazyInitializationSuite.scala | 57 
 3 files changed, 81 insertions(+), 12 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/12411b5e/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 37f4f8d..4e75608 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -287,14 +287,14 @@ abstract class BaseSessionStateBuilder(
       experimentalMethods,
       functionRegistry,
       udfRegistration,
-      catalog,
+      () => catalog,
       sqlParser,
-      analyzer,
-      optimizer,
+      () => analyzer,
+      () => optimizer,
       planner,
       streamingQueryManager,
       listenerManager,
-      resourceLoader,
+      () => resourceLoader,
       createQueryExecution,
       createClone)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/12411b5e/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index ac013ec..accbea4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -42,14 +42,17 @@ import org.apache.spark.sql.util.{ExecutionListenerManager, QueryExecutionListener}
  * @param experimentalMethods Interface to add custom planning strategies and optimizers.
  * @param functionRegistry Internal catalog for managing functions registered by the user.
  * @param udfRegistration Interface exposed to the user for registering user-defined functions.
- * @param catalog Internal catalog for managing table and database states.
+ * @param catalogBuilder a function to create an internal catalog for managing table and database
+ *                       states.
  * @param sqlParser Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
- * @param analyzer Logical query plan analyzer for resolving unresolved attributes and relations.
- * @param optimizer Logical query plan optimizer.
+ * @param analyzerBuilder A function to create the logical query plan analyzer for resolving + *unresolved attributes and relations. + * @param optimizerBuilder a function to create the logical query plan optimizer. * @param planner Planner that converts optimized logical plans to physical plans. * @param streamingQueryManager Interface to start and stop streaming queries. * @param listenerManager Interface to register custom [[QueryExecutionListener]]s. - * @param resourceLoader Session shared resource loader to load JARs, files, etc. + * @param resourceLoaderBuilder a function to create a session shared resource loader to load JARs, + * files, etc. * @param createQueryExecution Function used to create QueryExecution objects. * @param createClone Function used to create clones of the session state. */ @@ -59,17 +62,26 @@ private[sql] class SessionState( val experimentalMethods: ExperimentalMethods, val
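The mechanism, visible in the constructor-argument changes above, is to pass `() => T` thunks instead of eagerly constructed members and memoize them inside the class. A self-contained sketch of that trick with illustrative names (not Spark's actual classes):

```scala
// Nothing expensive happens when LazySessionState is constructed; the thunk
// runs on first access and the result is cached by the lazy val.
object LazyInitDemo extends App {

  final class LazySessionState(catalogBuilder: () => String) {
    lazy val catalog: String = catalogBuilder()
  }

  val state = new LazySessionState(() => {
    // In Spark, this is where the Hive metastore connection would be made;
    // with a down metastore, construction would previously fail right here.
    println("connecting to the metastore...")
    "catalog"
  })

  println("SessionState created")  // printed before any metastore access
  println(state.catalog)           // the connection attempt happens here
}
```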
spark git commit: [SPARK-21724][SQL][DOC] Adds since information in the documentation of date functions
Repository: spark
Updated Branches: refs/heads/master 4c3cf1cc5 -> 0422ce06d

[SPARK-21724][SQL][DOC] Adds since information in the documentation of date functions

## What changes were proposed in this pull request?

This PR adds `since` annotations in the documentation so that they can be rendered as in this screenshot: https://user-images.githubusercontent.com/6477701/29267050-034c1f64-8122-11e7-862b-7dfc38e292bf.png

## How was this patch tested?

Manually checked the documentation by `cd sql && ./create-docs.sh`. Also, Jenkins tests are required.

Author: hyukjinkwon

Closes #18939 from HyukjinKwon/add-sinces-date-functions.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0422ce06
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0422ce06
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0422ce06

Branch: refs/heads/master
Commit: 0422ce06df71538893b86f35b7308d76f701534b
Parents: 4c3cf1c
Author: hyukjinkwon
Authored: Mon Aug 14 23:44:25 2017 -0700
Committer: gatorsmile
Committed: Mon Aug 14 23:44:25 2017 -0700

--
 .../expressions/datetimeExpressions.scala | 84 +---
 1 file changed, 56 insertions(+), 28 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/0422ce06/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 7dc32e1..7035420 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -53,7 +53,8 @@ trait TimeZoneAwareExpression extends Expression {
  * There is no code generation since this expression should get constant folded by the optimizer.
  */
 @ExpressionDescription(
-  usage = "_FUNC_() - Returns the current date at the start of query evaluation.")
+  usage = "_FUNC_() - Returns the current date at the start of query evaluation.",
+  since = "1.5.0")
 case class CurrentDate(timeZoneId: Option[String] = None)
   extends LeafExpression with TimeZoneAwareExpression with CodegenFallback {

@@ -81,7 +82,8 @@ case class CurrentDate(timeZoneId: Option[String] = None)
  * There is no code generation since this expression should get constant folded by the optimizer.
*/ @ExpressionDescription( - usage = "_FUNC_() - Returns the current timestamp at the start of query evaluation.") + usage = "_FUNC_() - Returns the current timestamp at the start of query evaluation.", + since = "1.5.0") case class CurrentTimestamp() extends LeafExpression with CodegenFallback { override def foldable: Boolean = true override def nullable: Boolean = false @@ -141,7 +143,8 @@ case class CurrentBatchTimestamp( Examples: > SELECT _FUNC_('2016-07-30', 1); 2016-07-31 - """) + """, + since = "1.5.0") case class DateAdd(startDate: Expression, days: Expression) extends BinaryExpression with ImplicitCastInputTypes { @@ -174,7 +177,8 @@ case class DateAdd(startDate: Expression, days: Expression) Examples: > SELECT _FUNC_('2016-07-30', 1); 2016-07-29 - """) + """, + since = "1.5.0") case class DateSub(startDate: Expression, days: Expression) extends BinaryExpression with ImplicitCastInputTypes { override def left: Expression = startDate @@ -203,7 +207,8 @@ case class DateSub(startDate: Expression, days: Expression) Examples: > SELECT _FUNC_('2009-07-30 12:58:59'); 12 - """) + """, + since = "1.5.0") case class Hour(child: Expression, timeZoneId: Option[String] = None) extends UnaryExpression with TimeZoneAwareExpression with ImplicitCastInputTypes { @@ -233,7 +238,8 @@ case class Hour(child: Expression, timeZoneId: Option[String] = None) Examples: > SELECT _FUNC_('2009-07-30 12:58:59'); 58 - """) + """, + since = "1.5.0") case class Minute(child: Expression, timeZoneId: Option[String] = None) extends UnaryExpression with TimeZoneAwareExpression with ImplicitCastInputTypes { @@ -263,7 +269,8 @@ case class Minute(child: Expression, timeZoneId: Option[String] = None) Examples: > SELECT _FUNC_('2009-07-30 12:58:59'); 59 - """) + """, + since = "1.5.0") case class Second(child: Expression, timeZoneId: Option[String] = None) extends UnaryExpression with TimeZoneAwareExpression with ImplicitCastInputTypes { @@ -293,7 +300,8 @@ case class Second(child: Expression, timeZoneId: