spark git commit: [SPARK-13996] [SQL] Add more not null attributes for Filter codegen

2016-04-02 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 1cf701834 -> c2f25b1a1


[SPARK-13996] [SQL] Add more not null attributes for Filter codegen

## What changes were proposed in this pull request?
JIRA: https://issues.apache.org/jira/browse/SPARK-13996

Filter codegen currently determines which attributes are not null by looking for IsNotNull(a) predicates where child.output.contains(a). However, this check is not comprehensive, and we can improve it.
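
As a toy illustration of the rule this patch generalizes (simplified stand-in types, not the actual Catalyst classes): once a null-intolerant expression is known to be non-null, every attribute it references can be treated as non-null as well.

```
// Toy model only; Catalyst's IsNotNull / NullIntolerant / AttributeSet are richer.
sealed trait Expr { def references: Set[String] }
case class Attr(name: String) extends Expr { def references: Set[String] = Set(name) }
case class Lit(value: Int) extends Expr { def references: Set[String] = Set.empty }
// Add is null-intolerant: it evaluates to null whenever either input is null.
case class Add(left: Expr, right: Expr) extends Expr {
  def references: Set[String] = left.references ++ right.references
}
case class IsNotNull(child: Expr) extends Expr { def references: Set[String] = child.references }

// Attributes a conjunctive filter condition proves non-null: every attribute
// referenced by a null-intolerant expression wrapped in IsNotNull.
def notNullAttributes(conjuncts: Seq[Expr]): Set[String] =
  conjuncts.collect { case IsNotNull(e) => e.references }.flatten.toSet

// isnotnull(k + 1) now implies k itself is not null; the old check only
// recognized isnotnull over a bare attribute in child.output.
notNullAttributes(Seq(IsNotNull(Add(Attr("k"), Lit(1)))))  // Set(k)
```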

E.g., for this plan:

val rdd = sqlContext.sparkContext.makeRDD(Seq(Row(1, "1"), Row(null, "1"), Row(2, "2")))
val schema = new StructType().add("k", IntegerType).add("v", StringType)
val smallDF = sqlContext.createDataFrame(rdd, schema)
val df = smallDF.filter("isnotnull(k + 1)")

The code snippet generated without this patch:

/* 031 */   protected void processNext() throws java.io.IOException {
/* 032 */ /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
/* 033 */
/* 034 */ /*** PRODUCE: INPUT */
/* 035 */
/* 036 */ while (!shouldStop() && inputadapter_input.hasNext()) {
/* 037 */   InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 038 */   /*** CONSUME: Filter isnotnull((k#0 + 1)) */
/* 039 */   /* input[0, int] */
/* 040 */   boolean filter_isNull = inputadapter_row.isNullAt(0);
/* 041 */   int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
/* 042 */
/* 043 */   /* isnotnull((input[0, int] + 1)) */
/* 044 */   /* (input[0, int] + 1) */
/* 045 */   boolean filter_isNull3 = true;
/* 046 */   int filter_value3 = -1;
/* 047 */
/* 048 */   if (!filter_isNull) {
/* 049 */ filter_isNull3 = false; // resultCode could change nullability.
/* 050 */ filter_value3 = filter_value + 1;
/* 051 */
/* 052 */   }
/* 053 */   if (!(!(filter_isNull3))) continue;
/* 054 */
/* 055 */   filter_metricValue.add(1);

With this patch:

/* 031 */   protected void processNext() throws java.io.IOException {
/* 032 */ /*** PRODUCE: Filter isnotnull((k#0 + 1)) */
/* 033 */
/* 034 */ /*** PRODUCE: INPUT */
/* 035 */
/* 036 */ while (!shouldStop() && inputadapter_input.hasNext()) {
/* 037 */   InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 038 */   /*** CONSUME: Filter isnotnull((k#0 + 1)) */
/* 039 */   /* input[0, int] */
/* 040 */   boolean filter_isNull = inputadapter_row.isNullAt(0);
/* 041 */   int filter_value = filter_isNull ? -1 : (inputadapter_row.getInt(0));
/* 042 */
/* 043 */   if (filter_isNull) continue;
/* 044 */
/* 045 */   filter_metricValue.add(1);

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh 

Closes #11810 from viirya/add-more-not-null-attrs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2f25b1a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2f25b1a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2f25b1a

Branch: refs/heads/master
Commit: c2f25b1a148eeb1791ea7018b14b3a665c13212a
Parents: 1cf7018
Author: Liang-Chi Hsieh 
Authored: Sat Apr 2 19:34:38 2016 -0700
Committer: Davies Liu 
Committed: Sat Apr 2 19:34:38 2016 -0700

--
 .../org/apache/spark/sql/execution/basicOperators.scala  | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c2f25b1a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index a6a14df..fb1c618 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -79,12 +79,12 @@ case class Filter(condition: Expression, child: SparkPlan)
 
   // Split out all the IsNotNulls from condition.
   private val (notNullPreds, otherPreds) = splitConjunctivePredicates(condition).partition {
-    case IsNotNull(a) if child.output.exists(_.semanticEquals(a)) => true
+    case IsNotNull(a: NullIntolerant) if a.references.subsetOf(child.outputSet) => true
     case _ => false
   }
 
   // The columns that will filtered out by `IsNotNull` could be considered as not nullable.
-  private val notNullAttributes = notNullPreds.flatMap(_.references)
+  private val notNullAttributes = 

spark git commit: [SPARK-14056] Appends s3 specific configurations and spark.hadoop con…

2016-04-02 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 03d130f97 -> 1cf701834


[SPARK-14056] Appends s3 specific configurations and spark.hadoop con…

## What changes were proposed in this pull request?

Appends S3-specific configurations and spark.hadoop.* configurations to the Hive configuration.
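
For context, here is a rough sketch (not the exact Spark implementation) of what "appending spark.hadoop.* configurations" to a Hadoop `Configuration` amounts to; the `appendS3AndSparkHadoopConfigurations` helper introduced in the diff below also carries over the S3 credentials and `spark.buffer.size`.

```
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// Sketch only: copy every spark.hadoop.foo=bar entry into the Hadoop conf as foo=bar.
def copySparkHadoopProperties(conf: SparkConf, hadoopConf: Configuration): Unit = {
  conf.getAll.foreach { case (key, value) =>
    if (key.startsWith("spark.hadoop.")) {
      hadoopConf.set(key.substring("spark.hadoop.".length), value)
    }
  }
}
```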

## How was this patch tested?

Tested by running a job on a cluster.

…figurations to hive configuration.

Author: Sital Kedia 

Closes #11876 from sitalkedia/hiveConf.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1cf70183
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1cf70183
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1cf70183

Branch: refs/heads/master
Commit: 1cf70183423b938ec064925b20fd4a5b9e355991
Parents: 03d130f
Author: Sital Kedia 
Authored: Sat Apr 2 19:17:25 2016 -0700
Committer: Sean Owen 
Committed: Sat Apr 2 19:17:25 2016 -0700

--
 .../apache/spark/deploy/SparkHadoopUtil.scala| 19 +--
 .../org/apache/spark/sql/hive/TableReader.scala  |  4 ++--
 2 files changed, 15 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1cf70183/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 06b7b38..4e8e363 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -74,13 +74,12 @@ class SparkHadoopUtil extends Logging {
     }
   }
 
-  /**
-   * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
-   * subsystems.
-   */
-  def newConfiguration(conf: SparkConf): Configuration = {
-    val hadoopConf = new Configuration()
 
+  /**
+    * Appends S3-specific, spark.hadoop.*, and spark.buffer.size configurations to a Hadoop
+    * configuration.
+    */
+  def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {
     // Note: this null check is around more than just access to the "conf" object to maintain
     // the behavior of the old implementation of this code, for backwards compatibility.
     if (conf != null) {
@@ -106,7 +105,15 @@ class SparkHadoopUtil extends Logging {
       val bufferSize = conf.get("spark.buffer.size", "65536")
       hadoopConf.set("io.file.buffer.size", bufferSize)
     }
+  }
 
+  /**
+    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
+    * subsystems.
+    */
+  def newConfiguration(conf: SparkConf): Configuration = {
+    val hadoopConf = new Configuration()
+    appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
     hadoopConf
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1cf70183/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 80b24dc..54afe9c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
 
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -74,8 +75,7 @@ class HadoopTableReader(
     math.max(sc.hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions)
   }
 
-  // TODO: set aws s3 credentials.
-
+  SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sc.sparkContext.conf, hiveExtraConf)
   private val _broadcastedHiveConf =
     sc.sparkContext.broadcast(new SerializableConfiguration(hiveExtraConf))
 





spark git commit: [SPARK-14342][CORE][DOCS][TESTS] Remove straggler references to Tachyon

2016-04-02 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 4a6e78abd -> 03d130f97


[SPARK-14342][CORE][DOCS][TESTS] Remove straggler references to Tachyon

## What changes were proposed in this pull request?

Straggler references to Tachyon were removed:
- for docs, `tachyon` has been generalized as `off-heap memory`;
- for Mesos test suites, the key-value `tachyon:true`/`tachyon:false` has been changed to `os:centos`/`os:ubuntu`, since `os` is an example constraint used by the [Mesos official docs](http://mesos.apache.org/documentation/attributes-resources/).
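
For reference, constraint strings of this form are what users supply through the `spark.mesos.constraints` setting; the attribute names and values below are illustrative only, not anything Spark itself defines.

```
import org.apache.spark.SparkConf

// "os" and "centos7" stand in for whatever attributes the Mesos agents advertise.
val conf = new SparkConf()
  .set("spark.mesos.constraints", "os:centos7;zone:us-east-1a,us-east-1b")
```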

## How was this patch tested?

Existing test suites.

Author: Liwei Lin 

Closes #12129 from lw-lin/tachyon-cleanup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03d130f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03d130f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03d130f9

Branch: refs/heads/master
Commit: 03d130f9734be66e8aefc4ffaa207ee13e837629
Parents: 4a6e78a
Author: Liwei Lin 
Authored: Sat Apr 2 17:55:46 2016 -0700
Committer: Reynold Xin 
Committed: Sat Apr 2 17:55:46 2016 -0700

--
 .../apache/spark/api/java/StorageLevels.java|  4 +--
 .../cluster/mesos/MesosSchedulerUtils.scala |  4 +--
 .../mesos/MesosSchedulerUtilsSuite.scala| 32 ++--
 docs/running-on-mesos.md|  4 +--
 docs/streaming-programming-guide.md |  2 +-
 python/pyspark/storagelevel.py  |  2 +-
 6 files changed, 24 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/03d130f9/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
--
diff --git a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
index 666c797..23673d3 100644
--- a/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
+++ b/core/src/main/java/org/apache/spark/api/java/StorageLevels.java
@@ -39,8 +39,8 @@ public class StorageLevels {
   /**
    * Create a new StorageLevel object.
    * @param useDisk saved to disk, if true
-   * @param useMemory saved to memory, if true
-   * @param useOffHeap saved to Tachyon, if true
+   * @param useMemory saved to on-heap memory, if true
+   * @param useOffHeap saved to off-heap memory, if true
    * @param deserialized saved as deserialized objects, if true
    * @param replication replication factor
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/03d130f9/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 35f9143..233bdc2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -283,11 +283,11 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
    *  are separated by ':'. The ':' implies equality (for singular values) and "is one of" for
    *  multiple values (comma separated). For example:
    *  {{{
-   *  parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b")
+   *  parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b")
    *  // would result in
    *  
    *  Map(
-   *    "tachyon" -> Set("true"),
+   *    "os" -> Set("centos7"),
    *    "zone":   -> Set("us-east-1a", "us-east-1b")
    *  )
    *  }}}

http://git-wip-us.apache.org/repos/asf/spark/blob/03d130f9/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index 85437b2..ceb3a52 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -59,10 +59,10 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
 
   test("parse a non-empty constraint string correctly") {
 val expectedMap = Map(
-  "tachyon" -> Set("true"),
+  "os" -> Set("centos7"),
   "zone" -> Set("us-east-1a", "us-east-1b")
 )
-utils.parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") 

[2/2] spark git commit: [MINOR][DOCS] Use multi-line JavaDoc comments in Scala code.

2016-04-02 Thread rxin
[MINOR][DOCS] Use multi-line JavaDoc comments in Scala code.

## What changes were proposed in this pull request?

This PR aims to convert all Scala-style multiline comments into Java-style multiline comments in the Scala code.
(All comment-only changes over 77 files: +786 lines, −747 lines)

## How was this patch tested?

Manual.

Author: Dongjoon Hyun 

Closes #12130 from dongjoon-hyun/use_multiine_javadoc_comments.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a6e78ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a6e78ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a6e78ab

Branch: refs/heads/master
Commit: 4a6e78abd9d5edc4a5092738dff0006bbe202a89
Parents: f705037
Author: Dongjoon Hyun 
Authored: Sat Apr 2 17:50:40 2016 -0700
Committer: Reynold Xin 
Committed: Sat Apr 2 17:50:40 2016 -0700

--
 .../scala/org/apache/spark/FutureAction.scala   |  14 +-
 .../scala/org/apache/spark/SSLOptions.scala |  57 +++---
 .../scala/org/apache/spark/SparkContext.scala   |  42 ++--
 .../org/apache/spark/api/java/JavaPairRDD.scala |   8 +-
 .../spark/api/java/JavaSparkContext.scala   |  60 +++---
 .../spark/deploy/worker/CommandUtils.scala  |   2 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala |  10 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   4 +-
 .../mesos/CoarseMesosSchedulerBackend.scala |  24 +--
 .../cluster/mesos/MesosClusterScheduler.scala   |  12 +-
 .../cluster/mesos/MesosSchedulerUtils.scala |   4 +-
 .../apache/spark/shuffle/ShuffleManager.scala   |   6 +-
 .../spark/storage/memory/MemoryStore.scala  |  20 +-
 .../scala/org/apache/spark/util/Utils.scala |  17 +-
 .../test/scala/org/apache/spark/Smuggle.scala   |  46 ++---
 .../spark/memory/MemoryManagerSuite.scala   |  24 +--
 .../apache/spark/examples/BroadcastTest.scala   |   4 +-
 .../spark/examples/DFSReadWriteTest.scala   |  20 +-
 .../org/apache/spark/examples/GroupByTest.scala |   4 +-
 .../spark/examples/MultiBroadcastTest.scala |   4 +-
 .../examples/SimpleSkewedGroupByTest.scala  |   4 +-
 .../spark/examples/SkewedGroupByTest.scala  |   4 +-
 .../clickstream/PageViewGenerator.scala |  23 +--
 .../streaming/clickstream/PageViewStream.scala  |  21 +-
 .../streaming/flume/FlumeInputDStream.scala |  15 +-
 .../streaming/kafka/KafkaRDDPartition.scala |  15 +-
 .../org/apache/spark/graphx/GraphOps.scala  |  10 +-
 .../spark/graphx/lib/ConnectedComponents.scala  |  18 +-
 .../spark/ml/feature/ElementwiseProduct.scala   |   6 +-
 .../python/GaussianMixtureModelWrapper.scala|   8 +-
 .../mllib/api/python/Word2VecModelWrapper.scala |   4 +-
 .../apache/spark/mllib/linalg/Matrices.scala|  16 +-
 .../StreamingLinearRegressionWithSGD.scala  |   4 +-
 .../org/apache/spark/repl/SparkILoop.scala  |  21 +-
 .../org/apache/spark/repl/SparkImports.scala|   5 +-
 .../scala/org/apache/spark/sql/Encoder.scala|  24 +--
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  20 +-
 .../sql/catalyst/expressions/Projection.scala   |   6 +-
 .../expressions/codegen/CodeGenerator.scala |  26 +--
 .../sql/catalyst/expressions/grouping.scala |  18 +-
 .../spark/sql/catalyst/expressions/misc.scala   |   4 +-
 .../sql/catalyst/optimizer/Optimizer.scala  |  40 ++--
 .../spark/sql/catalyst/parser/AstBuilder.scala  |   4 +-
 .../spark/sql/catalyst/planning/patterns.scala  |  28 +--
 .../spark/sql/catalyst/plans/QueryPlan.scala|   4 +-
 .../catalyst/plans/physical/partitioning.scala  |   6 +-
 .../optimizer/OptimizerExtendableSuite.scala|  14 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  14 +-
 .../spark/sql/execution/CacheManager.scala  |   7 +-
 .../apache/spark/sql/execution/SparkPlan.scala  |   4 +-
 .../spark/sql/execution/WholeStageCodegen.scala | 172 -
 .../org/apache/spark/sql/execution/Window.scala |  36 ++--
 .../aggregate/AggregationIterator.scala |  22 +--
 .../SortBasedAggregationIterator.scala  |   6 +-
 .../execution/aggregate/TungstenAggregate.scala |  16 +-
 .../execution/datasources/SqlNewHadoopRDD.scala |   8 +-
 .../datasources/csv/CSVInferSchema.scala|  22 +--
 .../datasources/csv/DefaultSource.scala |   4 +-
 .../spark/sql/execution/datasources/ddl.scala   |   8 +-
 .../sql/execution/joins/BroadcastHashJoin.scala |  22 +--
 .../sql/execution/joins/CartesianProduct.scala  |   8 +-
 .../spark/sql/execution/joins/HashJoin.scala|   8 +-
 .../sql/execution/joins/HashedRelation.scala|  76 
 .../sql/execution/joins/SortMergeJoin.scala |  36 ++--
 .../state/HDFSBackedStateStoreProvider.scala|   8 +-
 .../spark/sql/execution/ui/SparkPlanGraph.scala |   4 +-
 .../scala/org/apache/spark/sql/functions.scala  | 191 +--
 

[1/2] spark git commit: [MINOR][DOCS] Use multi-line JavaDoc comments in Scala code.

2016-04-02 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master f70503761 -> 4a6e78abd


http://git-wip-us.apache.org/repos/asf/spark/blob/4a6e78ab/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 9bdf611..9f539c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -31,8 +31,8 @@ import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
 
 /**
-  * An interface for those physical operators that support codegen.
-  */
+ * An interface for those physical operators that support codegen.
+ */
 trait CodegenSupport extends SparkPlan {
 
   /** Prefix used in the current operator's variable names. */
@@ -46,10 +46,10 @@ trait CodegenSupport extends SparkPlan {
   }
 
   /**
-* Creates a metric using the specified name.
-*
-* @return name of the variable representing the metric
-*/
+   * Creates a metric using the specified name.
+   *
+   * @return name of the variable representing the metric
+   */
   def metricTerm(ctx: CodegenContext, name: String): String = {
 val metric = ctx.addReferenceObj(name, longMetric(name))
 val value = ctx.freshName("metricValue")
@@ -59,25 +59,25 @@ trait CodegenSupport extends SparkPlan {
   }
 
   /**
-* Whether this SparkPlan support whole stage codegen or not.
-*/
+   * Whether this SparkPlan support whole stage codegen or not.
+   */
   def supportCodegen: Boolean = true
 
   /**
-* Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan.
-*/
+   * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan.
+   */
   protected var parent: CodegenSupport = null
 
   /**
-* Returns all the RDDs of InternalRow which generates the input rows.
-*
-* Note: right now we support up to two RDDs.
-*/
+   * Returns all the RDDs of InternalRow which generates the input rows.
+   *
+   * Note: right now we support up to two RDDs.
+   */
   def upstreams(): Seq[RDD[InternalRow]]
 
   /**
-* Returns Java source code to process the rows from upstream.
-*/
+   * Returns Java source code to process the rows from upstream.
+   */
   final def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
 this.parent = parent
 ctx.freshNamePrefix = variablePrefix
@@ -89,28 +89,28 @@ trait CodegenSupport extends SparkPlan {
   }
 
   /**
-* Generate the Java source code to process, should be overridden by subclass to support codegen.
-*
-* doProduce() usually generate the framework, for example, aggregation could generate this:
-*
-*   if (!initialized) {
-* # create a hash map, then build the aggregation hash map
-* # call child.produce()
-* initialized = true;
-*   }
-*   while (hashmap.hasNext()) {
-* row = hashmap.next();
-* # build the aggregation results
-* # create variables for results
-* # call consume(), which will call parent.doConsume()
+   * Generate the Java source code to process, should be overridden by subclass to support codegen.
+   *
+   * doProduce() usually generate the framework, for example, aggregation could generate this:
+   *
+   *   if (!initialized) {
+   * # create a hash map, then build the aggregation hash map
+   * # call child.produce()
+   * initialized = true;
+   *   }
+   *   while (hashmap.hasNext()) {
+   * row = hashmap.next();
+   * # build the aggregation results
+   * # create variables for results
+   * # call consume(), which will call parent.doConsume()
*  if (shouldStop()) return;
-*   }
-*/
+   *   }
+   */
   protected def doProduce(ctx: CodegenContext): String
 
   /**
-* Consume the generated columns or row from current SparkPlan, call it's parent's doConsume().
-*/
+   * Consume the generated columns or row from current SparkPlan, call it's parent's doConsume().
+   */
   final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = {
 val inputVars =
   if (row != null) {
@@ -158,9 +158,9 @@ trait CodegenSupport extends SparkPlan {
   }
 
   /**
-* Returns source code to evaluate all the variables, and clear the code of them, to prevent
-* them to be evaluated twice.
-*/
+   * Returns source code to evaluate all the variables, and clear the code of them, to prevent
+   * them to be evaluated twice.
+   */
   protected def evaluateVariables(variables: Seq[ExprCode]): String = {
 val evaluate = variables.filter(_.code != 

spark git commit: [SPARK-14338][SQL] Improve `SimplifyConditionals` rule to handle `null` in IF/CASEWHEN

2016-04-02 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master a3e293542 -> f70503761


[SPARK-14338][SQL] Improve `SimplifyConditionals` rule to handle `null` in IF/CASEWHEN

## What changes were proposed in this pull request?

Currently, `SimplifyConditionals` handles `true` and `false` to optimize branches. This PR improves `SimplifyConditionals` to take advantage of `null` conditions for `if` and `CaseWhen` expressions, too.

**Before**
```
scala> sql("SELECT IF(null, 1, 0)").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [if (null) 1 else 0 AS (IF(CAST(NULL AS BOOLEAN), 1, 0))#4]
: +- INPUT
+- Scan OneRowRelation[]
scala> sql("select case when cast(null as boolean) then 1 else 2 end").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [CASE WHEN null THEN 1 ELSE 2 END AS CASE WHEN CAST(NULL AS BOOLEAN) THEN 1 ELSE 2 END#14]
: +- INPUT
+- Scan OneRowRelation[]
```

**After**
```
scala> sql("SELECT IF(null, 1, 0)").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [0 AS (IF(CAST(NULL AS BOOLEAN), 1, 0))#4]
: +- INPUT
+- Scan OneRowRelation[]
scala> sql("select case when cast(null as boolean) then 1 else 2 end").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [2 AS CASE WHEN CAST(NULL AS BOOLEAN) THEN 1 ELSE 2 END#4]
: +- INPUT
+- Scan OneRowRelation[]
```

**Hive**
```
hive> select if(null,1,2);
OK
2
hive> select case when cast(null as boolean) then 1 else 2 end;
OK
2
```
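
As a toy sketch of why this folding is safe (not the Catalyst rule itself): under SQL's three-valued logic a null predicate never selects the `then` branch, so it can be folded exactly like `false`.

```
// Toy model of a boolean under SQL three-valued logic.
sealed trait Tri
case object True extends Tri
case object False extends Tri
case object Unknown extends Tri  // SQL NULL used as a predicate

// A null (Unknown) condition behaves like false: the else branch is taken.
def simplifyIf[A](cond: Tri, thenValue: A, elseValue: A): A = cond match {
  case True            => thenValue
  case False | Unknown => elseValue
}

simplifyIf(Unknown, 1, 2)  // 2, matching Hive's `select if(null, 1, 2)`
```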

## How was this patch tested?

Pass the Jenkins tests (including new extended test cases).

Author: Dongjoon Hyun 

Closes #12122 from dongjoon-hyun/SPARK-14338.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7050376
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7050376
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7050376

Branch: refs/heads/master
Commit: f705037617d55bb479ec60bcb1e55c736224be94
Parents: a3e2935
Author: Dongjoon Hyun 
Authored: Sat Apr 2 17:48:53 2016 -0700
Committer: Reynold Xin 
Committed: Sat Apr 2 17:48:53 2016 -0700

--
 .../spark/sql/catalyst/optimizer/Optimizer.scala| 13 ++---
 .../optimizer/SimplifyConditionalSuite.scala| 16 +++-
 2 files changed, 21 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f7050376/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 326933e..a5ab390 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -527,7 +527,7 @@ object LikeSimplification extends Rule[LogicalPlan] {
  * Null value propagation from bottom to top of the expression tree.
  */
 object NullPropagation extends Rule[LogicalPlan] {
-  def nonNullLiteral(e: Expression): Boolean = e match {
+  private def nonNullLiteral(e: Expression): Boolean = e match {
 case Literal(null, _) => false
 case _ => true
   }
@@ -773,17 +773,24 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
  * Simplifies conditional expressions (if / case).
  */
 object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
+  private def falseOrNullLiteral(e: Expression): Boolean = e match {
+case FalseLiteral => true
+case Literal(null, _) => true
+case _ => false
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
 case q: LogicalPlan => q transformExpressionsUp {
   case If(TrueLiteral, trueValue, _) => trueValue
   case If(FalseLiteral, _, falseValue) => falseValue
+  case If(Literal(null, _), _, falseValue) => falseValue
 
-  case e @ CaseWhen(branches, elseValue) if branches.exists(_._1 == FalseLiteral) =>
+  case e @ CaseWhen(branches, elseValue) if branches.exists(x => falseOrNullLiteral(x._1)) =>
 // If there are branches that are always false, remove them.
 // If there are no more branches left, just use the else value.
 // Note that these two are handled together here in a single case statement because
 // otherwise we cannot determine the data type for the elseValue if it is None (i.e. null).
-val newBranches = branches.filter(_._1 != FalseLiteral)
+val newBranches = branches.filter(x => !falseOrNullLiteral(x._1))
 if (newBranches.isEmpty) {
   elseValue.getOrElse(Literal.create(null, e.dataType))
 } else {


spark git commit: [HOTFIX] Disable StateStoreSuite.maintenance

2016-04-02 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 06694f1c6 -> a3e293542


[HOTFIX] Disable StateStoreSuite.maintenance


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3e29354
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3e29354
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3e29354

Branch: refs/heads/master
Commit: a3e293542a6e7df9bcc7d9bbd22b3c93a81bcc38
Parents: 06694f1
Author: Reynold Xin 
Authored: Sat Apr 2 12:44:02 2016 -0700
Committer: Reynold Xin 
Committed: Sat Apr 2 12:44:02 2016 -0700

--
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a3e29354/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 0e5936d..dd23925 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -352,7 +352,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
 }
   }
 
-  test("maintenance") {
+  ignore("maintenance") {
 val conf = new SparkConf()
   .setMaster("local")
   .setAppName("test")





spark git commit: [MINOR] Typo fixes

2016-04-02 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 67d753516 -> 06694f1c6


[MINOR] Typo fixes

## What changes were proposed in this pull request?

Typo fixes. No functional changes.

## How was this patch tested?

Built the sources and ran with samples.

Author: Jacek Laskowski 

Closes #11802 from jaceklaskowski/typo-fixes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06694f1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06694f1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06694f1c

Branch: refs/heads/master
Commit: 06694f1c68cb752ea311144f0dbe50e92e1393cf
Parents: 67d7535
Author: Jacek Laskowski 
Authored: Sat Apr 2 08:12:04 2016 -0700
Committer: Sean Owen 
Committed: Sat Apr 2 08:12:04 2016 -0700

--
 .../streaming/RecoverableNetworkWordCount.scala |  2 +-
 .../scala/org/apache/spark/ml/Pipeline.scala|  2 +-
 .../spark/ml/regression/LinearRegression.scala  |  2 +-
 .../catalyst/plans/logical/LogicalPlan.scala|  4 ++--
 .../apache/spark/sql/ExperimentalMethods.scala  |  2 +-
 .../sql/execution/joins/BroadcastHashJoin.scala |  2 +-
 .../scala/org/apache/spark/sql/functions.scala  | 12 +--
 .../spark/streaming/StreamingContext.scala  | 13 ++--
 .../dstream/ConstantInputDStream.scala  |  2 +-
 .../spark/streaming/dstream/DStream.scala   |  8 +++
 .../dstream/DStreamCheckpointData.scala |  6 +++---
 .../spark/streaming/dstream/InputDStream.scala  |  6 +++---
 .../dstream/ReducedWindowedDStream.scala|  2 +-
 .../spark/streaming/dstream/StateDStream.scala  | 12 +--
 .../scheduler/ReceivedBlockTracker.scala|  4 ++--
 .../scheduler/rate/RateEstimator.scala  | 22 +++-
 16 files changed, 52 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
--
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 05f8e65..b6b8bc3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -141,7 +141,7 @@ object RecoverableNetworkWordCount {
 
   def main(args: Array[String]) {
 if (args.length != 4) {
-  System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
+  System.err.println("Your arguments were " + args.mkString("[", ", ", 
"]"))
   System.err.println(
 """
   |Usage: RecoverableNetworkWordCount   


http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index 3a99979..afefaaa 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -147,7 +147,7 @@ class Pipeline @Since("1.4.0") (
 t
   case _ =>
 throw new IllegalArgumentException(
-  s"Do not support stage $stage of type ${stage.getClass}")
+  s"Does not support stage $stage of type ${stage.getClass}")
 }
 if (index < indexOfLastEstimator) {
   curDataset = transformer.transform(curDataset)

http://git-wip-us.apache.org/repos/asf/spark/blob/06694f1c/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index ba5ad4c..2633c06 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -58,7 +58,7 @@ private[regression] trait LinearRegressionParams extends PredictorParams
  * The specific squared error loss function used is:
  *   L = 1/2n ||A coefficients - y||^2^
  *
- * This support multiple types of regularization:
+ * This supports multiple types of regularization:
  *  - none (a.k.a. ordinary least squares)
  *  - L2 (ridge regression)
  *  - L1 (Lasso)


spark git commit: [HOTFIX] Fix compilation break.

2016-04-02 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master d7982a3a9 -> 67d753516


[HOTFIX] Fix compilation break.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67d75351
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67d75351
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67d75351

Branch: refs/heads/master
Commit: 67d753516da9b6318cd4001bb7ae91703aaf098d
Parents: d7982a3
Author: Reynold Xin 
Authored: Sat Apr 2 00:00:19 2016 -0700
Committer: Reynold Xin 
Committed: Sat Apr 2 00:00:19 2016 -0700

--
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 1 +
 .../spark/sql/streaming/StreamingAggregationSuite.scala  | 8 +++-
 2 files changed, 4 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/67d75351/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index d5db9db..1328142 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -21,6 +21,7 @@ import java.io.{File, FileNotFoundException, IOException}
 import java.net.URI
 import java.util.ConcurrentModificationException
 
+import scala.language.implicitConversions
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration

http://git-wip-us.apache.org/repos/asf/spark/blob/67d75351/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index b63ce89..3af7c01 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{Encoder, StreamTest, SumOf, TypedColumn}
+import org.apache.spark.sql.StreamTest
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.expressions.scala.typed
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -118,11 +119,8 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
   }
 
   test("typed aggregators") {
-def sum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N] =
-  new SumOf(f).toColumn
-
 val inputData = MemoryStream[(String, Int)]
-val aggregated = inputData.toDS().groupByKey(_._1).agg(sum(_._2))
+val aggregated = inputData.toDS().groupByKey(_._1).agg(typed.sumLong(_._2))
 
 testStream(aggregated)(
   AddData(inputData, ("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)),

