[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...

2016-11-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15959#discussion_r88954353
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala ---
@@ -58,60 +61,127 @@ case class Statistics(
   }
 }
 
+
 /**
- * Statistics for a column.
+ * Statistics collected for a column.
+ *
+ * 1. Supported data types are defined in `ColumnStat.supportsType`.
+ * 2. The JVM data type stored in min/max is the external data type (used in Row) for the
+ * corresponding Catalyst data type. For example, for DateType we store java.sql.Date, and for
+ * TimestampType we store java.sql.Timestamp.
+ * 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs.
+ *
+ * @param ndv number of distinct values
+ * @param min minimum value
+ * @param max maximum value
+ * @param numNulls number of nulls
+ * @param avgLen average length of the values. For fixed-length types, this should be a constant.
+ * @param maxLen maximum length of the values. For fixed-length types, this should be a constant.
  */
-case class ColumnStat(statRow: InternalRow) {
+// TODO: decide if we want to use bigint to represent ndv and numNulls.
+case class ColumnStat(
+    ndv: Long,
+    min: Any,
+    max: Any,
+    numNulls: Long,
+    avgLen: Long,
+    maxLen: Long) {
 
-  def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = {
-    NumericColumnStat(statRow, dataType)
-  }
-  def forString: StringColumnStat = StringColumnStat(statRow)
-  def forBinary: BinaryColumnStat = BinaryColumnStat(statRow)
-  def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow)
+  /**
+   * Returns a map from string to string that can be used to serialize the column stats.
+   * The key is the name of the field (e.g. "ndv" or "min"), and the value is the string
+   * representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]].
+   *
+   * As part of the protocol, the returned map always contains a key called "version".
+   */
+  def toMap: Map[String, String] = Map(
+    "version" -> "1",
+    "ndv" -> ndv.toString,
+    "min" -> min.toString,
+    "max" -> max.toString,
+    "numNulls" -> numNulls.toString,
+    "avgLen" -> avgLen.toString,
+    "maxLen" -> maxLen.toString
+  )
+}
+
+
+object ColumnStat extends Logging {
 
-  override def toString: String = {
-    // use Base64 for encoding
-    Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes)
+  /** Returns true iff the we support gathering column statistics on column of the given type. */
+  def supportsType(dataType: DataType): Boolean = dataType match {
+    case _: NumericType | TimestampType | DateType | BooleanType => true
+    case StringType | BinaryType => true
+    case _ => false
   }
-}
 
-object ColumnStat {
-  def apply(numFields: Int, str: String): ColumnStat = {
-    // use Base64 for decoding
-    val bytes = Base64.decodeBase64(str)
-    val unsafeRow = new UnsafeRow(numFields)
-    unsafeRow.pointTo(bytes, bytes.length)
-    ColumnStat(unsafeRow)
+  /**
+   * Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
+   * from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
+   */
+  def fromMap(dataType: DataType, map: Map[String, String]): Option[ColumnStat] = {
+    val str2val: (String => Any) = dataType match {
+      case _: IntegralType => _.toLong
+      case _: DecimalType => Decimal(_)
+      case DoubleType | FloatType => _.toDouble
+      case BooleanType => _.toBoolean
+      case _ => identity
+    }
+
+    try {
+      Some(ColumnStat(
+        ndv = map("ndv").toLong,
+        min = str2val(map.get("min").orNull),
+        max = str2val(map.get("max").orNull),
+        numNulls = map("numNulls").toLong,
+        avgLen = map.getOrElse("avgLen", "1").toLong,
--- End diff --

For avgLen I think we should either have them be correct for fixed length 
types, or as cloud-fan suggests have it be an option.
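
For reference, a minimal sketch of the round-trip this map-based contract implies, written against the signatures shown in the diff above (the values are illustrative only):

```
// Serialize column stats for a LongType column and read them back.
// ColumnStat and ColumnStat.fromMap are the versions proposed in this PR.
import org.apache.spark.sql.types.LongType

val stat = ColumnStat(ndv = 100L, min = 1L, max = 100L, numNulls = 0L, avgLen = 8L, maxLen = 8L)
val serialized = stat.toMap                              // includes "version" -> "1"
val restored = ColumnStat.fromMap(LongType, serialized)  // Some(ColumnStat(100, 1, 100, 0, 8, 8))
```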





[GitHub] spark pull request #15959: [SPARK-18522][SQL] Explicit contract for column s...

2016-11-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15959#discussion_r88958613
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala ---
@@ -58,60 +61,127 @@ case class Statistics(
   }
 }
 
+
 /**
- * Statistics for a column.
+ * Statistics collected for a column.
+ *
+ * 1. Supported data types are defined in `ColumnStat.supportsType`.
+ * 2. The JVM data type stored in min/max is the external data type (used in Row) for the
+ * corresponding Catalyst data type. For example, for DateType we store java.sql.Date, and for
+ * TimestampType we store java.sql.Timestamp.
+ * 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs.
+ *
+ * @param ndv number of distinct values
+ * @param min minimum value
+ * @param max maximum value
+ * @param numNulls number of nulls
+ * @param avgLen average length of the values. For fixed-length types, this should be a constant.
+ * @param maxLen maximum length of the values. For fixed-length types, this should be a constant.
  */
-case class ColumnStat(statRow: InternalRow) {
+// TODO: decide if we want to use bigint to represent ndv and numNulls.
+case class ColumnStat(
+    ndv: Long,
+    min: Any,
+    max: Any,
+    numNulls: Long,
+    avgLen: Long,
+    maxLen: Long) {
 
-  def forNumeric[T <: AtomicType](dataType: T): NumericColumnStat[T] = {
-    NumericColumnStat(statRow, dataType)
-  }
-  def forString: StringColumnStat = StringColumnStat(statRow)
-  def forBinary: BinaryColumnStat = BinaryColumnStat(statRow)
-  def forBoolean: BooleanColumnStat = BooleanColumnStat(statRow)
+  /**
+   * Returns a map from string to string that can be used to serialize the column stats.
+   * The key is the name of the field (e.g. "ndv" or "min"), and the value is the string
+   * representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]].
+   *
+   * As part of the protocol, the returned map always contains a key called "version".
+   */
+  def toMap: Map[String, String] = Map(
+    "version" -> "1",
+    "ndv" -> ndv.toString,
+    "min" -> min.toString,
+    "max" -> max.toString,
+    "numNulls" -> numNulls.toString,
+    "avgLen" -> avgLen.toString,
+    "maxLen" -> maxLen.toString
+  )
+}
+
+
+object ColumnStat extends Logging {
 
-  override def toString: String = {
-    // use Base64 for encoding
-    Base64.encodeBase64String(statRow.asInstanceOf[UnsafeRow].getBytes)
+  /** Returns true iff the we support gathering column statistics on column of the given type. */
+  def supportsType(dataType: DataType): Boolean = dataType match {
+    case _: NumericType | TimestampType | DateType | BooleanType => true
+    case StringType | BinaryType => true
+    case _ => false
   }
-}
 
-object ColumnStat {
-  def apply(numFields: Int, str: String): ColumnStat = {
-    // use Base64 for decoding
-    val bytes = Base64.decodeBase64(str)
-    val unsafeRow = new UnsafeRow(numFields)
-    unsafeRow.pointTo(bytes, bytes.length)
-    ColumnStat(unsafeRow)
+  /**
+   * Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
+   * from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
+   */
+  def fromMap(dataType: DataType, map: Map[String, String]): Option[ColumnStat] = {
+    val str2val: (String => Any) = dataType match {
+      case _: IntegralType => _.toLong
+      case _: DecimalType => Decimal(_)
+      case DoubleType | FloatType => _.toDouble
+      case BooleanType => _.toBoolean
+      case _ => identity
+    }
+
+    try {
+      Some(ColumnStat(
+        ndv = map("ndv").toLong,
+        min = str2val(map.get("min").orNull),
+        max = str2val(map.get("max").orNull),
+        numNulls = map("numNulls").toLong,
+        avgLen = map.getOrElse("avgLen", "1").toLong,
+        maxLen = map.getOrElse("maxLen", "1").toLong
+      ))
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Failed to parse column statistics", e)
+        None
+    }
   }
-}
 
-case class NumericColumnStat[T <: AtomicType](statRow: InternalRow, dataType: T) {
-  // The indices here must be consistent with `ColumnStatStruct.numericColumnStat`.
- 

[GitHub] spark issue #15763: [SPARK-17348][SQL] Incorrect results from subquery trans...

2016-11-07 Thread srinathshankar
Github user srinathshankar commented on the issue:

https://github.com/apache/spark/pull/15763
  
As it stands the check seems overly aggressive. The following works:
```sql
select t1.c1
from t1
where t1.c1 in (
  select c1 + 1 as col 
  from t2 
  where t1.c2 < t2.c2)
```
```
scala> sql("select t1.c1 from t1 where t1.c1 in (select c1 + 1 as col from t2 where t1.c2 < t2.c2)").show()
+---+
| c1|
+---+
+---+
```
But the following, where I wrap the subselect in a project, does not:
```sql
select t1.c1
from t1
where t1.c1 in (
  select col + 1
  from (
select c1 as col
from t2
where t1.c2 < t2.c2))
```
```
scala> sql("select t1.c1 from t1 where t1.c1 in (select col + 1 from (select c1 as col from t2 where t1.c2 < t2.c2))").show()
org.apache.spark.sql.AnalysisException: Correlated column is not allowed in a non-equality predicate:
Project [(col#29 + 1) AS (col + 1)#32]
+- Project [(c1#16 + 1) AS col#29]
   +- Filter (outer(c2#6) < c2#17)
      +- SubqueryAlias t2
         +- Project [_1#13 AS c1#16, _2#14 AS c2#17]
            +- LocalRelation [_1#13, _2#14]
;
```

Is this intentional ?





[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-11-01 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85993076
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MapAggregate.scala ---
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The MapAggregate function for a column returns:
+ * 1. null if no non-null value exists.
+ * 2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+ * distinct non-null values is less than or equal to the specified maximum number of bins.
+ * 3. an empty map otherwise.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ */
+@ExpressionDescription(
+  usage =
+    """
+      _FUNC_(col, numBins) - Returns 1. null if no non-null value exists.
+      2. (distinct non-null value, frequency) pairs of equi-width histogram when the number of
+      distinct non-null values is less than or equal to the specified maximum number of bins.
+      3. an empty map otherwise.
+    """)
+case class MapAggregate(
+    child: Expression,
+    numBinsExpression: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[MapDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+    this(child, numBinsExpression, 0, 0)
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  override def inputTypes: Seq[AbstractDataType] = {
+    Seq(TypeCollection(NumericType, TimestampType, DateType, StringType), IntegerType)
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) {
+      defaultCheck
+    } else if (!numBinsExpression.foldable) {
+      TypeCheckFailure("The maximum number of bins provided must be a constant literal")
+    } else if (numBins < 2) {
+      TypeCheckFailure(
+        "The maximum number of bins provided must be a positive integer literal >= 2 " +
+          s"(current value = $numBins)")
+    } else {
+      TypeCheckSuccess
+    }
+  }
+
+  override def update(buffer: MapDigest, input: InternalRow): Unit = {
+    if (buffer.isInvalid) {
+      return
+    }
+    val evaluated = child.eval(input)
+    if (evaluated != null) {
+      buffer.update(child.dataType, evaluated, numBins)
+    }
+  }
+
+  override def merge(buffer: MapDigest, other: MapDigest): Unit = {
+    if (buffer.isInvalid) return
+    if (other.isInvalid) {
+      buffer.isInvalid = true
+      buffer.clear()
+      return
+    }
+    buffer.merge(other, numBins)
+  }
+
+  override def eval(buffer: MapDigest): Any = {
+    if (buffer.isInvalid) {
+      // return empty map
+  ArrayBasedMapDa
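
(The quoted diff is cut off above.) For reference, a self-contained sketch of the contract described in the class doc, using plain Scala collections rather than the PR's MapDigest buffer; the helper name is made up:

```
// Equi-width histogram semantics described above, on plain Scala values:
// None when there is no non-null input, (value -> frequency) pairs when the
// number of distinct non-null values fits in numBins, an empty map otherwise.
def mapAggregateSketch[T](values: Seq[T], numBins: Int): Option[Map[T, Long]] = {
  val nonNull = values.filter(_ != null)
  if (nonNull.isEmpty) {
    None
  } else {
    val freqs = nonNull.groupBy(identity).map { case (v, vs) => v -> vs.size.toLong }
    if (freqs.size <= numBins) Some(freqs) else Some(Map.empty[T, Long])
  }
}

// mapAggregateSketch(Seq("a", "b", "a"), numBins = 4) == Some(Map("a" -> 2, "b" -> 1))
```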

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-28 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85619246
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HistogramEndpoints.scala ---
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.{PercentileDigest, PercentileDigestSerializer}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, QuantileSummaries}
+import org.apache.spark.sql.catalyst.util.QuantileSummaries._
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The HistogramEndpoints function for a column returns bins - (distinct value, frequency) pairs
+ * of equi-width histogram when the number of distinct values is less than or equal to the
+ * specified maximum number of bins. Otherwise, for column of string type, it returns an empty
+ * map; for column of numeric type, it returns endpoints of equi-height histogram - approximate
+ * percentiles at percentages 0.0, 1/numBins, 2/numBins, ..., (numBins-1)/numBins, 1.0.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ * @param accuracyExpression Accuracy used in computing approximate percentiles.
+ */
+@ExpressionDescription(
+  usage =
+    """
+      _FUNC_(col, numBins [, accuracy]) - Returns bins - (distinct value, frequency) pairs
+      of equi-width histogram when the number of distinct values is less than or equal to the
+      specified maximum number of bins. Otherwise, for column of string type, it returns an empty
+      map; for column of numeric type, it returns endpoints of equi-height histogram - approximate
+      percentiles at percentages 0.0, 1/numBins, 2/numBins, ..., (numBins-1)/numBins, 1.0. The
+      `accuracy` parameter (default: 1) is a positive integer literal which controls percentiles
+      approximation accuracy at the cost of memory. Higher value of `accuracy` yields better
+      accuracy, `1.0/accuracy` is the relative error of the approximation.
+    """)
+case class HistogramEndpoints(
+    child: Expression,
+    numBinsExpression: Expression,
+    accuracyExpression: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[EndpointsDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression, accuracyExpression: Expression) = {
+    this(child, numBinsExpression, accuracyExpression, 0, 0)
+  }
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+    this(child, numBinsExpression, Literal(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY))
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  private lazy val percentages: Array[Double] = {
+    val array = new Array[Double](numBins + 1)
+    for (i <- 0 to numBins) {
+      array(i) = i / numBins.toDouble
+    }
+    array
+  }
+
+  private lazy val accuracy: Int = accuracyExpressi
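
(The quoted diff is cut off above.) As a quick standalone check of the `percentages` computation shown in this diff, the loop simply produces numBins + 1 evenly spaced fractions; numBins = 4 here is illustrative:

```
// Same arithmetic as the percentages lazy val quoted above.
val numBins = 4
val percentages: Array[Double] = (0 to numBins).map(_ / numBins.toDouble).toArray
// percentages == Array(0.0, 0.25, 0.5, 0.75, 1.0)
```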

[GitHub] spark pull request #15637: [SPARK-18000] [SQL] Aggregation function for comp...

2016-10-28 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15637#discussion_r85609917
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HistogramEndpoints.scala ---
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable
+
+import com.google.common.primitives.{Doubles, Ints, Longs}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.{PercentileDigest, PercentileDigestSerializer}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, QuantileSummaries}
+import org.apache.spark.sql.catalyst.util.QuantileSummaries._
+import org.apache.spark.sql.types.{DataType, _}
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * The HistogramEndpoints function for a column returns bins - (distinct value, frequency) pairs
+ * of equi-width histogram when the number of distinct values is less than or equal to the
+ * specified maximum number of bins. Otherwise, for column of string type, it returns an empty
+ * map; for column of numeric type, it returns endpoints of equi-height histogram - approximate
+ * percentiles at percentages 0.0, 1/numBins, 2/numBins, ..., (numBins-1)/numBins, 1.0.
+ *
+ * @param child child expression that can produce column value with `child.eval(inputRow)`
+ * @param numBinsExpression The maximum number of bins.
+ * @param accuracyExpression Accuracy used in computing approximate percentiles.
+ */
+@ExpressionDescription(
+  usage =
+    """
+      _FUNC_(col, numBins [, accuracy]) - Returns bins - (distinct value, frequency) pairs
+      of equi-width histogram when the number of distinct values is less than or equal to the
+      specified maximum number of bins. Otherwise, for column of string type, it returns an empty
+      map; for column of numeric type, it returns endpoints of equi-height histogram - approximate
+      percentiles at percentages 0.0, 1/numBins, 2/numBins, ..., (numBins-1)/numBins, 1.0. The
+      `accuracy` parameter (default: 1) is a positive integer literal which controls percentiles
+      approximation accuracy at the cost of memory. Higher value of `accuracy` yields better
+      accuracy, `1.0/accuracy` is the relative error of the approximation.
+    """)
+case class HistogramEndpoints(
+    child: Expression,
+    numBinsExpression: Expression,
+    accuracyExpression: Expression,
+    override val mutableAggBufferOffset: Int,
+    override val inputAggBufferOffset: Int) extends TypedImperativeAggregate[EndpointsDigest] {
+
+  def this(child: Expression, numBinsExpression: Expression, accuracyExpression: Expression) = {
+    this(child, numBinsExpression, accuracyExpression, 0, 0)
+  }
+
+  def this(child: Expression, numBinsExpression: Expression) = {
+    this(child, numBinsExpression, Literal(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY))
+  }
+
+  // Mark as lazy so that numBinsExpression is not evaluated during tree transformation.
+  private lazy val numBins: Int = numBinsExpression.eval().asInstanceOf[Int]
+
+  private lazy val percentages: Array[Double] = {
+    val array = new Array[Double](numBins + 1)
+    for (i <- 0 to numBins) {
+      array(i) = i / numBins.toDouble
+    }
+    array
+  }
+
+  private lazy val accuracy: Int = accuracyExpressi

[GitHub] spark pull request #15640: [SPARK-18106][SQL] ANALYZE TABLE should raise a P...

2016-10-26 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15640#discussion_r85236110
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala ---
@@ -220,4 +220,12 @@ class SparkSqlParserSuite extends PlanTest {
 
 intercept("explain describe tables x", "Unsupported SQL statement")
   }
+
+  test("SPARK-18106 analyze table") {
--- End diff --

In my opinion it's fine to rewrite and simplify. If you could do that, that 
would be great.





[GitHub] spark pull request #15640: [SPARK-18106][SQL] ANALYZE TABLE should raise a P...

2016-10-26 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15640#discussion_r85200011
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala ---
@@ -220,4 +220,13 @@ class SparkSqlParserSuite extends PlanTest {
 
 intercept("explain describe tables x", "Unsupported SQL statement")
   }
+
+  test("SPARK-18106 analyze table") {
+    assertEqual("analyze table t compute statistics",
+      AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
+    assertEqual("analyze table t compute statistics noscan",
+      AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+    intercept("analyze table t compute statistics ", "Expected `NOSCAN` instead of ``")
+    intercept("analyze table t partition (a) compute statistics ")
--- End diff --

Nit: `, "Expected `NOSCAN` instead of ``"` here as well.





[GitHub] spark pull request #15640: [SPARK-18106][SQL] ANALYZE TABLE should raise a P...

2016-10-26 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15640#discussion_r85199895
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala ---
@@ -220,4 +220,12 @@ class SparkSqlParserSuite extends PlanTest {
 
 intercept("explain describe tables x", "Unsupported SQL statement")
   }
+
+  test("SPARK-18106 analyze table") {
--- End diff --

Then maybe you can move those parse tests here ? All I'm suggesting is that 
the parse tests all be together.






[GitHub] spark pull request #15640: [SPARK-18106][SQL] ANALYZE TABLE should raise a P...

2016-10-26 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15640#discussion_r85163515
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -98,9 +98,10 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * }}}
    */
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.partitionSpec == null &&
-      ctx.identifier != null &&
-      ctx.identifier.getText.toLowerCase == "noscan") {
+    if (ctx.partitionSpec == null && ctx.identifier != null) {
--- End diff --

What if partition spec is not null ?
What happens with something like 
```
ANALYZE TABLE mytable PARTITION (a) garbage
```
(Could you add a test for that ?)
Maybe 
```
if (ctx.identifier != null && ctx.identifier.getText.toLowerCase != "noscan") {
  throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
}
```
could be moved to the top ?
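
A test for the garbage-token case could follow the `intercept` helper already used in SparkSqlParserSuite; a sketch (the expected message assumes the ParseException text above):
```
// Hypothetical test: trailing garbage after a partitioned ANALYZE TABLE
// should be rejected with the NOSCAN parse error.
intercept("ANALYZE TABLE mytable PARTITION (a) garbage",
  "Expected `NOSCAN` instead of `garbage`")
```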





[GitHub] spark pull request #15640: [SPARK-18106][SQL] ANALYZE TABLE should raise a P...

2016-10-26 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15640#discussion_r85163883
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala ---
@@ -220,4 +220,12 @@ class SparkSqlParserSuite extends PlanTest {
 
 intercept("explain describe tables x", "Unsupported SQL statement")
   }
+
+  test("SPARK-18106 analyze table") {
--- End diff --

There are also parse tests for AnalyzeTable in 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
Let's have these in the same place





[GitHub] spark issue #15493: [SPARK-17946][PYSPARK] Python crossJoin API similar to S...

2016-10-19 Thread srinathshankar
Github user srinathshankar commented on the issue:

https://github.com/apache/spark/pull/15493
  
Created https://issues.apache.org/jira/browse/SPARK-18013
@felixcheung @rxin 





[GitHub] spark pull request #15542: [SPARK-17996][SQL] Fix unqualified catalog.getFun...

2016-10-19 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15542#discussion_r84118865
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---
@@ -133,11 +133,11 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   private def makeFunction(funcIdent: FunctionIdentifier): Function = {
 val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)
 new Function(
-  name = funcIdent.identifier,
-  database = funcIdent.database.orNull,
+  name = metadata.getName,
+  database = metadata.getDb,
   description = null, // for now, this is always undefined
   className = metadata.getClassName,
-  isTemporary = funcIdent.database.isEmpty)
+  isTemporary = metadata.getDb == null)
--- End diff --

Is this still the right way to do this ? What about global temp tables ?





[GitHub] spark pull request #15542: [SPARK-17996][SQL] Fix unqualified catalog.getFun...

2016-10-19 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15542#discussion_r84114285
  
--- Diff: sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java ---
@@ -42,14 +43,23 @@ public String getExtended() {
 return extended;
 }
 
-    public ExpressionInfo(String className, String name, String usage, String extended) {
+    public String getDb() {
+        return db;
+    }
+
+    public ExpressionInfo(String className, String name, String usage, String extended, String db) {
--- End diff --

Nit: Could we put the db name before the expression name in the params ? 
Not a big deal if this requires more surgery.





[GitHub] spark pull request #15542: [SPARK-17996][SQL] Fix unqualified catalog.getFun...

2016-10-19 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15542#discussion_r84114502
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala ---
@@ -118,14 +118,16 @@ case class DescribeFunctionCommand(
   case _ =>
 try {
          val info = sparkSession.sessionState.catalog.lookupFunctionInfo(functionName)
+  val db = if (info.getDb != null) info.getDb + "." else ""
--- End diff --

Just inline the definition of val db into the definition of val name ? You 
don't use it anywhere else.
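
Assuming the next line of the diff builds `name` by prefixing `db` to `info.getName` (that line is not shown in this excerpt), the inlined form might look like:
```
// Hypothetical inlined form of the db prefix, replacing the separate val db.
val name = if (info.getDb != null) info.getDb + "." + info.getName else info.getName
```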





[GitHub] spark pull request #15493: [SPARK-17946][PYSPARK] Python crossJoin API simil...

2016-10-14 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15493#discussion_r83518122
  
--- Diff: python/pyspark/sql/dataframe.py ---
@@ -627,6 +627,25 @@ def alias(self, alias):
 return DataFrame(getattr(self._jdf, "as")(alias), self.sql_ctx)
 
 @ignore_unicode_prefix
+@since(2.1)
+def crossJoin(self, other):
+"""Returns the cartesian product with another :class:`DataFrame`
--- End diff --

done.





[GitHub] spark issue #15493: [SPARK-17946][PYSPARK] Python crossJoin API similar to S...

2016-10-14 Thread srinathshankar
Github user srinathshankar commented on the issue:

https://github.com/apache/spark/pull/15493
  
cc @sameeragarwal @hvanhovell @davies @JoshRosen 





[GitHub] spark pull request #15493: [SPARK-17946][PYSPARK] Python crossJoin API simil...

2016-10-14 Thread srinathshankar
GitHub user srinathshankar opened a pull request:

https://github.com/apache/spark/pull/15493

[SPARK-17946][PYSPARK] Python crossJoin API similar to Scala

## What changes were proposed in this pull request?

Add a crossJoin function to the DataFrame API similar to that in Scala. 
Joins with no condition (cartesian products) must be specified with the 
crossJoin API
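
A quick sketch of the behaviour this mirrors on the Scala side (table contents are made up):

```
import org.apache.spark.sql.SparkSession

// Cartesian products must be requested explicitly; the tests in this PR check
// that an unconditioned join raises AnalysisException unless crossJoin is used.
val spark = SparkSession.builder().master("local").appName("crossJoinSketch").getOrCreate()
import spark.implicits._

val left = Seq(1, 2).toDF("a")
val right = Seq("x", "y").toDF("b")
left.crossJoin(right).show()   // 4 rows: the full cartesian product
```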
 
## How was this patch tested?
Added Python tests to ensure that an AnalysisException is raised if a cartesian product is
specified without crossJoin(), and that cartesian products can execute if specified via
crossJoin().




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srinathshankar/spark crosspython

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15493.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15493


commit 9b4d995fa5e7ca0d9636b708fcec1088054c1022
Author: Srinath Shankar 
Date:   2016-10-14T21:00:08Z

[SPARK-17946][PYSPARK] Python crossJoin API similar to Scala







[GitHub] spark pull request #15409: [Spark-14761][SQL] Reject invalid join methods wh...

2016-10-12 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15409#discussion_r83054921
  
--- Diff: python/pyspark/sql/dataframe.py ---
@@ -640,25 +640,24 @@ def join(self, other, on=None, how=None):
 if on is not None and not isinstance(on, list):
 on = [on]
 
-        if on is None or len(on) == 0:
-            jdf = self._jdf.crossJoin(other._jdf)
-        elif isinstance(on[0], basestring):
-            if how is None:
-                jdf = self._jdf.join(other._jdf, self._jseq(on), "inner")
+        if on is not None:
+            if isinstance(on[0], basestring):
+                on = self._jseq(on)
             else:
-                assert isinstance(how, basestring), "how should be basestring"
-                jdf = self._jdf.join(other._jdf, self._jseq(on), how)
+                assert isinstance(on[0], Column), "on should be Column or list of Column"
+                if len(on) > 1:
+                    on = reduce(lambda x, y: x.__and__(y), on)
--- End diff --

I could fix this too.





[GitHub] spark pull request #15409: [Spark-14761][SQL] Reject invalid join methods wh...

2016-10-12 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15409#discussion_r83050677
  
--- Diff: python/pyspark/sql/dataframe.py ---
@@ -640,25 +640,24 @@ def join(self, other, on=None, how=None):
 if on is not None and not isinstance(on, list):
 on = [on]
 
-        if on is None or len(on) == 0:
-            jdf = self._jdf.crossJoin(other._jdf)
-        elif isinstance(on[0], basestring):
-            if how is None:
-                jdf = self._jdf.join(other._jdf, self._jseq(on), "inner")
+        if on is not None:
+            if isinstance(on[0], basestring):
+                on = self._jseq(on)
             else:
-                assert isinstance(how, basestring), "how should be basestring"
-                jdf = self._jdf.join(other._jdf, self._jseq(on), how)
+                assert isinstance(on[0], Column), "on should be Column or list of Column"
+                if len(on) > 1:
+                    on = reduce(lambda x, y: x.__and__(y), on)
--- End diff --

Wouldn't reduce work even if len(on) == 1, i.e. do you need an if here?
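
The underlying property, illustrated in Scala (the same holds for Python's functools.reduce): reducing a one-element collection just returns that element.
```
// reduce over a single-element sequence returns the element unchanged,
// so the len(on) > 1 guard is not strictly required.
assert(Seq(42).reduce(_ + _) == 42)
```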





[GitHub] spark pull request #15238: [SPARK-17653][SQL] Remove unnecessary distincts i...

2016-09-26 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15238#discussion_r80593959
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala ---
@@ -76,4 +77,32 @@ class SetOperationSuite extends PlanTest {
 testRelation3.select('g) :: Nil).analyze
 comparePlans(unionOptimized, unionCorrectAnswer)
   }
+
+  test("no more unnecessary distincts in multiple unions") {
+    val query1 = OneRowRelation
+      .select(Literal(1).as('a))
+    val query2 = OneRowRelation
+      .select(Literal(2).as('b))
+    val query3 = OneRowRelation
+      .select(Literal(3).as('c))
+
+    val unionQuery1 = Distinct(Union(Distinct(Union(query1, query2)), query3)).analyze
+    val optimized1 = Optimize.execute(unionQuery1)
+    val distinctUnionCorrectAnswer1 =
+      Distinct(Union(query1 :: query2 :: query3 :: Nil)).analyze
+    comparePlans(distinctUnionCorrectAnswer1, optimized1)
+
+    val unionQuery2 = Union(Distinct(Union(query1, query2)), query3).analyze
+    val optimized2 = Optimize.execute(unionQuery2)
+    val distinctUnionCorrectAnswer2 =
--- End diff --

In other words, distinctUnionCorrectAnswer2 = unionQuery2, right ?





[GitHub] spark pull request #15238: [SPARK-17653][SQL] Remove unnecessary distincts i...

2016-09-26 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15238#discussion_r80593053
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -579,8 +579,13 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
  * Combines all adjacent [[Union]] operators into a single [[Union]].
  */
 object CombineUnions extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Unions(children) => Union(children)
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    case Unions(children, isDistinct) =>
--- End diff --

Could we do Unions(children, true) and Unions(children, false) as separate 
case statements, that is separate pattern matching ? That would make it more 
clear.
Like a UnionAll and a UnionDistinct ?
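
A sketch of the split being suggested; the right-hand sides are assumptions, since the rule bodies are not visible in this excerpt:
```
// Hypothetical shape of the two-case version, using the PR's Unions extractor.
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
  case Unions(children, true)  => Distinct(Union(children))
  case Unions(children, false) => Union(children)
}
```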





[GitHub] spark pull request #15187: [SPARK-17616][SQL] Support a single distinct aggr...

2016-09-22 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15187#discussion_r80112793
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregatesSuite.scala ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{If, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{CollectSet, Count}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.types.{IntegerType, StringType}
+
+class RewriteDistinctAggregatesSuite extends PlanTest {
+  val conf = SimpleCatalystConf(caseSensitiveAnalysis = false, groupByOrdinal = false)
+  val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
+  val analyzer = new Analyzer(catalog, conf)
+
+  val nullInt = Literal(null, IntegerType)
+  val nullString = Literal(null, StringType)
+  val testRelation = LocalRelation('a.string, 'b.string, 'c.string, 'd.string, 'e.int)
+
+  private def checkRewrite(rewrite: LogicalPlan): Unit = rewrite match {
+    case Aggregate(_, _, Aggregate(_, _, _: Expand)) =>
+    case _ => fail(s"Plan is not rewritten:\n$rewrite")
+  }
+
+  test("single distinct group") {
+    val input = testRelation
+      .groupBy('a)(countDistinct('e))
+      .analyze
+    val rewrite = RewriteDistinctAggregates(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("single distinct group with non-partial aggregates") {
+    val input = testRelation
+      .groupBy('a, 'd)(
+        countDistinct('e, 'c).as('agg1),
+        CollectSet('b).toAggregateExpression().as('agg2))
+      .analyze
+    checkRewrite(RewriteDistinctAggregates(input))
+  }
+
+  test("multiple distinct groups") {
+    val input = testRelation
+      .groupBy('a)(countDistinct('b, 'c), countDistinct('d), sum('e))
+      .analyze
+    checkRewrite(RewriteDistinctAggregates(input))
--- End diff --

Could you also add a test with partials, and one without partials here ? 
(part of the same test(""))
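
A sketch of what those extra cases might look like, reusing the helpers from the quoted suite (the aggregate choices are illustrative):
```
// Multiple distinct groups together with a partial aggregate (sum) and,
// separately, with only a non-partial aggregate (CollectSet).
val withPartial = testRelation
  .groupBy('a)(countDistinct('b, 'c), countDistinct('d), sum('e))
  .analyze
val withoutPartial = testRelation
  .groupBy('a)(countDistinct('b, 'c), countDistinct('d),
    CollectSet('b).toAggregateExpression().as('agg))
  .analyze
checkRewrite(RewriteDistinctAggregates(withPartial))
checkRewrite(RewriteDistinctAggregates(withoutPartial))
```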





[GitHub] spark pull request #15187: [SPARK-17616][SQL] Support a single distinct aggr...

2016-09-22 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15187#discussion_r80112666
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregatesSuite.scala ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{If, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{CollectSet, Count}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.types.{IntegerType, StringType}
+
+class RewriteDistinctAggregatesSuite extends PlanTest {
+  val conf = SimpleCatalystConf(caseSensitiveAnalysis = false, groupByOrdinal = false)
+  val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
+  val analyzer = new Analyzer(catalog, conf)
+
+  val nullInt = Literal(null, IntegerType)
+  val nullString = Literal(null, StringType)
+  val testRelation = LocalRelation('a.string, 'b.string, 'c.string, 'd.string, 'e.int)
+
+  private def checkRewrite(rewrite: LogicalPlan): Unit = rewrite match {
+    case Aggregate(_, _, Aggregate(_, _, _: Expand)) =>
+    case _ => fail(s"Plan is not rewritten:\n$rewrite")
+  }
+
+  test("single distinct group") {
+    val input = testRelation
+      .groupBy('a)(countDistinct('e))
+      .analyze
+    val rewrite = RewriteDistinctAggregates(input)
+    comparePlans(input, rewrite)
+  }
--- End diff --

Could you also add a single distinct group with aggregates that have partial aggregation support?





[GitHub] spark pull request #15187: [SPARK-17616][SQL] Support a single distinct aggr...

2016-09-22 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15187#discussion_r80109506
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregatesSuite.scala ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{If, Literal}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{CollectSet, Count}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.types.{IntegerType, StringType}
+
+class RewriteDistinctAggregatesSuite extends PlanTest {
+  val conf = SimpleCatalystConf(caseSensitiveAnalysis = false, groupByOrdinal = false)
+  val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
+  val analyzer = new Analyzer(catalog, conf)
+
+  val nullInt = Literal(null, IntegerType)
+  val nullString = Literal(null, StringType)
+  val testRelation = LocalRelation('a.string, 'b.string, 'c.string, 'd.string, 'e.int)
+
+  private def checkRewrite(rewrite: LogicalPlan): Unit = rewrite match {
+    case Aggregate(_, _, Aggregate(_, _, _: Expand)) =>
+    case _ => fail(s"Plan is not rewritten:\n$rewrite")
+  }
+
+  test("single distinct group") {
+    val input = testRelation
+      .groupBy('a)(countDistinct('e))
+      .analyze
+    val rewrite = RewriteDistinctAggregates(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("single distinct group with non-partial aggregates") {
+    val input = testRelation
+      .groupBy('a, 'd)(
+        countDistinct('e, 'c).as('agg1),
+        CollectSet('b).toAggregateExpression().as('agg2))
+      .analyze
+    checkRewrite(RewriteDistinctAggregates(input))
+  }
+
+  test("multiple distinct groups") {
+    val input = testRelation
+      .groupBy('a)(countDistinct('b, 'c), countDistinct('d), sum('e))
+      .analyze
+    checkRewrite(RewriteDistinctAggregates(input))
+  }
+
+  test("multiple distinct groups without non-distinct aggregates") {
--- End diff --

Do you mean non-partial aggregates here ?





[GitHub] spark issue #15186: [SPARK-17485] Prevent failed remote reads of cached bloc...

2016-09-22 Thread srinathshankar
Github user srinathshankar commented on the issue:

https://github.com/apache/spark/pull/15186
  
Thanks for confirming, LGTM





[GitHub] spark pull request #15154: [SPARK-17494] [SQL] changePrecision() on compact ...

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15154#discussion_r79946505
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala ---
@@ -191,4 +192,18 @@ class DecimalSuite extends SparkFunSuite with 
PrivateMethodTester {
 assert(new Decimal().set(100L, 10, 0).toUnscaledLong === 100L)
 assert(Decimal(Long.MaxValue, 100, 0).toUnscaledLong === Long.MaxValue)
   }
+
+  test("changePrecision() on compact decimal should respect rounding 
mode") {
+Seq(ROUND_FLOOR, ROUND_CEILING, ROUND_HALF_UP, 
ROUND_HALF_EVEN).foreach { mode =>
+  Seq("0.4", "0.5", "0.6", "1.0", "1.1", "1.6", "2.5", "5.5").foreach 
{ n =>
+Seq("", "-").foreach { sigh =>
--- End diff --

I think you mean sign, not sigh, though I can understand :)





[GitHub] spark issue #15090: [SPARK-17073] [SQL] generate column-level statistics

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on the issue:

https://github.com/apache/spark/pull/15090
  
Actually, I didn't mean to approve immediately, sorry.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79921689
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
ColumnStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = sessionState.conf.caseSensitiveAnalysis
+columnNames.foreach { col =>
+  val exprOption = relation.output.find { attr =>
+if (caseSensitive) attr.name == col else 
attr.name.equalsIgnoreCase(col)
+  }
+  val expr = exprOption.getOrElse(throw new 
AnalysisException(s"Invalid column name: $col."))
+  // do deduplication
+  if (!attributesToAnalyze.contains(expr)) {
+attributesToAnalyze += expr
+  }
+}
+
+relation match {
+  case catalogRel: CatalogRelation =>
+updateStats(catalogRel.catalogTable,
+  AnalyzeTableCommand.calculateTotalSize(sessionState, 
catalogRel.catalogTable))
+
+  case logicalRel: LogicalRelation if 
logicalRel.catalogTable.isDefined =>
+updateStats(logicalRel.catalogTable.get, 
logicalRel.relation.sizeInBytes)
+
+  case otherRelation =>
+throw new AnalysisException("ANALYZE TABLE is not supported for " +
+  s"${otherRelation.nodeName}.")
+}
+
+def updateStats(catalogTable: CatalogTable, newTotalSize: Long): Unit 
= {
+  // Collect statistics per column.
+  // The first element in the result will be the overall row count, 
the following elements
+  // will be structs containing all column stats.
+  // The layout of each struct follows the layout of the ColumnStats.
+  val ndvMaxErr = sessionState.conf.ndvMaxError
+  val expressions = Count(Literal(1)).toAggregateExpression() +:
+attributesToAnalyze.map(ColumnStatsStruct(_, ndvMaxErr))
+  val namedExpressions = expressions.map(e => Alias(e, e.toString)())
+  val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, 
namedExpressions, relation))
+.queryExecution.toRdd.collect().head
+
+  // unwrap the result
+  val rowCount = statsRow.getLong(0)
+  val columnStats = attributesToAnalyze.zipWithIndex.map { case (expr, 
i) =>
+(expr.name, ColumnStatsStruct.unwrapStruct(statsRow, i + 1, expr))
+  }.toMap
+
+  val statistics = Statistics(
+  

[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79893014
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStats] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+  if (colStats.nonEmpty) s"colStats=$colStats" else "",
   s"isBroadcastable=$isBroadcastable"
-).filter(_.nonEmpty).mkString("", ", ", "")
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+/**
+ * Statistics for a column.
+ * @param ndv Number of distinct values of the column.
+ */
+case class ColumnStats(
+dataType: DataType,
+numNulls: Long,
+max: Option[Any] = None,
+min: Option[Any] = None,
+ndv: Option[Long] = None,
+avgColLen: Option[Double] = None,
+maxColLen: Option[Long] = None,
+numTrues: Option[Long] = None,
+numFalses: Option[Long] = None) {
+
+  override def toString: String = "ColumnStats(" + simpleString + ")"
+
+  def simpleString: String = {
+Seq(s"numNulls=$numNulls",
+  if (max.isDefined) s"max=${max.get}" else "",
+  if (min.isDefined) s"min=${min.get}" else "",
+  if (ndv.isDefined) s"ndv=${ndv.get}" else "",
+  if (avgColLen.isDefined) s"avgColLen=${avgColLen.get}" else "",
+  if (maxColLen.isDefined) s"maxColLen=${maxColLen.get}" else "",
+  if (numTrues.isDefined) s"numTrues=${numTrues.get}" else "",
+  if (numFalses.isDefined) s"numFalses=${numFalses.get}" else ""
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+object ColumnStats {
+  def fromString(str: String, dataType: DataType): ColumnStats = {
--- End diff --

Suggestion: based on fromString, it looks like simpleString above is
effectively used as a serializer to store stats in the catalog. If so, could
you change the name to catalogRepresentation or something like that so it's
clear.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79891286
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStats] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+  if (colStats.nonEmpty) s"colStats=$colStats" else "",
   s"isBroadcastable=$isBroadcastable"
-).filter(_.nonEmpty).mkString("", ", ", "")
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+/**
+ * Statistics for a column.
+ * @param ndv Number of distinct values of the column.
+ */
+case class ColumnStats(
+dataType: DataType,
+numNulls: Long,
+max: Option[Any] = None,
+min: Option[Any] = None,
+ndv: Option[Long] = None,
+avgColLen: Option[Double] = None,
+maxColLen: Option[Long] = None,
+numTrues: Option[Long] = None,
--- End diff --

This seems special-cased for booleans. If we're planning to maintain
histograms or frequent values, this would seem unnecessary.





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79897549
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
CatalogTable}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, 
ColumnStats, Statistics}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+
+/**
+ * Analyzes the given columns of the given table in the current database 
to generate statistics,
+ * which will be used in query optimizations.
+ */
+case class AnalyzeColumnCommand(
+tableIdent: TableIdentifier,
+columnNames: Seq[String]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+
+// check correctness of column names
+val attributesToAnalyze = mutable.MutableList[Attribute]()
+val caseSensitive = sessionState.conf.caseSensitiveAnalysis
--- End diff --

I think catalyst.resolver gives you a comparator based on the conf. It might
be cleaner to just use that.
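
A minimal, self-contained sketch of that idea, with a hand-rolled Resolver
standing in for the one Catalyst's analysis package provides (the names here
are illustrative, not the PR's code):

object ResolverSketch extends App {
  // A Resolver decides whether a column reference matches an attribute name.
  type Resolver = (String, String) => Boolean
  val caseSensitiveResolution: Resolver = (a, b) => a == b
  val caseInsensitiveResolution: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Look up a column with whichever resolver the conf dictates, instead of
  // branching on caseSensitiveAnalysis at every call site.
  def findColumn(output: Seq[String], col: String, resolver: Resolver): Option[String] =
    output.find(attr => resolver(attr, col))

  val output = Seq("Id", "Name")
  println(findColumn(output, "id", caseInsensitiveResolution)) // Some(Id)
  println(findColumn(output, "id", caseSensitiveResolution))   // None
}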





[GitHub] spark pull request #15090: [SPARK-17073] [SQL] generate column-level statist...

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15090#discussion_r79893449
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -32,19 +34,74 @@ package org.apache.spark.sql.catalyst.plans.logical
  * @param sizeInBytes Physical size in bytes. For leaf operators this 
defaults to 1, otherwise it
  *defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
+ * @param colStats Column-level statistics.
  * @param isBroadcastable If true, output is small enough to be used in a 
broadcast join.
  */
 case class Statistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
+colStats: Map[String, ColumnStats] = Map.empty,
 isBroadcastable: Boolean = false) {
+
   override def toString: String = "Statistics(" + simpleString + ")"
 
   /** Readable string representation for the Statistics. */
   def simpleString: String = {
 Seq(s"sizeInBytes=$sizeInBytes",
   if (rowCount.isDefined) s"rowCount=${rowCount.get}" else "",
+  if (colStats.nonEmpty) s"colStats=$colStats" else "",
   s"isBroadcastable=$isBroadcastable"
-).filter(_.nonEmpty).mkString("", ", ", "")
+).filter(_.nonEmpty).mkString(", ")
+  }
+}
+
+/**
+ * Statistics for a column.
+ * @param ndv Number of distinct values of the column.
+ */
+case class ColumnStats(
+dataType: DataType,
+numNulls: Long,
+max: Option[Any] = None,
--- End diff --

Shouldn't max/min be numeric? (You're not planning on putting strings in
here, right?)





[GitHub] spark pull request #15154: [SPARK-17494] [SQL] changePrecision() on compact ...

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15154#discussion_r79884937
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala ---
@@ -242,10 +242,30 @@ final class Decimal extends Ordered[Decimal] with 
Serializable {
   if (scale < _scale) {
 // Easier case: we just need to divide our scale down
 val diff = _scale - scale
-val droppedDigits = longVal % POW_10(diff)
-longVal /= POW_10(diff)
-if (math.abs(droppedDigits) * 2 >= POW_10(diff)) {
-  longVal += (if (longVal < 0) -1L else 1L)
+val pow10diff = POW_10(diff)
+// % and / always round to 0
+val droppedDigits = longVal % pow10diff
+longVal /= pow10diff
+roundMode match {
+  case ROUND_FLOOR =>
+if (droppedDigits < 0) {
+  longVal += -1L
+}
+  case ROUND_CEILING =>
+if (droppedDigits > 0) {
+  longVal += 1L
+}
+  case ROUND_HALF_UP =>
+if (math.abs(droppedDigits) * 2 >= pow10diff) {
+  longVal += (if (longVal < 0) -1L else 1L)
--- End diff --

Is this correct if longVal is 0 after the division above? What happens if you
ROUND_HALF_UP -0.6? It looks like you may get +1.
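
To make the concern concrete, here is a small, self-contained sketch of that
branch applied to -0.6 (stored as longVal = -6 at scale 1) when one digit of
scale is dropped; it illustrates the question above and is not the PR's final
code:

object HalfUpSignSketch extends App {
  var longVal = -6L                         // -0.6 represented at scale 1
  val pow10diff = 10L                       // dropping one digit of scale
  val droppedDigits = longVal % pow10diff   // -6
  longVal /= pow10diff                      // 0: the sign of the value is gone
  if (math.abs(droppedDigits) * 2 >= pow10diff) {
    // The branch in the diff checks the sign of longVal, which is now 0,
    // so the adjustment goes the wrong way for negative inputs.
    longVal += (if (longVal < 0) -1L else 1L)
  }
  println(longVal)  // prints 1, but ROUND_HALF_UP(-0.6) should give -1
}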





[GitHub] spark pull request #15154: [SPARK-17494] [SQL] changePrecision() on compact ...

2016-09-21 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15154#discussion_r79885018
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala ---
@@ -191,4 +192,16 @@ class DecimalSuite extends SparkFunSuite with 
PrivateMethodTester {
 assert(new Decimal().set(100L, 10, 0).toUnscaledLong === 100L)
 assert(Decimal(Long.MaxValue, 100, 0).toUnscaledLong === Long.MaxValue)
   }
+
+  test("changePrecision() on compact decimal should respect rounding 
mode") {
+Seq(ROUND_FLOOR, ROUND_CEILING, ROUND_HALF_UP, 
ROUND_HALF_EVEN).foreach { mode =>
+  Seq("1.0", "1.1", "1.6", "2.5", "5.5", "-1.0", "-1.1", "-1.6", 
"-2.5", "-5.5").foreach { n =>
--- End diff --

Tests of the form 0.5, -0.5, 0.6, etc. might also be useful.





[GitHub] spark pull request #15043: [SPARK-17491] Close serialization stream to fix w...

2016-09-15 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15043#discussion_r79105677
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/PartiallyUnrolledIteratorSuite.scala
 ---
@@ -33,7 +33,7 @@ class PartiallyUnrolledIteratorSuite extends 
SparkFunSuite with MockitoSugar {
 val rest = (unrollSize until restSize + unrollSize).iterator
 
 val memoryStore = mock[MemoryStore]
-val joinIterator = new PartiallyUnrolledIterator(memoryStore, 
unrollSize, unroll, rest)
+val joinIterator = new PartiallyUnrolledIterator(memoryStore, ON_HEAP, 
unrollSize, unroll, rest)
--- End diff --

We should look into trying to test this with OFF_HEAP as well. 





[GitHub] spark pull request #15043: [SPARK-17491] Close serialization stream to fix w...

2016-09-15 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15043#discussion_r79105418
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import org.mockito.Mockito
+import org.mockito.Mockito.atLeastOnce
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
+
+import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext, 
TaskContextImpl}
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.serializer.{JavaSerializer, SerializationStream, 
SerializerManager}
+import org.apache.spark.storage.memory.{MemoryStore, 
PartiallySerializedBlock, RedirectableOutputStream}
+import org.apache.spark.util.{ByteBufferInputStream, 
ByteBufferOutputStream}
+import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
+
+class PartiallySerializedBlockSuite
+extends SparkFunSuite
+with BeforeAndAfterEach
+with PrivateMethodTester {
+
+  private val blockId = new TestBlockId("test")
+  private val conf = new SparkConf()
+  private val memoryStore = Mockito.mock(classOf[MemoryStore], 
Mockito.RETURNS_SMART_NULLS)
+  private val serializerManager = new SerializerManager(new 
JavaSerializer(conf), conf)
+
+  private val getSerializationStream = 
PrivateMethod[SerializationStream]('serializationStream)
+  private val getRedirectableOutputStream =
+PrivateMethod[RedirectableOutputStream]('redirectableOutputStream)
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+Mockito.reset(memoryStore)
+  }
+
+  private def partiallyUnroll[T: ClassTag](
+  iter: Iterator[T],
+  numItemsToBuffer: Int): PartiallySerializedBlock[T] = {
+
+val bbos: ChunkedByteBufferOutputStream = {
+  val spy = Mockito.spy(new ChunkedByteBufferOutputStream(128, 
ByteBuffer.allocate))
+  Mockito.doAnswer(new Answer[ChunkedByteBuffer] {
+override def answer(invocationOnMock: InvocationOnMock): 
ChunkedByteBuffer = {
+  
Mockito.spy(invocationOnMock.callRealMethod().asInstanceOf[ChunkedByteBuffer])
+}
+  }).when(spy).toChunkedByteBuffer
+  spy
+}
+
+val serializer = 
serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+val redirectableOutputStream = Mockito.spy(new 
RedirectableOutputStream)
+redirectableOutputStream.setOutputStream(bbos)
+val serializationStream = 
Mockito.spy(serializer.serializeStream(redirectableOutputStream))
+
+(1 to numItemsToBuffer).foreach { _ =>
+  assert(iter.hasNext)
+  serializationStream.writeObject[T](iter.next())
+}
+
+val unrollMemory = bbos.size
+new PartiallySerializedBlock[T](
+  memoryStore,
+  serializerManager,
+  blockId,
+  serializationStream = serializationStream,
+  redirectableOutputStream,
+  unrollMemory = unrollMemory,
+  memoryMode = MemoryMode.ON_HEAP,
+  bbos,
+  rest = iter,
+  classTag = implicitly[ClassTag[T]])
+  }
+
+  test("valuesIterator() and finishWritingToStream() cannot be called 
after discard() is called") {
+val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2)
+partiallySerializedBlock.discard()
+intercept[IllegalStateException] {
+  partiallySerializedBlock.finishWritingToStream(null)
+}
+intercept[IllegalStateException] {
+  partiallySerializedBlock.valuesIterator
+}
+  }
+
+  test("discard() can be called more than once"

[GitHub] spark pull request #15043: [SPARK-17491] Close serialization stream to fix w...

2016-09-15 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15043#discussion_r79042389
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -751,23 +757,43 @@ private[storage] class PartiallySerializedBlock[T](
 taskContext.addTaskCompletionListener { _ =>
   // When a task completes, its unroll memory will automatically be 
freed. Thus we do not call
   // releaseUnrollMemoryForThisTask() here because we want to avoid 
double-freeing.
-  unrolled.dispose()
+  unrolledBuffer.dispose()
+}
+  }
+
+  // Exposed for testing
+  private[storage] def getUnrolledChunkedByteBuffer: ChunkedByteBuffer = 
unrolledBuffer
+
+  private[this] var discarded = false
+  private[this] var consumed = false
+
+  private def verifyNotConsumedAndNotDiscarded(): Unit = {
+if (consumed) {
+  throw new IllegalStateException(
+"Can only call one of finishWritingToStream() or valuesIterator() 
and can only call once.")
+}
+if (discarded) {
+  throw new IllegalStateException("Cannot call methods on a discarded 
PartiallySerializedBlock")
 }
+consumed = true
--- End diff --

Hmm, why set consumed = true here? What's the problem with calling
verifyNotConsumedAndNotDiscarded() twice?
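
For illustration only, a small self-contained sketch of the alternative this
question implies: keep the check side-effect free and flip consumed in the
method that actually consumes the block (class and method names are
placeholders, not the PR's code):

object ConsumeOnceSketch extends App {
  final class ConsumeOnce {
    private var consumed = false
    private var discarded = false

    // Side-effect-free check: calling it twice is harmless.
    private def verifyNotConsumedAndNotDiscarded(): Unit = {
      require(!consumed, "already consumed")
      require(!discarded, "already discarded")
    }

    def valuesIterator(): Iterator[Int] = {
      verifyNotConsumedAndNotDiscarded()
      consumed = true                  // flipped by the consuming method itself
      Iterator(1, 2, 3)
    }

    def discard(): Unit = { discarded = true }
  }

  val block = new ConsumeOnce
  println(block.valuesIterator().toList)   // List(1, 2, 3)
  // A second valuesIterator() call would now fail the require, as intended.
}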





[GitHub] spark pull request #15043: [SPARK-17491] Close serialization stream to fix w...

2016-09-15 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15043#discussion_r79061246
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -794,13 +821,17 @@ private[storage] class PartiallySerializedBlock[T](
* `close()` on it to free its resources.
*/
   def valuesIterator: PartiallyUnrolledIterator[T] = {
+verifyNotConsumedAndNotDiscarded()
+// Close the serialization stream so that the serializer's internal 
buffers are freed and any
+// "end-of-stream" markers can be written out so that `unrolled` is a 
valid serialized stream.
+serializationStream.close()
 // `unrolled`'s underlying buffers will be freed once this input 
stream is fully read:
 val unrolledIter = serializerManager.dataDeserializeStream(
-  blockId, unrolled.toInputStream(dispose = true))(classTag)
+  blockId, unrolledBuffer.toInputStream(dispose = true))(classTag)
 new PartiallyUnrolledIterator(
   memoryStore,
   unrollMemory,
-  unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, 
discard()),
+  unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, 
unrolledBuffer.dispose()),
--- End diff --

Why the change from discard() to dispose()? You've made discard() idempotent,
right? Does the caller have to manually release memory after the iterator is
consumed?





[GitHub] spark pull request #15043: [SPARK-17491] Close serialization stream to fix w...

2016-09-15 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15043#discussion_r79060355
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import org.mockito.Mockito
+import org.mockito.Mockito.atLeastOnce
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
+
+import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext, 
TaskContextImpl}
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.serializer.{JavaSerializer, SerializationStream, 
SerializerManager}
+import org.apache.spark.storage.memory.{MemoryStore, 
PartiallySerializedBlock, RedirectableOutputStream}
+import org.apache.spark.util.{ByteBufferInputStream, 
ByteBufferOutputStream}
+import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
+
+class PartiallySerializedBlockSuite
+extends SparkFunSuite
+with BeforeAndAfterEach
+with PrivateMethodTester {
+
+  private val blockId = new TestBlockId("test")
+  private val conf = new SparkConf()
+  private val memoryStore = Mockito.mock(classOf[MemoryStore], 
Mockito.RETURNS_SMART_NULLS)
+  private val serializerManager = new SerializerManager(new 
JavaSerializer(conf), conf)
+
+  private val getSerializationStream = 
PrivateMethod[SerializationStream]('serializationStream)
+  private val getRedirectableOutputStream =
+PrivateMethod[RedirectableOutputStream]('redirectableOutputStream)
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+Mockito.reset(memoryStore)
+  }
+
+  private def partiallyUnroll[T: ClassTag](
+  iter: Iterator[T],
+  numItemsToBuffer: Int): PartiallySerializedBlock[T] = {
+
+val bbos: ChunkedByteBufferOutputStream = {
+  val spy = Mockito.spy(new ChunkedByteBufferOutputStream(128, 
ByteBuffer.allocate))
+  Mockito.doAnswer(new Answer[ChunkedByteBuffer] {
+override def answer(invocationOnMock: InvocationOnMock): 
ChunkedByteBuffer = {
+  
Mockito.spy(invocationOnMock.callRealMethod().asInstanceOf[ChunkedByteBuffer])
+}
+  }).when(spy).toChunkedByteBuffer
+  spy
+}
+
+val serializer = 
serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+val redirectableOutputStream = Mockito.spy(new 
RedirectableOutputStream)
+redirectableOutputStream.setOutputStream(bbos)
+val serializationStream = 
Mockito.spy(serializer.serializeStream(redirectableOutputStream))
+
+(1 to numItemsToBuffer).foreach { _ =>
+  assert(iter.hasNext)
+  serializationStream.writeObject[T](iter.next())
+}
+
+val unrollMemory = bbos.size
+new PartiallySerializedBlock[T](
+  memoryStore,
+  serializerManager,
+  blockId,
+  serializationStream = serializationStream,
+  redirectableOutputStream,
+  unrollMemory = unrollMemory,
+  memoryMode = MemoryMode.ON_HEAP,
--- End diff --

What happens if the memory mode is OFF_HEAP? Is that relevant?





[GitHub] spark pull request #15043: [SPARK-17491] Close serialization stream to fix w...

2016-09-15 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15043#discussion_r79063493
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+import scala.reflect.ClassTag
+
+import org.mockito.Mockito
+import org.mockito.Mockito.atLeastOnce
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
+
+import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext, 
TaskContextImpl}
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.serializer.{JavaSerializer, SerializationStream, 
SerializerManager}
+import org.apache.spark.storage.memory.{MemoryStore, 
PartiallySerializedBlock, RedirectableOutputStream}
+import org.apache.spark.util.{ByteBufferInputStream, 
ByteBufferOutputStream}
+import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
+
+class PartiallySerializedBlockSuite
+extends SparkFunSuite
+with BeforeAndAfterEach
+with PrivateMethodTester {
+
+  private val blockId = new TestBlockId("test")
+  private val conf = new SparkConf()
+  private val memoryStore = Mockito.mock(classOf[MemoryStore], 
Mockito.RETURNS_SMART_NULLS)
+  private val serializerManager = new SerializerManager(new 
JavaSerializer(conf), conf)
+
+  private val getSerializationStream = 
PrivateMethod[SerializationStream]('serializationStream)
+  private val getRedirectableOutputStream =
+PrivateMethod[RedirectableOutputStream]('redirectableOutputStream)
+
+  override protected def beforeEach(): Unit = {
+super.beforeEach()
+Mockito.reset(memoryStore)
+  }
+
+  private def partiallyUnroll[T: ClassTag](
+  iter: Iterator[T],
+  numItemsToBuffer: Int): PartiallySerializedBlock[T] = {
+
+val bbos: ChunkedByteBufferOutputStream = {
+  val spy = Mockito.spy(new ChunkedByteBufferOutputStream(128, 
ByteBuffer.allocate))
+  Mockito.doAnswer(new Answer[ChunkedByteBuffer] {
+override def answer(invocationOnMock: InvocationOnMock): 
ChunkedByteBuffer = {
+  
Mockito.spy(invocationOnMock.callRealMethod().asInstanceOf[ChunkedByteBuffer])
+}
+  }).when(spy).toChunkedByteBuffer
+  spy
+}
+
+val serializer = 
serializerManager.getSerializer(implicitly[ClassTag[T]]).newInstance()
+val redirectableOutputStream = Mockito.spy(new 
RedirectableOutputStream)
+redirectableOutputStream.setOutputStream(bbos)
+val serializationStream = 
Mockito.spy(serializer.serializeStream(redirectableOutputStream))
+
+(1 to numItemsToBuffer).foreach { _ =>
+  assert(iter.hasNext)
+  serializationStream.writeObject[T](iter.next())
+}
+
+val unrollMemory = bbos.size
+new PartiallySerializedBlock[T](
+  memoryStore,
+  serializerManager,
+  blockId,
+  serializationStream = serializationStream,
+  redirectableOutputStream,
+  unrollMemory = unrollMemory,
+  memoryMode = MemoryMode.ON_HEAP,
+  bbos,
+  rest = iter,
+  classTag = implicitly[ClassTag[T]])
+  }
+
+  test("valuesIterator() and finishWritingToStream() cannot be called 
after discard() is called") {
+val partiallySerializedBlock = partiallyUnroll((1 to 10).iterator, 2)
+partiallySerializedBlock.discard()
+intercept[IllegalStateException] {
+  partiallySerializedBlock.finishWritingToStream(null)
+}
+intercept[IllegalStateException] {
+  partiallySerializedBlock.valuesIterator
+}
+  }
+
+  test("discard() can be called more than once"

[GitHub] spark pull request #15085: [SPARK-17484] Prevent invalid block locations fro...

2016-09-13 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15085#discussion_r78660620
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -857,9 +862,11 @@ private[spark] class BlockManager(
 
 val startTimeMs = System.currentTimeMillis
 var blockWasSuccessfullyStored: Boolean = false
+var exceptionWasThrown: Boolean = true
 val result: Option[T] = try {
   val res = putBody(putBlockInfo)
   blockWasSuccessfullyStored = res.isEmpty
+  exceptionWasThrown = false
   res
 } finally {
   if (blockWasSuccessfullyStored) {
--- End diff --

Wouldn't blockWasSuccessfullyStored be false if exceptionWasThrown were true?
In that case, couldn't we write this as

try {
  ...
} catch {
  case e: Exception =>
    removeBlock
    addUpdatedBlock
} finally {
  // whatever was there before
}
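
A runnable, self-contained sketch of that shape, with placeholder method names
(this is just the suggestion rephrased, not the PR's code):

import scala.util.control.NonFatal

object CleanupOnFailureSketch extends App {
  def removeBlock(): Unit = println("removeBlock()")
  def addUpdatedBlock(): Unit = println("addUpdatedBlock()")

  def runPut[T](putBody: () => Option[T]): Option[T] =
    try {
      putBody()
    } catch {
      case NonFatal(e) =>
        removeBlock()        // cleanup runs only on the failure path
        addUpdatedBlock()
        throw e
    } finally {
      println("finally: whatever bookkeeping was there before")
    }

  println(runPut(() => Some("stored")))                        // success path
  try runPut[String](() => throw new RuntimeException("boom")) // failure path
  catch { case NonFatal(_) => println("caller still sees the failure") }
}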





[GitHub] spark pull request #15043: [SPARK-17491] Close serialization stream to fix w...

2016-09-13 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15043#discussion_r78645978
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -782,6 +785,9 @@ private[storage] class PartiallySerializedBlock[T](
* `close()` on it to free its resources.
*/
   def valuesIterator: PartiallyUnrolledIterator[T] = {
+// Close the serialization stream so that the serializer's internal 
buffers are freed and any
+// "end-of-stream" markers can be written out so that `unrolled` is a 
valid serialized stream.
+serializationStream.close()
--- End diff --

It seems like 'unrolled' may basically be invalid until serializationStream
is closed.

But it looks like valuesIterator is not the only place where unrolled is used.
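
For what it's worth, here is a self-contained illustration (plain java.io
serialization, not Spark's serializer) of why the buffered bytes are not a
readable stream until close()/flush() happens:

import java.io._

object CloseBeforeReadSketch extends App {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(new BufferedOutputStream(bos))
  oos.writeObject("hello")

  val visibleBeforeClose = bos.toByteArray.length  // typically incomplete: data still buffered
  oos.close()                                      // flushes buffers and finishes the stream
  val visibleAfterClose = bos.toByteArray.length   // the full, readable serialized form

  println(s"before close: $visibleBeforeClose bytes, after close: $visibleAfterClose bytes")
  val in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  println(in.readObject())                         // hello
}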





[GitHub] spark issue #15036: [SPARK-17483] Refactoring in BlockManager status reporti...

2016-09-09 Thread srinathshankar
Github user srinathshankar commented on the issue:

https://github.com/apache/spark/pull/15036
  
One minor additional comment, but the rest LGTM.





[GitHub] spark pull request #15036: [SPARK-17483] Refactoring in BlockManager status ...

2016-09-09 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15036#discussion_r78263993
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -1316,21 +1309,27 @@ private[spark] class BlockManager(
 // The block has already been removed; do nothing.
 logWarning(s"Asked to remove block $blockId, which does not exist")
   case Some(info) =>
-// Removals are idempotent in disk store and memory store. At 
worst, we get a warning.
-val removedFromMemory = memoryStore.remove(blockId)
-val removedFromDisk = diskStore.remove(blockId)
-if (!removedFromMemory && !removedFromDisk) {
-  logWarning(s"Block $blockId could not be removed as it was not 
found in either " +
-"the disk, memory, or external block store")
-}
-blockInfoManager.removeBlock(blockId)
-val removeBlockStatus = getCurrentBlockStatus(blockId, info)
-if (tellMaster && info.tellMaster) {
-  reportBlockStatus(blockId, info, removeBlockStatus)
-}
-Option(TaskContext.get()).foreach { c =>
-  c.taskMetrics().incUpdatedBlockStatuses(blockId -> 
removeBlockStatus)
-}
+removeBlockInternal(blockId, info, tellMaster)
+}
+  }
+
+  /**
+   * Internal version of [[removeBlock()]] which assumes that the caller 
already holds a write
+   * lock on the block.
+   */
+  private def removeBlockInternal(blockId: BlockId, info: BlockInfo, 
tellMaster: Boolean): Unit = {
+// Removals are idempotent in disk store and memory store. At worst, 
we get a warning.
+val removedFromMemory = memoryStore.remove(blockId)
+val removedFromDisk = diskStore.remove(blockId)
+if (!removedFromMemory && !removedFromDisk) {
+  logWarning(s"Block $blockId could not be removed as it was not found 
on disk or in memory")
+}
+blockInfoManager.removeBlock(blockId)
+if (tellMaster && info.tellMaster) {
--- End diff --

Could we just pass tellMaster && info.tellMaster as a single parameter and
get rid of the info: BlockInfo parameter?





[GitHub] spark pull request #15036: [SPARK-17483] Refactoring in BlockManager status ...

2016-09-09 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15036#discussion_r78263751
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -807,8 +801,8 @@ private[spark] class BlockManager(
 // Now that the block is in either the memory or disk store,
 // tell the master about it.
 info.size = size
-if (tellMaster) {
-  reportBlockStatus(blockId, info, putBlockStatus)
+if (tellMaster && info.tellMaster) {
--- End diff --

Ok, sounds like this is not trivial to fix.





[GitHub] spark issue #15037: [SPARK-17485] Prevent failed remote reads of cached bloc...

2016-09-09 Thread srinathshankar
Github user srinathshankar commented on the issue:

https://github.com/apache/spark/pull/15037
  
How does the refreshing of locations here:
https://github.com/apache/spark/pull/15037/files?diff=unified#diff-2b643ea78c1add0381754b1f47eec132R581
affect maxFetchFailures? It seems possible that if the number of new locations
is smaller than the old, we repeatedly go over the same locations. Is that the
intent?





[GitHub] spark pull request #15037: [SPARK-17485] Prevent failed remote reads of cach...

2016-09-09 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15037#discussion_r78260239
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -565,8 +565,9 @@ private[spark] class BlockManager(
 // Give up trying anymore locations. Either we've tried all of 
the original locations,
 // or we've refreshed the list of locations from the master, 
and have still
 // hit failures after trying locations from the refreshed list.
-throw new BlockFetchException(s"Failed to fetch block after" +
-  s" ${totalFailureCount} fetch failures. Most recent failure 
cause:", e)
+logError(s"Failed to fetch block after $totalFailureCount 
fetch failures." +
--- End diff --

If a block is not found at a particular location, I take it that's a
non-fatal error? In that case, should we really be logging "error" if the
block is not found? Or do we expect the block to be found *somewhere* remotely
during the normal course of execution?






[GitHub] spark pull request #15036: [SPARK-17483] Minor refactoring in BlockManager s...

2016-09-09 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15036#discussion_r78257052
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -1316,21 +1310,28 @@ private[spark] class BlockManager(
 // The block has already been removed; do nothing.
 logWarning(s"Asked to remove block $blockId, which does not exist")
   case Some(info) =>
-// Removals are idempotent in disk store and memory store. At 
worst, we get a warning.
-val removedFromMemory = memoryStore.remove(blockId)
-val removedFromDisk = diskStore.remove(blockId)
-if (!removedFromMemory && !removedFromDisk) {
-  logWarning(s"Block $blockId could not be removed as it was not 
found in either " +
-"the disk, memory, or external block store")
-}
-blockInfoManager.removeBlock(blockId)
-val removeBlockStatus = getCurrentBlockStatus(blockId, info)
-if (tellMaster && info.tellMaster) {
-  reportBlockStatus(blockId, info, removeBlockStatus)
-}
-Option(TaskContext.get()).foreach { c =>
-  c.taskMetrics().incUpdatedBlockStatuses(blockId -> 
removeBlockStatus)
-}
+removeBlockInternal(blockId, info, tellMaster)
+}
+  }
+
+  /**
+   * Internal version of [[removeBlock()]] which assumes that the caller 
already holds a write
+   * lock on the block.
+   */
+  private def removeBlockInternal(blockId: BlockId, info: BlockInfo, 
tellMaster: Boolean): Unit = {
+// Removals are idempotent in disk store and memory store. At worst, 
we get a warning.
+val removedFromMemory = memoryStore.remove(blockId)
+val removedFromDisk = diskStore.remove(blockId)
+if (!removedFromMemory && !removedFromDisk) {
+  logWarning(s"Block $blockId could not be removed as it was not found 
in either " +
--- End diff --

Hmm, you're actually only removing from memory and disk, but the warning 
mentions an external block store.





[GitHub] spark pull request #15036: [SPARK-17483] Minor refactoring in BlockManager s...

2016-09-09 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/15036#discussion_r78256795
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -807,8 +801,8 @@ private[spark] class BlockManager(
 // Now that the block is in either the memory or disk store,
 // tell the master about it.
 info.size = size
-if (tellMaster) {
-  reportBlockStatus(blockId, info, putBlockStatus)
+if (tellMaster && info.tellMaster) {
--- End diff --

This is a bit confusing. What is the difference between tellMaster and
info.tellMaster? For instance, why isn't this tellMaster || info.tellMaster?
(I understand that && is the current behavior, but I'm not sure why.)





[GitHub] spark pull request #14912: [SPARK-17357][SQL] Fix current predicate pushdown

2016-09-06 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14912#discussion_r77748668
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -171,6 +172,27 @@ class FilterPushdownSuite extends PlanTest {
 comparePlans(optimized, correctAnswer)
   }
 
+  test("push down filters that are combined") {
+// The following predicate ('a === 2 || 'a === 3) && ('c > 10 || 'a 
=== 2)
+// will be simplified as ('a == 2) || ('c > 10 && 'a == 3).
+// ('a === 2 || 'a === 3) can be pushed down. But the simplified one 
can't.
--- End diff --

I agree with you that we should respect the interaction between
CombineFilters, PushDownPredicates and other rules. I do think it's important
that CNF conversion run before any of the push-down / reordering rules, and
the simplification rules should run afterwards.
My concern with rolling this into CombineFilters is that it doesn't get
triggered unless there are adjoining Filter nodes. In the example you have:

val originalQuery = testRelation
  .select('a, 'b, ('c + 1) as 'cc)
  .groupBy('a)('a, count('cc) as 'c)
  .where('c > 10)
  .where(('a === 2) || ('c > 10 && 'a === 3))

I think that (a == 2 || a == 3) should get pushed down even if you don't have
".where('c > 10)", but I'm not sure that it will be, since toCNF is in
CombineFilters. Could you confirm?
My suggestion is that toCNF warrants a separate rule -- for example, when
you're doing joins and you have

select * from A inner join C on (A.a1 = C.c1)
where A.a2 = 2 || (C.c2 = 10 && A.a2 = 3)

you want (A.a2 = 2 || A.a2 = 3) pushed down into A.
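
To spell out that reasoning, here is a self-contained toy (plain Scala, not
Catalyst) showing that the CNF form exposes a conjunct referencing only 'a',
which is the piece that can be pushed down, while the simplified disjunctive
form cannot be split at all:

object CnfPushdownSketch extends App {
  sealed trait Expr
  case class Leaf(pred: String, refs: Set[String]) extends Expr
  case class And(l: Expr, r: Expr) extends Expr
  case class Or(l: Expr, r: Expr) extends Expr

  // Convert to conjunctive normal form by distributing Or over And.
  def toCNF(e: Expr): Expr = e match {
    case And(l, r) => And(toCNF(l), toCNF(r))
    case Or(l, r) =>
      (toCNF(l), toCNF(r)) match {
        case (And(a, b), c) => And(toCNF(Or(a, c)), toCNF(Or(b, c)))
        case (a, And(b, c)) => And(toCNF(Or(a, b)), toCNF(Or(a, c)))
        case (a, b)         => Or(a, b)
      }
    case leaf => leaf
  }

  def conjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => Seq(other)
  }

  def refs(e: Expr): Set[String] = e match {
    case Leaf(_, r) => r
    case And(l, r)  => refs(l) ++ refs(r)
    case Or(l, r)   => refs(l) ++ refs(r)
  }

  val a2  = Leaf("a = 2",  Set("a"))
  val a3  = Leaf("a = 3",  Set("a"))
  val c10 = Leaf("c > 10", Set("c"))

  // Simplified form (a = 2) || (c > 10 && a = 3): one disjunct, nothing pushable.
  val simplified = Or(a2, And(c10, a3))
  // Its CNF is (a = 2 || c > 10) && (a = 2 || a = 3): the second conjunct needs only 'a'.
  val cnf = toCNF(simplified)

  val pushable = conjuncts(cnf).filter(c => refs(c).subsetOf(Set("a")))
  println(pushable)  // the (a = 2 || a = 3) conjunct, which can go below the aggregate/join
}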





[GitHub] spark pull request #14867: [SPARK-17296][SQL] Simplify parser join processin...

2016-09-06 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14867#discussion_r77675078
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 ---
@@ -360,10 +360,54 @@ class PlanParserSuite extends PlanTest {
 test("left anti join", LeftAnti, testExistence)
 test("anti join", LeftAnti, testExistence)
 
+// Test natural cross join
+intercept("select * from a natural cross join b")
+
+// Test natural join with a condition
+intercept("select * from a natural join b on a.id = b.id")
+
 // Test multiple consecutive joins
 assertEqual(
   "select * from a join b join c right join d",
   table("a").join(table("b")).join(table("c")).join(table("d"), 
RightOuter).select(star()))
+
+// SPARK-17296
+assertEqual(
+  "select * from t1 cross join t2 join t3 on t3.id = t1.id join t4 on 
t4.id = t1.id",
+  table("t1")
+.join(table("t2"), Cross)
+.join(table("t3"), Inner, Option(Symbol("t3.id") === 
Symbol("t1.id")))
+.join(table("t4"), Inner, Option(Symbol("t4.id") === 
Symbol("t1.id")))
+.select(star()))
+
+// Test multiple on clauses.
+intercept("select * from t1 inner join t2 inner join t3 on col3 = col2 
on col3 = col1")
+
+// Parenthesis
+assertEqual(
+  "select * from t1 inner join (t2 inner join t3 on col3 = col2) on 
col3 = col1",
+  table("t1")
+.join(table("t2")
+  .join(table("t3"), Inner, Option('col3 === 'col2)), Inner, 
Option('col3 === 'col1))
+.select(star()))
+assertEqual(
+  "select * from t1 inner join (t2 inner join t3) on col3 = col2",
+  table("t1")
+.join(table("t2").join(table("t3"), Inner, None), Inner, 
Option('col3 === 'col2))
+.select(star()))
+assertEqual(
+  "select * from t1 inner join (t2 inner join t3 on col3 = col2)",
+  table("t1")
+.join(table("t2").join(table("t3"), Inner, Option('col3 === 
'col2)), Inner, None)
+.select(star()))
+
+// Implicit joins.
+assertEqual(
--- End diff --

Great, LGTM





[GitHub] spark pull request #14897: [SPARK-17338][SQL] add global temp view

2016-09-02 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14897#discussion_r77424940
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala ---
@@ -222,10 +245,19 @@ case class AlterViewAsCommand(
 qe.assertAnalyzed()
 val analyzedPlan = qe.analyzed
 
-if (session.sessionState.catalog.isTemporaryTable(name)) {
-  session.sessionState.catalog.createTempView(name.table, 
analyzedPlan, overrideIfExists = true)
-} else {
+if (name.database.isDefined) {
   alterPermanentView(session, analyzedPlan)
+} else if (session.sessionState.catalog.isTemporaryTable(name)) {
--- End diff --

Shouldn't we check the current database for the view name? Or do we assume
that the database has already been resolved into the TableIdentifier?
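
To spell out the two cases I mean (illustrative only, not code from this patch):

    import org.apache.spark.sql.catalyst.TableIdentifier

    val unqualified = TableIdentifier("v1")              // database = None
    val qualified   = TableIdentifier("v1", Some("db1")) // database = Some("db1")

    // With the check above, `unqualified` skips the database.isDefined branch and is
    // treated as a temp view, even if a permanent view named v1 exists in the
    // session's current database -- unless the current database was already folded
    // into the TableIdentifier during resolution.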





[GitHub] spark pull request #14897: [SPARK-17338][SQL] add global temp view

2016-09-02 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14897#discussion_r77424851
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala ---
@@ -222,10 +245,19 @@ case class AlterViewAsCommand(
 qe.assertAnalyzed()
 val analyzedPlan = qe.analyzed
 
-if (session.sessionState.catalog.isTemporaryTable(name)) {
-  session.sessionState.catalog.createTempView(name.table, 
analyzedPlan, overrideIfExists = true)
-} else {
+if (name.database.isDefined) {
   alterPermanentView(session, analyzedPlan)
+} else if (session.sessionState.catalog.isTemporaryTable(name)) {
+  session.sessionState.catalog.createTempView(
--- End diff --

Not part of this patch, but why is it OK for an ALTER VIEW to work on a
table?





[GitHub] spark pull request #14867: [SPARK-17296][SQL] Simplify parser join processin...

2016-09-02 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14867#discussion_r77421576
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 ---
@@ -360,10 +360,28 @@ class PlanParserSuite extends PlanTest {
 test("left anti join", LeftAnti, testExistence)
 test("anti join", LeftAnti, testExistence)
 
+// Test natural cross join
+intercept("select * from a natural cross join b")
+
+// Test natural join with a condition
+intercept("select * from a natural join b on a.id = b.id")
+
 // Test multiple consecutive joins
 assertEqual(
   "select * from a join b join c right join d",
   table("a").join(table("b")).join(table("c")).join(table("d"), 
RightOuter).select(star()))
+
+// SPARK-17296
+assertEqual(
+  "select * from t1 cross join t2 join t3 on t3.id = t1.id join t4 on 
t4.id = t1.id",
+  table("t1")
+.join(table("t2"))
+.join(table("t3"), Inner, Option(Symbol("t3.id") === 
Symbol("t1.id")))
+.join(table("t4"), Inner, Option(Symbol("t4.id") === 
Symbol("t1.id")))
+.select(star()))
+
+// Test multiple on clauses.
+intercept("select * from t1 inner join t2 inner join t3 on col3 = col2 
on col3 = col1")
--- End diff --

As discussed, let's also add tests somewhere for:
SELECT * FROM T1 INNER JOIN (T2 INNER JOIN T3 ON col3 = col2) ON col3 = col1
SELECT * FROM T1 INNER JOIN (T2 INNER JOIN T3) ON col3 = col2
SELECT * FROM T1 INNER JOIN (T2 INNER JOIN T3 ON col3 = col2)

This looks good to me.





[GitHub] spark pull request #14867: [SPARK-17296][SQL] Simplify parser join processin...

2016-09-02 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14867#discussion_r77418488
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 ---
@@ -360,10 +360,25 @@ class PlanParserSuite extends PlanTest {
 test("left anti join", LeftAnti, testExistence)
 test("anti join", LeftAnti, testExistence)
 
+// Test natural cross join
+intercept("select * from a natural cross join b")
+
+// Test natural join with a condition
+intercept("select * from a natural join b on a.id = b.id")
+
 // Test multiple consecutive joins
 assertEqual(
   "select * from a join b join c right join d",
   table("a").join(table("b")).join(table("c")).join(table("d"), 
RightOuter).select(star()))
+
+// SPARK-17296
+assertEqual(
+  "select * from t1 cross join t2 join t3 on t3.id = t1.id join t4 on 
t4.id = t1.id",
--- End diff --

To clarify, it looks like your patch will disallow both queries at the
parser level. Could you add a test that enforces this?





[GitHub] spark issue #14797: [SPARK-17230] [SQL] Should not pass optimized query into...

2016-09-02 Thread srinathshankar
Github user srinathshankar commented on the issue:

https://github.com/apache/spark/pull/14797
  
Looks fine.





[GitHub] spark pull request #14867: [SPARK-17296][SQL] Simplify parser join processin...

2016-09-02 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14867#discussion_r77417316
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 ---
@@ -360,10 +360,25 @@ class PlanParserSuite extends PlanTest {
 test("left anti join", LeftAnti, testExistence)
 test("anti join", LeftAnti, testExistence)
 
+// Test natural cross join
+intercept("select * from a natural cross join b")
+
+// Test natural join with a condition
+intercept("select * from a natural join b on a.id = b.id")
+
 // Test multiple consecutive joins
 assertEqual(
   "select * from a join b join c right join d",
   table("a").join(table("b")).join(table("c")).join(table("d"), 
RightOuter).select(star()))
+
+// SPARK-17296
+assertEqual(
+  "select * from t1 cross join t2 join t3 on t3.id = t1.id join t4 on 
t4.id = t1.id",
--- End diff --

How is something like
SELECT * FROM T1 INNER JOIN T2 INNER JOIN T3 ON col3 = col2 ON col3 = col1;
supposed to parse? Without your change it returns the following error:
org.apache.spark.sql.AnalysisException: cannot resolve '`col3`' given input columns: [col1, col2]; line 1 pos 63
which I don't understand. The following parses, though, and returns a result:
SELECT * FROM T1 INNER JOIN T2 INNER JOIN T3 ON col1 = col2 ON col2 = col1





[GitHub] spark issue #14866: [SPARK-17298][SQL] Require explicit CROSS join for carte...

2016-09-02 Thread srinathshankar
Github user srinathshankar commented on the issue:

https://github.com/apache/spark/pull/14866
  
I'll update the Python and R APIs in a follow-up. Right now, in Python and R,
a cross join is performed if no join expressions/columns and no join type are
specified. It would be good to require explicit cross joins in those APIs as well.





[GitHub] spark pull request #14912: [SPARK-17357][SQL] Simplified predicates should b...

2016-09-02 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14912#discussion_r77383932
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -171,6 +172,27 @@ class FilterPushdownSuite extends PlanTest {
 comparePlans(optimized, correctAnswer)
   }
 
+  test("push down filters that are combined") {
+// The following predicate ('a === 2 || 'a === 3) && ('c > 10 || 'a 
=== 2)
+// will be simplified as ('a == 2) || ('c > 10 && 'a == 3).
+// ('a === 2 || 'a === 3) can be pushed down. But the simplified one 
can't.
--- End diff --

So what happens if I just have the predicate
(a = 2) || (c > 10 && a = 3)
Will anything be pushed down? Have you considered instead modifying the
boolean simplification logic?
Another approach that would catch these cases is as follows (a rough sketch of
the batch follows this list):
1.a Convert filters to conjunctive normal form (CNF)
1.b Combine filters
1.c Push filters
Steps 1.a, 1.b and 1.c would run in a batch until fixed point. Follow this
batch with BooleanSimplification -- it can find and extract common factors
for efficiency.
Overall, CNF may maximize the potential for filter pushdown.
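
A rough sketch of step 1.a as a Catalyst rule (hypothetical; the batch wiring in
the comments uses existing rule names, which may differ slightly between Spark
versions, and `cnf` stands for a CNF rewrite like the one sketched earlier in
this thread):

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
    import org.apache.spark.sql.catalyst.rules.Rule

    // 1.a: normalize every Filter predicate to CNF so CombineFilters (1.b) and the
    // pushdown rules (1.c) see conjuncts they can split and move independently.
    case class ToCNF(cnf: Expression => Expression) extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case Filter(condition, child) => Filter(cnf(condition), child)
      }
    }

    // Batch("CNF filter pushdown", FixedPoint(100),
    //   ToCNF(...), CombineFilters, PushDownPredicate)
    // followed by
    // Batch("Simplify", FixedPoint(100), BooleanSimplification)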






[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-09-02 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r77380304
  
--- Diff: R/pkg/R/DataFrame.R ---
@@ -2265,7 +2265,7 @@ setMethod("join",
   signature(x = "SparkDataFrame", y = "SparkDataFrame"),
   function(x, y, joinExpr = NULL, joinType = NULL) {
 if (is.null(joinExpr)) {
-  sdf <- callJMethod(x@sdf, "join", y@sdf)
+  sdf <- callJMethod(x@sdf, "crossJoin", y@sdf)
 } else {
   if (class(joinExpr) != "Column") stop("joinExpr must be a 
Column")
   if (is.null(joinType)) {
--- End diff --

I've added cross to the allowable join types. With a joinExpr, I think inner
is appropriate.





[GitHub] spark pull request #14797: [SPARK-17230] [SQL] Should not pass optimized que...

2016-09-01 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14797#discussion_r77280523
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -479,13 +480,23 @@ case class DataSource(
   }
 }
 
+// SPARK-17230: Resolve the partition columns so 
InsertIntoHadoopFsRelationCommand does
+// not need to have the query as child, to avoid to analyze an 
optimized query,
+// because InsertIntoHadoopFsRelationCommand will be optimized 
first.
+val columns = partitionColumns.map { name =>
--- End diff --

Can this be done before
val partitionSchema = ...
above? It looks like we're trying to analyze the columns there as well...





[GitHub] spark pull request #14907: [SPARK-17351] Refactor JDBCRDD to expose ResultSe...

2016-09-01 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14907#discussion_r77278500
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -154,6 +163,297 @@ object JdbcUtils extends Logging {
   throw new IllegalArgumentException(s"Can't get JDBC type for 
${dt.simpleString}"))
   }
 
+  /**
+   * Maps a JDBC type to a Catalyst type.  This function is called only 
when
+   * the JdbcDialect class corresponding to your database driver returns 
null.
+   *
+   * @param sqlType - A field of java.sql.Types
+   * @return The Catalyst type corresponding to sqlType.
+   */
+  private def getCatalystType(
+  sqlType: Int,
+  precision: Int,
+  scale: Int,
+  signed: Boolean): DataType = {
+val answer = sqlType match {
+  // scalastyle:off
+  case java.sql.Types.ARRAY => null
+  case java.sql.Types.BIGINT=> if (signed) { LongType } else { 
DecimalType(20,0) }
+  case java.sql.Types.BINARY=> BinaryType
+  case java.sql.Types.BIT   => BooleanType // @see JdbcDialect 
for quirks
+  case java.sql.Types.BLOB  => BinaryType
+  case java.sql.Types.BOOLEAN   => BooleanType
+  case java.sql.Types.CHAR  => StringType
+  case java.sql.Types.CLOB  => StringType
+  case java.sql.Types.DATALINK  => null
+  case java.sql.Types.DATE  => DateType
+  case java.sql.Types.DECIMAL
+if precision != 0 || scale != 0 => DecimalType.bounded(precision, 
scale)
+  case java.sql.Types.DECIMAL   => DecimalType.SYSTEM_DEFAULT
+  case java.sql.Types.DISTINCT  => null
+  case java.sql.Types.DOUBLE=> DoubleType
+  case java.sql.Types.FLOAT => FloatType
+  case java.sql.Types.INTEGER   => if (signed) { IntegerType } 
else { LongType }
+  case java.sql.Types.JAVA_OBJECT   => null
+  case java.sql.Types.LONGNVARCHAR  => StringType
+  case java.sql.Types.LONGVARBINARY => BinaryType
+  case java.sql.Types.LONGVARCHAR   => StringType
+  case java.sql.Types.NCHAR => StringType
+  case java.sql.Types.NCLOB => StringType
+  case java.sql.Types.NULL  => null
+  case java.sql.Types.NUMERIC
+if precision != 0 || scale != 0 => DecimalType.bounded(precision, 
scale)
+  case java.sql.Types.NUMERIC   => DecimalType.SYSTEM_DEFAULT
+  case java.sql.Types.NVARCHAR  => StringType
+  case java.sql.Types.OTHER => null
+  case java.sql.Types.REAL  => DoubleType
+  case java.sql.Types.REF   => StringType
+  case java.sql.Types.ROWID => LongType
+  case java.sql.Types.SMALLINT  => IntegerType
+  case java.sql.Types.SQLXML=> StringType
+  case java.sql.Types.STRUCT=> StringType
+  case java.sql.Types.TIME  => TimestampType
+  case java.sql.Types.TIMESTAMP => TimestampType
+  case java.sql.Types.TINYINT   => IntegerType
+  case java.sql.Types.VARBINARY => BinaryType
+  case java.sql.Types.VARCHAR   => StringType
+  case _=> null
+  // scalastyle:on
+}
+
+if (answer == null) throw new SQLException("Unsupported type " + 
sqlType)
+answer
+  }
+
+  /**
+   * Takes a [[ResultSet]] and returns its Catalyst schema.
+   *
+   * @return A [[StructType]] giving the Catalyst schema.
+   * @throws SQLException if the schema contains an unsupported type.
+   */
+  def getSchema(resultSet: ResultSet, dialect: JdbcDialect): StructType = {
+val rsmd = resultSet.getMetaData
+val ncols = rsmd.getColumnCount
+val fields = new Array[StructField](ncols)
+var i = 0
+while (i < ncols) {
+  val columnName = rsmd.getColumnLabel(i + 1)
+  val dataType = rsmd.getColumnType(i + 1)
+  val typeName = rsmd.getColumnTypeName(i + 1)
+  val fieldSize = rsmd.getPrecision(i + 1)
+  val fieldScale = rsmd.getScale(i + 1)
+  val isSigned = {
+try {
+  rsmd.isSigned(i + 1)
+} catch {
+  // Workaround for HIVE-14684:
+  case e: SQLException if
+  e.getMessage == "Method not supported" &&
+rsmd.getClass.getName == 
"org.apache.hive.jdbc.HiveResultSetMetaData" => true
+}
+  }
+  val nullable = rsm

[GitHub] spark pull request #14907: [SPARK-17351] Refactor JDBCRDD to expose ResultSe...

2016-09-01 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14907#discussion_r77277372
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 ---
@@ -154,6 +163,297 @@ object JdbcUtils extends Logging {
   throw new IllegalArgumentException(s"Can't get JDBC type for 
${dt.simpleString}"))
   }
 
+  /**
+   * Maps a JDBC type to a Catalyst type.  This function is called only 
when
+   * the JdbcDialect class corresponding to your database driver returns 
null.
+   *
+   * @param sqlType - A field of java.sql.Types
+   * @return The Catalyst type corresponding to sqlType.
+   */
+  private def getCatalystType(
+  sqlType: Int,
+  precision: Int,
+  scale: Int,
+  signed: Boolean): DataType = {
+val answer = sqlType match {
+  // scalastyle:off
+  case java.sql.Types.ARRAY => null
+  case java.sql.Types.BIGINT=> if (signed) { LongType } else { 
DecimalType(20,0) }
+  case java.sql.Types.BINARY=> BinaryType
+  case java.sql.Types.BIT   => BooleanType // @see JdbcDialect 
for quirks
+  case java.sql.Types.BLOB  => BinaryType
+  case java.sql.Types.BOOLEAN   => BooleanType
+  case java.sql.Types.CHAR  => StringType
+  case java.sql.Types.CLOB  => StringType
+  case java.sql.Types.DATALINK  => null
+  case java.sql.Types.DATE  => DateType
+  case java.sql.Types.DECIMAL
+if precision != 0 || scale != 0 => DecimalType.bounded(precision, 
scale)
+  case java.sql.Types.DECIMAL   => DecimalType.SYSTEM_DEFAULT
+  case java.sql.Types.DISTINCT  => null
+  case java.sql.Types.DOUBLE=> DoubleType
+  case java.sql.Types.FLOAT => FloatType
+  case java.sql.Types.INTEGER   => if (signed) { IntegerType } 
else { LongType }
+  case java.sql.Types.JAVA_OBJECT   => null
+  case java.sql.Types.LONGNVARCHAR  => StringType
+  case java.sql.Types.LONGVARBINARY => BinaryType
+  case java.sql.Types.LONGVARCHAR   => StringType
+  case java.sql.Types.NCHAR => StringType
+  case java.sql.Types.NCLOB => StringType
+  case java.sql.Types.NULL  => null
+  case java.sql.Types.NUMERIC
+if precision != 0 || scale != 0 => DecimalType.bounded(precision, 
scale)
+  case java.sql.Types.NUMERIC   => DecimalType.SYSTEM_DEFAULT
+  case java.sql.Types.NVARCHAR  => StringType
+  case java.sql.Types.OTHER => null
+  case java.sql.Types.REAL  => DoubleType
+  case java.sql.Types.REF   => StringType
+  case java.sql.Types.ROWID => LongType
+  case java.sql.Types.SMALLINT  => IntegerType
+  case java.sql.Types.SQLXML=> StringType
+  case java.sql.Types.STRUCT=> StringType
+  case java.sql.Types.TIME  => TimestampType
+  case java.sql.Types.TIMESTAMP => TimestampType
+  case java.sql.Types.TINYINT   => IntegerType
+  case java.sql.Types.VARBINARY => BinaryType
+  case java.sql.Types.VARCHAR   => StringType
+  case _=> null
+  // scalastyle:on
+}
+
+if (answer == null) throw new SQLException("Unsupported type " + 
sqlType)
+answer
+  }
+
+  /**
+   * Takes a [[ResultSet]] and returns its Catalyst schema.
+   *
+   * @return A [[StructType]] giving the Catalyst schema.
+   * @throws SQLException if the schema contains an unsupported type.
+   */
+  def getSchema(resultSet: ResultSet, dialect: JdbcDialect): StructType = {
+val rsmd = resultSet.getMetaData
+val ncols = rsmd.getColumnCount
+val fields = new Array[StructField](ncols)
+var i = 0
+while (i < ncols) {
+  val columnName = rsmd.getColumnLabel(i + 1)
+  val dataType = rsmd.getColumnType(i + 1)
+  val typeName = rsmd.getColumnTypeName(i + 1)
+  val fieldSize = rsmd.getPrecision(i + 1)
+  val fieldScale = rsmd.getScale(i + 1)
+  val isSigned = {
+try {
+  rsmd.isSigned(i + 1)
+} catch {
+  // Workaround for HIVE-14684:
+  case e: SQLException if
+  e.getMessage == "Method not supported" &&
+rsmd.getClass.getName == 
"org.apache.hive.jdbc.HiveResultSetMetaData" => true
+}
+  }
+  val nullable = rsm

[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-09-01 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r77219760
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -933,6 +936,47 @@ object CombineLimits extends Rule[LogicalPlan] {
 }
 
 /**
+ * Check if there any cartesian products between joins of any type in the 
optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit 
cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is 
true.
+ *
+ * This rule must be run AFTER the ReorderJoin rule since the join 
conditions for each join must be
+ * collected before checking if it is a cartesian product. If you have
+ * SELECT * from R, S where R.r = S.s,
+ * the join between R and S is not a cartesian product and therefore 
should be allowed.
+ * The predicate R.r = S.s is not recognized as a join condition until the 
ReorderJoin rule.
+ */
+case class CheckCartesianProducts(conf: CatalystConf)
--- End diff --

As discussed, while it is probably best for this to be in analysis (it does
throw an AnalysisException, after all), I'm leaving it here until we want to
reorganize the analysis/optimizer rules.





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-31 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r77099268
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -39,39 +38,46 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
*
* The joined plan are picked from left to right, prefer those has at 
least one join condition.
*
-   * @param input a list of LogicalPlans to join.
+   * @param input a list of LogicalPlans to inner join and the type of 
inner join.
* @param conditions a list of condition for join.
*/
   @tailrec
-  def createOrderedJoin(input: Seq[LogicalPlan], conditions: 
Seq[Expression]): LogicalPlan = {
+  def createOrderedJoin(input: Seq[(LogicalPlan, InnerLike)], conditions: 
Seq[Expression])
+: LogicalPlan = {
 assert(input.size >= 2)
 if (input.size == 2) {
--- End diff --

Sameer helped me interpret this comment. I *think* I've addressed this.





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-31 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r77098768
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala 
---
@@ -28,6 +28,7 @@ object JoinType {
 case "rightouter" | "right" => RightOuter
 case "leftsemi" => LeftSemi
 case "leftanti" => LeftAnti
+case "cross" => Cross
 case _ =>
   val supported = Seq(
--- End diff --

done.





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-31 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r77098708
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -933,6 +936,47 @@ object CombineLimits extends Rule[LogicalPlan] {
 }
 
 /**
+ * Check if there any cartesian products between joins of any type in the 
optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit 
cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is 
true.
+ *
+ * This rule must be run AFTER the ReorderJoin rule since the join 
conditions for each join must be
+ * collected before checking if it is a cartesian product. If you have
+ * SELECT * from R, S where R.r = S.s,
+ * the join between R and S is not a cartesian product and therefore 
should be allowed.
+ * The predicate R.r = S.s is not recognized as a join condition until the 
ReorderJoin rule.
+ */
+case class CheckCartesianProducts(conf: CatalystConf)
--- End diff --

This is just the earliest possible place to do the check. (In fact, it
ideally belongs in analysis.) Could you elaborate on triggering this in
QueryExecution?





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-31 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r77097569
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -933,6 +936,47 @@ object CombineLimits extends Rule[LogicalPlan] {
 }
 
 /**
+ * Check if there any cartesian products between joins of any type in the 
optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit 
cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is 
true.
+ *
+ * This rule must be run AFTER the ReorderJoin rule since the join 
conditions for each join must be
+ * collected before checking if it is a cartesian product. If you have
+ * SELECT * from R, S where R.r = S.s,
+ * the join between R and S is not a cartesian product and therefore 
should be allowed.
+ * The predicate R.r = S.s is not recognized as a join condition until the 
ReorderJoin rule.
+ */
+case class CheckCartesianProducts(conf: CatalystConf)
+extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Check if a join is a cartesian product. Returns true if
+   * there are no join conditions involving references from both left and 
right.
+   */
+  def isCartesianProduct(join: Join)
+  : Boolean = {
--- End diff --

Done.





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-31 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r77069424
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -933,6 +936,45 @@ object CombineLimits extends Rule[LogicalPlan] {
 }
 
 /**
+ * Check if there any cartesian products between joins of any type in the 
optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit 
cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is 
true.
+ *
+ * This rule must be run AFTER the ReorderJoin rule since the join 
conditions for each join must be
+ * collected before checking if it is a cartesian product. If you have
+ * SELECT * from R, S where R.r = S.s,
+ * the join between R and S is not a cartesian product and therefore 
should be allowed.
+ * The predicate R.r = S.s is not recognized as a join condition until the 
ReorderJoin rule.
+ */
+case class CheckCartesianProducts(conf: CatalystConf)
+extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Check if a join specified by left, right and condition is a cartesian 
product. Returns true if
+   * there are no join conditions involving references from both left and 
right.
+   */
+  def isCartesianProduct(left: LogicalPlan, right: LogicalPlan, condition: 
Option[Expression])
+  : Boolean = {
+val conditions = 
condition.map(splitConjunctivePredicates).getOrElse(Nil)
+!conditions.map(_.references).exists(refs => 
refs.exists(left.outputSet.contains)
+&& refs.exists(right.outputSet.contains))
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan =
+if (conf.allowCartesianProduct) {
+  plan
+} else plan transform {
+  case j @ Join(left, right, Inner(false) | LeftOuter | RightOuter | 
FullOuter, condition)
+if isCartesianProduct(left, right, condition) =>
+  throw new AnalysisException(
+s"""Detected cartesian product for ${j.joinType.sql} join 
between logical plans
+   |${left.treeString(false).trim}
+   |and
+   |${right.treeString(false).trim}
+   |Use a CROSS JOIN to allow cartesian products between these 
relations""".stripMargin)
--- End diff --

done.





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-31 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r77067766
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -698,6 +698,8 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
   override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
 
   override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
+
+  override def allowCartesianProduct: Boolean = 
getConf(CROSS_JOINS_ENABLED)
--- End diff --

As discussed, I only have the crossJoinEnabled flag, which has been exposed
in CatalystConf. Also as discussed, deprecating crossJoinEnabled and switching
the conf flag to requireCrossJoinSyntax is a good idea; I will address that in
a follow-up PR.
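
For reference, a minimal usage sketch of the flag as it stands today (assuming
a SparkSession named spark and two registered tables r and s):

    // Escape hatch: disable the cartesian-product check entirely.
    spark.conf.set("spark.sql.crossJoin.enabled", "true")
    spark.sql("SELECT * FROM r, s")   // no longer raises the cartesian-product error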





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-31 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r77067849
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
 ---
@@ -353,7 +353,7 @@ case class BroadcastNestedLoopJoinExec(
 val broadcastedRelation = 
broadcast.executeBroadcast[Array[InternalRow]]()
--- End diff --

The physical plan checks have been removed.





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-31 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r77049987
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -39,39 +38,44 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
*
* The joined plan are picked from left to right, prefer those has at 
least one join condition.
*
-   * @param input a list of LogicalPlans to join.
+   * @param input a list LogicalPlans to inner join, and a bool speciying 
if an explicit cross join
--- End diff --

fixed





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-31 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r77049867
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala ---
@@ -303,7 +303,7 @@ package object dsl {
 
   def join(
 otherPlan: LogicalPlan,
-joinType: JoinType = Inner,
+joinType: JoinType = Inner(false),
--- End diff --

Now renamed.





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-31 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r77049426
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -933,6 +936,45 @@ object CombineLimits extends Rule[LogicalPlan] {
 }
 
 /**
+ * Check if there any cartesian products between joins of any type in the 
optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit 
cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is 
true.
+ *
+ * This rule must be run AFTER the ReorderJoin rule since the join 
conditions for each join must be
+ * collected before checking if it is a cartesian product. If you have
+ * SELECT * from R, S where R.r = S.s,
+ * the join between R and S is not a cartesian product and therefore 
should be allowed.
+ * The predicate R.r = S.s is not recognized as a join condition until the 
ReorderJoin rule.
+ */
+case class CheckCartesianProducts(conf: CatalystConf)
+extends Rule[LogicalPlan] with PredicateHelper {
+  /**
+   * Check if a join specified by left, right and condition is a cartesian 
product. Returns true if
+   * there are no join conditions involving references from both left and 
right.
+   */
+  def isCartesianProduct(left: LogicalPlan, right: LogicalPlan, condition: 
Option[Expression])
--- End diff --

done.





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-30 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r76857357
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
 ---
@@ -642,7 +642,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with 
SQLTestUtils {
 checkColumnNames(
   """SELECT x.a, y.a, x.b, y.b
 |FROM (SELECT 1 AS a, 2 AS b) x
-|INNER JOIN (SELECT 1 AS a, 2 AS b) y
+|CROSS JOIN (SELECT 1 AS a, 2 AS b) y
--- End diff --

The join condition is eliminated because it is "trivial" according to the
foldable propagation rule. This is annoying, but an expected by-product of
detecting cartesian products in the optimization phase. I can check whether
there is a way to split up the operator optimization batch so that the
cartesian check can be inserted before constant propagation.
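
To illustrate the by-product (my own example, not the suite's query): both
sides of the join condition below fold to the literal 1, so foldable
propagation plus constant folding reduce the condition to a trivially true
predicate and drop it. By the time the cartesian check runs in the optimizer,
the join has no condition left, so the test has to say CROSS JOIN:

    spark.sql(
      """SELECT x.a, y.a
        |FROM (SELECT 1 AS a) x
        |INNER JOIN (SELECT 1 AS a) y ON x.a = y.a""".stripMargin)
    // After optimization the ON clause is gone, and the plan is flagged as a
    // cartesian product unless the query uses CROSS JOIN (or the conf flag is set).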





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-29 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r76716719
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -159,23 +159,30 @@ object ExtractEquiJoinKeys extends Logging with 
PredicateHelper {
  */
 object ExtractFiltersAndInnerJoins extends PredicateHelper {
 
-  // flatten all inner joins, which are next to each other
-  def flattenJoin(plan: LogicalPlan): (Seq[LogicalPlan], Seq[Expression]) 
= plan match {
-case Join(left, right, Inner, cond) =>
-  val (plans, conditions) = flattenJoin(left)
-  (plans ++ Seq(right), conditions ++ cond.toSeq)
+  /**
+   * Flatten all inner joins, which are next to each other.
+   * Return a list of logical plans to be joined with a boolean for each 
plan indicating if it
+   * was involved in an explicit cross join. Also returns the entire list 
of join conditions for
+   * the left-deep tree.
+   */
+  def flattenJoin(plan: LogicalPlan, parentJoinType: Inner = Inner(false))
--- End diff --

Done.





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-29 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r76716740
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1534,6 +1542,34 @@ object SimplifyCaseConversionExpressions extends 
Rule[LogicalPlan] {
 }
 
 /**
+ * Check if ther eany cartesian products between joins of any type in the 
optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit 
cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is 
true.
--- End diff --

Done.





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-29 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r76716676
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1534,6 +1542,34 @@ object SimplifyCaseConversionExpressions extends 
Rule[LogicalPlan] {
 }
 
 /**
+ * Check if ther eany cartesian products between joins of any type in the 
optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit 
cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is 
true.
+ */
+case class CheckCartesianProducts(conf: CatalystConf) extends 
Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan =
+if (conf.allowCartesianProduct) {
+  plan
+} else plan transform {
+  case j @ Join(left, right, joinType, conditions)
--- End diff --

Done.





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-29 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r76716689
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1534,6 +1542,34 @@ object SimplifyCaseConversionExpressions extends 
Rule[LogicalPlan] {
 }
 
 /**
+ * Check if ther eany cartesian products between joins of any type in the 
optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit 
cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is 
true.
+ */
+case class CheckCartesianProducts(conf: CatalystConf) extends 
Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan =
+if (conf.allowCartesianProduct) {
+  plan
+} else plan transform {
+  case j @ Join(left, right, joinType, conditions)
+  if Seq(Inner(false), LeftOuter, RightOuter, 
FullOuter).contains(joinType) =>
+val hasJoinCondition =
--- End diff --

Done.





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-29 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14866#discussion_r76683461
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1534,6 +1542,34 @@ object SimplifyCaseConversionExpressions extends 
Rule[LogicalPlan] {
 }
 
 /**
+ * Check if ther eany cartesian products between joins of any type in the 
optimized plan tree.
+ * Throw an error if a cartesian product is found without an explicit 
cross join specified.
+ * This rule is effectively disabled if the CROSS_JOINS_ENABLED flag is 
true.
+ */
+case class CheckCartesianProducts(conf: CatalystConf) extends 
Rule[LogicalPlan] {
--- End diff --

As discussed, must be done after ReorderJoin. Added a comment explaining 
why.





[GitHub] spark issue #14866: [SPARK-17298][SQL] Require explicit CROSS join for carte...

2016-08-29 Thread srinathshankar
Github user srinathshankar commented on the issue:

https://github.com/apache/spark/pull/14866
  
@rxin @sameeragarwal @hvanhovell @davies @JoshRosen 





[GitHub] spark pull request #14866: [SPARK-17298][SQL] Require explicit CROSS join fo...

2016-08-29 Thread srinathshankar
GitHub user srinathshankar opened a pull request:

https://github.com/apache/spark/pull/14866

[SPARK-17298][SQL] Require explicit CROSS join for cartesian products

## What changes were proposed in this pull request?

Require the use of CROSS join syntax in SQL (and a new crossJoin
DataFrame API) to specify explicit cartesian products between relations.
By cartesian product we mean a join between relations R and S where
there is no join condition involving columns from both R and S.

If a cartesian product is detected in the absence of an explicit CROSS
join, an error must be thrown. Turning on the
"spark.sql.crossJoin.enabled" configuration flag will disable this check
and allow cartesian products without an explicit CROSS join.

The new crossJoin DataFrame API must be used to specify explicit cross
joins. The existing join(DataFrame) method will produce an INNER join
that will require a subsequent join condition.
That is, df1.join(df2) is equivalent to select * from df1, df2.
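
A quick sketch of the resulting API distinction (df1 and df2 stand for
arbitrary DataFrames; the id column is illustrative):

    val product = df1.crossJoin(df2)                      // explicit cartesian product
    val matched = df1.join(df2, df1("id") === df2("id"))  // inner join with a condition

    // df1.join(df2) with no condition still plans an INNER join; without a later
    // condition it now fails the check unless spark.sql.crossJoin.enabled is true.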

## How was this patch tested?

Added cross-join.sql to the SQLQueryTestSuite to test the check for 
cartesian products. Added a couple of tests to the DataFrameJoinSuite to test 
the crossJoin API. Modified various other test suites to explicitly specify a 
cross join where an INNER join or a comma-separated list was previously used.






You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srinathshankar/spark crossjoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14866.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14866


commit 250d96dcf43c09dbff821e5b20f3f0ea86f08887
Author: Srinath Shankar 
Date:   2016-08-25T16:43:08Z

[SPARK-17298][SQL] Require explicit CROSS join for cartesian products

Require the use of CROSS join syntax in SQL (and a new crossJoin
DataFrame API) to specify explicit cartesian products between relations.
By cartesian product we mean a join between relations R and S where
there is no join condition involving columns from both R and S.

If a cartesian product is detected in the absence of an explicit CROSS
join, an error must be thrown. Turning on the
"spark.sql.crossJoin.enabled" configuration flag will disable this check
and allow cartesian products without an explicit CROSS join.

The new crossJoin DataFrame API must be used to specify explicit cross
joins. The existing join(DataFrame) method will produce a INNER join
that will require a subsequent join condition.
That is df1.join(df2) == select * from df1, df2.







[GitHub] spark issue #14721: [SPARK-17158][SQL] Change error message for out of range...

2016-08-19 Thread srinathshankar
Github user srinathshankar commented on the issue:

https://github.com/apache/spark/pull/14721
  
Fixed up error messages in ExpressionParserSuite





[GitHub] spark pull request #14721: [SPARK-17158][SQL] Change error message for out o...

2016-08-19 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14721#discussion_r75548962
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -1278,10 +1278,17 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] 
with Logging {
   }
 
   /** Create a numeric literal expression. */
-  private def numericLiteral(ctx: NumberContext)(f: String => Any): 
Literal = withOrigin(ctx) {
-val raw = ctx.getText
+  private def numericLiteral(ctx: NumberContext)
--- End diff --

Done





[GitHub] spark pull request #14721: [SPARK-17158][SQL] Change error message for out o...

2016-08-19 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14721#discussion_r75548986
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -1278,10 +1278,17 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] 
with Logging {
   }
 
   /** Create a numeric literal expression. */
-  private def numericLiteral(ctx: NumberContext)(f: String => Any): 
Literal = withOrigin(ctx) {
-val raw = ctx.getText
+  private def numericLiteral(ctx: NumberContext)
+(minValue: BigDecimal, maxValue: BigDecimal, 
typeName: String)
--- End diff --

Done





[GitHub] spark pull request #14721: [SPARK-17158][SQL] Change error message for out o...

2016-08-19 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14721#discussion_r75535335
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -1291,28 +1297,32 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] 
with Logging {
   /**
* Create a Byte Literal expression.
*/
-  override def visitTinyIntLiteral(ctx: TinyIntLiteralContext): Literal = 
numericLiteral(ctx) {
+  override def visitTinyIntLiteral(ctx: TinyIntLiteralContext): Literal =
+  numericLiteral(ctx)((Byte.MinValue, Byte.MaxValue), 
ByteType.simpleString) {
--- End diff --

Done.





[GitHub] spark pull request #14721: [SPARK-17158][SQL] Change error message for out o...

2016-08-19 Thread srinathshankar
Github user srinathshankar commented on a diff in the pull request:

https://github.com/apache/spark/pull/14721#discussion_r75535229
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -1278,10 +1278,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] 
with Logging {
   }
 
   /** Create a numeric literal expression. */
-  private def numericLiteral(ctx: NumberContext)(f: String => Any): 
Literal = withOrigin(ctx) {
-val raw = ctx.getText
+  private def numericLiteral(ctx: NumberContext)(range: (BigDecimal, 
BigDecimal), typeName: String)
--- End diff --

Done.





[GitHub] spark issue #14721: [SC-4296][SQL] Change error message for out of range num...

2016-08-19 Thread srinathshankar
Github user srinathshankar commented on the issue:

https://github.com/apache/spark/pull/14721
  
@sameeragarwal @rxin @davies 





[GitHub] spark pull request #14721: [SC-4296][SQL] Change error message for out of ra...

2016-08-19 Thread srinathshankar
GitHub user srinathshankar opened a pull request:

https://github.com/apache/spark/pull/14721

[SC-4296][SQL] Change error message for out of range numeric literals

## What changes were proposed in this pull request?

Modifies the error message for numeric literals to:
Numeric literal <value> does not fit in range [min, max] for type <type>
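
An illustrative example (the literal suffix and bounds below are for tinyint;
the reported text follows the template above):

    spark.sql("SELECT 128Y")
    // Numeric literal 128 does not fit in range [-128, 127] for type tinyint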


## How was this patch tested?

Ran SQLQueryTestSuite via sbt

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srinathshankar/spark sc4296

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14721.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14721


commit cca27bee2b4d0f52730dd8d19f8d9f32fabf6a62
Author: Srinath Shankar 
Date:   2016-08-17T21:22:55Z

[SC-4296][SQL] Change error message for out of range numeric literals

Modifies the error message produced when numeric literals are out of
range. The message produced is now of the form:
Numeric literal <value> does not fit in range [min, max] for type <type>



