spark git commit: [SPARK-23303][SQL] improve the explain result for data source v2 relations
Repository: spark Updated Branches: refs/heads/master 8c5b34c42 -> ad640a5af

[SPARK-23303][SQL] improve the explain result for data source v2 relations

## What changes were proposed in this pull request?

The proposed explain format: **[streaming header] [RelationV2/ScanV2] [data source name] [output] [pushed filters] [options]**

**streaming header**: if it's a streaming relation, put "Streaming" at the beginning.
**RelationV2/ScanV2**: if it's a logical plan, put "RelationV2"; otherwise put "ScanV2".
**data source name**: the simple class name of the data source implementation.
**output**: a string of the plan output attributes.
**pushed filters**: a string of all the filters that have been pushed to this data source.
**options**: all the options used to create the data source reader.

The current explain result for a data source v2 relation is unreadable:
```
== Parsed Logical Plan ==
'Filter ('i > 6)
+- AnalysisBarrier
   +- Project [j#1]
      +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- Filter (i#0 > 6)
   +- Project [j#1, i#0]
      +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Optimized Logical Plan ==
Project [j#1]
+- Filter isnotnull(i#0)
   +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Physical Plan ==
*(1) Project [j#1]
+- *(1) Filter isnotnull(i#0)
   +- *(1) DataSourceV2Scan [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940
```
after this PR
```
== Parsed Logical Plan ==
'Project [unresolvedalias('j, None)]
+- AnalysisBarrier
   +- RelationV2 AdvancedDataSourceV2[i#0, j#1]

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- RelationV2 AdvancedDataSourceV2[i#0, j#1]

== Optimized Logical Plan ==
RelationV2 AdvancedDataSourceV2[j#1]

== Physical Plan ==
*(1) ScanV2 AdvancedDataSourceV2[j#1]
```
---
```
== Analyzed Logical Plan ==
i: int, j: int
Filter (i#88 > 3)
+- RelationV2 JavaAdvancedDataSourceV2[i#88, j#89]

== Optimized Logical Plan ==
Filter isnotnull(i#88)
+- RelationV2 JavaAdvancedDataSourceV2[i#88, j#89] (Pushed Filters: [GreaterThan(i,3)])

== Physical Plan ==
*(1) Filter isnotnull(i#88)
+- *(1) ScanV2 JavaAdvancedDataSourceV2[i#88, j#89] (Pushed Filters: [GreaterThan(i,3)])
```
an example for streaming query
```
== Parsed Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements , class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements , class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Optimized Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements , class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject value#25.toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Physical Plan ==
*(4) HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#11L])
+- StateStoreSave [value#6], state info [ checkpoint = *(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *(3) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
      +- StateStoreRestore [value#6], state info [ checkpoint = *(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5]
         +- *(2) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *(1) HashAggregate(keys=[value#6], functions=[partial_count(1)],
```
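The proposed format is essentially a string template, so it can be illustrated without the planner at all. A rough, hypothetical sketch (none of these names are actual `DataSourceV2Relation` members, and the options rendering is a guess since no example above shows options):

```scala
// Illustrative only: builds an explain string in the shape
// "[streaming header] [RelationV2/ScanV2] [data source name] [output] [pushed filters] [options]".
def formatV2Relation(
    streaming: Boolean,            // prepend "Streaming " for streaming relations
    physical: Boolean,             // physical plans print "ScanV2", logical plans "RelationV2"
    sourceName: String,            // simple class name of the data source implementation
    output: Seq[String],           // plan output attributes
    pushedFilters: Seq[String],    // filters pushed down to the source
    options: Map[String, String]): String = {
  val header = if (streaming) "Streaming " else ""
  val node = if (physical) "ScanV2" else "RelationV2"
  val filters =
    if (pushedFilters.isEmpty) ""
    else pushedFilters.mkString(" (Pushed Filters: [", ", ", "])")
  val opts =
    if (options.isEmpty) ""
    else options.map { case (k, v) => s"$k=$v" }.mkString(" (Options: [", ", ", "])")
  header + node + " " + sourceName + output.mkString("[", ", ", "]") + filters + opts
}

// formatV2Relation(streaming = false, physical = true, "JavaAdvancedDataSourceV2",
//   Seq("i#88", "j#89"), Seq("GreaterThan(i,3)"), Map.empty)
// => "ScanV2 JavaAdvancedDataSourceV2[i#88, j#89] (Pushed Filters: [GreaterThan(i,3)])"
```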
svn commit: r25476 - in /dev/spark/2.4.0-SNAPSHOT-2018_03_05_20_01-8c5b34c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Mar 6 04:15:37 2018 New Revision: 25476 Log: Apache Spark 2.4.0-SNAPSHOT-2018_03_05_20_01-8c5b34c docs [This commit notification would consist of 1444 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r25471 - in /dev/spark/2.3.1-SNAPSHOT-2018_03_05_18_01-b9ea2e8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Mar 6 02:15:45 2018 New Revision: 25471 Log: Apache Spark 2.3.1-SNAPSHOT-2018_03_05_18_01-b9ea2e8 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23604][SQL] Change Statistics.isEmpty to !Statistics.hasNonNullValue
Repository: spark Updated Branches: refs/heads/master f6b49f9d1 -> 8c5b34c42

[SPARK-23604][SQL] Change Statistics.isEmpty to !Statistics.hasNonNullValue

## What changes were proposed in this pull request?

Parquet 1.9 will change the semantics of Statistics.isEmpty slightly to reflect if the null value count has been set. That breaks a timestamp interoperability test that cares only about whether there are column values present in the statistics of a written file for an INT96 column. Fix by using Statistics.hasNonNullValue instead.

## How was this patch tested?

Unit tests continue to pass against Parquet 1.8, and also pass against a Parquet build including PARQUET-1217.

Author: Henry Robinson

Closes #20740 from henryr/spark-23604.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c5b34c4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c5b34c4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c5b34c4 Branch: refs/heads/master Commit: 8c5b34c425bda2079a1ff969b12c067f2bb3f18f Parents: f6b49f9 Author: Henry Robinson Authored: Mon Mar 5 16:49:24 2018 -0800 Committer: Marcelo Vanzin Committed: Mon Mar 5 16:49:24 2018 -0800
--
.../datasources/parquet/ParquetInteroperabilitySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/8c5b34c4/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index fbd83a0..9c75965 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -184,7 +184,7 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
 // when the data is read back as mentioned above, b/c int96 is unsigned. This
 // assert makes sure this holds even if we change parquet versions (if eg. there
 // were ever statistics even on unsigned columns).
-assert(columnStats.isEmpty)
+assert(!columnStats.hasNonNullValue)
 }
 // These queries should return the entire dataset with the conversion applied,

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
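For context, a minimal sketch of the distinction the fix relies on, exercising Parquet's column statistics directly (this assumes `IntStatistics` from parquet-column and is not part of the patch itself):

```scala
import org.apache.parquet.column.statistics.IntStatistics

// hasNonNullValue only asks whether any column values were recorded, which is the
// property the interoperability test actually cares about; isEmpty's meaning shifts
// in Parquet 1.9 to also reflect whether the null value count has been set.
val stats = new IntStatistics()
assert(!stats.hasNonNullValue)   // nothing recorded yet
stats.updateStats(42)            // record one value
assert(stats.hasNonNullValue)    // now there is at least one non-null value
```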
spark git commit: [SPARK-23586][SQL] Add interpreted execution to WrapOption
Repository: spark Updated Branches: refs/heads/master 7706eea6a -> f6b49f9d1 [SPARK-23586][SQL] Add interpreted execution to WrapOption ## What changes were proposed in this pull request? The PR adds interpreted execution to WrapOption. ## How was this patch tested? added UT Author: Marco GaidoCloses #20741 from mgaido91/SPARK-23586_2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6b49f9d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6b49f9d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6b49f9d Branch: refs/heads/master Commit: f6b49f9d1b6f218408197f7272c1999fe3d94328 Parents: 7706eea Author: Marco Gaido Authored: Tue Mar 6 01:37:51 2018 +0100 Committer: Herman van Hovell Committed: Tue Mar 6 01:37:51 2018 +0100 -- .../spark/sql/catalyst/expressions/objects/objects.scala | 3 +-- .../catalyst/expressions/ObjectExpressionsSuite.scala| 11 ++- 2 files changed, 11 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f6b49f9d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 03cc8ea..d832fe0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -422,8 +422,7 @@ case class WrapOption(child: Expression, optType: DataType) override def inputTypes: Seq[AbstractDataType] = optType :: Nil - override def eval(input: InternalRow): Any = -throw new UnsupportedOperationException("Only code-generated evaluation is supported") + override def eval(input: InternalRow): Any = Option(child.eval(input)) override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val inputObject = child.genCode(ctx) http://git-wip-us.apache.org/repos/asf/spark/blob/f6b49f9d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index d95db58..d535578 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, UnwrapOption} +import org.apache.spark.sql.catalyst.expressions.objects._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} import org.apache.spark.sql.types.{IntegerType, ObjectType} @@ -75,4 +75,13 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(unwrapObject, expected, InternalRow.fromSeq(Seq(input))) } } + + test("SPARK-23586: WrapOption should support interpreted execution") { +val cls = ObjectType(classOf[java.lang.Integer]) +val inputObject = BoundReference(0, cls, nullable = true) +val wrapObject = 
WrapOption(inputObject, cls) +Seq((1, Some(1)), (null, None)).foreach { case (input, expected) => + checkEvaluation(wrapObject, expected, InternalRow.fromSeq(Seq(input))) +} + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
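The interpreted path added above is just `Option(...)` around the child's result. A standalone restatement of that behaviour in plain Scala (outside Spark, for illustration only):

```scala
// Mirrors the new WrapOption.eval: a null child value becomes None, anything else Some(value).
def wrapOption(childValue: Any): Option[Any] = Option(childValue)

assert(wrapOption(1) == Some(1))
assert(wrapOption(null) == None)
```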
svn commit: r25462 - in /dev/spark/2.4.0-SNAPSHOT-2018_03_05_16_01-7706eea-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Mar 6 00:15:31 2018 New Revision: 25462 Log: Apache Spark 2.4.0-SNAPSHOT-2018_03_05_16_01-7706eea docs [This commit notification would consist of 1444 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18630][PYTHON][ML] Move del method from JavaParams to JavaWrapper; add tests
Repository: spark Updated Branches: refs/heads/master 508573958 -> 7706eea6a [SPARK-18630][PYTHON][ML] Move del method from JavaParams to JavaWrapper; add tests The `__del__` method that explicitly detaches the object was moved from `JavaParams` to `JavaWrapper` class, this way model summaries could also be garbage collected in Java. A test case was added to make sure that relevant error messages are thrown after the objects are deleted. I ran pyspark tests agains `pyspark-ml` module `./python/run-tests --python-executables=$(which python) --modules=pyspark-ml` Author: Yogesh GargCloses #20724 from yogeshg/java_wrapper_memory. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7706eea6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7706eea6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7706eea6 Branch: refs/heads/master Commit: 7706eea6a8bdcd73e9dde5212368f8825e2f1801 Parents: 5085739 Author: Yogesh Garg Authored: Mon Mar 5 15:53:10 2018 -0800 Committer: Joseph K. Bradley Committed: Mon Mar 5 15:53:10 2018 -0800 -- python/pyspark/ml/tests.py | 39 +++ python/pyspark/ml/wrapper.py | 8 2 files changed, 43 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7706eea6/python/pyspark/ml/tests.py -- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 1168859..6dee693 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -173,6 +173,45 @@ class MockModel(MockTransformer, Model, HasFake): pass +class JavaWrapperMemoryTests(SparkSessionTestCase): + +def test_java_object_gets_detached(self): +df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), + (0.0, 2.0, Vectors.sparse(1, [], []))], +["label", "weight", "features"]) +lr = LinearRegression(maxIter=1, regParam=0.0, solver="normal", weightCol="weight", + fitIntercept=False) + +model = lr.fit(df) +summary = model.summary + +self.assertIsInstance(model, JavaWrapper) +self.assertIsInstance(summary, JavaWrapper) +self.assertIsInstance(model, JavaParams) +self.assertNotIsInstance(summary, JavaParams) + +error_no_object = 'Target Object ID does not exist for this gateway' + +self.assertIn("LinearRegression_", model._java_obj.toString()) +self.assertIn("LinearRegressionTrainingSummary", summary._java_obj.toString()) + +model.__del__() + +with self.assertRaisesRegexp(py4j.protocol.Py4JError, error_no_object): +model._java_obj.toString() +self.assertIn("LinearRegressionTrainingSummary", summary._java_obj.toString()) + +try: +summary.__del__() +except: +pass + +with self.assertRaisesRegexp(py4j.protocol.Py4JError, error_no_object): +model._java_obj.toString() +with self.assertRaisesRegexp(py4j.protocol.Py4JError, error_no_object): +summary._java_obj.toString() + + class ParamTypeConversionTests(PySparkTestCase): """ Test that param type conversion happens. 
http://git-wip-us.apache.org/repos/asf/spark/blob/7706eea6/python/pyspark/ml/wrapper.py -- diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 0f846fb..5061f64 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -36,6 +36,10 @@ class JavaWrapper(object): super(JavaWrapper, self).__init__() self._java_obj = java_obj +def __del__(self): +if SparkContext._active_spark_context and self._java_obj is not None: +SparkContext._active_spark_context._gateway.detach(self._java_obj) + @classmethod def _create_from_java_class(cls, java_class, *args): """ @@ -100,10 +104,6 @@ class JavaParams(JavaWrapper, Params): __metaclass__ = ABCMeta -def __del__(self): -if SparkContext._active_spark_context: -SparkContext._active_spark_context._gateway.detach(self._java_obj) - def _make_java_param_pair(self, param, value): """ Makes a Java param pair. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23538][CORE] Remove custom configuration for SSL client.
Repository: spark Updated Branches: refs/heads/master f2cab56ca -> 508573958 [SPARK-23538][CORE] Remove custom configuration for SSL client. These options were used to configure the built-in JRE SSL libraries when downloading files from HTTPS servers. But because they were also used to set up the now (long) removed internal HTTPS file server, their default configuration chose convenience over security by having overly lenient settings. This change removes the configuration options that affect the JRE SSL libraries. The JRE trust store can still be configured via system properties (or globally in the JRE security config). The only lost functionality is not being able to disable the default hostname verifier when using spark-submit, which should be fine since Spark itself is not using https for any internal functionality anymore. I also removed the HTTP-related code from the REPL class loader, since we haven't had a HTTP server for REPL-generated classes for a while. Author: Marcelo VanzinCloses #20723 from vanzin/SPARK-23538. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50857395 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50857395 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50857395 Branch: refs/heads/master Commit: 508573958dc9b6402e684cd6dd37202deaaa97f6 Parents: f2cab56 Author: Marcelo Vanzin Authored: Mon Mar 5 15:03:27 2018 -0800 Committer: Marcelo Vanzin Committed: Mon Mar 5 15:03:27 2018 -0800 -- .../org/apache/spark/SecurityManager.scala | 45 - .../scala/org/apache/spark/util/Utils.scala | 15 - .../org/apache/spark/SSLSampleConfigs.scala | 68 .../org/apache/spark/SecurityManagerSuite.scala | 45 - docs/security.md| 4 -- .../apache/spark/repl/ExecutorClassLoader.scala | 53 ++- 6 files changed, 7 insertions(+), 223 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/50857395/core/src/main/scala/org/apache/spark/SecurityManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 2519d26..da1c89c 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -256,51 +256,6 @@ private[spark] class SecurityManager( // the default SSL configuration - it will be used by all communication layers unless overwritten private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None) - // SSL configuration for the file server. This is used by Utils.setupSecureURLConnection(). 
- val fileServerSSLOptions = getSSLOptions("fs") - val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) { -val trustStoreManagers = - for (trustStore <- fileServerSSLOptions.trustStore) yield { -val input = Files.asByteSource(fileServerSSLOptions.trustStore.get).openStream() - -try { - val ks = KeyStore.getInstance(KeyStore.getDefaultType) - ks.load(input, fileServerSSLOptions.trustStorePassword.get.toCharArray) - - val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) - tmf.init(ks) - tmf.getTrustManagers -} finally { - input.close() -} - } - -lazy val credulousTrustStoreManagers = Array({ - logWarning("Using 'accept-all' trust manager for SSL connections.") - new X509TrustManager { -override def getAcceptedIssuers: Array[X509Certificate] = null - -override def checkClientTrusted(x509Certificates: Array[X509Certificate], s: String) {} - -override def checkServerTrusted(x509Certificates: Array[X509Certificate], s: String) {} - }: TrustManager -}) - -require(fileServerSSLOptions.protocol.isDefined, - "spark.ssl.protocol is required when enabling SSL connections.") - -val sslContext = SSLContext.getInstance(fileServerSSLOptions.protocol.get) -sslContext.init(null, trustStoreManagers.getOrElse(credulousTrustStoreManagers), null) - -val hostVerifier = new HostnameVerifier { - override def verify(s: String, sslSession: SSLSession): Boolean = true -} - -(Some(sslContext.getSocketFactory), Some(hostVerifier)) - } else { -(None, None) - } - def getSSLOptions(module: String): SSLOptions = { val opts = SSLOptions.parse(sparkConf, s"spark.ssl.$module", Some(defaultSSLOptions)) logDebug(s"Created SSL options for $module: $opts")
spark git commit: [SPARK-23040][CORE] Returns interruptible iterator for shuffle reader
Repository: spark Updated Branches: refs/heads/master b0f422c38 -> f2cab56ca [SPARK-23040][CORE] Returns interruptible iterator for shuffle reader ## What changes were proposed in this pull request? Before this commit, a non-interruptible iterator is returned if aggregator or ordering is specified. This commit also ensures that sorter is closed even when task is cancelled(killed) in the middle of sorting. ## How was this patch tested? Add a unit test in JobCancellationSuite Author: Xianjin YECloses #20449 from advancedxy/SPARK-23040. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2cab56c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2cab56c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2cab56c Branch: refs/heads/master Commit: f2cab56ca22ed5db5ff604cd78cdb55aaa58f651 Parents: b0f422c Author: Xianjin YE Authored: Mon Mar 5 14:57:32 2018 -0800 Committer: Wenchen Fan Committed: Mon Mar 5 14:57:32 2018 -0800 -- .../spark/shuffle/BlockStoreShuffleReader.scala | 9 ++- .../org/apache/spark/JobCancellationSuite.scala | 65 +++- 2 files changed, 72 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f2cab56c/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala -- diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index edd6971..85e7e56 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -94,7 +94,7 @@ private[spark] class BlockStoreShuffleReader[K, C]( } // Sort the output if there is a sort ordering defined. -dep.keyOrdering match { +val resultIter = dep.keyOrdering match { case Some(keyOrd: Ordering[K]) => // Create an ExternalSorter to sort the data. val sorter = @@ -103,9 +103,16 @@ private[spark] class BlockStoreShuffleReader[K, C]( context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes) +// Use completion callback to stop sorter if task was finished/cancelled. +context.addTaskCompletionListener(_ => { + sorter.stop() +}) CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop()) case None => aggregatedIter } +// Use another interruptible iterator here to support task cancellation as aggregator or(and) +// sorter may have consumed previous interruptible iterator. 
+new InterruptibleIterator[Product2[K, C]](context, resultIter) } } http://git-wip-us.apache.org/repos/asf/spark/blob/f2cab56c/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala index 8a77aea..3b793bb 100644 --- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala +++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark import java.util.concurrent.Semaphore +import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future @@ -26,7 +27,7 @@ import scala.concurrent.duration._ import org.scalatest.BeforeAndAfter import org.scalatest.Matchers -import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart} +import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart} import org.apache.spark.util.ThreadUtils /** @@ -40,6 +41,10 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft override def afterEach() { try { resetSparkContext() + JobCancellationSuite.taskStartedSemaphore.drainPermits() + JobCancellationSuite.taskCancelledSemaphore.drainPermits() + JobCancellationSuite.twoJobsSharingStageSemaphore.drainPermits() + JobCancellationSuite.executionOfInterruptibleCounter.set(0) } finally { super.afterEach() } @@ -320,6 +325,62 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft f2.get() } + test("interruptible iterator of shuffle reader") { +// In this
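The idea behind wrapping the result in an `InterruptibleIterator` can be shown with a small standalone sketch (plain Scala, not the actual Spark class): every `hasNext`/`next` call first checks whether the task has been cancelled, so a long sort or aggregation cannot keep consuming records after a kill.

```scala
// Toy version of an interruptible iterator: `cancelled` stands in for the task context's
// kill flag; the real Spark class consults TaskContext and throws a task-killed exception.
class ToyInterruptibleIterator[T](cancelled: () => Boolean, delegate: Iterator[T])
    extends Iterator[T] {
  private def checkCancelled(): Unit =
    if (cancelled()) throw new InterruptedException("task was cancelled")
  override def hasNext: Boolean = { checkCancelled(); delegate.hasNext }
  override def next(): T = { checkCancelled(); delegate.next() }
}
```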
spark git commit: [SPARK-23434][SQL][BRANCH-2.2] Spark should not warn `metadata directory` for a HDFS file path
Repository: spark Updated Branches: refs/heads/branch-2.2 9bd25c9bf -> 4864d2104 [SPARK-23434][SQL][BRANCH-2.2] Spark should not warn `metadata directory` for a HDFS file path ## What changes were proposed in this pull request? In a kerberized cluster, when Spark reads a file path (e.g. `people.json`), it warns with a wrong warning message during looking up `people.json/_spark_metadata`. The root cause of this situation is the difference between `LocalFileSystem` and `DistributedFileSystem`. `LocalFileSystem.exists()` returns `false`, but `DistributedFileSystem.exists` raises `org.apache.hadoop.security.AccessControlException`. ```scala scala> spark.read.json("file:///usr/hdp/current/spark-client/examples/src/main/resources/people.json").show ++---+ | age| name| ++---+ |null|Michael| | 30| Andy| | 19| Justin| ++---+ scala> spark.read.json("hdfs:///tmp/people.json") 18/02/15 05:00:48 WARN streaming.FileStreamSink: Error while looking for metadata directory. 18/02/15 05:00:48 WARN streaming.FileStreamSink: Error while looking for metadata directory. ``` After this PR, ```scala scala> spark.read.json("hdfs:///tmp/people.json").show ++---+ | age| name| ++---+ |null|Michael| | 30| Andy| | 19| Justin| ++---+ ``` ## How was this patch tested? Manual. Author: Dongjoon HyunCloses #20715 from dongjoon-hyun/SPARK-23434-2.2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4864d210 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4864d210 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4864d210 Branch: refs/heads/branch-2.2 Commit: 4864d21041374f2aca0846c7f22e8d0c93024b7a Parents: 9bd25c9 Author: Dongjoon Hyun Authored: Mon Mar 5 14:29:04 2018 -0800 Committer: Wenchen Fan Committed: Mon Mar 5 14:29:04 2018 -0800 -- .../spark/sql/execution/streaming/FileStreamSink.scala | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4864d210/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 6885d0b..397be27 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -42,9 +42,11 @@ object FileStreamSink extends Logging { try { val hdfsPath = new Path(singlePath) val fs = hdfsPath.getFileSystem(hadoopConf) - val metadataPath = new Path(hdfsPath, metadataDir) - val res = fs.exists(metadataPath) - res + if (fs.isDirectory(hdfsPath)) { +fs.exists(new Path(hdfsPath, metadataDir)) + } else { +false + } } catch { case NonFatal(e) => logWarning(s"Error while looking for metadata directory.") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
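The shape of the guarded lookup, as a small sketch against the Hadoop `FileSystem` API (the helper name below is illustrative, not the actual method in `FileStreamSink`):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch of the guarded check: only look for "<path>/_spark_metadata" when the input
// path is itself a directory, so reading a plain file on a kerberized HDFS no longer
// probes below a file and logs the misleading "Error while looking for metadata
// directory" warning.
def looksLikeMetadataSink(pathStr: String, metadataDir: String, conf: Configuration): Boolean = {
  val path = new Path(pathStr)
  val fs: FileSystem = path.getFileSystem(conf)
  fs.isDirectory(path) && fs.exists(new Path(path, metadataDir))
}
```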
spark git commit: [SPARK-23434][SQL][BRANCH-2.3] Spark should not warn `metadata directory` for a HDFS file path
Repository: spark Updated Branches: refs/heads/branch-2.3 911b83da4 -> b9ea2e87b [SPARK-23434][SQL][BRANCH-2.3] Spark should not warn `metadata directory` for a HDFS file path ## What changes were proposed in this pull request? In a kerberized cluster, when Spark reads a file path (e.g. `people.json`), it warns with a wrong warning message during looking up `people.json/_spark_metadata`. The root cause of this situation is the difference between `LocalFileSystem` and `DistributedFileSystem`. `LocalFileSystem.exists()` returns `false`, but `DistributedFileSystem.exists` raises `org.apache.hadoop.security.AccessControlException`. ```scala scala> spark.read.json("file:///usr/hdp/current/spark-client/examples/src/main/resources/people.json").show ++---+ | age| name| ++---+ |null|Michael| | 30| Andy| | 19| Justin| ++---+ scala> spark.read.json("hdfs:///tmp/people.json") 18/02/15 05:00:48 WARN streaming.FileStreamSink: Error while looking for metadata directory. 18/02/15 05:00:48 WARN streaming.FileStreamSink: Error while looking for metadata directory. ``` After this PR, ```scala scala> spark.read.json("hdfs:///tmp/people.json").show ++---+ | age| name| ++---+ |null|Michael| | 30| Andy| | 19| Justin| ++---+ ``` ## How was this patch tested? Manual. Author: Dongjoon HyunCloses #20713 from dongjoon-hyun/SPARK-23434-2.3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b9ea2e87 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b9ea2e87 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b9ea2e87 Branch: refs/heads/branch-2.3 Commit: b9ea2e87bb24c3731bd2dbd044d10d18dbbf9c6f Parents: 911b83d Author: Dongjoon Hyun Authored: Mon Mar 5 14:20:10 2018 -0800 Committer: Wenchen Fan Committed: Mon Mar 5 14:20:10 2018 -0800 -- .../spark/sql/execution/streaming/FileStreamSink.scala | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b9ea2e87/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 2715fa9..87a17ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -42,9 +42,11 @@ object FileStreamSink extends Logging { try { val hdfsPath = new Path(singlePath) val fs = hdfsPath.getFileSystem(hadoopConf) - val metadataPath = new Path(hdfsPath, metadataDir) - val res = fs.exists(metadataPath) - res + if (fs.isDirectory(hdfsPath)) { +fs.exists(new Path(hdfsPath, metadataDir)) + } else { +false + } } catch { case NonFatal(e) => logWarning(s"Error while looking for metadata directory.") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r25458 - in /dev/spark/2.3.1-SNAPSHOT-2018_03_05_14_01-911b83d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Mar 5 22:15:15 2018 New Revision: 25458 Log: Apache Spark 2.3.1-SNAPSHOT-2018_03_05_14_01-911b83d docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.
Repository: spark Updated Branches: refs/heads/master ba622f45c -> b0f422c38 [SPARK-23559][SS] Add epoch ID to DataWriterFactory. ## What changes were proposed in this pull request? Add an epoch ID argument to DataWriterFactory for use in streaming. As a side effect of passing in this value, DataWriter will now have a consistent lifecycle; commit() or abort() ends the lifecycle of a DataWriter instance in any execution mode. I considered making a separate streaming interface and adding the epoch ID only to that one, but I think it requires a lot of extra work for no real gain. I think it makes sense to define epoch 0 as the one and only epoch of a non-streaming query. ## How was this patch tested? existing unit tests Author: Jose TorresCloses #20710 from jose-torres/api2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0f422c3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0f422c3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0f422c3 Branch: refs/heads/master Commit: b0f422c3861a5a3831e481b8ffac08f6fa085d00 Parents: ba622f4 Author: Jose Torres Authored: Mon Mar 5 13:23:01 2018 -0800 Committer: Tathagata Das Committed: Mon Mar 5 13:23:01 2018 -0800 -- .../spark/sql/kafka010/KafkaStreamWriter.scala | 5 +++- .../spark/sql/sources/v2/writer/DataWriter.java | 12 ++ .../sources/v2/writer/DataWriterFactory.java| 5 +++- .../v2/writer/streaming/StreamWriter.java | 19 +++ .../datasources/v2/WriteToDataSourceV2.scala| 25 +--- .../streaming/MicroBatchExecution.scala | 7 ++ .../sources/PackedRowWriterFactory.scala| 5 +++- .../execution/streaming/sources/memoryV2.scala | 5 +++- .../sources/v2/SimpleWritableDataSource.scala | 10 ++-- 9 files changed, 65 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala index 9307bfc..ae5b5c5 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala @@ -65,7 +65,10 @@ case class KafkaStreamWriterFactory( topic: Option[String], producerParams: Map[String, String], schema: StructType) extends DataWriterFactory[InternalRow] { - override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = { + override def createDataWriter( + partitionId: Int, + attemptNumber: Int, + epochId: Long): DataWriter[InternalRow] = { new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes) } } http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java index 53941a8..39bf458 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java @@ -22,7 +22,7 @@ import java.io.IOException; import org.apache.spark.annotation.InterfaceStability; /** - * A data writer returned by {@link DataWriterFactory#createDataWriter(int, int)} and is + 
* A data writer returned by {@link DataWriterFactory#createDataWriter(int, int, long)} and is * responsible for writing data for an input RDD partition. * * One Spark task has one exclusive data writer, so there is no thread-safe concern. @@ -31,13 +31,17 @@ import org.apache.spark.annotation.InterfaceStability; * the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will * not be processed. If all records are successfully written, {@link #commit()} is called. * + * Once a data writer returns successfully from {@link #commit()} or {@link #abort()}, its lifecycle + * is over and Spark will not use it again. + * * If this data writer succeeds(all records are successfully written and {@link #commit()} * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit
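A minimal sketch of a factory against the updated interface (assuming only the three-argument `createDataWriter` shown in this diff; the factory and commit-message classes here are made up for illustration and are not part of Spark):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

// A writer that merely tags its commit message with the epoch it belongs to.
// Per the PR description, a batch query is treated as having the single epoch 0.
case class EpochTag(partitionId: Int, epochId: Long) extends WriterCommitMessage

case class NoOpWriterFactory() extends DataWriterFactory[InternalRow] {
  override def createDataWriter(
      partitionId: Int,
      attemptNumber: Int,
      epochId: Long): DataWriter[InternalRow] = new DataWriter[InternalRow] {
    override def write(record: InternalRow): Unit = ()           // discard the record
    override def commit(): WriterCommitMessage = EpochTag(partitionId, epochId)
    override def abort(): Unit = ()                              // nothing to clean up
  }
}
```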
svn commit: r25453 - in /dev/spark/2.4.0-SNAPSHOT-2018_03_05_12_01-ba622f4-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Mar 5 20:20:52 2018 New Revision: 25453 Log: Apache Spark 2.4.0-SNAPSHOT-2018_03_05_12_01-ba622f4 docs [This commit notification would consist of 1444 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23457][SQL][BRANCH-2.3] Register task completion listeners first in ParquetFileFormat
Repository: spark Updated Branches: refs/heads/branch-2.3 4550673b1 -> 911b83da4

[SPARK-23457][SQL][BRANCH-2.3] Register task completion listeners first in ParquetFileFormat

## What changes were proposed in this pull request?

ParquetFileFormat leaks opened files in some cases. This PR prevents that by registering task completion listeners first, before initialization.

- [spark-branch-2.3-test-sbt-hadoop-2.7](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.7/205/testReport/org.apache.spark.sql/FileBasedDataSourceSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)
- [spark-master-test-sbt-hadoop-2.6](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.6/4228/testReport/junit/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/)

```
Caused by: sbt.ForkMain$ForkError: java.lang.Throwable: null
  at org.apache.spark.DebugFilesystem$.addOpenStream(DebugFilesystem.scala:36)
  at org.apache.spark.DebugFilesystem.open(DebugFilesystem.scala:70)
  at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
  at org.apache.parquet.hadoop.ParquetFileReader.(ParquetFileReader.java:538)
  at org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:149)
  at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:133)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$buildReaderWithPartitionValues$1.apply(ParquetFileFormat.scala:400)
  at
```

## How was this patch tested?

Manual. The following test case generates the same leakage.

```scala
test("SPARK-23457 Register task completion listeners first in ParquetFileFormat") {
  withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
    withTempDir { dir =>
      val basePath = dir.getCanonicalPath
      Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
      Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
      val df = spark.read.parquet(
        new Path(basePath, "first").toString,
        new Path(basePath, "second").toString)
      val e = intercept[SparkException] {
        df.collect()
      }
      assert(e.getCause.isInstanceOf[OutOfMemoryError])
    }
  }
}
```

Author: Dongjoon Hyun

Closes #20714 from dongjoon-hyun/SPARK-23457-2.3.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/911b83da Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/911b83da Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/911b83da Branch: refs/heads/branch-2.3 Commit: 911b83da42fa850eb3ae419687c204cb2e25767b Parents: 4550673 Author: Dongjoon Hyun Authored: Mon Mar 5 11:53:23 2018 -0800 Committer: Wenchen Fan Committed: Mon Mar 5 11:53:23 2018 -0800 -- .../datasources/parquet/ParquetFileFormat.scala | 22 +--- 1 file changed, 10 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/911b83da/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index f53a97b..a6129da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -394,16 +394,21 @@ class ParquetFileFormat ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) } val taskContext = Option(TaskContext.get()) - val parquetReader = if (enableVectorizedReader) { + if (enableVectorizedReader) { val vectorizedReader = new VectorizedParquetRecordReader( convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined) +val iter = new RecordReaderIterator(vectorizedReader) +// SPARK-23457 Register a task completion lister before `initialization`. +taskContext.foreach(_.addTaskCompletionListener(_ => iter.close())) vectorizedReader.initialize(split, hadoopAttemptContext) logDebug(s"Appending $partitionSchema
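The ordering principle behind the fix can be isolated in a few lines (a toy stand-in for `TaskContext`, not Spark code): because the completion callback is registered before the step that can throw, the underlying reader is still closed when initialization fails, so the file handle is not leaked.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy task context: callbacks registered here run when the task finishes, even on failure.
class ToyTaskContext {
  private val callbacks = ArrayBuffer.empty[() => Unit]
  def addCompletionCallback(f: () => Unit): Unit = callbacks += f
  def markCompleted(): Unit = callbacks.foreach(_.apply())
}

var readerClosed = false
val ctx = new ToyTaskContext
ctx.addCompletionCallback(() => readerClosed = true)    // register the cleanup first
try {
  throw new OutOfMemoryError("initialization failed")   // then attempt initialization
} catch {
  case _: Throwable => ctx.markCompleted()               // task tear-down still runs
}
assert(readerClosed)                                      // the reader is not leaked
```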
spark git commit: [SPARK-23585][SQL] Add interpreted execution to UnwrapOption
Repository: spark Updated Branches: refs/heads/master 98a5c0a35 -> ba622f45c [SPARK-23585][SQL] Add interpreted execution to UnwrapOption ## What changes were proposed in this pull request? The PR adds interpreted execution to UnwrapOption. ## How was this patch tested? added UT Author: Marco GaidoCloses #20736 from mgaido91/SPARK-23586. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba622f45 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba622f45 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba622f45 Branch: refs/heads/master Commit: ba622f45caa808a9320c1f7ba4a4f344365dcf90 Parents: 98a5c0a Author: Marco Gaido Authored: Mon Mar 5 20:43:03 2018 +0100 Committer: Herman van Hovell Committed: Mon Mar 5 20:43:03 2018 +0100 -- .../spark/sql/catalyst/expressions/objects/objects.scala | 10 -- .../catalyst/expressions/ObjectExpressionsSuite.scala| 11 ++- 2 files changed, 18 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ba622f45/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 80618af..03cc8ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -382,8 +382,14 @@ case class UnwrapOption( override def inputTypes: Seq[AbstractDataType] = ObjectType :: Nil - override def eval(input: InternalRow): Any = -throw new UnsupportedOperationException("Only code-generated evaluation is supported") + override def eval(input: InternalRow): Any = { +val inputObject = child.eval(input) +if (inputObject == null) { + null +} else { + inputObject.asInstanceOf[Option[_]].orNull +} + } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val javaType = CodeGenerator.javaType(dataType) http://git-wip-us.apache.org/repos/asf/spark/blob/ba622f45/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index 3edcc02..d95db58 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.objects.Invoke +import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, UnwrapOption} import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} import org.apache.spark.sql.types.{IntegerType, ObjectType} @@ -66,4 +66,13 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvalutionWithUnsafeProjection( mapEncoder.serializer.head, mapExpected, mapInputRow) } + + test("SPARK-23585: UnwrapOption should support interpreted execution") { +val cls = classOf[Option[Int]] 
+val inputObject = BoundReference(0, ObjectType(cls), nullable = true) +val unwrapObject = UnwrapOption(IntegerType, inputObject) +Seq((Some(1), 1), (None, null), (null, null)).foreach { case (input, expected) => + checkEvaluation(unwrapObject, expected, InternalRow.fromSeq(Seq(input))) +} + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
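A plain-Scala restatement of the interpreted path added above (null stays null, otherwise the Option is unwrapped with `orNull`), covering the three cases exercised by the new test:

```scala
// Mirrors the new UnwrapOption.eval added in this commit.
def unwrapOption(inputObject: Any): Any =
  if (inputObject == null) null else inputObject.asInstanceOf[Option[_]].orNull

assert(unwrapOption(Some(1)) == 1)
assert(unwrapOption(None) == null)
assert(unwrapOption(null) == null)
```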
spark git commit: [SPARK-22882][ML][TESTS] ML test for structured streaming: ml.classification
Repository: spark Updated Branches: refs/heads/master 4586eada4 -> 98a5c0a35 [SPARK-22882][ML][TESTS] ML test for structured streaming: ml.classification ## What changes were proposed in this pull request? adding Structured Streaming tests for all Models/Transformers in spark.ml.classification ## How was this patch tested? N/A Author: WeichenXuCloses #20121 from WeichenXu123/ml_stream_test_classification. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98a5c0a3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98a5c0a3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98a5c0a3 Branch: refs/heads/master Commit: 98a5c0a35f0a24730f5074522939acf57ef95422 Parents: 4586ead Author: WeichenXu Authored: Mon Mar 5 10:50:00 2018 -0800 Committer: Joseph K. Bradley Committed: Mon Mar 5 10:50:00 2018 -0800 -- .../DecisionTreeClassifierSuite.scala | 29 ++- .../ml/classification/GBTClassifierSuite.scala | 77 ++- .../ml/classification/LinearSVCSuite.scala | 15 +- .../LogisticRegressionSuite.scala | 229 +++ .../MultilayerPerceptronClassifierSuite.scala | 44 ++-- .../ml/classification/NaiveBayesSuite.scala | 47 ++-- .../ml/classification/OneVsRestSuite.scala | 21 +- .../ProbabilisticClassifierSuite.scala | 29 +-- .../RandomForestClassifierSuite.scala | 16 +- 9 files changed, 202 insertions(+), 305 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/98a5c0a3/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala index 38b265d..eeb0324 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala @@ -23,15 +23,14 @@ import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.tree.{CategoricalSplit, InternalNode, LeafNode} import org.apache.spark.ml.tree.impl.TreeTests -import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils} import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint} -import org.apache.spark.mllib.tree.{DecisionTree => OldDecisionTree, DecisionTreeSuite => OldDecisionTreeSuite} -import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.tree.{DecisionTree => OldDecisionTree, + DecisionTreeSuite => OldDecisionTreeSuite} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row} -class DecisionTreeClassifierSuite - extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { +class DecisionTreeClassifierSuite extends MLTest with DefaultReadWriteTest { import DecisionTreeClassifierSuite.compareAPIs import testImplicits._ @@ -251,20 +250,18 @@ class DecisionTreeClassifierSuite MLTestingUtils.checkCopyAndUids(dt, newTree) -val predictions = newTree.transform(newData) - .select(newTree.getPredictionCol, newTree.getRawPredictionCol, newTree.getProbabilityCol) - .collect() - -predictions.foreach { case Row(pred: Double, rawPred: Vector, probPred: Vector) => - assert(pred === rawPred.argmax, -s"Expected prediction $pred but calculated ${rawPred.argmax} from rawPrediction.") - val sum = rawPred.toArray.sum - 
assert(Vectors.dense(rawPred.toArray.map(_ / sum)) === probPred, -"probability prediction mismatch") +testTransformer[(Vector, Double)](newData, newTree, + "prediction", "rawPrediction", "probability") { + case Row(pred: Double, rawPred: Vector, probPred: Vector) => +assert(pred === rawPred.argmax, + s"Expected prediction $pred but calculated ${rawPred.argmax} from rawPrediction.") +val sum = rawPred.toArray.sum +assert(Vectors.dense(rawPred.toArray.map(_ / sum)) === probPred, + "probability prediction mismatch") } ProbabilisticClassifierSuite.testPredictMethods[ - Vector, DecisionTreeClassificationModel](newTree, newData) + Vector, DecisionTreeClassificationModel](this, newTree, newData) } test("training with 1-category categorical feature") {
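The core of this change is routing every model check through `MLTest.testTransformer`, which exercises the transformer on a streaming DataFrame as well as a batch one. A standalone sketch of the streaming half of such a check, under the assumption that a `MemoryStream` plus the in-memory sink is one workable way to do it (this is not the actual `testTransformer` implementation):

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.sql.{Encoder, Row, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

// Rough outline: feed rows through a MemoryStream, transform the unbounded DataFrame,
// sink it to an in-memory table, and run the per-row assertions on the collected output.
def checkOnStream[A: Encoder](
    spark: SparkSession,
    data: Seq[A],
    colNames: Seq[String],
    transformer: Transformer)(check: Row => Unit): Unit = {
  implicit val sqlContext = spark.sqlContext
  val input = MemoryStream[A]
  val streamed = transformer.transform(input.toDS().toDF(colNames: _*))
  val query = streamed.writeStream.format("memory").queryName("out").start()
  try {
    input.addData(data: _*)
    query.processAllAvailable()
    spark.table("out").collect().foreach(check)
  } finally {
    query.stop()
  }
}
```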
spark git commit: [SPARK-22882][ML][TESTS] ML test for structured streaming: ml.classification
Repository: spark Updated Branches: refs/heads/branch-2.3 232b9f81f -> 4550673b1 [SPARK-22882][ML][TESTS] ML test for structured streaming: ml.classification ## What changes were proposed in this pull request? adding Structured Streaming tests for all Models/Transformers in spark.ml.classification ## How was this patch tested? N/A Author: WeichenXuCloses #20121 from WeichenXu123/ml_stream_test_classification. (cherry picked from commit 98a5c0a35f0a24730f5074522939acf57ef95422) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4550673b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4550673b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4550673b Branch: refs/heads/branch-2.3 Commit: 4550673b1a94e9023a0c6fdc6a92e4b860e1cfb2 Parents: 232b9f8 Author: WeichenXu Authored: Mon Mar 5 10:50:00 2018 -0800 Committer: Joseph K. Bradley Committed: Mon Mar 5 10:50:12 2018 -0800 -- .../DecisionTreeClassifierSuite.scala | 29 ++- .../ml/classification/GBTClassifierSuite.scala | 77 ++- .../ml/classification/LinearSVCSuite.scala | 15 +- .../LogisticRegressionSuite.scala | 229 +++ .../MultilayerPerceptronClassifierSuite.scala | 44 ++-- .../ml/classification/NaiveBayesSuite.scala | 47 ++-- .../ml/classification/OneVsRestSuite.scala | 21 +- .../ProbabilisticClassifierSuite.scala | 29 +-- .../RandomForestClassifierSuite.scala | 16 +- 9 files changed, 202 insertions(+), 305 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4550673b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala index 98c879e..1968041 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala @@ -23,15 +23,14 @@ import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.tree.{CategoricalSplit, InternalNode, LeafNode} import org.apache.spark.ml.tree.impl.TreeTests -import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils} import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint} -import org.apache.spark.mllib.tree.{DecisionTree => OldDecisionTree, DecisionTreeSuite => OldDecisionTreeSuite} -import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.tree.{DecisionTree => OldDecisionTree, + DecisionTreeSuite => OldDecisionTreeSuite} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row} -class DecisionTreeClassifierSuite - extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { +class DecisionTreeClassifierSuite extends MLTest with DefaultReadWriteTest { import DecisionTreeClassifierSuite.compareAPIs import testImplicits._ @@ -251,20 +250,18 @@ class DecisionTreeClassifierSuite MLTestingUtils.checkCopyAndUids(dt, newTree) -val predictions = newTree.transform(newData) - .select(newTree.getPredictionCol, newTree.getRawPredictionCol, newTree.getProbabilityCol) - .collect() - -predictions.foreach { case Row(pred: Double, rawPred: Vector, probPred: Vector) => - assert(pred === rawPred.argmax, -s"Expected 
prediction $pred but calculated ${rawPred.argmax} from rawPrediction.") - val sum = rawPred.toArray.sum - assert(Vectors.dense(rawPred.toArray.map(_ / sum)) === probPred, -"probability prediction mismatch") +testTransformer[(Vector, Double)](newData, newTree, + "prediction", "rawPrediction", "probability") { + case Row(pred: Double, rawPred: Vector, probPred: Vector) => +assert(pred === rawPred.argmax, + s"Expected prediction $pred but calculated ${rawPred.argmax} from rawPrediction.") +val sum = rawPred.toArray.sum +assert(Vectors.dense(rawPred.toArray.map(_ / sum)) === probPred, + "probability prediction mismatch") } ProbabilisticClassifierSuite.testPredictMethods[ - Vector, DecisionTreeClassificationModel](newTree, newData) + Vector, DecisionTreeClassificationModel](this, newTree, newData) } test("training with 1-category categorical
svn commit: r25445 - in /dev/spark/2.3.1-SNAPSHOT-2018_03_05_10_01-232b9f8-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Mar 5 18:16:47 2018 New Revision: 25445 Log: Apache Spark 2.3.1-SNAPSHOT-2018_03_05_10_01-232b9f8 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[2/3] spark git commit: [SPARK-22430][R][DOCS] Unknown tag warnings when building R docs with Roxygen 6.0.1
http://git-wip-us.apache.org/repos/asf/spark/blob/4586eada/R/pkg/R/generics.R -- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index e0dde33..6fba4b6 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -19,7 +19,6 @@ # @rdname aggregateRDD # @seealso reduce -# @export setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") }) @@ -27,21 +26,17 @@ setGeneric("cacheRDD", function(x) { standardGeneric("cacheRDD") }) # @rdname coalesce # @seealso repartition -# @export setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coalesceRDD") }) # @rdname checkpoint-methods -# @export setGeneric("checkpointRDD", function(x) { standardGeneric("checkpointRDD") }) setGeneric("collectRDD", function(x, ...) { standardGeneric("collectRDD") }) # @rdname collect-methods -# @export setGeneric("collectAsMap", function(x) { standardGeneric("collectAsMap") }) # @rdname collect-methods -# @export setGeneric("collectPartition", function(x, partitionId) { standardGeneric("collectPartition") @@ -52,19 +47,15 @@ setGeneric("countRDD", function(x) { standardGeneric("countRDD") }) setGeneric("lengthRDD", function(x) { standardGeneric("lengthRDD") }) # @rdname countByValue -# @export setGeneric("countByValue", function(x) { standardGeneric("countByValue") }) # @rdname crosstab -# @export setGeneric("crosstab", function(x, col1, col2) { standardGeneric("crosstab") }) # @rdname freqItems -# @export setGeneric("freqItems", function(x, cols, support = 0.01) { standardGeneric("freqItems") }) # @rdname approxQuantile -# @export setGeneric("approxQuantile", function(x, cols, probabilities, relativeError) { standardGeneric("approxQuantile") @@ -73,18 +64,15 @@ setGeneric("approxQuantile", setGeneric("distinctRDD", function(x, numPartitions = 1) { standardGeneric("distinctRDD") }) # @rdname filterRDD -# @export setGeneric("filterRDD", function(x, f) { standardGeneric("filterRDD") }) setGeneric("firstRDD", function(x, ...) { standardGeneric("firstRDD") }) # @rdname flatMap -# @export setGeneric("flatMap", function(X, FUN) { standardGeneric("flatMap") }) # @rdname fold # @seealso reduce -# @export setGeneric("fold", function(x, zeroValue, op) { standardGeneric("fold") }) setGeneric("foreach", function(x, func) { standardGeneric("foreach") }) @@ -95,17 +83,14 @@ setGeneric("foreachPartition", function(x, func) { standardGeneric("foreachParti setGeneric("getJRDD", function(rdd, ...) { standardGeneric("getJRDD") }) # @rdname glom -# @export setGeneric("glom", function(x) { standardGeneric("glom") }) # @rdname histogram -# @export setGeneric("histogram", function(df, col, nbins=10) { standardGeneric("histogram") }) setGeneric("joinRDD", function(x, y, ...) 
{ standardGeneric("joinRDD") }) # @rdname keyBy -# @export setGeneric("keyBy", function(x, func) { standardGeneric("keyBy") }) setGeneric("lapplyPartition", function(X, FUN) { standardGeneric("lapplyPartition") }) @@ -123,47 +108,37 @@ setGeneric("mapPartitionsWithIndex", function(X, FUN) { standardGeneric("mapPartitionsWithIndex") }) # @rdname maximum -# @export setGeneric("maximum", function(x) { standardGeneric("maximum") }) # @rdname minimum -# @export setGeneric("minimum", function(x) { standardGeneric("minimum") }) # @rdname sumRDD -# @export setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") }) # @rdname name -# @export setGeneric("name", function(x) { standardGeneric("name") }) # @rdname getNumPartitionsRDD -# @export setGeneric("getNumPartitionsRDD", function(x) { standardGeneric("getNumPartitionsRDD") }) # @rdname getNumPartitions -# @export setGeneric("numPartitions", function(x) { standardGeneric("numPartitions") }) setGeneric("persistRDD", function(x, newLevel) { standardGeneric("persistRDD") }) # @rdname pipeRDD -# @export setGeneric("pipeRDD", function(x, command, env = list()) { standardGeneric("pipeRDD")}) # @rdname pivot -# @export setGeneric("pivot", function(x, colname, values = list()) { standardGeneric("pivot") }) # @rdname reduce -# @export setGeneric("reduce", function(x, func) { standardGeneric("reduce") }) setGeneric("repartitionRDD", function(x, ...) { standardGeneric("repartitionRDD") }) # @rdname sampleRDD -# @export setGeneric("sampleRDD", function(x, withReplacement, fraction, seed) { standardGeneric("sampleRDD") @@ -171,21 +146,17 @@ setGeneric("sampleRDD", # @rdname saveAsObjectFile # @seealso objectFile -# @export setGeneric("saveAsObjectFile", function(x, path) { standardGeneric("saveAsObjectFile") }) # @rdname saveAsTextFile -# @export setGeneric("saveAsTextFile", function(x, path) { standardGeneric("saveAsTextFile") }) # @rdname setName -# @export
[1/3] spark git commit: [SPARK-22430][R][DOCS] Unknown tag warnings when building R docs with Roxygen 6.0.1
Repository: spark Updated Branches: refs/heads/master 947b4e6f0 -> 4586eada4 http://git-wip-us.apache.org/repos/asf/spark/blob/4586eada/R/pkg/R/mllib_tree.R -- diff --git a/R/pkg/R/mllib_tree.R b/R/pkg/R/mllib_tree.R index 4e5ddf2..6769be0 100644 --- a/R/pkg/R/mllib_tree.R +++ b/R/pkg/R/mllib_tree.R @@ -20,42 +20,36 @@ #' S4 class that represents a GBTRegressionModel #' #' @param jobj a Java object reference to the backing Scala GBTRegressionModel -#' @export #' @note GBTRegressionModel since 2.1.0 setClass("GBTRegressionModel", representation(jobj = "jobj")) #' S4 class that represents a GBTClassificationModel #' #' @param jobj a Java object reference to the backing Scala GBTClassificationModel -#' @export #' @note GBTClassificationModel since 2.1.0 setClass("GBTClassificationModel", representation(jobj = "jobj")) #' S4 class that represents a RandomForestRegressionModel #' #' @param jobj a Java object reference to the backing Scala RandomForestRegressionModel -#' @export #' @note RandomForestRegressionModel since 2.1.0 setClass("RandomForestRegressionModel", representation(jobj = "jobj")) #' S4 class that represents a RandomForestClassificationModel #' #' @param jobj a Java object reference to the backing Scala RandomForestClassificationModel -#' @export #' @note RandomForestClassificationModel since 2.1.0 setClass("RandomForestClassificationModel", representation(jobj = "jobj")) #' S4 class that represents a DecisionTreeRegressionModel #' #' @param jobj a Java object reference to the backing Scala DecisionTreeRegressionModel -#' @export #' @note DecisionTreeRegressionModel since 2.3.0 setClass("DecisionTreeRegressionModel", representation(jobj = "jobj")) #' S4 class that represents a DecisionTreeClassificationModel #' #' @param jobj a Java object reference to the backing Scala DecisionTreeClassificationModel -#' @export #' @note DecisionTreeClassificationModel since 2.3.0 setClass("DecisionTreeClassificationModel", representation(jobj = "jobj")) @@ -179,7 +173,6 @@ print.summary.decisionTree <- function(x) { #' @return \code{spark.gbt} returns a fitted Gradient Boosted Tree model. #' @rdname spark.gbt #' @name spark.gbt -#' @export #' @examples #' \dontrun{ #' # fit a Gradient Boosted Tree Regression Model @@ -261,7 +254,6 @@ setMethod("spark.gbt", signature(data = "SparkDataFrame", formula = "formula"), #' \code{numTrees} (number of trees), and \code{treeWeights} (tree weights). #' @rdname spark.gbt #' @aliases summary,GBTRegressionModel-method -#' @export #' @note summary(GBTRegressionModel) since 2.1.0 setMethod("summary", signature(object = "GBTRegressionModel"), function(object) { @@ -275,7 +267,6 @@ setMethod("summary", signature(object = "GBTRegressionModel"), #' @param x summary object of Gradient Boosted Tree regression model or classification model #' returned by \code{summary}. #' @rdname spark.gbt -#' @export #' @note print.summary.GBTRegressionModel since 2.1.0 print.summary.GBTRegressionModel <- function(x, ...) { print.summary.treeEnsemble(x) @@ -285,7 +276,6 @@ print.summary.GBTRegressionModel <- function(x, ...) 
{ #' @rdname spark.gbt #' @aliases summary,GBTClassificationModel-method -#' @export #' @note summary(GBTClassificationModel) since 2.1.0 setMethod("summary", signature(object = "GBTClassificationModel"), function(object) { @@ -297,7 +287,6 @@ setMethod("summary", signature(object = "GBTClassificationModel"), # Prints the summary of Gradient Boosted Tree Classification Model #' @rdname spark.gbt -#' @export #' @note print.summary.GBTClassificationModel since 2.1.0 print.summary.GBTClassificationModel <- function(x, ...) { print.summary.treeEnsemble(x) @@ -310,7 +299,6 @@ print.summary.GBTClassificationModel <- function(x, ...) { #' "prediction". #' @rdname spark.gbt #' @aliases predict,GBTRegressionModel-method -#' @export #' @note predict(GBTRegressionModel) since 2.1.0 setMethod("predict", signature(object = "GBTRegressionModel"), function(object, newData) { @@ -319,7 +307,6 @@ setMethod("predict", signature(object = "GBTRegressionModel"), #' @rdname spark.gbt #' @aliases predict,GBTClassificationModel-method -#' @export #' @note predict(GBTClassificationModel) since 2.1.0 setMethod("predict", signature(object = "GBTClassificationModel"), function(object, newData) { @@ -334,7 +321,6 @@ setMethod("predict", signature(object = "GBTClassificationModel"), #' which means throw exception if the output path exists. #' @aliases write.ml,GBTRegressionModel,character-method #' @rdname spark.gbt -#' @export #' @note write.ml(GBTRegressionModel, character) since 2.1.0 setMethod("write.ml", signature(object = "GBTRegressionModel", path = "character"), function(object, path, overwrite = FALSE) { @@ -343,7 +329,6 @@
[3/3] spark git commit: [SPARK-22430][R][DOCS] Unknown tag warnings when building R docs with Roxygen 6.0.1
[SPARK-22430][R][DOCS] Unknown tag warnings when building R docs with Roxygen 6.0.1 ## What changes were proposed in this pull request? Removed export tag to get rid of unknown tag warnings ## How was this patch tested? Existing tests Author: Rekha JoshiAuthor: rjoshi2 Closes #20501 from rekhajoshm/SPARK-22430. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4586eada Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4586eada Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4586eada Branch: refs/heads/master Commit: 4586eada42d6a16bb78d1650d145531c51fa747f Parents: 947b4e6 Author: Rekha Joshi Authored: Mon Mar 5 09:30:49 2018 -0800 Committer: Felix Cheung Committed: Mon Mar 5 09:30:49 2018 -0800 -- R/pkg/R/DataFrame.R| 92 -- R/pkg/R/SQLContext.R | 16 -- R/pkg/R/WindowSpec.R | 8 - R/pkg/R/broadcast.R| 3 - R/pkg/R/catalog.R | 18 -- R/pkg/R/column.R | 7 - R/pkg/R/context.R | 6 - R/pkg/R/functions.R| 181 --- R/pkg/R/generics.R | 343 R/pkg/R/group.R| 7 - R/pkg/R/install.R | 1 - R/pkg/R/jvm.R | 3 - R/pkg/R/mllib_classification.R | 20 --- R/pkg/R/mllib_clustering.R | 23 --- R/pkg/R/mllib_fpm.R| 6 - R/pkg/R/mllib_recommendation.R | 5 - R/pkg/R/mllib_regression.R | 17 -- R/pkg/R/mllib_stat.R | 4 - R/pkg/R/mllib_tree.R | 33 R/pkg/R/mllib_utils.R | 3 - R/pkg/R/schema.R | 7 - R/pkg/R/sparkR.R | 7 - R/pkg/R/stats.R| 6 - R/pkg/R/streaming.R| 9 - R/pkg/R/utils.R| 1 - R/pkg/R/window.R | 4 - 26 files changed, 830 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4586eada/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 41c3c3a..c485202 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -36,7 +36,6 @@ setOldClass("structType") #' @slot sdf A Java object reference to the backing Scala DataFrame #' @seealso \link{createDataFrame}, \link{read.json}, \link{table} #' @seealso \url{https://spark.apache.org/docs/latest/sparkr.html#sparkr-dataframes} -#' @export #' @examples #'\dontrun{ #' sparkR.session() @@ -77,7 +76,6 @@ setWriteMode <- function(write, mode) { write } -#' @export #' @param sdf A Java object reference to the backing Scala DataFrame #' @param isCached TRUE if the SparkDataFrame is cached #' @noRd @@ -97,7 +95,6 @@ dataFrame <- function(sdf, isCached = FALSE) { #' @rdname printSchema #' @name printSchema #' @aliases printSchema,SparkDataFrame-method -#' @export #' @examples #'\dontrun{ #' sparkR.session() @@ -123,7 +120,6 @@ setMethod("printSchema", #' @rdname schema #' @name schema #' @aliases schema,SparkDataFrame-method -#' @export #' @examples #'\dontrun{ #' sparkR.session() @@ -146,7 +142,6 @@ setMethod("schema", #' @aliases explain,SparkDataFrame-method #' @rdname explain #' @name explain -#' @export #' @examples #'\dontrun{ #' sparkR.session() @@ -178,7 +173,6 @@ setMethod("explain", #' @rdname isLocal #' @name isLocal #' @aliases isLocal,SparkDataFrame-method -#' @export #' @examples #'\dontrun{ #' sparkR.session() @@ -209,7 +203,6 @@ setMethod("isLocal", #' @aliases showDF,SparkDataFrame-method #' @rdname showDF #' @name showDF -#' @export #' @examples #'\dontrun{ #' sparkR.session() @@ -241,7 +234,6 @@ setMethod("showDF", #' @rdname show #' @aliases show,SparkDataFrame-method #' @name show -#' @export #' @examples #'\dontrun{ #' sparkR.session() @@ -269,7 +261,6 @@ setMethod("show", "SparkDataFrame", #' @rdname dtypes #' @name dtypes #' @aliases dtypes,SparkDataFrame-method -#' @export #' @examples #'\dontrun{ #' sparkR.session() @@ -296,7 +287,6 
@@ setMethod("dtypes", #' @rdname columns #' @name columns #' @aliases columns,SparkDataFrame-method -#' @export #' @examples #'\dontrun{ #' sparkR.session() @@ -388,7 +378,6 @@ setMethod("colnames<-", #' @aliases coltypes,SparkDataFrame-method #' @name coltypes #' @family SparkDataFrame functions -#' @export #' @examples #'\dontrun{ #' irisDF <- createDataFrame(iris) @@ -445,7 +434,6 @@ setMethod("coltypes", #' @rdname coltypes #' @name coltypes<- #' @aliases coltypes<-,SparkDataFrame,character-method -#' @export #' @examples #'\dontrun{ #' sparkR.session() @@ -494,7 +482,6 @@ setMethod("coltypes<-", #' @rdname
spark-website git commit: update committer
Repository: spark-website Updated Branches: refs/heads/asf-site 32ff6fa97 -> 8bd24fb6d update committer Author: Felix Cheung Closes #103 from felixcheung/fc. Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/8bd24fb6 Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/8bd24fb6 Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/8bd24fb6 Branch: refs/heads/asf-site Commit: 8bd24fb6d1a78a13e741f63a5ad6935f3a572ed2 Parents: 32ff6fa Author: Felix Cheung Authored: Mon Mar 5 09:22:44 2018 -0800 Committer: Felix Cheung Committed: Mon Mar 5 09:22:44 2018 -0800 -- committers.md| 2 +- site/committers.html | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/8bd24fb6/committers.md -- diff --git a/committers.md b/committers.md index 5d7a3d0..38fb3b0 100644 --- a/committers.md +++ b/committers.md @@ -14,7 +14,7 @@ navigation: |Michael Armbrust|Databricks| |Joseph Bradley|Databricks| |Matthew Cheah|Palantir| -|Felix Cheung|Microsoft| +|Felix Cheung|Uber| |Mosharaf Chowdhury|University of Michigan, Ann Arbor| |Bryan Cutler|IBM| |Jason Dai|Intel| http://git-wip-us.apache.org/repos/asf/spark-website/blob/8bd24fb6/site/committers.html -- diff --git a/site/committers.html b/site/committers.html index 1f02da2..bd691ba 100644 --- a/site/committers.html +++ b/site/committers.html @@ -222,7 +222,7 @@ Felix Cheung - Microsoft + Uber Mosharaf Chowdhury
svn commit: r25438 - in /dev/spark/2.4.0-SNAPSHOT-2018_03_05_08_01-947b4e6-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Mar 5 16:15:36 2018 New Revision: 25438 Log: Apache Spark 2.4.0-SNAPSHOT-2018_03_05_08_01-947b4e6 docs [This commit notification would consist of 1444 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23510][DOC][FOLLOW-UP] Update spark.sql.hive.metastore.version
Repository: spark Updated Branches: refs/heads/master a366b950b -> 947b4e6f0 [SPARK-23510][DOC][FOLLOW-UP] Update spark.sql.hive.metastore.version ## What changes were proposed in this pull request? Update `spark.sql.hive.metastore.version` to 2.3.2, same as HiveUtils.scala: https://github.com/apache/spark/blob/ff1480189b827af0be38605d566a4ee71b4c36f6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala#L63-L65 ## How was this patch tested? N/A Author: Yuming Wang Closes #20734 from wangyum/SPARK-23510-FOLLOW-UP. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/947b4e6f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/947b4e6f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/947b4e6f Branch: refs/heads/master Commit: 947b4e6f09db6aa5d92409344b6e273e9faeb24e Parents: a366b95 Author: Yuming Wang Authored: Mon Mar 5 16:21:02 2018 +0100 Committer: Herman van Hovell Committed: Mon Mar 5 16:21:02 2018 +0100 -- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/947b4e6f/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 4d0f015..01e2076 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1214,7 +1214,7 @@ The following options can be used to configure the version of Hive that is used 1.2.1 Version of the Hive metastore. Available - options are 0.12.0 through 1.2.1. + options are 0.12.0 through 2.3.2.
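For context, `spark.sql.hive.metastore.version` is the session option the corrected table documents. A minimal sketch of setting it when building a session (the app name is a placeholder; "maven" is one of the documented values for `spark.sql.hive.metastore.jars` and tells Spark to resolve matching Hive client jars):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: connect to a Hive metastore version now covered by the updated docs.
val spark = SparkSession.builder()
  .appName("hive-metastore-version-example")          // placeholder name
  .enableHiveSupport()
  .config("spark.sql.hive.metastore.version", "2.3.2")
  .config("spark.sql.hive.metastore.jars", "maven")   // download matching Hive jars
  .getOrCreate()
```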
spark git commit: [SPARK-23329][SQL] Fix documentation of trigonometric functions
Repository: spark Updated Branches: refs/heads/branch-2.3 88dd335f6 -> 232b9f81f [SPARK-23329][SQL] Fix documentation of trigonometric functions ## What changes were proposed in this pull request? Provide more details in trigonometric function documentations. Referenced `java.lang.Math` for further details in the descriptions. ## How was this patch tested? Ran full build, checked generated documentation manually Author: Mihaly TothCloses #20618 from misutoth/trigonometric-doc. (cherry picked from commit a366b950b90650693ad0eb1e5b9a988ad028d845) Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/232b9f81 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/232b9f81 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/232b9f81 Branch: refs/heads/branch-2.3 Commit: 232b9f81f02ec00fc698f610ecc1ca25740e8802 Parents: 88dd335 Author: Mihaly Toth Authored: Mon Mar 5 23:46:40 2018 +0900 Committer: hyukjinkwon Committed: Mon Mar 5 23:47:07 2018 +0900 -- R/pkg/R/functions.R | 34 ++-- python/pyspark/sql/functions.py | 62 --- .../catalyst/expressions/mathExpressions.scala | 99 +--- .../scala/org/apache/spark/sql/functions.scala | 160 +-- 4 files changed, 248 insertions(+), 107 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/232b9f81/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 9f7c631..29ee146 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -278,8 +278,8 @@ setMethod("abs", }) #' @details -#' \code{acos}: Computes the cosine inverse of the given value; the returned angle is in -#' the range 0.0 through pi. +#' \code{acos}: Returns the inverse cosine of the given value, +#' as if computed by \code{java.lang.Math.acos()} #' #' @rdname column_math_functions #' @export @@ -334,8 +334,8 @@ setMethod("ascii", }) #' @details -#' \code{asin}: Computes the sine inverse of the given value; the returned angle is in -#' the range -pi/2 through pi/2. +#' \code{asin}: Returns the inverse sine of the given value, +#' as if computed by \code{java.lang.Math.asin()} #' #' @rdname column_math_functions #' @export @@ -349,8 +349,8 @@ setMethod("asin", }) #' @details -#' \code{atan}: Computes the tangent inverse of the given value; the returned angle is in the range -#' -pi/2 through pi/2. +#' \code{atan}: Returns the inverse tangent of the given value, +#' as if computed by \code{java.lang.Math.atan()} #' #' @rdname column_math_functions #' @export @@ -613,7 +613,8 @@ setMethod("covar_pop", signature(col1 = "characterOrColumn", col2 = "characterOr }) #' @details -#' \code{cos}: Computes the cosine of the given value. Units in radians. +#' \code{cos}: Returns the cosine of the given value, +#' as if computed by \code{java.lang.Math.cos()}. Units in radians. #' #' @rdname column_math_functions #' @aliases cos cos,Column-method @@ -627,7 +628,8 @@ setMethod("cos", }) #' @details -#' \code{cosh}: Computes the hyperbolic cosine of the given value. +#' \code{cosh}: Returns the hyperbolic cosine of the given value, +#' as if computed by \code{java.lang.Math.cosh()}. #' #' @rdname column_math_functions #' @aliases cosh cosh,Column-method @@ -1463,7 +1465,8 @@ setMethod("sign", signature(x = "Column"), }) #' @details -#' \code{sin}: Computes the sine of the given value. Units in radians. +#' \code{sin}: Returns the sine of the given value, +#' as if computed by \code{java.lang.Math.sin()}. Units in radians. 
#' #' @rdname column_math_functions #' @aliases sin sin,Column-method @@ -1477,7 +1480,8 @@ setMethod("sin", }) #' @details -#' \code{sinh}: Computes the hyperbolic sine of the given value. +#' \code{sinh}: Returns the hyperbolic sine of the given value, +#' as if computed by \code{java.lang.Math.sinh()}. #' #' @rdname column_math_functions #' @aliases sinh sinh,Column-method @@ -1653,7 +1657,9 @@ setMethod("sumDistinct", }) #' @details -#' \code{tan}: Computes the tangent of the given value. Units in radians. +#' \code{tan}: Returns the tangent of the given value, +#' as if computed by \code{java.lang.Math.tan()}. +#' Units in radians. #' #' @rdname column_math_functions #' @aliases tan tan,Column-method @@ -1667,7 +1673,8 @@ setMethod("tan", }) #' @details -#' \code{tanh}: Computes the hyperbolic tangent of the given value. +#' \code{tanh}: Returns the hyperbolic tangent of the given value, +#' as if computed by
spark git commit: [SPARK-23329][SQL] Fix documentation of trigonometric functions
Repository: spark Updated Branches: refs/heads/master 5ff72ffcf -> a366b950b [SPARK-23329][SQL] Fix documentation of trigonometric functions ## What changes were proposed in this pull request? Provide more details in trigonometric function documentations. Referenced `java.lang.Math` for further details in the descriptions. ## How was this patch tested? Ran full build, checked generated documentation manually Author: Mihaly TothCloses #20618 from misutoth/trigonometric-doc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a366b950 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a366b950 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a366b950 Branch: refs/heads/master Commit: a366b950b90650693ad0eb1e5b9a988ad028d845 Parents: 5ff72ff Author: Mihaly Toth Authored: Mon Mar 5 23:46:40 2018 +0900 Committer: hyukjinkwon Committed: Mon Mar 5 23:46:40 2018 +0900 -- R/pkg/R/functions.R | 34 ++-- python/pyspark/sql/functions.py | 62 --- .../catalyst/expressions/mathExpressions.scala | 99 +--- .../scala/org/apache/spark/sql/functions.scala | 160 +-- 4 files changed, 248 insertions(+), 107 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a366b950/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 9f7c631..29ee146 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -278,8 +278,8 @@ setMethod("abs", }) #' @details -#' \code{acos}: Computes the cosine inverse of the given value; the returned angle is in -#' the range 0.0 through pi. +#' \code{acos}: Returns the inverse cosine of the given value, +#' as if computed by \code{java.lang.Math.acos()} #' #' @rdname column_math_functions #' @export @@ -334,8 +334,8 @@ setMethod("ascii", }) #' @details -#' \code{asin}: Computes the sine inverse of the given value; the returned angle is in -#' the range -pi/2 through pi/2. +#' \code{asin}: Returns the inverse sine of the given value, +#' as if computed by \code{java.lang.Math.asin()} #' #' @rdname column_math_functions #' @export @@ -349,8 +349,8 @@ setMethod("asin", }) #' @details -#' \code{atan}: Computes the tangent inverse of the given value; the returned angle is in the range -#' -pi/2 through pi/2. +#' \code{atan}: Returns the inverse tangent of the given value, +#' as if computed by \code{java.lang.Math.atan()} #' #' @rdname column_math_functions #' @export @@ -613,7 +613,8 @@ setMethod("covar_pop", signature(col1 = "characterOrColumn", col2 = "characterOr }) #' @details -#' \code{cos}: Computes the cosine of the given value. Units in radians. +#' \code{cos}: Returns the cosine of the given value, +#' as if computed by \code{java.lang.Math.cos()}. Units in radians. #' #' @rdname column_math_functions #' @aliases cos cos,Column-method @@ -627,7 +628,8 @@ setMethod("cos", }) #' @details -#' \code{cosh}: Computes the hyperbolic cosine of the given value. +#' \code{cosh}: Returns the hyperbolic cosine of the given value, +#' as if computed by \code{java.lang.Math.cosh()}. #' #' @rdname column_math_functions #' @aliases cosh cosh,Column-method @@ -1463,7 +1465,8 @@ setMethod("sign", signature(x = "Column"), }) #' @details -#' \code{sin}: Computes the sine of the given value. Units in radians. +#' \code{sin}: Returns the sine of the given value, +#' as if computed by \code{java.lang.Math.sin()}. Units in radians. 
#' #' @rdname column_math_functions #' @aliases sin sin,Column-method @@ -1477,7 +1480,8 @@ setMethod("sin", }) #' @details -#' \code{sinh}: Computes the hyperbolic sine of the given value. +#' \code{sinh}: Returns the hyperbolic sine of the given value, +#' as if computed by \code{java.lang.Math.sinh()}. #' #' @rdname column_math_functions #' @aliases sinh sinh,Column-method @@ -1653,7 +1657,9 @@ setMethod("sumDistinct", }) #' @details -#' \code{tan}: Computes the tangent of the given value. Units in radians. +#' \code{tan}: Returns the tangent of the given value, +#' as if computed by \code{java.lang.Math.tan()}. +#' Units in radians. #' #' @rdname column_math_functions #' @aliases tan tan,Column-method @@ -1667,7 +1673,8 @@ setMethod("tan", }) #' @details -#' \code{tanh}: Computes the hyperbolic tangent of the given value. +#' \code{tanh}: Returns the hyperbolic tangent of the given value, +#' as if computed by \code{java.lang.Math.tanh()}. #' #' @rdname column_math_functions #' @aliases tanh tanh,Column-method @@ -1973,7 +1980,8 @@ setMethod("year", #'
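The change above only rewords the function documentation; behaviour is untouched. A quick sketch of the documented equivalence with `java.lang.Math`, assuming an existing `SparkSession` named `spark`:

```scala
import org.apache.spark.sql.functions.{acos, atan, lit}

// The SQL trig functions should agree with java.lang.Math, per the updated docs.
val row = spark.range(1)
  .select(acos(lit(0.5)).as("acos"), atan(lit(1.0)).as("atan"))
  .head()

assert(math.abs(row.getDouble(0) - math.acos(0.5)) < 1e-12)
assert(math.abs(row.getDouble(1) - math.atan(1.0)) < 1e-12)
```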
spark git commit: [SPARK-23566][MINOR][DOC] Argument name mismatch fixed
Repository: spark Updated Branches: refs/heads/master 42cf48e20 -> 5ff72ffcf [SPARK-23566][MINOR][DOC] Argument name mismatch fixed Argument name mismatch fixed. ## What changes were proposed in this pull request? `col` changed to `new` in doc string to match the argument list. Patch file added: https://issues.apache.org/jira/browse/SPARK-23566 Please review http://spark.apache.org/contributing.html before opening a pull request. Author: AnirudhCloses #20716 from animenon/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ff72ffc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ff72ffc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ff72ffc Branch: refs/heads/master Commit: 5ff72ffcf495d2823f7f1186078d1cb261667c3d Parents: 42cf48e Author: Anirudh Authored: Mon Mar 5 23:17:16 2018 +0900 Committer: hyukjinkwon Committed: Mon Mar 5 23:17:16 2018 +0900 -- python/pyspark/sql/dataframe.py | 20 +--- 1 file changed, 13 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5ff72ffc/python/pyspark/sql/dataframe.py -- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index f3e..9d8e85c 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -588,6 +588,8 @@ class DataFrame(object): """ Returns a new :class:`DataFrame` that has exactly `numPartitions` partitions. +:param numPartitions: int, to specify the target number of partitions + Similar to coalesce defined on an :class:`RDD`, this operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the 100 new partitions will @@ -612,9 +614,10 @@ class DataFrame(object): Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The resulting DataFrame is hash partitioned. -``numPartitions`` can be an int to specify the target number of partitions or a Column. -If it is a Column, it will be used as the first partitioning column. If not specified, -the default number of partitions is used. +:param numPartitions: +can be an int to specify the target number of partitions or a Column. +If it is a Column, it will be used as the first partitioning column. If not specified, +the default number of partitions is used. .. versionchanged:: 1.6 Added optional arguments to specify the partitioning columns. Also made numPartitions @@ -673,9 +676,10 @@ class DataFrame(object): Returns a new :class:`DataFrame` partitioned by the given partitioning expressions. The resulting DataFrame is range partitioned. -``numPartitions`` can be an int to specify the target number of partitions or a Column. -If it is a Column, it will be used as the first partitioning column. If not specified, -the default number of partitions is used. +:param numPartitions: +can be an int to specify the target number of partitions or a Column. +If it is a Column, it will be used as the first partitioning column. If not specified, +the default number of partitions is used. At least one partition-by expression must be specified. When no explicit sort order is specified, "ascending nulls first" is assumed. @@ -892,6 +896,8 @@ class DataFrame(object): def alias(self, alias): """Returns a new :class:`DataFrame` with an alias set. +:param alias: string, an alias name to be set for the DataFrame. 
+ >>> from pyspark.sql.functions import * >>> df_as1 = df.alias("df_as1") >>> df_as2 = df.alias("df_as2") @@ -1900,7 +1906,7 @@ class DataFrame(object): This is a no-op if schema doesn't contain the given column name. :param existing: string, name of the existing column to rename. -:param col: string, new name of the column. +:param new: string, new name of the column. >>> df.withColumnRenamed('age', 'age2').collect() [Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
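The clarified `:param numPartitions:` docstrings describe behaviour that also exists in the Scala API; a short sketch of the call shapes, assuming an existing DataFrame `df` with an `age` column:

```scala
import org.apache.spark.sql.functions.col

// Hash-partition into a target number of partitions.
val byCount = df.repartition(10)

// Number of partitions plus partitioning columns: rows with equal "age"
// values land in the same partition.
val byColumn = df.repartition(10, col("age"))

// Narrow coalesce to fewer partitions, avoiding a shuffle.
val coalesced = df.coalesce(5)
```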
svn commit: r25432 - in /dev/spark/2.4.0-SNAPSHOT-2018_03_05_05_24-2ce37b5-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Mar 5 13:37:51 2018 New Revision: 25432 Log: Apache Spark 2.4.0-SNAPSHOT-2018_03_05_05_24-2ce37b5 docs [This commit notification would consist of 1444 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23496][CORE] Locality of coalesced partitions can be severely skewed by the order of input partitions
Repository: spark Updated Branches: refs/heads/master 2ce37b50f -> 42cf48e20 [SPARK-23496][CORE] Locality of coalesced partitions can be severely skewed by the order of input partitions ## What changes were proposed in this pull request? The algorithm in `DefaultPartitionCoalescer.setupGroups` is responsible for picking preferred locations for coalesced partitions. It analyzes the preferred locations of input partitions. It starts by trying to create one partition for each unique location in the input. However, if the requested number of coalesced partitions is higher than the number of unique locations, it has to pick duplicate locations. Previously, the duplicate locations would be picked by iterating over the input partitions in order, and copying their preferred locations to coalesced partitions. If the input partitions were clustered by location, this could result in severe skew. With the fix, instead of iterating over the list of input partitions in order, we pick them at random. It's not perfectly balanced, but it's much better. ## How was this patch tested? Unit test reproducing the behavior was added. Author: Ala Luszczak Closes #20664 from ala/SPARK-23496. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42cf48e2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42cf48e2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42cf48e2 Branch: refs/heads/master Commit: 42cf48e20cd5e47e1b7557af9c71c4eea142f10f Parents: 2ce37b5 Author: Ala Luszczak Authored: Mon Mar 5 14:33:12 2018 +0100 Committer: Herman van Hovell Committed: Mon Mar 5 14:33:12 2018 +0100 -- .../org/apache/spark/rdd/CoalescedRDD.scala | 8 ++-- .../scala/org/apache/spark/rdd/RDDSuite.scala | 42 2 files changed, 46 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/42cf48e2/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 10451a3..94e7d0b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -266,17 +266,17 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) numCreated += 1 } } -tries = 0 // if we don't have enough partition groups, create duplicates while (numCreated < targetLen) { - val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries) - tries += 1 + // Copy the preferred location from a random input partition. + // This helps in avoiding skew when the input partitions are clustered by preferred location.
+ val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs( +rnd.nextInt(partitionLocs.partsWithLocs.length)) val pgroup = new PartitionGroup(Some(nxt_replica)) groupArr += pgroup groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup addPartToPGroup(nxt_part, pgroup) numCreated += 1 - if (tries >= partitionLocs.partsWithLocs.length) tries = 0 } } http://git-wip-us.apache.org/repos/asf/spark/blob/42cf48e2/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index e994d72..191c612 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -1129,6 +1129,35 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { }.collect() } + test("SPARK-23496: order of input partitions can result in severe skew in coalesce") { +val numInputPartitions = 100 +val numCoalescedPartitions = 50 +val locations = Array("locA", "locB") + +val inputRDD = sc.makeRDD(Range(0, numInputPartitions).toArray[Int], numInputPartitions) +assert(inputRDD.getNumPartitions == numInputPartitions) + +val locationPrefRDD = new LocationPrefRDD(inputRDD, { (p: Partition) => + if (p.index < numCoalescedPartitions) { +Seq(locations(0)) + } else { +Seq(locations(1)) + } +}) +val coalescedRDD = new CoalescedRDD(locationPrefRDD, numCoalescedPartitions) + +val numPartsPerLocation = coalescedRDD + .getPartitions + .map(coalescedRDD.getPreferredLocations(_).head) + .groupBy(identity) + .mapValues(_.size) + +// Make sure the coalesced partitions are
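To make the failure mode concrete: when the extra groups are assigned by walking the input in order, a location-clustered input hands every duplicate group the same host, whereas sampling a random input partition spreads them out. A standalone sketch of the two selection strategies (names are illustrative, not the Spark internals):

```scala
import scala.util.Random

// 100 input partitions: the first 50 prefer "locA", the rest prefer "locB".
val partsWithLocs: IndexedSeq[(String, Int)] =
  (0 until 100).map(i => (if (i < 50) "locA" else "locB", i))

// Old behaviour: take preferred locations in input order, so all 50 extra
// groups end up on "locA" when the input is clustered by location.
val inOrder = (0 until 50).map(i => partsWithLocs(i)._1)

// New behaviour: sample a random input partition for each extra group,
// so both locations are represented roughly evenly.
val rnd = new Random(42)
val atRandom = (0 until 50).map(_ => partsWithLocs(rnd.nextInt(partsWithLocs.length))._1)

println(inOrder.groupBy(identity).mapValues(_.size))   // Map(locA -> 50)
println(atRandom.groupBy(identity).mapValues(_.size))  // roughly 25 / 25
```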
[1/2] spark git commit: [SPARK-23546][SQL] Refactor stateless methods/values in CodegenContext
Repository: spark Updated Branches: refs/heads/master 269cd5359 -> 2ce37b50f http://git-wip-us.apache.org/repos/asf/spark/blob/2ce37b50/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala index d8dc086..2c2cf3d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala @@ -1128,15 +1128,16 @@ abstract class RoundBase(child: Expression, scale: Expression, }""" } +val javaType = CodeGenerator.javaType(dataType) if (scaleV == null) { // if scale is null, no need to eval its child at all ev.copy(code = s""" boolean ${ev.isNull} = true; -${ctx.javaType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)};""") +$javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)};""") } else { ev.copy(code = s""" ${ce.code} boolean ${ev.isNull} = ${ce.isNull}; -${ctx.javaType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)}; +$javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; if (!${ev.isNull}) { $evaluationCode }""") http://git-wip-us.apache.org/repos/asf/spark/blob/2ce37b50/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala index 470d5da..b35fa72 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} import org.apache.spark.sql.catalyst.util.TypeUtils import org.apache.spark.sql.types._ @@ -72,7 +72,7 @@ case class Coalesce(children: Seq[Expression]) extends Expression { } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -ev.isNull = ctx.addMutableState(ctx.JAVA_BOOLEAN, ev.isNull) +ev.isNull = ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, ev.isNull) // all the evals are meant to be in a do { ... 
} while (false); loop val evals = children.map { e => @@ -87,14 +87,14 @@ case class Coalesce(children: Seq[Expression]) extends Expression { """.stripMargin } -val resultType = ctx.javaType(dataType) +val resultType = CodeGenerator.javaType(dataType) val codes = ctx.splitExpressionsWithCurrentInputs( expressions = evals, funcName = "coalesce", returnType = resultType, makeSplitFunction = func => s""" - |$resultType ${ev.value} = ${ctx.defaultValue(dataType)}; + |$resultType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; |do { | $func |} while (false); @@ -113,7 +113,7 @@ case class Coalesce(children: Seq[Expression]) extends Expression { ev.copy(code = s""" |${ev.isNull} = true; - |$resultType ${ev.value} = ${ctx.defaultValue(dataType)}; + |$resultType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; |do { | $codes |} while (false); @@ -234,7 +234,7 @@ case class IsNaN(child: Expression) extends UnaryExpression case DoubleType | FloatType => ev.copy(code = s""" ${eval.code} - ${ctx.javaType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)}; + ${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; ${ev.value} = !${eval.isNull} && Double.isNaN(${eval.value});""", isNull = "false") } } @@ -281,7 +281,7 @@ case class NaNvl(left: Expression, right: Expression) ev.copy(code = s""" ${leftGen.code} boolean ${ev.isNull} = false; - ${ctx.javaType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)}; + ${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; if (${leftGen.isNull}) { ${ev.isNull} = true; } else { @@
[2/2] spark git commit: [SPARK-23546][SQL] Refactor stateless methods/values in CodegenContext
[SPARK-23546][SQL] Refactor stateless methods/values in CodegenContext ## What changes were proposed in this pull request? A current `CodegenContext` class has immutable value or method without mutable state, too. This refactoring moves them to `CodeGenerator` object class which can be accessed from anywhere without an instantiated `CodegenContext` in the program. ## How was this patch tested? Existing tests Author: Kazuaki IshizakiCloses #20700 from kiszk/SPARK-23546. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ce37b50 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ce37b50 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ce37b50 Branch: refs/heads/master Commit: 2ce37b50fc01558f49ad22f89c8659f50544ffec Parents: 269cd53 Author: Kazuaki Ishizaki Authored: Mon Mar 5 11:39:01 2018 +0100 Committer: Herman van Hovell Committed: Mon Mar 5 11:39:01 2018 +0100 -- .../catalyst/expressions/BoundAttribute.scala | 9 +- .../spark/sql/catalyst/expressions/Cast.scala | 35 +- .../sql/catalyst/expressions/Expression.scala | 16 +- .../expressions/MonotonicallyIncreasingID.scala | 8 +- .../sql/catalyst/expressions/ScalaUDF.scala | 7 +- .../catalyst/expressions/SparkPartitionID.scala | 7 +- .../sql/catalyst/expressions/TimeWindow.scala | 4 +- .../sql/catalyst/expressions/arithmetic.scala | 51 +-- .../expressions/bitwiseExpressions.scala| 2 +- .../expressions/codegen/CodeGenerator.scala | 458 ++- .../expressions/codegen/CodegenFallback.scala | 7 +- .../codegen/GenerateMutableProjection.scala | 6 +- .../expressions/codegen/GenerateOrdering.scala | 4 +- .../codegen/GenerateSafeProjection.scala| 6 +- .../codegen/GenerateUnsafeProjection.scala | 11 +- .../expressions/collectionOperations.scala | 6 +- .../expressions/complexTypeCreator.scala| 4 +- .../expressions/complexTypeExtractors.scala | 15 +- .../expressions/conditionalExpressions.scala| 10 +- .../expressions/datetimeExpressions.scala | 18 +- .../spark/sql/catalyst/expressions/hash.scala | 25 +- .../catalyst/expressions/inputFileBlock.scala | 8 +- .../sql/catalyst/expressions/literals.scala | 8 +- .../catalyst/expressions/mathExpressions.scala | 5 +- .../catalyst/expressions/nullExpressions.scala | 22 +- .../catalyst/expressions/objects/objects.scala | 99 ++-- .../sql/catalyst/expressions/predicates.scala | 14 +- .../expressions/randomExpressions.scala | 8 +- .../expressions/regexpExpressions.scala | 8 +- .../expressions/stringExpressions.scala | 39 +- .../expressions/CodeGenerationSuite.scala | 4 +- .../spark/sql/execution/ColumnarBatchScan.scala | 13 +- .../apache/spark/sql/execution/ExpandExec.scala | 5 +- .../spark/sql/execution/GenerateExec.scala | 8 +- .../apache/spark/sql/execution/SortExec.scala | 5 +- .../sql/execution/WholeStageCodegenExec.scala | 2 +- .../execution/aggregate/HashAggregateExec.scala | 16 +- .../execution/aggregate/HashMapGenerator.scala | 8 +- .../aggregate/RowBasedHashMapGenerator.scala| 8 +- .../aggregate/VectorizedHashMapGenerator.scala | 11 +- .../sql/execution/basicPhysicalOperators.scala | 10 +- .../columnar/GenerateColumnAccessor.scala | 2 +- .../execution/joins/BroadcastHashJoinExec.scala | 5 +- .../sql/execution/joins/SortMergeJoinExec.scala | 8 +- .../org/apache/spark/sql/execution/limit.scala | 7 +- 45 files changed, 535 insertions(+), 497 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2ce37b50/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala -- diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index 6a17a39..89ffbb0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors.attachTree -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode} import org.apache.spark.sql.types._ /** @@ -66,13 +66,14 @@ case class
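The shape of the refactoring, reduced to a toy example (class and method names below are illustrative, not the actual Spark code): helpers that carry no mutable state move off the stateful context instance and onto a standalone object, so call sites such as `ctx.javaType(dataType)` become `CodeGenerator.javaType(dataType)` and no longer need a `CodegenContext` in scope.

```scala
// Before: stateless helpers live on the (stateful) context instance.
class ContextLike {
  private var mutableStateCount = 0   // genuinely per-instance state stays on the class
  def addMutableState(): Int = { mutableStateCount += 1; mutableStateCount }
}

// After: helpers with no mutable state move to an object, callable from anywhere.
object GeneratorLike {
  val JAVA_BOOLEAN = "boolean"

  def javaType(dataType: String): String = dataType match {
    case "IntegerType" => "int"
    case "DoubleType"  => "double"
    case _             => "Object"
  }

  def defaultValue(dataType: String): String = dataType match {
    case "IntegerType" | "DoubleType" => "-1"
    case _                            => "null"
  }
}

// A call site no longer needs a context instance in hand:
val decl = s"${GeneratorLike.javaType("IntegerType")} value = ${GeneratorLike.defaultValue("IntegerType")};"
```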
svn commit: r25405 - in /dev/spark/2.4.0-SNAPSHOT-2018_03_05_00_01-269cd53-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Mar 5 08:16:15 2018 New Revision: 25405 Log: Apache Spark 2.4.0-SNAPSHOT-2018_03_05_00_01-269cd53 docs [This commit notification would consist of 1444 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]