spark git commit: [SPARK-8138] [SQL] Improves error message when conflicting partition columns are found

2015-06-24 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 09fcf96b8 -> cc465fd92


[SPARK-8138] [SQL] Improves error message when conflicting partition columns 
are found

This PR improves the error message shown when conflicting partition column 
names are detected.  Such conflicts can be particularly annoying and confusing 
when there are a large number of partitions and only a handful of them happen 
to contain unexpected temporary file(s).  Now all suspicious directories are 
listed as below:

```
java.lang.AssertionError: assertion failed: Conflicting partition column names 
detected:

Partition column name list #0: b, c, d
Partition column name list #1: b, c
Partition column name list #2: b

For partitioned table directories, data files should only live in leaf 
directories. Please check the following directories for unexpected files:

file:/tmp/foo/b=0
file:/tmp/foo/b=1
file:/tmp/foo/b=1/c=1
file:/tmp/foo/b=0/c=0
```

Author: Cheng Lian 

Closes #6610 from liancheng/part-errmsg and squashes the following commits:

7d05f2c [Cheng Lian] Fixes Scala style issue
a149250 [Cheng Lian] Adds test case for the error message
6b74dd8 [Cheng Lian] Also lists suspicious non-leaf partition directories
a935eb8 [Cheng Lian] Improves error message when conflicting partition columns 
are found


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc465fd9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc465fd9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc465fd9

Branch: refs/heads/master
Commit: cc465fd92482737c21971d82e30d4cf247acf932
Parents: 09fcf96
Author: Cheng Lian 
Authored: Wed Jun 24 02:17:12 2015 -0700
Committer: Cheng Lian 
Committed: Wed Jun 24 02:17:12 2015 -0700

