[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-15 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r225113719
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -523,35 +523,114 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitionsFilters: Seq[Seq[Expression]],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
     val table = catalog.getTableMetadata(tableName)
+    val partitionColumns = table.partitionColumnNames
+    val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+      if (hasComplexFilters(filtersSpec)) {
+        generatePartitionSpec(filtersSpec,
+          partitionColumns,
+          partitionAttributes,
+          table.identifier,
+          catalog,
+          sparkSession.sessionState.conf.resolver,
+          timeZone,
+          ifExists)
+      } else {
+        val partitionSpec = filtersSpec.map {
+          case EqualTo(key: Attribute, Literal(value, StringType)) =>
+            key.name -> value.toString
+        }.toMap
+        PartitioningUtils.normalizePartitionSpec(
+          partitionSpec,
+          partitionColumns,
+          table.identifier.quotedString,
+          sparkSession.sessionState.conf.resolver) :: Nil
+      }
     }
 
     catalog.dropPartitions(
--- End diff --

Yes, this is my understanding. You can check `DDLTask.dropPartitions`.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r225108214
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -523,35 +523,114 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitionsFilters: Seq[Seq[Expression]],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
     val table = catalog.getTableMetadata(tableName)
+    val partitionColumns = table.partitionColumnNames
+    val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+      if (hasComplexFilters(filtersSpec)) {
+        generatePartitionSpec(filtersSpec,
+          partitionColumns,
+          partitionAttributes,
+          table.identifier,
+          catalog,
+          sparkSession.sessionState.conf.resolver,
+          timeZone,
+          ifExists)
+      } else {
+        val partitionSpec = filtersSpec.map {
+          case EqualTo(key: Attribute, Literal(value, StringType)) =>
+            key.name -> value.toString
+        }.toMap
+        PartitioningUtils.normalizePartitionSpec(
+          partitionSpec,
+          partitionColumns,
+          table.identifier.quotedString,
+          sparkSession.sessionState.conf.resolver) :: Nil
+      }
     }
 
     catalog.dropPartitions(
--- End diff --

So the implementation here is similar to how Hive implements it?


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-15 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r225101976
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -523,35 +523,114 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitionsFilters: Seq[Seq[Expression]],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
     val table = catalog.getTableMetadata(tableName)
+    val partitionColumns = table.partitionColumnNames
+    val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+      if (hasComplexFilters(filtersSpec)) {
+        generatePartitionSpec(filtersSpec,
+          partitionColumns,
+          partitionAttributes,
+          table.identifier,
+          catalog,
+          sparkSession.sessionState.conf.resolver,
+          timeZone,
+          ifExists)
+      } else {
+        val partitionSpec = filtersSpec.map {
+          case EqualTo(key: Attribute, Literal(value, StringType)) =>
+            key.name -> value.toString
+        }.toMap
+        PartitioningUtils.normalizePartitionSpec(
+          partitionSpec,
+          partitionColumns,
+          table.identifier.quotedString,
+          sparkSession.sessionState.conf.resolver) :: Nil
+      }
     }
 
     catalog.dropPartitions(
--- End diff --

Unfortunately, no. I checked https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java but I could not find one.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-15 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r225101681
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala ---
@@ -382,6 +382,30 @@ case class OuterReference(e: NamedExpression)
   override def newInstance(): NamedExpression = OuterReference(e.newInstance())
 }
 
+/**
+ * A place holder used to hold the name of the partition attributes specified when running commands
+ * involving partitions, eg. ALTER TABLE ... DROP PARTITIONS.
+ */
+case class PartitioningAttribute(name: String)
+  extends Attribute with Unevaluable {
+  override val exprId: ExprId = NamedExpression.newExprId
+  // Not really needed and used. We just need a dataType to be used during analysis for resolving
+  // the expressions. The String type is used because all the literals in PARTITION operations are
+  // parsed as strings and eventually casted later.
+  override def dataType: DataType = StringType
--- End diff --

Probably I should improve the comment then; it's misleading. This is actually needed, because otherwise we may hit exceptions: the `dataType` is checked when running `checkInputDataTypes` of the comparison operator containing it. I'll improve the comment.
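
For context, a minimal sketch of where that check bites (using a plain `AttributeReference` in place of the PR's `PartitioningAttribute`, which only exists in this PR):

```
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.types.StringType

// The comparison that a parsed `PARTITION (p = '2018')` turns into.
val placeholder = AttributeReference("p", StringType)()
val filter = EqualTo(placeholder, Literal("2018"))

// BinaryComparison.checkInputDataTypes compares the dataType of both sides,
// so a placeholder whose dataType throws would fail here during analysis.
assert(filter.checkInputDataTypes().isSuccess)
```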


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r225074416
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,31 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+      ctx: DropPartitionSpecContext): Seq[Expression] = {
--- End diff --

nit: can we move `withOrigin(ctx)` here? i.e.
```
def xxx(): T = withOrigin(ctx) {
  ...
}
```
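
Concretely, for `visitDropPartitionSpec` the suggested shape would be roughly the following (a sketch only; `buildDropPartitionFilters` is a hypothetical stand-in for the method body quoted elsewhere in this thread):

```
override def visitDropPartitionSpec(
    ctx: DropPartitionSpecContext): Seq[Expression] = withOrigin(ctx) {
  // Same body as before; only the placement of withOrigin changes.
  buildDropPartitionFilters(ctx)
}
```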


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r225073862
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala ---
@@ -382,6 +382,30 @@ case class OuterReference(e: NamedExpression)
   override def newInstance(): NamedExpression = OuterReference(e.newInstance())
 }
 
+/**
+ * A place holder used to hold the name of the partition attributes specified when running commands
+ * involving partitions, eg. ALTER TABLE ... DROP PARTITIONS.
+ */
+case class PartitioningAttribute(name: String)
+  extends Attribute with Unevaluable {
+  override val exprId: ExprId = NamedExpression.newExprId
--- End diff --

Even if it's a fake attribute, we should not change the `exprId` when this expression gets copied. Can we move `exprId` to the constructor?
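
A rough sketch of the suggested signature (abridged, not the PR's final code; the remaining `Attribute` members stay as in the PR):

```
// With exprId as a constructor field (with a default), copy() preserves the
// existing id instead of minting a fresh one on every copy.
case class PartitioningAttribute(
    name: String,
    exprId: ExprId = NamedExpression.newExprId)
  extends Attribute with Unevaluable {
  // ...remaining overrides unchanged...
}
```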


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r225074108
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala ---
@@ -382,6 +382,30 @@ case class OuterReference(e: NamedExpression)
   override def newInstance(): NamedExpression = OuterReference(e.newInstance())
 }
 
+/**
+ * A place holder used to hold the name of the partition attributes specified when running commands
+ * involving partitions, eg. ALTER TABLE ... DROP PARTITIONS.
+ */
+case class PartitioningAttribute(name: String)
+  extends Attribute with Unevaluable {
+  override val exprId: ExprId = NamedExpression.newExprId
+  // Not really needed and used. We just need a dataType to be used during analysis for resolving
+  // the expressions. The String type is used because all the literals in PARTITION operations are
+  // parsed as strings and eventually casted later.
+  override def dataType: DataType = StringType
--- End diff --

If it's not needed, can we throw an exception here? We may need to override `toString` though.
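
A sketch of what that would look like (note the reply earlier in this thread: `dataType` turns out to be consulted by `checkInputDataTypes`, so this is not viable as-is):

```
// Fail fast if anything ever asks for the type of the placeholder.
override def dataType: DataType =
  throw new UnsupportedOperationException(s"dataType of PartitioningAttribute($name)")

// The default toString prints a dataType suffix, so it would need overriding too.
override def toString: String = s"PartitioningAttribute($name)"
```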


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r225076055
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -523,35 +523,114 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitionsFilters: Seq[Seq[Expression]],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
     val table = catalog.getTableMetadata(tableName)
+    val partitionColumns = table.partitionColumnNames
+    val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+      if (hasComplexFilters(filtersSpec)) {
+        generatePartitionSpec(filtersSpec,
+          partitionColumns,
+          partitionAttributes,
+          table.identifier,
+          catalog,
+          sparkSession.sessionState.conf.resolver,
+          timeZone,
+          ifExists)
+      } else {
+        val partitionSpec = filtersSpec.map {
+          case EqualTo(key: Attribute, Literal(value, StringType)) =>
+            key.name -> value.toString
+        }.toMap
+        PartitioningUtils.normalizePartitionSpec(
+          partitionSpec,
+          partitionColumns,
+          table.identifier.quotedString,
+          sparkSession.sessionState.conf.resolver) :: Nil
+      }
     }
 
     catalog.dropPartitions(
--- End diff --

Does Hive have an API to drop partitions with a predicate? I think the current approach is very inefficient with non-equality partition predicates.
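
For reference, a hedged paraphrase of what the command ends up doing for non-equality predicates (`filters` stands for the resolved predicate expressions): the predicate is first materialized into a concrete list of matching partition specs, which are then dropped spec by spec, rather than sending a single predicate-based drop to the metastore.

```
// Paraphrase of the diff above, not exact code.
val specs = catalog.listPartitionsByFilter(table.identifier, filters).map(_.spec)
catalog.dropPartitions(
  table.identifier, specs, ignoreIfNotExists = ifExists, purge = purge,
  retainData = retainData)
```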


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-10 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r224082261
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -521,35 +521,114 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
--- End diff --

sure, I will. Thanks.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r223913441
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -521,35 +521,114 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
--- End diff --

But it's also weird to use `AttributeReference` this way. Can we create a new `Attribute` implementation for this purpose? Basically we only need a resolved expression to hold the partition column name. The type doesn't matter.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r223758511
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -521,35 +521,114 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
--- End diff --

I thought about that. The point is that we have to check anyway that the specified attributes are the partitioning ones, so I am not sure it is worth running the whole set of analyzer rules for something we have to handle ourselves anyway.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r223753508
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -521,35 +521,114 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
--- End diff --

Shall we make the table relation a child? Then we can resolve the `partitionsFilters` automatically.
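
A schematic of the suggestion (illustrative only; the class name is hypothetical and the real command would keep all of its current logic):

```
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand

// With the relation as a child, the usual analyzer rules would resolve the
// filter expressions against the child's output, with no ad-hoc resolution.
case class DropPartitionByFilterCommand(
    table: LogicalPlan,
    partitionsFilters: Seq[Seq[Expression]]) extends RunnableCommand {
  override def children: Seq[LogicalPlan] = table :: Nil
  override def run(sparkSession: SparkSession): Seq[Row] = Seq.empty // elided
}
```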


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r223710579
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -521,35 +521,112 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitionsFilters: Seq[Seq[Expression]],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
     val table = catalog.getTableMetadata(tableName)
+    val partitionColumns = table.partitionColumnNames
+    val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+      if (hasComplexFilters(filtersSpec)) {
+        generatePartitionSpec(filtersSpec,
+          partitionColumns,
+          partitionAttributes,
+          table.identifier,
+          catalog,
+          sparkSession.sessionState.conf.resolver,
+          timeZone,
+          ifExists)
+      } else {
+        val partitionSpec = filtersSpec.map {
+          case EqualTo(key: Attribute, Literal(value, StringType)) =>
+            key.name -> value.toString
+        }.toMap
+        PartitioningUtils.normalizePartitionSpec(
+          partitionSpec,
+          partitionColumns,
+          table.identifier.quotedString,
+          sparkSession.sessionState.conf.resolver) :: Nil
+      }
     }
 
     catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      table.identifier, resolvedSpecs, ignoreIfNotExists = ifExists, purge = purge,
       retainData = retainData)
 
     CommandUtils.updateTableStats(sparkSession, table)
 
     Seq.empty[Row]
   }
 
+  def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = {
+    partitionFilterSpec.exists(!_.isInstanceOf[EqualTo])
+  }
+
+  def generatePartitionSpec(
+      partitionFilterSpec: Seq[Expression],
+      partitionColumns: Seq[String],
+      partitionAttributes: Map[String, Attribute],
+      tableIdentifier: TableIdentifier,
+      catalog: SessionCatalog,
+      resolver: Resolver,
+      timeZone: Option[String],
+      ifExists: Boolean): Seq[TablePartitionSpec] = {
+    val filters = partitionFilterSpec.map { pFilter =>
+      pFilter.transform {
+        // Resolve the partition attributes
+        case partitionCol: Attribute =>
+          val normalizedPartition = PartitioningUtils.normalizePartitionColumn(
+            partitionCol.name,
+            partitionColumns,
+            tableIdentifier.quotedString,
+            resolver)
+          partitionAttributes(normalizedPartition)
+      }.transform {
+        // Cast the partition value to the data type of the corresponding partition attribute
+        case cmp @ BinaryComparison(partitionAttr, value)
+            if !partitionAttr.dataType.sameType(value.dataType) =>
+          cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone)))
--- End diff --

ah nice catch, thanks!


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-09 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r223703615
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -521,35 +521,112 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitionsFilters: Seq[Seq[Expression]],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
     val table = catalog.getTableMetadata(tableName)
+    val partitionColumns = table.partitionColumnNames
+    val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+      if (hasComplexFilters(filtersSpec)) {
+        generatePartitionSpec(filtersSpec,
+          partitionColumns,
+          partitionAttributes,
+          table.identifier,
+          catalog,
+          sparkSession.sessionState.conf.resolver,
+          timeZone,
+          ifExists)
+      } else {
+        val partitionSpec = filtersSpec.map {
+          case EqualTo(key: Attribute, Literal(value, StringType)) =>
+            key.name -> value.toString
+        }.toMap
+        PartitioningUtils.normalizePartitionSpec(
+          partitionSpec,
+          partitionColumns,
+          table.identifier.quotedString,
+          sparkSession.sessionState.conf.resolver) :: Nil
+      }
     }
 
     catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      table.identifier, resolvedSpecs, ignoreIfNotExists = ifExists, purge = purge,
       retainData = retainData)
 
     CommandUtils.updateTableStats(sparkSession, table)
 
     Seq.empty[Row]
   }
 
+  def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = {
+    partitionFilterSpec.exists(!_.isInstanceOf[EqualTo])
+  }
+
+  def generatePartitionSpec(
+      partitionFilterSpec: Seq[Expression],
+      partitionColumns: Seq[String],
+      partitionAttributes: Map[String, Attribute],
+      tableIdentifier: TableIdentifier,
+      catalog: SessionCatalog,
+      resolver: Resolver,
+      timeZone: Option[String],
+      ifExists: Boolean): Seq[TablePartitionSpec] = {
+    val filters = partitionFilterSpec.map { pFilter =>
+      pFilter.transform {
+        // Resolve the partition attributes
+        case partitionCol: Attribute =>
+          val normalizedPartition = PartitioningUtils.normalizePartitionColumn(
+            partitionCol.name,
+            partitionColumns,
+            tableIdentifier.quotedString,
+            resolver)
+          partitionAttributes(normalizedPartition)
+      }.transform {
+        // Cast the partition value to the data type of the corresponding partition attribute
+        case cmp @ BinaryComparison(partitionAttr, value)
+            if !partitionAttr.dataType.sameType(value.dataType) =>
+          cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone)))
--- End diff --

The predicates are not actually converted to Hive's partition predicates. If it can't convert the predicates, `getPartitionsByFilter` will call `getAllPartitionsMethod` to fetch all partitions.
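
Roughly, the fallback path being described (a paraphrase of Spark's `HiveShim.getPartitionsByFilter`, not exact code):

```
// If the predicates cannot be rendered as a Hive filter string, the shim
// gives up on pushdown and fetches every partition of the table.
val filterString = convertFilters(table, predicates)
val hivePartitions = if (filterString.isEmpty) {
  getAllPartitionsMethod.invoke(hive, table)
} else {
  getPartitionsByFilterMethod.invoke(hive, table, filterString)
}
```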


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r223626350
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -521,35 +521,112 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitionsFilters: Seq[Seq[Expression]],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
     val table = catalog.getTableMetadata(tableName)
+    val partitionColumns = table.partitionColumnNames
+    val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+      if (hasComplexFilters(filtersSpec)) {
+        generatePartitionSpec(filtersSpec,
+          partitionColumns,
+          partitionAttributes,
+          table.identifier,
+          catalog,
+          sparkSession.sessionState.conf.resolver,
+          timeZone,
+          ifExists)
+      } else {
+        val partitionSpec = filtersSpec.map {
+          case EqualTo(key: Attribute, Literal(value, StringType)) =>
+            key.name -> value.toString
+        }.toMap
+        PartitioningUtils.normalizePartitionSpec(
+          partitionSpec,
+          partitionColumns,
+          table.identifier.quotedString,
+          sparkSession.sessionState.conf.resolver) :: Nil
+      }
     }
 
     catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      table.identifier, resolvedSpecs, ignoreIfNotExists = ifExists, purge = purge,
       retainData = retainData)
 
     CommandUtils.updateTableStats(sparkSession, table)
 
     Seq.empty[Row]
   }
 
+  def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = {
+    partitionFilterSpec.exists(!_.isInstanceOf[EqualTo])
+  }
+
+  def generatePartitionSpec(
+      partitionFilterSpec: Seq[Expression],
+      partitionColumns: Seq[String],
+      partitionAttributes: Map[String, Attribute],
+      tableIdentifier: TableIdentifier,
+      catalog: SessionCatalog,
+      resolver: Resolver,
+      timeZone: Option[String],
+      ifExists: Boolean): Seq[TablePartitionSpec] = {
+    val filters = partitionFilterSpec.map { pFilter =>
+      pFilter.transform {
+        // Resolve the partition attributes
+        case partitionCol: Attribute =>
+          val normalizedPartition = PartitioningUtils.normalizePartitionColumn(
+            partitionCol.name,
+            partitionColumns,
+            tableIdentifier.quotedString,
+            resolver)
+          partitionAttributes(normalizedPartition)
+      }.transform {
+        // Cast the partition value to the data type of the corresponding partition attribute
+        case cmp @ BinaryComparison(partitionAttr, value)
+            if !partitionAttr.dataType.sameType(value.dataType) =>
+          cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone)))
--- End diff --

Yes, please see the tests here: https://github.com/apache/spark/pull/20999/files/a964d2a7def5aed04bd362b3000b36583c0ba272#diff-b7094baa12601424a5d19cb930e3402fR663.

Notice that the value is always a `string`, so in all cases with different data types we are using the cast.
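
A hypothetical example of such a case (table and column names are illustrative, using the syntax this PR adds, in a `spark-shell` session):

```
// The partition column is an INT, but the parsed literal '4' is a string, so
// the comparison is rewritten to p < CAST('4' AS INT) before matching.
spark.sql("CREATE TABLE t (v STRING) PARTITIONED BY (p INT)")
spark.sql("ALTER TABLE t DROP IF EXISTS PARTITION (p < '4')")
```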


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r223414695
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -1015,6 +1036,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     val left = expression(ctx.left)
     val right = expression(ctx.right)
     val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
+    buildComparison(left, right, operator)
+  }
+
+  /**
+   * Creates a comparison expression. The following comparison operators are supported:
+   * - Equal: '=' or '=='
+   * - Null-safe Equal: '<=>'
--- End diff --

Seems we can't support null-safe equality because it is not supported by Hive metastore partition predicate pushdown. See HiveShim.scala.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-08 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r223415516
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -521,35 +521,112 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
+    partitionsFilters: Seq[Seq[Expression]],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
     val table = catalog.getTableMetadata(tableName)
+    val partitionColumns = table.partitionColumnNames
+    val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
+    val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+      if (hasComplexFilters(filtersSpec)) {
+        generatePartitionSpec(filtersSpec,
+          partitionColumns,
+          partitionAttributes,
+          table.identifier,
+          catalog,
+          sparkSession.sessionState.conf.resolver,
+          timeZone,
+          ifExists)
+      } else {
+        val partitionSpec = filtersSpec.map {
+          case EqualTo(key: Attribute, Literal(value, StringType)) =>
+            key.name -> value.toString
+        }.toMap
+        PartitioningUtils.normalizePartitionSpec(
+          partitionSpec,
+          partitionColumns,
+          table.identifier.quotedString,
+          sparkSession.sessionState.conf.resolver) :: Nil
+      }
     }
 
     catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      table.identifier, resolvedSpecs, ignoreIfNotExists = ifExists, purge = purge,
       retainData = retainData)
 
     CommandUtils.updateTableStats(sparkSession, table)
 
     Seq.empty[Row]
   }
 
+  def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = {
+    partitionFilterSpec.exists(!_.isInstanceOf[EqualTo])
+  }
+
+  def generatePartitionSpec(
+      partitionFilterSpec: Seq[Expression],
+      partitionColumns: Seq[String],
+      partitionAttributes: Map[String, Attribute],
+      tableIdentifier: TableIdentifier,
+      catalog: SessionCatalog,
+      resolver: Resolver,
+      timeZone: Option[String],
+      ifExists: Boolean): Seq[TablePartitionSpec] = {
+    val filters = partitionFilterSpec.map { pFilter =>
+      pFilter.transform {
+        // Resolve the partition attributes
+        case partitionCol: Attribute =>
+          val normalizedPartition = PartitioningUtils.normalizePartitionColumn(
+            partitionCol.name,
+            partitionColumns,
+            tableIdentifier.quotedString,
+            resolver)
+          partitionAttributes(normalizedPartition)
+      }.transform {
+        // Cast the partition value to the data type of the corresponding partition attribute
+        case cmp @ BinaryComparison(partitionAttr, value)
+            if !partitionAttr.dataType.sameType(value.dataType) =>
+          cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone)))
--- End diff --

hmm, have you tested `Cast` cases? I looked at the `convertFilters` method in HiveShim.scala, and it seems we don't convert `Cast` in the pushdown predicates to Hive.
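
For reference, a rough paraphrase of the pattern `convertFilters` matches (abridged from HiveShim.scala, not exact code):

```
// Only bare attribute-vs-literal comparisons are rendered into a Hive filter
// string; there is no case for Cast, so Cast-wrapped comparisons fall through
// and the predicate is not pushed down to the metastore.
filters.collect {
  case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
    s"${a.name} ${op.symbol} $v"
  case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
    s"$v ${op.symbol} ${a.name}"
}.mkString(" and ")
```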


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-11 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216593044
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+      ctx: DropPartitionSpecContext): Seq[Expression] = {
+    withOrigin(ctx) {
+      ctx.dropPartitionVal().asScala.map { pFilter =>
+        if (pFilter.identifier() == null || pFilter.constant() == null ||
+            pFilter.comparisonOperator() == null) {
+          throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+        }
+        // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+        // running the command. The type is not relevant, it is replaced during the real resolution
+        val partition =
+          AttributeReference(pFilter.identifier().getText, StringType)()
--- End diff --

sure, thanks @maropu.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-10 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216525719
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+      ctx: DropPartitionSpecContext): Seq[Expression] = {
+    withOrigin(ctx) {
+      ctx.dropPartitionVal().asScala.map { pFilter =>
+        if (pFilter.identifier() == null || pFilter.constant() == null ||
+            pFilter.comparisonOperator() == null) {
+          throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+        }
+        // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+        // running the command. The type is not relevant, it is replaced during the real resolution
+        val partition =
+          AttributeReference(pFilter.identifier().getText, StringType)()
--- End diff --

Ya, looks good to me. But I'm not sure which one is the right approach, so we'd better wait for other reviewers' comments here, too. cc: @gatorsmile @viirya


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-10 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216380067
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+      ctx: DropPartitionSpecContext): Seq[Expression] = {
+    withOrigin(ctx) {
+      ctx.dropPartitionVal().asScala.map { pFilter =>
+        if (pFilter.identifier() == null || pFilter.constant() == null ||
+            pFilter.comparisonOperator() == null) {
+          throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+        }
+        // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+        // running the command. The type is not relevant, it is replaced during the real resolution
+        val partition =
+          AttributeReference(pFilter.identifier().getText, StringType)()
--- End diff --

oh, now I see, sorry. What about having a `Seq[Filter]` instead, then, in order to avoid `splitDisjunctivePredicates`?


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-10 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216322634
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+      ctx: DropPartitionSpecContext): Seq[Expression] = {
+    withOrigin(ctx) {
+      ctx.dropPartitionVal().asScala.map { pFilter =>
+        if (pFilter.identifier() == null || pFilter.constant() == null ||
+            pFilter.comparisonOperator() == null) {
+          throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+        }
+        // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+        // running the command. The type is not relevant, it is replaced during the real resolution
+        val partition =
+          AttributeReference(pFilter.identifier().getText, StringType)()
--- End diff --

yea, if you put `Expression` in the class fields of `AlterTableDropPartitionCommand` (https://github.com/apache/spark/pull/20999/files#diff-54979ed5797b4a6193cf663dc23baca5R524), it fails in `Analyzer.executeAndCheck`. But if we put a `LogicalPlan` in the class fields, IIUC it doesn't fail there.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-10 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216307940
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+      ctx: DropPartitionSpecContext): Seq[Expression] = {
+    withOrigin(ctx) {
+      ctx.dropPartitionVal().asScala.map { pFilter =>
+        if (pFilter.identifier() == null || pFilter.constant() == null ||
+            pFilter.comparisonOperator() == null) {
+          throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+        }
+        // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+        // running the command. The type is not relevant, it is replaced during the real resolution
+        val partition =
+          AttributeReference(pFilter.identifier().getText, StringType)()
+        val value = Literal(visitStringConstant(pFilter.constant()))
--- End diff --

ok, thanks for the check. It's ok to keep the current one.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-10 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216305161
  
--- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---
@@ -261,6 +261,14 @@ partitionVal
     : identifier (EQ constant)?
     ;
 
+dropPartitionSpec
+    : PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')'
+    ;
+
+dropPartitionVal
+    : identifier (comparisonOperator constant)?
--- End diff --

thanks for the check. I still like meaningful messages though; we should wait for other reviewers' comments.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-10 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216304811
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+      ctx: DropPartitionSpecContext): Seq[Expression] = {
+    withOrigin(ctx) {
+      ctx.dropPartitionVal().asScala.map { pFilter =>
+        if (pFilter.identifier() == null || pFilter.constant() == null ||
+            pFilter.comparisonOperator() == null) {
+          throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+        }
+        // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+        // running the command. The type is not relevant, it is replaced during the real resolution
+        val partition =
+          AttributeReference(pFilter.identifier().getText, StringType)()
--- End diff --

I may be missing something here, so sorry if I am not understanding something, but I think the issue is that the analyzer is called anyway before `AlterTableDropPartitionCommand.run`, and it fails because of the unresolved attributes. Moreover, in your code I don't see it being part of either `children` or `innerChildren`.

I think the alternative here is to add a rule to the analyzer for this, but that seems overkill to me.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-10 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216299331
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+      ctx: DropPartitionSpecContext): Seq[Expression] = {
+    withOrigin(ctx) {
+      ctx.dropPartitionVal().asScala.map { pFilter =>
+        if (pFilter.identifier() == null || pFilter.constant() == null ||
+            pFilter.comparisonOperator() == null) {
+          throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+        }
+        // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+        // running the command. The type is not relevant, it is replaced during the real resolution
+        val partition =
+          AttributeReference(pFilter.identifier().getText, StringType)()
--- End diff --

yea, so I called the analyzer inside `AlterTableDropPartitionCommand.run`.

https://github.com/apache/spark/commit/52506f1ebfb36dfaf0380a58cb68ee6aa5225de4#diff-54979ed5797b4a6193cf663dc23baca5R539

Or, if you want to resolve the plan in the analysis phase, how about just adding it to `children` instead of `innerChildren`?


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-10 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216227938
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+      ctx: DropPartitionSpecContext): Seq[Expression] = {
+    withOrigin(ctx) {
+      ctx.dropPartitionVal().asScala.map { pFilter =>
+        if (pFilter.identifier() == null || pFilter.constant() == null ||
+            pFilter.comparisonOperator() == null) {
+          throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+        }
+        // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+        // running the command. The type is not relevant, it is replaced during the real resolution
+        val partition =
+          AttributeReference(pFilter.identifier().getText, StringType)()
--- End diff --

Your approach has the same issue, i.e. it would fail because the `LogicalPlan` you added is not resolved after analysis.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-09 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216174690
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+      ctx: DropPartitionSpecContext): Seq[Expression] = {
+    withOrigin(ctx) {
+      ctx.dropPartitionVal().asScala.map { pFilter =>
+        if (pFilter.identifier() == null || pFilter.constant() == null ||
+            pFilter.comparisonOperator() == null) {
+          throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+        }
+        // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+        // running the command. The type is not relevant, it is replaced during the real resolution
+        val partition =
+          AttributeReference(pFilter.identifier().getText, StringType)()
--- End diff --

For example, how about this approach? (Have you tried it already?)
https://github.com/apache/spark/commit/52506f1ebfb36dfaf0380a58cb68ee6aa5225de4

It adds an unresolved logical plan (an input relation and filters) to `AlterTableDropPartitionCommand`, then resolves the plan in `AlterTableDropPartitionCommand.run` and computes partition specs based on the resolved expressions.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216162541
  
--- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---
@@ -261,6 +261,14 @@ partitionVal
     : identifier (EQ constant)?
     ;
 
+dropPartitionSpec
+    : PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')'
+    ;
+
+dropPartitionVal
+    : identifier (comparisonOperator constant)?
--- End diff --

Hive throws this parser exception:
```
hive> alter table test1 drop partition(1 > c);
NoViableAltException(368@[])
    at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.identifier(HiveParser_IdentifiersParser.java:12014)
    at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.dropPartitionVal(HiveParser_IdentifiersParser.java:11684)
    at org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.dropPartitionSpec(HiveParser_IdentifiersParser.java:11563)
    at org.apache.hadoop.hive.ql.parse.HiveParser.dropPartitionSpec(HiveParser.java:44851)
    at org.apache.hadoop.hive.ql.parse.HiveParser.alterStatementSuffixDropPartitions(HiveParser.java:11564)
    at org.apache.hadoop.hive.ql.parse.HiveParser.alterTableStatementSuffix(HiveParser.java:8000)
    at org.apache.hadoop.hive.ql.parse.HiveParser.alterStatement(HiveParser.java:7450)
    at org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:4340)
    at org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:2497)
    at org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1423)
    at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:209)
    at org.apache.hadoop.hive.ql.parse.ParseUtils.parse(ParseUtils.java:74)
    at org.apache.hadoop.hive.ql.parse.ParseUtils.parse(ParseUtils.java:67)
    at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:615)
    at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1829)
    at org.apache.hadoop.hive.ql.Driver.compileAndRespond(Driver.java:1776)
    at org.apache.hadoop.hive.ql.Driver.compileAndRespond(Driver.java:1771)
    at org.apache.hadoop.hive.ql.reexec.ReExecDriver.compileAndRespond(ReExecDriver.java:126)
    at org.apache.hadoop.hive.ql.reexec.ReExecDriver.run(ReExecDriver.java:214)
    at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:239)
    at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:188)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:402)
    at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:832)
    at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:770)
    at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:694)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
FAILED: ParseException line 1:33 cannot recognize input near '1' '>' 'c' in drop partition statement
```
so yes, it is analogous to this.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216157835
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -927,7 +927,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
+      ctx.dropPartitionSpec().asScala.map(visitDropPartitionSpec),
--- End diff --

mmmh, I am not sure how to update it. The only difference is that `specN` can now be any kind of filter instead of just `partitionColumn = value`, so it is actually the definition of `specN` which changed.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216131276
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -927,7 +927,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
+      ctx.dropPartitionSpec().asScala.map(visitDropPartitionSpec),
--- End diff --

Can you update the comment?: 
https://github.com/apache/spark/blob/01c3dfab158d40653f8ce5d96f57220297545d5b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala#L916


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216128234
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+      ctx: DropPartitionSpecContext): Seq[Expression] = {
+    withOrigin(ctx) {
+      ctx.dropPartitionVal().asScala.map { pFilter =>
+        if (pFilter.identifier() == null || pFilter.constant() == null ||
+            pFilter.comparisonOperator() == null) {
+          throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+        }
+        // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+        // running the command. The type is not relevant, it is replaced during the real resolution
+        val partition =
+          AttributeReference(pFilter.identifier().getText, StringType)()
--- End diff --

Let me have more time to check this behaviour.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216127950
  
--- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---
@@ -261,6 +261,14 @@ partitionVal
     : identifier (EQ constant)?
     ;
 
+dropPartitionSpec
+    : PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')'
+    ;
+
+dropPartitionVal
+    : identifier (comparisonOperator constant)?
--- End diff --

yea, yes. I like user-understandable error messages.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216127931
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions._
--- End diff --

I just wanted to check whether your IDE wrongly folded this import or not. It's ok.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216127734
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+      ctx: DropPartitionSpecContext): Seq[Expression] = {
+    withOrigin(ctx) {
+      ctx.dropPartitionVal().asScala.map { pFilter =>
+        if (pFilter.identifier() == null || pFilter.constant() == null ||
--- End diff --

I saw no null check in the other partition spec, so I thought the same here;
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala#L274
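
For reference, the visitor linked above absorbs the optional constant into an
`Option` rather than null-checking it explicitly; a simplified sketch of that
pattern (not the exact source):

    override def visitPartitionSpec(
        ctx: PartitionSpecContext): Map[String, Option[String]] = {
      withOrigin(ctx) {
        ctx.partitionVal.asScala.map { pVal =>
          // Option(...) turns the null returned for an absent sub-rule into None
          pVal.identifier.getText -> Option(pVal.constant).map(visitStringConstant)
        }.toMap
      }
    }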


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216126194
  
--- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---
@@ -261,6 +261,14 @@ partitionVal
 : identifier (EQ constant)?
 ;
 
+dropPartitionSpec
+: PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')'
+;
+
+dropPartitionVal
+: identifier (comparisonOperator constant)?
--- End diff --

Hive does throw an error in that case. Do you mean to ask whether that error is a parse exception or another kind of exception?


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216126249
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+  ctx: DropPartitionSpecContext): Seq[Expression] = {
+withOrigin(ctx) {
+  ctx.dropPartitionVal().asScala.map { pFilter =>
+if (pFilter.identifier() == null || pFilter.constant() == null ||
+pFilter.comparisonOperator() == null) {
+  throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
--- End diff --

sure, will do ASAP, thanks.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216126480
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala ---
@@ -861,7 +861,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
 assertUnsupported(sql2_view)
 
 val tableIdent = TableIdentifier("table_name", None)
-val expected1_table = AlterTableDropPartitionCommand(
+
+val expected1_table = AlterTableDropPartitionCommand.fromSpecs(
--- End diff --

sure, will do, thanks.


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216126332
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+  ctx: DropPartitionSpecContext): Seq[Expression] = {
+withOrigin(ctx) {
+  ctx.dropPartitionVal().asScala.map { pFilter =>
+if (pFilter.identifier() == null || pFilter.constant() == null ||
+pFilter.comparisonOperator() == null) {
+  throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+}
+// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+// running the command. The type is not relevant, it is replaced during the real resolution
+val partition =
+  AttributeReference(pFilter.identifier().getText, StringType)()
+val value = Literal(visitStringConstant(pFilter.constant()))
--- End diff --

thanks, will update


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216126453
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -1015,6 +1037,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 val left = expression(ctx.left)
 val right = expression(ctx.right)
 val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
+buildComparison(left, right, operator)
+  }
+
+  /**
+   * Creates a comparison expression. The following comparison operators are supported:
+   * - Equal: '=' or '=='
+   * - Null-safe Equal: '<=>'
+   * - Not Equal: '<>' or '!='
+   * - Less than: '<'
+   * - Less than or Equal: '<='
+   * - Greater than: '>'
+   * - Greater than or Equal: '>='
+   */
--- End diff --

yes, it does
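
For reference, the `buildComparison` helper being factored out presumably
dispatches on the operator token and builds the corresponding Catalyst
expression, along these lines (a sketch; the token constants come from the
generated SqlBaseParser, and the PR's exact body may differ):

    private def buildComparison(
        left: Expression,
        right: Expression,
        operator: TerminalNode): Expression = {
      operator.getSymbol.getType match {
        case SqlBaseParser.EQ => EqualTo(left, right)
        case SqlBaseParser.NSEQ => EqualNullSafe(left, right)
        case SqlBaseParser.NEQ | SqlBaseParser.NEQJ => Not(EqualTo(left, right))
        case SqlBaseParser.LT => LessThan(left, right)
        case SqlBaseParser.LTE => LessThanOrEqual(left, right)
        case SqlBaseParser.GT => GreaterThan(left, right)
        case SqlBaseParser.GTE => GreaterThanOrEqual(left, right)
      }
    }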


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216126471
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions._
--- End diff --

Not sure what you mean here. The list of imports would be very long, as I use EqualTo, And, Literal, Cast, BinaryComparison, etc. I can list them all, but I am not sure it is worth it. What do you think?
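
For comparison, the explicit form would look something like this (the list is
illustrative, based on the names mentioned above):

    import org.apache.spark.sql.catalyst.expressions.{And, Attribute,
      AttributeReference, BinaryComparison, Cast, EqualTo, Expression, Literal}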


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216126217
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+  ctx: DropPartitionSpecContext): Seq[Expression] = {
+withOrigin(ctx) {
+  ctx.dropPartitionVal().asScala.map { pFilter =>
+if (pFilter.identifier() == null || pFilter.constant() == null ||
--- End diff --

I am not sure. The other two conditions can definitely be true, but I am not sure about this one. I think it is safer to keep the check.
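
For what it's worth, an ANTLR-generated accessor returns null whenever its
(optional) sub-rule was not matched, so under `identifier (comparisonOperator
constant)?` the cases break down as annotated below (the check is as in the
PR; the comments are illustrative):

    // "p >= '1'" -> identifier(), comparisonOperator() and constant() all non-null.
    // "p"        -> comparisonOperator() and constant() return null.
    // identifier() should always be non-null for a successfully matched
    // dropPartitionVal, since the rule starts with a mandatory identifier,
    // so that part of the check is purely defensive.
    if (pFilter.identifier() == null || pFilter.constant() == null ||
        pFilter.comparisonOperator() == null) {
      throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
    }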


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216126491
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ---
@@ -577,6 +578,143 @@ class HiveDDLSuite
 }
   }
 
+  def testDropPartition(dataType: DataType, value1: Any, value2: Any): Unit = {
+withTable("tbl_x") {
+  sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p ${dataType.sql})")
+  sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value1)")
+  sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value2)")
+  sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= $value2)")
+  checkAnswer(sql("SHOW PARTITIONS tbl_x"),
+Row(s"p=$value1") :: Nil)
+  sql(s"ALTER TABLE tbl_x DROP PARTITION (p = $value1)")
+  checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil)
+}
+  }
+
+  test("SPARK-14922: Drop partitions by filter") {
+withTable("sales") {
+  sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, 
quarter STRING)")
+  for (country <- Seq("AU", "US", "CA", "KR")) {
+for (quarter <- 1 to 5) {
+  sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', 
quarter = '$quarter')")
+}
+  }
+  sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > 
'2')")
+  checkAnswer(sql("SHOW PARTITIONS sales"),
+Row("country=AU/quarter=1") ::
+  Row("country=AU/quarter=2") ::
+  Row("country=CA/quarter=1") ::
+  Row("country=CA/quarter=2") ::
+  Row("country=KR/quarter=1") ::
+  Row("country=KR/quarter=2") ::
+  Row("country=KR/quarter=3") ::
+  Row("country=KR/quarter=4") ::
+  Row("country=KR/quarter=5") ::
+  Row("country=US/quarter=1") ::
+  Row("country=US/quarter=2") ::
+  Row("country=US/quarter=3") ::
+  Row("country=US/quarter=4") ::
+  Row("country=US/quarter=5") :: Nil)
+  sql("ALTER TABLE sales DROP PARTITION (country < 'CA'), PARTITION 
(quarter = '5')")
+  checkAnswer(sql("SHOW PARTITIONS sales"),
+Row("country=CA/quarter=1") ::
+  Row("country=CA/quarter=2") ::
+  Row("country=KR/quarter=1") ::
+  Row("country=KR/quarter=2") ::
+  Row("country=KR/quarter=3") ::
+  Row("country=KR/quarter=4") ::
+  Row("country=US/quarter=1") ::
+  Row("country=US/quarter=2") ::
+  Row("country=US/quarter=3") ::
+  Row("country=US/quarter=4") :: Nil)
+  sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION 
(quarter <= '1')")
+  checkAnswer(sql("SHOW PARTITIONS sales"),
+Row("country=KR/quarter=2") ::
+  Row("country=KR/quarter=3") ::
+  Row("country=KR/quarter=4") ::
+  Row("country=US/quarter=2") ::
+  Row("country=US/quarter=3") ::
+  Row("country=US/quarter=4") :: Nil)
+  sql("ALTER TABLE sales DROP PARTITION (country = 'KR', quarter = 
'4')")
+  sql("ALTER TABLE sales DROP PARTITION (country = 'US', quarter = 
'3')")
+  checkAnswer(sql("SHOW PARTITIONS sales"),
+Row("country=KR/quarter=2") ::
+  Row("country=KR/quarter=3") ::
+  Row("country=US/quarter=2") ::
+  Row("country=US/quarter=4") :: Nil)
+  sql("ALTER TABLE sales DROP PARTITION (quarter <= '2'), PARTITION 
(quarter >= '4')")
+  checkAnswer(sql("SHOW PARTITIONS sales"),
+Row("country=KR/quarter=3") :: Nil)
+  // According to the declarative partition spec definitions, this drops the union of target
+  // partitions without exceptions. Hive raises exceptions because it handles them sequentially.
+  sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '3')")
+  checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
+}
+withTable("tbl_x") {
+  sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p STRING)")
+  sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'false')")
+  sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'true')")
+  sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= 'true')")
+  checkAnswer(sql("SHOW PARTITIONS tbl_x"),
+Row(s"p=false") :: Nil)
+  sql(s"ALTER TABLE tbl_x DROP PARTITION (p = 'false')")
+  checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil)
+}
+testDropPartition(IntegerType, 1, 2)
+testDropPartition(BooleanType, false, true)
+testDropPartition(LongType, 1L, 2L)
+testDropPartition(ShortType, 1.toShort, 2.toShort)
+testDropPartition(ByteType, 1.toByte, 2.toByte)
+testDropPartition(FloatType, 1.0F, 2.0F)
+

[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216126328
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+  ctx: DropPartitionSpecContext): Seq[Expression] = {
+withOrigin(ctx) {
+  ctx.dropPartitionVal().asScala.map { pFilter =>
+if (pFilter.identifier() == null || pFilter.constant() == null ||
+pFilter.comparisonOperator() == null) {
+  throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+}
+// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+// running the command. The type is not relevant, it is replaced during the real resolution
+val partition =
+  AttributeReference(pFilter.identifier().getText, StringType)()
--- End diff --

Well, the answer is in `def expressions` in `QueryPlan`. In #19691, we end up with a `Seq[(TablePartitionSpec, Seq[Expression])]`, so the expressions there are not recognized/considered by the `Analyzer`. In this PR we have `Seq[Expression]` (which is way cleaner IMHO and addresses comment https://github.com/apache/spark/pull/19691/files#r193002268), so these expressions are considered by the `Analyzer`.
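
The mechanics, roughly: `QueryPlan.expressions` walks the plan's constructor
arguments and only collects values that are an `Expression` or a collection of
them, so expressions buried inside tuples are invisible to the Analyzer. A
simplified sketch of that traversal (not the exact Spark source, which lives
in org.apache.spark.sql.catalyst.plans.QueryPlan):

    final def expressions: Seq[Expression] = {
      def fromIterable(seq: Iterable[Any]): Iterable[Expression] = seq.flatMap {
        case e: Expression => e :: Nil
        case s: Iterable[_] => fromIterable(s)
        // A (TablePartitionSpec, Seq[Expression]) tuple is not an Iterable,
        // so the expressions inside it are silently skipped here.
        case _ => Nil
      }
      productIterator.flatMap {
        case e: Expression => e :: Nil
        case s: Iterable[_] => fromIterable(s)
        case _ => Nil
      }.toSeq
    }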


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216124388
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -1015,6 +1037,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 val left = expression(ctx.left)
 val right = expression(ctx.right)
 val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
+buildComparison(left, right, operator)
+  }
+
+  /**
+   * Creates a comparison expression. The following comparison operators are supported:
+   * - Equal: '=' or '=='
+   * - Null-safe Equal: '<=>'
+   * - Not Equal: '<>' or '!='
+   * - Less than: '<'
+   * - Less than or Equal: '<='
+   * - Greater than: '>'
+   * - Greater than or Equal: '>='
+   */
--- End diff --

Does Hive also support all the comparators above?


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216124365
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+  ctx: DropPartitionSpecContext): Seq[Expression] = {
+withOrigin(ctx) {
+  ctx.dropPartitionVal().asScala.map { pFilter =>
+if (pFilter.identifier() == null || pFilter.constant() == null ||
+pFilter.comparisonOperator() == null) {
+  throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+}
+// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+// running the command. The type is not relevant, it is replaced during the real resolution
+val partition =
+  AttributeReference(pFilter.identifier().getText, StringType)()
--- End diff --

This looks weird; why could we use `UnresolvedAttribute` in #19691, then?

https://github.com/apache/spark/pull/19691/files#diff-9847f5cef7cf7fbc5830fbc6b779ee10R293
(Sorry, but I am probably missing something?)


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216124252
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+  ctx: DropPartitionSpecContext): Seq[Expression] = {
+withOrigin(ctx) {
+  ctx.dropPartitionVal().asScala.map { pFilter =>
+if (pFilter.identifier() == null || pFilter.constant() == null ||
+pFilter.comparisonOperator() == null) {
+  throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
+}
+// We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
+// running the command. The type is not relevant, it is replaced during the real resolution
+val partition =
+  AttributeReference(pFilter.identifier().getText, StringType)()
+val value = Literal(visitStringConstant(pFilter.constant()))
--- End diff --

`val value = Literal(visitStringConstant(pFilter.constant()), StringType)` for better readability?


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216124163
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+  ctx: DropPartitionSpecContext): Seq[Expression] = {
+withOrigin(ctx) {
+  ctx.dropPartitionVal().asScala.map { pFilter =>
+if (pFilter.identifier() == null || pFilter.constant() == null ||
--- End diff --

no chance `pFilter.identifier() == null`?


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216124059
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+  ctx: DropPartitionSpecContext): Seq[Expression] = {
+withOrigin(ctx) {
+  ctx.dropPartitionVal().asScala.map { pFilter =>
+if (pFilter.identifier() == null || pFilter.constant() == null ||
+pFilter.comparisonOperator() == null) {
+  throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
--- End diff --

Can you add tests for this exception in `DDLParserSuite.scala`?
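
Something along these lines would presumably do (a hypothetical test body,
assuming the suite's existing `parser` helper):

    test("SPARK-14922: invalid drop partition spec fails at parse time") {
      // A bare column with no comparator/constant should hit the new check.
      val e = intercept[ParseException] {
        parser.parsePlan("ALTER TABLE table_name DROP PARTITION (p)")
      }
      assert(e.getMessage.contains("Invalid partition spec"))
    }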


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216123948
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions._
--- End diff --

too many imports?


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216123701
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala ---
@@ -861,7 +861,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
 assertUnsupported(sql2_view)
 
 val tableIdent = TableIdentifier("table_name", None)
-val expected1_table = AlterTableDropPartitionCommand(
+
+val expected1_table = AlterTableDropPartitionCommand.fromSpecs(
--- End diff --

Can you add test cases to check that the parser accepts the comparators added by this PR?
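
A hypothetical shape for such a test (helper names assumed from the suite):

    test("SPARK-14922: drop partition accepts all comparators") {
      Seq("=", "==", "<=>", "<>", "!=", "<", "<=", ">", ">=").foreach { op =>
        val plan = parser.parsePlan(
          s"ALTER TABLE table_name DROP PARTITION (p $op '1')")
        assert(plan.isInstanceOf[AlterTableDropPartitionCommand])
      }
    }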


---




[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216122595
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ---
@@ -577,6 +578,143 @@ class HiveDDLSuite
 }
   }
 
+  def testDropPartition(dataType: DataType, value1: Any, value2: Any): Unit = {
+withTable("tbl_x") {
+  sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p ${dataType.sql})")
+  sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value1)")
+  sql(s"ALTER TABLE tbl_x ADD PARTITION (p = $value2)")
+  sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= $value2)")
+  checkAnswer(sql("SHOW PARTITIONS tbl_x"),
+Row(s"p=$value1") :: Nil)
+  sql(s"ALTER TABLE tbl_x DROP PARTITION (p = $value1)")
+  checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil)
+}
+  }
+
+  test("SPARK-14922: Drop partitions by filter") {
+withTable("sales") {
+  sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, 
quarter STRING)")
+  for (country <- Seq("AU", "US", "CA", "KR")) {
+for (quarter <- 1 to 5) {
+  sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', 
quarter = '$quarter')")
+}
+  }
+  sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > 
'2')")
+  checkAnswer(sql("SHOW PARTITIONS sales"),
+Row("country=AU/quarter=1") ::
+  Row("country=AU/quarter=2") ::
+  Row("country=CA/quarter=1") ::
+  Row("country=CA/quarter=2") ::
+  Row("country=KR/quarter=1") ::
+  Row("country=KR/quarter=2") ::
+  Row("country=KR/quarter=3") ::
+  Row("country=KR/quarter=4") ::
+  Row("country=KR/quarter=5") ::
+  Row("country=US/quarter=1") ::
+  Row("country=US/quarter=2") ::
+  Row("country=US/quarter=3") ::
+  Row("country=US/quarter=4") ::
+  Row("country=US/quarter=5") :: Nil)
+  sql("ALTER TABLE sales DROP PARTITION (country < 'CA'), PARTITION 
(quarter = '5')")
+  checkAnswer(sql("SHOW PARTITIONS sales"),
+Row("country=CA/quarter=1") ::
+  Row("country=CA/quarter=2") ::
+  Row("country=KR/quarter=1") ::
+  Row("country=KR/quarter=2") ::
+  Row("country=KR/quarter=3") ::
+  Row("country=KR/quarter=4") ::
+  Row("country=US/quarter=1") ::
+  Row("country=US/quarter=2") ::
+  Row("country=US/quarter=3") ::
+  Row("country=US/quarter=4") :: Nil)
+  sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION 
(quarter <= '1')")
+  checkAnswer(sql("SHOW PARTITIONS sales"),
+Row("country=KR/quarter=2") ::
+  Row("country=KR/quarter=3") ::
+  Row("country=KR/quarter=4") ::
+  Row("country=US/quarter=2") ::
+  Row("country=US/quarter=3") ::
+  Row("country=US/quarter=4") :: Nil)
+  sql("ALTER TABLE sales DROP PARTITION (country = 'KR', quarter = 
'4')")
+  sql("ALTER TABLE sales DROP PARTITION (country = 'US', quarter = 
'3')")
+  checkAnswer(sql("SHOW PARTITIONS sales"),
+Row("country=KR/quarter=2") ::
+  Row("country=KR/quarter=3") ::
+  Row("country=US/quarter=2") ::
+  Row("country=US/quarter=4") :: Nil)
+  sql("ALTER TABLE sales DROP PARTITION (quarter <= '2'), PARTITION 
(quarter >= '4')")
+  checkAnswer(sql("SHOW PARTITIONS sales"),
+Row("country=KR/quarter=3") :: Nil)
+  // According to the declarative partition spec definitions, this drops the union of target
+  // partitions without exceptions. Hive raises exceptions because it handles them sequentially.
+  sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '3')")
+  checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
+}
+withTable("tbl_x") {
+  sql(s"CREATE TABLE tbl_x (a INT) PARTITIONED BY (p STRING)")
+  sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'false')")
+  sql(s"ALTER TABLE tbl_x ADD PARTITION (p = 'true')")
+  sql(s"ALTER TABLE tbl_x DROP PARTITION (p >= 'true')")
+  checkAnswer(sql("SHOW PARTITIONS tbl_x"),
+Row(s"p=false") :: Nil)
+  sql(s"ALTER TABLE tbl_x DROP PARTITION (p = 'false')")
+  checkAnswer(sql("SHOW PARTITIONS tbl_x"), Nil)
+}
+testDropPartition(IntegerType, 1, 2)
+testDropPartition(BooleanType, false, true)
+testDropPartition(LongType, 1L, 2L)
+testDropPartition(ShortType, 1.toShort, 2.toShort)
+testDropPartition(ByteType, 1.toByte, 2.toByte)
+testDropPartition(FloatType, 1.0F, 2.0F)
+

[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-09-08 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r216122503
  
--- Diff: sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---
@@ -261,6 +261,14 @@ partitionVal
 : identifier (EQ constant)?
 ;
 
+dropPartitionSpec
+: PARTITION '(' dropPartitionVal (',' dropPartitionVal)* ')'
+;
+
+dropPartitionVal
+: identifier (comparisonOperator constant)?
--- End diff --

Does Hive also throw ANTLR errors for the case `2 > partCol1`?
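
Note that the quoted rule only admits the column name on the left-hand side,
so a reversed comparison should be rejected by the parser itself rather than
by the visitor (a sketch based on the grammar above, not on Hive):

    sql("ALTER TABLE sales DROP PARTITION (quarter > '2')")  // matches the rule
    sql("ALTER TABLE sales DROP PARTITION (2 > quarter)")    // no match -> ANTLR syntax error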


---
