spark git commit: [SPARK-12030] Fix Platform.copyMemory to handle overlapping regions.

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 ab2a124c8 -> 1cf9d3858


[SPARK-12030] Fix Platform.copyMemory to handle overlapping regions.

This bug was exposed as memory corruption in Timsort, which uses copyMemory to
copy large regions that can overlap. The prior implementation always copied
forward, which is only correct for one of the two overlap directions, so roughly
half of the overlapping copies corrupted the data.
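For illustration only, here is a minimal Scala sketch of the direction-aware chunked copy
described above. It is not Spark's Platform class: it uses plain byte arrays and
System.arraycopy instead of Unsafe, and the 1 MB chunk size is an assumed stand-in for
UNSAFE_COPY_THRESHOLD. The chunking is what makes direction matter; a single arraycopy over
the whole range already handles overlap, but a sequence of chunk copies in the wrong
direction does not.

// Minimal sketch, assuming a 1 MB chunk size: copy forward when the destination
// starts before the source, otherwise copy backward from the tail so that no
// chunk reads bytes a previous chunk has already overwritten.
object OverlapSafeCopy {
  private val ChunkSize = 1024 * 1024

  def copy(src: Array[Byte], srcPos: Int, dst: Array[Byte], dstPos: Int, length: Int): Unit = {
    if (dstPos < srcPos) {
      var copied = 0
      while (copied < length) {                       // forward, chunk by chunk
        val size = math.min(length - copied, ChunkSize)
        System.arraycopy(src, srcPos + copied, dst, dstPos + copied, size)
        copied += size
      }
    } else {
      var remaining = length
      while (remaining > 0) {                         // backward, starting at the tail
        val size = math.min(remaining, ChunkSize)
        remaining -= size
        System.arraycopy(src, srcPos + remaining, dst, dstPos + remaining, size)
      }
    }
  }
}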

Author: Nong Li 

Closes #10068 from nongli/spark-12030.

(cherry picked from commit 2cef1cdfbb5393270ae83179b6a4e50c3cbf9e93)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1cf9d385
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1cf9d385
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1cf9d385

Branch: refs/heads/branch-1.6
Commit: 1cf9d3858c8a3a5796b64a9fbea22509f02d778a
Parents: ab2a124
Author: Nong Li 
Authored: Tue Dec 1 12:59:53 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 13:00:05 2015 -0800

--
 .../java/org/apache/spark/unsafe/Platform.java  | 27 +++--
 .../apache/spark/unsafe/PlatformUtilSuite.java  | 61 
 2 files changed, 82 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1cf9d385/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
--
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java 
b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 1c16da9..0d6b215 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -107,12 +107,27 @@ public final class Platform {
 
   public static void copyMemory(
       Object src, long srcOffset, Object dst, long dstOffset, long length) {
-    while (length > 0) {
-      long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
-      _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
-      length -= size;
-      srcOffset += size;
-      dstOffset += size;
+    // Check if dstOffset is before or after srcOffset to determine if we should copy
+    // forward or backwards. This is necessary in case src and dst overlap.
+    if (dstOffset < srcOffset) {
+      while (length > 0) {
+        long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
+        _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
+        length -= size;
+        srcOffset += size;
+        dstOffset += size;
+      }
+    } else {
+      srcOffset += length;
+      dstOffset += length;
+      while (length > 0) {
+        long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
+        srcOffset -= size;
+        dstOffset -= size;
+        _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
+        length -= size;
+      }
+
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1cf9d385/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
--
diff --git 
a/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java 
b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
new file mode 100644
index 000..693ec6e
--- /dev/null
+++ b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PlatformUtilSuite {
+
+  @Test
+  public void overlappingCopyMemory() {
+    byte[] data = new byte[3 * 1024 * 1024];
+    int size = 2 * 1024 * 1024;
+    for (int i = 0; i < data.length; ++i) {
+      data[i] = (byte)i;
+    }
+
+    Platform.copyMemory(data, Platform.BYTE_ARRAY_OFFSET, data, Platform.BYTE_ARRAY_OFFSET, size);
+    for (int i = 0; i < data.length; ++i) {
+      Assert.assertEquals((byte)i, data[i]);
+    }
+
+    Platform.copyMemory(
+        data,
+ 

spark git commit: [SPARK-12030] Fix Platform.copyMemory to handle overlapping regions.

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 34e7093c1 -> 2cef1cdfb


[SPARK-12030] Fix Platform.copyMemory to handle overlapping regions.

This bug was exposed as memory corruption in Timsort, which uses copyMemory to
copy large regions that can overlap. The prior implementation always copied
forward, which is only correct for one of the two overlap directions, so roughly
half of the overlapping copies corrupted the data.
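As a self-contained illustration of the failure mode (plain Scala, one byte at a time instead
of Unsafe chunks): shifting a region to the right while copying forward re-reads bytes that
earlier iterations have already overwritten, which is the corruption Timsort ran into.

val data = Array.tabulate[Byte](8)(_.toByte)   // 0,1,2,3,4,5,6,7
// Naive forward copy of data(0..5) onto data(2..7) -- overlapping, dst after src:
for (i <- 0 until 6) data(i + 2) = data(i)
println(data.mkString(","))                    // 0,1,0,1,0,1,0,1 -- corrupted
// Copying backward (i = 5 down to 0) would have yielded 0,1,0,1,2,3,4,5.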

Author: Nong Li 

Closes #10068 from nongli/spark-12030.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2cef1cdf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2cef1cdf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2cef1cdf

Branch: refs/heads/master
Commit: 2cef1cdfbb5393270ae83179b6a4e50c3cbf9e93
Parents: 34e7093
Author: Nong Li 
Authored: Tue Dec 1 12:59:53 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 12:59:53 2015 -0800

--
 .../java/org/apache/spark/unsafe/Platform.java  | 27 +++--
 .../apache/spark/unsafe/PlatformUtilSuite.java  | 61 
 2 files changed, 82 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2cef1cdf/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
--
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java 
b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 1c16da9..0d6b215 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -107,12 +107,27 @@ public final class Platform {
 
   public static void copyMemory(
       Object src, long srcOffset, Object dst, long dstOffset, long length) {
-    while (length > 0) {
-      long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
-      _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
-      length -= size;
-      srcOffset += size;
-      dstOffset += size;
+    // Check if dstOffset is before or after srcOffset to determine if we should copy
+    // forward or backwards. This is necessary in case src and dst overlap.
+    if (dstOffset < srcOffset) {
+      while (length > 0) {
+        long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
+        _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
+        length -= size;
+        srcOffset += size;
+        dstOffset += size;
+      }
+    } else {
+      srcOffset += length;
+      dstOffset += length;
+      while (length > 0) {
+        long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
+        srcOffset -= size;
+        dstOffset -= size;
+        _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
+        length -= size;
+      }
+
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2cef1cdf/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
--
diff --git 
a/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java 
b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
new file mode 100644
index 000..693ec6e
--- /dev/null
+++ b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PlatformUtilSuite {
+
+  @Test
+  public void overlappingCopyMemory() {
+    byte[] data = new byte[3 * 1024 * 1024];
+    int size = 2 * 1024 * 1024;
+    for (int i = 0; i < data.length; ++i) {
+      data[i] = (byte)i;
+    }
+
+    Platform.copyMemory(data, Platform.BYTE_ARRAY_OFFSET, data, Platform.BYTE_ARRAY_OFFSET, size);
+    for (int i = 0; i < data.length; ++i) {
+      Assert.assertEquals((byte)i, data[i]);
+    }
+
+    Platform.copyMemory(
+        data,
+        Platform.BYTE_ARRAY_OFFSET + 1,
+        data,
+        Platform.BYTE_ARRAY_OFFSET,
+        size);
+    for (int i = 

spark git commit: [SPARK-12004] Preserve the RDD partitioner through RDD checkpointing

2015-12-01 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1cf9d3858 -> 81db8d086


[SPARK-12004] Preserve the RDD partitioner through RDD checkpointing

The solution is to save the RDD partitioner in a separate file in the RDD
checkpoint directory, i.e. `<checkpoint dir>/_partitioner`. In most cases, whether or not
the partitioner is recovered does not affect correctness; it only affects performance.
So this solution makes a best-effort attempt to save and recover the partitioner.
If either step fails, checkpointing is not affected. This makes the patch safe and
backward compatible.
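A hedged sketch of that best-effort pattern, with illustrative helper names and plain Java
serialization standing in for Spark's configured serializer: write and read a `_partitioner`
file next to the checkpoint data, and swallow any failure so the checkpoint itself is never
affected.

import java.io.{ObjectInputStream, ObjectOutputStream}
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.Partitioner

object PartitionerCheckpointSketch {
  private def partitionerPath(checkpointDir: String) = new Path(checkpointDir, "_partitioner")

  // Best effort: losing the partitioner only costs performance, never correctness.
  def writePartitioner(p: Partitioner, checkpointDir: String, conf: Configuration): Unit =
    try {
      val path = partitionerPath(checkpointDir)
      val out = new ObjectOutputStream(path.getFileSystem(conf).create(path, false))
      try out.writeObject(p) finally out.close()
    } catch {
      case NonFatal(_) => ()  // ignore: checkpointing proceeds without the partitioner file
    }

  // Best effort: any failure just means the recovered RDD reports no partitioner.
  def readPartitioner(checkpointDir: String, conf: Configuration): Option[Partitioner] =
    try {
      val path = partitionerPath(checkpointDir)
      val in = new ObjectInputStream(path.getFileSystem(conf).open(path))
      try Some(in.readObject().asInstanceOf[Partitioner]) finally in.close()
    } catch {
      case NonFatal(_) => None
    }
}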

Author: Tathagata Das 

Closes #9983 from tdas/SPARK-12004.

(cherry picked from commit 60b541ee1b97c9e5e84aa2af2ce856f316ad22b3)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81db8d08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81db8d08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81db8d08

Branch: refs/heads/branch-1.6
Commit: 81db8d086bbfe72caa0c45a395ebcaed80b5c237
Parents: 1cf9d38
Author: Tathagata Das 
Authored: Tue Dec 1 14:08:36 2015 -0800
Committer: Andrew Or 
Committed: Tue Dec 1 14:08:45 2015 -0800

--
 .../spark/rdd/ReliableCheckpointRDD.scala   | 122 ++-
 .../spark/rdd/ReliableRDDCheckpointData.scala   |  21 +---
 .../org/apache/spark/CheckpointSuite.scala  |  61 +-
 3 files changed, 173 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/81db8d08/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index a69be6a..fa71b8c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -20,12 +20,12 @@ package org.apache.spark.rdd
 import java.io.IOException
 
 import scala.reflect.ClassTag
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
@@ -33,8 +33,9 @@ import org.apache.spark.util.{SerializableConfiguration, 
Utils}
  */
 private[spark] class ReliableCheckpointRDD[T: ClassTag](
 sc: SparkContext,
-val checkpointPath: String)
-  extends CheckpointRDD[T](sc) {
+val checkpointPath: String,
+_partitioner: Option[Partitioner] = None
+  ) extends CheckpointRDD[T](sc) {
 
   @transient private val hadoopConf = sc.hadoopConfiguration
   @transient private val cpath = new Path(checkpointPath)
@@ -47,7 +48,13 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
   /**
* Return the path of the checkpoint directory this RDD reads data from.
*/
-  override def getCheckpointFile: Option[String] = Some(checkpointPath)
+  override val getCheckpointFile: Option[String] = Some(checkpointPath)
+
+  override val partitioner: Option[Partitioner] = {
+_partitioner.orElse {
+  ReliableCheckpointRDD.readCheckpointedPartitionerFile(context, 
checkpointPath)
+}
+  }
 
   /**
* Return partitions described by the files in the checkpoint directory.
@@ -100,10 +107,52 @@ private[spark] object ReliableCheckpointRDD extends 
Logging {
 "part-%05d".format(partitionIndex)
   }
 
+  private def checkpointPartitionerFileName(): String = {
+"_partitioner"
+  }
+
+  /**
+   * Write RDD to checkpoint files and return a ReliableCheckpointRDD 
representing the RDD.
+   */
+  def writeRDDToCheckpointDirectory[T: ClassTag](
+  originalRDD: RDD[T],
+  checkpointDir: String,
+  blockSize: Int = -1): ReliableCheckpointRDD[T] = {
+
+val sc = originalRDD.sparkContext
+
+// Create the output path for the checkpoint
+val checkpointDirPath = new Path(checkpointDir)
+val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
+if (!fs.mkdirs(checkpointDirPath)) {
+  throw new SparkException(s"Failed to create checkpoint path 
$checkpointDirPath")
+}
+
+// Save to file, and reload it as an RDD
+val broadcastedConf = sc.broadcast(
+  new SerializableConfiguration(sc.hadoopConfiguration))
+// TODO: This is expensive because it computes the RDD again unnecessarily 
(SPARK-8582)
+sc.runJob(originalRDD,
+  writePartitionToCheckpointFile[T](checkpointDirPath.toString, 
broadcastedConf) _)
+
+if (originalRDD.partitioner.nonEmpty) {
+  writePartitionerToCheckpointDir(sc, 

spark git commit: [SPARK-12004] Preserve the RDD partitioner through RDD checkpointing

2015-12-01 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 2cef1cdfb -> 60b541ee1


[SPARK-12004] Preserve the RDD partitioner through RDD checkpointing

The solution is to save the RDD partitioner in a separate file in the RDD
checkpoint directory, i.e. `<checkpoint dir>/_partitioner`. In most cases, whether or not
the partitioner is recovered does not affect correctness; it only affects performance.
So this solution makes a best-effort attempt to save and recover the partitioner.
If either step fails, checkpointing is not affected. This makes the patch safe and
backward compatible.
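A hedged usage sketch of the effect, assuming a SparkContext named `sc` and a writable
checkpoint directory (the path below is illustrative, not part of the patch): the partitioner
set before checkpointing is still known afterwards, so co-partitioned operations can keep
avoiding shuffles.

import org.apache.spark.HashPartitioner

sc.setCheckpointDir("/tmp/spark-checkpoints")   // assumed path, for illustration only
val pairs = sc.parallelize(1 to 100).map(i => (i, i)).partitionBy(new HashPartitioner(4))
pairs.checkpoint()
pairs.count()                                   // materializes the checkpoint files
assert(pairs.partitioner == Some(new HashPartitioner(4)))  // partitioner still reported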

Author: Tathagata Das 

Closes #9983 from tdas/SPARK-12004.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60b541ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60b541ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60b541ee

Branch: refs/heads/master
Commit: 60b541ee1b97c9e5e84aa2af2ce856f316ad22b3
Parents: 2cef1cd
Author: Tathagata Das 
Authored: Tue Dec 1 14:08:36 2015 -0800
Committer: Andrew Or 
Committed: Tue Dec 1 14:08:36 2015 -0800

--
 .../spark/rdd/ReliableCheckpointRDD.scala   | 122 ++-
 .../spark/rdd/ReliableRDDCheckpointData.scala   |  21 +---
 .../org/apache/spark/CheckpointSuite.scala  |  61 +-
 3 files changed, 173 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/60b541ee/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index a69be6a..fa71b8c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -20,12 +20,12 @@ package org.apache.spark.rdd
 import java.io.IOException
 
 import scala.reflect.ClassTag
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
@@ -33,8 +33,9 @@ import org.apache.spark.util.{SerializableConfiguration, 
Utils}
  */
 private[spark] class ReliableCheckpointRDD[T: ClassTag](
 sc: SparkContext,
-val checkpointPath: String)
-  extends CheckpointRDD[T](sc) {
+val checkpointPath: String,
+_partitioner: Option[Partitioner] = None
+  ) extends CheckpointRDD[T](sc) {
 
   @transient private val hadoopConf = sc.hadoopConfiguration
   @transient private val cpath = new Path(checkpointPath)
@@ -47,7 +48,13 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
   /**
* Return the path of the checkpoint directory this RDD reads data from.
*/
-  override def getCheckpointFile: Option[String] = Some(checkpointPath)
+  override val getCheckpointFile: Option[String] = Some(checkpointPath)
+
+  override val partitioner: Option[Partitioner] = {
+_partitioner.orElse {
+  ReliableCheckpointRDD.readCheckpointedPartitionerFile(context, 
checkpointPath)
+}
+  }
 
   /**
* Return partitions described by the files in the checkpoint directory.
@@ -100,10 +107,52 @@ private[spark] object ReliableCheckpointRDD extends 
Logging {
 "part-%05d".format(partitionIndex)
   }
 
+  private def checkpointPartitionerFileName(): String = {
+"_partitioner"
+  }
+
+  /**
+   * Write RDD to checkpoint files and return a ReliableCheckpointRDD 
representing the RDD.
+   */
+  def writeRDDToCheckpointDirectory[T: ClassTag](
+  originalRDD: RDD[T],
+  checkpointDir: String,
+  blockSize: Int = -1): ReliableCheckpointRDD[T] = {
+
+val sc = originalRDD.sparkContext
+
+// Create the output path for the checkpoint
+val checkpointDirPath = new Path(checkpointDir)
+val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
+if (!fs.mkdirs(checkpointDirPath)) {
+  throw new SparkException(s"Failed to create checkpoint path 
$checkpointDirPath")
+}
+
+// Save to file, and reload it as an RDD
+val broadcastedConf = sc.broadcast(
+  new SerializableConfiguration(sc.hadoopConfiguration))
+// TODO: This is expensive because it computes the RDD again unnecessarily 
(SPARK-8582)
+sc.runJob(originalRDD,
+  writePartitionToCheckpointFile[T](checkpointDirPath.toString, 
broadcastedConf) _)
+
+if (originalRDD.partitioner.nonEmpty) {
+  writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, 
checkpointDirPath)
+}
+
+val newRDD = new ReliableCheckpointRDD[T](
+  sc, 

spark git commit: [SPARK-11961][DOC] Add docs of ChiSqSelector

2015-12-01 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 21909b8ac -> 5647774b0


[SPARK-11961][DOC] Add docs of ChiSqSelector

https://issues.apache.org/jira/browse/SPARK-11961

Author: Xusen Yin 

Closes #9965 from yinxusen/SPARK-11961.

(cherry picked from commit e76431f886ae8061545b3216e8e2fb38c4db1f43)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5647774b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5647774b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5647774b

Branch: refs/heads/branch-1.6
Commit: 5647774b07593514f4ed4c29a038cfb1b69c9ba1
Parents: 21909b8
Author: Xusen Yin 
Authored: Tue Dec 1 15:21:53 2015 -0800
Committer: Joseph K. Bradley 
Committed: Tue Dec 1 15:22:04 2015 -0800

--
 docs/ml-features.md | 50 ++
 .../examples/ml/JavaChiSqSelectorExample.java   | 71 
 .../examples/ml/ChiSqSelectorExample.scala  | 57 
 3 files changed, 178 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5647774b/docs/ml-features.md
--
diff --git a/docs/ml-features.md b/docs/ml-features.md
index cd1838d..5f88877 100644
--- a/docs/ml-features.md
+++ b/docs/ml-features.md
@@ -1949,3 +1949,53 @@ output.select("features", "label").show()
 {% endhighlight %}
 
 
+
+## ChiSqSelector
+
+`ChiSqSelector` stands for Chi-Squared feature selection. It operates on labeled data with
+categorical features. ChiSqSelector orders features based on a
+[Chi-Squared test of independence](https://en.wikipedia.org/wiki/Chi-squared_test)
+from the class, and then filters (selects) the top features which the class label depends on the
+most. This is akin to yielding the features with the most predictive power.
+
+**Examples**
+
+Assume that we have a DataFrame with the columns `id`, `features`, and `clicked`, which is used as
+our target to be predicted:
+
+~~~
+id | features  | clicked
+---|---|-
+ 7 | [0.0, 0.0, 18.0, 1.0] | 1.0
+ 8 | [0.0, 1.0, 12.0, 0.0] | 0.0
+ 9 | [1.0, 0.0, 15.0, 0.1] | 0.0
+~~~
+
+If we use `ChiSqSelector` with `numTopFeatures = 1`, then according to our label `clicked` the
+last column in our `features` is chosen as the most useful feature:
+
+~~~
+id | features  | clicked | selectedFeatures
+---|---|-|--
+ 7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0]
+ 8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0]
+ 9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1]
+~~~
+
+
+
+
+Refer to the [ChiSqSelector Scala docs](api/scala/index.html#org.apache.spark.ml.feature.ChiSqSelector)
+for more details on the API.
+
+{% include_example scala/org/apache/spark/examples/ml/ChiSqSelectorExample.scala %}
+
+
+
+
+Refer to the [ChiSqSelector Java docs](api/java/org/apache/spark/ml/feature/ChiSqSelector.html)
+for more details on the API.
+
+{% include_example java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java %}
+
+

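For reference, a hedged Scala sketch of what the included example boils down to, using the
1.6-era spark.ml API and assuming a SQLContext named `sqlContext`; the toy data mirrors the
table in the doc above.

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors

val df = sqlContext.createDataFrame(Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)).toDF("id", "features", "clicked")

val selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

// Fit on the labeled data and keep only the single most label-dependent feature.
selector.fit(df).transform(df).select("features", "selectedFeatures").show()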
http://git-wip-us.apache.org/repos/asf/spark/blob/5647774b/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java
 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java
new file mode 100644
index 000..ede05d6
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.ml;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
+
+// $example on$
+import 

spark git commit: [SPARK-11961][DOC] Add docs of ChiSqSelector

2015-12-01 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 328b757d5 -> e76431f88


[SPARK-11961][DOC] Add docs of ChiSqSelector

https://issues.apache.org/jira/browse/SPARK-11961

Author: Xusen Yin 

Closes #9965 from yinxusen/SPARK-11961.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e76431f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e76431f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e76431f8

Branch: refs/heads/master
Commit: e76431f886ae8061545b3216e8e2fb38c4db1f43
Parents: 328b757
Author: Xusen Yin 
Authored: Tue Dec 1 15:21:53 2015 -0800
Committer: Joseph K. Bradley 
Committed: Tue Dec 1 15:21:53 2015 -0800

--
 docs/ml-features.md | 50 ++
 .../examples/ml/JavaChiSqSelectorExample.java   | 71 
 .../examples/ml/ChiSqSelectorExample.scala  | 57 
 3 files changed, 178 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e76431f8/docs/ml-features.md
--
diff --git a/docs/ml-features.md b/docs/ml-features.md
index cd1838d..5f88877 100644
--- a/docs/ml-features.md
+++ b/docs/ml-features.md
@@ -1949,3 +1949,53 @@ output.select("features", "label").show()
 {% endhighlight %}
 
 
+
+## ChiSqSelector
+
+`ChiSqSelector` stands for Chi-Squared feature selection. It operates on labeled data with
+categorical features. ChiSqSelector orders features based on a
+[Chi-Squared test of independence](https://en.wikipedia.org/wiki/Chi-squared_test)
+from the class, and then filters (selects) the top features which the class label depends on the
+most. This is akin to yielding the features with the most predictive power.
+
+**Examples**
+
+Assume that we have a DataFrame with the columns `id`, `features`, and 
`clicked`, which is used as
+our target to be predicted:
+
+~~~
+id | features  | clicked
+---|---|-
+ 7 | [0.0, 0.0, 18.0, 1.0] | 1.0
+ 8 | [0.0, 1.0, 12.0, 0.0] | 0.0
+ 9 | [1.0, 0.0, 15.0, 0.1] | 0.0
+~~~
+
+If we use `ChiSqSelector` with `numTopFeatures = 1`, then according to our label `clicked` the
+last column in our `features` is chosen as the most useful feature:
+
+~~~
+id | features  | clicked | selectedFeatures
+---|---|-|--
+ 7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0]
+ 8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0]
+ 9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1]
+~~~
+
+
+
+
+Refer to the [ChiSqSelector Scala 
docs](api/scala/index.html#org.apache.spark.ml.feature.ChiSqSelector)
+for more details on the API.
+
+{% include_example 
scala/org/apache/spark/examples/ml/ChiSqSelectorExample.scala %}
+
+
+
+
+Refer to the [ChiSqSelector Java 
docs](api/java/org/apache/spark/ml/feature/ChiSqSelector.html)
+for more details on the API.
+
+{% include_example 
java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java %}
+
+

http://git-wip-us.apache.org/repos/asf/spark/blob/e76431f8/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java
 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java
new file mode 100644
index 000..ede05d6
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.ml;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
+
+// $example on$
+import java.util.Arrays;
+
+import org.apache.spark.ml.feature.ChiSqSelector;
+import org.apache.spark.mllib.linalg.VectorUDT;
+import 

spark git commit: [SPARK-11788][SQL] surround timestamp/date value with quotes in JDBC data source

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 47a0abc34 -> 5a8b5fdd6


[SPARK-11788][SQL] surround timestamp/date value with quotes in JDBC data source

When querying a Timestamp or Date column with a filter like

    val filtered = jdbcdf.where($"TIMESTAMP_COLUMN" >= beg && $"TIMESTAMP_COLUMN" < end)

the generated SQL query is "TIMESTAMP_COLUMN >= 2015-01-01 00:00:00.0".
It should have quotes around the Timestamp/Date value, such as
"TIMESTAMP_COLUMN >= '2015-01-01 00:00:00.0'".
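A hedged sketch of the filter-compilation rule this adds (the real code lives in
JDBCRDD.compileValue and uses an escapeSql helper; the inline quote-doubling below is a
simplification): string, timestamp, and date literals get wrapped in single quotes before
being pushed down into the generated WHERE clause, while other values pass through unchanged.

import java.sql.{Date, Timestamp}

def compileValue(value: Any): Any = value match {
  case s: String    => s"'${s.replace("'", "''")}'"  // simplified SQL string escaping
  case t: Timestamp => "'" + t + "'"
  case d: Date      => "'" + d + "'"
  case other        => other
}

// compileValue(Timestamp.valueOf("2015-01-01 00:00:00.0")) yields
// "'2015-01-01 00:00:00.0'", so the pushed-down predicate becomes
// TIMESTAMP_COLUMN >= '2015-01-01 00:00:00.0' instead of the unquoted form.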

Author: Huaxin Gao 

Closes #9872 from huaxingao/spark-11788.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a8b5fdd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a8b5fdd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a8b5fdd

Branch: refs/heads/master
Commit: 5a8b5fdd6ffa58f015cdadf3f2c6df78e0a388ad
Parents: 47a0abc
Author: Huaxin Gao 
Authored: Tue Dec 1 15:32:57 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 15:32:57 2015 -0800

--
 .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala   |  4 +++-
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 11 +++
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5a8b5fdd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 57a8a04..392d3ed 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, 
SQLException}
+import java.sql.{Connection, Date, DriverManager, ResultSet, 
ResultSetMetaData, SQLException, Timestamp}
 import java.util.Properties
 
 import scala.util.control.NonFatal
@@ -267,6 +267,8 @@ private[sql] class JDBCRDD(
    */
   private def compileValue(value: Any): Any = value match {
     case stringValue: String => s"'${escapeSql(stringValue)}'"
+    case timestampValue: Timestamp => "'" + timestampValue + "'"
+    case dateValue: Date => "'" + dateValue + "'"
     case _ => value
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a8b5fdd/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index d530b1a..8c24aa3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -484,4 +484,15 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
     assert(h2.getTableExistsQuery(table) == defaultQuery)
     assert(derby.getTableExistsQuery(table) == defaultQuery)
   }
+
+  test("Test DataFrame.where for Date and Timestamp") {
+    // Regression test for bug SPARK-11788
+    val timestamp = java.sql.Timestamp.valueOf("2001-02-20 11:22:33.543543");
+    val date = java.sql.Date.valueOf("1995-01-01")
+    val jdbcDf = sqlContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+    val rows = jdbcDf.where($"B" > date && $"C" > timestamp).collect()
+    assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
+    assert(rows(0).getAs[java.sql.Timestamp](2)
+      === java.sql.Timestamp.valueOf("2002-02-20 11:22:33.543543"))
+  }
 }





spark git commit: [SPARK-11328][SQL] Improve error message when hitting this issue

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master ef6790fdc -> 47a0abc34


[SPARK-11328][SQL] Improve error message when hitting this issue

The issue is that the output committer is not idempotent, so retry attempts will
fail because the output file already exists. It is not safe to clean up the file,
as this output committer is by design not retryable. Currently, the job fails
with a confusing "file exists" error. This patch is a stopgap to tell the user
to look at the top of the error log for the proper message.

This is difficult to test locally as Spark is hardcoded not to retry. Manually
verified by upping the retry attempts.
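A hedged sketch of the shape of the change (illustrative helper, not the actual
WriterContainer method): when creating the output writer throws FileAlreadyExistsException
under the non-retryable direct output committer, rethrow with a message that points the user
at the first error in the logs.

import org.apache.hadoop.fs.FileAlreadyExistsException
import org.apache.spark.SparkException

def newWriterOrExplain[T](committerIsDirect: Boolean)(create: => T): T =
  try create catch {
    case e: FileAlreadyExistsException if committerIsDirect =>
      // The "file exists" failure is usually a symptom of an earlier task failure,
      // so surface a hint instead of the misleading low-level error.
      throw new SparkException("The output file already exists but this could be due to a " +
        "failure from an earlier attempt. Look through the earlier logs or stage page for " +
        "the first error.\n  File exists error: " + e)
  }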

Author: Nong Li 
Author: Nong Li 

Closes #10080 from nongli/spark-11328.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47a0abc3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47a0abc3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47a0abc3

Branch: refs/heads/master
Commit: 47a0abc343550c855e679de12983f43e6fcc0171
Parents: ef6790f
Author: Nong Li 
Authored: Tue Dec 1 15:30:21 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 15:30:21 2015 -0800

--
 .../execution/datasources/WriterContainer.scala | 22 ++--
 .../parquet/DirectParquetOutputCommitter.scala  |  3 ++-
 2 files changed, 22 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/47a0abc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 1b59b19..ad55367 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -124,6 +124,24 @@ private[sql] abstract class BaseWriterContainer(
 }
   }
 
+  protected def newOutputWriter(path: String): OutputWriter = {
+try {
+  outputWriterFactory.newInstance(path, dataSchema, taskAttemptContext)
+} catch {
+  case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
+if 
(outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
+  // Spark-11382: DirectParquetOutputCommitter is not idempotent, 
meaning on retry
+  // attempts, the task will fail because the output file is created 
from a prior attempt.
+  // This often means the most visible error to the user is 
misleading. Augment the error
+  // to tell the user to look for the actual error.
+  throw new SparkException("The output file already exists but this 
could be due to a " +
+"failure from an earlier attempt. Look through the earlier logs or 
stage page for " +
+"the first error.\n  File exists error: " + e)
+}
+throw e
+}
+  }
+
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter 
= {
 val defaultOutputCommitter = 
outputFormatClass.newInstance().getOutputCommitter(context)
 
@@ -234,7 +252,7 @@ private[sql] class DefaultWriterContainer(
 executorSideSetup(taskContext)
 val configuration = 
SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
 configuration.set("spark.sql.sources.output.path", outputPath)
-val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, 
taskAttemptContext)
+val writer = newOutputWriter(getWorkPath)
 writer.initConverter(dataSchema)
 
 var writerClosed = false
@@ -403,7 +421,7 @@ private[sql] class DynamicPartitionWriterContainer(
   val configuration = 
SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
   configuration.set(
 "spark.sql.sources.output.path", new Path(outputPath, 
partitionPath).toString)
-  val newWriter = outputWriterFactory.newInstance(path.toString, 
dataSchema, taskAttemptContext)
+  val newWriter = super.newOutputWriter(path.toString)
   newWriter.initConverter(dataSchema)
   newWriter
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/47a0abc3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
index 300e867..1a4e99f 100644
--- 

spark git commit: [SPARK-11352][SQL] Escape */ in the generated comments.

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 5a8b5fdd6 -> 5872a9d89


[SPARK-11352][SQL] Escape */ in the generated comments.

https://issues.apache.org/jira/browse/SPARK-11352

Author: Yin Huai 

Closes #10072 from yhuai/SPARK-11352.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5872a9d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5872a9d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5872a9d8

Branch: refs/heads/master
Commit: 5872a9d89fe2720c2bcb1fc7494136947a72581c
Parents: 5a8b5fd
Author: Yin Huai 
Authored: Tue Dec 1 16:24:04 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 16:24:04 2015 -0800

--
 .../spark/sql/catalyst/expressions/Expression.scala   | 10 --
 .../catalyst/expressions/codegen/CodegenFallback.scala|  2 +-
 .../sql/catalyst/expressions/CodeGenerationSuite.scala|  9 +
 3 files changed, 18 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5872a9d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index b55d365..4ee6542 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -95,7 +95,7 @@ abstract class Expression extends TreeNode[Expression] {
 ctx.subExprEliminationExprs.get(this).map { subExprState =>
   // This expression is repeated meaning the code to evaluated has already 
been added
   // as a function and called in advance. Just use it.
-  val code = s"/* $this */"
+  val code = s"/* ${this.toCommentSafeString} */"
   GeneratedExpressionCode(code, subExprState.isNull, subExprState.value)
 }.getOrElse {
   val isNull = ctx.freshName("isNull")
@@ -103,7 +103,7 @@ abstract class Expression extends TreeNode[Expression] {
   val ve = GeneratedExpressionCode("", isNull, primitive)
   ve.code = genCode(ctx, ve)
   // Add `this` in the comment.
-  ve.copy(s"/* $this */\n" + ve.code.trim)
+  ve.copy(s"/* ${this.toCommentSafeString} */\n" + ve.code.trim)
 }
   }
 
@@ -214,6 +214,12 @@ abstract class Expression extends TreeNode[Expression] {
   }
 
   override def toString: String = prettyName + flatArguments.mkString("(", 
",", ")")
+
+  /**
+   * Returns the string representation of this expression that is safe to be put in
+   * code comments of generated code.
+   */
+  protected def toCommentSafeString: String = this.toString.replace("*/", "\\*\\/")
 }
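A hedged illustration of why the escaping matters: if an expression's string form contains
"*/" (for example via a string literal), embedding it verbatim would close the /* ... */
comment emitted into the generated Java source and break compilation.

def toCommentSafeString(s: String): String = s.replace("*/", "\\*\\/")

val expr = """named_struct("end", "*/ arbitrary code here /*")"""
println(s"/* ${toCommentSafeString(expr)} */")
// prints: /* named_struct("end", "\*\/ arbitrary code here /*") */
// the escaped form can no longer terminate the surrounding comment early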
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5872a9d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
index a31574c..26fb143 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
@@ -33,7 +33,7 @@ trait CodegenFallback extends Expression {
 ctx.references += this
 val objectTerm = ctx.freshName("obj")
 s"""
-  /* expression: ${this} */
+  /* expression: ${this.toCommentSafeString} */
   java.lang.Object $objectTerm = expressions[${ctx.references.size - 
1}].eval(${ctx.INPUT_ROW});
   boolean ${ev.isNull} = $objectTerm == null;
   ${ctx.javaType(this.dataType)} ${ev.value} = 
${ctx.defaultValue(this.dataType)};

http://git-wip-us.apache.org/repos/asf/spark/blob/5872a9d8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 002ed16..fe75424 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -98,4 +98,13 @@ class CodeGenerationSuite extends SparkFunSuite 

spark git commit: [SPARK-11328][SQL] Improve error message when hitting this issue

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d77bf0bd9 -> f1122dd2b


[SPARK-11328][SQL] Improve error message when hitting this issue

The issue is that the output committer is not idempotent, so retry attempts will
fail because the output file already exists. It is not safe to clean up the file,
as this output committer is by design not retryable. Currently, the job fails
with a confusing "file exists" error. This patch is a stopgap to tell the user
to look at the top of the error log for the proper message.

This is difficult to test locally as Spark is hardcoded not to retry. Manually
verified by upping the retry attempts.

Author: Nong Li 
Author: Nong Li 

Closes #10080 from nongli/spark-11328.

(cherry picked from commit 47a0abc343550c855e679de12983f43e6fcc0171)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1122dd2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1122dd2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1122dd2

Branch: refs/heads/branch-1.6
Commit: f1122dd2bdc4c522a902b37bd34b46f785c21ecf
Parents: d77bf0b
Author: Nong Li 
Authored: Tue Dec 1 15:30:21 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 15:30:30 2015 -0800

--
 .../execution/datasources/WriterContainer.scala | 22 ++--
 .../parquet/DirectParquetOutputCommitter.scala  |  3 ++-
 2 files changed, 22 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f1122dd2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 1b59b19..ad55367 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -124,6 +124,24 @@ private[sql] abstract class BaseWriterContainer(
 }
   }
 
+  protected def newOutputWriter(path: String): OutputWriter = {
+try {
+  outputWriterFactory.newInstance(path, dataSchema, taskAttemptContext)
+} catch {
+  case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
+if 
(outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
+  // Spark-11382: DirectParquetOutputCommitter is not idempotent, 
meaning on retry
+  // attempts, the task will fail because the output file is created 
from a prior attempt.
+  // This often means the most visible error to the user is 
misleading. Augment the error
+  // to tell the user to look for the actual error.
+  throw new SparkException("The output file already exists but this 
could be due to a " +
+"failure from an earlier attempt. Look through the earlier logs or 
stage page for " +
+"the first error.\n  File exists error: " + e)
+}
+throw e
+}
+  }
+
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter 
= {
 val defaultOutputCommitter = 
outputFormatClass.newInstance().getOutputCommitter(context)
 
@@ -234,7 +252,7 @@ private[sql] class DefaultWriterContainer(
 executorSideSetup(taskContext)
 val configuration = 
SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
 configuration.set("spark.sql.sources.output.path", outputPath)
-val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, 
taskAttemptContext)
+val writer = newOutputWriter(getWorkPath)
 writer.initConverter(dataSchema)
 
 var writerClosed = false
@@ -403,7 +421,7 @@ private[sql] class DynamicPartitionWriterContainer(
   val configuration = 
SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
   configuration.set(
 "spark.sql.sources.output.path", new Path(outputPath, 
partitionPath).toString)
-  val newWriter = outputWriterFactory.newInstance(path.toString, 
dataSchema, taskAttemptContext)
+  val newWriter = super.newOutputWriter(path.toString)
   newWriter.initConverter(dataSchema)
   newWriter
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f1122dd2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
 

spark git commit: [SPARK-12075][SQL] Speed up HiveComparisionTest by avoiding / speeding up TestHive.reset()

2015-12-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master f292018f8 -> ef6790fdc


[SPARK-12075][SQL] Speed up HiveComparisionTest by avoiding / speeding up 
TestHive.reset()

When profiling HiveCompatibilitySuite, I noticed that most of the time seems to 
be spent in expensive `TestHive.reset()` calls. This patch speeds up suites 
based on HiveComparisionTest, such as HiveCompatibilitySuite, with the 
following changes:

- Avoid `TestHive.reset()` whenever possible:
  - Use a simple set of heuristics to guess whether we need to call `reset()` 
in between tests.
  - As a safety-net, automatically re-run failed tests by calling `reset()` 
before the re-attempt.
- Speed up the expensive parts of `TestHive.reset()`: loading the `src` and 
`srcpart` tables took roughly 600ms per test, so we now avoid this by using a 
simple heuristic which only loads those tables by tests that reference them. 
This is based on simple string matching over the test queries which errs on the 
side of loading in more situations than might be strictly necessary.

After these changes, HiveCompatibilitySuite seems to run in about 10 minutes.

This PR is a revival of #6663, an earlier experimental PR from June, where I 
played around with several possible speedups for this suite.
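A hedged sketch of the retry safety-net described above (names are illustrative, not the
actual HiveComparisonTest internals): try the test body without an expensive reset first,
and only reset and re-run once if it fails.

def runWithOptionalReset(reset: () => Unit)(body: => Unit): Unit =
  try {
    body            // optimistic first attempt, skipping TestHive.reset()
  } catch {
    case _: Throwable =>
      reset()       // fall back to a clean TestHive state
      body          // re-run once; a second failure propagates normally
  }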

Author: Josh Rosen 

Closes #10055 from JoshRosen/speculative-testhive-reset.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef6790fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef6790fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef6790fd

Branch: refs/heads/master
Commit: ef6790fdc3b70b9d6184ec2b3d926e4b0e4b15f6
Parents: f292018
Author: Josh Rosen 
Authored: Wed Dec 2 07:29:45 2015 +0800
Committer: Reynold Xin 
Committed: Wed Dec 2 07:29:45 2015 +0800

--
 .../apache/spark/sql/hive/test/TestHive.scala   |  7 --
 .../sql/hive/execution/HiveComparisonTest.scala | 67 ++--
 .../sql/hive/execution/HiveQueryFileTest.scala  |  2 +-
 3 files changed, 62 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ef6790fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 6883d30..2e2d201 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -443,13 +443,6 @@ class TestHiveContext(sc: SparkContext) extends 
HiveContext(sc) {
   defaultOverrides()
 
   runSqlHive("USE default")
-
-  // Just loading src makes a lot of tests pass.  This is because some 
tests do something like
-  // drop an index on src at the beginning.  Since we just pass DDL to 
hive this bypasses our
-  // Analyzer and thus the test table auto-loading mechanism.
-  // Remove after we handle more DDL operations natively.
-  loadTestTable("src")
-  loadTestTable("srcpart")
 } catch {
   case e: Exception =>
 logError("FATAL ERROR: Failed to reset TestDB state.", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/ef6790fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index aa95ba9..4455430 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -209,7 +209,11 @@ abstract class HiveComparisonTest
   }
 
   val installHooksCommand = "(?i)SET.*hooks".r
-  def createQueryTest(testCaseName: String, sql: String, reset: Boolean = 
true) {
+  def createQueryTest(
+  testCaseName: String,
+  sql: String,
+  reset: Boolean = true,
+  tryWithoutResettingFirst: Boolean = false) {
 // testCaseName must not contain ':', which is not allowed to appear in a 
filename of Windows
 assert(!testCaseName.contains(":"))
 
@@ -240,9 +244,6 @@ abstract class HiveComparisonTest
 test(testCaseName) {
   logDebug(s"=== HIVE TEST: $testCaseName ===")
 
-  // Clear old output for this testcase.
-  outputDirectories.map(new File(_, 
testCaseName)).filter(_.exists()).foreach(_.delete())
-
   val sqlWithoutComment =
 sql.split("\n").filterNot(l => 
l.matches("--.*(?<=[^]);")).mkString("\n")
   val 

spark git commit: [SPARK-12075][SQL] Speed up HiveComparisionTest by avoiding / speeding up TestHive.reset()

2015-12-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 012de2ce5 -> d77bf0bd9


[SPARK-12075][SQL] Speed up HiveComparisionTest by avoiding / speeding up 
TestHive.reset()

When profiling HiveCompatibilitySuite, I noticed that most of the time seems to 
be spent in expensive `TestHive.reset()` calls. This patch speeds up suites 
based on HiveComparisionTest, such as HiveCompatibilitySuite, with the 
following changes:

- Avoid `TestHive.reset()` whenever possible:
  - Use a simple set of heuristics to guess whether we need to call `reset()` 
in between tests.
  - As a safety-net, automatically re-run failed tests by calling `reset()` 
before the re-attempt.
- Speed up the expensive parts of `TestHive.reset()`: loading the `src` and 
`srcpart` tables took roughly 600ms per test, so we now avoid this by using a 
simple heuristic which only loads those tables by tests that reference them. 
This is based on simple string matching over the test queries which errs on the 
side of loading in more situations than might be strictly necessary.

After these changes, HiveCompatibilitySuite seems to run in about 10 minutes.

This PR is a revival of #6663, an earlier experimental PR from June, where I 
played around with several possible speedups for this suite.

Author: Josh Rosen 

Closes #10055 from JoshRosen/speculative-testhive-reset.

(cherry picked from commit ef6790fdc3b70b9d6184ec2b3d926e4b0e4b15f6)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d77bf0bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d77bf0bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d77bf0bd

Branch: refs/heads/branch-1.6
Commit: d77bf0bd922835b6a63bb1eeedf91e2a92d92ca9
Parents: 012de2c
Author: Josh Rosen 
Authored: Wed Dec 2 07:29:45 2015 +0800
Committer: Reynold Xin 
Committed: Wed Dec 2 07:30:07 2015 +0800

--
 .../apache/spark/sql/hive/test/TestHive.scala   |  7 --
 .../sql/hive/execution/HiveComparisonTest.scala | 67 ++--
 .../sql/hive/execution/HiveQueryFileTest.scala  |  2 +-
 3 files changed, 62 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d77bf0bd/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 6883d30..2e2d201 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -443,13 +443,6 @@ class TestHiveContext(sc: SparkContext) extends 
HiveContext(sc) {
   defaultOverrides()
 
   runSqlHive("USE default")
-
-  // Just loading src makes a lot of tests pass.  This is because some 
tests do something like
-  // drop an index on src at the beginning.  Since we just pass DDL to 
hive this bypasses our
-  // Analyzer and thus the test table auto-loading mechanism.
-  // Remove after we handle more DDL operations natively.
-  loadTestTable("src")
-  loadTestTable("srcpart")
 } catch {
   case e: Exception =>
 logError("FATAL ERROR: Failed to reset TestDB state.", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/d77bf0bd/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index aa95ba9..4455430 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -209,7 +209,11 @@ abstract class HiveComparisonTest
   }
 
   val installHooksCommand = "(?i)SET.*hooks".r
-  def createQueryTest(testCaseName: String, sql: String, reset: Boolean = 
true) {
+  def createQueryTest(
+  testCaseName: String,
+  sql: String,
+  reset: Boolean = true,
+  tryWithoutResettingFirst: Boolean = false) {
 // testCaseName must not contain ':', which is not allowed to appear in a 
filename of Windows
 assert(!testCaseName.contains(":"))
 
@@ -240,9 +244,6 @@ abstract class HiveComparisonTest
 test(testCaseName) {
   logDebug(s"=== HIVE TEST: $testCaseName ===")
 
-  // Clear old output for this testcase.
-  outputDirectories.map(new File(_, 
testCaseName)).filter(_.exists()).foreach(_.delete())
-
  

spark git commit: [SPARK-11328][SQL] Improve error message when hitting this issue

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 80dac0b07 -> f28399e1a


[SPARK-11328][SQL] Improve error message when hitting this issue

The issue is that the output committer is not idempotent, so retry attempts will
fail because the output file already exists. It is not safe to clean up the file,
as this output committer is by design not retryable. Currently, the job fails
with a confusing "file exists" error. This patch is a stopgap to tell the user
to look at the top of the error log for the proper message.

This is difficult to test locally as Spark is hardcoded not to retry. Manually
verified by upping the retry attempts.

Author: Nong Li 
Author: Nong Li 

Closes #10080 from nongli/spark-11328.

(cherry picked from commit 47a0abc343550c855e679de12983f43e6fcc0171)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f28399e1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f28399e1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f28399e1

Branch: refs/heads/branch-1.5
Commit: f28399e1ab6fd3a829d05ffbc637aa2c86cffdf2
Parents: 80dac0b
Author: Nong Li 
Authored: Tue Dec 1 15:30:21 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 15:59:09 2015 -0800

--
 .../execution/datasources/WriterContainer.scala | 22 ++--
 .../parquet/DirectParquetOutputCommitter.scala  |  3 ++-
 2 files changed, 22 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f28399e1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index c1599f1..6c39dba 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -122,6 +122,24 @@ private[sql] abstract class BaseWriterContainer(
 }
   }
 
+  protected def newOutputWriter(path: String): OutputWriter = {
+try {
+  outputWriterFactory.newInstance(path, dataSchema, taskAttemptContext)
+} catch {
+  case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
+if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
+  // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
+  // attempts, the task will fail because the output file is created from a prior attempt.
+  // This often means the most visible error to the user is misleading. Augment the error
+  // to tell the user to look for the actual error.
+  throw new SparkException("The output file already exists but this could be due to a " +
+"failure from an earlier attempt. Look through the earlier logs or stage page for " +
+"the first error.\n  File exists error: " + e)
+}
+throw e
+}
+  }
+
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
 val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
 
@@ -230,7 +248,7 @@ private[sql] class DefaultWriterContainer(
 executorSideSetup(taskContext)
 val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
 configuration.set("spark.sql.sources.output.path", outputPath)
-val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
+val writer = newOutputWriter(getWorkPath)
 writer.initConverter(dataSchema)
 
 var writerClosed = false
@@ -400,7 +418,7 @@ private[sql] class DynamicPartitionWriterContainer(
   val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
   configuration.set(
 "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
-  val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+  val newWriter = super.newOutputWriter(path.toString)
   newWriter.initConverter(dataSchema)
   newWriter
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f28399e1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
 

spark git commit: Revert "[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize"

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 81db8d086 -> 21909b8ac


Revert "[SPARK-12060][CORE] Avoid memory copy in 
JavaSerializerInstance.serialize"

This reverts commit 9b99b2b46c452ba396e922db5fc7eec02c45b158.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21909b8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21909b8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21909b8a

Branch: refs/heads/branch-1.6
Commit: 21909b8ac0068658cc833f324c0f1f418c200d61
Parents: 81db8d0
Author: Shixiong Zhu 
Authored: Tue Dec 1 15:16:07 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 15:16:07 2015 -0800

--
 .../spark/serializer/JavaSerializer.scala   |  7 +++--
 .../spark/util/ByteBufferOutputStream.scala | 31 
 2 files changed, 4 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/21909b8a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index ea718a0..b463a71 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,7 +24,8 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
+import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.Utils
 
 private[spark] class JavaSerializationStream(
 out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -95,11 +96,11 @@ private[spark] class JavaSerializerInstance(
   extends SerializerInstance {
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
-val bos = new ByteBufferOutputStream()
+val bos = new ByteArrayOutputStream()
 val out = serializeStream(bos)
 out.writeObject(t)
 out.close()
-bos.toByteBuffer
+ByteBuffer.wrap(bos.toByteArray)
   }
 
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {

http://git-wip-us.apache.org/repos/asf/spark/blob/21909b8a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala 
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
deleted file mode 100644
index 92e4522..000
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.ByteArrayOutputStream
-import java.nio.ByteBuffer
-
-/**
- * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer
- */
-private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
-
-  def toByteBuffer: ByteBuffer = {
-return ByteBuffer.wrap(buf, 0, count)
-  }
-}





spark git commit: [SPARK-12030] Fix Platform.copyMemory to handle overlapping regions.

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 fc3fb8463 -> 7460e4309


[SPARK-12030] Fix Platform.copyMemory to handle overlapping regions.

This bug was exposed as memory corruption in Timsort which uses copyMemory to 
copy
large regions that can overlap. The prior implementation did not handle this 
case
half the time and always copied forward, resulting in the data being corrupt.
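
As an aside, a minimal sketch of the direction-aware copy idea on a plain byte array
(illustrative only; the change below works through Unsafe and copies in bounded chunks):

```
// Pick the copy direction based on whether the destination starts before or
// after the source, so overlapping ranges are not clobbered mid-copy.
def copyWithinArray(buf: Array[Byte], srcPos: Int, dstPos: Int, length: Int): Unit = {
  if (dstPos < srcPos) {
    var i = 0
    while (i < length) { buf(dstPos + i) = buf(srcPos + i); i += 1 }   // copy forward
  } else {
    var i = length - 1
    while (i >= 0) { buf(dstPos + i) = buf(srcPos + i); i -= 1 }       // copy backward
  }
}
```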

Author: Nong Li 

Closes #10068 from nongli/spark-12030.

(cherry picked from commit 2cef1cdfbb5393270ae83179b6a4e50c3cbf9e93)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7460e430
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7460e430
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7460e430

Branch: refs/heads/branch-1.5
Commit: 7460e430929473152230d70964a04a7ff834066c
Parents: fc3fb84
Author: Nong Li 
Authored: Tue Dec 1 12:59:53 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 16:11:25 2015 -0800

--
 .../java/org/apache/spark/unsafe/Platform.java  | 27 +++--
 .../apache/spark/unsafe/PlatformUtilSuite.java  | 61 
 2 files changed, 82 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7460e430/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
--
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java 
b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 1c16da9..0d6b215 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -107,12 +107,27 @@ public final class Platform {
 
   public static void copyMemory(
 Object src, long srcOffset, Object dst, long dstOffset, long length) {
-while (length > 0) {
-  long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
-  _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
-  length -= size;
-  srcOffset += size;
-  dstOffset += size;
+// Check if dstOffset is before or after srcOffset to determine if we 
should copy
+// forward or backwards. This is necessary in case src and dst overlap.
+if (dstOffset < srcOffset) {
+  while (length > 0) {
+long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
+_UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
+length -= size;
+srcOffset += size;
+dstOffset += size;
+  }
+} else {
+  srcOffset += length;
+  dstOffset += length;
+  while (length > 0) {
+long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
+srcOffset -= size;
+dstOffset -= size;
+_UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
+length -= size;
+  }
+
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7460e430/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
--
diff --git 
a/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java 
b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
new file mode 100644
index 000..693ec6e
--- /dev/null
+++ b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PlatformUtilSuite {
+
+  @Test
+  public void overlappingCopyMemory() {
+byte[] data = new byte[3 * 1024 * 1024];
+int size = 2 * 1024 * 1024;
+for (int i = 0; i < data.length; ++i) {
+  data[i] = (byte)i;
+}
+
+Platform.copyMemory(data, Platform.BYTE_ARRAY_OFFSET, data, 
Platform.BYTE_ARRAY_OFFSET, size);
+for (int i = 0; i < data.length; ++i) {
+  Assert.assertEquals((byte)i, data[i]);
+}
+
+Platform.copyMemory(
+data,
+ 

spark git commit: Revert "[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize"

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 60b541ee1 -> 328b757d5


Revert "[SPARK-12060][CORE] Avoid memory copy in 
JavaSerializerInstance.serialize"

This reverts commit 1401166576c7018c5f9c31e0a6703d5fb16ea339.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/328b757d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/328b757d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/328b757d

Branch: refs/heads/master
Commit: 328b757d5d4486ea3c2e246780792d7a57ee85e5
Parents: 60b541e
Author: Shixiong Zhu 
Authored: Tue Dec 1 15:13:10 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 15:13:10 2015 -0800

--
 .../spark/serializer/JavaSerializer.scala   |  7 +++--
 .../spark/util/ByteBufferOutputStream.scala | 31 
 2 files changed, 4 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/328b757d/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index ea718a0..b463a71 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,7 +24,8 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
+import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.Utils
 
 private[spark] class JavaSerializationStream(
 out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -95,11 +96,11 @@ private[spark] class JavaSerializerInstance(
   extends SerializerInstance {
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
-val bos = new ByteBufferOutputStream()
+val bos = new ByteArrayOutputStream()
 val out = serializeStream(bos)
 out.writeObject(t)
 out.close()
-bos.toByteBuffer
+ByteBuffer.wrap(bos.toByteArray)
   }
 
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {

http://git-wip-us.apache.org/repos/asf/spark/blob/328b757d/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala 
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
deleted file mode 100644
index 92e4522..000
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.ByteArrayOutputStream
-import java.nio.ByteBuffer
-
-/**
- * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer
- */
-private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
-
-  def toByteBuffer: ByteBuffer = {
-return ByteBuffer.wrap(buf, 0, count)
-  }
-}





spark git commit: [SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery issue

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 5647774b0 -> 012de2ce5


[SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery 
issue

Fixed a minor race condition in #10017

Closes #10017

Author: jerryshao 
Author: Shixiong Zhu 

Closes #10074 from zsxwing/review-pr10017.

(cherry picked from commit f292018f8e57779debc04998456ec875f628133b)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/012de2ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/012de2ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/012de2ce

Branch: refs/heads/branch-1.6
Commit: 012de2ce5de01bc57197fa26334fc175c8f20233
Parents: 5647774
Author: jerryshao 
Authored: Tue Dec 1 15:26:10 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 15:26:20 2015 -0800

--
 python/pyspark/streaming/tests.py | 49 ++
 python/pyspark/streaming/util.py  | 13 -
 2 files changed, 56 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/012de2ce/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index d380d69..a2bfd79 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1150,6 +1150,55 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
 
 @unittest.skipIf(sys.version >= "3", "long type not support")
+def test_kafka_direct_stream_transform_with_checkpoint(self):
+"""Test the Python direct Kafka stream transform with checkpoint 
correctly recovered."""
+topic = self._randomTopic()
+sendData = {"a": 1, "b": 2, "c": 3}
+kafkaParams = {"metadata.broker.list": 
self._kafkaTestUtils.brokerAddress(),
+   "auto.offset.reset": "smallest"}
+
+self._kafkaTestUtils.createTopic(topic)
+self._kafkaTestUtils.sendMessages(topic, sendData)
+
+offsetRanges = []
+
+def transformWithOffsetRanges(rdd):
+for o in rdd.offsetRanges():
+offsetRanges.append(o)
+return rdd
+
+self.ssc.stop(False)
+self.ssc = None
+tmpdir = "checkpoint-test-%d" % random.randint(0, 1)
+
+def setup():
+ssc = StreamingContext(self.sc, 0.5)
+ssc.checkpoint(tmpdir)
+stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
+stream.transform(transformWithOffsetRanges).count().pprint()
+return ssc
+
+try:
+ssc1 = StreamingContext.getOrCreate(tmpdir, setup)
+ssc1.start()
+self.wait_for(offsetRanges, 1)
+self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
+
+# To make sure some checkpoint is written
+time.sleep(3)
+ssc1.stop(False)
+ssc1 = None
+
+# Restart again to make sure the checkpoint is recovered correctly
+ssc2 = StreamingContext.getOrCreate(tmpdir, setup)
+ssc2.start()
+ssc2.awaitTermination(3)
+ssc2.stop(stopSparkContext=False, stopGraceFully=True)
+ssc2 = None
+finally:
+shutil.rmtree(tmpdir)
+
+@unittest.skipIf(sys.version >= "3", "long type not support")
 def test_kafka_rdd_message_handler(self):
 """Test Python direct Kafka RDD MessageHandler."""
 topic = self._randomTopic()

http://git-wip-us.apache.org/repos/asf/spark/blob/012de2ce/python/pyspark/streaming/util.py
--
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py
index c7f02bc..abbbf6e 100644
--- a/python/pyspark/streaming/util.py
+++ b/python/pyspark/streaming/util.py
@@ -37,11 +37,11 @@ class TransformFunction(object):
 self.ctx = ctx
 self.func = func
 self.deserializers = deserializers
-self._rdd_wrapper = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
+self.rdd_wrap_func = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
 self.failure = None
 
 def rdd_wrapper(self, func):
-self._rdd_wrapper = func
+self.rdd_wrap_func = func
 return self
 
 def call(self, milliseconds, jrdds):
@@ -59,7 +59,7 @@ class TransformFunction(object):
 if len(sers) < len(jrdds):
 sers += (sers[0],) * (len(jrdds) - len(sers))
 
-rdds = [self._rdd_wrapper(jrdd, 

spark git commit: [SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery issue

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master e76431f88 -> f292018f8


[SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery 
issue

Fixed a minor race condition in #10017

Closes #10017

Author: jerryshao 
Author: Shixiong Zhu 

Closes #10074 from zsxwing/review-pr10017.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f292018f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f292018f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f292018f

Branch: refs/heads/master
Commit: f292018f8e57779debc04998456ec875f628133b
Parents: e76431f
Author: jerryshao 
Authored: Tue Dec 1 15:26:10 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 15:26:10 2015 -0800

--
 python/pyspark/streaming/tests.py | 49 ++
 python/pyspark/streaming/util.py  | 13 -
 2 files changed, 56 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f292018f/python/pyspark/streaming/tests.py
--
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index a647e6b..d50c6b8 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1150,6 +1150,55 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
 
 @unittest.skipIf(sys.version >= "3", "long type not support")
+def test_kafka_direct_stream_transform_with_checkpoint(self):
+"""Test the Python direct Kafka stream transform with checkpoint 
correctly recovered."""
+topic = self._randomTopic()
+sendData = {"a": 1, "b": 2, "c": 3}
+kafkaParams = {"metadata.broker.list": 
self._kafkaTestUtils.brokerAddress(),
+   "auto.offset.reset": "smallest"}
+
+self._kafkaTestUtils.createTopic(topic)
+self._kafkaTestUtils.sendMessages(topic, sendData)
+
+offsetRanges = []
+
+def transformWithOffsetRanges(rdd):
+for o in rdd.offsetRanges():
+offsetRanges.append(o)
+return rdd
+
+self.ssc.stop(False)
+self.ssc = None
+tmpdir = "checkpoint-test-%d" % random.randint(0, 1)
+
+def setup():
+ssc = StreamingContext(self.sc, 0.5)
+ssc.checkpoint(tmpdir)
+stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
+stream.transform(transformWithOffsetRanges).count().pprint()
+return ssc
+
+try:
+ssc1 = StreamingContext.getOrCreate(tmpdir, setup)
+ssc1.start()
+self.wait_for(offsetRanges, 1)
+self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
+
+# To make sure some checkpoint is written
+time.sleep(3)
+ssc1.stop(False)
+ssc1 = None
+
+# Restart again to make sure the checkpoint is recovered correctly
+ssc2 = StreamingContext.getOrCreate(tmpdir, setup)
+ssc2.start()
+ssc2.awaitTermination(3)
+ssc2.stop(stopSparkContext=False, stopGraceFully=True)
+ssc2 = None
+finally:
+shutil.rmtree(tmpdir)
+
+@unittest.skipIf(sys.version >= "3", "long type not support")
 def test_kafka_rdd_message_handler(self):
 """Test Python direct Kafka RDD MessageHandler."""
 topic = self._randomTopic()

http://git-wip-us.apache.org/repos/asf/spark/blob/f292018f/python/pyspark/streaming/util.py
--
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py
index c7f02bc..abbbf6e 100644
--- a/python/pyspark/streaming/util.py
+++ b/python/pyspark/streaming/util.py
@@ -37,11 +37,11 @@ class TransformFunction(object):
 self.ctx = ctx
 self.func = func
 self.deserializers = deserializers
-self._rdd_wrapper = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
+self.rdd_wrap_func = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
 self.failure = None
 
 def rdd_wrapper(self, func):
-self._rdd_wrapper = func
+self.rdd_wrap_func = func
 return self
 
 def call(self, milliseconds, jrdds):
@@ -59,7 +59,7 @@ class TransformFunction(object):
 if len(sers) < len(jrdds):
 sers += (sers[0],) * (len(jrdds) - len(sers))
 
-rdds = [self._rdd_wrapper(jrdd, self.ctx, ser) if jrdd else None
+rdds = [self.rdd_wrap_func(jrdd, self.ctx, ser) if jrdd else None
 

spark git commit: [SPARK-11352][SQL] Escape */ in the generated comments.

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1135430a0 -> 14eadf921


[SPARK-11352][SQL] Escape */ in the generated comments.

https://issues.apache.org/jira/browse/SPARK-11352
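
For context, a small illustration of the problem and of the escaping rule this patch adds
via `toCommentSafeString` (the standalone helper below is only a sketch of the same
replace call; see the diff for the real method on `Expression`):

```
// Expressions are embedded in /* ... */ comments of the generated Java source.
// A "*/" inside the expression's string form would close that comment early and
// break compilation of the generated code, so it is escaped before embedding.
def toCommentSafe(s: String): String = s.replace("*/", "\\*\\/")

// toCommentSafe("get_json_object(col, '$.a*/b')")
// => "get_json_object(col, '$.a\*\/b')"
```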

Author: Yin Huai 

Closes #10072 from yhuai/SPARK-11352.

(cherry picked from commit 5872a9d89fe2720c2bcb1fc7494136947a72581c)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14eadf92
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14eadf92
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14eadf92

Branch: refs/heads/branch-1.6
Commit: 14eadf921132219f5597d689ac1ffd6e938a939a
Parents: 1135430
Author: Yin Huai 
Authored: Tue Dec 1 16:24:04 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 16:24:14 2015 -0800

--
 .../spark/sql/catalyst/expressions/Expression.scala   | 10 --
 .../catalyst/expressions/codegen/CodegenFallback.scala|  2 +-
 .../sql/catalyst/expressions/CodeGenerationSuite.scala|  9 +
 3 files changed, 18 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/14eadf92/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index b55d365..4ee6542 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -95,7 +95,7 @@ abstract class Expression extends TreeNode[Expression] {
 ctx.subExprEliminationExprs.get(this).map { subExprState =>
   // This expression is repeated meaning the code to evaluated has already been added
   // as a function and called in advance. Just use it.
-  val code = s"/* $this */"
+  val code = s"/* ${this.toCommentSafeString} */"
   GeneratedExpressionCode(code, subExprState.isNull, subExprState.value)
 }.getOrElse {
   val isNull = ctx.freshName("isNull")
@@ -103,7 +103,7 @@ abstract class Expression extends TreeNode[Expression] {
   val ve = GeneratedExpressionCode("", isNull, primitive)
   ve.code = genCode(ctx, ve)
   // Add `this` in the comment.
-  ve.copy(s"/* $this */\n" + ve.code.trim)
+  ve.copy(s"/* ${this.toCommentSafeString} */\n" + ve.code.trim)
 }
   }
 
@@ -214,6 +214,12 @@ abstract class Expression extends TreeNode[Expression] {
   }
 
   override def toString: String = prettyName + flatArguments.mkString("(", ",", ")")
+
+  /**
+   * Returns the string representation of this expression that is safe to be put in
+   * code comments of generated code.
+   */
+  protected def toCommentSafeString: String = this.toString.replace("*/", "\\*\\/")
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14eadf92/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
index a31574c..26fb143 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
@@ -33,7 +33,7 @@ trait CodegenFallback extends Expression {
 ctx.references += this
 val objectTerm = ctx.freshName("obj")
 s"""
-  /* expression: ${this} */
+  /* expression: ${this.toCommentSafeString} */
   java.lang.Object $objectTerm = expressions[${ctx.references.size - 1}].eval(${ctx.INPUT_ROW});
   boolean ${ev.isNull} = $objectTerm == null;
   ${ctx.javaType(this.dataType)} ${ev.value} = ${ctx.defaultValue(this.dataType)};

http://git-wip-us.apache.org/repos/asf/spark/blob/14eadf92/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 002ed16..fe75424 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ 

spark git commit: [SPARK-11788][SQL] surround timestamp/date value with quotes in JDBC data source

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 f1122dd2b -> 1135430a0


[SPARK-11788][SQL] surround timestamp/date value with quotes in JDBC data source

When querying a Timestamp or Date column like the following:
val filtered = jdbcdf.where($"TIMESTAMP_COLUMN" >= beg && $"TIMESTAMP_COLUMN" < end)
the generated SQL predicate is "TIMESTAMP_COLUMN >= 2015-01-01 00:00:00.0".
It should have quotes around the Timestamp/Date value, such as
"TIMESTAMP_COLUMN >= '2015-01-01 00:00:00.0'"
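
A sketch of the quoting rule this adds to `JDBCRDD.compileValue` (the helper name and
the simplified quote escaping here are illustrative, not the private Spark method itself):

```
import java.sql.{Date, Timestamp}

// String, Timestamp and Date literals are wrapped in single quotes so the
// database parses them as literals; other values are passed through as-is.
def compileFilterValue(value: Any): String = value match {
  case s: String    => s"'${s.replace("'", "''")}'"
  case t: Timestamp => s"'$t'"
  case d: Date      => s"'$d'"
  case other        => other.toString
}

// compileFilterValue(Timestamp.valueOf("2015-01-01 00:00:00.0"))
// => "'2015-01-01 00:00:00.0'"
```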

Author: Huaxin Gao 

Closes #9872 from huaxingao/spark-11788.

(cherry picked from commit 5a8b5fdd6ffa58f015cdadf3f2c6df78e0a388ad)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1135430a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1135430a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1135430a

Branch: refs/heads/branch-1.6
Commit: 1135430a00dbe6516097dd3bc868ae865e8e644d
Parents: f1122dd
Author: Huaxin Gao 
Authored: Tue Dec 1 15:32:57 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 15:33:18 2015 -0800

--
 .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala   |  4 +++-
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 11 +++
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1135430a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index f9b7259..32f0889 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException}
+import java.sql.{Connection, Date, DriverManager, ResultSet, ResultSetMetaData, SQLException, Timestamp}
 import java.util.Properties
 
 import org.apache.commons.lang3.StringUtils
@@ -265,6 +265,8 @@ private[sql] class JDBCRDD(
*/
   private def compileValue(value: Any): Any = value match {
 case stringValue: String => s"'${escapeSql(stringValue)}'"
+case timestampValue: Timestamp => "'" + timestampValue + "'"
+case dateValue: Date => "'" + dateValue + "'"
 case _ => value
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1135430a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index d530b1a..8c24aa3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -484,4 +484,15 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter 
with SharedSQLContext
 assert(h2.getTableExistsQuery(table) == defaultQuery)
 assert(derby.getTableExistsQuery(table) == defaultQuery)
   }
+
+  test("Test DataFrame.where for Date and Timestamp") {
+// Regression test for bug SPARK-11788
+val timestamp = java.sql.Timestamp.valueOf("2001-02-20 11:22:33.543543");
+val date = java.sql.Date.valueOf("1995-01-01")
+val jdbcDf = sqlContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+val rows = jdbcDf.where($"B" > date && $"C" > timestamp).collect()
+assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
+assert(rows(0).getAs[java.sql.Timestamp](2)
+  === java.sql.Timestamp.valueOf("2002-02-20 11:22:33.543543"))
+  }
 }





spark git commit: [SPARK-11788][SQL] surround timestamp/date value with quotes in JDBC data source

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 f28399e1a -> fc3fb8463


[SPARK-11788][SQL] surround timestamp/date value with quotes in JDBC data source

When querying a Timestamp or Date column like the following:
val filtered = jdbcdf.where($"TIMESTAMP_COLUMN" >= beg && $"TIMESTAMP_COLUMN" < end)
the generated SQL predicate is "TIMESTAMP_COLUMN >= 2015-01-01 00:00:00.0".
It should have quotes around the Timestamp/Date value, such as
"TIMESTAMP_COLUMN >= '2015-01-01 00:00:00.0'"

Author: Huaxin Gao 

Closes #9872 from huaxingao/spark-11788.

(cherry picked from commit 5a8b5fdd6ffa58f015cdadf3f2c6df78e0a388ad)
Signed-off-by: Yin Huai 

Conflicts:
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc3fb846
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc3fb846
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc3fb846

Branch: refs/heads/branch-1.5
Commit: fc3fb84636ba5f84220e1f46036841f1cb61f1d0
Parents: f28399e
Author: Huaxin Gao 
Authored: Tue Dec 1 15:32:57 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 16:03:33 2015 -0800

--
 .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala   |  4 +++-
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 11 +++
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fc3fb846/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 018a009..a956ae6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.jdbc
 
-import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException}
+import java.sql.{Connection, Date, DriverManager, ResultSet, ResultSetMetaData, SQLException, Timestamp}
 import java.util.Properties
 
 import org.apache.commons.lang3.StringUtils
@@ -263,6 +263,8 @@ private[sql] class JDBCRDD(
*/
   private def compileValue(value: Any): Any = value match {
 case stringValue: String => s"'${escapeSql(stringValue)}'"
+case timestampValue: Timestamp => "'" + timestampValue + "'"
+case dateValue: Date => "'" + dateValue + "'"
 case _ => value
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fc3fb846/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 0edac08..6fc0c6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -443,4 +443,15 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter 
with SharedSQLContext
 assert(agg.getCatalystType(0, "", 1, null) === Some(LongType))
 assert(agg.getCatalystType(1, "", 1, null) === Some(StringType))
   }
+
+  test("Test DataFrame.where for Date and Timestamp") {
+// Regression test for bug SPARK-11788
+val timestamp = java.sql.Timestamp.valueOf("2001-02-20 11:22:33.543543");
+val date = java.sql.Date.valueOf("1995-01-01")
+val jdbcDf = sqlContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+val rows = jdbcDf.where($"B" > date && $"C" > timestamp).collect()
+assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
+assert(rows(0).getAs[java.sql.Timestamp](2)
+  === java.sql.Timestamp.valueOf("2002-02-20 11:22:33.543543"))
+  }
 }





spark git commit: [SPARK-11596][SQL] In TreeNode's argString, if a TreeNode is not a child of the current TreeNode, we should only return the simpleString.

2015-12-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 5872a9d89 -> e96a70d5a


[SPARK-11596][SQL] In TreeNode's argString, if a TreeNode is not a child of the 
current TreeNode, we should only return the simpleString.

In TreeNode's argString, if a TreeNode is not a child of the current TreeNode, 
we will only return the simpleString.

I tested the [following case provided by 
Cristian](https://issues.apache.org/jira/browse/SPARK-11596?focusedCommentId=15019241=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15019241).
```
val c = (1 to 20).foldLeft[Option[DataFrame]] (None) { (curr, idx) =>
  println(s"PROCESSING >>> $idx")
  val df = sqlContext.sparkContext.parallelize((0 to 10).zipWithIndex).toDF("A", "B")
  val union = curr.map(_.unionAll(df)).getOrElse(df)
  union.cache()
  Some(union)
}

c.get.explain(true)
```

Without the change, `c.get.explain(true)` took 100s. With the change, 
`c.get.explain(true)` took 26ms.
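
To see why this matters, a toy illustration (not Catalyst's `TreeNode`): rendering a
non-child argument with `toString` walks its whole subtree, while a `simpleString`-style
rendering only touches the node itself, which is what keeps `explain` cheap on deeply
nested plans.

```
// Illustrative tree with both renderings.
case class Plan(name: String, children: Seq[Plan] = Nil) {
  def simpleString: String = name
  override def toString: String = (name +: children.map(_.toString)).mkString("\n")
}

// A 21-node chain: toString renders every node, simpleString is just "node20".
val deep = (1 to 20).foldLeft(Plan("leaf")) { (child, i) => Plan(s"node$i", Seq(child)) }
```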

https://issues.apache.org/jira/browse/SPARK-11596

Author: Yin Huai 

Closes #10079 from yhuai/SPARK-11596.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e96a70d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e96a70d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e96a70d5

Branch: refs/heads/master
Commit: e96a70d5ab2e2b43a2df17a550fa9ed2ee0001c4
Parents: 5872a9d
Author: Yin Huai 
Authored: Tue Dec 1 17:18:45 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Dec 1 17:18:45 2015 -0800

--
 .../main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e96a70d5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index f1cea07..ad2bd78 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -380,7 +380,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product {
   /** Returns a string representing the arguments to this node, minus any children */
   def argString: String = productIterator.flatMap {
 case tn: TreeNode[_] if containsChild(tn) => Nil
-case tn: TreeNode[_] if tn.toString contains "\n" => s"(${tn.simpleString})" :: Nil
+case tn: TreeNode[_] => s"(${tn.simpleString})" :: Nil
 case seq: Seq[BaseType] if seq.toSet.subsetOf(children.toSet) => Nil
 case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil
 case set: Set[_] => set.mkString("{", ",", "}") :: Nil





spark git commit: [SPARK-11596][SQL] In TreeNode's argString, if a TreeNode is not a child of the current TreeNode, we should only return the simpleString.

2015-12-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 14eadf921 -> 1b3db967e


[SPARK-11596][SQL] In TreeNode's argString, if a TreeNode is not a child of the 
current TreeNode, we should only return the simpleString.

In TreeNode's argString, if a TreeNode is not a child of the current TreeNode, 
we will only return the simpleString.

I tested the [following case provided by 
Cristian](https://issues.apache.org/jira/browse/SPARK-11596?focusedCommentId=15019241=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15019241).
```
val c = (1 to 20).foldLeft[Option[DataFrame]] (None) { (curr, idx) =>
  println(s"PROCESSING >>> $idx")
  val df = sqlContext.sparkContext.parallelize((0 to 10).zipWithIndex).toDF("A", "B")
  val union = curr.map(_.unionAll(df)).getOrElse(df)
  union.cache()
  Some(union)
}

c.get.explain(true)
```

Without the change, `c.get.explain(true)` took 100s. With the change, 
`c.get.explain(true)` took 26ms.

https://issues.apache.org/jira/browse/SPARK-11596

Author: Yin Huai 

Closes #10079 from yhuai/SPARK-11596.

(cherry picked from commit e96a70d5ab2e2b43a2df17a550fa9ed2ee0001c4)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b3db967
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b3db967
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b3db967

Branch: refs/heads/branch-1.6
Commit: 1b3db967e05a628897b7162aa605b2e4650a0d58
Parents: 14eadf9
Author: Yin Huai 
Authored: Tue Dec 1 17:18:45 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Dec 1 17:19:09 2015 -0800

--
 .../main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1b3db967/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index f1cea07..ad2bd78 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -380,7 +380,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product {
   /** Returns a string representing the arguments to this node, minus any children */
   def argString: String = productIterator.flatMap {
 case tn: TreeNode[_] if containsChild(tn) => Nil
-case tn: TreeNode[_] if tn.toString contains "\n" => s"(${tn.simpleString})" :: Nil
+case tn: TreeNode[_] => s"(${tn.simpleString})" :: Nil
 case seq: Seq[BaseType] if seq.toSet.subsetOf(children.toSet) => Nil
 case seq: Seq[_] => seq.mkString("[", ",", "]") :: Nil
 case set: Set[_] => set.mkString("{", ",", "}") :: Nil





spark git commit: [SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 add4e6311 -> 9b99b2b46


[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize

`JavaSerializerInstance.serialize` uses `ByteArrayOutputStream.toByteArray` to
get the serialized data. `ByteArrayOutputStream.toByteArray` has to copy the
content of the internal array into a new array. However, since the result is
immediately converted to a `ByteBuffer`, that memory copy can be avoided.

This PR added `ByteBufferOutputStream` to access the protected `buf` and 
convert it to a `ByteBuffer` directly.
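
Roughly, the idea (sketched with a renamed class; the real `ByteBufferOutputStream` is in
the diff below) is that a subclass can reach the protected `buf`/`count` fields and wrap
them directly:

```
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

// Subclassing exposes the protected internal buffer, so no defensive copy is made.
class WrapOnlyOutputStream extends ByteArrayOutputStream {
  def toByteBuffer: ByteBuffer = ByteBuffer.wrap(buf, 0, count)
}

val bos = new WrapOnlyOutputStream
bos.write(Array[Byte](1, 2, 3))
val noCopy = bos.toByteBuffer                        // wraps the internal array in place
val withCopy = ByteBuffer.wrap(bos.toByteArray)      // toByteArray allocates and copies first
```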

Author: Shixiong Zhu 

Closes #10051 from zsxwing/SPARK-12060.

(cherry picked from commit 1401166576c7018c5f9c31e0a6703d5fb16ea339)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b99b2b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b99b2b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b99b2b4

Branch: refs/heads/branch-1.6
Commit: 9b99b2b46c452ba396e922db5fc7eec02c45b158
Parents: add4e63
Author: Shixiong Zhu 
Authored: Tue Dec 1 09:45:55 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 09:46:07 2015 -0800

--
 .../spark/serializer/JavaSerializer.scala   |  7 ++---
 .../spark/util/ByteBufferOutputStream.scala | 31 
 2 files changed, 34 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9b99b2b4/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index b463a71..ea718a0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,8 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.ByteBufferInputStream
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
 
 private[spark] class JavaSerializationStream(
 out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -96,11 +95,11 @@ private[spark] class JavaSerializerInstance(
   extends SerializerInstance {
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
-val bos = new ByteArrayOutputStream()
+val bos = new ByteBufferOutputStream()
 val out = serializeStream(bos)
 out.writeObject(t)
 out.close()
-ByteBuffer.wrap(bos.toByteArray)
+bos.toByteBuffer
   }
 
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {

http://git-wip-us.apache.org/repos/asf/spark/blob/9b99b2b4/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala 
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
new file mode 100644
index 000..92e4522
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+
+/**
+ * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer
+ */
+private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
+
+  def toByteBuffer: ByteBuffer = {
+return ByteBuffer.wrap(buf, 0, count)
+  }
+}





spark git commit: [SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master c87531b76 -> 140116657


[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize

`JavaSerializerInstance.serialize` uses `ByteArrayOutputStream.toByteArray` to
get the serialized data. `ByteArrayOutputStream.toByteArray` has to copy the
content of the internal array into a new array. However, since the result is
immediately converted to a `ByteBuffer`, that memory copy can be avoided.

This PR added `ByteBufferOutputStream` to access the protected `buf` and 
convert it to a `ByteBuffer` directly.

Author: Shixiong Zhu 

Closes #10051 from zsxwing/SPARK-12060.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14011665
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14011665
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14011665

Branch: refs/heads/master
Commit: 1401166576c7018c5f9c31e0a6703d5fb16ea339
Parents: c87531b
Author: Shixiong Zhu 
Authored: Tue Dec 1 09:45:55 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 09:45:55 2015 -0800

--
 .../spark/serializer/JavaSerializer.scala   |  7 ++---
 .../spark/util/ByteBufferOutputStream.scala | 31 
 2 files changed, 34 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/14011665/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index b463a71..ea718a0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,8 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.ByteBufferInputStream
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
 
 private[spark] class JavaSerializationStream(
 out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -96,11 +95,11 @@ private[spark] class JavaSerializerInstance(
   extends SerializerInstance {
 
   override def serialize[T: ClassTag](t: T): ByteBuffer = {
-val bos = new ByteArrayOutputStream()
+val bos = new ByteBufferOutputStream()
 val out = serializeStream(bos)
 out.writeObject(t)
 out.close()
-ByteBuffer.wrap(bos.toByteArray)
+bos.toByteBuffer
   }
 
   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {

http://git-wip-us.apache.org/repos/asf/spark/blob/14011665/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala 
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
new file mode 100644
index 000..92e4522
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+
+/**
+ * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer
+ */
+private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
+
+  def toByteBuffer: ByteBuffer = {
+return ByteBuffer.wrap(buf, 0, count)
+  }
+}





spark git commit: [SPARK-12046][DOC] Fixes various ScalaDoc/JavaDoc issues

2015-12-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 140116657 -> 69dbe6b40


[SPARK-12046][DOC] Fixes various ScalaDoc/JavaDoc issues

This PR backports PR #10039 to master

Author: Cheng Lian 

Closes #10063 from liancheng/spark-12046.doc-fix.master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69dbe6b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69dbe6b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69dbe6b4

Branch: refs/heads/master
Commit: 69dbe6b40df35d488d4ee343098ac70d00bbdafb
Parents: 1401166
Author: Cheng Lian 
Authored: Tue Dec 1 10:21:31 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Dec 1 10:21:31 2015 -0800

--
 .../spark/api/java/function/Function4.java  |  2 +-
 .../spark/api/java/function/VoidFunction.java   |  2 +-
 .../spark/api/java/function/VoidFunction2.java  |  2 +-
 .../org/apache/spark/api/java/JavaPairRDD.scala | 16 +++
 .../scala/org/apache/spark/memory/package.scala | 14 +++---
 .../org/apache/spark/rdd/CoGroupedRDD.scala |  2 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  6 +--
 .../org/apache/spark/rdd/ShuffledRDD.scala  |  2 +-
 .../scala/org/apache/spark/scheduler/Task.scala |  5 ++-
 .../serializer/SerializationDebugger.scala  | 13 +++---
 .../scala/org/apache/spark/util/Vector.scala|  1 +
 .../spark/util/collection/ExternalSorter.scala  | 30 ++---
 .../WritablePartitionedPairCollection.scala |  7 +--
 .../streaming/kinesis/KinesisReceiver.scala | 23 +-
 .../spark/streaming/kinesis/KinesisUtils.scala  | 13 +++---
 .../mllib/optimization/GradientDescent.scala| 12 +++---
 project/SparkBuild.scala|  2 +
 .../scala/org/apache/spark/sql/Column.scala | 11 ++---
 .../spark/streaming/StreamingContext.scala  | 11 ++---
 .../streaming/dstream/FileInputDStream.scala| 19 +
 .../streaming/receiver/BlockGenerator.scala | 22 +-
 .../scheduler/ReceiverSchedulingPolicy.scala| 45 ++--
 .../streaming/util/FileBasedWriteAheadLog.scala |  7 +--
 .../spark/streaming/util/RecurringTimer.scala   |  8 ++--
 .../org/apache/spark/deploy/yarn/Client.scala   | 10 ++---
 25 files changed, 152 insertions(+), 133 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/69dbe6b4/core/src/main/java/org/apache/spark/api/java/function/Function4.java
--
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/Function4.java 
b/core/src/main/java/org/apache/spark/api/java/function/Function4.java
index fd727d6..9c35a22 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function4.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function4.java
@@ -23,5 +23,5 @@ import java.io.Serializable;
  * A four-argument function that takes arguments of type T1, T2, T3 and T4 and returns an R.
  */
 public interface Function4<T1, T2, T3, T4, R> extends Serializable {
-  public R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
+  R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/69dbe6b4/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
--
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java 
b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
index 2a10435..f30d42e 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
@@ -23,5 +23,5 @@ import java.io.Serializable;
  * A function with no return value.
  */
 public interface VoidFunction<T> extends Serializable {
-  public void call(T t) throws Exception;
+  void call(T t) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/69dbe6b4/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
--
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java 
b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
index 6c576ab..da9ae1c 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
@@ -23,5 +23,5 @@ import java.io.Serializable;
  * A two-argument function that takes arguments of type T1 and T2 with no return value.
  */
 public interface VoidFunction2<T1, T2> extends Serializable {
-  public void call(T1 v1, T2 v2) throws Exception;
+  void call(T1 v1, T2 

spark git commit: [SPARK-12068][SQL] use a single column in Dataset.groupBy and count will fail

2015-12-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 69dbe6b40 -> 8ddc55f1d


[SPARK-12068][SQL] use a single column in Dataset.groupBy and count will fail

The reason is that, for a single-column `RowEncoder` (or a single-field product
encoder), when we use it as the encoder for the grouping key, we should still
combine the grouping attributes into a struct key, even though there is only one
grouping attribute.
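
The scenario this fixes, mirroring the new test below (sketch; assumes a SQLContext
named `sqlContext` with its implicits in scope):

```
import sqlContext.implicits._

// Grouping a Dataset by a single-field (non-flat) key and counting used to fail;
// with the fix the lone grouping attribute is still wrapped into a struct key.
val ds = Seq("abc", "xyz", "hello").toDS()
val counted = ds.groupBy(s => Tuple1(s.length)).count()
// expected contents (order may vary): (Tuple1(3), 2L), (Tuple1(5), 1L)
```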

Author: Wenchen Fan 

Closes #10059 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ddc55f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ddc55f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ddc55f1

Branch: refs/heads/master
Commit: 8ddc55f1d5823ca135510b2ea776e889e481
Parents: 69dbe6b
Author: Wenchen Fan 
Authored: Tue Dec 1 10:22:55 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Dec 1 10:22:55 2015 -0800

--
 .../scala/org/apache/spark/sql/Dataset.scala |  2 +-
 .../org/apache/spark/sql/GroupedDataset.scala|  7 ---
 .../org/apache/spark/sql/DatasetSuite.scala  | 19 +++
 .../scala/org/apache/spark/sql/QueryTest.scala   |  6 +++---
 4 files changed, 27 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8ddc55f1/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index da46001..c357f88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -70,7 +70,7 @@ class Dataset[T] private[sql](
* implicit so that we can use it when constructing new [[Dataset]] objects 
that have the same
* object type (that will be possibly resolved to a different schema).
*/
-  private implicit val unresolvedTEncoder: ExpressionEncoder[T] = 
encoderFor(tEncoder)
+  private[sql] implicit val unresolvedTEncoder: ExpressionEncoder[T] = 
encoderFor(tEncoder)
 
   /** The encoder for this [[Dataset]] that has been resolved to its output 
schema. */
   private[sql] val resolvedTEncoder: ExpressionEncoder[T] =

http://git-wip-us.apache.org/repos/asf/spark/blob/8ddc55f1/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
index a10a893..4bf0b25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -228,10 +228,11 @@ class GroupedDataset[K, V] private[sql](
 val namedColumns =
   columns.map(
 _.withInputType(resolvedVEncoder, dataAttributes).named)
-val keyColumn = if (groupingAttributes.length > 1) {
-  Alias(CreateStruct(groupingAttributes), "key")()
-} else {
+val keyColumn = if (resolvedKEncoder.flat) {
+  assert(groupingAttributes.length == 1)
   groupingAttributes.head
+} else {
+  Alias(CreateStruct(groupingAttributes), "key")()
 }
 val aggregate = Aggregate(groupingAttributes, keyColumn +: namedColumns, 
logicalPlan)
 val execution = new QueryExecution(sqlContext, aggregate)

http://git-wip-us.apache.org/repos/asf/spark/blob/8ddc55f1/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 7d53918..a2c8d20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -272,6 +272,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext 
{
   3 -> "abcxyz", 5 -> "hello")
   }
 
+  test("groupBy single field class, count") {
+val ds = Seq("abc", "xyz", "hello").toDS()
+val count = ds.groupBy(s => Tuple1(s.length)).count()
+
+checkAnswer(
+  count,
+  (Tuple1(3), 2L), (Tuple1(5), 1L)
+)
+  }
+
   test("groupBy columns, map") {
 val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
 val grouped = ds.groupBy($"_1")
@@ -282,6 +292,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext 
{
   ("a", 30), ("b", 3), ("c", 1))
   }
 
+  test("groupBy columns, count") {
+val ds = Seq("a" -> 1, "b" -> 1, "a" -> 2).toDS()
+val count = ds.groupBy($"_1").count()
+
+checkAnswer(
+

spark git commit: [SPARK-12068][SQL] use a single column in Dataset.groupBy and count will fail

2015-12-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 9b99b2b46 -> 6e3e3c648


[SPARK-12068][SQL] use a single column in Dataset.groupBy and count will fail

The reason is that, for a single-column `RowEncoder` (or a single-field product 
encoder), when we use it as the encoder for the grouping key, we should still 
combine the grouping attributes, even though there is only one grouping attribute.

Author: Wenchen Fan 

Closes #10059 from cloud-fan/bug.

(cherry picked from commit 8ddc55f1d5823ca135510b2ea776e889e481)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e3e3c64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e3e3c64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e3e3c64

Branch: refs/heads/branch-1.6
Commit: 6e3e3c648c4f74d9c1aabe767dbadfe47bd7e658
Parents: 9b99b2b
Author: Wenchen Fan 
Authored: Tue Dec 1 10:22:55 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Dec 1 10:23:17 2015 -0800

--
 .../scala/org/apache/spark/sql/Dataset.scala |  2 +-
 .../org/apache/spark/sql/GroupedDataset.scala|  7 ---
 .../org/apache/spark/sql/DatasetSuite.scala  | 19 +++
 .../scala/org/apache/spark/sql/QueryTest.scala   |  6 +++---
 4 files changed, 27 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6e3e3c64/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index da46001..c357f88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -70,7 +70,7 @@ class Dataset[T] private[sql](
* implicit so that we can use it when constructing new [[Dataset]] objects 
that have the same
* object type (that will be possibly resolved to a different schema).
*/
-  private implicit val unresolvedTEncoder: ExpressionEncoder[T] = 
encoderFor(tEncoder)
+  private[sql] implicit val unresolvedTEncoder: ExpressionEncoder[T] = 
encoderFor(tEncoder)
 
   /** The encoder for this [[Dataset]] that has been resolved to its output 
schema. */
   private[sql] val resolvedTEncoder: ExpressionEncoder[T] =

http://git-wip-us.apache.org/repos/asf/spark/blob/6e3e3c64/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
index a10a893..4bf0b25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -228,10 +228,11 @@ class GroupedDataset[K, V] private[sql](
 val namedColumns =
   columns.map(
 _.withInputType(resolvedVEncoder, dataAttributes).named)
-val keyColumn = if (groupingAttributes.length > 1) {
-  Alias(CreateStruct(groupingAttributes), "key")()
-} else {
+val keyColumn = if (resolvedKEncoder.flat) {
+  assert(groupingAttributes.length == 1)
   groupingAttributes.head
+} else {
+  Alias(CreateStruct(groupingAttributes), "key")()
 }
 val aggregate = Aggregate(groupingAttributes, keyColumn +: namedColumns, 
logicalPlan)
 val execution = new QueryExecution(sqlContext, aggregate)

http://git-wip-us.apache.org/repos/asf/spark/blob/6e3e3c64/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 7d53918..a2c8d20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -272,6 +272,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext 
{
   3 -> "abcxyz", 5 -> "hello")
   }
 
+  test("groupBy single field class, count") {
+val ds = Seq("abc", "xyz", "hello").toDS()
+val count = ds.groupBy(s => Tuple1(s.length)).count()
+
+checkAnswer(
+  count,
+  (Tuple1(3), 2L), (Tuple1(5), 1L)
+)
+  }
+
   test("groupBy columns, map") {
 val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
 val grouped = ds.groupBy($"_1")
@@ -282,6 +292,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext 
{
   ("a", 30), ("b", 3), ("c", 1))
   }
 
+  test("groupBy 

spark git commit: [SPARK-11856][SQL] add type cast if the real type is different but compatible with encoder schema

2015-12-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 6e3e3c648 -> 74a230676


[SPARK-11856][SQL] add type cast if the real type is different but compatible 
with encoder schema

When we build the `fromRowExpression` for an encoder, we set up a lot of 
"unresolved" expressions and lose the required data type, which may lead to a 
runtime error if the real type doesn't match the encoder's schema.
For example, if we build an encoder for `case class Data(a: Int, b: String)` and 
the real type is `[a: int, b: long]`, we hit a runtime error saying that we 
can't construct class `Data` from an int and a long, because we lost the 
information that `b` should be a string.
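
For illustration only (not from the patch), a minimal Scala sketch of that 
failure mode, assuming a `sqlContext` with its implicits imported; the column 
names and data are made up:

  import sqlContext.implicits._

  case class Data(a: Int, b: String)

  // Underlying schema is [a: int, b: bigint], but Data declares b as String.
  val df = Seq((1, 2L), (3, 4L)).toDF("a", "b")
  // Before this patch the required String type for b was lost and construction
  // failed at runtime; with the walked type path and upcast added here, the
  // mismatch is handled (or clearly reported) during analysis instead.
  val ds = df.as[Data]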

Author: Wenchen Fan 

Closes #9840 from cloud-fan/err-msg.

(cherry picked from commit 9df24624afedd993a39ab46c8211ae153aedef1a)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74a23067
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74a23067
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74a23067

Branch: refs/heads/branch-1.6
Commit: 74a2306763161fc04c9d3e7de186a6b31617faf4
Parents: 6e3e3c6
Author: Wenchen Fan 
Authored: Tue Dec 1 10:24:53 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Dec 1 10:25:11 2015 -0800

--
 .../spark/sql/catalyst/ScalaReflection.scala|  93 --
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  40 +
 .../catalyst/analysis/HiveTypeCoercion.scala|   2 +-
 .../catalyst/encoders/ExpressionEncoder.scala   |   4 +-
 .../spark/sql/catalyst/expressions/Cast.scala   |   9 +
 .../expressions/complexTypeCreator.scala|   2 +-
 .../apache/spark/sql/types/DecimalType.scala|  12 ++
 .../encoders/EncoderResolutionSuite.scala   | 180 +++
 .../spark/sql/DatasetAggregatorSuite.scala  |   4 +-
 .../org/apache/spark/sql/DatasetSuite.scala |  21 ++-
 10 files changed, 335 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/74a23067/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index d133ad3..9b6b5b8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -18,9 +18,8 @@
 package org.apache.spark.sql.catalyst
 
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedExtractValue, 
UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.util.{GenericArrayData, 
ArrayBasedMapData, ArrayData, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{GenericArrayData, 
ArrayBasedMapData, DateTimeUtils}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -117,31 +116,75 @@ object ScalaReflection extends ScalaReflection {
* from ordinal 0 (since there are no names to map to).  The actual location 
can be moved by
* calling resolve/bind with a new schema.
*/
-  def constructorFor[T : TypeTag]: Expression = constructorFor(localTypeOf[T], 
None)
+  def constructorFor[T : TypeTag]: Expression = {
+val tpe = localTypeOf[T]
+val clsName = getClassNameFromType(tpe)
+val walkedTypePath = s"""- root class: "${clsName} :: Nil
+constructorFor(tpe, None, walkedTypePath)
+  }
 
   private def constructorFor(
   tpe: `Type`,
-  path: Option[Expression]): Expression = ScalaReflectionLock.synchronized 
{
+  path: Option[Expression],
+  walkedTypePath: Seq[String]): Expression = 
ScalaReflectionLock.synchronized {
 
 /** Returns the current path with a sub-field extracted. */
-def addToPath(part: String): Expression = path
-  .map(p => UnresolvedExtractValue(p, expressions.Literal(part)))
-  .getOrElse(UnresolvedAttribute(part))
+def addToPath(part: String, dataType: DataType, walkedTypePath: 
Seq[String]): Expression = {
+  val newPath = path
+.map(p => UnresolvedExtractValue(p, expressions.Literal(part)))
+.getOrElse(UnresolvedAttribute(part))
+  upCastToExpectedType(newPath, dataType, walkedTypePath)
+}
 
 /** Returns the current path with a field at ordinal extracted. */
-def addToPathOrdinal(ordinal: Int, dataType: DataType): Expression = path
-  .map(p => GetStructField(p, ordinal))
-  

spark git commit: [SPARK-12090] [PYSPARK] consider shuffle in coalesce()

2015-12-01 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 0f37d1d7e -> 4375eb3f4


[SPARK-12090] [PYSPARK] consider shuffle in coalesce()

Author: Davies Liu 

Closes #10090 from davies/fix_coalesce.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4375eb3f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4375eb3f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4375eb3f

Branch: refs/heads/master
Commit: 4375eb3f48fc7ae90caf6c21a0d3ab0b66bf4efa
Parents: 0f37d1d
Author: Davies Liu 
Authored: Tue Dec 1 22:41:48 2015 -0800
Committer: Davies Liu 
Committed: Tue Dec 1 22:41:48 2015 -0800

--
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4375eb3f/python/pyspark/rdd.py
--
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 4b4d596..00bb9a6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2015,7 +2015,7 @@ class RDD(object):
 >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
 [[1, 2, 3, 4, 5]]
 """
-jrdd = self._jrdd.coalesce(numPartitions)
+jrdd = self._jrdd.coalesce(numPartitions, shuffle)
 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
 
 def zip(self, other):





spark git commit: [SPARK-12090] [PYSPARK] consider shuffle in coalesce()

2015-12-01 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 0d57a4ae1 -> ed7264ba2


[SPARK-12090] [PYSPARK] consider shuffle in coalesce()

Author: Davies Liu 

Closes #10090 from davies/fix_coalesce.

(cherry picked from commit 4375eb3f48fc7ae90caf6c21a0d3ab0b66bf4efa)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ed7264ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ed7264ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ed7264ba

Branch: refs/heads/branch-1.5
Commit: ed7264ba20d178be45dad3f50e9c73ce2f9148dc
Parents: 0d57a4a
Author: Davies Liu 
Authored: Tue Dec 1 22:41:48 2015 -0800
Committer: Davies Liu 
Committed: Tue Dec 1 22:42:19 2015 -0800

--
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ed7264ba/python/pyspark/rdd.py
--
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ab5aab1..7e871d3 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2024,7 +2024,7 @@ class RDD(object):
 >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
 [[1, 2, 3, 4, 5]]
 """
-jrdd = self._jrdd.coalesce(numPartitions)
+jrdd = self._jrdd.coalesce(numPartitions, shuffle)
 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
 
 def zip(self, other):





spark git commit: [SPARK-12090] [PYSPARK] consider shuffle in coalesce()

2015-12-01 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 3c4938e26 -> c47a7373a


[SPARK-12090] [PYSPARK] consider shuffle in coalesce()

Author: Davies Liu 

Closes #10090 from davies/fix_coalesce.

(cherry picked from commit 4375eb3f48fc7ae90caf6c21a0d3ab0b66bf4efa)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c47a7373
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c47a7373
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c47a7373

Branch: refs/heads/branch-1.6
Commit: c47a7373a88f49c77b8d65e887cac2ef1ae22eae
Parents: 3c4938e
Author: Davies Liu 
Authored: Tue Dec 1 22:41:48 2015 -0800
Committer: Davies Liu 
Committed: Tue Dec 1 22:42:03 2015 -0800

--
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c47a7373/python/pyspark/rdd.py
--
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 4b4d596..00bb9a6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2015,7 +2015,7 @@ class RDD(object):
 >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
 [[1, 2, 3, 4, 5]]
 """
-jrdd = self._jrdd.coalesce(numPartitions)
+jrdd = self._jrdd.coalesce(numPartitions, shuffle)
 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
 
 def zip(self, other):





spark git commit: [SPARK-12081] Make unified memory manager work with small heaps

2015-12-01 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 72da2a21f -> 84c44b500


[SPARK-12081] Make unified memory manager work with small heaps

The existing `spark.memory.fraction` (default 0.75) gives the system 25% of the 
space to work with. For small heaps this is not enough: e.g. the default 1GB heap 
leaves only 250MB of system memory. This is especially a problem in local mode, 
where the driver and executor are crammed into the same JVM. Members of the 
community have reported driver OOMs in such cases.

**New proposal.** We now reserve 300MB before taking the 75%. For 1GB JVMs, 
this leaves `(1024 - 300) * 0.75 = 543MB` for execution and storage. This is 
proposal (1) listed in the 
[JIRA](https://issues.apache.org/jira/browse/SPARK-12081).
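
A small Scala sketch (not from the patch) of the proposed sizing arithmetic; the 
constant mirrors RESERVED_SYSTEM_MEMORY_BYTES in the diff below, and the 1GB 
case is the example above:

  // Reserve a fixed 300MB for the system, then apply spark.memory.fraction
  // (0.75) to whatever is left. For a 1GB heap: (1024 - 300) * 0.75 = 543MB.
  val reservedSystemMemoryBytes = 300L * 1024 * 1024
  val memoryFraction = 0.75

  def usableMemoryBytes(systemMemoryBytes: Long): Long =
    ((systemMemoryBytes - reservedSystemMemoryBytes) * memoryFraction).toLong

  usableMemoryBytes(1024L * 1024 * 1024) / (1024 * 1024)   // ~543 (MB)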

Author: Andrew Or 

Closes #10081 from andrewor14/unified-memory-small-heaps.

(cherry picked from commit d96f8c997b9bb5c3d61f513d2c71d67ccf8e85d6)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84c44b50
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84c44b50
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84c44b50

Branch: refs/heads/branch-1.6
Commit: 84c44b500b5c90dffbe1a6b0aa86f01699b09b96
Parents: 72da2a2
Author: Andrew Or 
Authored: Tue Dec 1 19:51:12 2015 -0800
Committer: Andrew Or 
Committed: Tue Dec 1 19:51:29 2015 -0800

--
 .../spark/memory/UnifiedMemoryManager.scala | 22 
 .../memory/UnifiedMemoryManagerSuite.scala  | 20 ++
 docs/configuration.md   |  4 ++--
 docs/tuning.md  |  2 +-
 4 files changed, 41 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/84c44b50/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 8be5b05..48b4e23 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -26,7 +26,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
  * A [[MemoryManager]] that enforces a soft boundary between execution and 
storage such that
  * either side can borrow memory from the other.
  *
- * The region shared between execution and storage is a fraction of the total 
heap space
+ * The region shared between execution and storage is a fraction of (the total 
heap space - 300MB)
  * configurable through `spark.memory.fraction` (default 0.75). The position 
of the boundary
  * within this space is further determined by `spark.memory.storageFraction` 
(default 0.5).
  * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap 
space by default.
@@ -48,7 +48,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
  */
 private[spark] class UnifiedMemoryManager private[memory] (
 conf: SparkConf,
-maxMemory: Long,
+val maxMemory: Long,
 private val storageRegionSize: Long,
 numCores: Int)
   extends MemoryManager(
@@ -130,6 +130,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
 
 object UnifiedMemoryManager {
 
+  // Set aside a fixed amount of memory for non-storage, non-execution 
purposes.
+  // This serves a function similar to `spark.memory.fraction`, but guarantees 
that we reserve
+  // sufficient memory for the system even for small heaps. E.g. if we have a 
1GB JVM, then
+  // the memory used for execution and storage will be (1024 - 300) * 0.75 = 
543MB by default.
+  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
+
   def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
 val maxMemory = getMaxMemory(conf)
 new UnifiedMemoryManager(
@@ -144,8 +150,16 @@ object UnifiedMemoryManager {
* Return the total amount of memory shared between execution and storage, 
in bytes.
*/
   private def getMaxMemory(conf: SparkConf): Long = {
-val systemMaxMemory = conf.getLong("spark.testing.memory", 
Runtime.getRuntime.maxMemory)
+val systemMemory = conf.getLong("spark.testing.memory", 
Runtime.getRuntime.maxMemory)
+val reservedMemory = conf.getLong("spark.testing.reservedMemory",
+  if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
+val minSystemMemory = reservedMemory * 1.5
+if (systemMemory < minSystemMemory) {
+  throw new IllegalArgumentException(s"System memory $systemMemory must " +
+s"be at least $minSystemMemory. Please use a larger heap size.")
+}
+

spark git commit: [SPARK-8414] Ensure context cleaner periodic cleanups

2015-12-01 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1b3db967e -> 72da2a21f


[SPARK-8414] Ensure context cleaner periodic cleanups

Garbage collection triggers cleanups. If the driver JVM is huge and there is 
little memory pressure, we may never clean up shuffle files on executors. This 
is a problem for long-running applications (e.g. streaming).
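
A hedged Scala sketch (not the patch itself, which uses Spark's daemon 
ThreadUtils factory) of the periodic-GC mechanism added below; the 30-minute 
default corresponds to spark.cleaner.periodicGC.interval:

  import java.util.concurrent.{Executors, TimeUnit}

  // Force a GC periodically so weak references get collected and the context
  // cleaner can remove shuffle files, even on a large, low-pressure driver heap.
  val periodicGCIntervalSeconds = 30L * 60
  val gcService = Executors.newSingleThreadScheduledExecutor()
  gcService.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = System.gc()
  }, periodicGCIntervalSeconds, periodicGCIntervalSeconds, TimeUnit.SECONDS)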

Author: Andrew Or 

Closes #10070 from andrewor14/periodic-gc.

(cherry picked from commit 1ce4adf55b535518c2e63917a827fac1f2df4e8e)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72da2a21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72da2a21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72da2a21

Branch: refs/heads/branch-1.6
Commit: 72da2a21f0940b97757ace5975535e559d627688
Parents: 1b3db96
Author: Andrew Or 
Authored: Tue Dec 1 19:36:34 2015 -0800
Committer: Josh Rosen 
Committed: Tue Dec 1 19:36:47 2015 -0800

--
 .../scala/org/apache/spark/ContextCleaner.scala | 21 +++-
 1 file changed, 20 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/72da2a21/core/src/main/scala/org/apache/spark/ContextCleaner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala 
b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index d23c153..bc73253 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -18,12 +18,13 @@
 package org.apache.spark
 
 import java.lang.ref.{ReferenceQueue, WeakReference}
+import java.util.concurrent.{TimeUnit, ScheduledExecutorService}
 
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * Classes that represent cleaning tasks.
@@ -66,6 +67,20 @@ private[spark] class ContextCleaner(sc: SparkContext) 
extends Logging {
 
   private val cleaningThread = new Thread() { override def run() { 
keepCleaning() }}
 
+  private val periodicGCService: ScheduledExecutorService =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
+
+  /**
+   * How often to trigger a garbage collection in this JVM.
+   *
+   * This context cleaner triggers cleanups only when weak references are 
garbage collected.
+   * In long-running applications with large driver JVMs, where there is 
little memory pressure
+   * on the driver, this may happen very occasionally or not at all. Not 
cleaning at all may
+   * lead to executors running out of disk space after a while.
+   */
+  private val periodicGCInterval =
+sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
+
   /**
* Whether the cleaning thread will block on cleanup tasks (other than 
shuffle, which
* is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` 
parameter).
@@ -104,6 +119,9 @@ private[spark] class ContextCleaner(sc: SparkContext) 
extends Logging {
 cleaningThread.setDaemon(true)
 cleaningThread.setName("Spark Context Cleaner")
 cleaningThread.start()
+periodicGCService.scheduleAtFixedRate(new Runnable {
+  override def run(): Unit = System.gc()
+}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
   }
 
   /**
@@ -119,6 +137,7 @@ private[spark] class ContextCleaner(sc: SparkContext) 
extends Logging {
   cleaningThread.interrupt()
 }
 cleaningThread.join()
+periodicGCService.shutdown()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */





spark git commit: [SPARK-11352][SQL][BRANCH-1.5] Escape */ in the generated comments.

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 7460e4309 -> 4f07a590c


[SPARK-11352][SQL][BRANCH-1.5] Escape */ in the generated comments.

https://issues.apache.org/jira/browse/SPARK-11352

This one backports https://github.com/apache/spark/pull/10072 to branch 1.5.
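
A tiny Scala illustration (not from the patch) of why the escaping matters; the 
replace mirrors toCommentSafeString in the diff below:

  // If a literal containing */ is spliced verbatim into a /* ... */ comment in
  // generated Java code, it closes the comment early and the code no longer
  // compiles.
  val value = "*/ boom"
  val safe = value.replace("*/", "\\*\\/")   // yields \*\/ boom, safe in /* ... */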

Author: Yin Huai 

Closes #10084 from yhuai/SPARK-11352-branch-1.5.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f07a590
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f07a590
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f07a590

Branch: refs/heads/branch-1.5
Commit: 4f07a590c5d7b9e187f446c077357b00df93ee27
Parents: 7460e43
Author: Yin Huai 
Authored: Tue Dec 1 20:33:50 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 20:33:50 2015 -0800

--
 .../apache/spark/sql/catalyst/expressions/Expression.scala  | 8 +++-
 .../sql/catalyst/expressions/codegen/CodegenFallback.scala  | 2 +-
 .../sql/catalyst/expressions/CodeGenerationSuite.scala  | 9 +
 3 files changed, 17 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4f07a590/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 0b98f55..4ef3b93 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -97,7 +97,7 @@ abstract class Expression extends TreeNode[Expression] {
 val ve = GeneratedExpressionCode("", isNull, primitive)
 ve.code = genCode(ctx, ve)
 // Add `this` in the comment.
-ve.copy(s"/* $this */\n" + ve.code)
+ve.copy(s"/* ${this.toCommentSafeString} */\n" + ve.code)
   }
 
   /**
@@ -175,6 +175,12 @@ abstract class Expression extends TreeNode[Expression] {
   }
 
   override def toString: String = prettyName + children.mkString("(", ",", ")")
+
+  /**
+   * Returns the string representation of this expression that is safe to be 
put in
+   * code comments of generated code.
+   */
+  protected def toCommentSafeString: String = this.toString.replace("*/", 
"\\*\\/")
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4f07a590/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
index 3492d2c..5061451 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
@@ -33,7 +33,7 @@ trait CodegenFallback extends Expression {
 ctx.references += this
 val objectTerm = ctx.freshName("obj")
 s"""
-  /* expression: ${this} */
+  /* expression: ${this.toCommentSafeString} */
   Object $objectTerm = expressions[${ctx.references.size - 1}].eval(i);
   boolean ${ev.isNull} = $objectTerm == null;
   ${ctx.javaType(this.dataType)} ${ev.primitive} = 
${ctx.defaultValue(this.dataType)};

http://git-wip-us.apache.org/repos/asf/spark/blob/4f07a590/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index e323467..fcb0c84 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -134,4 +134,13 @@ class CodeGenerationSuite extends SparkFunSuite with 
ExpressionEvalHelper {
 unsafeRow.getStruct(3, 1).getStruct(0, 2).setInt(1, 4)
 assert(internalRow === internalRow2)
   }
+
+  test("*/ in the data") {
+// When */ appears in a comment block (i.e. in /**/), code gen will break.
+// So, in Expression and CodegenFallback, we escape */ to \*\/.
+checkEvaluation(
+  EqualTo(BoundReference(0, StringType, false), Literal.create("*/", 
StringType)),
+  true,
+  

spark git commit: [SPARK-12077][SQL] change the default plan for single distinct

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 84c44b500 -> a5743affc


[SPARK-12077][SQL] change the default plan for single distinct

We previously tried to match Spark 1.5's behavior for single distinct 
aggregation, but that plan is not scalable. We should be robust by default and 
keep a flag to address the performance regression for low-cardinality aggregation.
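
For reference, a hedged Scala sketch (not part of the patch) of how a user who 
knows the distinct keys have low cardinality could opt back into the specialized 
plan via the internal flag changed below, assuming an existing sqlContext:

  // Internal, non-public flag: "true" restores the specialized single-distinct
  // plan (faster for low-cardinality keys, less robust in general).
  sqlContext.setConf("spark.sql.specializeSingleDistinctAggPlanning", "true")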

cc yhuai nongli

Author: Davies Liu 

Closes #10075 from davies/agg_15.

(cherry picked from commit 96691feae0229fd693c29475620be2c4059dd080)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5743aff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5743aff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5743aff

Branch: refs/heads/branch-1.6
Commit: a5743affcf73f7bf71517171583cbddc44cc9368
Parents: 84c44b5
Author: Davies Liu 
Authored: Tue Dec 1 20:17:12 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 20:17:44 2015 -0800

--
 sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala   | 2 +-
 .../test/scala/org/apache/spark/sql/execution/PlannerSuite.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a5743aff/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 5ef3a48..58adf64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -451,7 +451,7 @@ private[spark] object SQLConf {
 
   val SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING =
 booleanConf("spark.sql.specializeSingleDistinctAggPlanning",
-  defaultValue = Some(true),
+  defaultValue = Some(false),
   isPublic = false,
   doc = "When true, if a query only has a single distinct column and it 
has " +
 "grouping expressions, we will use our planner rule to handle this 
distinct " +

http://git-wip-us.apache.org/repos/asf/spark/blob/a5743aff/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index dfec139..a462625 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -44,10 +44,10 @@ class PlannerSuite extends SharedSQLContext {
 fail(s"Could query play aggregation query $query. Is it an aggregation 
query?"))
 val aggregations = planned.collect { case n if n.nodeName contains 
"Aggregate" => n }
 
-// For the new aggregation code path, there will be three aggregate 
operator for
+// For the new aggregation code path, there will be four aggregate 
operator for
 // distinct aggregations.
 assert(
-  aggregations.size == 2 || aggregations.size == 3,
+  aggregations.size == 2 || aggregations.size == 4,
   s"The plan of query $query does not have partial aggregations.")
   }
 





spark git commit: [SPARK-12077][SQL] change the default plan for single distinct

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master d96f8c997 -> 96691feae


[SPARK-12077][SQL] change the default plan for single distinct

We previously tried to match Spark 1.5's behavior for single distinct 
aggregation, but that plan is not scalable. We should be robust by default and 
keep a flag to address the performance regression for low-cardinality aggregation.

cc yhuai nongli

Author: Davies Liu 

Closes #10075 from davies/agg_15.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96691fea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96691fea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96691fea

Branch: refs/heads/master
Commit: 96691feae0229fd693c29475620be2c4059dd080
Parents: d96f8c9
Author: Davies Liu 
Authored: Tue Dec 1 20:17:12 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 20:17:12 2015 -0800

--
 sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala   | 2 +-
 .../test/scala/org/apache/spark/sql/execution/PlannerSuite.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/96691fea/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 5ef3a48..58adf64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -451,7 +451,7 @@ private[spark] object SQLConf {
 
   val SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING =
 booleanConf("spark.sql.specializeSingleDistinctAggPlanning",
-  defaultValue = Some(true),
+  defaultValue = Some(false),
   isPublic = false,
   doc = "When true, if a query only has a single distinct column and it 
has " +
 "grouping expressions, we will use our planner rule to handle this 
distinct " +

http://git-wip-us.apache.org/repos/asf/spark/blob/96691fea/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index dfec139..a462625 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -44,10 +44,10 @@ class PlannerSuite extends SharedSQLContext {
 fail(s"Could query play aggregation query $query. Is it an aggregation 
query?"))
 val aggregations = planned.collect { case n if n.nodeName contains 
"Aggregate" => n }
 
-// For the new aggregation code path, there will be three aggregate 
operator for
+// For the new aggregation code path, there will be four aggregate 
operator for
 // distinct aggregations.
 assert(
-  aggregations.size == 2 || aggregations.size == 3,
+  aggregations.size == 2 || aggregations.size == 4,
   s"The plan of query $query does not have partial aggregations.")
   }
 





spark git commit: [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 96691feae -> 8a75a3049


[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently 
in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched
* The JobConf is serialized as part of the DStream checkpoints.
These concurrent accesses (updating in one thread while another thread is 
serializing it) can lead to a ConcurrentModificationException in the underlying 
Java HashMap used by the internal Hadoop Configuration object.

The solution is to create a new JobConf in every batch, that is updated by 
`RDD.saveAsHadoopFile()`, while the checkpointing serializes the original 
JobConf.
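
A minimal Scala sketch (illustrative only; the property name and values are made 
up) of the defensive-copy pattern the fix uses, relying on JobConf's copy 
constructor:

  import org.apache.hadoop.mapred.JobConf

  // The shared JobConf keeps being serialized by checkpointing, so each batch
  // must mutate a fresh copy rather than the shared instance.
  val sharedConf = new JobConf()
  val batchConf = new JobConf(sharedConf)   // copy per batch
  batchConf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp/batch-out")
  // sharedConf is untouched and can be serialized concurrently without a
  // ConcurrentModificationException.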

Tests to be added in #9988 will fail reliably without this patch. Keeping this 
patch really small to make sure that it can be added to previous branches.

Author: Tathagata Das 

Closes #10088 from tdas/SPARK-12087.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a75a304
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a75a304
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a75a304

Branch: refs/heads/master
Commit: 8a75a3049539eeef04c0db51736e97070c162b46
Parents: 96691fe
Author: Tathagata Das 
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 21:04:52 2015 -0800

--
 .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8a75a304/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index fb691ee..2762309 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -730,7 +730,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 val serializableConf = new SerializableJobConf(conf)
 val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
   val file = rddToFileName(prefix, suffix, time)
-  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, 
serializableConf.value)
+  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+new JobConf(serializableConf.value))
 }
 self.foreachRDD(saveFunc)
   }





spark git commit: [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 4f07a590c -> 0d57a4ae1


[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently 
in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched
* The JobConf is serialized as part of the DStream checkpoints.
These concurrent accesses (updating in one thread while another thread is 
serializing it) can lead to a ConcurrentModificationException in the underlying 
Java HashMap used by the internal Hadoop Configuration object.

The solution is to create a new JobConf in every batch, that is updated by 
`RDD.saveAsHadoopFile()`, while the checkpointing serializes the original 
JobConf.

Tests to be added in #9988 will fail reliably without this patch. Keeping this 
patch really small to make sure that it can be added to previous branches.

Author: Tathagata Das 

Closes #10088 from tdas/SPARK-12087.

(cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d57a4ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d57a4ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d57a4ae

Branch: refs/heads/branch-1.5
Commit: 0d57a4ae10f4ec40386194bc3c8e27f32da09d4d
Parents: 4f07a59
Author: Tathagata Das 
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 21:05:18 2015 -0800

--
 .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0d57a4ae/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 71bec96..aa36997 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -692,7 +692,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 val serializableConf = new SerializableJobConf(conf)
 val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
   val file = rddToFileName(prefix, suffix, time)
-  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, 
serializableConf.value)
+  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+new JobConf(serializableConf.value))
 }
 self.foreachRDD(saveFunc)
   }





spark git commit: [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 a5743affc -> 1f42295b5


[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently 
in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched
* The JobConf is serialized as part of the DStream checkpoints.
These concurrent accesses (updating in one thread while another thread is 
serializing it) can lead to a ConcurrentModificationException in the underlying 
Java HashMap used by the internal Hadoop Configuration object.

The solution is to create a new JobConf in every batch, that is updated by 
`RDD.saveAsHadoopFile()`, while the checkpointing serializes the original 
JobConf.

Tests to be added in #9988 will fail reliably without this patch. Keeping this 
patch really small to make sure that it can be added to previous branches.

Author: Tathagata Das 

Closes #10088 from tdas/SPARK-12087.

(cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f42295b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f42295b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f42295b

Branch: refs/heads/branch-1.6
Commit: 1f42295b5df69a6039ed2ba8ea67a8e57d77644d
Parents: a5743af
Author: Tathagata Das 
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 21:05:02 2015 -0800

--
 .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1f42295b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index fb691ee..2762309 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -730,7 +730,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 val serializableConf = new SerializableJobConf(conf)
 val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
   val file = rddToFileName(prefix, suffix, time)
-  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, 
serializableConf.value)
+  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+new JobConf(serializableConf.value))
 }
 self.foreachRDD(saveFunc)
   }





spark git commit: [SPARK-11949][SQL] Check bitmasks to set nullable property

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1f42295b5 -> 3c4938e26


[SPARK-11949][SQL] Check bitmasks to set nullable property

Following up #10038.

We can use bitmasks to determine which grouping expressions need to be set as 
nullable.
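
A small Scala sketch (illustrative only; the bit-to-attribute mapping follows the 
analyzer's convention) of the bitmask logic added below: AND-ing the 
per-grouping-set bitmasks leaves a 1 only for attributes present in every 
grouping set, and only the remaining attributes are forced nullable:

  val bitmasks = Seq(0x3, 0x1)                  // two grouping sets over two attributes
  val nonNullBitmask = bitmasks.reduce(_ & _)   // 0x1: one attribute is always grouped on

  def forcedNullable(idx: Int): Boolean = (nonNullBitmask & (1 << idx)) == 0
  // forcedNullable(0) == false   (keeps its original nullability)
  // forcedNullable(1) == true    (must become nullable)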

cc yhuai

Author: Liang-Chi Hsieh 

Closes #10067 from viirya/fix-cube-following.

(cherry picked from commit 0f37d1d7ed7f6e34f98f2a3c274918de29e7a1d7)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c4938e2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c4938e2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c4938e2

Branch: refs/heads/branch-1.6
Commit: 3c4938e26185dc0637f3af624830dbff11589997
Parents: 1f42295
Author: Liang-Chi Hsieh 
Authored: Tue Dec 1 21:51:33 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 21:51:47 2015 -0800

--
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala  | 13 +
 1 file changed, 9 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3c4938e2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 765327c..d3163dc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -224,10 +224,15 @@ class Analyzer(
   case other => Alias(other, other.toString)()
 }
 
-// TODO: We need to use bitmasks to determine which grouping 
expressions need to be
-// set as nullable. For example, if we have GROUPING SETS ((a,b), a), 
we do not need
-// to change the nullability of a.
-val attributeMap = groupByAliases.map(a => (a -> 
a.toAttribute.withNullability(true))).toMap
+val nonNullBitmask = x.bitmasks.reduce(_ & _)
+
+val attributeMap = groupByAliases.zipWithIndex.map { case (a, idx) =>
+  if ((nonNullBitmask & 1 << idx) == 0) {
+(a -> a.toAttribute.withNullability(true))
+  } else {
+(a -> a.toAttribute)
+  }
+}.toMap
 
 val aggregations: Seq[NamedExpression] = x.aggregations.map {
   // If an expression is an aggregate (contains a AggregateExpression) 
then we dont change





spark git commit: [SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

2015-12-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 f5af299ab -> b6ba2dab2


[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles

The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently 
in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched
* The JobConf is serialized as part of the DStream checkpoints.
These concurrent accesses (updating in one thread while another thread is 
serializing it) can lead to a ConcurrentModificationException in the underlying 
Java HashMap used by the internal Hadoop Configuration object.

The solution is to create a new JobConf in every batch, that is updated by 
`RDD.saveAsHadoopFile()`, while the checkpointing serializes the original 
JobConf.

Tests to be added in #9988 will fail reliably without this patch. Keeping this 
patch really small to make sure that it can be added to previous branches.

Author: Tathagata Das 

Closes #10088 from tdas/SPARK-12087.

(cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6ba2dab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6ba2dab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6ba2dab

Branch: refs/heads/branch-1.4
Commit: b6ba2dab26092f56271114aa62f25b2fc9d6adad
Parents: f5af299
Author: Tathagata Das 
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Dec 1 21:05:37 2015 -0800

--
 .../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b6ba2dab/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 358e4c6..4e392f5 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -691,7 +691,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 val serializableConf = new SerializableWritable(conf)
 val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
   val file = rddToFileName(prefix, suffix, time)
-  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, 
serializableConf.value)
+  rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+new JobConf(serializableConf.value))
 }
 self.foreachRDD(saveFunc)
   }





spark git commit: [SPARK-11949][SQL] Check bitmasks to set nullable property

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 8a75a3049 -> 0f37d1d7e


[SPARK-11949][SQL] Check bitmasks to set nullable property

Following up #10038.

We can use bitmasks to determine which grouping expressions need to be set as 
nullable.

cc yhuai

Author: Liang-Chi Hsieh 

Closes #10067 from viirya/fix-cube-following.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f37d1d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f37d1d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f37d1d7

Branch: refs/heads/master
Commit: 0f37d1d7ed7f6e34f98f2a3c274918de29e7a1d7
Parents: 8a75a30
Author: Liang-Chi Hsieh 
Authored: Tue Dec 1 21:51:33 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 21:51:33 2015 -0800

--
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala  | 13 +
 1 file changed, 9 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0f37d1d7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 765327c..d3163dc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -224,10 +224,15 @@ class Analyzer(
   case other => Alias(other, other.toString)()
 }
 
-// TODO: We need to use bitmasks to determine which grouping 
expressions need to be
-// set as nullable. For example, if we have GROUPING SETS ((a,b), a), 
we do not need
-// to change the nullability of a.
-val attributeMap = groupByAliases.map(a => (a -> 
a.toAttribute.withNullability(true))).toMap
+val nonNullBitmask = x.bitmasks.reduce(_ & _)
+
+val attributeMap = groupByAliases.zipWithIndex.map { case (a, idx) =>
+  if ((nonNullBitmask & 1 << idx) == 0) {
+(a -> a.toAttribute.withNullability(true))
+  } else {
+(a -> a.toAttribute)
+  }
+}.toMap
 
 val aggregations: Seq[NamedExpression] = x.aggregations.map {
   // If an expression is an aggregate (contains a AggregateExpression) 
then we dont change





spark git commit: [SPARK-11954][SQL] Encoder for JavaBeans

2015-12-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 9df24624a -> fd95eeaf4


[SPARK-11954][SQL] Encoder for JavaBeans

Create Java versions of `constructorFor` and `extractorFor` in 
`JavaTypeInference`.
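
A hedged sketch (not from the patch) of calling the new bean encoder from Scala; 
the Person class is made up and just follows JavaBean conventions (public class, 
no-arg constructor, getter/setter pairs):

  import org.apache.spark.sql.Encoders

  class Person {
    private var name: String = _
    private var age: java.lang.Integer = _
    def getName: String = name
    def setName(n: String): Unit = { name = n }
    def getAge: java.lang.Integer = age
    def setAge(a: java.lang.Integer): Unit = { age = a }
  }

  val personEncoder = Encoders.bean(classOf[Person])
  // personEncoder can then be supplied wherever an Encoder[Person] is required.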

Author: Wenchen Fan 

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust 

Closes #9937 from cloud-fan/pojo.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd95eeaf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd95eeaf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd95eeaf

Branch: refs/heads/master
Commit: fd95eeaf491809c6bb0f83d46b37b5e2eebbcbca
Parents: 9df2462
Author: Wenchen Fan 
Authored: Tue Dec 1 10:35:12 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Dec 1 10:35:12 2015 -0800

--
 .../scala/org/apache/spark/sql/Encoder.scala|  18 ++
 .../spark/sql/catalyst/JavaTypeInference.scala  | 313 ++-
 .../catalyst/encoders/ExpressionEncoder.scala   |  21 +-
 .../sql/catalyst/expressions/objects.scala  |  42 ++-
 .../spark/sql/catalyst/trees/TreeNode.scala |  27 +-
 .../sql/catalyst/util/ArrayBasedMapData.scala   |   5 +
 .../sql/catalyst/util/GenericArrayData.scala|   3 +
 .../sql/catalyst/trees/TreeNodeSuite.scala  |  25 ++
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 174 ++-
 9 files changed, 608 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fd95eeaf/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index 03aa25e..c40061a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -98,6 +98,24 @@ object Encoders {
   def STRING: Encoder[java.lang.String] = ExpressionEncoder()
 
   /**
+   * Creates an encoder for Java Bean of type T.
+   *
+   * T must be publicly accessible.
+   *
+   * supported types for java bean field:
+   *  - primitive types: boolean, int, double, etc.
+   *  - boxed types: Boolean, Integer, Double, etc.
+   *  - String
+   *  - java.math.BigDecimal
+   *  - time related: java.sql.Date, java.sql.Timestamp
+   *  - collection types: only array and java.util.List currently, map support 
is in progress
+   *  - nested java bean.
+   *
+   * @since 1.6.0
+   */
+  def bean[T](beanClass: Class[T]): Encoder[T] = 
ExpressionEncoder.javaBean(beanClass)
+
+  /**
* (Scala-specific) Creates an encoder that serializes objects of type T 
using Kryo.
* This encoder maps T into a single byte array (binary) field.
*

http://git-wip-us.apache.org/repos/asf/spark/blob/fd95eeaf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 7d4cfbe..c8ee87e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -17,14 +17,20 @@
 
 package org.apache.spark.sql.catalyst
 
-import java.beans.Introspector
+import java.beans.{PropertyDescriptor, Introspector}
 import java.lang.{Iterable => JIterable}
-import java.util.{Iterator => JIterator, Map => JMap}
+import java.util.{Iterator => JIterator, Map => JMap, List => JList}
 
 import scala.language.existentials
 
 import com.google.common.reflect.TypeToken
+
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, 
UnresolvedExtractValue}
+import org.apache.spark.sql.catalyst.util.{GenericArrayData, 
ArrayBasedMapData, DateTimeUtils}
+import org.apache.spark.unsafe.types.UTF8String
+
 
 /**
  * Type-inference utilities for POJOs and Java collections.
@@ -33,13 +39,14 @@ object JavaTypeInference {
 
   private val iterableType = TypeToken.of(classOf[JIterable[_]])
   private val mapType = TypeToken.of(classOf[JMap[_, _]])
+  private val listType = TypeToken.of(classOf[JList[_]])
   private val iteratorReturnType = 
classOf[JIterable[_]].getMethod("iterator").getGenericReturnType
   private val nextReturnType = 
classOf[JIterator[_]].getMethod("next").getGenericReturnType
   private val keySetReturnType = classOf[JMap[_, 
_]].getMethod("keySet").getGenericReturnType
   private val 

spark git commit: [SPARK-11954][SQL] Encoder for JavaBeans

2015-12-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 74a230676 -> 88bbce008


[SPARK-11954][SQL] Encoder for JavaBeans

Create a Java version of `constructorFor` and `extractorFor` in `JavaTypeInference`.

Author: Wenchen Fan 

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust 

Closes #9937 from cloud-fan/pojo.

(cherry picked from commit fd95eeaf491809c6bb0f83d46b37b5e2eebbcbca)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88bbce00
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88bbce00
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88bbce00

Branch: refs/heads/branch-1.6
Commit: 88bbce00813acf23eef411ac354c35995ddf9e77
Parents: 74a2306
Author: Wenchen Fan 
Authored: Tue Dec 1 10:35:12 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Dec 1 10:35:26 2015 -0800

--
 .../scala/org/apache/spark/sql/Encoder.scala|  18 ++
 .../spark/sql/catalyst/JavaTypeInference.scala  | 313 ++-
 .../catalyst/encoders/ExpressionEncoder.scala   |  21 +-
 .../sql/catalyst/expressions/objects.scala  |  42 ++-
 .../spark/sql/catalyst/trees/TreeNode.scala |  27 +-
 .../sql/catalyst/util/ArrayBasedMapData.scala   |   5 +
 .../sql/catalyst/util/GenericArrayData.scala|   3 +
 .../sql/catalyst/trees/TreeNodeSuite.scala  |  25 ++
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 174 ++-
 9 files changed, 608 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/88bbce00/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index 03aa25e..c40061a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -98,6 +98,24 @@ object Encoders {
   def STRING: Encoder[java.lang.String] = ExpressionEncoder()
 
   /**
+   * Creates an encoder for Java Bean of type T.
+   *
+   * T must be publicly accessible.
+   *
+   * supported types for java bean field:
+   *  - primitive types: boolean, int, double, etc.
+   *  - boxed types: Boolean, Integer, Double, etc.
+   *  - String
+   *  - java.math.BigDecimal
+   *  - time related: java.sql.Date, java.sql.Timestamp
+   *  - collection types: only array and java.util.List currently, map support 
is in progress
+   *  - nested java bean.
+   *
+   * @since 1.6.0
+   */
+  def bean[T](beanClass: Class[T]): Encoder[T] = 
ExpressionEncoder.javaBean(beanClass)
+
+  /**
* (Scala-specific) Creates an encoder that serializes objects of type T 
using Kryo.
* This encoder maps T into a single byte array (binary) field.
*

http://git-wip-us.apache.org/repos/asf/spark/blob/88bbce00/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 7d4cfbe..c8ee87e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -17,14 +17,20 @@
 
 package org.apache.spark.sql.catalyst
 
-import java.beans.Introspector
+import java.beans.{PropertyDescriptor, Introspector}
 import java.lang.{Iterable => JIterable}
-import java.util.{Iterator => JIterator, Map => JMap}
+import java.util.{Iterator => JIterator, Map => JMap, List => JList}
 
 import scala.language.existentials
 
 import com.google.common.reflect.TypeToken
+
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, 
UnresolvedExtractValue}
+import org.apache.spark.sql.catalyst.util.{GenericArrayData, 
ArrayBasedMapData, DateTimeUtils}
+import org.apache.spark.unsafe.types.UTF8String
+
 
 /**
  * Type-inference utilities for POJOs and Java collections.
@@ -33,13 +39,14 @@ object JavaTypeInference {
 
   private val iterableType = TypeToken.of(classOf[JIterable[_]])
   private val mapType = TypeToken.of(classOf[JMap[_, _]])
+  private val listType = TypeToken.of(classOf[JList[_]])
   private val iteratorReturnType = 
classOf[JIterable[_]].getMethod("iterator").getGenericReturnType
   private val nextReturnType = 

spark git commit: [SPARK-11905][SQL] Support Persist/Cache and Unpersist in Dataset APIs

2015-12-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 88bbce008 -> 40769b48c


[SPARK-11905][SQL] Support Persist/Cache and Unpersist in Dataset APIs

Persist and Unpersist exist in both the RDD and DataFrame APIs, and I think they are 
still very important in the Dataset APIs. I am not sure whether my understanding is 
correct; if so, could you help me check whether the implementation is acceptable?

Please provide your opinions. marmbrus rxin cloud-fan

Thank you very much!

Author: gatorsmile 
Author: xiaoli 
Author: Xiao Li 

Closes #9889 from gatorsmile/persistDS.

(cherry picked from commit 0a7bca2da04aefff16f2513ec27a92e69ceb77f6)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40769b48
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40769b48
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40769b48

Branch: refs/heads/branch-1.6
Commit: 40769b48cd001b7ff8c301628dc1442e3dd946cd
Parents: 88bbce0
Author: gatorsmile 
Authored: Tue Dec 1 10:38:59 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Dec 1 10:39:14 2015 -0800

--
 .../scala/org/apache/spark/sql/DataFrame.scala  |  9 +++
 .../scala/org/apache/spark/sql/Dataset.scala| 50 +++-
 .../scala/org/apache/spark/sql/SQLContext.scala |  9 +++
 .../spark/sql/execution/CacheManager.scala  | 27 +++
 .../apache/spark/sql/DatasetCacheSuite.scala| 80 
 .../scala/org/apache/spark/sql/QueryTest.scala  |  5 +-
 6 files changed, 162 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/40769b48/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 6197f10..eb87003 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1584,6 +1584,7 @@ class DataFrame private[sql](
   def distinct(): DataFrame = dropDuplicates()
 
   /**
+   * Persist this [[DataFrame]] with the default storage level 
(`MEMORY_AND_DISK`).
* @group basic
* @since 1.3.0
*/
@@ -1593,12 +1594,17 @@ class DataFrame private[sql](
   }
 
   /**
+   * Persist this [[DataFrame]] with the default storage level 
(`MEMORY_AND_DISK`).
* @group basic
* @since 1.3.0
*/
   def cache(): this.type = persist()
 
   /**
+   * Persist this [[DataFrame]] with the given storage level.
+   * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, 
`MEMORY_ONLY_SER`,
+   * `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,
+   * `MEMORY_AND_DISK_2`, etc.
* @group basic
* @since 1.3.0
*/
@@ -1608,6 +1614,8 @@ class DataFrame private[sql](
   }
 
   /**
+   * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it 
from memory and disk.
+   * @param blocking Whether to block until all blocks are deleted.
* @group basic
* @since 1.3.0
*/
@@ -1617,6 +1625,7 @@ class DataFrame private[sql](
   }
 
   /**
+   * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it 
from memory and disk.
* @group basic
* @since 1.3.0
*/

http://git-wip-us.apache.org/repos/asf/spark/blob/40769b48/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c357f88..d6bb1d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{Queryable, QueryExecution}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
 /**
@@ -565,7 +566,7 @@ class Dataset[T] private[sql](
* combined.
*
* Note that, this function is not a typical set union operation, in that it 
does not eliminate
-   * duplicate items.  As such, it is analagous to `UNION ALL` in SQL.
+   * duplicate items.  As such, it is analogous to `UNION ALL` in SQL.
* @since 1.6.0
*/
   def union(other: Dataset[T]): Dataset[T] = withPlan[T](other)(Union)
@@ -618,7 +619,6 @@ class Dataset[T] private[sql](
   case _ => Alias(CreateStruct(rightOutput), "_2")()
 }
 
-

spark git commit: [SPARK-11905][SQL] Support Persist/Cache and Unpersist in Dataset APIs

2015-12-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master fd95eeaf4 -> 0a7bca2da


[SPARK-11905][SQL] Support Persist/Cache and Unpersist in Dataset APIs

Persist and Unpersist exist in both the RDD and DataFrame APIs, and I think they are 
still very important in the Dataset APIs. I am not sure whether my understanding is 
correct; if so, could you help me check whether the implementation is acceptable?

Please provide your opinions. marmbrus rxin cloud-fan

Thank you very much!

Author: gatorsmile 
Author: xiaoli 
Author: Xiao Li 

Closes #9889 from gatorsmile/persistDS.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a7bca2d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a7bca2d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a7bca2d

Branch: refs/heads/master
Commit: 0a7bca2da04aefff16f2513ec27a92e69ceb77f6
Parents: fd95eea
Author: gatorsmile 
Authored: Tue Dec 1 10:38:59 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Dec 1 10:38:59 2015 -0800

--
 .../scala/org/apache/spark/sql/DataFrame.scala  |  9 +++
 .../scala/org/apache/spark/sql/Dataset.scala| 50 +++-
 .../scala/org/apache/spark/sql/SQLContext.scala |  9 +++
 .../spark/sql/execution/CacheManager.scala  | 27 +++
 .../apache/spark/sql/DatasetCacheSuite.scala| 80 
 .../scala/org/apache/spark/sql/QueryTest.scala  |  5 +-
 6 files changed, 162 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0a7bca2d/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 6197f10..eb87003 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1584,6 +1584,7 @@ class DataFrame private[sql](
   def distinct(): DataFrame = dropDuplicates()
 
   /**
+   * Persist this [[DataFrame]] with the default storage level 
(`MEMORY_AND_DISK`).
* @group basic
* @since 1.3.0
*/
@@ -1593,12 +1594,17 @@ class DataFrame private[sql](
   }
 
   /**
+   * Persist this [[DataFrame]] with the default storage level 
(`MEMORY_AND_DISK`).
* @group basic
* @since 1.3.0
*/
   def cache(): this.type = persist()
 
   /**
+   * Persist this [[DataFrame]] with the given storage level.
+   * @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, 
`MEMORY_ONLY_SER`,
+   * `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,
+   * `MEMORY_AND_DISK_2`, etc.
* @group basic
* @since 1.3.0
*/
@@ -1608,6 +1614,8 @@ class DataFrame private[sql](
   }
 
   /**
+   * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it 
from memory and disk.
+   * @param blocking Whether to block until all blocks are deleted.
* @group basic
* @since 1.3.0
*/
@@ -1617,6 +1625,7 @@ class DataFrame private[sql](
   }
 
   /**
+   * Mark the [[DataFrame]] as non-persistent, and remove all blocks for it 
from memory and disk.
* @group basic
* @since 1.3.0
*/
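
For illustration, a minimal sketch (not part of this patch) of the persistence API shown above, applied to a Dataset; the data and names are hypothetical, and a local SparkContext/SQLContext is assumed:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.storage.StorageLevel

  object DatasetCacheSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("ds-cache").setMaster("local[2]"))
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val ds = Seq(1, 2, 3).toDS()

      // persist() with an explicit storage level; cache() is shorthand for the
      // default MEMORY_AND_DISK level.
      ds.persist(StorageLevel.MEMORY_ONLY)
      ds.count()  // materializes the cached plan

      // Mark the Dataset as non-persistent; blocking = true waits until all
      // blocks are removed.
      ds.unpersist(blocking = true)

      sc.stop()
    }
  }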

http://git-wip-us.apache.org/repos/asf/spark/blob/0a7bca2d/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c357f88..d6bb1d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{Queryable, QueryExecution}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
 /**
@@ -565,7 +566,7 @@ class Dataset[T] private[sql](
* combined.
*
* Note that, this function is not a typical set union operation, in that it 
does not eliminate
-   * duplicate items.  As such, it is analagous to `UNION ALL` in SQL.
+   * duplicate items.  As such, it is analogous to `UNION ALL` in SQL.
* @since 1.6.0
*/
   def union(other: Dataset[T]): Dataset[T] = withPlan[T](other)(Union)
@@ -618,7 +619,6 @@ class Dataset[T] private[sql](
   case _ => Alias(CreateStruct(rightOutput), "_2")()
 }
 
-
 implicit val tuple2Encoder: Encoder[(T, U)] =
   ExpressionEncoder.tuple(this.unresolvedTEncoder, 
other.unresolvedTEncoder)
 

spark git commit: [SPARK-12046][DOC] Fixes various ScalaDoc/JavaDoc issues

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 40769b48c -> 843a31afb


[SPARK-12046][DOC] Fixes various ScalaDoc/JavaDoc issues

This PR backports PR #10039 to master

Author: Cheng Lian 

Closes #10063 from liancheng/spark-12046.doc-fix.master.

(cherry picked from commit 69dbe6b40df35d488d4ee343098ac70d00bbdafb)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/843a31af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/843a31af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/843a31af

Branch: refs/heads/branch-1.6
Commit: 843a31afbdeea66449750f0ba8f676ef31d00726
Parents: 40769b4
Author: Cheng Lian 
Authored: Tue Dec 1 10:21:31 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 10:50:28 2015 -0800

--
 .../spark/api/java/function/Function4.java  |  2 +-
 .../spark/api/java/function/VoidFunction.java   |  2 +-
 .../spark/api/java/function/VoidFunction2.java  |  2 +-
 .../org/apache/spark/api/java/JavaPairRDD.scala | 16 +++
 .../scala/org/apache/spark/memory/package.scala | 14 +++---
 .../org/apache/spark/rdd/CoGroupedRDD.scala |  2 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  6 +--
 .../org/apache/spark/rdd/ShuffledRDD.scala  |  2 +-
 .../scala/org/apache/spark/scheduler/Task.scala |  5 ++-
 .../serializer/SerializationDebugger.scala  | 13 +++---
 .../scala/org/apache/spark/util/Vector.scala|  1 +
 .../spark/util/collection/ExternalSorter.scala  | 30 ++---
 .../WritablePartitionedPairCollection.scala |  7 +--
 .../streaming/kinesis/KinesisReceiver.scala | 23 +-
 .../spark/streaming/kinesis/KinesisUtils.scala  | 13 +++---
 .../mllib/optimization/GradientDescent.scala| 12 +++---
 project/SparkBuild.scala|  2 +
 .../scala/org/apache/spark/sql/Column.scala | 11 ++---
 .../spark/streaming/StreamingContext.scala  | 11 ++---
 .../streaming/dstream/FileInputDStream.scala| 19 +
 .../streaming/receiver/BlockGenerator.scala | 22 +-
 .../scheduler/ReceiverSchedulingPolicy.scala| 45 ++--
 .../streaming/util/FileBasedWriteAheadLog.scala |  7 +--
 .../spark/streaming/util/RecurringTimer.scala   |  8 ++--
 .../org/apache/spark/deploy/yarn/Client.scala   | 10 ++---
 25 files changed, 152 insertions(+), 133 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/843a31af/core/src/main/java/org/apache/spark/api/java/function/Function4.java
--
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/Function4.java 
b/core/src/main/java/org/apache/spark/api/java/function/Function4.java
index fd727d6..9c35a22 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function4.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function4.java
@@ -23,5 +23,5 @@ import java.io.Serializable;
  * A four-argument function that takes arguments of type T1, T2, T3 and T4 and 
returns an R.
  */
 public interface Function4<T1, T2, T3, T4, R> extends Serializable {
-  public R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
+  R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/843a31af/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
--
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java 
b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
index 2a10435..f30d42e 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
@@ -23,5 +23,5 @@ import java.io.Serializable;
  * A function with no return value.
  */
 public interface VoidFunction<T> extends Serializable {
-  public void call(T t) throws Exception;
+  void call(T t) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/843a31af/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
--
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java 
b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
index 6c576ab..da9ae1c 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
@@ -23,5 +23,5 @@ import java.io.Serializable;
  * A two-argument function that takes arguments of type T1 and T2 with no 
return value.
  */
 public interface 

spark git commit: [SPARK-11821] Propagate Kerberos keytab for all environments

2015-12-01 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 843a31afb -> 99dc1335e


[SPARK-11821] Propagate Kerberos keytab for all environments

andrewor14 the same PR as in branch 1.5
harishreedharan

Author: woj-i 

Closes #9859 from woj-i/master.

(cherry picked from commit 6a8cf80cc8ef435ec46138fa57325bda5d68f3ce)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99dc1335
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99dc1335
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99dc1335

Branch: refs/heads/branch-1.6
Commit: 99dc1335e2f635a067f9fa1e83a35bf9593bfc24
Parents: 843a31a
Author: woj-i 
Authored: Tue Dec 1 11:05:45 2015 -0800
Committer: Marcelo Vanzin 
Committed: Tue Dec 1 11:06:08 2015 -0800

--
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4 
 docs/running-on-yarn.md   | 4 ++--
 docs/sql-programming-guide.md | 7 ---
 3 files changed, 10 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/99dc1335/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 2e912b5..52d3ab3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -545,6 +545,10 @@ object SparkSubmit {
   if (args.isPython) {
 sysProps.put("spark.yarn.isPython", "true")
   }
+}
+
+// assure a keytab is available from any place in a JVM
+if (clusterManager == YARN || clusterManager == LOCAL) {
   if (args.principal != null) {
 require(args.keytab != null, "Keytab must be specified when principal 
is specified")
 if (!new File(args.keytab).exists()) {

http://git-wip-us.apache.org/repos/asf/spark/blob/99dc1335/docs/running-on-yarn.md
--
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 925a1e0..06413f8 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -358,14 +358,14 @@ If you need a reference to the proper location to put log 
files in the YARN so t
   
   The full path to the file that contains the keytab for the principal 
specified above.
   This keytab will be copied to the node running the YARN Application Master 
via the Secure Distributed Cache,
-  for renewing the login tickets and the delegation tokens periodically.
+  for renewing the login tickets and the delegation tokens periodically. 
(Works also with the "local" master)
   
 
 
   spark.yarn.principal
   (none)
   
-  Principal to be used to login to KDC, while running on secure HDFS.
+  Principal to be used to login to KDC, while running on secure HDFS. (Works 
also with the "local" master)
   
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/99dc1335/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d7b205c..7b1d97b 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1614,7 +1614,8 @@ This command builds a new assembly jar that includes 
Hive. Note that this Hive a
 on all of the worker nodes, as they will need access to the Hive serialization 
and deserialization libraries
 (SerDes) in order to access data stored in Hive.
 
-Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. 
Please note when running
+Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` 
(for security configuration),
+ `hdfs-site.xml` (for HDFS configuration) file in `conf/`. Please note when 
running
 the query on a YARN cluster (`cluster` mode), the `datanucleus` jars under the 
`lib_managed/jars` directory
 and `hive-site.xml` under `conf/` directory need to be available on the driver 
and all executors launched by the
 YARN cluster. The convenient way to do this is adding them through the 
`--jars` option and `--file` option of the
@@ -2028,7 +2029,7 @@ Beeline will ask you for a username and password. In 
non-secure mode, simply ent
 your machine and a blank password. For secure mode, please follow the 
instructions given in the
 [beeline 
documentation](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients).
 
-Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
+Configuration of Hive is done by placing your 

spark git commit: [SPARK-11821] Propagate Kerberos keytab for all environments

2015-12-01 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 0a7bca2da -> 6a8cf80cc


[SPARK-11821] Propagate Kerberos keytab for all environments

andrewor14 the same PR as in branch 1.5
harishreedharan

Author: woj-i 

Closes #9859 from woj-i/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a8cf80c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a8cf80c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a8cf80c

Branch: refs/heads/master
Commit: 6a8cf80cc8ef435ec46138fa57325bda5d68f3ce
Parents: 0a7bca2
Author: woj-i 
Authored: Tue Dec 1 11:05:45 2015 -0800
Committer: Marcelo Vanzin 
Committed: Tue Dec 1 11:05:45 2015 -0800

--
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 4 
 docs/running-on-yarn.md   | 4 ++--
 docs/sql-programming-guide.md | 7 ---
 3 files changed, 10 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6a8cf80c/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 2e912b5..52d3ab3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -545,6 +545,10 @@ object SparkSubmit {
   if (args.isPython) {
 sysProps.put("spark.yarn.isPython", "true")
   }
+}
+
+// assure a keytab is available from any place in a JVM
+if (clusterManager == YARN || clusterManager == LOCAL) {
   if (args.principal != null) {
 require(args.keytab != null, "Keytab must be specified when principal 
is specified")
 if (!new File(args.keytab).exists()) {

http://git-wip-us.apache.org/repos/asf/spark/blob/6a8cf80c/docs/running-on-yarn.md
--
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 925a1e0..06413f8 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -358,14 +358,14 @@ If you need a reference to the proper location to put log 
files in the YARN so t
   
   The full path to the file that contains the keytab for the principal 
specified above.
   This keytab will be copied to the node running the YARN Application Master 
via the Secure Distributed Cache,
-  for renewing the login tickets and the delegation tokens periodically.
+  for renewing the login tickets and the delegation tokens periodically. 
(Works also with the "local" master)
   
 
 
   spark.yarn.principal
   (none)
   
-  Principal to be used to login to KDC, while running on secure HDFS.
+  Principal to be used to login to KDC, while running on secure HDFS. (Works 
also with the "local" master)
   
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6a8cf80c/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d7b205c..7b1d97b 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1614,7 +1614,8 @@ This command builds a new assembly jar that includes 
Hive. Note that this Hive a
 on all of the worker nodes, as they will need access to the Hive serialization 
and deserialization libraries
 (SerDes) in order to access data stored in Hive.
 
-Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`. 
Please note when running
+Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` 
(for security configuration),
+ `hdfs-site.xml` (for HDFS configuration) file in `conf/`. Please note when 
running
 the query on a YARN cluster (`cluster` mode), the `datanucleus` jars under the 
`lib_managed/jars` directory
 and `hive-site.xml` under `conf/` directory need to be available on the driver 
and all executors launched by the
 YARN cluster. The convenient way to do this is adding them through the 
`--jars` option and `--file` option of the
@@ -2028,7 +2029,7 @@ Beeline will ask you for a username and password. In 
non-secure mode, simply ent
 your machine and a blank password. For secure mode, please follow the 
instructions given in the
 [beeline 
documentation](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients).
 
-Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
+Configuration of Hive is done by placing your `hive-site.xml`, `core-site.xml` 
and `hdfs-site.xml` files in `conf/`.
 
 You may also use the beeline script that comes with 

spark git commit: [SPARK-12065] Upgrade Tachyon from 0.8.1 to 0.8.2

2015-12-01 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 99dc1335e -> ab2a124c8


[SPARK-12065] Upgrade Tachyon from 0.8.1 to 0.8.2

This commit upgrades the Tachyon dependency from 0.8.1 to 0.8.2.

Author: Josh Rosen 

Closes #10054 from JoshRosen/upgrade-to-tachyon-0.8.2.

(cherry picked from commit 34e7093c1131162b3aa05b65a19a633a0b5b633e)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab2a124c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab2a124c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab2a124c

Branch: refs/heads/branch-1.6
Commit: ab2a124c8eca6823ee016c9ecfbdbf4918fbcdd6
Parents: 99dc133
Author: Josh Rosen 
Authored: Tue Dec 1 11:49:20 2015 -0800
Committer: Josh Rosen 
Committed: Tue Dec 1 11:50:00 2015 -0800

--
 core/pom.xml | 2 +-
 make-distribution.sh | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ab2a124c/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 37e3f16..61744bb 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -270,7 +270,7 @@
 
   org.tachyonproject
   tachyon-client
-  0.8.1
+  0.8.2
   
 
   org.apache.hadoop

http://git-wip-us.apache.org/repos/asf/spark/blob/ab2a124c/make-distribution.sh
--
diff --git a/make-distribution.sh b/make-distribution.sh
index d7d27e2..c949e94 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -33,7 +33,7 @@ SPARK_HOME="$(cd "`dirname "$0"`"; pwd)"
 DISTDIR="$SPARK_HOME/dist"
 
 SPARK_TACHYON=false
-TACHYON_VERSION="0.8.1"
+TACHYON_VERSION="0.8.2"
 TACHYON_TGZ="tachyon-${TACHYON_VERSION}-bin.tar.gz"
 
TACHYON_URL="http://tachyon-project.org/downloads/files/${TACHYON_VERSION}/${TACHYON_TGZ};
 





spark git commit: [SPARK-12065] Upgrade Tachyon from 0.8.1 to 0.8.2

2015-12-01 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 6a8cf80cc -> 34e7093c1


[SPARK-12065] Upgrade Tachyon from 0.8.1 to 0.8.2

This commit upgrades the Tachyon dependency from 0.8.1 to 0.8.2.

Author: Josh Rosen 

Closes #10054 from JoshRosen/upgrade-to-tachyon-0.8.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34e7093c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34e7093c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34e7093c

Branch: refs/heads/master
Commit: 34e7093c1131162b3aa05b65a19a633a0b5b633e
Parents: 6a8cf80
Author: Josh Rosen 
Authored: Tue Dec 1 11:49:20 2015 -0800
Committer: Josh Rosen 
Committed: Tue Dec 1 11:49:20 2015 -0800

--
 core/pom.xml | 2 +-
 make-distribution.sh | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/34e7093c/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 37e3f16..61744bb 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -270,7 +270,7 @@
 
   org.tachyonproject
   tachyon-client
-  0.8.1
+  0.8.2
   
 
   org.apache.hadoop

http://git-wip-us.apache.org/repos/asf/spark/blob/34e7093c/make-distribution.sh
--
diff --git a/make-distribution.sh b/make-distribution.sh
index 7b417fe..e64ceb8 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -33,7 +33,7 @@ SPARK_HOME="$(cd "`dirname "$0"`"; pwd)"
 DISTDIR="$SPARK_HOME/dist"
 
 SPARK_TACHYON=false
-TACHYON_VERSION="0.8.1"
+TACHYON_VERSION="0.8.2"
 TACHYON_TGZ="tachyon-${TACHYON_VERSION}-bin.tar.gz"
 
TACHYON_URL="http://tachyon-project.org/downloads/files/${TACHYON_VERSION}/${TACHYON_TGZ};
 





spark git commit: Set SPARK_EC2_VERSION to 1.5.2

2015-12-01 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 d78f1bc45 -> 80dac0b07


Set SPARK_EC2_VERSION to 1.5.2

Author: Alexander Pivovarov 

Closes #10064 from apivovarov/patch-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80dac0b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80dac0b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80dac0b0

Branch: refs/heads/branch-1.5
Commit: 80dac0b07dd37064255f13874ebae5f75e371e9c
Parents: d78f1bc
Author: Alexander Pivovarov 
Authored: Tue Dec 1 01:17:49 2015 -0800
Committer: Shivaram Venkataraman 
Committed: Tue Dec 1 01:17:49 2015 -0800

--
 ec2/spark_ec2.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/80dac0b0/ec2/spark_ec2.py
--
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 561284e..ed538b0 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -51,7 +51,7 @@ else:
 raw_input = input
 xrange = range
 
-SPARK_EC2_VERSION = "1.5.1"
+SPARK_EC2_VERSION = "1.5.2"
 SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))
 
 VALID_SPARK_VERSIONS = set([





spark git commit: [SPARK-11898][MLLIB] Use broadcast for the global tables in Word2Vec

2015-12-01 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 9693b0d5a -> a0af0e351


[SPARK-11898][MLLIB] Use broadcast for the global tables in Word2Vec

jira: https://issues.apache.org/jira/browse/SPARK-11898
syn0Global and syn1Global in Word2Vec are quite large objects, with size 
(vocab * vectorSize * 8), yet they are passed to workers using basic task serialization.

Using broadcast can greatly improve performance. My benchmark shows that, for a 
1M-word vocabulary and the default vectorSize of 100, switching to broadcast can:

1. decrease worker memory consumption by 45%;
2. decrease running time by 40%.

This will also help extend the upper limit for Word2Vec.
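
As a general illustration of the pattern (a minimal, self-contained sketch, not the Word2Vec code itself; the table size, names, and arithmetic are made up), a large read-only array is broadcast once per iteration, read through `.value` inside tasks, and unpersisted afterwards:

  import org.apache.spark.{SparkConf, SparkContext}

  object BroadcastSketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("broadcast-sketch").setMaster("local[2]"))

      // Stand-in for a large global table such as syn0Global/syn1Global.
      val table = Array.fill(1000000)(0.1f)
      val data = sc.parallelize(1 to 10, numSlices = 4)

      for (iteration <- 1 to 3) {
        // Ship the table to each executor once, instead of serializing it into
        // every task closure.
        val bcTable = sc.broadcast(table)
        val partial = data.map { i =>
          val t = bcTable.value
          t(i % t.length) * i
        }.sum()
        println(s"iteration $iteration: partial = $partial")
        // Release the broadcast blocks once the iteration is finished.
        bcTable.unpersist(blocking = false)
      }

      sc.stop()
    }
  }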

Author: Yuhao Yang 

Closes #9878 from hhbyyh/w2vBC.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0af0e35
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0af0e35
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0af0e35

Branch: refs/heads/master
Commit: a0af0e351e45a8be47a6f65efd132eaa4a00c9e4
Parents: 9693b0d
Author: Yuhao Yang 
Authored: Tue Dec 1 09:26:58 2015 +
Committer: Sean Owen 
Committed: Tue Dec 1 09:26:58 2015 +

--
 .../main/scala/org/apache/spark/mllib/feature/Word2Vec.scala  | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a0af0e35/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index a47f27b..655ac0b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -316,12 +316,15 @@ class Word2Vec extends Serializable with Logging {
   Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 
0.5f) / vectorSize)
 val syn1Global = new Array[Float](vocabSize * vectorSize)
 var alpha = learningRate
+
 for (k <- 1 to numIterations) {
+  val bcSyn0Global = sc.broadcast(syn0Global)
+  val bcSyn1Global = sc.broadcast(syn1Global)
   val partial = newSentences.mapPartitionsWithIndex { case (idx, iter) =>
 val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) 
<< 8))
 val syn0Modify = new Array[Int](vocabSize)
 val syn1Modify = new Array[Int](vocabSize)
-val model = iter.foldLeft((syn0Global, syn1Global, 0, 0)) {
+val model = iter.foldLeft((bcSyn0Global.value, bcSyn1Global.value, 0, 
0)) {
   case ((syn0, syn1, lastWordCount, wordCount), sentence) =>
 var lwc = lastWordCount
 var wc = wordCount
@@ -405,6 +408,8 @@ class Word2Vec extends Serializable with Logging {
 }
 i += 1
   }
+  bcSyn0Global.unpersist(false)
+  bcSyn1Global.unpersist(false)
 }
 newSentences.unpersist()
 





spark git commit: [SPARK-11949][SQL] Set field nullable property for GroupingSets to get correct results for null values

2015-12-01 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master a0af0e351 -> c87531b76


[SPARK-11949][SQL] Set field nullable property for GroupingSets to get correct 
results for null values

JIRA: https://issues.apache.org/jira/browse/SPARK-11949

The result of a cube plan uses an incorrect schema: the schema of the cube result 
should set the nullable property to true, because the grouping expressions will 
produce null values.
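
For illustration, a minimal sketch of the behavior the fix targets (hypothetical data and column names, not part of the patch): cube() emits subtotal rows that aggregate across all values of a grouping column, and those rows carry NULL in that column, so the output schema must mark it nullable:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  object CubeNullabilitySketch {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("cube-nullability").setMaster("local[2]"))
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val temps = Seq(("20151123", "room1", 18.6), ("20151123", "room2", 22.4))
        .toDF("date", "room_name", "temp")

      // cube() adds subtotal rows in which "date" (and/or "room_name") is NULL.
      val cubed = temps.cube("date", "room_name").avg("temp")
      cubed.where("date IS NULL").show()

      sc.stop()
    }
  }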

Author: Liang-Chi Hsieh 

Closes #10038 from viirya/fix-cube.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c87531b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c87531b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c87531b7

Branch: refs/heads/master
Commit: c87531b765f8934a9a6c0f673617e0abfa5e5f0e
Parents: a0af0e3
Author: Liang-Chi Hsieh 
Authored: Tue Dec 1 07:42:37 2015 -0800
Committer: Yin Huai 
Committed: Tue Dec 1 07:44:22 2015 -0800

--
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 --
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala| 10 ++
 2 files changed, 18 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c87531b7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 94ffbbb..b8f212f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -223,6 +223,11 @@ class Analyzer(
   case other => Alias(other, other.toString)()
 }
 
+// TODO: We need to use bitmasks to determine which grouping 
expressions need to be
+// set as nullable. For example, if we have GROUPING SETS ((a,b), a), 
we do not need
+// to change the nullability of a.
+val attributeMap = groupByAliases.map(a => (a -> 
a.toAttribute.withNullability(true))).toMap
+
 val aggregations: Seq[NamedExpression] = x.aggregations.map {
   // If an expression is an aggregate (contains a AggregateExpression) 
then we dont change
   // it so that the aggregation is computed on the unmodified value of 
its argument
@@ -231,12 +236,13 @@ class Analyzer(
   // If not then its a grouping expression and we need to use the 
modified (with nulls from
   // Expand) value of the expression.
   case expr => expr.transformDown {
-case e => 
groupByAliases.find(_.child.semanticEquals(e)).map(_.toAttribute).getOrElse(e)
+case e =>
+  
groupByAliases.find(_.child.semanticEquals(e)).map(attributeMap(_)).getOrElse(e)
   }.asInstanceOf[NamedExpression]
 }
 
 val child = Project(x.child.output ++ groupByAliases, x.child)
-val groupByAttributes = groupByAliases.map(_.toAttribute)
+val groupByAttributes = groupByAliases.map(attributeMap(_))
 
 Aggregate(
   groupByAttributes :+ VirtualColumn.groupingIdAttribute,

http://git-wip-us.apache.org/repos/asf/spark/blob/c87531b7/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index b5c636d..b1004bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.DecimalType
 
+case class Fact(date: Int, hour: Int, minute: Int, room_name: String, temp: 
Double)
 
 class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -86,6 +87,15 @@ class DataFrameAggregateSuite extends QueryTest with 
SharedSQLContext {
 Row(null, 2013, 78000.0) ::
 Row(null, null, 113000.0) :: Nil
 )
+
+val df0 = sqlContext.sparkContext.parallelize(Seq(
+  Fact(20151123, 18, 35, "room1", 18.6),
+  Fact(20151123, 18, 35, "room2", 22.4),
+  Fact(20151123, 18, 36, "room1", 17.4),
+  Fact(20151123, 18, 36, "room2", 25.6))).toDF()
+
+val cube0 = df0.cube("date", "hour", "minute", "room_name").agg(Map("temp" 
-> "avg"))
+assert(cube0.where("date IS NULL").count > 0)
   }
 
   test("rollup overlapping