--
 .../spark/sql/sources/PartitioningUtils.scala   | 47 +++-
 .../ParquetPartitionDiscoverySuite.scala| 45 +++
 2 files changed, 82 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cc465fd9/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index c6f535d..8b2a45d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -84,7 +84,7 @@ private[sql] object PartitioningUtils {
 } else {
   // This dataset is partitioned. We need to check whether all partitions 
have the same
   // partition columns and resolve potential type conflicts.
-  val resolvedPartitionValues = 
resolvePartitions(pathsWithPartitionValues.map(_._2))
+  val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
 
   // Creates the StructType which represents the partition columns.
   val fields = {
@@ -181,19 +181,18 @@ private[sql] object PartitioningUtils {
*   StringType
* }}}
*/
-  private[sql] def resolvePartitions(values: Seq[PartitionValues]): 
Seq[PartitionValues] = {
-// Column names of all partitions must match
-val distinctPartitionsColNames = values.map(_.columnNames).distinct
-
-if (distinctPartitionsColNames.isEmpty) {
+  private[sql] def resolvePartitions(
+  pathsWithPartitionValues: Seq[(Path, PartitionValues)]): 
Seq[PartitionValues] = {
+if (pathsWithPartitionValues.isEmpty) {
   Seq.empty
 } else {
-  assert(distinctPartitionsColNames.size == 1, {
-val list = distinctPartitionsColNames.mkString("\t", "\n\t", "")
-s"Conflicting partition column names detected:\n$list"
-  })
+  val distinctPartColNames = 
pathsWithPartitionValues.map(_._2.columnNames).distinct
+  assert(
+distinctPartColNames.size == 1,
+listConflictingPartitionColumns(pathsWithPartitionValues))
 
   // Resolves possible type conflicts for each column
+  val values = pathsWithPartitionValues.map(_._2)
   val columnCount = values.head.columnNames.size
   val resolvedValues = (0 until columnCount).map { i =>
 resolveTypeConflicts(values.map(_.literals(i)))
@@ -206,6 +205,34 @@ private[sql] object PartitioningUtils {
 }
   }
 
+  private[sql] def listConflictingPartitionColumns(
+  pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
+val distinctPartColNames = 
pathWithPartitionValues.map(_._2.columnNames).distinct
+
+def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
+  seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) 
=> value })
+
+val partColNamesT

spark git commit: [SPARK-8567] [SQL] Debugging flaky HiveSparkSubmitSuite

2015-06-24 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master cc465fd92 -> 9d36ec243


[SPARK-8567] [SQL] Debugging flaky HiveSparkSubmitSuite

Uses an approach similar to the one in `HiveThriftServer2Suite`: prints the 
stdout/stderr of the spawned process instead of logging them, so that we can 
see what happens on Jenkins. (This test suite only fails on Jenkins and doesn't 
produce any log output...)

cc yhuai

Author: Cheng Lian 

Closes #6978 from liancheng/debug-hive-spark-submit-suite and squashes the 
following commits:

b031647 [Cheng Lian] Prints process stdout/stderr instead of logging them


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d36ec24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d36ec24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d36ec24

Branch: refs/heads/master
Commit: 9d36ec24312f0a9865b4392f89e9611a5b80916d
Parents: cc465fd
Author: Cheng Lian 
Authored: Wed Jun 24 09:49:20 2015 -0700
Committer: Yin Huai 
Committed: Wed Jun 24 09:49:20 2015 -0700

--
 .../apache/spark/sql/hive/HiveSparkSubmitSuite.scala  | 14 +++---
 1 file changed, 11 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9d36ec24/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index ab44303..d85516a 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive
 
 import java.io.File
 
+import scala.sys.process.{ProcessLogger, Process}
+
 import org.apache.spark._
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
 import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -82,12 +84,18 @@ class HiveSparkSubmitSuite
   // This is copied from org.apache.spark.deploy.SparkSubmitSuite
   private def runSparkSubmit(args: Seq[String]): Unit = {
 val sparkHome = sys.props.getOrElse("spark.test.home", 
fail("spark.test.home is not set!"))
-val process = Utils.executeCommand(
+val process = Process(
   Seq("./bin/spark-submit") ++ args,
   new File(sparkHome),
-  Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+  "SPARK_TESTING" -> "1",
+  "SPARK_HOME" -> sparkHome
+).run(ProcessLogger(
+  (line: String) => { println(s"out> $line") },
+  (line: String) => { println(s"err> $line") }
+))
+
 try {
-  val exitCode = failAfter(120 seconds) { process.waitFor() }
+  val exitCode = failAfter(120 seconds) { process.exitValue() }
   if (exitCode != 0) {
 fail(s"Process returned with exit code $exitCode. See the log4j logs 
for more detail.")
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-8578] [SQL] Should ignore user defined output committer when appending data

2015-06-24 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 9d36ec243 -> bba6699d0


[SPARK-8578] [SQL] Should ignore user defined output committer when appending 
data

https://issues.apache.org/jira/browse/SPARK-8578

It is not very safe to use a custom output committer when appending data to an 
existing dir. This change adds the logic to check if we are appending data, 
and if so, we use the output committer associated with the file output format.

Author: Yin Huai 

Closes #6964 from yhuai/SPARK-8578 and squashes the following commits:

43544c4 [Yin Huai] Do not use a custom output commiter when appendiing data.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bba6699d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bba6699d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bba6699d

Branch: refs/heads/master
Commit: bba6699d0e9093bc041a9a33dd31992790f32174
Parents: 9d36ec2
Author: Yin Huai 
Authored: Wed Jun 24 09:50:03 2015 -0700
Committer: Yin Huai 
Committed: Wed Jun 24 09:50:03 2015 -0700

--
 .../org/apache/spark/sql/sources/commands.scala | 89 
 .../sql/sources/hadoopFsRelationSuites.scala| 83 +-
 2 files changed, 136 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bba6699d/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 215e53c..fb6173f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -96,7 +96,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
 val fs = outputPath.getFileSystem(hadoopConf)
 val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory)
 
-val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match {
+val pathExists = fs.exists(qualifiedOutputPath)
+val doInsertion = (mode, pathExists) match {
   case (SaveMode.ErrorIfExists, true) =>
 sys.error(s"path $qualifiedOutputPath already exists.")
   case (SaveMode.Overwrite, true) =>
@@ -107,6 +108,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
   case (SaveMode.Ignore, exists) =>
 !exists
 }
+// If we are appending data to an existing dir.
+val isAppend = (pathExists) && (mode == SaveMode.Append)
 
 if (doInsertion) {
   val job = new Job(hadoopConf)
@@ -130,10 +133,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
   val partitionColumns = relation.partitionColumns.fieldNames
   if (partitionColumns.isEmpty) {
-insert(new DefaultWriterContainer(relation, job), df)
+insert(new DefaultWriterContainer(relation, job, isAppend), df)
   } else {
 val writerContainer = new DynamicPartitionWriterContainer(
-  relation, job, partitionColumns, 
PartitioningUtils.DEFAULT_PARTITION_NAME)
+  relation, job, partitionColumns, 
PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
 insertWithDynamicPartitions(sqlContext, writerContainer, df, 
partitionColumns)
   }
 }
@@ -277,7 +280,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
 private[sql] abstract class BaseWriterContainer(
 @transient val relation: HadoopFsRelation,
-@transient job: Job)
+@transient job: Job,
+isAppend: Boolean)
   extends SparkHadoopMapReduceUtil
   with Logging
   with Serializable {
@@ -356,34 +360,47 @@ private[sql] abstract class BaseWriterContainer(
   }
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter 
= {
-val committerClass = context.getConfiguration.getClass(
-  SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
-Option(committerClass).map { clazz =>
-  logInfo(s"Using user defined output committer class 
${clazz.getCanonicalName}")
-
-  // Every output format based on 
org.apache.hadoop.mapreduce.lib.output.OutputFormat
-  // has an associated output committer. To override this output committer,
-  // we will first try to use the output committer set in 
SQLConf.OUTPUT_COMMITTER_CLASS.
-  // If a data source needs to override the output committer, it needs to 
set the
-  // output committer in prepareForWrite method.
-  if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
-// The specified output committer is a FileOutputCommitter.
-// So, we will use the FileOutputCommitter-specified constructor.
-val ctor = clazz.getDeclaredConstructor(classOf[Path], 
classOf[TaskAttemptContext])
-

spark git commit: [SPARK-8578] [SQL] Should ignore user defined output committer when appending data (branch 1.4)

2015-06-24 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 eafbe1345 -> 7e53ff258


[SPARK-8578] [SQL] Should ignore user defined output committer when appending 
data (branch 1.4)

This is https://github.com/apache/spark/pull/6964 for branch 1.4.

Author: Yin Huai 

Closes #6966 from yhuai/SPARK-8578-branch-1.4 and squashes the following 
commits:

9c3947b [Yin Huai] Do not use a custom output commiter when appendiing data.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e53ff25
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e53ff25
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e53ff25

Branch: refs/heads/branch-1.4
Commit: 7e53ff25813dc6a79f728c91e6c1d4d4dfa32aab
Parents: eafbe13
Author: Yin Huai 
Authored: Wed Jun 24 09:51:18 2015 -0700
Committer: Yin Huai 
Committed: Wed Jun 24 09:51:18 2015 -0700

--
 .../org/apache/spark/sql/sources/commands.scala | 89 
 .../sql/sources/hadoopFsRelationSuites.scala| 83 +-
 2 files changed, 136 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7e53ff25/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 9a75dd7..29a47f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -97,7 +97,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
 val fs = outputPath.getFileSystem(hadoopConf)
 val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, 
fs.getWorkingDirectory)
 
-val doInsertion = (mode, fs.exists(qualifiedOutputPath)) match {
+val pathExists = fs.exists(qualifiedOutputPath)
+val doInsertion = (mode, pathExists) match {
   case (SaveMode.ErrorIfExists, true) =>
 sys.error(s"path $qualifiedOutputPath already exists.")
   case (SaveMode.Overwrite, true) =>
@@ -108,6 +109,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
   case (SaveMode.Ignore, exists) =>
 !exists
 }
+// If we are appending data to an existing dir.
+val isAppend = (pathExists) && (mode == SaveMode.Append)
 
 if (doInsertion) {
   val job = new Job(hadoopConf)
@@ -133,10 +136,10 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
   val partitionColumns = relation.partitionColumns.fieldNames
   if (partitionColumns.isEmpty) {
-insert(new DefaultWriterContainer(relation, job), df)
+insert(new DefaultWriterContainer(relation, job, isAppend), df)
   } else {
 val writerContainer = new DynamicPartitionWriterContainer(
-  relation, job, partitionColumns, 
PartitioningUtils.DEFAULT_PARTITION_NAME)
+  relation, job, partitionColumns, 
PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
 insertWithDynamicPartitions(sqlContext, writerContainer, df, 
partitionColumns)
   }
 }
@@ -286,7 +289,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
 
 private[sql] abstract class BaseWriterContainer(
 @transient val relation: HadoopFsRelation,
-@transient job: Job)
+@transient job: Job,
+isAppend: Boolean)
   extends SparkHadoopMapReduceUtil
   with Logging
   with Serializable {
@@ -365,34 +369,47 @@ private[sql] abstract class BaseWriterContainer(
   }
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter 
= {
-val committerClass = context.getConfiguration.getClass(
-  SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
-
-Option(committerClass).map { clazz =>
-  logInfo(s"Using user defined output committer class 
${clazz.getCanonicalName}")
-
-  // Every output format based on 
org.apache.hadoop.mapreduce.lib.output.OutputFormat
-  // has an associated output committer. To override this output committer,
-  // we will first try to use the output committer set in 
SQLConf.OUTPUT_COMMITTER_CLASS.
-  // If a data source needs to override the output committer, it needs to 
set the
-  // output committer in prepareForWrite method.
-  if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
-// The specified output committer is a FileOutputCommitter.
-// So, we will use the FileOutputCommitter-specified constructor.
-val ctor = clazz.getDeclaredConstructor(classOf[Path], 
classOf[TaskAttemptContext])
-ctor.newInstance(new Path(outputPath), context)
-  } else {
-// The specified output committer is just a OutputCommitter.
-// So, we will use the no-argument con

spark git commit: [SPARK-8576] Add spark-ec2 options to set IAM roles and instance-initiated shutdown behavior

2015-06-24 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master bba6699d0 -> 31f48e5af


[SPARK-8576] Add spark-ec2 options to set IAM roles and instance-initiated 
shutdown behavior

Both of these options are useful when spark-ec2 is being used as part of an 
automated pipeline and the engineers want to minimize the need to pass around 
AWS keys for access to things like S3 (keys are replaced by the IAM role) and 
to be able to launch a cluster that can terminate itself cleanly.

Author: Nicholas Chammas 

Closes #6962 from nchammas/additional-ec2-options and squashes the following 
commits:

fcf252e [Nicholas Chammas] PEP8 fixes
efba9ee [Nicholas Chammas] add help for --instance-initiated-shutdown-behavior
598aecf [Nicholas Chammas] option to launch instances into IAM role
2743632 [Nicholas Chammas] add option for instance initiated shutdown


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31f48e5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31f48e5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31f48e5a

Branch: refs/heads/master
Commit: 31f48e5af887a9ccc9cea0218c36bf52bbf49d24
Parents: bba6699
Author: Nicholas Chammas 
Authored: Wed Jun 24 11:20:51 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Wed Jun 24 11:20:51 2015 -0700

--
 ec2/spark_ec2.py | 56 ---
 1 file changed, 35 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/31f48e5a/ec2/spark_ec2.py
--
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 63e2c79..e4932cf 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -306,6 +306,13 @@ def parse_args():
 "--private-ips", action="store_true", default=False,
 help="Use private IPs for instances rather than public if VPC/subnet " 
+
  "requires that.")
+parser.add_option(
+"--instance-initiated-shutdown-behavior", default="stop",
+choices=["stop", "terminate"],
+help="Whether instances should terminate when shut down or just stop")
+parser.add_option(
+"--instance-profile-name", default=None,
+help="IAM profile name to launch instances under")
 
 (opts, args) = parser.parse_args()
 if len(args) != 2:
@@ -602,7 +609,8 @@ def launch_cluster(conn, opts, cluster_name):
 block_device_map=block_map,
 subnet_id=opts.subnet_id,
 placement_group=opts.placement_group,
-user_data=user_data_content)
+user_data=user_data_content,
+instance_profile_name=opts.instance_profile_name)
 my_req_ids += [req.id for req in slave_reqs]
 i += 1
 
@@ -647,16 +655,19 @@ def launch_cluster(conn, opts, cluster_name):
 for zone in zones:
 num_slaves_this_zone = get_partition(opts.slaves, num_zones, i)
 if num_slaves_this_zone > 0:
-slave_res = image.run(key_name=opts.key_pair,
-  security_group_ids=[slave_group.id] + 
additional_group_ids,
-  instance_type=opts.instance_type,
-  placement=zone,
-  min_count=num_slaves_this_zone,
-  max_count=num_slaves_this_zone,
-  block_device_map=block_map,
-  subnet_id=opts.subnet_id,
-  placement_group=opts.placement_group,
-  user_data=user_data_content)
+slave_res = image.run(
+key_name=opts.key_pair,
+security_group_ids=[slave_group.id] + additional_group_ids,
+instance_type=opts.instance_type,
+placement=zone,
+min_count=num_slaves_this_zone,
+max_count=num_slaves_this_zone,
+block_device_map=block_map,
+subnet_id=opts.subnet_id,
+placement_group=opts.placement_group,
+user_data=user_data_content,
+
instance_initiated_shutdown_behavior=opts.instance_initiated_shutdown_behavior,
+instance_profile_name=opts.instance_profile_name)
 slave_nodes += slave_res.instances
 print("Launched {s} slave{plural_s} in {z}, regid = 
{r}".format(
   s=num_slaves_this_zone,
@@ -678,16 +689,19 @@ def launch_cluster(conn, opts, cluster_name):
 master_type = opts.instance_type
 if opts.zone == 'all':
 opts.zone = random.choice(con

spark git commit: [SPARK-8399] [STREAMING] [WEB UI] Overlap between histograms and axis' name in Spark Streaming UI

2015-06-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 31f48e5af -> 1173483f3


[SPARK-8399] [STREAMING] [WEB UI] Overlap between histograms and axis' name in 
Spark Streaming UI

Moved where the X axis' name (#batches) is written in histograms in the spark 
streaming web ui so the histograms and the axis' name do not overlap.

Author: BenFradet 

Closes #6845 from BenFradet/SPARK-8399 and squashes the following commits:

b63695f [BenFradet] adjusted inner histograms
eb610ee [BenFradet] readjusted #batches on the x axis
dd46f98 [BenFradet] aligned all unit labels and ticks
0564b62 [BenFradet] readjusted #batches placement
edd0936 [BenFradet] moved where the X axis' name (#batches) is written in 
histograms in the spark streaming web ui


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1173483f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1173483f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1173483f

Branch: refs/heads/master
Commit: 1173483f3f465a4c63246e83d0aaa2af521395f5
Parents: 31f48e5
Author: BenFradet 
Authored: Wed Jun 24 11:53:03 2015 -0700
Committer: Tathagata Das 
Committed: Wed Jun 24 11:53:03 2015 -0700

--
 .../apache/spark/streaming/ui/static/streaming-page.js| 10 ++
 .../org/apache/spark/streaming/ui/StreamingPage.scala |  4 ++--
 2 files changed, 8 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1173483f/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
--
diff --git 
a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
 
b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
index 75251f4..4886b68 100644
--- 
a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
+++ 
b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
@@ -31,6 +31,8 @@ var maxXForHistogram = 0;
 var histogramBinCount = 10;
 var yValueFormat = d3.format(",.2f");
 
+var unitLabelYOffset = -10;
+
 // Show a tooltip "text" for "node"
 function showBootstrapTooltip(node, text) {
 $(node).tooltip({title: text, trigger: "manual", container: "body"});
@@ -133,7 +135,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, 
unitY, batchInterval) {
 .attr("class", "y axis")
 .call(yAxis)
 .append("text")
-.attr("transform", "translate(0," + (-3) + ")")
+.attr("transform", "translate(0," + unitLabelYOffset + ")")
 .text(unitY);
 
 
@@ -223,10 +225,10 @@ function drawHistogram(id, values, minY, maxY, unitY, 
batchInterval) {
 .style("border-left", "0px solid white");
 
 var margin = {top: 20, right: 30, bottom: 30, left: 10};
-var width = 300 - margin.left - margin.right;
+var width = 350 - margin.left - margin.right;
 var height = 150 - margin.top - margin.bottom;
 
-var x = d3.scale.linear().domain([0, maxXForHistogram]).range([0, width]);
+var x = d3.scale.linear().domain([0, maxXForHistogram]).range([0, width - 
50]);
 var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]);
 
 var xAxis = d3.svg.axis().scale(x).orient("top").ticks(5);
@@ -248,7 +250,7 @@ function drawHistogram(id, values, minY, maxY, unitY, 
batchInterval) {
 .attr("class", "x axis")
 .call(xAxis)
 .append("text")
-.attr("transform", "translate(" + (margin.left + width - 40) + ", 
15)")
+.attr("transform", "translate(" + (margin.left + width - 45) + ", 
" + unitLabelYOffset + ")")
 .text("#batches");
 
 svg.append("g")

http://git-wip-us.apache.org/repos/asf/spark/blob/1173483f/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 4ee7a48..87af902 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -310,7 +310,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
 
   
   Timelines (Last {batchTimes.length} 
batches, {numActiveBatches} active, {numCompletedBatches} completed)
-  Histograms
+  Histograms
   
   
 
@@ -456,7 +456,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
   {receiverActive}
   {receiverLocation}
   {receiverLastErrorTime}
-  {receiverLastError}
+  {receiverLastError}
 
 
   


-

spark git commit: [SPARK-8506] Add pakages to R context created through init.

2015-06-24 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 7e53ff258 -> f6682dd6e


[SPARK-8506] Add pakages to R context created through init.

Author: Holden Karau 

Closes #6928 from 
holdenk/SPARK-8506-sparkr-does-not-provide-an-easy-way-to-depend-on-spark-packages-when-performing-init-from-inside-of-r
 and squashes the following commits:

b60dd63 [Holden Karau] Add an example with the spark-csv package
fa8bc92 [Holden Karau] typo: sparm -> spark
865a90c [Holden Karau] strip spaces for comparision
c7a4471 [Holden Karau] Add some documentation
c1a9233 [Holden Karau] refactor for testing
c818556 [Holden Karau] Add pakages to R

(cherry picked from commit 43e66192f45a23f7232116e9f664158862df5015)
Signed-off-by: Shivaram Venkataraman 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6682dd6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6682dd6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6682dd6

Branch: refs/heads/branch-1.4
Commit: f6682dd6e8ab8c5acddd1cf20317bea3afcbcae7
Parents: 7e53ff2
Author: Holden Karau 
Authored: Wed Jun 24 11:55:20 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Wed Jun 24 11:55:29 2015 -0700

--
 R/pkg/R/client.R   | 26 +++---
 R/pkg/R/sparkR.R   |  7 +--
 R/pkg/inst/tests/test_client.R | 32 
 docs/sparkr.md | 17 +
 4 files changed, 69 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f6682dd6/R/pkg/R/client.R
--
diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R
index 1281c41..cf2e5dd 100644
--- a/R/pkg/R/client.R
+++ b/R/pkg/R/client.R
@@ -34,24 +34,36 @@ connectBackend <- function(hostname, port, timeout = 6000) {
   con
 }
 
-launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts) {
+determineSparkSubmitBin <- function() {
   if (.Platform$OS.type == "unix") {
 sparkSubmitBinName = "spark-submit"
   } else {
 sparkSubmitBinName = "spark-submit.cmd"
   }
+  sparkSubmitBinName
+}
+
+generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, 
packages) {
+  if (jars != "") {
+jars <- paste("--jars", jars)
+  }
+
+  if (packages != "") {
+packages <- paste("--packages", packages)
+  }
 
+  combinedArgs <- paste(jars, packages, sparkSubmitOpts, args, sep = " ")
+  combinedArgs
+}
+
+launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
+  sparkSubmitBin <- determineSparkSubmitBin()
   if (sparkHome != "") {
 sparkSubmitBin <- file.path(sparkHome, "bin", sparkSubmitBinName)
   } else {
 sparkSubmitBin <- sparkSubmitBinName
   }
-
-  if (jars != "") {
-jars <- paste("--jars", jars)
-  }
-
-  combinedArgs <- paste(jars, sparkSubmitOpts, args, sep = " ")
+  combinedArgs <- generateSparkSubmitArgs(args, sparkHome, jars, 
sparkSubmitOpts, packages)
   cat("Launching java with spark-submit command", sparkSubmitBin, 
combinedArgs, "\n")
   invisible(system2(sparkSubmitBin, combinedArgs, wait = F))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f6682dd6/R/pkg/R/sparkR.R
--
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index dbde0c4..8f81d56 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -81,6 +81,7 @@ sparkR.stop <- function() {
 #' @param sparkExecutorEnv Named list of environment variables to be used when 
launching executors.
 #' @param sparkJars Character string vector of jar files to pass to the worker 
nodes.
 #' @param sparkRLibDir The path where R is installed on the worker nodes.
+#' @param sparkPackages Character string vector of packages from 
spark-packages.org
 #' @export
 #' @examples
 #'\dontrun{
@@ -100,7 +101,8 @@ sparkR.init <- function(
   sparkEnvir = list(),
   sparkExecutorEnv = list(),
   sparkJars = "",
-  sparkRLibDir = "") {
+  sparkRLibDir = "",
+  sparkPackages = "") {
 
   if (exists(".sparkRjsc", envir = .sparkREnv)) {
 cat("Re-using existing Spark Context. Please stop SparkR with 
sparkR.stop() or restart R to create a new Spark Context\n")
@@ -129,7 +131,8 @@ sparkR.init <- function(
 args = path,
 sparkHome = sparkHome,
 jars = jars,
-sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"))
+sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
+sparkPackages = sparkPackages)
 # wait atmost 100 seconds for JVM to launch
 wait <- 0.1
 for (i in 1:25) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f6682dd6/R/pkg/inst/tests/test_client.R
--
diff --git a/R/pkg/inst/tests/test_client.R b/R/pk

spark git commit: [SPARK-8506] Add pakages to R context created through init.

2015-06-24 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master 1173483f3 -> 43e66192f


[SPARK-8506] Add pakages to R context created through init.

Author: Holden Karau 

Closes #6928 from 
holdenk/SPARK-8506-sparkr-does-not-provide-an-easy-way-to-depend-on-spark-packages-when-performing-init-from-inside-of-r
 and squashes the following commits:

b60dd63 [Holden Karau] Add an example with the spark-csv package
fa8bc92 [Holden Karau] typo: sparm -> spark
865a90c [Holden Karau] strip spaces for comparision
c7a4471 [Holden Karau] Add some documentation
c1a9233 [Holden Karau] refactor for testing
c818556 [Holden Karau] Add pakages to R


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43e66192
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43e66192
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43e66192

Branch: refs/heads/master
Commit: 43e66192f45a23f7232116e9f664158862df5015
Parents: 1173483
Author: Holden Karau 
Authored: Wed Jun 24 11:55:20 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Wed Jun 24 11:55:20 2015 -0700

--
 R/pkg/R/client.R   | 26 +++---
 R/pkg/R/sparkR.R   |  7 +--
 R/pkg/inst/tests/test_client.R | 32 
 docs/sparkr.md | 17 +
 4 files changed, 69 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/43e66192/R/pkg/R/client.R
--
diff --git a/R/pkg/R/client.R b/R/pkg/R/client.R
index 1281c41..cf2e5dd 100644
--- a/R/pkg/R/client.R
+++ b/R/pkg/R/client.R
@@ -34,24 +34,36 @@ connectBackend <- function(hostname, port, timeout = 6000) {
   con
 }
 
-launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts) {
+determineSparkSubmitBin <- function() {
   if (.Platform$OS.type == "unix") {
 sparkSubmitBinName = "spark-submit"
   } else {
 sparkSubmitBinName = "spark-submit.cmd"
   }
+  sparkSubmitBinName
+}
+
+generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, 
packages) {
+  if (jars != "") {
+jars <- paste("--jars", jars)
+  }
+
+  if (packages != "") {
+packages <- paste("--packages", packages)
+  }
 
+  combinedArgs <- paste(jars, packages, sparkSubmitOpts, args, sep = " ")
+  combinedArgs
+}
+
+launchBackend <- function(args, sparkHome, jars, sparkSubmitOpts, packages) {
+  sparkSubmitBin <- determineSparkSubmitBin()
   if (sparkHome != "") {
 sparkSubmitBin <- file.path(sparkHome, "bin", sparkSubmitBinName)
   } else {
 sparkSubmitBin <- sparkSubmitBinName
   }
-
-  if (jars != "") {
-jars <- paste("--jars", jars)
-  }
-
-  combinedArgs <- paste(jars, sparkSubmitOpts, args, sep = " ")
+  combinedArgs <- generateSparkSubmitArgs(args, sparkHome, jars, 
sparkSubmitOpts, packages)
   cat("Launching java with spark-submit command", sparkSubmitBin, 
combinedArgs, "\n")
   invisible(system2(sparkSubmitBin, combinedArgs, wait = F))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/43e66192/R/pkg/R/sparkR.R
--
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index dbde0c4..8f81d56 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -81,6 +81,7 @@ sparkR.stop <- function() {
 #' @param sparkExecutorEnv Named list of environment variables to be used when 
launching executors.
 #' @param sparkJars Character string vector of jar files to pass to the worker 
nodes.
 #' @param sparkRLibDir The path where R is installed on the worker nodes.
+#' @param sparkPackages Character string vector of packages from 
spark-packages.org
 #' @export
 #' @examples
 #'\dontrun{
@@ -100,7 +101,8 @@ sparkR.init <- function(
   sparkEnvir = list(),
   sparkExecutorEnv = list(),
   sparkJars = "",
-  sparkRLibDir = "") {
+  sparkRLibDir = "",
+  sparkPackages = "") {
 
   if (exists(".sparkRjsc", envir = .sparkREnv)) {
 cat("Re-using existing Spark Context. Please stop SparkR with 
sparkR.stop() or restart R to create a new Spark Context\n")
@@ -129,7 +131,8 @@ sparkR.init <- function(
 args = path,
 sparkHome = sparkHome,
 jars = jars,
-sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"))
+sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
+sparkPackages = sparkPackages)
 # wait atmost 100 seconds for JVM to launch
 wait <- 0.1
 for (i in 1:25) {

http://git-wip-us.apache.org/repos/asf/spark/blob/43e66192/R/pkg/inst/tests/test_client.R
--
diff --git a/R/pkg/inst/tests/test_client.R b/R/pkg/inst/tests/test_client.R
new file mode 100644
index 000..30b05c1
--- /dev/null
+++ b/R/pkg/inst/tests/test_cli

spark git commit: [SPARK-8399] [STREAMING] [WEB UI] Overlap between histograms and axis' name in Spark Streaming UI

2015-06-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 f6682dd6e -> 93793237e


[SPARK-8399] [STREAMING] [WEB UI] Overlap between histograms and axis' name in 
Spark Streaming UI

Moved where the X axis' name (#batches) is written in histograms in the spark 
streaming web ui so the histograms and the axis' name do not overlap.

Author: BenFradet 

Closes #6845 from BenFradet/SPARK-8399 and squashes the following commits:

b63695f [BenFradet] adjusted inner histograms
eb610ee [BenFradet] readjusted #batches on the x axis
dd46f98 [BenFradet] aligned all unit labels and ticks
0564b62 [BenFradet] readjusted #batches placement
edd0936 [BenFradet] moved where the X axis' name (#batches) is written in 
histograms in the spark streaming web ui

(cherry picked from commit 1173483f3f465a4c63246e83d0aaa2af521395f5)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93793237
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93793237
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93793237

Branch: refs/heads/branch-1.4
Commit: 93793237ee1098fbde73074d78e5f3249a8b58d4
Parents: f6682dd
Author: BenFradet 
Authored: Wed Jun 24 11:53:03 2015 -0700
Committer: Tathagata Das 
Committed: Wed Jun 24 12:03:23 2015 -0700

--
 .../apache/spark/streaming/ui/static/streaming-page.js| 10 ++
 .../org/apache/spark/streaming/ui/StreamingPage.scala |  4 ++--
 2 files changed, 8 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/93793237/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
--
diff --git 
a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
 
b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
index 75251f4..4886b68 100644
--- 
a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
+++ 
b/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
@@ -31,6 +31,8 @@ var maxXForHistogram = 0;
 var histogramBinCount = 10;
 var yValueFormat = d3.format(",.2f");
 
+var unitLabelYOffset = -10;
+
 // Show a tooltip "text" for "node"
 function showBootstrapTooltip(node, text) {
 $(node).tooltip({title: text, trigger: "manual", container: "body"});
@@ -133,7 +135,7 @@ function drawTimeline(id, data, minX, maxX, minY, maxY, 
unitY, batchInterval) {
 .attr("class", "y axis")
 .call(yAxis)
 .append("text")
-.attr("transform", "translate(0," + (-3) + ")")
+.attr("transform", "translate(0," + unitLabelYOffset + ")")
 .text(unitY);
 
 
@@ -223,10 +225,10 @@ function drawHistogram(id, values, minY, maxY, unitY, 
batchInterval) {
 .style("border-left", "0px solid white");
 
 var margin = {top: 20, right: 30, bottom: 30, left: 10};
-var width = 300 - margin.left - margin.right;
+var width = 350 - margin.left - margin.right;
 var height = 150 - margin.top - margin.bottom;
 
-var x = d3.scale.linear().domain([0, maxXForHistogram]).range([0, width]);
+var x = d3.scale.linear().domain([0, maxXForHistogram]).range([0, width - 
50]);
 var y = d3.scale.linear().domain([minY, maxY]).range([height, 0]);
 
 var xAxis = d3.svg.axis().scale(x).orient("top").ticks(5);
@@ -248,7 +250,7 @@ function drawHistogram(id, values, minY, maxY, unitY, 
batchInterval) {
 .attr("class", "x axis")
 .call(xAxis)
 .append("text")
-.attr("transform", "translate(" + (margin.left + width - 40) + ", 
15)")
+.attr("transform", "translate(" + (margin.left + width - 45) + ", 
" + unitLabelYOffset + ")")
 .text("#batches");
 
 svg.append("g")

http://git-wip-us.apache.org/repos/asf/spark/blob/93793237/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 4ee7a48..87af902 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -310,7 +310,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
 
   
   Timelines (Last {batchTimes.length} 
batches, {numActiveBatches} active, {numCompletedBatches} completed)
-  Histograms
+  Histograms
   
   
 
@@ -456,7 +456,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
   {receiverActive}
   {receiverLocation}

spark git commit: [SPARK-7088] [SQL] Fix analysis for 3rd party logical plan.

2015-06-24 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 43e66192f -> b84d4b4df


[SPARK-7088] [SQL] Fix analysis for 3rd party logical plan.

ResolveReferences analysis rule now does not throw when it cannot resolve 
references in a self-join.

Author: Santiago M. Mola 

Closes #6853 from smola/SPARK-7088 and squashes the following commits:

af71ac7 [Santiago M. Mola] [SPARK-7088] Fix analysis for 3rd party logical plan.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b84d4b4d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b84d4b4d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b84d4b4d

Branch: refs/heads/master
Commit: b84d4b4dfe8ced1b96a0c74ef968a20a1bba8231
Parents: 43e6619
Author: Santiago M. Mola 
Authored: Wed Jun 24 12:29:07 2015 -0700
Committer: Michael Armbrust 
Committed: Wed Jun 24 12:29:07 2015 -0700

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 38 ++--
 .../sql/catalyst/analysis/CheckAnalysis.scala   | 12 +++
 2 files changed, 32 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b84d4b4d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0a3f5a7..b06759f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -283,7 +283,7 @@ class Analyzer(
 val conflictingAttributes = left.outputSet.intersect(right.outputSet)
 logDebug(s"Conflicting attributes 
${conflictingAttributes.mkString(",")} in $j")
 
-val (oldRelation, newRelation) = right.collect {
+right.collect {
   // Handle base relations that might appear more than once.
   case oldVersion: MultiInstanceRelation
   if 
oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
@@ -308,25 +308,27 @@ class Analyzer(
   if 
AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
 .nonEmpty =>
 (oldVersion, oldVersion.copy(windowExpressions = 
newAliases(windowExpressions)))
-}.headOption.getOrElse { // Only handle first case, others will be 
fixed on the next pass.
-  sys.error(
-s"""
-  |Failure when resolving conflicting references in Join:
-  |$plan
-  |
-  |Conflicting attributes: ${conflictingAttributes.mkString(",")}
-  """.stripMargin)
 }
-
-val attributeRewrites = 
AttributeMap(oldRelation.output.zip(newRelation.output))
-val newRight = right transformUp {
-  case r if r == oldRelation => newRelation
-} transformUp {
-  case other => other transformExpressions {
-case a: Attribute => attributeRewrites.get(a).getOrElse(a)
-  }
+  // Only handle first case, others will be fixed on the next pass.
+  .headOption match {
+  case None =>
+/*
+ * No result implies that there is a logical plan node that 
produces new references
+ * that this rule cannot handle. When that is the case, there must 
be another rule
+ * that resolves these conflicts. Otherwise, the analysis will 
fail.
+ */
+j
+  case Some((oldRelation, newRelation)) =>
+val attributeRewrites = 
AttributeMap(oldRelation.output.zip(newRelation.output))
+val newRight = right transformUp {
+  case r if r == oldRelation => newRelation
+} transformUp {
+  case other => other transformExpressions {
+case a: Attribute => attributeRewrites.get(a).getOrElse(a)
+  }
+}
+j.copy(right = newRight)
 }
-j.copy(right = newRight)
 
   // When resolve `SortOrder`s in Sort based on child, don't report errors 
as
   // we still have chance to resolve it based on grandchild

http://git-wip-us.apache.org/repos/asf/spark/blob/b84d4b4d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index c5a1437..a069b47 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Check

spark git commit: [SPARK-7289] handle project -> limit -> sort efficiently

2015-06-24 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master b84d4b4df -> f04b5672c


[SPARK-7289] handle project -> limit -> sort efficiently

make the `TakeOrdered` strategy and operator more general, such that it can 
optionally handle a projection when necessary

Author: Wenchen Fan 

Closes #6780 from cloud-fan/limit and squashes the following commits:

34aa07b [Wenchen Fan] revert
07d5456 [Wenchen Fan] clean closure
20821ec [Wenchen Fan] fix
3676a82 [Wenchen Fan] address comments
b558549 [Wenchen Fan] address comments
214842b [Wenchen Fan] fix style
2d8be83 [Wenchen Fan] add LimitPushDown
948f740 [Wenchen Fan] fix existing


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f04b5672
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f04b5672
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f04b5672

Branch: refs/heads/master
Commit: f04b5672c5a5562f8494df3b0df23235285c9e9e
Parents: b84d4b4
Author: Wenchen Fan 
Authored: Wed Jun 24 13:28:50 2015 -0700
Committer: Michael Armbrust 
Committed: Wed Jun 24 13:28:50 2015 -0700

--
 .../sql/catalyst/optimizer/Optimizer.scala  | 52 ++--
 .../catalyst/optimizer/UnionPushdownSuite.scala |  4 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  2 +-
 .../apache/spark/sql/execution/SparkPlan.scala  |  1 -
 .../spark/sql/execution/SparkStrategies.scala   |  8 ++-
 .../spark/sql/execution/basicOperators.scala| 27 +++---
 .../spark/sql/execution/PlannerSuite.scala  |  6 +++
 .../org/apache/spark/sql/hive/HiveContext.scala |  2 +-
 8 files changed, 62 insertions(+), 40 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f04b5672/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 98b4476..bfd2428 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -39,19 +39,22 @@ object DefaultOptimizer extends Optimizer {
 Batch("Distinct", FixedPoint(100),
   ReplaceDistinctWithAggregate) ::
 Batch("Operator Optimizations", FixedPoint(100),
-  UnionPushdown,
-  CombineFilters,
+  // Operator push down
+  UnionPushDown,
+  PushPredicateThroughJoin,
   PushPredicateThroughProject,
   PushPredicateThroughGenerate,
   ColumnPruning,
+  // Operator combine
   ProjectCollapsing,
+  CombineFilters,
   CombineLimits,
+  // Constant folding
   NullPropagation,
   OptimizeIn,
   ConstantFolding,
   LikeSimplification,
   BooleanSimplification,
-  PushPredicateThroughJoin,
   RemovePositive,
   SimplifyFilters,
   SimplifyCasts,
@@ -63,25 +66,25 @@ object DefaultOptimizer extends Optimizer {
 }
 
 /**
-  *  Pushes operations to either side of a Union.
-  */
-object UnionPushdown extends Rule[LogicalPlan] {
+ * Pushes operations to either side of a Union.
+ */
+object UnionPushDown extends Rule[LogicalPlan] {
 
   /**
-*  Maps Attributes from the left side to the corresponding Attribute on 
the right side.
-*/
-  def buildRewrites(union: Union): AttributeMap[Attribute] = {
+   * Maps Attributes from the left side to the corresponding Attribute on the 
right side.
+   */
+  private def buildRewrites(union: Union): AttributeMap[Attribute] = {
 assert(union.left.output.size == union.right.output.size)
 
 AttributeMap(union.left.output.zip(union.right.output))
   }
 
   /**
-*  Rewrites an expression so that it can be pushed to the right side of a 
Union operator.
-*  This method relies on the fact that the output attributes of a union 
are always equal
-*  to the left child's output.
-*/
-  def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]): A 
= {
+   * Rewrites an expression so that it can be pushed to the right side of a 
Union operator.
+   * This method relies on the fact that the output attributes of a union are 
always equal
+   * to the left child's output.
+   */
+  private def pushToRight[A <: Expression](e: A, rewrites: 
AttributeMap[Attribute]) = {
 val result = e transform {
   case a: Attribute => rewrites(a)
 }
@@ -108,7 +111,6 @@ object UnionPushdown extends Rule[LogicalPlan] {
   }
 }
 
-
 /**
  * Attempts to eliminate the reading of unneeded columns from the query plan 
using the following
  * transformations:
@@ -117,7 +119,6 @@ object UnionPushdown extends Rule[LogicalPlan] {
  *   - Aggregate
  *   - Project <- Join
  *   - Lef

spark git commit: [SPARK-7633] [MLLIB] [PYSPARK] Python bindings for StreamingLogisticRegressionwithSGD

2015-06-24 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master f04b5672c -> fb32c3889


[SPARK-7633] [MLLIB] [PYSPARK] Python bindings for 
StreamingLogisticRegressionwithSGD

Add Python bindings to StreamingLogisticRegressionwithSGD.

No Java wrappers are needed as models are updated directly using train.

Author: MechCoder 

Closes #6849 from MechCoder/spark-3258 and squashes the following commits:

b4376a5 [MechCoder] minor
d7e5fc1 [MechCoder] Refactor into StreamingLinearAlgorithm Better docs
9c09d4e [MechCoder] [SPARK-7633] Python bindings for 
StreamingLogisticRegressionwithSGD


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb32c388
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb32c388
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb32c388

Branch: refs/heads/master
Commit: fb32c388985ce65c1083cb435cf1f7479fecbaac
Parents: f04b567
Author: MechCoder 
Authored: Wed Jun 24 14:58:43 2015 -0700
Committer: Xiangrui Meng 
Committed: Wed Jun 24 14:58:43 2015 -0700

--
 python/pyspark/mllib/classification.py |  96 +++-
 python/pyspark/mllib/tests.py  | 135 +++-
 2 files changed, 229 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fb32c388/python/pyspark/mllib/classification.py
--
diff --git a/python/pyspark/mllib/classification.py 
b/python/pyspark/mllib/classification.py
index 758accf..2698f10 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -21,6 +21,7 @@ import numpy
 from numpy import array
 
 from pyspark import RDD
+from pyspark.streaming import DStream
 from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py
 from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
 from pyspark.mllib.regression import LabeledPoint, LinearModel, 
_regression_train_wrapper
@@ -28,7 +29,8 @@ from pyspark.mllib.util import Saveable, Loader, inherit_doc
 
 
 __all__ = ['LogisticRegressionModel', 'LogisticRegressionWithSGD', 
'LogisticRegressionWithLBFGS',
-   'SVMModel', 'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes']
+   'SVMModel', 'SVMWithSGD', 'NaiveBayesModel', 'NaiveBayes',
+   'StreamingLogisticRegressionWithSGD']
 
 
 class LinearClassificationModel(LinearModel):
@@ -583,6 +585,98 @@ class NaiveBayes(object):
 return NaiveBayesModel(labels.toArray(), pi.toArray(), 
numpy.array(theta))
 
 
+class StreamingLinearAlgorithm(object):
+"""
+Base class that has to be inherited by any StreamingLinearAlgorithm.
+
+Prevents reimplementation of methods predictOn and predictOnValues.
+"""
+def __init__(self, model):
+self._model = model
+
+def latestModel(self):
+"""
+Returns the latest model.
+"""
+return self._model
+
+def _validate(self, dstream):
+if not isinstance(dstream, DStream):
+raise TypeError(
+"dstream should be a DStream object, got %s" % type(dstream))
+if not self._model:
+raise ValueError(
+"Model must be intialized using setInitialWeights")
+
+def predictOn(self, dstream):
+"""
+Make predictions on a dstream.
+
+:return: Transformed dstream object.
+"""
+self._validate(dstream)
+return dstream.map(lambda x: self._model.predict(x))
+
+def predictOnValues(self, dstream):
+"""
+Make predictions on a keyed dstream.
+
+:return: Transformed dstream object.
+"""
+self._validate(dstream)
+return dstream.mapValues(lambda x: self._model.predict(x))
+
+
+@inherit_doc
+class StreamingLogisticRegressionWithSGD(StreamingLinearAlgorithm):
+"""
+Run LogisticRegression with SGD on a stream of data.
+
+The weights obtained at the end of training a stream are used as initial
+weights for the next stream.
+
+:param stepSize: Step size for each iteration of gradient descent.
+:param numIterations: Number of iterations run for each batch of data.
+:param miniBatchFraction: Fraction of data on which SGD is run for each
+  iteration.
+:param regParam: L2 Regularization parameter.
+"""
+def __init__(self, stepSize=0.1, numIterations=50, miniBatchFraction=1.0, 
regParam=0.01):
+self.stepSize = stepSize
+self.numIterations = numIterations
+self.regParam = regParam
+self.miniBatchFraction = miniBatchFraction
+self._model = None
+super(StreamingLogisticRegressionWithSGD, self).__init__(
+model=self._model)
+
+def setInitialWeights(self, initialWeights):
+"""
+Set the ini

spark git commit: [SPARK-6777] [SQL] Implements backwards compatibility rules in CatalystSchemaConverter

2015-06-24 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master fb32c3889 -> 8ab50765c


[SPARK-6777] [SQL] Implements backwards compatibility rules in 
CatalystSchemaConverter

This PR introduces `CatalystSchemaConverter` for converting Parquet schema to 
Spark SQL schema and vice versa.  Original conversion code in 
`ParquetTypesConverter` is removed. Benefits of the new version are:

1. When converting Spark SQL schemas, it generates standard Parquet schemas 
conforming to [the most updated Parquet format spec] [1]. Converting to old 
style Parquet schemas is also supported via feature flag 
`spark.sql.parquet.followParquetFormatSpec` (which is set to `false` for now, 
and should be set to `true` after both read and write paths are fixed).

   Note that although this version of the Parquet format spec hasn't been 
officially released yet, Parquet MR 1.7.0 already sticks to it. So it should be 
safe to follow.

1. It implements backwards-compatibility rules described in the most updated 
Parquet format spec. Thus it can recognize more schema patterns generated by 
other/legacy systems/tools.
1. Code organization follows convention used in [parquet-mr] [2], which is 
easier to follow. (Structure of `CatalystSchemaConverter` is similar to 
`AvroSchemaConverter`).

To fully implement backwards-compatibility rules in both read and write path, 
we also need to update `CatalystRowConverter` (which is responsible for 
converting Parquet records to `Row`s), `RowReadSupport`, and `RowWriteSupport`. 
These would be done in follow-up PRs.

TODO

- [x] More schema conversion test cases for legacy schema patterns.

[1]: 
https://github.com/apache/parquet-format/blob/ea095226597fdbecd60c2419d96b54b2fdb4ae6c/LogicalTypes.md
[2]: https://github.com/apache/parquet-mr/

Author: Cheng Lian 

Closes #6617 from liancheng/spark-6777 and squashes the following commits:

2a2062d [Cheng Lian] Don't convert decimals without precision information
b60979b [Cheng Lian] Adds a constructor which accepts a Configuration, and 
fixes default value of assumeBinaryIsString
743730f [Cheng Lian] Decimal scale shouldn't be larger than precision
a104a9e [Cheng Lian] Fixes Scala style issue
1f71d8d [Cheng Lian] Adds feature flag to allow falling back to old style 
Parquet schema conversion
ba84f4b [Cheng Lian] Fixes MapType schema conversion bug
13cb8d5 [Cheng Lian] Fixes MiMa failure
81de5b0 [Cheng Lian] Fixes UDT, workaround read path, and add tests
28ef95b [Cheng Lian] More AnalysisExceptions
b10c322 [Cheng Lian] Replaces require() with analysisRequire() which throws 
AnalysisException
cceaf3f [Cheng Lian] Implements backwards compatibility rules in 
CatalystSchemaConverter


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ab50765
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ab50765
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ab50765

Branch: refs/heads/master
Commit: 8ab50765cd793169091d983b50d87a391f6ac1f4
Parents: fb32c38
Author: Cheng Lian 
Authored: Wed Jun 24 15:03:43 2015 -0700
Committer: Cheng Lian 
Committed: Wed Jun 24 15:03:43 2015 -0700

--
 project/MimaExcludes.scala  |   7 +-
 .../apache/spark/sql/types/DecimalType.scala|   9 +-
 .../scala/org/apache/spark/sql/SQLConf.scala|  14 +
 .../sql/parquet/CatalystSchemaConverter.scala   | 565 ++
 .../spark/sql/parquet/ParquetTableSupport.scala |   6 +-
 .../apache/spark/sql/parquet/ParquetTypes.scala | 374 +-
 .../spark/sql/parquet/ParquetIOSuite.scala  |   6 +-
 .../spark/sql/parquet/ParquetSchemaSuite.scala  | 736 +--
 .../sql/hive/execution/SQLQuerySuite.scala  |   2 +-
 9 files changed, 1297 insertions(+), 422 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8ab50765/project/MimaExcludes.scala
--
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index f678c69..6f86a50 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -69,7 +69,12 @@ object MimaExcludes {
 ProblemFilters.exclude[MissingClassProblem](
   "org.apache.spark.sql.parquet.CatalystTimestampConverter"),
 ProblemFilters.exclude[MissingClassProblem](
-  "org.apache.spark.sql.parquet.CatalystTimestampConverter$")
+  "org.apache.spark.sql.parquet.CatalystTimestampConverter$"),
+// SPARK-6777 Implements backwards compatibility rules in 
CatalystSchemaConverter
+ProblemFilters.exclude[MissingClassProblem](
+  "org.apache.spark.sql.parquet.ParquetTypeInfo"),
+ProblemFilters.exclude[MissingClassProblem](
+  "org.apache.spark.sql.parquet.ParquetTypeInfo$")
   )
 case v if v.startsWith("

spark git commit: [SPARK-8558] [BUILD] Script /dev/run-tests fails when _JAVA_OPTIONS env var set

2015-06-24 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 8ab50765c -> dca21a83a


[SPARK-8558] [BUILD] Script /dev/run-tests fails when _JAVA_OPTIONS env var set

Author: fe2s 
Author: Oleksiy Dyagilev 

Closes #6956 from fe2s/fix-run-tests and squashes the following commits:

31b6edc [fe2s] str is a built-in function, so using it as a variable name will 
lead to spurious warnings in some Python linters
7d781a0 [fe2s] fixing for openjdk/IBM, seems like they have slightly different 
wording, but all have 'version' word. Surrounding with spaces for the case if 
version word appears in _JAVA_OPTIONS
cd455ef [fe2s] address comment, looking for java version string rather than 
expecting to have on a certain line number
ad577d7 [Oleksiy Dyagilev] [SPARK-8558][BUILD] Script /dev/run-tests fails when 
_JAVA_OPTIONS env var set


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dca21a83
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dca21a83
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dca21a83

Branch: refs/heads/master
Commit: dca21a83ac33813dd8165acb5f20d06e4f9b9034
Parents: 8ab5076
Author: fe2s 
Authored: Wed Jun 24 15:12:23 2015 -0700
Committer: Josh Rosen 
Committed: Wed Jun 24 15:12:23 2015 -0700

--
 dev/run-tests.py | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dca21a83/dev/run-tests.py
--
diff --git a/dev/run-tests.py b/dev/run-tests.py
index de1b453..e7c09b0 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -477,7 +477,12 @@ def determine_java_version(java_exe):
 
 raw_output = subprocess.check_output([java_exe, "-version"],
  stderr=subprocess.STDOUT)
-raw_version_str = raw_output.split('\n')[0]  # eg 'java version "1.8.0_25"'
+
+raw_output_lines = raw_output.split('\n')
+
+# find raw version string, eg 'java version "1.8.0_25"'
+raw_version_str = next(x for x in raw_output_lines if " version " in x)
+
 version_str = raw_version_str.split()[-1].strip('"')  # eg '1.8.0_25'
 version, update = version_str.split('_')  # eg ['1.8.0', '25']
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-8567] [SQL] Increase the timeout of HiveSparkSubmitSuite

2015-06-24 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master dca21a83a -> 7daa70292


[SPARK-8567] [SQL] Increase the timeout of HiveSparkSubmitSuite

https://issues.apache.org/jira/browse/SPARK-8567

Author: Yin Huai 

Closes #6957 from yhuai/SPARK-8567 and squashes the following commits:

62dff5b [Yin Huai] Increase the timeout.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7daa7029
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7daa7029
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7daa7029

Branch: refs/heads/master
Commit: 7daa70292e47be6a944351ef00c770ad4bcb0877
Parents: dca21a8
Author: Yin Huai 
Authored: Wed Jun 24 15:52:58 2015 -0700
Committer: Andrew Or 
Committed: Wed Jun 24 15:52:58 2015 -0700

--
 .../scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7daa7029/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index d85516a..b875e52 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -95,7 +95,7 @@ class HiveSparkSubmitSuite
 ))
 
 try {
-  val exitCode = failAfter(120 seconds) { process.exitValue() }
+  val exitCode = failAfter(180 seconds) { process.exitValue() }
   if (exitCode != 0) {
 fail(s"Process returned with exit code $exitCode. See the log4j logs 
for more detail.")
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-8567] [SQL] Increase the timeout of HiveSparkSubmitSuite

2015-06-24 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 93793237e -> 792ed7a4b


[SPARK-8567] [SQL] Increase the timeout of HiveSparkSubmitSuite

https://issues.apache.org/jira/browse/SPARK-8567

Author: Yin Huai 

Closes #6957 from yhuai/SPARK-8567 and squashes the following commits:

62dff5b [Yin Huai] Increase the timeout.

Conflicts:

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/792ed7a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/792ed7a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/792ed7a4

Branch: refs/heads/branch-1.4
Commit: 792ed7a4ba444907e5a2f1f79cb2a4f402476fb9
Parents: 9379323
Author: Yin Huai 
Authored: Wed Jun 24 15:52:58 2015 -0700
Committer: Andrew Or 
Committed: Wed Jun 24 15:56:43 2015 -0700

--
 .../scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/792ed7a4/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index ab44303..8ca7a80 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -87,7 +87,7 @@ class HiveSparkSubmitSuite
   new File(sparkHome),
   Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
 try {
-  val exitCode = failAfter(120 seconds) { process.waitFor() }
+  val exitCode = failAfter(180 seconds) { process.waitFor() }
   if (exitCode != 0) {
 fail(s"Process returned with exit code $exitCode. See the log4j logs 
for more detail.")
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-8075] [SQL] apply type check interface to more expressions

2015-06-24 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 7daa70292 -> b71d3254e


[SPARK-8075] [SQL] apply type check interface to more expressions

a follow up of https://github.com/apache/spark/pull/6405.
Note: It's not a big change; much of the diff comes from moving code in
`aggregates.scala` so that each aggregate function sits right below its
corresponding aggregate expression.

Author: Wenchen Fan 

Closes #6723 from cloud-fan/type-check and squashes the following commits:

2124301 [Wenchen Fan] fix tests
5a658bb [Wenchen Fan] add tests
287d3bb [Wenchen Fan] apply type check interface to more expressions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b71d3254
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b71d3254
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b71d3254

Branch: refs/heads/master
Commit: b71d3254e50838ccae43bdb0ff186fda25f03152
Parents: 7daa702
Author: Wenchen Fan 
Authored: Wed Jun 24 16:26:00 2015 -0700
Committer: Michael Armbrust 
Committed: Wed Jun 24 16:26:00 2015 -0700

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  |   4 +-
 .../catalyst/analysis/HiveTypeCoercion.scala|  17 +-
 .../spark/sql/catalyst/expressions/Cast.scala   |  11 +-
 .../sql/catalyst/expressions/Expression.scala   |   4 +-
 .../sql/catalyst/expressions/ExtractValue.scala |  10 +-
 .../sql/catalyst/expressions/aggregates.scala   | 420 ++-
 .../sql/catalyst/expressions/arithmetic.scala   |   2 -
 .../expressions/complexTypeCreator.scala|  30 +-
 .../catalyst/expressions/decimalFunctions.scala |  17 +-
 .../sql/catalyst/expressions/generators.scala   |  13 +-
 .../spark/sql/catalyst/expressions/math.scala   |   4 +-
 .../catalyst/expressions/namedExpressions.scala |   4 +-
 .../catalyst/expressions/nullFunctions.scala|  27 +-
 .../spark/sql/catalyst/expressions/sets.scala   |  10 +-
 .../catalyst/expressions/stringOperations.scala |   2 -
 .../expressions/windowExpressions.scala |   3 +-
 .../spark/sql/catalyst/util/TypeUtils.scala |   9 +
 .../sql/catalyst/analysis/AnalysisSuite.scala   |   6 +-
 .../analysis/ExpressionTypeCheckingSuite.scala  | 163 +++
 .../ExpressionTypeCheckingSuite.scala   | 141 ---
 .../apache/spark/sql/execution/pythonUdfs.scala |   2 +-
 .../hive/execution/HiveTypeCoercionSuite.scala  |   6 -
 22 files changed, 476 insertions(+), 429 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b71d3254/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index b06759f..cad2c2a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -587,8 +587,8 @@ class Analyzer(
   failAnalysis(
 s"""Expect multiple names given for ${g.getClass.getName},
|but only single name '${name}' specified""".stripMargin)
-case Alias(g: Generator, name) => Some((g, name :: Nil))
-case MultiAlias(g: Generator, names) => Some(g, names)
+case Alias(g: Generator, name) if g.resolved => Some((g, name :: Nil))
+case MultiAlias(g: Generator, names) if g.resolved => Some(g, names)
 case _ => None
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b71d3254/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index d4ab1fc..4ef7341 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -317,6 +317,7 @@ trait HiveTypeCoercion {
 i.makeCopy(Array(Cast(a, StringType), b.map(Cast(_, StringType
 
   case Sum(e @ StringType()) => Sum(Cast(e, DoubleType))
+  case SumDistinct(e @ StringType()) => Sum(Cast(e, DoubleType))
   case Average(e @ StringType()) => Average(Cast(e, DoubleType))
 }
   }
@@ -590,11 +591,12 @@ trait HiveTypeCoercion {
   // Skip nodes who's children have not been resolved yet.
   case e if !e.childrenResolved => e
 
-  case a @ CreateArray(children) if !a.resolved =>
-val commonType = a.childTypes.reduce(
-  (a, b) => f

spark git commit: Two minor SQL cleanup (compiler warning & indent).

2015-06-24 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master b71d3254e -> 82f80c1c7


Two minor SQL cleanup (compiler warning & indent).

Author: Reynold Xin 

Closes #7000 from rxin/minor-cleanup and squashes the following commits:

046044c [Reynold Xin] Two minor SQL cleanup (compiler warning & indent).


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82f80c1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82f80c1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82f80c1c

Branch: refs/heads/master
Commit: 82f80c1c7dc42c11bca2b6832c10f9610a43391b
Parents: b71d325
Author: Reynold Xin 
Authored: Wed Jun 24 19:34:07 2015 -0700
Committer: Reynold Xin 
Committed: Wed Jun 24 19:34:07 2015 -0700

--
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala  | 4 ++--
 .../apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala| 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/82f80c1c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index cad2c2a..117c87a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -309,8 +309,8 @@ class Analyzer(
 .nonEmpty =>
 (oldVersion, oldVersion.copy(windowExpressions = 
newAliases(windowExpressions)))
 }
-  // Only handle first case, others will be fixed on the next pass.
-  .headOption match {
+// Only handle first case, others will be fixed on the next pass.
+.headOption match {
   case None =>
 /*
  * No result implies that there is a logical plan node that 
produces new references

http://git-wip-us.apache.org/repos/asf/spark/blob/82f80c1c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index 4ef7341..976fa57 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -678,8 +678,8 @@ trait HiveTypeCoercion {
   findTightestCommonTypeAndPromoteToString((c.key +: 
c.whenList).map(_.dataType))
 maybeCommonType.map { commonType =>
   val castedBranches = c.branches.grouped(2).map {
-case Seq(when, then) if when.dataType != commonType =>
-  Seq(Cast(when, commonType), then)
+case Seq(whenExpr, thenExpr) if whenExpr.dataType != commonType =>
+  Seq(Cast(whenExpr, commonType), thenExpr)
 case other => other
   }.reduce(_ ++ _)
   CaseKeyWhen(Cast(c.key, commonType), castedBranches)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-7884] Move block deserialization from BlockStoreShuffleFetcher to ShuffleReader

2015-06-24 Thread kayousterhout
Repository: spark
Updated Branches:
  refs/heads/master 82f80c1c7 -> 7bac2fe77


[SPARK-7884] Move block deserialization from BlockStoreShuffleFetcher to 
ShuffleReader

This commit updates the shuffle read path to give ShuffleReader
implementations more control over the deserialization process.

The BlockStoreShuffleFetcher.fetch() method has been renamed to 
BlockStoreShuffleFetcher.fetchBlockStreams(). Previously, this method returned 
a record iterator; now, it returns an iterator of (BlockId, InputStream). 
Deserialization of records is now handled in the ShuffleReader.read() method.

This change creates a cleaner separation of concerns and allows implementations 
of ShuffleReader more flexibility in how records are retrieved.

Author: Matt Massie 
Author: Kay Ousterhout 

Closes #6423 from massie/shuffle-api-cleanup and squashes the following commits:

8b0632c [Matt Massie] Minor Scala style fixes
d0a1b39 [Matt Massie] Merge pull request #1 from 
kayousterhout/massie_shuffle-api-cleanup
290f1eb [Kay Ousterhout] Added test for HashShuffleReader.read()
5186da0 [Kay Ousterhout] Revert "Add test to ensure HashShuffleReader is 
freeing resources"
f98a1b9 [Matt Massie] Add test to ensure HashShuffleReader is freeing resources
a011bfa [Matt Massie] Use PrivateMethodTester to check that delegate stream is
closed
4ea1712 [Matt Massie] Small code cleanup for readability
7429a98 [Matt Massie] Update tests to check that BufferReleasingStream is 
closing delegate InputStream
f458489 [Matt Massie] Remove unnecessary map() on return Iterator
4abb855 [Matt Massie] Consolidate metric code. Make it clear why
InterruptibleIterator is needed.
5c30405 [Matt Massie] Return visibility of BlockStoreShuffleFetcher to 
private[hash]
7eedd1d [Matt Massie] Small Scala import cleanup
28f8085 [Matt Massie] Small import nit
f93841e [Matt Massie] Update shuffle read metrics in ShuffleReader instead of 
BlockStoreShuffleFetcher.
7e8e0fe [Matt Massie] Minor Scala style fixes
01e8721 [Matt Massie] Explicitly cast iterator in branches for type clarity
7c8f73e [Matt Massie] Close Block InputStream immediately after all records are 
read
208b7a5 [Matt Massie] Small code style changes
b70c945 [Matt Massie] Make BlockStoreShuffleFetcher visible to shuffle package
19135f2 [Matt Massie] [SPARK-7884] Allow Spark shuffle APIs to be more 
customizable


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bac2fe7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bac2fe7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bac2fe7

Branch: refs/heads/master
Commit: 7bac2fe7717c0102b4875dbd95ae0bbf964536e3
Parents: 82f80c1
Author: Matt Massie 
Authored: Wed Jun 24 22:09:31 2015 -0700
Committer: Kay Ousterhout 
Committed: Wed Jun 24 22:10:06 2015 -0700

--
 .../shuffle/hash/BlockStoreShuffleFetcher.scala |  59 +++-
 .../spark/shuffle/hash/HashShuffleReader.scala  |  52 ++-
 .../storage/ShuffleBlockFetcherIterator.scala   |  90 +++
 .../shuffle/hash/HashShuffleReaderSuite.scala   | 150 +++
 .../ShuffleBlockFetcherIteratorSuite.scala  |  59 +---
 5 files changed, 314 insertions(+), 96 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7bac2fe7/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
 
b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
index 597d46a..9d8e7e9 100644
--- 
a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
+++ 
b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala
@@ -17,29 +17,29 @@
 
 package org.apache.spark.shuffle.hash
 
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.util.{Failure, Success, Try}
+import java.io.InputStream
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.util.{Failure, Success}
 
 import org.apache.spark._
-import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.FetchFailedException
-import org.apache.spark.storage.{BlockId, BlockManagerId, 
ShuffleBlockFetcherIterator, ShuffleBlockId}
-import org.apache.spark.util.CompletionIterator
+import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, 
ShuffleBlockFetcherIterator,
+  ShuffleBlockId}
 
 private[hash] object BlockStoreShuffleFetcher extends Logging {
-  def fetch[T](
+  def fetchBlockStreams(
   shuffleId: Int,
   reduceId: Int,
   context: TaskContext,
-  serializer: Serializer)
-: Iterator[T] =
+  blockManager: BlockManager,
+  mapOutputTracker: MapOutput