spark git commit: [SPARK-20665][SQL][FOLLOW-UP] Move test case to MathExpressionsSuite
Repository: spark Updated Branches: refs/heads/master 3476390c6 -> d14091809 [SPARK-20665][SQL][FOLLOW-UP] Move test case to MathExpressionsSuite ## What changes were proposed in this pull request? add test case to MathExpressionsSuite as #17906 ## How was this patch tested? unit test cases Author: liuxian Closes #18082 from 10110346/wip-lx-0524. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1409180 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1409180 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1409180 Branch: refs/heads/master Commit: d1409180932f2658daad2c6dbf5d80fdf4606dc5 Parents: 3476390 Author: liuxian Authored: Sun Jun 11 22:29:09 2017 -0700 Committer: Xiao Li Committed: Sun Jun 11 22:29:09 2017 -0700 -- .../expressions/MathExpressionsSuite.scala | 64 1 file changed, 52 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d1409180/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala index 6af0cde..f4d5a44 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala @@ -23,6 +23,7 @@ import com.google.common.math.LongMath import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCoercion.ImplicitTypeCasts.implicitCast import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer @@ -223,6 +224,14 @@ class MathExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { def f: (Double) => Double = (x: Double) => 1 / math.tan(x) testUnary(Cot, f) checkConsistencyBetweenInterpretedAndCodegen(Cot, DoubleType) +val nullLit = Literal.create(null, NullType) +val intNullLit = Literal.create(null, IntegerType) +val intLit = Literal.create(1, IntegerType) +checkEvaluation(checkDataTypeAndCast(Cot(nullLit)), null, EmptyRow) +checkEvaluation(checkDataTypeAndCast(Cot(intNullLit)), null, EmptyRow) +checkEvaluation(checkDataTypeAndCast(Cot(intLit)), 1 / math.tan(1), EmptyRow) +checkEvaluation(checkDataTypeAndCast(Cot(-intLit)), 1 / math.tan(-1), EmptyRow) +checkEvaluation(checkDataTypeAndCast(Cot(0)), 1 / math.tan(0), EmptyRow) } test("atan") { @@ -250,6 +259,11 @@ class MathExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkConsistencyBetweenInterpretedAndCodegen(Cbrt, DoubleType) } + def checkDataTypeAndCast(expression: UnaryMathExpression): Expression = { +val expNew = implicitCast(expression.child, expression.inputTypes(0)).getOrElse(expression) +expression.withNewChildren(Seq(expNew)) + } + test("ceil") { testUnary(Ceil, (d: Double) => math.ceil(d).toLong) checkConsistencyBetweenInterpretedAndCodegen(Ceil, DoubleType) @@ -262,12 +276,22 @@ class MathExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val doublePi: Double = 3.1415 val floatPi: Float = 3.1415f val longLit: Long = 12345678901234567L -checkEvaluation(Ceil(doublePi), 4L, EmptyRow) -checkEvaluation(Ceil(floatPi.toDouble), 4L, EmptyRow) 
-checkEvaluation(Ceil(longLit), longLit, EmptyRow) -checkEvaluation(Ceil(-doublePi), -3L, EmptyRow) -checkEvaluation(Ceil(-floatPi.toDouble), -3L, EmptyRow) -checkEvaluation(Ceil(-longLit), -longLit, EmptyRow) +val nullLit = Literal.create(null, NullType) +val floatNullLit = Literal.create(null, FloatType) +checkEvaluation(checkDataTypeAndCast(Ceil(doublePi)), 4L, EmptyRow) +checkEvaluation(checkDataTypeAndCast(Ceil(floatPi)), 4L, EmptyRow) +checkEvaluation(checkDataTypeAndCast(Ceil(longLit)), longLit, EmptyRow) +checkEvaluation(checkDataTypeAndCast(Ceil(-doublePi)), -3L, EmptyRow) +checkEvaluation(checkDataTypeAndCast(Ceil(-floatPi)), -3L, EmptyRow) +checkEvaluation(checkDataTypeAndCast(Ceil(-longLit)), -longLit, EmptyRow) + +checkEvaluation(checkDataTypeAndCast(Ceil(nullLit)), null, EmptyRow) +checkEvaluation(checkDataTypeAndCast(Ceil(floatNullLit)), null, EmptyRow) +checkEvaluation(checkDataTypeAndCast(Ceil(0)), 0L, EmptyRow) +checkEvaluation(checkDataTypeAndCast(Ceil(1)), 1L, EmptyRow) +
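To make the helper introduced by this patch concrete, here is a minimal sketch of the idea: wrap the child of a unary math expression with the implicit cast the analyzer would insert (e.g. int to double) before evaluating it. This is not the suite code itself; it assumes it runs somewhere catalyst's internal APIs (such as `ImplicitTypeCasts.implicitCast`) are visible, e.g. inside the catalyst test sources, and `withImplicitCast` is a hypothetical name.

```scala
// Minimal sketch, assuming catalyst internals are on the classpath and visible.
import org.apache.spark.sql.catalyst.analysis.TypeCoercion.ImplicitTypeCasts.implicitCast
import org.apache.spark.sql.catalyst.expressions.{Cot, Expression, Literal, UnaryMathExpression}

def withImplicitCast(expression: UnaryMathExpression): Expression = {
  // Wrap the child with the cast the analyzer would insert (e.g. int -> double);
  // keep the original child when no implicit cast applies.
  val castedChild = implicitCast(expression.child, expression.inputTypes.head)
    .getOrElse(expression.child)
  expression.withNewChildren(Seq(castedChild))
}

// An integer literal is cast to DoubleType before Cot evaluates it.
val cot = withImplicitCast(Cot(Literal(1)))
assert(cot.eval() == 1 / math.tan(1))
```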
spark git commit: [SPARK-20715] Store MapStatuses only in MapOutputTracker, not ShuffleMapStage
Repository: spark Updated Branches: refs/heads/master f48273c13 -> 3476390c6 [SPARK-20715] Store MapStatuses only in MapOutputTracker, not ShuffleMapStage ## What changes were proposed in this pull request? This PR refactors `ShuffleMapStage` and `MapOutputTracker` in order to simplify the management of `MapStatuses`, reduce driver memory consumption, and remove a potential source of scheduler correctness bugs. ### Background In Spark there are currently two places where MapStatuses are tracked: - The `MapOutputTracker` maintains an `Array[MapStatus]` storing a single location for each map output. This mapping is used by the `DAGScheduler` for determining reduce-task locality preferences (when locality-aware reduce task scheduling is enabled) and is also used to serve map output locations to executors / tasks. - Each `ShuffleMapStage` also contains a mapping of `Array[List[MapStatus]]` which holds the complete set of locations where each map output could be available. This mapping is used to determine which map tasks need to be run when constructing `TaskSets` for the stage. This duplication adds complexity and creates the potential for certain types of correctness bugs. Bad things can happen if these two copies of the map output locations get out of sync. For instance, if the `MapOutputTracker` is missing locations for a map output but `ShuffleMapStage` believes that locations are available then tasks will fail with `MetadataFetchFailedException` but `ShuffleMapStage` will not be updated to reflect the missing map outputs, leading to situations where the stage will be reattempted (because downstream stages experienced fetch failures) but no task sets will be launched (because `ShuffleMapStage` thinks all maps are available). I observed this behavior in a real-world deployment. I'm still not quite sure how the state got out of sync in the first place, but we can completely avoid this class of bug if we eliminate the duplicate state. ### Why we only need to track a single location for each map output I think that storing an `Array[List[MapStatus]]` in `ShuffleMapStage` is unnecessary. First, note that this adds memory/object bloat to the driver we need one extra `List` per task. If you have millions of tasks across all stages then this can add up to be a significant amount of resources. Secondly, I believe that it's extremely uncommon that these lists will ever contain more than one entry. It's not impossible, but is very unlikely given the conditions which must occur for that to happen: - In normal operation (no task failures) we'll only run each task once and thus will have at most one output. - If speculation is enabled then it's possible that we'll have multiple attempts of a task. The TaskSetManager will [kill duplicate attempts of a task](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L717) after a task finishes successfully, reducing the likelihood that both the original and speculated task will successfully register map outputs. - There is a [comment in `TaskSetManager`](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L113) which suggests that running tasks are not killed if a task set becomes a zombie. However: - If the task set becomes a zombie due to the job being cancelled then it doesn't matter whether we record map outputs. 
- If the task set became a zombie because of a stage failure (e.g. the map stage itself had a fetch failure from an upstream map stage) then I believe that the "failedEpoch" will be updated, which may cause map outputs from still-running tasks to [be ignored](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1213). (I'm not 100% sure on this point, though). - Even if you _do_ manage to record multiple map outputs for a stage, only a single map output is reported to / tracked by the MapOutputTracker. The only situation where the additional output locations could actually be read or used would be if a task experienced a `FetchFailure` exception. The most likely cause of a `FetchFailure` exception is a lost executor, which will most likely have caused the loss of several map tasks' output, so saving on potential re-execution of a single map task isn't a huge win if we're going to have to recompute several other lost map outputs from other tasks which ran on that lost executor. Also note that the re-population of MapOutputTracker state from state in the ShuffleMapStage only happens after the reduce stage has failed; the additional location doesn't help to prevent FetchFailures but, instead, can only reduce the amount of work when recomputing missing parent stages. Given this, this patch chooses to do away with tracking m
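To make the "single source of truth" argument concrete, here is a hedged toy sketch. The class names below are simplified stand-ins, not Spark's actual `MapOutputTracker` or `ShuffleMapStage`: with only one copy of the map-output state, the set of map tasks that still need to run is derived directly from the tracker, so the scheduler and the tracker can never disagree about which outputs exist.

```scala
// Toy model, not Spark's classes: missing partitions come straight from the tracker.
final case class MapStatus(location: String)

final class MapOutputTracker(numPartitions: Int) {
  private val statuses = new Array[MapStatus](numPartitions)

  def registerMapOutput(partition: Int, status: MapStatus): Unit =
    statuses(partition) = status

  // Called e.g. when an executor is lost or a fetch failure is reported.
  def unregisterMapOutput(partition: Int): Unit =
    statuses(partition) = null

  // The scheduler asks for exactly the partitions without a registered status.
  def findMissingPartitions(): Seq[Int] =
    statuses.indices.filter(statuses(_) == null)
}

val tracker = new MapOutputTracker(numPartitions = 3)
tracker.registerMapOutput(0, MapStatus("executor-1"))
tracker.registerMapOutput(2, MapStatus("executor-2"))
assert(tracker.findMissingPartitions() == Seq(1))
```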
spark git commit: [SPARK-18891][SQL] Support for specific Java List subtypes
Repository: spark Updated Branches: refs/heads/master 0538f3b0a -> f48273c13 [SPARK-18891][SQL] Support for specific Java List subtypes ## What changes were proposed in this pull request? Add support for specific Java `List` subtypes in deserialization as well as a generic implicit encoder. All `List` subtypes are supported by using either the size-specifying constructor (one `int` parameter) or the default constructor. Interfaces/abstract classes use the following implementations: * `java.util.List`, `java.util.AbstractList` or `java.util.AbstractSequentialList` => `java.util.ArrayList` ## How was this patch tested? ```bash build/mvn -DskipTests clean package && dev/run-tests ``` Additionally in Spark shell: ``` scala> val jlist = new java.util.LinkedList[Int]; jlist.add(1) jlist: java.util.LinkedList[Int] = [1] res0: Boolean = true scala> Seq(jlist).toDS().map(_.element()).collect() res1: Array[Int] = Array(1) ``` Author: Michal Senkyr Closes #18009 from michalsenkyr/dataset-java-lists. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f48273c1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f48273c1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f48273c1 Branch: refs/heads/master Commit: f48273c13c9e9fea2d9bb6dda10fca50c588 Parents: 0538f3b Author: Michal Senkyr Authored: Mon Jun 12 08:53:23 2017 +0800 Committer: Wenchen Fan Committed: Mon Jun 12 08:53:23 2017 +0800 -- .../spark/sql/catalyst/JavaTypeInference.scala | 15 ++--- .../catalyst/expressions/objects/objects.scala | 19 +- .../org/apache/spark/sql/JavaDatasetSuite.java | 61 3 files changed, 83 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f48273c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index 86a73a3..7683ee7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -267,16 +267,11 @@ object JavaTypeInference { case c if listType.isAssignableFrom(typeToken) => val et = elementType(typeToken) -val array = - Invoke( -MapObjects( - p => deserializerFor(et, Some(p)), - getPath, - inferDataType(et)._1), -"array", -ObjectType(classOf[Array[Any]])) - -StaticInvoke(classOf[java.util.Arrays], ObjectType(c), "asList", array :: Nil) +MapObjects( + p => deserializerFor(et, Some(p)), + getPath, + inferDataType(et)._1, + customCollectionCls = Some(c)) case _ if mapType.isAssignableFrom(typeToken) => val (keyType, valueType) = mapKeyValueType(typeToken) http://git-wip-us.apache.org/repos/asf/spark/blob/f48273c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 79b7b9f..5bb0feb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -22,6 +22,7 @@ import java.lang.reflect.Modifier import scala.collection.mutable.Builder import scala.language.existentials import scala.reflect.ClassTag 
+import scala.util.Try import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.serializer._ @@ -597,8 +598,8 @@ case class MapObjects private( val (initCollection, addElement, getResult): (String, String => String, String) = customCollectionCls match { -case Some(cls) => - // collection +case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) => + // Scala sequence val getBuilder = s"${cls.getName}$$.MODULE$$.newBuilder()" val builder = ctx.freshName("collectionBuilder") ( @@ -609,6 +610,20 @@ case class MapObjects private( genValue => s"$builder.$$plus$$eq($genValue);", s"(${cls.getName}) $builder.result();" ) +case Some(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) => + // Java list + val builder = ctx.freshName("collectionBuilder
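The instantiation strategy described above (use the size-specifying `int` constructor when available, otherwise the default constructor) can be sketched outside the generated code roughly as follows. This is a hedged illustration, not the codegen emitted by `MapObjects`, and `newJavaList` is a hypothetical helper name.

```scala
// Hypothetical helper: prefer e.g. new ArrayList(size), fall back to e.g. new LinkedList().
import scala.util.Try

def newJavaList[T](cls: Class[_ <: java.util.List[T]], size: Int): java.util.List[T] = {
  val instance: AnyRef =
    Try[AnyRef](cls.getConstructor(classOf[Int]).newInstance(Integer.valueOf(size)))
      .getOrElse(cls.getConstructor().newInstance())
  instance.asInstanceOf[java.util.List[T]]
}

// Usage: any concrete List subtype with one of the two constructors works.
val list = newJavaList(classOf[java.util.ArrayList[Integer]], size = 4)
list.add(Integer.valueOf(1))
```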
spark git commit: [SPARK-18891][SQL] Support for Scala Map collection types
Repository: spark Updated Branches: refs/heads/master a7c61c100 -> 0538f3b0a [SPARK-18891][SQL] Support for Scala Map collection types ## What changes were proposed in this pull request? Add support for arbitrary Scala `Map` types in deserialization as well as a generic implicit encoder. Used the builder approach as in #16541 to construct any provided `Map` type upon deserialization. Please note that this PR also adds (ignored) tests for issue [SPARK-19104 CompileException with Map and Case Class in Spark 2.1.0](https://issues.apache.org/jira/browse/SPARK-19104) but doesn't solve it. Added support for Java Maps in codegen code (encoders will be added in a different PR) with the following default implementations for interfaces/abstract classes: * `java.util.Map`, `java.util.AbstractMap` => `java.util.HashMap` * `java.util.SortedMap`, `java.util.NavigableMap` => `java.util.TreeMap` * `java.util.concurrent.ConcurrentMap` => `java.util.concurrent.ConcurrentHashMap` * `java.util.concurrent.ConcurrentNavigableMap` => `java.util.concurrent.ConcurrentSkipListMap` Resulting codegen for `Seq(Map(1 -> 2)).toDS().map(identity).queryExecution.debug.codegen`: ``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIterator(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private scala.collection.Iterator inputadapter_input; /* 009 */ private boolean CollectObjectsToMap_loopIsNull1; /* 010 */ private int CollectObjectsToMap_loopValue0; /* 011 */ private boolean CollectObjectsToMap_loopIsNull3; /* 012 */ private int CollectObjectsToMap_loopValue2; /* 013 */ private UnsafeRow deserializetoobject_result; /* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder; /* 015 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter; /* 016 */ private scala.collection.immutable.Map mapelements_argValue; /* 017 */ private UnsafeRow mapelements_result; /* 018 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder; /* 019 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter; /* 020 */ private UnsafeRow serializefromobject_result; /* 021 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter; /* 023 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter; /* 024 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter1; /* 025 */ /* 026 */ public GeneratedIterator(Object[] references) { /* 027 */ this.references = references; /* 028 */ } /* 029 */ /* 030 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 031 */ partitionIndex = index; /* 032 */ this.inputs = inputs; /* 033 */ wholestagecodegen_init_0(); /* 034 */ wholestagecodegen_init_1(); /* 035 */ /* 036 */ } /* 037 */ /* 038 */ private void wholestagecodegen_init_0() { /* 039 */ inputadapter_input = inputs[0]; /* 040 */ /* 041 */ deserializetoobject_result = new UnsafeRow(1); /* 042 */ this.deserializetoobject_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32); /* 043 */ this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1); /* 044 */ /* 045 */ mapelements_result = new UnsafeRow(1); /* 046 */ this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32); /* 047 */ this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1); /* 048 */ serializefromobject_result = new UnsafeRow(1); /* 049 */ this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32); /* 050 */ this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1); /* 051 */ this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 052 */ /* 053 */ } /* 054 */ /* 055 */ private void wholestagecodegen_init_1() { /* 056 */ this.serializefromobject_arrayWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(); /* 057 */ /* 058 */ } /* 059 */ /* 060 */ protected void processNext() throw
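A hedged sketch of the builder approach referenced above (and in #16541): the deserializer conceptually obtains a `Builder` for the requested `Map` type and feeds it the decoded key/value pairs, so any concrete implementation with a companion builder can be produced. `buildMap` below is a hypothetical helper for illustration, not the generated `CollectObjectsToMap` code.

```scala
// Build an arbitrary target Map type from decoded keys and values through its Builder.
import scala.collection.immutable.TreeMap
import scala.collection.mutable

def buildMap[K, V, M](newBuilder: () => mutable.Builder[(K, V), M],
                      keys: Seq[K],
                      values: Seq[V]): M = {
  val builder = newBuilder()
  builder.sizeHint(keys.size)
  keys.iterator.zip(values.iterator).foreach(builder += _)
  builder.result()
}

// Deserializing into a TreeMap rather than the default immutable Map:
val m = buildMap(() => TreeMap.newBuilder[Int, String], Seq(2, 1), Seq("b", "a"))
// m: TreeMap(1 -> "a", 2 -> "b")
```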
spark git commit: [SPARK-21031][SQL] Add `alterTableStats` to store spark's stats and let `alterTable` keep existing stats
Repository: spark Updated Branches: refs/heads/master 3a840048e -> a7c61c100 [SPARK-21031][SQL] Add `alterTableStats` to store spark's stats and let `alterTable` keep existing stats ## What changes were proposed in this pull request? Currently, hive's stats are read into `CatalogStatistics`, while spark's stats are also persisted through `CatalogStatistics`. As a result, hive's stats can be unexpectedly propagated into spark' stats. For example, for a catalog table, we read stats from hive, e.g. "totalSize" and put it into `CatalogStatistics`. Then, by using "ALTER TABLE" command, we will store the stats in `CatalogStatistics` into metastore as spark's stats (because we don't know whether it's from spark or not). But spark's stats should be only generated by "ANALYZE" command. This is unexpected from this command. Secondly, now that we have spark's stats in metastore, after inserting new data, although hive updated "totalSize" in metastore, we still cannot get the right `sizeInBytes` in `CatalogStatistics`, because we respect spark's stats (should not exist) over hive's stats. A running example is shown in [JIRA](https://issues.apache.org/jira/browse/SPARK-21031). To fix this, we add a new method `alterTableStats` to store spark's stats, and let `alterTable` keep existing stats. ## How was this patch tested? Added new tests. Author: Zhenhua Wang Closes #18248 from wzhfy/separateHiveStats. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a7c61c10 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a7c61c10 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a7c61c10 Branch: refs/heads/master Commit: a7c61c100b6e4380e8d0e588969dd7f2fd58d40c Parents: 3a84004 Author: Zhenhua Wang Authored: Mon Jun 12 08:23:04 2017 +0800 Committer: Wenchen Fan Committed: Mon Jun 12 08:23:04 2017 +0800 -- .../sql/catalyst/catalog/ExternalCatalog.scala | 2 + .../sql/catalyst/catalog/InMemoryCatalog.scala | 9 +++ .../sql/catalyst/catalog/SessionCatalog.scala | 13 .../catalyst/catalog/ExternalCatalogSuite.scala | 11 ++- .../catalyst/catalog/SessionCatalogSuite.scala | 12 +++ .../command/AnalyzeColumnCommand.scala | 2 +- .../execution/command/AnalyzeTableCommand.scala | 2 +- .../spark/sql/hive/HiveExternalCatalog.scala| 68 ++--- .../apache/spark/sql/hive/StatisticsSuite.scala | 80 +++- 9 files changed, 132 insertions(+), 67 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a7c61c10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 974ef90..12ba5ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -160,6 +160,8 @@ abstract class ExternalCatalog */ def alterTableSchema(db: String, table: String, schema: StructType): Unit + def alterTableStats(db: String, table: String, stats: CatalogStatistics): Unit + def getTable(db: String, table: String): CatalogTable def getTableOption(db: String, table: String): Option[CatalogTable] http://git-wip-us.apache.org/repos/asf/spark/blob/a7c61c10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala -- diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 8a5319b..9820522 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -312,6 +312,15 @@ class InMemoryCatalog( catalog(db).tables(table).table = origTable.copy(schema = schema) } + override def alterTableStats( + db: String, + table: String, + stats: CatalogStatistics): Unit = synchronized { +requireTableExists(db, table) +val origTable = catalog(db).tables(table).table +catalog(db).tables(table).table = origTable.copy(stats = Some(stats)) + } + override def getTable(db: String, table: String): CatalogTable = synchronized { requireTableExists(db, table) catalog(db).tables(table).table http://git-wip-us.apache.org/repos/asf/spark/blob/a7c61c10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCa
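The contract introduced by this change, where only `alterTableStats` writes Spark's statistics and `alterTable` preserves whatever stats already exist, can be illustrated with a hedged toy catalog. The classes below are simplified stand-ins, not Spark's `ExternalCatalog` API.

```scala
// Toy catalog: stats survive alterTable and are written only via alterTableStats.
case class TableStats(sizeInBytes: BigInt, rowCount: Option[BigInt] = None)
case class Table(name: String, properties: Map[String, String], stats: Option[TableStats] = None)

class TinyCatalog {
  private val tables = scala.collection.mutable.Map.empty[String, Table]

  def createTable(t: Table): Unit = tables(t.name) = t

  // General metadata changes never overwrite the stored statistics.
  def alterTable(t: Table): Unit =
    tables(t.name) = t.copy(stats = tables(t.name).stats)

  // Statistics are written only through this dedicated method (e.g. by ANALYZE).
  def alterTableStats(name: String, stats: TableStats): Unit =
    tables(name) = tables(name).copy(stats = Some(stats))

  def getTable(name: String): Table = tables(name)
}

val catalog = new TinyCatalog
catalog.createTable(Table("t", Map.empty))
catalog.alterTableStats("t", TableStats(sizeInBytes = 1024))
catalog.alterTable(Table("t", Map("comment" -> "updated"))) // stats survive
assert(catalog.getTable("t").stats.contains(TableStats(1024)))
```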
spark git commit: Fixed typo in sql.functions
Repository: spark Updated Branches: refs/heads/master 9f4ff9552 -> 3a840048e Fixed typo in sql.functions ## What changes were proposed in this pull request? I fixed a typo in the Scaladoc for the method `def struct(cols: Column*): Column`. 'retained' was misspelt as 'remained'. ## How was this patch tested? Before: Creates a new struct column. If the input column is a column in a `DataFrame`, or a derived column expression that is named (i.e. aliased), its name would be **remained** as the StructField's name, otherwise, the newly generated StructField's name would be auto generated as `col` with a suffix `index + 1`, i.e. col1, col2, col3, ... After: Creates a new struct column. If the input column is a column in a `DataFrame`, or a derived column expression that is named (i.e. aliased), its name would be **retained** as the StructField's name, otherwise, the newly generated StructField's name would be auto generated as `col` with a suffix `index + 1`, i.e. col1, col2, col3, ... Author: sujithjay Closes #18254 from sujithjay/fix-typo. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a840048 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a840048 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a840048 Branch: refs/heads/master Commit: 3a840048ed3501e06260b7c5df18cc0bbdb1505c Parents: 9f4ff95 Author: sujithjay Authored: Sun Jun 11 18:23:57 2017 +0100 Committer: Sean Owen Committed: Sun Jun 11 18:23:57 2017 +0100 -- sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3a840048/sql/core/src/main/scala/org/apache/spark/sql/functions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 8d0a8c2..8d2e1f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -1210,7 +1210,7 @@ object functions { /** * Creates a new struct column. * If the input column is a column in a `DataFrame`, or a derived column expression - * that is named (i.e. aliased), its name would be remained as the StructField's name, + * that is named (i.e. aliased), its name would be retained as the StructField's name, * otherwise, the newly generated StructField's name would be auto generated as * `col` with a suffix `index + 1`, i.e. col1, col2, col3, ... *
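A small, hedged illustration of the documented naming rule (it assumes a `SparkSession` named `spark` is in scope; the schema shown in the comments is simplified):

```scala
// Aliased expressions keep their name; unnamed ones get col<index + 1>.
import org.apache.spark.sql.functions.{col, struct}

val df = spark.range(1).select(struct(col("id").as("user_id"), col("id") + 1).as("s"))
df.printSchema()
// s: struct
//   user_id: long   <- the alias is retained as the StructField's name
//   col2: long      <- unnamed second expression gets the auto-generated name
```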
spark git commit: [SPARK-20877][SPARKR][FOLLOWUP] clean up after test move
Repository: spark Updated Branches: refs/heads/branch-2.2 0b0be47e7 -> 26003de55 [SPARK-20877][SPARKR][FOLLOWUP] clean up after test move clean up after big test move unit tests, jenkins Author: Felix Cheung Closes #18267 from felixcheung/rtestset2. (cherry picked from commit 9f4ff9552470fb97ca38bb56bbf43be49a9a316c) Signed-off-by: Felix Cheung Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26003de5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26003de5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26003de5 Branch: refs/heads/branch-2.2 Commit: 26003de55ba13695649b0d874563a76d71cda88d Parents: 0b0be47 Author: Felix Cheung Authored: Sun Jun 11 03:00:44 2017 -0700 Committer: Felix Cheung Committed: Sun Jun 11 03:13:56 2017 -0700 -- R/pkg/.Rbuildignore | 1 + R/pkg/R/install.R | 2 +- R/pkg/R/utils.R | 8 +- R/pkg/tests/fulltests/test_Serde.R | 6 -- R/pkg/tests/fulltests/test_Windows.R| 7 +- R/pkg/tests/fulltests/test_binaryFile.R | 8 -- R/pkg/tests/fulltests/test_binary_function.R| 6 -- R/pkg/tests/fulltests/test_broadcast.R | 4 - R/pkg/tests/fulltests/test_client.R | 8 -- R/pkg/tests/fulltests/test_context.R| 16 --- R/pkg/tests/fulltests/test_includePackage.R | 4 - .../tests/fulltests/test_mllib_classification.R | 12 +-- R/pkg/tests/fulltests/test_mllib_clustering.R | 14 +-- R/pkg/tests/fulltests/test_mllib_fpm.R | 2 +- .../tests/fulltests/test_mllib_recommendation.R | 2 +- R/pkg/tests/fulltests/test_mllib_regression.R | 16 +-- R/pkg/tests/fulltests/test_mllib_tree.R | 14 ++- .../tests/fulltests/test_parallelize_collect.R | 8 -- R/pkg/tests/fulltests/test_rdd.R| 102 --- R/pkg/tests/fulltests/test_shuffle.R| 24 - R/pkg/tests/fulltests/test_sparkR.R | 2 - R/pkg/tests/fulltests/test_sparkSQL.R | 92 ++--- R/pkg/tests/fulltests/test_streaming.R | 14 +-- R/pkg/tests/fulltests/test_take.R | 2 - R/pkg/tests/fulltests/test_textFile.R | 18 R/pkg/tests/fulltests/test_utils.R | 8 -- R/pkg/tests/run-all.R | 2 - 27 files changed, 32 insertions(+), 370 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/26003de5/R/pkg/.Rbuildignore -- diff --git a/R/pkg/.Rbuildignore b/R/pkg/.Rbuildignore index f12f8c2..18b2db6 100644 --- a/R/pkg/.Rbuildignore +++ b/R/pkg/.Rbuildignore @@ -6,3 +6,4 @@ ^README\.Rmd$ ^src-native$ ^html$ +^tests/fulltests/* http://git-wip-us.apache.org/repos/asf/spark/blob/26003de5/R/pkg/R/install.R -- diff --git a/R/pkg/R/install.R b/R/pkg/R/install.R index 4ca7aa6..ec931be 100644 --- a/R/pkg/R/install.R +++ b/R/pkg/R/install.R @@ -267,7 +267,7 @@ hadoopVersionName <- function(hadoopVersion) { # The implementation refers to appdirs package: https://pypi.python.org/pypi/appdirs and # adapt to Spark context sparkCachePath <- function() { - if (.Platform$OS.type == "windows") { + if (is_windows()) { winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA) if (is.na(winAppPath)) { stop(paste("%LOCALAPPDATA% not found.", http://git-wip-us.apache.org/repos/asf/spark/blob/26003de5/R/pkg/R/utils.R -- diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index b19556a..7225da9 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -900,10 +900,6 @@ isAtomicLengthOne <- function(x) { is.atomic(x) && length(x) == 1 } -is_cran <- function() { - !identical(Sys.getenv("NOT_CRAN"), "true") -} - is_windows <- function() { .Platform$OS.type == "windows" } @@ -912,6 +908,6 @@ hadoop_home_set <- function() { !identical(Sys.getenv("HADOOP_HOME"), "") } -not_cran_or_windows_with_hadoop <- function() { - !is_cran() && 
(!is_windows() || hadoop_home_set()) +windows_with_hadoop <- function() { + !is_windows() || hadoop_home_set() } http://git-wip-us.apache.org/repos/asf/spark/blob/26003de5/R/pkg/tests/fulltests/test_Serde.R -- diff --git a/R/pkg/tests/fulltests/test_Serde.R b/R/pkg/tests/fulltests/test_Serde.R index 6e160fa..6bbd201 100644 --- a/R/pkg/tests/fulltests/test_Serde.R +++ b/R/pkg/tests/fulltests/test_Serde.R @@ -20,8 +20,6 @@ context("SerDe functionality") sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) test_that
spark git commit: [SPARK-20877][SPARKR][FOLLOWUP] clean up after test move
Repository: spark Updated Branches: refs/heads/master 823f1eef5 -> 9f4ff9552 [SPARK-20877][SPARKR][FOLLOWUP] clean up after test move ## What changes were proposed in this pull request? clean up after big test move ## How was this patch tested? unit tests, jenkins Author: Felix Cheung Closes #18267 from felixcheung/rtestset2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f4ff955 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f4ff955 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f4ff955 Branch: refs/heads/master Commit: 9f4ff9552470fb97ca38bb56bbf43be49a9a316c Parents: 823f1ee Author: Felix Cheung Authored: Sun Jun 11 03:00:44 2017 -0700 Committer: Felix Cheung Committed: Sun Jun 11 03:00:44 2017 -0700 -- R/pkg/.Rbuildignore | 1 + R/pkg/R/install.R | 2 +- R/pkg/R/utils.R | 8 +- R/pkg/tests/fulltests/test_Serde.R | 6 -- R/pkg/tests/fulltests/test_Windows.R| 7 +- R/pkg/tests/fulltests/test_binaryFile.R | 8 -- R/pkg/tests/fulltests/test_binary_function.R| 6 -- R/pkg/tests/fulltests/test_broadcast.R | 4 - R/pkg/tests/fulltests/test_client.R | 8 -- R/pkg/tests/fulltests/test_context.R| 16 --- R/pkg/tests/fulltests/test_includePackage.R | 4 - .../tests/fulltests/test_mllib_classification.R | 12 +-- R/pkg/tests/fulltests/test_mllib_clustering.R | 14 +-- R/pkg/tests/fulltests/test_mllib_fpm.R | 2 +- .../tests/fulltests/test_mllib_recommendation.R | 2 +- R/pkg/tests/fulltests/test_mllib_regression.R | 16 +-- R/pkg/tests/fulltests/test_mllib_tree.R | 22 ++-- .../tests/fulltests/test_parallelize_collect.R | 8 -- R/pkg/tests/fulltests/test_rdd.R| 102 --- R/pkg/tests/fulltests/test_shuffle.R| 24 - R/pkg/tests/fulltests/test_sparkR.R | 2 - R/pkg/tests/fulltests/test_sparkSQL.R | 92 ++--- R/pkg/tests/fulltests/test_streaming.R | 14 +-- R/pkg/tests/fulltests/test_take.R | 2 - R/pkg/tests/fulltests/test_textFile.R | 18 R/pkg/tests/fulltests/test_utils.R | 9 -- R/pkg/tests/run-all.R | 2 - 27 files changed, 35 insertions(+), 376 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9f4ff955/R/pkg/.Rbuildignore -- diff --git a/R/pkg/.Rbuildignore b/R/pkg/.Rbuildignore index f12f8c2..18b2db6 100644 --- a/R/pkg/.Rbuildignore +++ b/R/pkg/.Rbuildignore @@ -6,3 +6,4 @@ ^README\.Rmd$ ^src-native$ ^html$ +^tests/fulltests/* http://git-wip-us.apache.org/repos/asf/spark/blob/9f4ff955/R/pkg/R/install.R -- diff --git a/R/pkg/R/install.R b/R/pkg/R/install.R index 4ca7aa6..ec931be 100644 --- a/R/pkg/R/install.R +++ b/R/pkg/R/install.R @@ -267,7 +267,7 @@ hadoopVersionName <- function(hadoopVersion) { # The implementation refers to appdirs package: https://pypi.python.org/pypi/appdirs and # adapt to Spark context sparkCachePath <- function() { - if (.Platform$OS.type == "windows") { + if (is_windows()) { winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA) if (is.na(winAppPath)) { stop(paste("%LOCALAPPDATA% not found.", http://git-wip-us.apache.org/repos/asf/spark/blob/9f4ff955/R/pkg/R/utils.R -- diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index ea45e39..91483a4 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -908,10 +908,6 @@ isAtomicLengthOne <- function(x) { is.atomic(x) && length(x) == 1 } -is_cran <- function() { - !identical(Sys.getenv("NOT_CRAN"), "true") -} - is_windows <- function() { .Platform$OS.type == "windows" } @@ -920,6 +916,6 @@ hadoop_home_set <- function() { !identical(Sys.getenv("HADOOP_HOME"), "") } -not_cran_or_windows_with_hadoop <- function() { - !is_cran() && (!is_windows() || 
hadoop_home_set()) +windows_with_hadoop <- function() { + !is_windows() || hadoop_home_set() } http://git-wip-us.apache.org/repos/asf/spark/blob/9f4ff955/R/pkg/tests/fulltests/test_Serde.R -- diff --git a/R/pkg/tests/fulltests/test_Serde.R b/R/pkg/tests/fulltests/test_Serde.R index 6e160fa..6bbd201 100644 --- a/R/pkg/tests/fulltests/test_Serde.R +++ b/R/pkg/tests/fulltests/test_Serde.R @@ -20,8 +20,6 @@ context("SerDe functionality") sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) test_that("SerDe of primitive t
spark git commit: [SPARK-13933][BUILD] Update hadoop-2.7 profile's curator version to 2.7.1
Repository: spark Updated Branches: refs/heads/master eb3ea3a08 -> 823f1eef5 [SPARK-13933][BUILD] Update hadoop-2.7 profile's curator version to 2.7.1 ## What changes were proposed in this pull request? Update hadoop-2.7 profile's curator version to 2.7.1; for more details, see [SPARK-13933](https://issues.apache.org/jira/browse/SPARK-13933). ## How was this patch tested? manual tests Author: Yuming Wang Closes #18247 from wangyum/SPARK-13933. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/823f1eef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/823f1eef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/823f1eef Branch: refs/heads/master Commit: 823f1eef580763048b08b640090519e884f29c47 Parents: eb3ea3a Author: Yuming Wang Authored: Sun Jun 11 10:05:47 2017 +0100 Committer: Sean Owen Committed: Sun Jun 11 10:05:47 2017 +0100 -- dev/deps/spark-deps-hadoop-2.7 | 6 +++--- pom.xml| 1 + 2 files changed, 4 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/823f1eef/dev/deps/spark-deps-hadoop-2.7 -- diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index ab1de3d..9127413 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -47,9 +47,9 @@ commons-net-2.2.jar commons-pool-1.5.4.jar compress-lzf-1.0.3.jar core-1.1.2.jar -curator-client-2.6.0.jar -curator-framework-2.6.0.jar -curator-recipes-2.6.0.jar +curator-client-2.7.1.jar +curator-framework-2.7.1.jar +curator-recipes-2.7.1.jar datanucleus-api-jdo-3.2.6.jar datanucleus-core-3.2.10.jar datanucleus-rdbms-3.2.9.jar http://git-wip-us.apache.org/repos/asf/spark/blob/823f1eef/pom.xml -- diff --git a/pom.xml b/pom.xml index 6835ea1..5f52407 100644 --- a/pom.xml +++ b/pom.xml @@ -2532,6 +2532,7 @@ hadoop-2.7 2.7.3 +2.7.1
spark git commit: [SPARK-20935][STREAMING] Always close WriteAheadLog and make it idempotent
Repository: spark Updated Branches: refs/heads/master 8da3f7041 -> eb3ea3a08 [SPARK-20935][STREAMING] Always close WriteAheadLog and make it idempotent ## What changes were proposed in this pull request? This PR proposes that `ReceiverTracker` always close the `WriteAheadLog` when it is stopped, and makes `WriteAheadLog` and its implementations idempotent. ## How was this patch tested? Added a test in `WriteAheadLogSuite`. Note that the added test passes even when `close()` is called twice, even without the changes in `FileBasedWriteAheadLog` and `BatchedWriteAheadLog`; both appear to be idempotent already, so this is rather a sanity check. Author: hyukjinkwon Closes #18224 from HyukjinKwon/streaming-closing. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb3ea3a0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb3ea3a0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb3ea3a0 Branch: refs/heads/master Commit: eb3ea3a0831b26d3dc35a97566716b92868a7beb Parents: 8da3f70 Author: hyukjinkwon Authored: Sun Jun 11 09:54:57 2017 +0100 Committer: Sean Owen Committed: Sun Jun 11 09:54:57 2017 +0100 -- .../spark/streaming/util/WriteAheadLog.java | 2 +- .../streaming/scheduler/ReceiverTracker.scala | 27 .../streaming/util/BatchedWriteAheadLog.scala | 13 +- .../streaming/util/FileBasedWriteAheadLog.scala | 8 +++--- .../scheduler/ReceiverTrackerSuite.scala| 2 ++ .../streaming/util/WriteAheadLogSuite.scala | 2 ++ 6 files changed, 26 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eb3ea3a0/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java -- diff --git a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java index 2803cad..00c5972 100644 --- a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java +++ b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java @@ -56,7 +56,7 @@ public abstract class WriteAheadLog { public abstract void clean(long threshTime, boolean waitForCompletion); /** - * Close this log and release any resources. + * Close this log and release any resources. It must be idempotent. */ public abstract void close(); } http://git-wip-us.apache.org/repos/asf/spark/blob/eb3ea3a0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index bd7ab0b..6f130c8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -165,11 +165,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Stop the receiver execution thread. */ def stop(graceful: Boolean): Unit = synchronized { -if (isTrackerStarted) { - // First, stop the receivers - trackerState = Stopping +val isStarted: Boolean = isTrackerStarted +trackerState = Stopping +if (isStarted) { if (!skipReceiverLaunch) { -// Send the stop signal to all the receivers +// First, stop the receivers.
Send the stop signal to all the receivers endpoint.askSync[Boolean](StopAllReceivers) // Wait for the Spark job that runs the receivers to be over @@ -194,17 +194,13 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false // Finally, stop the endpoint ssc.env.rpcEnv.stop(endpoint) endpoint = null - receivedBlockTracker.stop() - logInfo("ReceiverTracker stopped") - trackerState = Stopped -} else if (isTrackerInitialized) { - trackerState = Stopping - // `ReceivedBlockTracker` is open when this instance is created. We should - // close this even if this `ReceiverTracker` is not started. - receivedBlockTracker.stop() - logInfo("ReceiverTracker stopped") - trackerState = Stopped } + +// `ReceivedBlockTracker` is open when this instance is created. We should +// close this even if this `ReceiverTracker` is not started. +receivedBlockTracker.stop() +logInfo("ReceiverTracker stopped") +trackerState = Stopped } /** Allocate all unallocated blocks to the given batch. */ @@ -453,9 +449,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Bool
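The idempotence requirement added to the `close()` contract above can be sketched as the following pattern: release resources at most once, so callers such as `ReceiverTracker.stop()` may invoke `close()` unconditionally, even if the log was never fully started. This is a hedged illustration of the pattern, not Spark's `FileBasedWriteAheadLog` or `BatchedWriteAheadLog` implementation.

```scala
// Sketch of an idempotent close(): only the first call performs the cleanup.
import java.util.concurrent.atomic.AtomicBoolean

final class IdempotentLog(releaseResources: () => Unit) {
  private val closed = new AtomicBoolean(false)

  def close(): Unit = {
    // compareAndSet guarantees the cleanup runs at most once, even under races.
    if (closed.compareAndSet(false, true)) {
      releaseResources()
    }
  }
}

val log = new IdempotentLog(() => println("resources released"))
log.close() // performs the cleanup
log.close() // safe no-op
```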
spark git commit: [SPARK-21000][MESOS] Add Mesos labels support to the Spark Dispatcher
Repository: spark Updated Branches: refs/heads/master dc4c35183 -> 8da3f7041 [SPARK-21000][MESOS] Add Mesos labels support to the Spark Dispatcher ## What changes were proposed in this pull request? Add Mesos labels support to the Spark Dispatcher ## How was this patch tested? unit tests Author: Michael Gummelt Closes #18220 from mgummelt/SPARK-21000-dispatcher-labels. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8da3f704 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8da3f704 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8da3f704 Branch: refs/heads/master Commit: 8da3f7041aafa71d7596b531625edb899970fec2 Parents: dc4c351 Author: Michael Gummelt Authored: Sun Jun 11 09:49:39 2017 +0100 Committer: Sean Owen Committed: Sun Jun 11 09:49:39 2017 +0100 -- docs/running-on-mesos.md| 14 +- .../org/apache/spark/deploy/mesos/config.scala | 7 +++ .../cluster/mesos/MesosClusterScheduler.scala | 10 ++-- .../MesosCoarseGrainedSchedulerBackend.scala| 28 ++- .../cluster/mesos/MesosProtoUtils.scala | 53 .../mesos/MesosClusterSchedulerSuite.scala | 27 ++ ...esosCoarseGrainedSchedulerBackendSuite.scala | 23 - .../cluster/mesos/MesosProtoUtilsSuite.scala| 48 ++ 8 files changed, 157 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/docs/running-on-mesos.md -- diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 8745e76..ec130c1 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -382,8 +382,9 @@ See the [configuration page](configuration.html) for information on Spark config (none) Set the Mesos labels to add to each task. Labels are free-form key-value pairs. -Key-value pairs should be separated by a colon, and commas used to list more than one. -Ex. key:value,key2:value2. +Key-value pairs should be separated by a colon, and commas used to +list more than one. If your label includes a colon or comma, you +can escape it with a backslash. Ex. key:value,key2:a\:b. @@ -469,6 +470,15 @@ See the [configuration page](configuration.html) for information on Spark config + spark.mesos.driver.labels + (none) + +Mesos labels to add to the driver. See spark.mesos.task.labels +for formatting information. + + + + spark.mesos.driverEnv.[EnvironmentVariableName] (none) http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala -- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala index 19e2533..56d697f 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -56,4 +56,11 @@ package object config { .stringConf .createOptional + private [spark] val DRIVER_LABELS = +ConfigBuilder("spark.mesos.driver.labels") + .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value" + +"pairs should be separated by a colon, and commas used to list more than one." + +"Ex. 
key:value,key2:value2") + .stringConf + .createOptional } http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala -- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 1bc6f71..577f9a8 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -30,11 +30,13 @@ import org.apache.mesos.Protos.Environment.Variable import org.apache.mesos.Protos.TaskStatus.Reason import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState} +import org.apache.spark.deploy.mesos.config import org.apache.spark.deploy.mesos.MesosDriverDescription import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse} import org.apach
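A hedged sketch of the label-string format documented above, where colons and commas inside a key or value are escaped with a backslash. `parseLabels` is a hypothetical stand-in written for illustration, not the actual `MesosProtoUtils` implementation added by this patch.

```scala
// Split on commas/colons not preceded by a backslash, then strip the escapes.
def parseLabels(spec: String): Seq[(String, String)] = {
  def unescape(s: String): String = s.replace("\\:", ":").replace("\\,", ",")
  spec.split("""(?<!\\),""").filter(_.nonEmpty).toSeq.map { kv =>
    kv.split("""(?<!\\):""", 2) match {
      case Array(key, value) => unescape(key) -> unescape(value)
      case _ => throw new IllegalArgumentException(s"Malformed label: $kv")
    }
  }
}

// The escaped colon in "a\:b" is part of the value, not a key/value separator.
assert(parseLabels("""key:value,key2:a\:b""") == Seq("key" -> "value", "key2" -> "a:b"))
```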
[1/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
Repository: spark Updated Branches: refs/heads/branch-2.2 815a0820b -> 0b0be47e7 http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/fulltests/test_streaming.R -- diff --git a/R/pkg/tests/fulltests/test_streaming.R b/R/pkg/tests/fulltests/test_streaming.R new file mode 100644 index 000..b20b431 --- /dev/null +++ b/R/pkg/tests/fulltests/test_streaming.R @@ -0,0 +1,167 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +library(testthat) + +context("Structured Streaming") + +# Tests for Structured Streaming functions in SparkR + +sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) + +jsonSubDir <- file.path("sparkr-test", "json", "") +if (.Platform$OS.type == "windows") { + # file.path removes the empty separator on Windows, adds it back + jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep) +} +jsonDir <- file.path(tempdir(), jsonSubDir) +dir.create(jsonDir, recursive = TRUE) + +mockLines <- c("{\"name\":\"Michael\"}", + "{\"name\":\"Andy\", \"age\":30}", + "{\"name\":\"Justin\", \"age\":19}") +jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp") +writeLines(mockLines, jsonPath) + +mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", + "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", + "{\"name\":\"David\",\"age\":60,\"height\":null}") +jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp") + +schema <- structType(structField("name", "string"), + structField("age", "integer"), + structField("count", "double")) + +test_that("read.stream, write.stream, awaitTermination, stopQuery", { + skip_on_cran() + + df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1) + expect_true(isStreaming(df)) + counts <- count(group_by(df, "name")) + q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete") + + expect_false(awaitTermination(q, 5 * 1000)) + callJMethod(q@ssq, "processAllAvailable") + expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3) + + writeLines(mockLinesNa, jsonPathNa) + awaitTermination(q, 5 * 1000) + callJMethod(q@ssq, "processAllAvailable") + expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6) + + stopQuery(q) + expect_true(awaitTermination(q, 1)) + expect_error(awaitTermination(q), NA) +}) + +test_that("print from explain, lastProgress, status, isActive", { + skip_on_cran() + + df <- read.stream("json", path = jsonDir, schema = schema) + expect_true(isStreaming(df)) + counts <- count(group_by(df, "name")) + q <- write.stream(counts, "memory", queryName = "people2", outputMode = "complete") + + awaitTermination(q, 5 * 1000) + callJMethod(q@ssq, "processAllAvailable") + + expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==") + expect_true(any(grepl("\"description\" : \"MemorySink\"", 
capture.output(lastProgress(q) + expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q) + + expect_equal(queryName(q), "people2") + expect_true(isActive(q)) + + stopQuery(q) +}) + +test_that("Stream other format", { + skip_on_cran() + + parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet") + df <- read.df(jsonPath, "json", schema) + write.df(df, parquetPath, "parquet", "overwrite") + + df <- read.stream(path = parquetPath, schema = schema) + expect_true(isStreaming(df)) + counts <- count(group_by(df, "name")) + q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete") + + expect_false(awaitTermination(q, 5 * 1000)) + callJMethod(q@ssq, "processAllAvailable") + expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3) + + expect_equal(queryName(q), "people3") + expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet", + capture.output(lastProgress(q) + expect_true(isActive(q)) + + stopQuery(q) + expect_true(awaitTermination(q, 1)) + expect_false(isActive(q)) + + unlink(parquetPath) +}) + +test_that("Non-streaming DataFrame", { + skip_on_cran() + + c <- as.DataFrame(cars) +
[5/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R deleted file mode 100644 index c790d02..000 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ /dev/null @@ -1,3474 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(testthat) - -context("SparkSQL functions") - -# Utility function for easily checking the values of a StructField -checkStructField <- function(actual, expectedName, expectedType, expectedNullable) { - expect_equal(class(actual), "structField") - expect_equal(actual$name(), expectedName) - expect_equal(actual$dataType.toString(), expectedType) - expect_equal(actual$nullable(), expectedNullable) -} - -markUtf8 <- function(s) { - Encoding(s) <- "UTF-8" - s -} - -setHiveContext <- function(sc) { - if (exists(".testHiveSession", envir = .sparkREnv)) { -hiveSession <- get(".testHiveSession", envir = .sparkREnv) - } else { -# initialize once and reuse -ssc <- callJMethod(sc, "sc") -hiveCtx <- tryCatch({ - newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE) -}, -error = function(err) { - skip("Hive is not build with SparkSQL, skipped") -}) -hiveSession <- callJMethod(hiveCtx, "sparkSession") - } - previousSession <- get(".sparkRsession", envir = .sparkREnv) - assign(".sparkRsession", hiveSession, envir = .sparkREnv) - assign(".prevSparkRsession", previousSession, envir = .sparkREnv) - hiveSession -} - -unsetHiveContext <- function() { - previousSession <- get(".prevSparkRsession", envir = .sparkREnv) - assign(".sparkRsession", previousSession, envir = .sparkREnv) - remove(".prevSparkRsession", envir = .sparkREnv) -} - -# Tests for SparkSQL functions in SparkR - -filesBefore <- list.files(path = sparkRDir, all.files = TRUE) -sparkSession <- if (not_cran_or_windows_with_hadoop()) { -sparkR.session(master = sparkRTestMaster) - } else { -sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) - } -sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) - -mockLines <- c("{\"name\":\"Michael\"}", - "{\"name\":\"Andy\", \"age\":30}", - "{\"name\":\"Justin\", \"age\":19}") -jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") -parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet") -orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc") -writeLines(mockLines, jsonPath) - -# For test nafunctions, like dropna(), fillna(),... 
-mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", - "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", - "{\"name\":\"David\",\"age\":60,\"height\":null}", - "{\"name\":\"Amy\",\"age\":null,\"height\":null}", - "{\"name\":null,\"age\":null,\"height\":null}") -jsonPathNa <- tempfile(pattern = "sparkr-test", fileext = ".tmp") -writeLines(mockLinesNa, jsonPathNa) - -# For test complex types in DataFrame -mockLinesComplexType <- - c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}", -"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}", -"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}") -complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") -writeLines(mockLinesComplexType, complexTypeJsonPath) - -# For test map type and struct type in DataFrame -mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}", - "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}", - "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}") -mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") -writeLines(mockLinesMapType, mapTypeJsonPath) - -if (.Platform$OS.type == "windows") { - Sys.setenv(TZ = "GMT") -} - -test_that("calling sparkRSQL.init returns existing SQL context", { - skip_on_cran() - - sqlContext <- suppressWarnings(sparkRSQL.init(sc)) - expect
[2/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/fulltests/test_sparkSQL.R -- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R new file mode 100644 index 000..d2d5191 --- /dev/null +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -0,0 +1,3198 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +library(testthat) + +context("SparkSQL functions") + +# Utility function for easily checking the values of a StructField +checkStructField <- function(actual, expectedName, expectedType, expectedNullable) { + expect_equal(class(actual), "structField") + expect_equal(actual$name(), expectedName) + expect_equal(actual$dataType.toString(), expectedType) + expect_equal(actual$nullable(), expectedNullable) +} + +markUtf8 <- function(s) { + Encoding(s) <- "UTF-8" + s +} + +setHiveContext <- function(sc) { + if (exists(".testHiveSession", envir = .sparkREnv)) { +hiveSession <- get(".testHiveSession", envir = .sparkREnv) + } else { +# initialize once and reuse +ssc <- callJMethod(sc, "sc") +hiveCtx <- tryCatch({ + newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE) +}, +error = function(err) { + skip("Hive is not build with SparkSQL, skipped") +}) +hiveSession <- callJMethod(hiveCtx, "sparkSession") + } + previousSession <- get(".sparkRsession", envir = .sparkREnv) + assign(".sparkRsession", hiveSession, envir = .sparkREnv) + assign(".prevSparkRsession", previousSession, envir = .sparkREnv) + hiveSession +} + +unsetHiveContext <- function() { + previousSession <- get(".prevSparkRsession", envir = .sparkREnv) + assign(".sparkRsession", previousSession, envir = .sparkREnv) + remove(".prevSparkRsession", envir = .sparkREnv) +} + +# Tests for SparkSQL functions in SparkR + +filesBefore <- list.files(path = sparkRDir, all.files = TRUE) +sparkSession <- if (not_cran_or_windows_with_hadoop()) { +sparkR.session(master = sparkRTestMaster) + } else { +sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) + } +sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) + +mockLines <- c("{\"name\":\"Michael\"}", + "{\"name\":\"Andy\", \"age\":30}", + "{\"name\":\"Justin\", \"age\":19}") +jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") +parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet") +orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc") +writeLines(mockLines, jsonPath) + +# For test nafunctions, like dropna(), fillna(),... 
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", + "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", + "{\"name\":\"David\",\"age\":60,\"height\":null}", + "{\"name\":\"Amy\",\"age\":null,\"height\":null}", + "{\"name\":null,\"age\":null,\"height\":null}") +jsonPathNa <- tempfile(pattern = "sparkr-test", fileext = ".tmp") +writeLines(mockLinesNa, jsonPathNa) + +# For test complex types in DataFrame +mockLinesComplexType <- + c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}", +"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}", +"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}") +complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") +writeLines(mockLinesComplexType, complexTypeJsonPath) + +# For test map type and struct type in DataFrame +mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}", + "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}", + "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}") +mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") +writeLines(mockLinesMapType, mapTypeJsonPath) + +if (.Platform$OS.type == "windows") { + Sys.setenv(TZ = "GMT") +} + +test_that("calling sparkRSQL.init returns existing SQL context", { + skip_on_cran() + + sqlContext <- suppressWarnings(sparkRSQL.init(sc)) + expect_equal(suppressWarni
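The mockLinesMapType fixture above is read back as a DataFrame with a nested struct column ("info"), and nested fields can then be addressed with dotted column names. A small illustrative sketch of that usage (read.json, printSchema and select are SparkR calls; the actual assertions in the test file are not reproduced here):

people <- read.json(mapTypeJsonPath)
printSchema(people)   # "info" is inferred as a struct containing age and height

# Dotted names reach into the nested struct column
head(select(people, "name", "info.age", "info.height"))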
[7/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
[SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

## What changes were proposed in this pull request?

Move all existing tests to a non-installed directory so that they will never run when the SparkR package is installed.

For a follow-up PR:
- remove all skip_on_cran() calls in tests
- clean up the test timer
- improve or change the basic tests that do run on CRAN (if anyone has a suggestion)

It looks like `R CMD build pkg` will still put pkg/tests (i.e. the full tests) into the source package, but `R CMD INSTALL` on such a source package does not install these tests (and so `R CMD check` does not run them).

## How was this patch tested?

- [x] unit tests, Jenkins
- [x] AppVeyor
- [x] make a source package, install it, `R CMD check` it - verify the full tests are not installed or run

Author: Felix Cheung

Closes #18264 from felixcheung/rtestset.

(cherry picked from commit dc4c351837879dab26ad8fb471dc51c06832a9e4)
Signed-off-by: Felix Cheung

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b0be47e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b0be47e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b0be47e
Branch: refs/heads/branch-2.2
Commit: 0b0be47e7b742d96810c60b19a9aa920242e5224
Parents: 815a082
Author: Felix Cheung
Authored: Sun Jun 11 00:00:33 2017 -0700
Committer: Felix Cheung
Committed: Sun Jun 11 00:00:45 2017 -0700
--
 R/pkg/inst/tests/testthat/jarTest.R | 32 -
 R/pkg/inst/tests/testthat/packageInAJarTest.R | 30 -
 R/pkg/inst/tests/testthat/test_Serde.R | 85 -
 R/pkg/inst/tests/testthat/test_Windows.R | 32 -
 R/pkg/inst/tests/testthat/test_basic.R | 90 +
 R/pkg/inst/tests/testthat/test_binaryFile.R | 100 -
 .../inst/tests/testthat/test_binary_function.R | 110 -
 R/pkg/inst/tests/testthat/test_broadcast.R | 55 -
 R/pkg/inst/tests/testthat/test_client.R | 51 -
 R/pkg/inst/tests/testthat/test_context.R | 226 --
 R/pkg/inst/tests/testthat/test_includePackage.R | 64 -
 R/pkg/inst/tests/testthat/test_jvm_api.R | 36 -
 .../tests/testthat/test_mllib_classification.R | 396 ---
 .../inst/tests/testthat/test_mllib_clustering.R | 328 --
 R/pkg/inst/tests/testthat/test_mllib_fpm.R | 85 -
 .../tests/testthat/test_mllib_recommendation.R | 67 -
 .../inst/tests/testthat/test_mllib_regression.R | 480 ---
 R/pkg/inst/tests/testthat/test_mllib_stat.R | 53 -
 R/pkg/inst/tests/testthat/test_mllib_tree.R | 226 --
 .../tests/testthat/test_parallelize_collect.R | 120 -
 R/pkg/inst/tests/testthat/test_rdd.R | 906 -
 R/pkg/inst/tests/testthat/test_shuffle.R | 248 --
 R/pkg/inst/tests/testthat/test_sparkR.R | 48 -
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 3198 --
 R/pkg/inst/tests/testthat/test_streaming.R | 167 -
 R/pkg/inst/tests/testthat/test_take.R | 71 -
 R/pkg/inst/tests/testthat/test_textFile.R | 182 -
 R/pkg/inst/tests/testthat/test_utils.R | 247 --
 R/pkg/tests/fulltests/jarTest.R | 32 +
 R/pkg/tests/fulltests/packageInAJarTest.R | 30 +
 R/pkg/tests/fulltests/test_Serde.R | 85 +
 R/pkg/tests/fulltests/test_Windows.R | 32 +
 R/pkg/tests/fulltests/test_binaryFile.R | 100 +
 R/pkg/tests/fulltests/test_binary_function.R | 110 +
 R/pkg/tests/fulltests/test_broadcast.R | 55 +
 R/pkg/tests/fulltests/test_client.R | 51 +
 R/pkg/tests/fulltests/test_context.R | 226 ++
 R/pkg/tests/fulltests/test_includePackage.R | 64 +
 R/pkg/tests/fulltests/test_jvm_api.R | 36 +
 .../tests/fulltests/test_mllib_classification.R | 396 +++
 R/pkg/tests/fulltests/test_mllib_clustering.R | 328 ++
 R/pkg/tests/fulltests/test_mllib_fpm.R | 85 +
 .../tests/fulltests/test_mllib_recommendation.R | 67 +
 R/pkg/tests/fulltests/test_mllib_regression.R | 480 +++
 R/pkg/tests/fulltests/test_mllib_stat.R | 53 +
 R/pkg/tests/fulltests/test_mllib_tree.R | 226 ++
 .../tests/fulltests/test_parallelize_collect.R | 120 +
 R/pkg/tests/fulltests/test_rdd.R | 906 +
 R/pkg/tests/fulltests/test_shuffle.R | 248 ++
 R/pkg/tests/fulltests/test_sparkR.R | 48 +
 R/pkg/tests/fulltests/test_sparkSQL.R | 3198 ++
 R/pkg/tests/fulltests/test_streaming.R | 167 +
 R/pkg/tests/fulltests/test_take.R | 71 +
 R/pkg/tests/fulltests/test_textFile.R | 182 +
 R/pkg/tests/fulltests/test_utils.R | 247 ++
 R/pkg/tests/run-all.R | 8 +
 56 files changed, 7741 insertions(+), 7643 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/bl
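The diffstat above shows R/pkg/tests/run-all.R gaining 8 lines, but its new contents are not quoted in this message. As a rough, hypothetical sketch only (not the actual file), a CRAN-facing driver along these lines would run just the basic tests that remain installed with the package:

library(testthat)
library(SparkR)

# Only the lightweight tests under tests/testthat/ ship with the installed
# package; the full suite now lives in R/pkg/tests/fulltests/ and is neither
# installed nor run by R CMD check.
test_package("SparkR")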
[6/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_mllib_regression.R -- diff --git a/R/pkg/inst/tests/testthat/test_mllib_regression.R b/R/pkg/inst/tests/testthat/test_mllib_regression.R deleted file mode 100644 index b05fdd3..000 --- a/R/pkg/inst/tests/testthat/test_mllib_regression.R +++ /dev/null @@ -1,480 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(testthat) - -context("MLlib regression algorithms, except for tree-based algorithms") - -# Tests for MLlib regression algorithms in SparkR -sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) - -test_that("formula of spark.glm", { - skip_on_cran() - - training <- suppressWarnings(createDataFrame(iris)) - # directly calling the spark API - # dot minus and intercept vs native glm - model <- spark.glm(training, Sepal_Width ~ . - Species + 0) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # feature interaction vs native glm - model <- spark.glm(training, Sepal_Width ~ Species:Sepal_Length) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # glm should work with long formula - training <- suppressWarnings(createDataFrame(iris)) - training$LongLongLongLongLongName <- training$Sepal_Width - training$VeryLongLongLongLonLongName <- training$Sepal_Length - training$AnotherLongLongLongLongName <- training$Species - model <- spark.glm(training, LongLongLongLongLongName ~ VeryLongLongLongLonLongName + -AnotherLongLongLongLongName) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) -}) - -test_that("spark.glm and predict", { - training <- suppressWarnings(createDataFrame(iris)) - # gaussian family - model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, "prediction")) - rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # poisson family - model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species, - family = poisson(link = identity)) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, 
"prediction")) - rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species, -data = iris, family = poisson(link = identity)), iris)) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # Gamma family - x <- runif(100, -1, 1) - y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10) - df <- as.DataFrame(as.data.frame(list(x = x, y = y))) - model <- glm(y ~ x, family = Gamma, df) - out <- capture.output(print(summary(model))) - expect_true(any(grepl("Dispersion parameter for gamma family", out))) - - # tweedie family - model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species, - family = "tweedie", var.power = 1.2, link.power = 0.0) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, "prediction")) - - # manual calculation of the R predicted values to avoid dependence on statmod - #' library(statmod) - #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris, - #' family = tweedie(var.power = 1.2, link.power = 0.0)) - #' print(coef(rModel)) - - rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174) - rVals <- exp(as.numeric(model.
[5/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/inst/tests/testthat/test_sparkSQL.R -- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R deleted file mode 100644 index d2d5191..000 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ /dev/null @@ -1,3198 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(testthat) - -context("SparkSQL functions") - -# Utility function for easily checking the values of a StructField -checkStructField <- function(actual, expectedName, expectedType, expectedNullable) { - expect_equal(class(actual), "structField") - expect_equal(actual$name(), expectedName) - expect_equal(actual$dataType.toString(), expectedType) - expect_equal(actual$nullable(), expectedNullable) -} - -markUtf8 <- function(s) { - Encoding(s) <- "UTF-8" - s -} - -setHiveContext <- function(sc) { - if (exists(".testHiveSession", envir = .sparkREnv)) { -hiveSession <- get(".testHiveSession", envir = .sparkREnv) - } else { -# initialize once and reuse -ssc <- callJMethod(sc, "sc") -hiveCtx <- tryCatch({ - newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE) -}, -error = function(err) { - skip("Hive is not build with SparkSQL, skipped") -}) -hiveSession <- callJMethod(hiveCtx, "sparkSession") - } - previousSession <- get(".sparkRsession", envir = .sparkREnv) - assign(".sparkRsession", hiveSession, envir = .sparkREnv) - assign(".prevSparkRsession", previousSession, envir = .sparkREnv) - hiveSession -} - -unsetHiveContext <- function() { - previousSession <- get(".prevSparkRsession", envir = .sparkREnv) - assign(".sparkRsession", previousSession, envir = .sparkREnv) - remove(".prevSparkRsession", envir = .sparkREnv) -} - -# Tests for SparkSQL functions in SparkR - -filesBefore <- list.files(path = sparkRDir, all.files = TRUE) -sparkSession <- if (not_cran_or_windows_with_hadoop()) { -sparkR.session(master = sparkRTestMaster) - } else { -sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) - } -sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) - -mockLines <- c("{\"name\":\"Michael\"}", - "{\"name\":\"Andy\", \"age\":30}", - "{\"name\":\"Justin\", \"age\":19}") -jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") -parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet") -orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc") -writeLines(mockLines, jsonPath) - -# For test nafunctions, like dropna(), fillna(),... 
-mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", - "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", - "{\"name\":\"David\",\"age\":60,\"height\":null}", - "{\"name\":\"Amy\",\"age\":null,\"height\":null}", - "{\"name\":null,\"age\":null,\"height\":null}") -jsonPathNa <- tempfile(pattern = "sparkr-test", fileext = ".tmp") -writeLines(mockLinesNa, jsonPathNa) - -# For test complex types in DataFrame -mockLinesComplexType <- - c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}", -"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}", -"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}") -complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") -writeLines(mockLinesComplexType, complexTypeJsonPath) - -# For test map type and struct type in DataFrame -mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}", - "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}", - "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}") -mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") -writeLines(mockLinesMapType, mapTypeJsonPath) - -if (.Platform$OS.type == "windows") { - Sys.setenv(TZ = "GMT") -} - -test_that("calling sparkRSQL.init returns existing SQL context", { - skip_on_cran() - - sqlContext <- suppressWarnings(sparkRSQL.init(sc)) - expect
[7/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
[SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

## What changes were proposed in this pull request?

Move all existing tests to a non-installed directory so that they will never run when the SparkR package is installed.

For a follow-up PR:
- remove all skip_on_cran() calls in tests
- clean up the test timer
- improve or change the basic tests that do run on CRAN (if anyone has a suggestion)

It looks like `R CMD build pkg` will still put pkg/tests (i.e. the full tests) into the source package, but `R CMD INSTALL` on such a source package does not install these tests (and so `R CMD check` does not run them).

## How was this patch tested?

- [x] unit tests, Jenkins
- [x] AppVeyor
- [x] make a source package, install it, `R CMD check` it - verify the full tests are not installed or run

Author: Felix Cheung

Closes #18264 from felixcheung/rtestset.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc4c3518
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc4c3518
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc4c3518
Branch: refs/heads/master
Commit: dc4c351837879dab26ad8fb471dc51c06832a9e4
Parents: 5301a19
Author: Felix Cheung
Authored: Sun Jun 11 00:00:33 2017 -0700
Committer: Felix Cheung
Committed: Sun Jun 11 00:00:33 2017 -0700
--
 R/pkg/inst/tests/testthat/jarTest.R | 32 -
 R/pkg/inst/tests/testthat/packageInAJarTest.R | 30 -
 R/pkg/inst/tests/testthat/test_Serde.R | 85 -
 R/pkg/inst/tests/testthat/test_Windows.R | 32 -
 R/pkg/inst/tests/testthat/test_basic.R | 90 +
 R/pkg/inst/tests/testthat/test_binaryFile.R | 100 -
 .../inst/tests/testthat/test_binary_function.R | 110 -
 R/pkg/inst/tests/testthat/test_broadcast.R | 55 -
 R/pkg/inst/tests/testthat/test_client.R | 51 -
 R/pkg/inst/tests/testthat/test_context.R | 226 --
 R/pkg/inst/tests/testthat/test_includePackage.R | 64 -
 R/pkg/inst/tests/testthat/test_jvm_api.R | 36 -
 .../tests/testthat/test_mllib_classification.R | 396 --
 .../inst/tests/testthat/test_mllib_clustering.R | 328 --
 R/pkg/inst/tests/testthat/test_mllib_fpm.R | 85 -
 .../tests/testthat/test_mllib_recommendation.R | 67 -
 .../inst/tests/testthat/test_mllib_regression.R | 480 ---
 R/pkg/inst/tests/testthat/test_mllib_stat.R | 53 -
 R/pkg/inst/tests/testthat/test_mllib_tree.R | 320 --
 .../tests/testthat/test_parallelize_collect.R | 120 -
 R/pkg/inst/tests/testthat/test_rdd.R | 906 -
 R/pkg/inst/tests/testthat/test_shuffle.R | 248 --
 R/pkg/inst/tests/testthat/test_sparkR.R | 48 -
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 3474 --
 R/pkg/inst/tests/testthat/test_streaming.R | 167 -
 R/pkg/inst/tests/testthat/test_take.R | 71 -
 R/pkg/inst/tests/testthat/test_textFile.R | 182 -
 R/pkg/inst/tests/testthat/test_utils.R | 248 --
 R/pkg/tests/fulltests/jarTest.R | 32 +
 R/pkg/tests/fulltests/packageInAJarTest.R | 30 +
 R/pkg/tests/fulltests/test_Serde.R | 85 +
 R/pkg/tests/fulltests/test_Windows.R | 32 +
 R/pkg/tests/fulltests/test_binaryFile.R | 100 +
 R/pkg/tests/fulltests/test_binary_function.R | 110 +
 R/pkg/tests/fulltests/test_broadcast.R | 55 +
 R/pkg/tests/fulltests/test_client.R | 51 +
 R/pkg/tests/fulltests/test_context.R | 226 ++
 R/pkg/tests/fulltests/test_includePackage.R | 64 +
 R/pkg/tests/fulltests/test_jvm_api.R | 36 +
 .../tests/fulltests/test_mllib_classification.R | 396 ++
 R/pkg/tests/fulltests/test_mllib_clustering.R | 328 ++
 R/pkg/tests/fulltests/test_mllib_fpm.R | 85 +
 .../tests/fulltests/test_mllib_recommendation.R | 67 +
 R/pkg/tests/fulltests/test_mllib_regression.R | 480 +++
 R/pkg/tests/fulltests/test_mllib_stat.R | 53 +
 R/pkg/tests/fulltests/test_mllib_tree.R | 320 ++
 .../tests/fulltests/test_parallelize_collect.R | 120 +
 R/pkg/tests/fulltests/test_rdd.R | 906 +
 R/pkg/tests/fulltests/test_shuffle.R | 248 ++
 R/pkg/tests/fulltests/test_sparkR.R | 48 +
 R/pkg/tests/fulltests/test_sparkSQL.R | 3474 ++
 R/pkg/tests/fulltests/test_streaming.R | 167 +
 R/pkg/tests/fulltests/test_take.R | 71 +
 R/pkg/tests/fulltests/test_textFile.R | 182 +
 R/pkg/tests/fulltests/test_utils.R | 248 ++
 R/pkg/tests/run-all.R | 8 +
 56 files changed, 8112 insertions(+), 8014 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/jarTest.R
-
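The relocated tests gate themselves with skip_on_cran() and a not_cran_or_windows_with_hadoop() helper. The helper's definition is not part of the hunks quoted here; the following is only a guess at its general shape, based on testthat's convention of keying off the NOT_CRAN environment variable:

# Hypothetical sketch; the real helper lives in the SparkR test utilities and
# may differ in detail.
not_cran_or_windows_with_hadoop <- function() {
  not_cran <- identical(Sys.getenv("NOT_CRAN"), "true")  # testthat convention
  on_windows <- .Platform$OS.type == "windows"
  has_hadoop <- nzchar(Sys.getenv("HADOOP_HOME"))
  not_cran && (!on_windows || has_hadoop)
}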
[4/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/inst/tests/testthat/test_streaming.R -- diff --git a/R/pkg/inst/tests/testthat/test_streaming.R b/R/pkg/inst/tests/testthat/test_streaming.R deleted file mode 100644 index b20b431..000 --- a/R/pkg/inst/tests/testthat/test_streaming.R +++ /dev/null @@ -1,167 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(testthat) - -context("Structured Streaming") - -# Tests for Structured Streaming functions in SparkR - -sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) - -jsonSubDir <- file.path("sparkr-test", "json", "") -if (.Platform$OS.type == "windows") { - # file.path removes the empty separator on Windows, adds it back - jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep) -} -jsonDir <- file.path(tempdir(), jsonSubDir) -dir.create(jsonDir, recursive = TRUE) - -mockLines <- c("{\"name\":\"Michael\"}", - "{\"name\":\"Andy\", \"age\":30}", - "{\"name\":\"Justin\", \"age\":19}") -jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp") -writeLines(mockLines, jsonPath) - -mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", - "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", - "{\"name\":\"David\",\"age\":60,\"height\":null}") -jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp") - -schema <- structType(structField("name", "string"), - structField("age", "integer"), - structField("count", "double")) - -test_that("read.stream, write.stream, awaitTermination, stopQuery", { - skip_on_cran() - - df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1) - expect_true(isStreaming(df)) - counts <- count(group_by(df, "name")) - q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete") - - expect_false(awaitTermination(q, 5 * 1000)) - callJMethod(q@ssq, "processAllAvailable") - expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3) - - writeLines(mockLinesNa, jsonPathNa) - awaitTermination(q, 5 * 1000) - callJMethod(q@ssq, "processAllAvailable") - expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6) - - stopQuery(q) - expect_true(awaitTermination(q, 1)) - expect_error(awaitTermination(q), NA) -}) - -test_that("print from explain, lastProgress, status, isActive", { - skip_on_cran() - - df <- read.stream("json", path = jsonDir, schema = schema) - expect_true(isStreaming(df)) - counts <- count(group_by(df, "name")) - q <- write.stream(counts, "memory", queryName = "people2", outputMode = "complete") - - awaitTermination(q, 5 * 1000) - callJMethod(q@ssq, "processAllAvailable") - - expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==") - expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q) - 
expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q) - - expect_equal(queryName(q), "people2") - expect_true(isActive(q)) - - stopQuery(q) -}) - -test_that("Stream other format", { - skip_on_cran() - - parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet") - df <- read.df(jsonPath, "json", schema) - write.df(df, parquetPath, "parquet", "overwrite") - - df <- read.stream(path = parquetPath, schema = schema) - expect_true(isStreaming(df)) - counts <- count(group_by(df, "name")) - q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete") - - expect_false(awaitTermination(q, 5 * 1000)) - callJMethod(q@ssq, "processAllAvailable") - expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3) - - expect_equal(queryName(q), "people3") - expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet", - capture.output(lastProgress(q) - expect_true(isActive(q)) - - stopQuery(q) - expect_true(awaitTermination(q, 1)) - expect_false(isActive(q)) - - unlink(parquetPath) -}) - -test_that("Non-streaming DataFrame", { - skip_on_cran() - - c <- as.DataFrame(cars) - expect_false(isStreaming(c)) - - expect_error(write.stream(c, "
[4/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_streaming.R -- diff --git a/R/pkg/inst/tests/testthat/test_streaming.R b/R/pkg/inst/tests/testthat/test_streaming.R deleted file mode 100644 index b20b431..000 --- a/R/pkg/inst/tests/testthat/test_streaming.R +++ /dev/null @@ -1,167 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(testthat) - -context("Structured Streaming") - -# Tests for Structured Streaming functions in SparkR - -sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) - -jsonSubDir <- file.path("sparkr-test", "json", "") -if (.Platform$OS.type == "windows") { - # file.path removes the empty separator on Windows, adds it back - jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep) -} -jsonDir <- file.path(tempdir(), jsonSubDir) -dir.create(jsonDir, recursive = TRUE) - -mockLines <- c("{\"name\":\"Michael\"}", - "{\"name\":\"Andy\", \"age\":30}", - "{\"name\":\"Justin\", \"age\":19}") -jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp") -writeLines(mockLines, jsonPath) - -mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", - "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", - "{\"name\":\"David\",\"age\":60,\"height\":null}") -jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp") - -schema <- structType(structField("name", "string"), - structField("age", "integer"), - structField("count", "double")) - -test_that("read.stream, write.stream, awaitTermination, stopQuery", { - skip_on_cran() - - df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1) - expect_true(isStreaming(df)) - counts <- count(group_by(df, "name")) - q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete") - - expect_false(awaitTermination(q, 5 * 1000)) - callJMethod(q@ssq, "processAllAvailable") - expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3) - - writeLines(mockLinesNa, jsonPathNa) - awaitTermination(q, 5 * 1000) - callJMethod(q@ssq, "processAllAvailable") - expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6) - - stopQuery(q) - expect_true(awaitTermination(q, 1)) - expect_error(awaitTermination(q), NA) -}) - -test_that("print from explain, lastProgress, status, isActive", { - skip_on_cran() - - df <- read.stream("json", path = jsonDir, schema = schema) - expect_true(isStreaming(df)) - counts <- count(group_by(df, "name")) - q <- write.stream(counts, "memory", queryName = "people2", outputMode = "complete") - - awaitTermination(q, 5 * 1000) - callJMethod(q@ssq, "processAllAvailable") - - expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==") - expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q) - 
expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q) - - expect_equal(queryName(q), "people2") - expect_true(isActive(q)) - - stopQuery(q) -}) - -test_that("Stream other format", { - skip_on_cran() - - parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet") - df <- read.df(jsonPath, "json", schema) - write.df(df, parquetPath, "parquet", "overwrite") - - df <- read.stream(path = parquetPath, schema = schema) - expect_true(isStreaming(df)) - counts <- count(group_by(df, "name")) - q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete") - - expect_false(awaitTermination(q, 5 * 1000)) - callJMethod(q@ssq, "processAllAvailable") - expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3) - - expect_equal(queryName(q), "people3") - expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet", - capture.output(lastProgress(q) - expect_true(isActive(q)) - - stopQuery(q) - expect_true(awaitTermination(q, 1)) - expect_false(isActive(q)) - - unlink(parquetPath) -}) - -test_that("Non-streaming DataFrame", { - skip_on_cran() - - c <- as.DataFrame(cars) - expect_false(isStreaming(c)) - - expect_error(write.stream(c, "
[1/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
Repository: spark Updated Branches: refs/heads/master 5301a19a0 -> dc4c35183 http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_streaming.R -- diff --git a/R/pkg/tests/fulltests/test_streaming.R b/R/pkg/tests/fulltests/test_streaming.R new file mode 100644 index 000..b20b431 --- /dev/null +++ b/R/pkg/tests/fulltests/test_streaming.R @@ -0,0 +1,167 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +library(testthat) + +context("Structured Streaming") + +# Tests for Structured Streaming functions in SparkR + +sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) + +jsonSubDir <- file.path("sparkr-test", "json", "") +if (.Platform$OS.type == "windows") { + # file.path removes the empty separator on Windows, adds it back + jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep) +} +jsonDir <- file.path(tempdir(), jsonSubDir) +dir.create(jsonDir, recursive = TRUE) + +mockLines <- c("{\"name\":\"Michael\"}", + "{\"name\":\"Andy\", \"age\":30}", + "{\"name\":\"Justin\", \"age\":19}") +jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp") +writeLines(mockLines, jsonPath) + +mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", + "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", + "{\"name\":\"David\",\"age\":60,\"height\":null}") +jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp") + +schema <- structType(structField("name", "string"), + structField("age", "integer"), + structField("count", "double")) + +test_that("read.stream, write.stream, awaitTermination, stopQuery", { + skip_on_cran() + + df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1) + expect_true(isStreaming(df)) + counts <- count(group_by(df, "name")) + q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete") + + expect_false(awaitTermination(q, 5 * 1000)) + callJMethod(q@ssq, "processAllAvailable") + expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3) + + writeLines(mockLinesNa, jsonPathNa) + awaitTermination(q, 5 * 1000) + callJMethod(q@ssq, "processAllAvailable") + expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6) + + stopQuery(q) + expect_true(awaitTermination(q, 1)) + expect_error(awaitTermination(q), NA) +}) + +test_that("print from explain, lastProgress, status, isActive", { + skip_on_cran() + + df <- read.stream("json", path = jsonDir, schema = schema) + expect_true(isStreaming(df)) + counts <- count(group_by(df, "name")) + q <- write.stream(counts, "memory", queryName = "people2", outputMode = "complete") + + awaitTermination(q, 5 * 1000) + callJMethod(q@ssq, "processAllAvailable") + + expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==") + expect_true(any(grepl("\"description\" : \"MemorySink\"", 
capture.output(lastProgress(q) + expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q) + + expect_equal(queryName(q), "people2") + expect_true(isActive(q)) + + stopQuery(q) +}) + +test_that("Stream other format", { + skip_on_cran() + + parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet") + df <- read.df(jsonPath, "json", schema) + write.df(df, parquetPath, "parquet", "overwrite") + + df <- read.stream(path = parquetPath, schema = schema) + expect_true(isStreaming(df)) + counts <- count(group_by(df, "name")) + q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete") + + expect_false(awaitTermination(q, 5 * 1000)) + callJMethod(q@ssq, "processAllAvailable") + expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3) + + expect_equal(queryName(q), "people3") + expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet", + capture.output(lastProgress(q) + expect_true(isActive(q)) + + stopQuery(q) + expect_true(awaitTermination(q, 1)) + expect_false(isActive(q)) + + unlink(parquetPath) +}) + +test_that("Non-streaming DataFrame", { + skip_on_cran() + + c <- as.DataFrame(cars) + exp
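Stripped of the testthat assertions, the streaming tests above exercise one pattern: read a JSON directory as a stream, aggregate, write to the in-memory sink, and query the named result table. A condensed sketch of that flow, reusing the jsonDir and schema objects defined in the test file:

df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
stopifnot(isStreaming(df))

counts <- count(group_by(df, "name"))
# The "memory" sink registers the aggregated result as a queryable table
q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete")

awaitTermination(q, 5 * 1000)
head(sql("SELECT count(*) FROM people"))
stopQuery(q)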
[2/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_sparkSQL.R -- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R new file mode 100644 index 000..c790d02 --- /dev/null +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -0,0 +1,3474 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +library(testthat) + +context("SparkSQL functions") + +# Utility function for easily checking the values of a StructField +checkStructField <- function(actual, expectedName, expectedType, expectedNullable) { + expect_equal(class(actual), "structField") + expect_equal(actual$name(), expectedName) + expect_equal(actual$dataType.toString(), expectedType) + expect_equal(actual$nullable(), expectedNullable) +} + +markUtf8 <- function(s) { + Encoding(s) <- "UTF-8" + s +} + +setHiveContext <- function(sc) { + if (exists(".testHiveSession", envir = .sparkREnv)) { +hiveSession <- get(".testHiveSession", envir = .sparkREnv) + } else { +# initialize once and reuse +ssc <- callJMethod(sc, "sc") +hiveCtx <- tryCatch({ + newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE) +}, +error = function(err) { + skip("Hive is not build with SparkSQL, skipped") +}) +hiveSession <- callJMethod(hiveCtx, "sparkSession") + } + previousSession <- get(".sparkRsession", envir = .sparkREnv) + assign(".sparkRsession", hiveSession, envir = .sparkREnv) + assign(".prevSparkRsession", previousSession, envir = .sparkREnv) + hiveSession +} + +unsetHiveContext <- function() { + previousSession <- get(".prevSparkRsession", envir = .sparkREnv) + assign(".sparkRsession", previousSession, envir = .sparkREnv) + remove(".prevSparkRsession", envir = .sparkREnv) +} + +# Tests for SparkSQL functions in SparkR + +filesBefore <- list.files(path = sparkRDir, all.files = TRUE) +sparkSession <- if (not_cran_or_windows_with_hadoop()) { +sparkR.session(master = sparkRTestMaster) + } else { +sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) + } +sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) + +mockLines <- c("{\"name\":\"Michael\"}", + "{\"name\":\"Andy\", \"age\":30}", + "{\"name\":\"Justin\", \"age\":19}") +jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") +parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet") +orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc") +writeLines(mockLines, jsonPath) + +# For test nafunctions, like dropna(), fillna(),... 
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", + "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", + "{\"name\":\"David\",\"age\":60,\"height\":null}", + "{\"name\":\"Amy\",\"age\":null,\"height\":null}", + "{\"name\":null,\"age\":null,\"height\":null}") +jsonPathNa <- tempfile(pattern = "sparkr-test", fileext = ".tmp") +writeLines(mockLinesNa, jsonPathNa) + +# For test complex types in DataFrame +mockLinesComplexType <- + c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}", +"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}", +"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}") +complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") +writeLines(mockLinesComplexType, complexTypeJsonPath) + +# For test map type and struct type in DataFrame +mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}", + "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}", + "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}") +mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") +writeLines(mockLinesMapType, mapTypeJsonPath) + +if (.Platform$OS.type == "windows") { + Sys.setenv(TZ = "GMT") +} + +test_that("calling sparkRSQL.init returns existing SQL context", { + skip_on_cran() + + sqlContext <- suppressWarnings(sparkRSQL.init(sc)) + expect_equal(suppressWarni
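The mockLinesNa fixture written above exists to exercise the NA-handling API named in the comment. A short illustrative sketch of how that data is typically used (read.json, dropna and fillna are SparkR calls; the actual assertions in the test file are not reproduced here):

peopleNa <- read.json(jsonPathNa)

# Drop rows where any of the listed columns is null
head(dropna(peopleNa, how = "any", cols = c("age", "height")))

# Replace remaining nulls with a default value
head(fillna(peopleNa, value = 0, cols = c("age", "height")))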
[3/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/fulltests/test_mllib_fpm.R -- diff --git a/R/pkg/tests/fulltests/test_mllib_fpm.R b/R/pkg/tests/fulltests/test_mllib_fpm.R new file mode 100644 index 000..4e10ca1 --- /dev/null +++ b/R/pkg/tests/fulltests/test_mllib_fpm.R @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +library(testthat) + +context("MLlib frequent pattern mining") + +# Tests for MLlib frequent pattern mining algorithms in SparkR +sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) + +test_that("spark.fpGrowth", { + data <- selectExpr(createDataFrame(data.frame(items = c( +"1,2", +"1,2", +"1,2,3", +"1,3" + ))), "split(items, ',') as items") + + model <- spark.fpGrowth(data, minSupport = 0.3, minConfidence = 0.8, numPartitions = 1) + + itemsets <- collect(spark.freqItemsets(model)) + + expected_itemsets <- data.frame( +items = I(list(list("3"), list("3", "1"), list("2"), list("2", "1"), list("1"))), +freq = c(2, 2, 3, 3, 4) + ) + + expect_equivalent(expected_itemsets, itemsets) + + expected_association_rules <- data.frame( +antecedent = I(list(list("2"), list("3"))), +consequent = I(list(list("1"), list("1"))), +confidence = c(1, 1) + ) + + expect_equivalent(expected_association_rules, collect(spark.associationRules(model))) + + new_data <- selectExpr(createDataFrame(data.frame(items = c( +"1,2", +"1,3", +"2,3" + ))), "split(items, ',') as items") + + expected_predictions <- data.frame( +items = I(list(list("1", "2"), list("1", "3"), list("2", "3"))), +prediction = I(list(list(), list(), list("1"))) + ) + + expect_equivalent(expected_predictions, collect(predict(model, new_data))) + + if (not_cran_or_windows_with_hadoop()) { +modelPath <- tempfile(pattern = "spark-fpm", fileext = ".tmp") +write.ml(model, modelPath, overwrite = TRUE) +loaded_model <- read.ml(modelPath) + +expect_equivalent( + itemsets, + collect(spark.freqItemsets(loaded_model))) + +unlink(modelPath) + } + + model_without_numpartitions <- spark.fpGrowth(data, minSupport = 0.3, minConfidence = 0.8) + expect_equal( +count(spark.freqItemsets(model_without_numpartitions)), +count(spark.freqItemsets(model)) + ) + +}) + +sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/fulltests/test_mllib_recommendation.R -- diff --git a/R/pkg/tests/fulltests/test_mllib_recommendation.R b/R/pkg/tests/fulltests/test_mllib_recommendation.R new file mode 100644 index 000..cc8064f --- /dev/null +++ b/R/pkg/tests/fulltests/test_mllib_recommendation.R @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +library(testthat) + +context("MLlib recommendation algorithms") + +# Tests for MLlib recommendation algorithms in SparkR +sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) + +test_that("spark.als", { + data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0), + list(2, 1, 1.0), list(2, 2, 5.0)) + df <- createDataFrame(data, c("user", "item", "score")) + model <- spark.als(df, ratingCol = "score", userCol = "user", itemCol = "item", + rank = 10, maxIter = 5, seed = 0, regParam = 0.1) + stats <-
[3/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_mllib_fpm.R -- diff --git a/R/pkg/tests/fulltests/test_mllib_fpm.R b/R/pkg/tests/fulltests/test_mllib_fpm.R new file mode 100644 index 000..4e10ca1 --- /dev/null +++ b/R/pkg/tests/fulltests/test_mllib_fpm.R @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +library(testthat) + +context("MLlib frequent pattern mining") + +# Tests for MLlib frequent pattern mining algorithms in SparkR +sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) + +test_that("spark.fpGrowth", { + data <- selectExpr(createDataFrame(data.frame(items = c( +"1,2", +"1,2", +"1,2,3", +"1,3" + ))), "split(items, ',') as items") + + model <- spark.fpGrowth(data, minSupport = 0.3, minConfidence = 0.8, numPartitions = 1) + + itemsets <- collect(spark.freqItemsets(model)) + + expected_itemsets <- data.frame( +items = I(list(list("3"), list("3", "1"), list("2"), list("2", "1"), list("1"))), +freq = c(2, 2, 3, 3, 4) + ) + + expect_equivalent(expected_itemsets, itemsets) + + expected_association_rules <- data.frame( +antecedent = I(list(list("2"), list("3"))), +consequent = I(list(list("1"), list("1"))), +confidence = c(1, 1) + ) + + expect_equivalent(expected_association_rules, collect(spark.associationRules(model))) + + new_data <- selectExpr(createDataFrame(data.frame(items = c( +"1,2", +"1,3", +"2,3" + ))), "split(items, ',') as items") + + expected_predictions <- data.frame( +items = I(list(list("1", "2"), list("1", "3"), list("2", "3"))), +prediction = I(list(list(), list(), list("1"))) + ) + + expect_equivalent(expected_predictions, collect(predict(model, new_data))) + + if (not_cran_or_windows_with_hadoop()) { +modelPath <- tempfile(pattern = "spark-fpm", fileext = ".tmp") +write.ml(model, modelPath, overwrite = TRUE) +loaded_model <- read.ml(modelPath) + +expect_equivalent( + itemsets, + collect(spark.freqItemsets(loaded_model))) + +unlink(modelPath) + } + + model_without_numpartitions <- spark.fpGrowth(data, minSupport = 0.3, minConfidence = 0.8) + expect_equal( +count(spark.freqItemsets(model_without_numpartitions)), +count(spark.freqItemsets(model)) + ) + +}) + +sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_mllib_recommendation.R -- diff --git a/R/pkg/tests/fulltests/test_mllib_recommendation.R b/R/pkg/tests/fulltests/test_mllib_recommendation.R new file mode 100644 index 000..cc8064f --- /dev/null +++ b/R/pkg/tests/fulltests/test_mllib_recommendation.R @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +library(testthat) + +context("MLlib recommendation algorithms") + +# Tests for MLlib recommendation algorithms in SparkR +sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) + +test_that("spark.als", { + data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0), + list(2, 1, 1.0), list(2, 2, 5.0)) + df <- createDataFrame(data, c("user", "item", "score")) + model <- spark.als(df, ratingCol = "score", userCol = "user", itemCol = "item", + rank = 10, maxIter = 5, seed = 0, regParam = 0.1) + stats <-
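The spark.als test above is cut off just after the model is fitted. The usual follow-up in SparkR is to inspect the fit with summary() and to score user/item pairs with predict(); a condensed, hedged sketch using the same toy ratings:

ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0),
                list(1, 2, 4.0), list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(ratings, c("user", "item", "score"))

model <- spark.als(df, ratingCol = "score", userCol = "user", itemCol = "item",
                   rank = 10, maxIter = 5, seed = 0, regParam = 0.1)

summary(model)            # rank plus user/item factor matrices
head(predict(model, df))  # predicted scores for the training pairs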
[6/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/inst/tests/testthat/test_mllib_regression.R -- diff --git a/R/pkg/inst/tests/testthat/test_mllib_regression.R b/R/pkg/inst/tests/testthat/test_mllib_regression.R deleted file mode 100644 index b05fdd3..000 --- a/R/pkg/inst/tests/testthat/test_mllib_regression.R +++ /dev/null @@ -1,480 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(testthat) - -context("MLlib regression algorithms, except for tree-based algorithms") - -# Tests for MLlib regression algorithms in SparkR -sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) - -test_that("formula of spark.glm", { - skip_on_cran() - - training <- suppressWarnings(createDataFrame(iris)) - # directly calling the spark API - # dot minus and intercept vs native glm - model <- spark.glm(training, Sepal_Width ~ . - Species + 0) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # feature interaction vs native glm - model <- spark.glm(training, Sepal_Width ~ Species:Sepal_Length) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # glm should work with long formula - training <- suppressWarnings(createDataFrame(iris)) - training$LongLongLongLongLongName <- training$Sepal_Width - training$VeryLongLongLongLonLongName <- training$Sepal_Length - training$AnotherLongLongLongLongName <- training$Species - model <- spark.glm(training, LongLongLongLongLongName ~ VeryLongLongLongLonLongName + -AnotherLongLongLongLongName) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) -}) - -test_that("spark.glm and predict", { - training <- suppressWarnings(createDataFrame(iris)) - # gaussian family - model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, "prediction")) - rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # poisson family - model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species, - family = poisson(link = identity)) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, 
"prediction")) - rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species, -data = iris, family = poisson(link = identity)), iris)) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # Gamma family - x <- runif(100, -1, 1) - y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10) - df <- as.DataFrame(as.data.frame(list(x = x, y = y))) - model <- glm(y ~ x, family = Gamma, df) - out <- capture.output(print(summary(model))) - expect_true(any(grepl("Dispersion parameter for gamma family", out))) - - # tweedie family - model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species, - family = "tweedie", var.power = 1.2, link.power = 0.0) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, "prediction")) - - # manual calculation of the R predicted values to avoid dependence on statmod - #' library(statmod) - #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris, - #' family = tweedie(var.power = 1.2, link.power = 0.0)) - #' print(coef(rModel)) - - rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174) - rVals <- exp(as.numeric(model.