spark git commit: [SPARK-20665][SQL][FOLLOW-UP] Move test case to MathExpressionsSuite

2017-06-11 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 3476390c6 -> d14091809


[SPARK-20665][SQL][FOLLOW-UP] Move test case to MathExpressionsSuite

## What changes were proposed in this pull request?

Add a test case to MathExpressionsSuite, as a follow-up to #17906.

## How was this patch tested?

unit test cases

Author: liuxian 

Closes #18082 from 10110346/wip-lx-0524.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1409180
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1409180
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1409180

Branch: refs/heads/master
Commit: d1409180932f2658daad2c6dbf5d80fdf4606dc5
Parents: 3476390
Author: liuxian 
Authored: Sun Jun 11 22:29:09 2017 -0700
Committer: Xiao Li 
Committed: Sun Jun 11 22:29:09 2017 -0700

--
 .../expressions/MathExpressionsSuite.scala  | 64 
 1 file changed, 52 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d1409180/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
index 6af0cde..f4d5a44 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathExpressionsSuite.scala
@@ -23,6 +23,7 @@ import com.google.common.math.LongMath
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCoercion.ImplicitTypeCasts.implicitCast
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
@@ -223,6 +224,14 @@ class MathExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
 def f: (Double) => Double = (x: Double) => 1 / math.tan(x)
 testUnary(Cot, f)
 checkConsistencyBetweenInterpretedAndCodegen(Cot, DoubleType)
+val nullLit = Literal.create(null, NullType)
+val intNullLit = Literal.create(null, IntegerType)
+val intLit = Literal.create(1, IntegerType)
+checkEvaluation(checkDataTypeAndCast(Cot(nullLit)), null, EmptyRow)
+checkEvaluation(checkDataTypeAndCast(Cot(intNullLit)), null, EmptyRow)
+checkEvaluation(checkDataTypeAndCast(Cot(intLit)), 1 / math.tan(1), EmptyRow)
+checkEvaluation(checkDataTypeAndCast(Cot(-intLit)), 1 / math.tan(-1), EmptyRow)
+checkEvaluation(checkDataTypeAndCast(Cot(0)), 1 / math.tan(0), EmptyRow)
   }
 
   test("atan") {
@@ -250,6 +259,11 @@ class MathExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
 checkConsistencyBetweenInterpretedAndCodegen(Cbrt, DoubleType)
   }
 
+  def checkDataTypeAndCast(expression: UnaryMathExpression): Expression = {
+val expNew = implicitCast(expression.child, expression.inputTypes(0)).getOrElse(expression)
+expression.withNewChildren(Seq(expNew))
+  }
+
   test("ceil") {
 testUnary(Ceil, (d: Double) => math.ceil(d).toLong)
 checkConsistencyBetweenInterpretedAndCodegen(Ceil, DoubleType)
@@ -262,12 +276,22 @@ class MathExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
 val doublePi: Double = 3.1415
 val floatPi: Float = 3.1415f
 val longLit: Long = 12345678901234567L
-checkEvaluation(Ceil(doublePi), 4L, EmptyRow)
-checkEvaluation(Ceil(floatPi.toDouble), 4L, EmptyRow)
-checkEvaluation(Ceil(longLit), longLit, EmptyRow)
-checkEvaluation(Ceil(-doublePi), -3L, EmptyRow)
-checkEvaluation(Ceil(-floatPi.toDouble), -3L, EmptyRow)
-checkEvaluation(Ceil(-longLit), -longLit, EmptyRow)
+val nullLit = Literal.create(null, NullType)
+val floatNullLit = Literal.create(null, FloatType)
+checkEvaluation(checkDataTypeAndCast(Ceil(doublePi)), 4L, EmptyRow)
+checkEvaluation(checkDataTypeAndCast(Ceil(floatPi)), 4L, EmptyRow)
+checkEvaluation(checkDataTypeAndCast(Ceil(longLit)), longLit, EmptyRow)
+checkEvaluation(checkDataTypeAndCast(Ceil(-doublePi)), -3L, EmptyRow)
+checkEvaluation(checkDataTypeAndCast(Ceil(-floatPi)), -3L, EmptyRow)
+checkEvaluation(checkDataTypeAndCast(Ceil(-longLit)), -longLit, EmptyRow)
+
+checkEvaluation(checkDataTypeAndCast(Ceil(nullLit)), null, EmptyRow)
+checkEvaluation(checkDataTypeAndCast(Ceil(floatNullLit)), null, EmptyRow)
+checkEvaluation(checkDataTypeAndCast(Ceil(0)), 0L, EmptyRow)
+checkEvaluation(checkDataTypeAndCast(Ceil(1)), 1L, EmptyRow)
+ 
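For context, here is a minimal sketch of the pattern the new `checkDataTypeAndCast` helper exercises (names are taken from the hunk above, but the wiring is illustrative rather than the committed test code): an integer argument is implicitly cast to the expression's expected input type before evaluation, the way the analyzer would coerce `SELECT cot(1)`.

```scala
import org.apache.spark.sql.catalyst.analysis.TypeCoercion.ImplicitTypeCasts.implicitCast
import org.apache.spark.sql.catalyst.expressions.{Cot, Expression, Literal, UnaryMathExpression}
import org.apache.spark.sql.types.IntegerType

// Illustrative only: re-wrap a unary math expression so that its child is cast
// to the expected input type (DoubleType for Cot), mirroring the suite's helper.
def withImplicitCast(expr: UnaryMathExpression): Expression = {
  val coerced = implicitCast(expr.child, expr.inputTypes.head).getOrElse(expr.child)
  expr.withNewChildren(Seq(coerced))
}

// The integer literal 1 is coerced to 1.0, so evaluation matches 1 / math.tan(1).
val cot = withImplicitCast(Cot(Literal.create(1, IntegerType)))
assert(cot.eval() == 1 / math.tan(1))
```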

spark git commit: [SPARK-20715] Store MapStatuses only in MapOutputTracker, not ShuffleMapStage

2017-06-11 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master f48273c13 -> 3476390c6


[SPARK-20715] Store MapStatuses only in MapOutputTracker, not ShuffleMapStage

## What changes were proposed in this pull request?

This PR refactors `ShuffleMapStage` and `MapOutputTracker` in order to simplify 
the management of `MapStatuses`, reduce driver memory consumption, and remove a 
potential source of scheduler correctness bugs.

### Background

In Spark there are currently two places where MapStatuses are tracked:

- The `MapOutputTracker` maintains an `Array[MapStatus]` storing a single 
location for each map output. This mapping is used by the `DAGScheduler` for 
determining reduce-task locality preferences (when locality-aware reduce task 
scheduling is enabled) and is also used to serve map output locations to 
executors / tasks.
- Each `ShuffleMapStage` also contains a mapping of `Array[List[MapStatus]]` 
which holds the complete set of locations where each map output could be 
available. This mapping is used to determine which map tasks need to be run 
when constructing `TaskSets` for the stage.

This duplication adds complexity and creates the potential for certain types of 
correctness bugs.  Bad things can happen if these two copies of the map output 
locations get out of sync. For instance, if the `MapOutputTracker` is missing 
locations for a map output but `ShuffleMapStage` believes that locations are 
available then tasks will fail with `MetadataFetchFailedException` but 
`ShuffleMapStage` will not be updated to reflect the missing map outputs, 
leading to situations where the stage will be reattempted (because downstream 
stages experienced fetch failures) but no task sets will be launched (because 
`ShuffleMapStage` thinks all maps are available).

I observed this behavior in a real-world deployment. I'm still not quite sure 
how the state got out of sync in the first place, but we can completely avoid 
this class of bug if we eliminate the duplicate state.

### Why we only need to track a single location for each map output

I think that storing an `Array[List[MapStatus]]` in `ShuffleMapStage` is 
unnecessary.

First, note that this adds memory/object bloat to the driver: we need one extra 
`List` per task. If you have millions of tasks across all stages, this can 
add up to a significant amount of resources.

Secondly, I believe that it's extremely uncommon that these lists will ever 
contain more than one entry. It's not impossible, but is very unlikely given 
the conditions which must occur for that to happen:

- In normal operation (no task failures) we'll only run each task once and thus 
will have at most one output.
- If speculation is enabled then it's possible that we'll have multiple 
attempts of a task. The TaskSetManager will [kill duplicate attempts of a 
task](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L717)
 after a task finishes successfully, reducing the likelihood that both the 
original and speculated task will successfully register map outputs.
- There is a [comment in 
`TaskSetManager`](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L113)
 which suggests that running tasks are not killed if a task set becomes a 
zombie. However:
  - If the task set becomes a zombie due to the job being cancelled then it 
doesn't matter whether we record map outputs.
  - If the task set became a zombie because of a stage failure (e.g. the map 
stage itself had a fetch failure from an upstream map stage) then I believe 
that the "failedEpoch" will be updated which may cause map outputs from 
still-running tasks to [be 
ignored](https://github.com/apache/spark/blob/04901dd03a3f8062fd39ea38d585935ff71a9248/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1213).
 (I'm not 100% sure on this point, though).
- Even if you _do_ manage to record multiple map outputs for a stage, only a 
single map output is reported to / tracked by the MapOutputTracker. The only 
situation where the additional output locations could actually be read or used 
would be if a task experienced a `FetchFailure` exception. The most likely 
cause of a `FetchFailure` exception is a lost executor, which will most likely 
have caused the loss of several map tasks' output, so saving on the potential 
re-execution of a single map task isn't a huge win if we're going to have to 
recompute several other lost map outputs from other tasks which ran on that 
lost executor. Also note that the re-population of MapOutputTracker state from 
state in the ShuffleMapStage only happens after the reduce stage has failed; the 
additional location doesn't help to prevent FetchFailures but, instead, can 
only reduce the amount of work when recomputing missing parent stages.

Given this, this patch chooses to do away with tracking m
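To make the single-source-of-truth idea concrete, here is a small, self-contained sketch (simplified types and names, not Spark's actual `MapOutputTracker` API) of how the set of missing map outputs can be derived from a single array of statuses, so there is no second per-stage copy to fall out of sync:

```scala
// Simplified stand-ins for Spark's classes, for illustration only.
case class MapStatus(execId: String)

class SimpleOutputTracker(numMaps: Int) {
  // One slot per map partition; None means the output is missing.
  private val statuses = Array.fill[Option[MapStatus]](numMaps)(None)

  def registerMapOutput(partition: Int, status: MapStatus): Unit =
    statuses(partition) = Some(status)

  // Called e.g. on executor loss: invalidate every output on that executor.
  def removeOutputsOnExecutor(execId: String): Unit =
    for (i <- statuses.indices if statuses(i).exists(_.execId == execId))
      statuses(i) = None

  // The scheduler asks this single copy which map tasks still need to run, so it
  // can never disagree with a separate, per-stage list of output locations.
  def findMissingPartitions(): Seq[Int] =
    statuses.indices.filterNot(statuses(_).isDefined)
}

val tracker = new SimpleOutputTracker(numMaps = 3)
tracker.registerMapOutput(0, MapStatus("exec-1"))
tracker.registerMapOutput(1, MapStatus("exec-2"))
tracker.removeOutputsOnExecutor("exec-1")
assert(tracker.findMissingPartitions() == Seq(0, 2))
```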

spark git commit: [SPARK-18891][SQL] Support for specific Java List subtypes

2017-06-11 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 0538f3b0a -> f48273c13


[SPARK-18891][SQL] Support for specific Java List subtypes

## What changes were proposed in this pull request?

Add support for specific Java `List` subtypes in deserialization as well as a 
generic implicit encoder.

All `List` subtypes are supported by using either the size-specifying 
constructor (one `int` parameter) or the default constructor.
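A hedged sketch of that construction strategy (reflection based and simplified; the actual support is emitted by the generated code in `objects.scala` shown below): try the one-`int` constructor first and fall back to the default constructor.

```scala
import scala.util.Try

// Instantiate a concrete java.util.List subtype, preferring the size-specifying
// constructor and falling back to the no-arg one when it does not exist.
def newJavaList[T](cls: Class[_ <: java.util.List[T]], size: Int): java.util.List[T] = {
  val viaSizeCtor = Try(cls.getConstructor(classOf[Int]).newInstance(Integer.valueOf(size)))
  viaSizeCtor.getOrElse(cls.getConstructor().newInstance())
}

newJavaList(classOf[java.util.ArrayList[Int]], 4)   // uses ArrayList(int)
newJavaList(classOf[java.util.LinkedList[Int]], 4)  // no int constructor, uses LinkedList()
```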

Interfaces/abstract classes use the following implementations:

* `java.util.List`, `java.util.AbstractList` or 
`java.util.AbstractSequentialList` => `java.util.ArrayList`

## How was this patch tested?

```bash
build/mvn -DskipTests clean package && dev/run-tests
```

Additionally in Spark shell:

```
scala> val jlist = new java.util.LinkedList[Int]; jlist.add(1)
jlist: java.util.LinkedList[Int] = [1]
res0: Boolean = true

scala> Seq(jlist).toDS().map(_.element()).collect()
res1: Array[Int] = Array(1)
```

Author: Michal Senkyr 

Closes #18009 from michalsenkyr/dataset-java-lists.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f48273c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f48273c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f48273c1

Branch: refs/heads/master
Commit: f48273c13c9e9fea2d9bb6dda10fca50c588
Parents: 0538f3b
Author: Michal Senkyr 
Authored: Mon Jun 12 08:53:23 2017 +0800
Committer: Wenchen Fan 
Committed: Mon Jun 12 08:53:23 2017 +0800

--
 .../spark/sql/catalyst/JavaTypeInference.scala  | 15 ++---
 .../catalyst/expressions/objects/objects.scala  | 19 +-
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 61 
 3 files changed, 83 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f48273c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 86a73a3..7683ee7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -267,16 +267,11 @@ object JavaTypeInference {
 
   case c if listType.isAssignableFrom(typeToken) =>
 val et = elementType(typeToken)
-val array =
-  Invoke(
-MapObjects(
-  p => deserializerFor(et, Some(p)),
-  getPath,
-  inferDataType(et)._1),
-"array",
-ObjectType(classOf[Array[Any]]))
-
-StaticInvoke(classOf[java.util.Arrays], ObjectType(c), "asList", array 
:: Nil)
+MapObjects(
+  p => deserializerFor(et, Some(p)),
+  getPath,
+  inferDataType(et)._1,
+  customCollectionCls = Some(c))
 
   case _ if mapType.isAssignableFrom(typeToken) =>
 val (keyType, valueType) = mapKeyValueType(typeToken)

http://git-wip-us.apache.org/repos/asf/spark/blob/f48273c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 79b7b9f..5bb0feb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -22,6 +22,7 @@ import java.lang.reflect.Modifier
 import scala.collection.mutable.Builder
 import scala.language.existentials
 import scala.reflect.ClassTag
+import scala.util.Try
 
 import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.serializer._
@@ -597,8 +598,8 @@ case class MapObjects private(
 
 val (initCollection, addElement, getResult): (String, String => String, 
String) =
   customCollectionCls match {
-case Some(cls) =>
-  // collection
+case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+  // Scala sequence
   val getBuilder = s"${cls.getName}$$.MODULE$$.newBuilder()"
   val builder = ctx.freshName("collectionBuilder")
   (
@@ -609,6 +610,20 @@ case class MapObjects private(
 genValue => s"$builder.$$plus$$eq($genValue);",
 s"(${cls.getName}) $builder.result();"
   )
+case Some(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
+  // Java list
+  val builder = ctx.freshName("collectionBuilder

spark git commit: [SPARK-18891][SQL] Support for Scala Map collection types

2017-06-11 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master a7c61c100 -> 0538f3b0a


[SPARK-18891][SQL] Support for Scala Map collection types

## What changes were proposed in this pull request?

Add support for arbitrary Scala `Map` types in deserialization as well as a 
generic implicit encoder.

Used the builder approach as in #16541 to construct any provided `Map` type 
upon deserialization.
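A hedged sketch of the builder idea in plain Scala (the PR itself emits equivalent Java through `CollectObjectsToMap`, as visible in the codegen dump below): any target `Map` type whose companion exposes `newBuilder` can be populated entry by entry and then materialized.

```scala
import scala.collection.immutable.TreeMap
import scala.collection.mutable.Builder

// Populate a builder entry by entry and materialize the target Map type;
// the generated deserializer follows the same shape.
def buildMap[K, V, M <: Map[K, V]](entries: Seq[(K, V)], builder: Builder[(K, V), M]): M = {
  builder.sizeHint(entries.size)
  entries.foreach(builder += _)
  builder.result()
}

// Same entries, two different target Map implementations.
val asHashMap = buildMap(Seq(1 -> "a", 2 -> "b"), Map.newBuilder[Int, String])
val asTreeMap = buildMap(Seq(2 -> "b", 1 -> "a"), TreeMap.newBuilder[Int, String])
// asTreeMap.keys come back in sorted order: 1, 2
```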

Please note that this PR also adds (ignored) tests for issue [SPARK-19104 
CompileException with Map and Case Class in Spark 
2.1.0](https://issues.apache.org/jira/browse/SPARK-19104) but doesn't solve it.

Added support for Java Maps in codegen code (encoders will be added in a 
different PR) with the following default implementations for 
interfaces/abstract classes:

* `java.util.Map`, `java.util.AbstractMap` => `java.util.HashMap`
* `java.util.SortedMap`, `java.util.NavigableMap` => `java.util.TreeMap`
* `java.util.concurrent.ConcurrentMap` => 
`java.util.concurrent.ConcurrentHashMap`
* `java.util.concurrent.ConcurrentNavigableMap` => 
`java.util.concurrent.ConcurrentSkipListMap`

Resulting codegen for `Seq(Map(1 -> 
2)).toDS().map(identity).queryExecution.debug.codegen`:

```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private boolean CollectObjectsToMap_loopIsNull1;
/* 010 */   private int CollectObjectsToMap_loopValue0;
/* 011 */   private boolean CollectObjectsToMap_loopIsNull3;
/* 012 */   private int CollectObjectsToMap_loopValue2;
/* 013 */   private UnsafeRow deserializetoobject_result;
/* 014 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
deserializetoobject_holder;
/* 015 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
deserializetoobject_rowWriter;
/* 016 */   private scala.collection.immutable.Map mapelements_argValue;
/* 017 */   private UnsafeRow mapelements_result;
/* 018 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
mapelements_holder;
/* 019 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
mapelements_rowWriter;
/* 020 */   private UnsafeRow serializefromobject_result;
/* 021 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
serializefromobject_holder;
/* 022 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
serializefromobject_rowWriter;
/* 023 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter 
serializefromobject_arrayWriter;
/* 024 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter 
serializefromobject_arrayWriter1;
/* 025 */
/* 026 */   public GeneratedIterator(Object[] references) {
/* 027 */ this.references = references;
/* 028 */   }
/* 029 */
/* 030 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */ partitionIndex = index;
/* 032 */ this.inputs = inputs;
/* 033 */ wholestagecodegen_init_0();
/* 034 */ wholestagecodegen_init_1();
/* 035 */
/* 036 */   }
/* 037 */
/* 038 */   private void wholestagecodegen_init_0() {
/* 039 */ inputadapter_input = inputs[0];
/* 040 */
/* 041 */ deserializetoobject_result = new UnsafeRow(1);
/* 042 */ this.deserializetoobject_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result,
 32);
/* 043 */ this.deserializetoobject_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder,
 1);
/* 044 */
/* 045 */ mapelements_result = new UnsafeRow(1);
/* 046 */ this.mapelements_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result,
 32);
/* 047 */ this.mapelements_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder,
 1);
/* 048 */ serializefromobject_result = new UnsafeRow(1);
/* 049 */ this.serializefromobject_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result,
 32);
/* 050 */ this.serializefromobject_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder,
 1);
/* 051 */ this.serializefromobject_arrayWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 052 */
/* 053 */   }
/* 054 */
/* 055 */   private void wholestagecodegen_init_1() {
/* 056 */ this.serializefromobject_arrayWriter1 = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 057 */
/* 058 */   }
/* 059 */
/* 060 */   protected void processNext() throw

spark git commit: [SPARK-21031][SQL] Add `alterTableStats` to store spark's stats and let `alterTable` keep existing stats

2017-06-11 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 3a840048e -> a7c61c100


[SPARK-21031][SQL] Add `alterTableStats` to store spark's stats and let 
`alterTable` keep existing stats

## What changes were proposed in this pull request?

Currently, hive's stats are read into `CatalogStatistics`, while spark's stats 
are also persisted through `CatalogStatistics`. As a result, hive's stats can 
be unexpectedly propagated into spark's stats.

For example, for a catalog table, we read stats from hive, e.g. "totalSize", and 
put them into `CatalogStatistics`. Then, when an "ALTER TABLE" command runs, we 
store the stats in `CatalogStatistics` into the metastore as spark's stats (because 
we don't know whether they came from spark or not). But spark's stats should only 
be generated by the "ANALYZE" command, so this is unexpected behavior for "ALTER TABLE".

Secondly, now that we have spark's stats in the metastore, after inserting new 
data we still cannot get the right `sizeInBytes` in `CatalogStatistics`, even 
though hive has updated "totalSize" in the metastore, because we prefer spark's 
stats (which should not exist in this case) over hive's stats.

A running example is shown in 
[JIRA](https://issues.apache.org/jira/browse/SPARK-21031).

To fix this, we add a new method `alterTableStats` to store spark's stats, and 
let `alterTable` keep existing stats.
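As a hedged usage sketch (the method names come from the diff below; the catalog instance and table are assumed), the two code paths are now meant to diverge like this:

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, ExternalCatalog}

val catalog: ExternalCatalog = ???  // assumed: some concrete implementation, e.g. HiveExternalCatalog

// ANALYZE TABLE ... COMPUTE STATISTICS: spark-generated stats go through the
// new, dedicated API, so only "ANALYZE" ever writes spark's stats.
val computed = CatalogStatistics(sizeInBytes = BigInt(1024), rowCount = Some(BigInt(10)))
catalog.alterTableStats("default", "t", computed)

// Other ALTER TABLE paths go through alterTable, which now leaves whatever
// stats already exist in the metastore untouched.
val table = catalog.getTable("default", "t")
catalog.alterTable(table.copy(properties = table.properties + ("example.prop" -> "value")))
```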

## How was this patch tested?

Added new tests.

Author: Zhenhua Wang 

Closes #18248 from wzhfy/separateHiveStats.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a7c61c10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a7c61c10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a7c61c10

Branch: refs/heads/master
Commit: a7c61c100b6e4380e8d0e588969dd7f2fd58d40c
Parents: 3a84004
Author: Zhenhua Wang 
Authored: Mon Jun 12 08:23:04 2017 +0800
Committer: Wenchen Fan 
Committed: Mon Jun 12 08:23:04 2017 +0800

--
 .../sql/catalyst/catalog/ExternalCatalog.scala  |  2 +
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  9 +++
 .../sql/catalyst/catalog/SessionCatalog.scala   | 13 
 .../catalyst/catalog/ExternalCatalogSuite.scala | 11 ++-
 .../catalyst/catalog/SessionCatalogSuite.scala  | 12 +++
 .../command/AnalyzeColumnCommand.scala  |  2 +-
 .../execution/command/AnalyzeTableCommand.scala |  2 +-
 .../spark/sql/hive/HiveExternalCatalog.scala| 68 ++---
 .../apache/spark/sql/hive/StatisticsSuite.scala | 80 +++-
 9 files changed, 132 insertions(+), 67 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a7c61c10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 974ef90..12ba5ae 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -160,6 +160,8 @@ abstract class ExternalCatalog
*/
   def alterTableSchema(db: String, table: String, schema: StructType): Unit
 
+  def alterTableStats(db: String, table: String, stats: CatalogStatistics): 
Unit
+
   def getTable(db: String, table: String): CatalogTable
 
   def getTableOption(db: String, table: String): Option[CatalogTable]

http://git-wip-us.apache.org/repos/asf/spark/blob/a7c61c10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 8a5319b..9820522 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -312,6 +312,15 @@ class InMemoryCatalog(
 catalog(db).tables(table).table = origTable.copy(schema = schema)
   }
 
+  override def alterTableStats(
+  db: String,
+  table: String,
+  stats: CatalogStatistics): Unit = synchronized {
+requireTableExists(db, table)
+val origTable = catalog(db).tables(table).table
+catalog(db).tables(table).table = origTable.copy(stats = Some(stats))
+  }
+
   override def getTable(db: String, table: String): CatalogTable = 
synchronized {
 requireTableExists(db, table)
 catalog(db).tables(table).table

http://git-wip-us.apache.org/repos/asf/spark/blob/a7c61c10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCa

spark git commit: Fixed typo in sql.functions

2017-06-11 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 9f4ff9552 -> 3a840048e


Fixed typo in sql.functions

## What changes were proposed in this pull request?

I fixed a typo in the Scaladoc for the method `def struct(cols: Column*): 
Column`. 'retained' was misspelt as 'remained'.

## How was this patch tested?
Before:

Creates a new struct column.
   If the input column is a column in a `DataFrame`, or a derived column 
expression
   that is named (i.e. aliased), its name would be **remained** as the 
StructField's name,
   otherwise, the newly generated StructField's name would be auto generated as
   `col` with a suffix `index + 1`, i.e. col1, col2, col3, ...

After:

   Creates a new struct column.
   If the input column is a column in a `DataFrame`, or a derived column 
expression
   that is named (i.e. aliased), its name would be **retained** as the 
StructField's name,
   otherwise, the newly generated StructField's name would be auto generated as
   `col` with a suffix `index + 1`, i.e. col1, col2, col3, ...
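For illustration, a hypothetical spark-shell snippet showing the rule the Scaladoc describes (assumes an active `SparkSession` named `spark`):

```scala
import org.apache.spark.sql.functions._

// assumes `spark` is an active SparkSession (e.g. in spark-shell)
val df = spark.range(1).toDF("id")
// "id" is a named column, so its StructField keeps the name "id";
// the unnamed expression `id + 1` gets an auto-generated name (col2, per the rule above).
df.select(struct(col("id"), col("id") + 1)).printSchema()
```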

Author: sujithjay 

Closes #18254 from sujithjay/fix-typo.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a840048
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a840048
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a840048

Branch: refs/heads/master
Commit: 3a840048ed3501e06260b7c5df18cc0bbdb1505c
Parents: 9f4ff95
Author: sujithjay 
Authored: Sun Jun 11 18:23:57 2017 +0100
Committer: Sean Owen 
Committed: Sun Jun 11 18:23:57 2017 +0100

--
 sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3a840048/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 8d0a8c2..8d2e1f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -1210,7 +1210,7 @@ object functions {
   /**
* Creates a new struct column.
* If the input column is a column in a `DataFrame`, or a derived column 
expression
-   * that is named (i.e. aliased), its name would be remained as the 
StructField's name,
+   * that is named (i.e. aliased), its name would be retained as the 
StructField's name,
* otherwise, the newly generated StructField's name would be auto generated 
as
* `col` with a suffix `index + 1`, i.e. col1, col2, col3, ...
*





spark git commit: [SPARK-20877][SPARKR][FOLLOWUP] clean up after test move

2017-06-11 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 0b0be47e7 -> 26003de55


[SPARK-20877][SPARKR][FOLLOWUP] clean up after test move

clean up after big test move

unit tests, jenkins

Author: Felix Cheung 

Closes #18267 from felixcheung/rtestset2.

(cherry picked from commit 9f4ff9552470fb97ca38bb56bbf43be49a9a316c)
Signed-off-by: Felix Cheung 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26003de5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26003de5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26003de5

Branch: refs/heads/branch-2.2
Commit: 26003de55ba13695649b0d874563a76d71cda88d
Parents: 0b0be47
Author: Felix Cheung 
Authored: Sun Jun 11 03:00:44 2017 -0700
Committer: Felix Cheung 
Committed: Sun Jun 11 03:13:56 2017 -0700

--
 R/pkg/.Rbuildignore |   1 +
 R/pkg/R/install.R   |   2 +-
 R/pkg/R/utils.R |   8 +-
 R/pkg/tests/fulltests/test_Serde.R  |   6 --
 R/pkg/tests/fulltests/test_Windows.R|   7 +-
 R/pkg/tests/fulltests/test_binaryFile.R |   8 --
 R/pkg/tests/fulltests/test_binary_function.R|   6 --
 R/pkg/tests/fulltests/test_broadcast.R  |   4 -
 R/pkg/tests/fulltests/test_client.R |   8 --
 R/pkg/tests/fulltests/test_context.R|  16 ---
 R/pkg/tests/fulltests/test_includePackage.R |   4 -
 .../tests/fulltests/test_mllib_classification.R |  12 +--
 R/pkg/tests/fulltests/test_mllib_clustering.R   |  14 +--
 R/pkg/tests/fulltests/test_mllib_fpm.R  |   2 +-
 .../tests/fulltests/test_mllib_recommendation.R |   2 +-
 R/pkg/tests/fulltests/test_mllib_regression.R   |  16 +--
 R/pkg/tests/fulltests/test_mllib_tree.R |  14 ++-
 .../tests/fulltests/test_parallelize_collect.R  |   8 --
 R/pkg/tests/fulltests/test_rdd.R| 102 ---
 R/pkg/tests/fulltests/test_shuffle.R|  24 -
 R/pkg/tests/fulltests/test_sparkR.R |   2 -
 R/pkg/tests/fulltests/test_sparkSQL.R   |  92 ++---
 R/pkg/tests/fulltests/test_streaming.R  |  14 +--
 R/pkg/tests/fulltests/test_take.R   |   2 -
 R/pkg/tests/fulltests/test_textFile.R   |  18 
 R/pkg/tests/fulltests/test_utils.R  |   8 --
 R/pkg/tests/run-all.R   |   2 -
 27 files changed, 32 insertions(+), 370 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/26003de5/R/pkg/.Rbuildignore
--
diff --git a/R/pkg/.Rbuildignore b/R/pkg/.Rbuildignore
index f12f8c2..18b2db6 100644
--- a/R/pkg/.Rbuildignore
+++ b/R/pkg/.Rbuildignore
@@ -6,3 +6,4 @@
 ^README\.Rmd$
 ^src-native$
 ^html$
+^tests/fulltests/*

http://git-wip-us.apache.org/repos/asf/spark/blob/26003de5/R/pkg/R/install.R
--
diff --git a/R/pkg/R/install.R b/R/pkg/R/install.R
index 4ca7aa6..ec931be 100644
--- a/R/pkg/R/install.R
+++ b/R/pkg/R/install.R
@@ -267,7 +267,7 @@ hadoopVersionName <- function(hadoopVersion) {
 # The implementation refers to appdirs package: 
https://pypi.python.org/pypi/appdirs and
 # adapt to Spark context
 sparkCachePath <- function() {
-  if (.Platform$OS.type == "windows") {
+  if (is_windows()) {
 winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
 if (is.na(winAppPath)) {
   stop(paste("%LOCALAPPDATA% not found.",

http://git-wip-us.apache.org/repos/asf/spark/blob/26003de5/R/pkg/R/utils.R
--
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index b19556a..7225da9 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -900,10 +900,6 @@ isAtomicLengthOne <- function(x) {
   is.atomic(x) && length(x) == 1
 }
 
-is_cran <- function() {
-  !identical(Sys.getenv("NOT_CRAN"), "true")
-}
-
 is_windows <- function() {
   .Platform$OS.type == "windows"
 }
@@ -912,6 +908,6 @@ hadoop_home_set <- function() {
   !identical(Sys.getenv("HADOOP_HOME"), "")
 }
 
-not_cran_or_windows_with_hadoop <- function() {
-  !is_cran() && (!is_windows() || hadoop_home_set())
+windows_with_hadoop <- function() {
+  !is_windows() || hadoop_home_set()
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/26003de5/R/pkg/tests/fulltests/test_Serde.R
--
diff --git a/R/pkg/tests/fulltests/test_Serde.R 
b/R/pkg/tests/fulltests/test_Serde.R
index 6e160fa..6bbd201 100644
--- a/R/pkg/tests/fulltests/test_Serde.R
+++ b/R/pkg/tests/fulltests/test_Serde.R
@@ -20,8 +20,6 @@ context("SerDe functionality")
 sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
 
 test_that

spark git commit: [SPARK-20877][SPARKR][FOLLOWUP] clean up after test move

2017-06-11 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master 823f1eef5 -> 9f4ff9552


[SPARK-20877][SPARKR][FOLLOWUP] clean up after test move

## What changes were proposed in this pull request?

clean up after big test move

## How was this patch tested?

unit tests, jenkins

Author: Felix Cheung 

Closes #18267 from felixcheung/rtestset2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f4ff955
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f4ff955
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f4ff955

Branch: refs/heads/master
Commit: 9f4ff9552470fb97ca38bb56bbf43be49a9a316c
Parents: 823f1ee
Author: Felix Cheung 
Authored: Sun Jun 11 03:00:44 2017 -0700
Committer: Felix Cheung 
Committed: Sun Jun 11 03:00:44 2017 -0700

--
 R/pkg/.Rbuildignore |   1 +
 R/pkg/R/install.R   |   2 +-
 R/pkg/R/utils.R |   8 +-
 R/pkg/tests/fulltests/test_Serde.R  |   6 --
 R/pkg/tests/fulltests/test_Windows.R|   7 +-
 R/pkg/tests/fulltests/test_binaryFile.R |   8 --
 R/pkg/tests/fulltests/test_binary_function.R|   6 --
 R/pkg/tests/fulltests/test_broadcast.R  |   4 -
 R/pkg/tests/fulltests/test_client.R |   8 --
 R/pkg/tests/fulltests/test_context.R|  16 ---
 R/pkg/tests/fulltests/test_includePackage.R |   4 -
 .../tests/fulltests/test_mllib_classification.R |  12 +--
 R/pkg/tests/fulltests/test_mllib_clustering.R   |  14 +--
 R/pkg/tests/fulltests/test_mllib_fpm.R  |   2 +-
 .../tests/fulltests/test_mllib_recommendation.R |   2 +-
 R/pkg/tests/fulltests/test_mllib_regression.R   |  16 +--
 R/pkg/tests/fulltests/test_mllib_tree.R |  22 ++--
 .../tests/fulltests/test_parallelize_collect.R  |   8 --
 R/pkg/tests/fulltests/test_rdd.R| 102 ---
 R/pkg/tests/fulltests/test_shuffle.R|  24 -
 R/pkg/tests/fulltests/test_sparkR.R |   2 -
 R/pkg/tests/fulltests/test_sparkSQL.R   |  92 ++---
 R/pkg/tests/fulltests/test_streaming.R  |  14 +--
 R/pkg/tests/fulltests/test_take.R   |   2 -
 R/pkg/tests/fulltests/test_textFile.R   |  18 
 R/pkg/tests/fulltests/test_utils.R  |   9 --
 R/pkg/tests/run-all.R   |   2 -
 27 files changed, 35 insertions(+), 376 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9f4ff955/R/pkg/.Rbuildignore
--
diff --git a/R/pkg/.Rbuildignore b/R/pkg/.Rbuildignore
index f12f8c2..18b2db6 100644
--- a/R/pkg/.Rbuildignore
+++ b/R/pkg/.Rbuildignore
@@ -6,3 +6,4 @@
 ^README\.Rmd$
 ^src-native$
 ^html$
+^tests/fulltests/*

http://git-wip-us.apache.org/repos/asf/spark/blob/9f4ff955/R/pkg/R/install.R
--
diff --git a/R/pkg/R/install.R b/R/pkg/R/install.R
index 4ca7aa6..ec931be 100644
--- a/R/pkg/R/install.R
+++ b/R/pkg/R/install.R
@@ -267,7 +267,7 @@ hadoopVersionName <- function(hadoopVersion) {
 # The implementation refers to appdirs package: 
https://pypi.python.org/pypi/appdirs and
 # adapt to Spark context
 sparkCachePath <- function() {
-  if (.Platform$OS.type == "windows") {
+  if (is_windows()) {
 winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
 if (is.na(winAppPath)) {
   stop(paste("%LOCALAPPDATA% not found.",

http://git-wip-us.apache.org/repos/asf/spark/blob/9f4ff955/R/pkg/R/utils.R
--
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index ea45e39..91483a4 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -908,10 +908,6 @@ isAtomicLengthOne <- function(x) {
   is.atomic(x) && length(x) == 1
 }
 
-is_cran <- function() {
-  !identical(Sys.getenv("NOT_CRAN"), "true")
-}
-
 is_windows <- function() {
   .Platform$OS.type == "windows"
 }
@@ -920,6 +916,6 @@ hadoop_home_set <- function() {
   !identical(Sys.getenv("HADOOP_HOME"), "")
 }
 
-not_cran_or_windows_with_hadoop <- function() {
-  !is_cran() && (!is_windows() || hadoop_home_set())
+windows_with_hadoop <- function() {
+  !is_windows() || hadoop_home_set()
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9f4ff955/R/pkg/tests/fulltests/test_Serde.R
--
diff --git a/R/pkg/tests/fulltests/test_Serde.R 
b/R/pkg/tests/fulltests/test_Serde.R
index 6e160fa..6bbd201 100644
--- a/R/pkg/tests/fulltests/test_Serde.R
+++ b/R/pkg/tests/fulltests/test_Serde.R
@@ -20,8 +20,6 @@ context("SerDe functionality")
 sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
 
 test_that("SerDe of primitive t

spark git commit: [SPARK-13933][BUILD] Update hadoop-2.7 profile's curator version to 2.7.1

2017-06-11 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master eb3ea3a08 -> 823f1eef5


[SPARK-13933][BUILD] Update hadoop-2.7 profile's curator version to 2.7.1

## What changes were proposed in this pull request?

Update the hadoop-2.7 profile's curator version to 2.7.1; see 
[SPARK-13933](https://issues.apache.org/jira/browse/SPARK-13933) for more details.

## How was this patch tested?

manual tests

Author: Yuming Wang 

Closes #18247 from wangyum/SPARK-13933.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/823f1eef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/823f1eef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/823f1eef

Branch: refs/heads/master
Commit: 823f1eef580763048b08b640090519e884f29c47
Parents: eb3ea3a
Author: Yuming Wang 
Authored: Sun Jun 11 10:05:47 2017 +0100
Committer: Sean Owen 
Committed: Sun Jun 11 10:05:47 2017 +0100

--
 dev/deps/spark-deps-hadoop-2.7 | 6 +++---
 pom.xml| 1 +
 2 files changed, 4 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/823f1eef/dev/deps/spark-deps-hadoop-2.7
--
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index ab1de3d..9127413 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -47,9 +47,9 @@ commons-net-2.2.jar
 commons-pool-1.5.4.jar
 compress-lzf-1.0.3.jar
 core-1.1.2.jar
-curator-client-2.6.0.jar
-curator-framework-2.6.0.jar
-curator-recipes-2.6.0.jar
+curator-client-2.7.1.jar
+curator-framework-2.7.1.jar
+curator-recipes-2.7.1.jar
 datanucleus-api-jdo-3.2.6.jar
 datanucleus-core-3.2.10.jar
 datanucleus-rdbms-3.2.9.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/823f1eef/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 6835ea1..5f52407 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2532,6 +2532,7 @@
       <id>hadoop-2.7</id>
       <properties>
         <hadoop.version>2.7.3</hadoop.version>
+        <curator.version>2.7.1</curator.version>
       </properties>
     </profile>
 





spark git commit: [SPARK-20935][STREAMING] Always close WriteAheadLog and make it idempotent

2017-06-11 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 8da3f7041 -> eb3ea3a08


[SPARK-20935][STREAMING] Always close WriteAheadLog and make it idempotent

## What changes were proposed in this pull request?

This PR proposes that `ReceiverTracker` always close the `WriteAheadLog` when it 
stops, whatever state the tracker is in, and makes `WriteAheadLog` and its 
implementations idempotent.
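For reference, a common way to make a `close()` idempotent, shown here as a generic sketch rather than the exact change made to `FileBasedWriteAheadLog` and `BatchedWriteAheadLog`:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// close() is safe to call any number of times: only the first call releases
// resources, every later call is a no-op.
abstract class IdempotentlyClosable {
  private val closed = new AtomicBoolean(false)

  protected def doClose(): Unit  // release file handles, thread pools, etc.

  final def close(): Unit = {
    if (closed.compareAndSet(false, true)) {
      doClose()
    }
  }
}

val resource = new IdempotentlyClosable {
  protected def doClose(): Unit = println("resources released")
}
resource.close()  // prints once
resource.close()  // no-op
```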

## How was this patch tested?

Added a test in `WriteAheadLogSuite`. Note that the added test passes even if 
the log is closed twice (namely, even without the changes in 
`FileBasedWriteAheadLog` and `BatchedWriteAheadLog`). It looks like both are 
already idempotent, but this serves as a sanity check.

Author: hyukjinkwon 

Closes #18224 from HyukjinKwon/streaming-closing.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb3ea3a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb3ea3a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb3ea3a0

Branch: refs/heads/master
Commit: eb3ea3a0831b26d3dc35a97566716b92868a7beb
Parents: 8da3f70
Author: hyukjinkwon 
Authored: Sun Jun 11 09:54:57 2017 +0100
Committer: Sean Owen 
Committed: Sun Jun 11 09:54:57 2017 +0100

--
 .../spark/streaming/util/WriteAheadLog.java |  2 +-
 .../streaming/scheduler/ReceiverTracker.scala   | 27 
 .../streaming/util/BatchedWriteAheadLog.scala   | 13 +-
 .../streaming/util/FileBasedWriteAheadLog.scala |  8 +++---
 .../scheduler/ReceiverTrackerSuite.scala|  2 ++
 .../streaming/util/WriteAheadLogSuite.scala |  2 ++
 6 files changed, 26 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eb3ea3a0/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
--
diff --git 
a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java 
b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
index 2803cad..00c5972 100644
--- a/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
+++ b/streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java
@@ -56,7 +56,7 @@ public abstract class WriteAheadLog {
   public abstract void clean(long threshTime, boolean waitForCompletion);
 
   /**
-   * Close this log and release any resources.
+   * Close this log and release any resources. It must be idempotent.
*/
   public abstract void close();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/eb3ea3a0/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index bd7ab0b..6f130c8 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -165,11 +165,11 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Boolean = false
 
   /** Stop the receiver execution thread. */
   def stop(graceful: Boolean): Unit = synchronized {
-if (isTrackerStarted) {
-  // First, stop the receivers
-  trackerState = Stopping
+val isStarted: Boolean = isTrackerStarted
+trackerState = Stopping
+if (isStarted) {
   if (!skipReceiverLaunch) {
-// Send the stop signal to all the receivers
+// First, stop the receivers. Send the stop signal to all the receivers
 endpoint.askSync[Boolean](StopAllReceivers)
 
 // Wait for the Spark job that runs the receivers to be over
@@ -194,17 +194,13 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Boolean = false
   // Finally, stop the endpoint
   ssc.env.rpcEnv.stop(endpoint)
   endpoint = null
-  receivedBlockTracker.stop()
-  logInfo("ReceiverTracker stopped")
-  trackerState = Stopped
-} else if (isTrackerInitialized) {
-  trackerState = Stopping
-  // `ReceivedBlockTracker` is open when this instance is created. We 
should
-  // close this even if this `ReceiverTracker` is not started.
-  receivedBlockTracker.stop()
-  logInfo("ReceiverTracker stopped")
-  trackerState = Stopped
 }
+
+// `ReceivedBlockTracker` is open when this instance is created. We should
+// close this even if this `ReceiverTracker` is not started.
+receivedBlockTracker.stop()
+logInfo("ReceiverTracker stopped")
+trackerState = Stopped
   }
 
   /** Allocate all unallocated blocks to the given batch. */
@@ -453,9 +449,6 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Bool

spark git commit: [SPARK-21000][MESOS] Add Mesos labels support to the Spark Dispatcher

2017-06-11 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master dc4c35183 -> 8da3f7041


[SPARK-21000][MESOS] Add Mesos labels support to the Spark Dispatcher

## What changes were proposed in this pull request?

Add Mesos labels support to the Spark Dispatcher

## How was this patch tested?

unit tests

Author: Michael Gummelt 

Closes #18220 from mgummelt/SPARK-21000-dispatcher-labels.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8da3f704
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8da3f704
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8da3f704

Branch: refs/heads/master
Commit: 8da3f7041aafa71d7596b531625edb899970fec2
Parents: dc4c351
Author: Michael Gummelt 
Authored: Sun Jun 11 09:49:39 2017 +0100
Committer: Sean Owen 
Committed: Sun Jun 11 09:49:39 2017 +0100

--
 docs/running-on-mesos.md| 14 +-
 .../org/apache/spark/deploy/mesos/config.scala  |  7 +++
 .../cluster/mesos/MesosClusterScheduler.scala   | 10 ++--
 .../MesosCoarseGrainedSchedulerBackend.scala| 28 ++-
 .../cluster/mesos/MesosProtoUtils.scala | 53 
 .../mesos/MesosClusterSchedulerSuite.scala  | 27 ++
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 23 -
 .../cluster/mesos/MesosProtoUtilsSuite.scala| 48 ++
 8 files changed, 157 insertions(+), 53 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/docs/running-on-mesos.md
--
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 8745e76..ec130c1 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -382,8 +382,9 @@ See the [configuration page](configuration.html) for 
information on Spark config
   (none)
   
 Set the Mesos labels to add to each task. Labels are free-form key-value 
pairs.
-Key-value pairs should be separated by a colon, and commas used to list 
more than one.
-Ex. key:value,key2:value2.
+Key-value pairs should be separated by a colon, and commas used to
+list more than one.  If your label includes a colon or comma, you
+can escape it with a backslash.  Ex. key:value,key2:a\:b.
   
 
 
@@ -469,6 +470,15 @@ See the [configuration page](configuration.html) for 
information on Spark config
   
 
 
+  spark.mesos.driver.labels
+  (none)
+  
+Mesos labels to add to the driver.  See 
spark.mesos.task.labels
+for formatting information.
+  
+
+
+
   spark.mesos.driverEnv.[EnvironmentVariableName]
   (none)
   

http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
--
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 19e2533..56d697f 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -56,4 +56,11 @@ package object config {
   .stringConf
   .createOptional
 
+  private [spark] val DRIVER_LABELS =
+ConfigBuilder("spark.mesos.driver.labels")
+  .doc("Mesos labels to add to the driver.  Labels are free-form key-value 
pairs.  Key-value" +
+"pairs should be separated by a colon, and commas used to list more 
than one." +
+"Ex. key:value,key2:value2")
+  .stringConf
+  .createOptional
 }
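For illustration, a hedged sketch of parsing the label syntax documented above, including the backslash escaping for colons and commas (the PR's real parsing lives in the new `MesosProtoUtils`; this standalone version is only meant to show the idea):

```scala
// Illustrative parser for the documented label syntax: commas separate labels,
// a colon separates key from value, and "\:" / "\," escape literal colons and
// commas inside a key or value. Not the actual MesosProtoUtils implementation.
def parseLabels(spec: String): Map[String, String] = {
  // Split on a separator character that is not preceded by a backslash.
  def splitUnescaped(s: String, sep: Char): Seq[String] =
    s.split(s"(?<!\\\\)\\$sep").toSeq
  // Drop the escaping backslashes once the fields have been separated.
  def unescape(s: String): String = s.replaceAll("""\\(.)""", "$1")

  splitUnescaped(spec, ',').filter(_.nonEmpty).map { label =>
    splitUnescaped(label, ':') match {
      case Seq(key, value) => unescape(key) -> unescape(value)
      case _ => throw new IllegalArgumentException(s"Malformed label: $label")
    }
  }.toMap
}

// parseLabels("""key:value,key2:a\:b""") == Map("key" -> "value", "key2" -> "a:b")
```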

http://git-wip-us.apache.org/repos/asf/spark/blob/8da3f704/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
--
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1bc6f71..577f9a8 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -30,11 +30,13 @@ import org.apache.mesos.Protos.Environment.Variable
 import org.apache.mesos.Protos.TaskStatus.Reason
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
+import org.apache.spark.deploy.mesos.config
 import org.apache.spark.deploy.mesos.MesosDriverDescription
 import org.apache.spark.deploy.rest.{CreateSubmissionResponse, 
KillSubmissionResponse, SubmissionStatusResponse}
 import org.apach

[1/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 815a0820b -> 0b0be47e7


http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/fulltests/test_streaming.R
--
diff --git a/R/pkg/tests/fulltests/test_streaming.R 
b/R/pkg/tests/fulltests/test_streaming.R
new file mode 100644
index 000..b20b431
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_streaming.R
@@ -0,0 +1,167 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("Structured Streaming")
+
+# Tests for Structured Streaming functions in SparkR
+
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
+
+jsonSubDir <- file.path("sparkr-test", "json", "")
+if (.Platform$OS.type == "windows") {
+  # file.path removes the empty separator on Windows, adds it back
+  jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep)
+}
+jsonDir <- file.path(tempdir(), jsonSubDir)
+dir.create(jsonDir, recursive = TRUE)
+
+mockLines <- c("{\"name\":\"Michael\"}",
+   "{\"name\":\"Andy\", \"age\":30}",
+   "{\"name\":\"Justin\", \"age\":19}")
+jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
+writeLines(mockLines, jsonPath)
+
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
+ "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
+ "{\"name\":\"David\",\"age\":60,\"height\":null}")
+jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
+
+schema <- structType(structField("name", "string"),
+ structField("age", "integer"),
+ structField("count", "double"))
+
+test_that("read.stream, write.stream, awaitTermination, stopQuery", {
+  skip_on_cran()
+
+  df <- read.stream("json", path = jsonDir, schema = schema, 
maxFilesPerTrigger = 1)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people", outputMode = 
"complete")
+
+  expect_false(awaitTermination(q, 5 * 1000))
+  callJMethod(q@ssq, "processAllAvailable")
+  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3)
+
+  writeLines(mockLinesNa, jsonPathNa)
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6)
+
+  stopQuery(q)
+  expect_true(awaitTermination(q, 1))
+  expect_error(awaitTermination(q), NA)
+})
+
+test_that("print from explain, lastProgress, status, isActive", {
+  skip_on_cran()
+
+  df <- read.stream("json", path = jsonDir, schema = schema)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people2", outputMode = 
"complete")
+
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==")
+  expect_true(any(grepl("\"description\" : \"MemorySink\"", 
capture.output(lastProgress(q)
+  expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q)
+
+  expect_equal(queryName(q), "people2")
+  expect_true(isActive(q))
+
+  stopQuery(q)
+})
+
+test_that("Stream other format", {
+  skip_on_cran()
+
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  df <- read.df(jsonPath, "json", schema)
+  write.df(df, parquetPath, "parquet", "overwrite")
+
+  df <- read.stream(path = parquetPath, schema = schema)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people3", outputMode = 
"complete")
+
+  expect_false(awaitTermination(q, 5 * 1000))
+  callJMethod(q@ssq, "processAllAvailable")
+  expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
+
+  expect_equal(queryName(q), "people3")
+  expect_true(any(grepl("\"description\" : 
\"FileStreamSource[[:print:]]+parquet",
+  capture.output(lastProgress(q)
+  expect_true(isActive(q))
+
+  stopQuery(q)
+  expect_true(awaitTermination(q, 1))
+  expect_false(isActive(q))
+
+  unlink(parquetPath)
+})
+
+test_that("Non-streaming DataFrame", {
+  skip_on_cran()
+
+  c <- as.DataFrame(cars)
+ 

[5/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
deleted file mode 100644
index c790d02..000
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ /dev/null
@@ -1,3474 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(testthat)
-
-context("SparkSQL functions")
-
-# Utility function for easily checking the values of a StructField
-checkStructField <- function(actual, expectedName, expectedType, 
expectedNullable) {
-  expect_equal(class(actual), "structField")
-  expect_equal(actual$name(), expectedName)
-  expect_equal(actual$dataType.toString(), expectedType)
-  expect_equal(actual$nullable(), expectedNullable)
-}
-
-markUtf8 <- function(s) {
-  Encoding(s) <- "UTF-8"
-  s
-}
-
-setHiveContext <- function(sc) {
-  if (exists(".testHiveSession", envir = .sparkREnv)) {
-hiveSession <- get(".testHiveSession", envir = .sparkREnv)
-  } else {
-# initialize once and reuse
-ssc <- callJMethod(sc, "sc")
-hiveCtx <- tryCatch({
-  newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE)
-},
-error = function(err) {
-  skip("Hive is not build with SparkSQL, skipped")
-})
-hiveSession <- callJMethod(hiveCtx, "sparkSession")
-  }
-  previousSession <- get(".sparkRsession", envir = .sparkREnv)
-  assign(".sparkRsession", hiveSession, envir = .sparkREnv)
-  assign(".prevSparkRsession", previousSession, envir = .sparkREnv)
-  hiveSession
-}
-
-unsetHiveContext <- function() {
-  previousSession <- get(".prevSparkRsession", envir = .sparkREnv)
-  assign(".sparkRsession", previousSession, envir = .sparkREnv)
-  remove(".prevSparkRsession", envir = .sparkREnv)
-}
-
-# Tests for SparkSQL functions in SparkR
-
-filesBefore <- list.files(path = sparkRDir, all.files = TRUE)
-sparkSession <- if (not_cran_or_windows_with_hadoop()) {
-sparkR.session(master = sparkRTestMaster)
-  } else {
-sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
-  }
-sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", 
"getJavaSparkContext", sparkSession)
-
-mockLines <- c("{\"name\":\"Michael\"}",
-   "{\"name\":\"Andy\", \"age\":30}",
-   "{\"name\":\"Justin\", \"age\":19}")
-jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
-orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc")
-writeLines(mockLines, jsonPath)
-
-# For test nafunctions, like dropna(), fillna(),...
-mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
- "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
- "{\"name\":\"David\",\"age\":60,\"height\":null}",
- "{\"name\":\"Amy\",\"age\":null,\"height\":null}",
- "{\"name\":null,\"age\":null,\"height\":null}")
-jsonPathNa <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-writeLines(mockLinesNa, jsonPathNa)
-
-# For test complex types in DataFrame
-mockLinesComplexType <-
-  c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}",
-"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}",
-"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}")
-complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-writeLines(mockLinesComplexType, complexTypeJsonPath)
-
-# For test map type and struct type in DataFrame
-mockLinesMapType <- 
c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
-  
"{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
-  
"{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
-mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-writeLines(mockLinesMapType, mapTypeJsonPath)
-
-if (.Platform$OS.type == "windows") {
-  Sys.setenv(TZ = "GMT")
-}
-
-test_that("calling sparkRSQL.init returns existing SQL context", {
-  skip_on_cran()
-
-  sqlContext <- suppressWarnings(sparkRSQL.init(sc))
-  expect

[2/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/fulltests/test_sparkSQL.R
--
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R 
b/R/pkg/tests/fulltests/test_sparkSQL.R
new file mode 100644
index 000..d2d5191
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -0,0 +1,3198 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("SparkSQL functions")
+
+# Utility function for easily checking the values of a StructField
+checkStructField <- function(actual, expectedName, expectedType, 
expectedNullable) {
+  expect_equal(class(actual), "structField")
+  expect_equal(actual$name(), expectedName)
+  expect_equal(actual$dataType.toString(), expectedType)
+  expect_equal(actual$nullable(), expectedNullable)
+}
+
+markUtf8 <- function(s) {
+  Encoding(s) <- "UTF-8"
+  s
+}
+
+setHiveContext <- function(sc) {
+  if (exists(".testHiveSession", envir = .sparkREnv)) {
+hiveSession <- get(".testHiveSession", envir = .sparkREnv)
+  } else {
+# initialize once and reuse
+ssc <- callJMethod(sc, "sc")
+hiveCtx <- tryCatch({
+  newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE)
+},
+error = function(err) {
+  skip("Hive is not build with SparkSQL, skipped")
+})
+hiveSession <- callJMethod(hiveCtx, "sparkSession")
+  }
+  previousSession <- get(".sparkRsession", envir = .sparkREnv)
+  assign(".sparkRsession", hiveSession, envir = .sparkREnv)
+  assign(".prevSparkRsession", previousSession, envir = .sparkREnv)
+  hiveSession
+}
+
+unsetHiveContext <- function() {
+  previousSession <- get(".prevSparkRsession", envir = .sparkREnv)
+  assign(".sparkRsession", previousSession, envir = .sparkREnv)
+  remove(".prevSparkRsession", envir = .sparkREnv)
+}
+
+# Tests for SparkSQL functions in SparkR
+
+filesBefore <- list.files(path = sparkRDir, all.files = TRUE)
+sparkSession <- if (not_cran_or_windows_with_hadoop()) {
+sparkR.session(master = sparkRTestMaster)
+  } else {
+sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+  }
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", 
"getJavaSparkContext", sparkSession)
+
+mockLines <- c("{\"name\":\"Michael\"}",
+   "{\"name\":\"Andy\", \"age\":30}",
+   "{\"name\":\"Justin\", \"age\":19}")
+jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
+parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc")
+writeLines(mockLines, jsonPath)
+
+# For test nafunctions, like dropna(), fillna(),...
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
+ "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
+ "{\"name\":\"David\",\"age\":60,\"height\":null}",
+ "{\"name\":\"Amy\",\"age\":null,\"height\":null}",
+ "{\"name\":null,\"age\":null,\"height\":null}")
+jsonPathNa <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
+writeLines(mockLinesNa, jsonPathNa)
+
+# For test complex types in DataFrame
+mockLinesComplexType <-
+  c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}",
+"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}",
+"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}")
+complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
+writeLines(mockLinesComplexType, complexTypeJsonPath)
+
+# For test map type and struct type in DataFrame
+mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
+                      "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
+                      "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
+mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
+writeLines(mockLinesMapType, mapTypeJsonPath)
+
+if (.Platform$OS.type == "windows") {
+  Sys.setenv(TZ = "GMT")
+}
+
+test_that("calling sparkRSQL.init returns existing SQL context", {
+  skip_on_cran()
+
+  sqlContext <- suppressWarnings(sparkRSQL.init(sc))
+  expect_equal(suppressWarni

[7/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
[SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

## What changes were proposed in this pull request?

Move all existing tests to a non-installed directory so that they are never run 
when the SparkR package is installed

For a follow-up PR:
- remove all skip_on_cran() calls in tests
- clean up test timer
- improve or change the basic tests that do run on CRAN (if anyone has a suggestion)

It looks like `R CMD build pkg` will still put pkg/tests (i.e. the full tests) 
into the source package, but `R CMD INSTALL` on such a source package does not 
install these tests (and so `R CMD check` does not run them)
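
A minimal sketch (in R) of how one might verify that the full tests are indeed not 
installed; the tarball name is hypothetical, and the package is assumed to have been 
built and installed with `R CMD build pkg` followed by `R CMD INSTALL SparkR_2.2.0.tar.gz`:

  # an installed SparkR should carry no bundled test directory by default
  system.file("tests", package = "SparkR")   # returns "" when tests are not installed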

## How was this patch tested?

- [x] unit tests, Jenkins
- [x] AppVeyor
- [x] make a source package, install it, `R CMD check` it - verify the full 
tests are not installed or run

Author: Felix Cheung 

Closes #18264 from felixcheung/rtestset.

(cherry picked from commit dc4c351837879dab26ad8fb471dc51c06832a9e4)
Signed-off-by: Felix Cheung 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b0be47e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b0be47e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b0be47e

Branch: refs/heads/branch-2.2
Commit: 0b0be47e7b742d96810c60b19a9aa920242e5224
Parents: 815a082
Author: Felix Cheung 
Authored: Sun Jun 11 00:00:33 2017 -0700
Committer: Felix Cheung 
Committed: Sun Jun 11 00:00:45 2017 -0700

--
 R/pkg/inst/tests/testthat/jarTest.R |   32 -
 R/pkg/inst/tests/testthat/packageInAJarTest.R   |   30 -
 R/pkg/inst/tests/testthat/test_Serde.R  |   85 -
 R/pkg/inst/tests/testthat/test_Windows.R|   32 -
 R/pkg/inst/tests/testthat/test_basic.R  |   90 +
 R/pkg/inst/tests/testthat/test_binaryFile.R |  100 -
 .../inst/tests/testthat/test_binary_function.R  |  110 -
 R/pkg/inst/tests/testthat/test_broadcast.R  |   55 -
 R/pkg/inst/tests/testthat/test_client.R |   51 -
 R/pkg/inst/tests/testthat/test_context.R|  226 --
 R/pkg/inst/tests/testthat/test_includePackage.R |   64 -
 R/pkg/inst/tests/testthat/test_jvm_api.R|   36 -
 .../tests/testthat/test_mllib_classification.R  |  396 ---
 .../inst/tests/testthat/test_mllib_clustering.R |  328 --
 R/pkg/inst/tests/testthat/test_mllib_fpm.R  |   85 -
 .../tests/testthat/test_mllib_recommendation.R  |   67 -
 .../inst/tests/testthat/test_mllib_regression.R |  480 ---
 R/pkg/inst/tests/testthat/test_mllib_stat.R |   53 -
 R/pkg/inst/tests/testthat/test_mllib_tree.R |  226 --
 .../tests/testthat/test_parallelize_collect.R   |  120 -
 R/pkg/inst/tests/testthat/test_rdd.R|  906 -
 R/pkg/inst/tests/testthat/test_shuffle.R|  248 --
 R/pkg/inst/tests/testthat/test_sparkR.R |   48 -
 R/pkg/inst/tests/testthat/test_sparkSQL.R   | 3198 --
 R/pkg/inst/tests/testthat/test_streaming.R  |  167 -
 R/pkg/inst/tests/testthat/test_take.R   |   71 -
 R/pkg/inst/tests/testthat/test_textFile.R   |  182 -
 R/pkg/inst/tests/testthat/test_utils.R  |  247 --
 R/pkg/tests/fulltests/jarTest.R |   32 +
 R/pkg/tests/fulltests/packageInAJarTest.R   |   30 +
 R/pkg/tests/fulltests/test_Serde.R  |   85 +
 R/pkg/tests/fulltests/test_Windows.R|   32 +
 R/pkg/tests/fulltests/test_binaryFile.R |  100 +
 R/pkg/tests/fulltests/test_binary_function.R|  110 +
 R/pkg/tests/fulltests/test_broadcast.R  |   55 +
 R/pkg/tests/fulltests/test_client.R |   51 +
 R/pkg/tests/fulltests/test_context.R|  226 ++
 R/pkg/tests/fulltests/test_includePackage.R |   64 +
 R/pkg/tests/fulltests/test_jvm_api.R|   36 +
 .../tests/fulltests/test_mllib_classification.R |  396 +++
 R/pkg/tests/fulltests/test_mllib_clustering.R   |  328 ++
 R/pkg/tests/fulltests/test_mllib_fpm.R  |   85 +
 .../tests/fulltests/test_mllib_recommendation.R |   67 +
 R/pkg/tests/fulltests/test_mllib_regression.R   |  480 +++
 R/pkg/tests/fulltests/test_mllib_stat.R |   53 +
 R/pkg/tests/fulltests/test_mllib_tree.R |  226 ++
 .../tests/fulltests/test_parallelize_collect.R  |  120 +
 R/pkg/tests/fulltests/test_rdd.R|  906 +
 R/pkg/tests/fulltests/test_shuffle.R|  248 ++
 R/pkg/tests/fulltests/test_sparkR.R |   48 +
 R/pkg/tests/fulltests/test_sparkSQL.R   | 3198 ++
 R/pkg/tests/fulltests/test_streaming.R  |  167 +
 R/pkg/tests/fulltests/test_take.R   |   71 +
 R/pkg/tests/fulltests/test_textFile.R   |  182 +
 R/pkg/tests/fulltests/test_utils.R  |  247 ++
 R/pkg/tests/run-all.R   |8 +
 56 files changed, 7741 insertions(+), 7643 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/bl

[6/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_mllib_regression.R
--
diff --git a/R/pkg/inst/tests/testthat/test_mllib_regression.R 
b/R/pkg/inst/tests/testthat/test_mllib_regression.R
deleted file mode 100644
index b05fdd3..000
--- a/R/pkg/inst/tests/testthat/test_mllib_regression.R
+++ /dev/null
@@ -1,480 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(testthat)
-
-context("MLlib regression algorithms, except for tree-based algorithms")
-
-# Tests for MLlib regression algorithms in SparkR
-sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
-
-test_that("formula of spark.glm", {
-  skip_on_cran()
-
-  training <- suppressWarnings(createDataFrame(iris))
-  # directly calling the spark API
-  # dot minus and intercept vs native glm
-  model <- spark.glm(training, Sepal_Width ~ . - Species + 0)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # feature interaction vs native glm
-  model <- spark.glm(training, Sepal_Width ~ Species:Sepal_Length)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # glm should work with long formula
-  training <- suppressWarnings(createDataFrame(iris))
-  training$LongLongLongLongLongName <- training$Sepal_Width
-  training$VeryLongLongLongLonLongName <- training$Sepal_Length
-  training$AnotherLongLongLongLongName <- training$Species
-  model <- spark.glm(training, LongLongLongLongLongName ~ 
VeryLongLongLongLonLongName +
-AnotherLongLongLongLongName)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), 
iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("spark.glm and predict", {
-  training <- suppressWarnings(createDataFrame(iris))
-  # gaussian family
-  model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
-  prediction <- predict(model, training)
-  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), 
"double")
-  vals <- collect(select(prediction, "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), 
iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # poisson family
-  model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
- family = poisson(link = identity))
-  prediction <- predict(model, training)
-  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), 
"double")
-  vals <- collect(select(prediction, "prediction"))
-  rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species,
-data = iris, family = poisson(link = 
identity)), iris))
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # Gamma family
-  x <- runif(100, -1, 1)
-  y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10)
-  df <- as.DataFrame(as.data.frame(list(x = x, y = y)))
-  model <- glm(y ~ x, family = Gamma, df)
-  out <- capture.output(print(summary(model)))
-  expect_true(any(grepl("Dispersion parameter for gamma family", out)))
-
-  # tweedie family
-  model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
- family = "tweedie", var.power = 1.2, link.power = 0.0)
-  prediction <- predict(model, training)
-  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), 
"double")
-  vals <- collect(select(prediction, "prediction"))
-
-  # manual calculation of the R predicted values to avoid dependence on statmod
-  #' library(statmod)
-  #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
-  #' family = tweedie(var.power = 1.2, link.power = 0.0))
-  #' print(coef(rModel))
-
-  rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
-  rVals <- exp(as.numeric(model.

[5/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
deleted file mode 100644
index d2d5191..000
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ /dev/null
@@ -1,3198 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(testthat)
-
-context("SparkSQL functions")
-
-# Utility function for easily checking the values of a StructField
-checkStructField <- function(actual, expectedName, expectedType, 
expectedNullable) {
-  expect_equal(class(actual), "structField")
-  expect_equal(actual$name(), expectedName)
-  expect_equal(actual$dataType.toString(), expectedType)
-  expect_equal(actual$nullable(), expectedNullable)
-}
-
-markUtf8 <- function(s) {
-  Encoding(s) <- "UTF-8"
-  s
-}
-
-setHiveContext <- function(sc) {
-  if (exists(".testHiveSession", envir = .sparkREnv)) {
-hiveSession <- get(".testHiveSession", envir = .sparkREnv)
-  } else {
-# initialize once and reuse
-ssc <- callJMethod(sc, "sc")
-hiveCtx <- tryCatch({
-  newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE)
-},
-error = function(err) {
-  skip("Hive is not build with SparkSQL, skipped")
-})
-hiveSession <- callJMethod(hiveCtx, "sparkSession")
-  }
-  previousSession <- get(".sparkRsession", envir = .sparkREnv)
-  assign(".sparkRsession", hiveSession, envir = .sparkREnv)
-  assign(".prevSparkRsession", previousSession, envir = .sparkREnv)
-  hiveSession
-}
-
-unsetHiveContext <- function() {
-  previousSession <- get(".prevSparkRsession", envir = .sparkREnv)
-  assign(".sparkRsession", previousSession, envir = .sparkREnv)
-  remove(".prevSparkRsession", envir = .sparkREnv)
-}
-
-# Tests for SparkSQL functions in SparkR
-
-filesBefore <- list.files(path = sparkRDir, all.files = TRUE)
-sparkSession <- if (not_cran_or_windows_with_hadoop()) {
-sparkR.session(master = sparkRTestMaster)
-  } else {
-sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
-  }
-sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", 
"getJavaSparkContext", sparkSession)
-
-mockLines <- c("{\"name\":\"Michael\"}",
-   "{\"name\":\"Andy\", \"age\":30}",
-   "{\"name\":\"Justin\", \"age\":19}")
-jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
-orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc")
-writeLines(mockLines, jsonPath)
-
-# For test nafunctions, like dropna(), fillna(),...
-mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
- "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
- "{\"name\":\"David\",\"age\":60,\"height\":null}",
- "{\"name\":\"Amy\",\"age\":null,\"height\":null}",
- "{\"name\":null,\"age\":null,\"height\":null}")
-jsonPathNa <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-writeLines(mockLinesNa, jsonPathNa)
-
-# For test complex types in DataFrame
-mockLinesComplexType <-
-  c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}",
-"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}",
-"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}")
-complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-writeLines(mockLinesComplexType, complexTypeJsonPath)
-
-# For test map type and struct type in DataFrame
-mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
-                      "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
-                      "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
-mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-writeLines(mockLinesMapType, mapTypeJsonPath)
-
-if (.Platform$OS.type == "windows") {
-  Sys.setenv(TZ = "GMT")
-}
-
-test_that("calling sparkRSQL.init returns existing SQL context", {
-  skip_on_cran()
-
-  sqlContext <- suppressWarnings(sparkRSQL.init(sc))
-  expect

[7/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
[SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

## What changes were proposed in this pull request?

Move all existing tests to a non-installed directory so that they are never run 
when the SparkR package is installed

For a follow-up PR:
- remove all skip_on_cran() calls in tests
- clean up test timer
- improve or change the basic tests that do run on CRAN (if anyone has a suggestion)

It looks like `R CMD build pkg` will still put pkg/tests (i.e. the full tests) 
into the source package, but `R CMD INSTALL` on such a source package does not 
install these tests (and so `R CMD check` does not run them)
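
As context for the skip_on_cran() cleanup listed above, this is the gating pattern 
used throughout the full tests; a minimal, self-contained sketch (the test body is 
illustrative only, not taken from the suite):

  library(testthat)

  test_that("full test gated off CRAN", {
    skip_on_cran()          # testthat skips this test during a CRAN check
    expect_equal(1 + 1, 2)  # placeholder assertion; real tests exercise SparkR APIs
  })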

## How was this patch tested?

- [x] unit tests, Jenkins
- [x] AppVeyor
- [x] make a source package, install it, `R CMD check` it - verify the full 
tests are not installed or run

Author: Felix Cheung 

Closes #18264 from felixcheung/rtestset.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc4c3518
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc4c3518
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc4c3518

Branch: refs/heads/master
Commit: dc4c351837879dab26ad8fb471dc51c06832a9e4
Parents: 5301a19
Author: Felix Cheung 
Authored: Sun Jun 11 00:00:33 2017 -0700
Committer: Felix Cheung 
Committed: Sun Jun 11 00:00:33 2017 -0700

--
 R/pkg/inst/tests/testthat/jarTest.R |   32 -
 R/pkg/inst/tests/testthat/packageInAJarTest.R   |   30 -
 R/pkg/inst/tests/testthat/test_Serde.R  |   85 -
 R/pkg/inst/tests/testthat/test_Windows.R|   32 -
 R/pkg/inst/tests/testthat/test_basic.R  |   90 +
 R/pkg/inst/tests/testthat/test_binaryFile.R |  100 -
 .../inst/tests/testthat/test_binary_function.R  |  110 -
 R/pkg/inst/tests/testthat/test_broadcast.R  |   55 -
 R/pkg/inst/tests/testthat/test_client.R |   51 -
 R/pkg/inst/tests/testthat/test_context.R|  226 --
 R/pkg/inst/tests/testthat/test_includePackage.R |   64 -
 R/pkg/inst/tests/testthat/test_jvm_api.R|   36 -
 .../tests/testthat/test_mllib_classification.R  |  396 --
 .../inst/tests/testthat/test_mllib_clustering.R |  328 --
 R/pkg/inst/tests/testthat/test_mllib_fpm.R  |   85 -
 .../tests/testthat/test_mllib_recommendation.R  |   67 -
 .../inst/tests/testthat/test_mllib_regression.R |  480 ---
 R/pkg/inst/tests/testthat/test_mllib_stat.R |   53 -
 R/pkg/inst/tests/testthat/test_mllib_tree.R |  320 --
 .../tests/testthat/test_parallelize_collect.R   |  120 -
 R/pkg/inst/tests/testthat/test_rdd.R|  906 -
 R/pkg/inst/tests/testthat/test_shuffle.R|  248 --
 R/pkg/inst/tests/testthat/test_sparkR.R |   48 -
 R/pkg/inst/tests/testthat/test_sparkSQL.R   | 3474 --
 R/pkg/inst/tests/testthat/test_streaming.R  |  167 -
 R/pkg/inst/tests/testthat/test_take.R   |   71 -
 R/pkg/inst/tests/testthat/test_textFile.R   |  182 -
 R/pkg/inst/tests/testthat/test_utils.R  |  248 --
 R/pkg/tests/fulltests/jarTest.R |   32 +
 R/pkg/tests/fulltests/packageInAJarTest.R   |   30 +
 R/pkg/tests/fulltests/test_Serde.R  |   85 +
 R/pkg/tests/fulltests/test_Windows.R|   32 +
 R/pkg/tests/fulltests/test_binaryFile.R |  100 +
 R/pkg/tests/fulltests/test_binary_function.R|  110 +
 R/pkg/tests/fulltests/test_broadcast.R  |   55 +
 R/pkg/tests/fulltests/test_client.R |   51 +
 R/pkg/tests/fulltests/test_context.R|  226 ++
 R/pkg/tests/fulltests/test_includePackage.R |   64 +
 R/pkg/tests/fulltests/test_jvm_api.R|   36 +
 .../tests/fulltests/test_mllib_classification.R |  396 ++
 R/pkg/tests/fulltests/test_mllib_clustering.R   |  328 ++
 R/pkg/tests/fulltests/test_mllib_fpm.R  |   85 +
 .../tests/fulltests/test_mllib_recommendation.R |   67 +
 R/pkg/tests/fulltests/test_mllib_regression.R   |  480 +++
 R/pkg/tests/fulltests/test_mllib_stat.R |   53 +
 R/pkg/tests/fulltests/test_mllib_tree.R |  320 ++
 .../tests/fulltests/test_parallelize_collect.R  |  120 +
 R/pkg/tests/fulltests/test_rdd.R|  906 +
 R/pkg/tests/fulltests/test_shuffle.R|  248 ++
 R/pkg/tests/fulltests/test_sparkR.R |   48 +
 R/pkg/tests/fulltests/test_sparkSQL.R   | 3474 ++
 R/pkg/tests/fulltests/test_streaming.R  |  167 +
 R/pkg/tests/fulltests/test_take.R   |   71 +
 R/pkg/tests/fulltests/test_textFile.R   |  182 +
 R/pkg/tests/fulltests/test_utils.R  |  248 ++
 R/pkg/tests/run-all.R   |8 +
 56 files changed, 8112 insertions(+), 8014 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/jarTest.R
-

[4/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/inst/tests/testthat/test_streaming.R
--
diff --git a/R/pkg/inst/tests/testthat/test_streaming.R 
b/R/pkg/inst/tests/testthat/test_streaming.R
deleted file mode 100644
index b20b431..000
--- a/R/pkg/inst/tests/testthat/test_streaming.R
+++ /dev/null
@@ -1,167 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(testthat)
-
-context("Structured Streaming")
-
-# Tests for Structured Streaming functions in SparkR
-
-sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
-
-jsonSubDir <- file.path("sparkr-test", "json", "")
-if (.Platform$OS.type == "windows") {
-  # file.path removes the empty separator on Windows, adds it back
-  jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep)
-}
-jsonDir <- file.path(tempdir(), jsonSubDir)
-dir.create(jsonDir, recursive = TRUE)
-
-mockLines <- c("{\"name\":\"Michael\"}",
-   "{\"name\":\"Andy\", \"age\":30}",
-   "{\"name\":\"Justin\", \"age\":19}")
-jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
-writeLines(mockLines, jsonPath)
-
-mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
- "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
- "{\"name\":\"David\",\"age\":60,\"height\":null}")
-jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
-
-schema <- structType(structField("name", "string"),
- structField("age", "integer"),
- structField("count", "double"))
-
-test_that("read.stream, write.stream, awaitTermination, stopQuery", {
-  skip_on_cran()
-
-  df <- read.stream("json", path = jsonDir, schema = schema, 
maxFilesPerTrigger = 1)
-  expect_true(isStreaming(df))
-  counts <- count(group_by(df, "name"))
-  q <- write.stream(counts, "memory", queryName = "people", outputMode = 
"complete")
-
-  expect_false(awaitTermination(q, 5 * 1000))
-  callJMethod(q@ssq, "processAllAvailable")
-  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3)
-
-  writeLines(mockLinesNa, jsonPathNa)
-  awaitTermination(q, 5 * 1000)
-  callJMethod(q@ssq, "processAllAvailable")
-  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6)
-
-  stopQuery(q)
-  expect_true(awaitTermination(q, 1))
-  expect_error(awaitTermination(q), NA)
-})
-
-test_that("print from explain, lastProgress, status, isActive", {
-  skip_on_cran()
-
-  df <- read.stream("json", path = jsonDir, schema = schema)
-  expect_true(isStreaming(df))
-  counts <- count(group_by(df, "name"))
-  q <- write.stream(counts, "memory", queryName = "people2", outputMode = 
"complete")
-
-  awaitTermination(q, 5 * 1000)
-  callJMethod(q@ssq, "processAllAvailable")
-
-  expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==")
-  expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q)))))
-  expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q)))))
-
-  expect_equal(queryName(q), "people2")
-  expect_true(isActive(q))
-
-  stopQuery(q)
-})
-
-test_that("Stream other format", {
-  skip_on_cran()
-
-  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
-  df <- read.df(jsonPath, "json", schema)
-  write.df(df, parquetPath, "parquet", "overwrite")
-
-  df <- read.stream(path = parquetPath, schema = schema)
-  expect_true(isStreaming(df))
-  counts <- count(group_by(df, "name"))
-  q <- write.stream(counts, "memory", queryName = "people3", outputMode = 
"complete")
-
-  expect_false(awaitTermination(q, 5 * 1000))
-  callJMethod(q@ssq, "processAllAvailable")
-  expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
-
-  expect_equal(queryName(q), "people3")
-  expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet",
-                        capture.output(lastProgress(q)))))
-  expect_true(isActive(q))
-
-  stopQuery(q)
-  expect_true(awaitTermination(q, 1))
-  expect_false(isActive(q))
-
-  unlink(parquetPath)
-})
-
-test_that("Non-streaming DataFrame", {
-  skip_on_cran()
-
-  c <- as.DataFrame(cars)
-  expect_false(isStreaming(c))
-
-  expect_error(write.stream(c, "

[4/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/inst/tests/testthat/test_streaming.R
--
diff --git a/R/pkg/inst/tests/testthat/test_streaming.R 
b/R/pkg/inst/tests/testthat/test_streaming.R
deleted file mode 100644
index b20b431..000
--- a/R/pkg/inst/tests/testthat/test_streaming.R
+++ /dev/null
@@ -1,167 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(testthat)
-
-context("Structured Streaming")
-
-# Tests for Structured Streaming functions in SparkR
-
-sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
-
-jsonSubDir <- file.path("sparkr-test", "json", "")
-if (.Platform$OS.type == "windows") {
-  # file.path removes the empty separator on Windows, adds it back
-  jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep)
-}
-jsonDir <- file.path(tempdir(), jsonSubDir)
-dir.create(jsonDir, recursive = TRUE)
-
-mockLines <- c("{\"name\":\"Michael\"}",
-   "{\"name\":\"Andy\", \"age\":30}",
-   "{\"name\":\"Justin\", \"age\":19}")
-jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
-writeLines(mockLines, jsonPath)
-
-mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
- "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
- "{\"name\":\"David\",\"age\":60,\"height\":null}")
-jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
-
-schema <- structType(structField("name", "string"),
- structField("age", "integer"),
- structField("count", "double"))
-
-test_that("read.stream, write.stream, awaitTermination, stopQuery", {
-  skip_on_cran()
-
-  df <- read.stream("json", path = jsonDir, schema = schema, 
maxFilesPerTrigger = 1)
-  expect_true(isStreaming(df))
-  counts <- count(group_by(df, "name"))
-  q <- write.stream(counts, "memory", queryName = "people", outputMode = 
"complete")
-
-  expect_false(awaitTermination(q, 5 * 1000))
-  callJMethod(q@ssq, "processAllAvailable")
-  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3)
-
-  writeLines(mockLinesNa, jsonPathNa)
-  awaitTermination(q, 5 * 1000)
-  callJMethod(q@ssq, "processAllAvailable")
-  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6)
-
-  stopQuery(q)
-  expect_true(awaitTermination(q, 1))
-  expect_error(awaitTermination(q), NA)
-})
-
-test_that("print from explain, lastProgress, status, isActive", {
-  skip_on_cran()
-
-  df <- read.stream("json", path = jsonDir, schema = schema)
-  expect_true(isStreaming(df))
-  counts <- count(group_by(df, "name"))
-  q <- write.stream(counts, "memory", queryName = "people2", outputMode = 
"complete")
-
-  awaitTermination(q, 5 * 1000)
-  callJMethod(q@ssq, "processAllAvailable")
-
-  expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==")
-  expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q)))))
-  expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q)))))
-
-  expect_equal(queryName(q), "people2")
-  expect_true(isActive(q))
-
-  stopQuery(q)
-})
-
-test_that("Stream other format", {
-  skip_on_cran()
-
-  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
-  df <- read.df(jsonPath, "json", schema)
-  write.df(df, parquetPath, "parquet", "overwrite")
-
-  df <- read.stream(path = parquetPath, schema = schema)
-  expect_true(isStreaming(df))
-  counts <- count(group_by(df, "name"))
-  q <- write.stream(counts, "memory", queryName = "people3", outputMode = 
"complete")
-
-  expect_false(awaitTermination(q, 5 * 1000))
-  callJMethod(q@ssq, "processAllAvailable")
-  expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
-
-  expect_equal(queryName(q), "people3")
-  expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet",
-                        capture.output(lastProgress(q)))))
-  expect_true(isActive(q))
-
-  stopQuery(q)
-  expect_true(awaitTermination(q, 1))
-  expect_false(isActive(q))
-
-  unlink(parquetPath)
-})
-
-test_that("Non-streaming DataFrame", {
-  skip_on_cran()
-
-  c <- as.DataFrame(cars)
-  expect_false(isStreaming(c))
-
-  expect_error(write.stream(c, "

[1/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master 5301a19a0 -> dc4c35183


http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_streaming.R
--
diff --git a/R/pkg/tests/fulltests/test_streaming.R 
b/R/pkg/tests/fulltests/test_streaming.R
new file mode 100644
index 000..b20b431
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_streaming.R
@@ -0,0 +1,167 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("Structured Streaming")
+
+# Tests for Structured Streaming functions in SparkR
+
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
+
+jsonSubDir <- file.path("sparkr-test", "json", "")
+if (.Platform$OS.type == "windows") {
+  # file.path removes the empty separator on Windows, adds it back
+  jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep)
+}
+jsonDir <- file.path(tempdir(), jsonSubDir)
+dir.create(jsonDir, recursive = TRUE)
+
+mockLines <- c("{\"name\":\"Michael\"}",
+   "{\"name\":\"Andy\", \"age\":30}",
+   "{\"name\":\"Justin\", \"age\":19}")
+jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
+writeLines(mockLines, jsonPath)
+
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
+ "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
+ "{\"name\":\"David\",\"age\":60,\"height\":null}")
+jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
+
+schema <- structType(structField("name", "string"),
+ structField("age", "integer"),
+ structField("count", "double"))
+
+test_that("read.stream, write.stream, awaitTermination, stopQuery", {
+  skip_on_cran()
+
+  df <- read.stream("json", path = jsonDir, schema = schema, 
maxFilesPerTrigger = 1)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people", outputMode = 
"complete")
+
+  expect_false(awaitTermination(q, 5 * 1000))
+  callJMethod(q@ssq, "processAllAvailable")
+  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3)
+
+  writeLines(mockLinesNa, jsonPathNa)
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6)
+
+  stopQuery(q)
+  expect_true(awaitTermination(q, 1))
+  expect_error(awaitTermination(q), NA)
+})
+
+test_that("print from explain, lastProgress, status, isActive", {
+  skip_on_cran()
+
+  df <- read.stream("json", path = jsonDir, schema = schema)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people2", outputMode = 
"complete")
+
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==")
+  expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q)))))
+  expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q)))))
+
+  expect_equal(queryName(q), "people2")
+  expect_true(isActive(q))
+
+  stopQuery(q)
+})
+
+test_that("Stream other format", {
+  skip_on_cran()
+
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  df <- read.df(jsonPath, "json", schema)
+  write.df(df, parquetPath, "parquet", "overwrite")
+
+  df <- read.stream(path = parquetPath, schema = schema)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people3", outputMode = 
"complete")
+
+  expect_false(awaitTermination(q, 5 * 1000))
+  callJMethod(q@ssq, "processAllAvailable")
+  expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
+
+  expect_equal(queryName(q), "people3")
+  expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet",
+                        capture.output(lastProgress(q)))))
+  expect_true(isActive(q))
+
+  stopQuery(q)
+  expect_true(awaitTermination(q, 1))
+  expect_false(isActive(q))
+
+  unlink(parquetPath)
+})
+
+test_that("Non-streaming DataFrame", {
+  skip_on_cran()
+
+  c <- as.DataFrame(cars)
+  exp

[2/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_sparkSQL.R
--
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R 
b/R/pkg/tests/fulltests/test_sparkSQL.R
new file mode 100644
index 000..c790d02
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -0,0 +1,3474 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("SparkSQL functions")
+
+# Utility function for easily checking the values of a StructField
+checkStructField <- function(actual, expectedName, expectedType, 
expectedNullable) {
+  expect_equal(class(actual), "structField")
+  expect_equal(actual$name(), expectedName)
+  expect_equal(actual$dataType.toString(), expectedType)
+  expect_equal(actual$nullable(), expectedNullable)
+}
+
+markUtf8 <- function(s) {
+  Encoding(s) <- "UTF-8"
+  s
+}
+
+setHiveContext <- function(sc) {
+  if (exists(".testHiveSession", envir = .sparkREnv)) {
+hiveSession <- get(".testHiveSession", envir = .sparkREnv)
+  } else {
+# initialize once and reuse
+ssc <- callJMethod(sc, "sc")
+hiveCtx <- tryCatch({
+  newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc, FALSE)
+},
+error = function(err) {
+  skip("Hive is not build with SparkSQL, skipped")
+})
+hiveSession <- callJMethod(hiveCtx, "sparkSession")
+  }
+  previousSession <- get(".sparkRsession", envir = .sparkREnv)
+  assign(".sparkRsession", hiveSession, envir = .sparkREnv)
+  assign(".prevSparkRsession", previousSession, envir = .sparkREnv)
+  hiveSession
+}
+
+unsetHiveContext <- function() {
+  previousSession <- get(".prevSparkRsession", envir = .sparkREnv)
+  assign(".sparkRsession", previousSession, envir = .sparkREnv)
+  remove(".prevSparkRsession", envir = .sparkREnv)
+}
+
+# Tests for SparkSQL functions in SparkR
+
+filesBefore <- list.files(path = sparkRDir, all.files = TRUE)
+sparkSession <- if (not_cran_or_windows_with_hadoop()) {
+sparkR.session(master = sparkRTestMaster)
+  } else {
+sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+  }
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", 
"getJavaSparkContext", sparkSession)
+
+mockLines <- c("{\"name\":\"Michael\"}",
+   "{\"name\":\"Andy\", \"age\":30}",
+   "{\"name\":\"Justin\", \"age\":19}")
+jsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
+parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+orcPath <- tempfile(pattern = "sparkr-test", fileext = ".orc")
+writeLines(mockLines, jsonPath)
+
+# For test nafunctions, like dropna(), fillna(),...
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
+ "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
+ "{\"name\":\"David\",\"age\":60,\"height\":null}",
+ "{\"name\":\"Amy\",\"age\":null,\"height\":null}",
+ "{\"name\":null,\"age\":null,\"height\":null}")
+jsonPathNa <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
+writeLines(mockLinesNa, jsonPathNa)
+
+# For test complex types in DataFrame
+mockLinesComplexType <-
+  c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}",
+"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}",
+"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}")
+complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
+writeLines(mockLinesComplexType, complexTypeJsonPath)
+
+# For test map type and struct type in DataFrame
+mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
+                      "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
+                      "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
+mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
+writeLines(mockLinesMapType, mapTypeJsonPath)
+
+if (.Platform$OS.type == "windows") {
+  Sys.setenv(TZ = "GMT")
+}
+
+test_that("calling sparkRSQL.init returns existing SQL context", {
+  skip_on_cran()
+
+  sqlContext <- suppressWarnings(sparkRSQL.init(sc))
+  expect_equal(suppressWarni

[3/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/fulltests/test_mllib_fpm.R
--
diff --git a/R/pkg/tests/fulltests/test_mllib_fpm.R 
b/R/pkg/tests/fulltests/test_mllib_fpm.R
new file mode 100644
index 000..4e10ca1
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_mllib_fpm.R
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib frequent pattern mining")
+
+# Tests for MLlib frequent pattern mining algorithms in SparkR
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
+
+test_that("spark.fpGrowth", {
+  data <- selectExpr(createDataFrame(data.frame(items = c(
+"1,2",
+"1,2",
+"1,2,3",
+"1,3"
+  ))), "split(items, ',') as items")
+
+  model <- spark.fpGrowth(data, minSupport = 0.3, minConfidence = 0.8, 
numPartitions = 1)
+
+  itemsets <- collect(spark.freqItemsets(model))
+
+  expected_itemsets <- data.frame(
+items = I(list(list("3"), list("3", "1"), list("2"), list("2", "1"), 
list("1"))),
+freq = c(2, 2, 3, 3, 4)
+  )
+
+  expect_equivalent(expected_itemsets, itemsets)
+
+  expected_association_rules <- data.frame(
+antecedent = I(list(list("2"), list("3"))),
+consequent = I(list(list("1"), list("1"))),
+confidence = c(1, 1)
+  )
+
+  expect_equivalent(expected_association_rules, 
collect(spark.associationRules(model)))
+
+  new_data <- selectExpr(createDataFrame(data.frame(items = c(
+"1,2",
+"1,3",
+"2,3"
+  ))), "split(items, ',') as items")
+
+  expected_predictions <- data.frame(
+items = I(list(list("1", "2"), list("1", "3"), list("2", "3"))),
+prediction = I(list(list(), list(), list("1")))
+  )
+
+  expect_equivalent(expected_predictions, collect(predict(model, new_data)))
+
+  if (not_cran_or_windows_with_hadoop()) {
+modelPath <- tempfile(pattern = "spark-fpm", fileext = ".tmp")
+write.ml(model, modelPath, overwrite = TRUE)
+loaded_model <- read.ml(modelPath)
+
+expect_equivalent(
+  itemsets,
+  collect(spark.freqItemsets(loaded_model)))
+
+unlink(modelPath)
+  }
+
+  model_without_numpartitions <- spark.fpGrowth(data, minSupport = 0.3, 
minConfidence = 0.8)
+  expect_equal(
+count(spark.freqItemsets(model_without_numpartitions)),
+count(spark.freqItemsets(model))
+  )
+
+})
+
+sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/fulltests/test_mllib_recommendation.R
--
diff --git a/R/pkg/tests/fulltests/test_mllib_recommendation.R 
b/R/pkg/tests/fulltests/test_mllib_recommendation.R
new file mode 100644
index 000..cc8064f
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_mllib_recommendation.R
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib recommendation algorithms")
+
+# Tests for MLlib recommendation algorithms in SparkR
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
+
+test_that("spark.als", {
+  data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 
4.0),
+   list(2, 1, 1.0), list(2, 2, 5.0))
+  df <- createDataFrame(data, c("user", "item", "score"))
+  model <- spark.als(df, ratingCol = "score", userCol = "user", itemCol = 
"item",
+ rank = 10, maxIter = 5, seed = 0, regParam = 0.1)
+  stats <-

[3/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_mllib_fpm.R
--
diff --git a/R/pkg/tests/fulltests/test_mllib_fpm.R 
b/R/pkg/tests/fulltests/test_mllib_fpm.R
new file mode 100644
index 000..4e10ca1
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_mllib_fpm.R
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib frequent pattern mining")
+
+# Tests for MLlib frequent pattern mining algorithms in SparkR
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
+
+test_that("spark.fpGrowth", {
+  data <- selectExpr(createDataFrame(data.frame(items = c(
+"1,2",
+"1,2",
+"1,2,3",
+"1,3"
+  ))), "split(items, ',') as items")
+
+  model <- spark.fpGrowth(data, minSupport = 0.3, minConfidence = 0.8, 
numPartitions = 1)
+
+  itemsets <- collect(spark.freqItemsets(model))
+
+  expected_itemsets <- data.frame(
+items = I(list(list("3"), list("3", "1"), list("2"), list("2", "1"), 
list("1"))),
+freq = c(2, 2, 3, 3, 4)
+  )
+
+  expect_equivalent(expected_itemsets, itemsets)
+
+  expected_association_rules <- data.frame(
+antecedent = I(list(list("2"), list("3"))),
+consequent = I(list(list("1"), list("1"))),
+confidence = c(1, 1)
+  )
+
+  expect_equivalent(expected_association_rules, 
collect(spark.associationRules(model)))
+
+  new_data <- selectExpr(createDataFrame(data.frame(items = c(
+"1,2",
+"1,3",
+"2,3"
+  ))), "split(items, ',') as items")
+
+  expected_predictions <- data.frame(
+items = I(list(list("1", "2"), list("1", "3"), list("2", "3"))),
+prediction = I(list(list(), list(), list("1")))
+  )
+
+  expect_equivalent(expected_predictions, collect(predict(model, new_data)))
+
+  if (not_cran_or_windows_with_hadoop()) {
+modelPath <- tempfile(pattern = "spark-fpm", fileext = ".tmp")
+write.ml(model, modelPath, overwrite = TRUE)
+loaded_model <- read.ml(modelPath)
+
+expect_equivalent(
+  itemsets,
+  collect(spark.freqItemsets(loaded_model)))
+
+unlink(modelPath)
+  }
+
+  model_without_numpartitions <- spark.fpGrowth(data, minSupport = 0.3, 
minConfidence = 0.8)
+  expect_equal(
+count(spark.freqItemsets(model_without_numpartitions)),
+count(spark.freqItemsets(model))
+  )
+
+})
+
+sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_mllib_recommendation.R
--
diff --git a/R/pkg/tests/fulltests/test_mllib_recommendation.R 
b/R/pkg/tests/fulltests/test_mllib_recommendation.R
new file mode 100644
index 000..cc8064f
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_mllib_recommendation.R
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("MLlib recommendation algorithms")
+
+# Tests for MLlib recommendation algorithms in SparkR
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
+
+test_that("spark.als", {
+  data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 
4.0),
+   list(2, 1, 1.0), list(2, 2, 5.0))
+  df <- createDataFrame(data, c("user", "item", "score"))
+  model <- spark.als(df, ratingCol = "score", userCol = "user", itemCol = 
"item",
+ rank = 10, maxIter = 5, seed = 0, regParam = 0.1)
+  stats <-

[6/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/inst/tests/testthat/test_mllib_regression.R
--
diff --git a/R/pkg/inst/tests/testthat/test_mllib_regression.R 
b/R/pkg/inst/tests/testthat/test_mllib_regression.R
deleted file mode 100644
index b05fdd3..000
--- a/R/pkg/inst/tests/testthat/test_mllib_regression.R
+++ /dev/null
@@ -1,480 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-library(testthat)
-
-context("MLlib regression algorithms, except for tree-based algorithms")
-
-# Tests for MLlib regression algorithms in SparkR
-sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
-
-test_that("formula of spark.glm", {
-  skip_on_cran()
-
-  training <- suppressWarnings(createDataFrame(iris))
-  # directly calling the spark API
-  # dot minus and intercept vs native glm
-  model <- spark.glm(training, Sepal_Width ~ . - Species + 0)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # feature interaction vs native glm
-  model <- spark.glm(training, Sepal_Width ~ Species:Sepal_Length)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # glm should work with long formula
-  training <- suppressWarnings(createDataFrame(iris))
-  training$LongLongLongLongLongName <- training$Sepal_Width
-  training$VeryLongLongLongLonLongName <- training$Sepal_Length
-  training$AnotherLongLongLongLongName <- training$Species
-  model <- spark.glm(training, LongLongLongLongLongName ~ 
VeryLongLongLongLonLongName +
-AnotherLongLongLongLongName)
-  vals <- collect(select(predict(model, training), "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), 
iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-})
-
-test_that("spark.glm and predict", {
-  training <- suppressWarnings(createDataFrame(iris))
-  # gaussian family
-  model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
-  prediction <- predict(model, training)
-  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), 
"double")
-  vals <- collect(select(prediction, "prediction"))
-  rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), 
iris)
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # poisson family
-  model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
- family = poisson(link = identity))
-  prediction <- predict(model, training)
-  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), 
"double")
-  vals <- collect(select(prediction, "prediction"))
-  rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species,
-data = iris, family = poisson(link = 
identity)), iris))
-  expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
-
-  # Gamma family
-  x <- runif(100, -1, 1)
-  y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10)
-  df <- as.DataFrame(as.data.frame(list(x = x, y = y)))
-  model <- glm(y ~ x, family = Gamma, df)
-  out <- capture.output(print(summary(model)))
-  expect_true(any(grepl("Dispersion parameter for gamma family", out)))
-
-  # tweedie family
-  model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
- family = "tweedie", var.power = 1.2, link.power = 0.0)
-  prediction <- predict(model, training)
-  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), 
"double")
-  vals <- collect(select(prediction, "prediction"))
-
-  # manual calculation of the R predicted values to avoid dependence on statmod
-  #' library(statmod)
-  #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
-  #' family = tweedie(var.power = 1.2, link.power = 0.0))
-  #' print(coef(rModel))
-
-  rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
-  rVals <- exp(as.numeric(model.