[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/13494



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-12 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70404513
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd"))
+      .toDF("col1", "col2", "partcol1", "partcol2")
+    data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart")
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      sql("DROP TABLE IF EXISTS srcpart")
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  private def assertMetadataOnlyQuery(df: DataFrame): Unit = {
+    val localRelations = df.queryExecution.optimizedPlan.collect {
+      case l @ LocalRelation(_, _) => l
+    }
+    assert(localRelations.size == 1)
+  }
+
+  private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = {
+    val localRelations = df.queryExecution.optimizedPlan.collect {
+      case l @ LocalRelation(_, _) => l
+    }
+    assert(localRelations.size == 0)
+  }
+
+  private def testMetadataOnly(name: String, sqls: String*): Unit = {
+    test(name) {
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+        sqls.foreach { case q => assertMetadataOnlyQuery(sql(q)) }
+      }
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+        sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
+      }
+    }
+  }
+
+  private def testNotMetadataOnly(name: String, sqls: String*): Unit = {
+    test(name) {
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+        sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
+      }
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+        sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
+      }
+    }
+  }
+
+  testMetadataOnly(
+    "OptimizeMetadataOnlyQuery test: aggregate expression is partition columns",
--- End diff --

Sure, thanks.
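
For readers skimming the thread, here is a minimal, self-contained sketch of what these test helpers exercise. It is not part of the PR: it assumes a local Spark 2.x session and the partitioned `srcpart` table from `beforeAll()`, and uses the conf key behind `SQLConf.OPTIMIZER_METADATA_ONLY` (`spark.sql.optimizer.metadataOnly`).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

object MetadataOnlyDemo extends App {
  // Assumption: a local session; srcpart is created as in the suite's beforeAll().
  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("metadata-only-demo")
    .getOrCreate()

  spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
  val df = spark.sql("SELECT partcol1 FROM srcpart GROUP BY partcol1")

  // When the rule fires, the table scan is replaced by a LocalRelation built from
  // partition metadata, which is exactly what assertMetadataOnlyQuery checks for.
  val metadataOnly = df.queryExecution.optimizedPlan.collect {
    case l: LocalRelation => l
  }.size == 1
  println(s"answered from partition metadata: $metadataOnly")
}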





[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-12 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70404475
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by looking only at
+ * partition-level metadata. This applies when all the columns scanned are partition columns, and
+ * the query has an aggregate operator that satisfies the following conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+    catalog: SessionCatalog,
+    conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.optimizerMetadataOnly) {
+      return plan
+    }
+
+    plan.transform {
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
+        // We only apply this optimization when only partitioned attributes are scanned.
+        if (a.references.subsetOf(partAttrs)) {
+          val aggFunctions = aggExprs.flatMap(_.collect {
+            case agg: AggregateExpression => agg
+          })
+          val isAllDistinctAgg = aggFunctions.forall { agg =>
+            agg.isDistinct || (agg.aggregateFunction match {
+              // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter
+              // they have DISTINCT keyword or not, as the result will be same.
+              case _: Max => true
+              case _: Min => true
+              case _: First => true
+              case _: Last => true
+              case _ => false
+            })
+          }
+          if (isAllDistinctAgg) {
+            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+          } else {
+            a
+          }
+        } else {
+          a
+        }
+    }
+  }
+
+  /**
+   * Returns the partition attributes of the table relation plan.
+   */
+  private def getPartitionAttrs(
+      partitionColumnNames: Seq[String],
+      relation: LogicalPlan): Seq[Attribute] = {
+    val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
+    relation.output.filter(a => partColumns.contains(a.name.toLowerCase))
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+      child: LogicalPlan,
+      relation: LogicalPlan): LogicalPlan = {
+    child transform {
+      case plan if plan eq relation =>
+        relation match {
+          case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+            val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
+            val partitionData = fsRelation.location.listFiles(filters = Nil)
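
A note on the `isAllDistinctAgg` check in the quote above: the rewrite collapses the scan to one row per distinct partition value, so it is only safe for aggregates whose result duplicates cannot change. A small illustration, not from the PR, assuming the suite's `srcpart` table and a session named `spark`:

// partcol2 takes only the values "even" and "odd", each repeated across many rows,
// so MAX sees the same answer whether or not duplicates are present.
spark.sql("SELECT max(partcol2) FROM srcpart").show()          // "odd"
spark.sql("SELECT max(DISTINCT partcol2) FROM srcpart").show() // "odd" as well

// COUNT is duplicate-sensitive: count(partcol2) counts rows while
// count(DISTINCT partcol2) counts distinct values, so a plain COUNT is only
// eligible for the metadata-only rewrite when DISTINCT is written explicitly.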

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70380596
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd"))
+      .toDF("col1", "col2", "partcol1", "partcol2")
+    data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart")
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      sql("DROP TABLE IF EXISTS srcpart")
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  private def assertMetadataOnlyQuery(df: DataFrame): Unit = {
+    val localRelations = df.queryExecution.optimizedPlan.collect {
+      case l @ LocalRelation(_, _) => l
+    }
+    assert(localRelations.size == 1)
+  }
+
+  private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = {
+    val localRelations = df.queryExecution.optimizedPlan.collect {
+      case l @ LocalRelation(_, _) => l
+    }
+    assert(localRelations.size == 0)
+  }
+
+  private def testMetadataOnly(name: String, sqls: String*): Unit = {
+    test(name) {
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+        sqls.foreach { case q => assertMetadataOnlyQuery(sql(q)) }
+      }
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+        sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
+      }
+    }
+  }
+
+  private def testNotMetadataOnly(name: String, sqls: String*): Unit = {
+    test(name) {
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+        sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
+      }
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+        sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
+      }
+    }
+  }
+
+  testMetadataOnly(
+    "OptimizeMetadataOnlyQuery test: aggregate expression is partition columns",
--- End diff --

I think we can remove the prefix `OptimizeMetadataOnlyQuery test:`; the test report already prints the suite name for these tests.
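
For illustration, a renamed call would then presumably read as below; the query string is a stand-in, since the actual test queries are elided from this quote:

testMetadataOnly(
  "aggregate expression is partition columns",      // suite name already gives the context
  "SELECT partcol1 FROM srcpart GROUP BY partcol1") // hypothetical example query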





[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70380410
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by looking only at
+ * partition-level metadata. This applies when all the columns scanned are partition columns, and
+ * the query has an aggregate operator that satisfies the following conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+    catalog: SessionCatalog,
+    conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.optimizerMetadataOnly) {
+      return plan
+    }
+
+    plan.transform {
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
+        // We only apply this optimization when only partitioned attributes are scanned.
+        if (a.references.subsetOf(partAttrs)) {
+          val aggFunctions = aggExprs.flatMap(_.collect {
+            case agg: AggregateExpression => agg
+          })
+          val isAllDistinctAgg = aggFunctions.forall { agg =>
+            agg.isDistinct || (agg.aggregateFunction match {
+              // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter
+              // they have DISTINCT keyword or not, as the result will be same.
+              case _: Max => true
+              case _: Min => true
+              case _: First => true
+              case _: Last => true
+              case _ => false
+            })
+          }
+          if (isAllDistinctAgg) {
+            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+          } else {
+            a
+          }
+        } else {
+          a
+        }
+    }
+  }
+
+  /**
+   * Returns the partition attributes of the table relation plan.
+   */
+  private def getPartitionAttrs(
+      partitionColumnNames: Seq[String],
+      relation: LogicalPlan): Seq[Attribute] = {
+    val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
+    relation.output.filter(a => partColumns.contains(a.name.toLowerCase))
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+      child: LogicalPlan,
+      relation: LogicalPlan): LogicalPlan = {
+    child transform {
+      case plan if plan eq relation =>
+        relation match {
+          case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+            val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
+            val partitionData = fsRelation.location.listFiles(filters = Nil)

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70370384
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by looking only at
+ * partition-level metadata. This applies when all the columns scanned are partition columns, and
+ * the query has an aggregate operator that satisfies the following conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+    catalog: SessionCatalog,
+    conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.optimizerMetadataOnly) {
+      return plan
+    }
+
+    plan.transform {
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
+        // We only apply this optimization when only partitioned attributes are scanned.
+        if (a.references.subsetOf(partAttrs)) {
+          val aggFunctions = aggExprs.flatMap(_.collect {
+            case agg: AggregateExpression => agg
+          })
+          val isAllDistinctAgg = aggFunctions.forall { agg =>
+            agg.isDistinct || (agg.aggregateFunction match {
+              // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter
+              // they have DISTINCT keyword or not, as the result will be same.
+              case _: Max => true
+              case _: Min => true
+              case _: First => true
+              case _: Last => true
+              case _ => false
+            })
+          }
+          if (isAllDistinctAgg) {
+            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+          } else {
+            a
+          }
+        } else {
+          a
+        }
+    }
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+      child: LogicalPlan,
+      relation: LogicalPlan): LogicalPlan = {
+    child transform {
+      case plan if plan eq relation =>
+        relation match {
+          case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+            val partAttrs = PartitionedRelation.getPartitionAttrs(
--- End diff --

@cloud-fan I will define two getPartitionAttrs() functions. In the future, I think we can move getPartitionAttrs() into the relation plan itself. If that causes any problem, please tell me. Thanks.
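
The later 162-line revision quoted earlier in this thread shows what the class-side copy ends up looking like:

/**
 * Returns the partition attributes of the table relation plan.
 */
private def getPartitionAttrs(
    partitionColumnNames: Seq[String],
    relation: LogicalPlan): Seq[Attribute] = {
  // Match partition column names against the relation's output, case-insensitively.
  val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
  relation.output.filter(a => partColumns.contains(a.name.toLowerCase))
}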



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70369134
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by looking only at
+ * partition-level metadata. This applies when all the columns scanned are partition columns, and
+ * the query has an aggregate operator that satisfies the following conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+    catalog: SessionCatalog,
+    conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.optimizerMetadataOnly) {
+      return plan
+    }
+
+    plan.transform {
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
+        // We only apply this optimization when only partitioned attributes are scanned.
+        if (a.references.subsetOf(partAttrs)) {
+          val aggFunctions = aggExprs.flatMap(_.collect {
+            case agg: AggregateExpression => agg
+          })
+          val isAllDistinctAgg = aggFunctions.forall { agg =>
+            agg.isDistinct || (agg.aggregateFunction match {
+              // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter
+              // they have DISTINCT keyword or not, as the result will be same.
+              case _: Max => true
+              case _: Min => true
+              case _: First => true
+              case _: Last => true
+              case _ => false
+            })
+          }
+          if (isAllDistinctAgg) {
+            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+          } else {
+            a
+          }
+        } else {
+          a
+        }
+    }
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+      child: LogicalPlan,
+      relation: LogicalPlan): LogicalPlan = {
+    child transform {
+      case plan if plan eq relation =>
+        relation match {
+          case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+            val partAttrs = PartitionedRelation.getPartitionAttrs(
+              fsRelation.partitionSchema.map(_.name), l)
+            val partitionData = fsRelation.location.listFiles(filters = Nil)
+            LocalRelation(partAttrs, partitionData.map(_.values))
+
+          case relation: CatalogRelation =>
+            val partAttrs = PartitionedRelation.getPartitionAttrs(
+              relation.catalogTable.partitionColumnNames, relation)
+            val partitionData = catalog.l

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70368905
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by looking only at
+ * partition-level metadata. This applies when all the columns scanned are partition columns, and
+ * the query has an aggregate operator that satisfies the following conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+    catalog: SessionCatalog,
+    conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.optimizerMetadataOnly) {
+      return plan
+    }
+
+    plan.transform {
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
+        // We only apply this optimization when only partitioned attributes are scanned.
+        if (a.references.subsetOf(partAttrs)) {
+          val aggFunctions = aggExprs.flatMap(_.collect {
+            case agg: AggregateExpression => agg
+          })
+          val isAllDistinctAgg = aggFunctions.forall { agg =>
+            agg.isDistinct || (agg.aggregateFunction match {
+              // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter
+              // they have DISTINCT keyword or not, as the result will be same.
+              case _: Max => true
+              case _: Min => true
+              case _: First => true
+              case _: Last => true
+              case _ => false
+            })
+          }
+          if (isAllDistinctAgg) {
+            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+          } else {
+            a
+          }
+        } else {
+          a
+        }
+    }
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+      child: LogicalPlan,
+      relation: LogicalPlan): LogicalPlan = {
+    child transform {
+      case plan if plan eq relation =>
+        relation match {
+          case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+            val partAttrs = PartitionedRelation.getPartitionAttrs(
+              fsRelation.partitionSchema.map(_.name), l)
+            val partitionData = fsRelation.location.listFiles(filters = Nil)
+            LocalRelation(partAttrs, partitionData.map(_.values))
+
+          case relation: CatalogRelation =>
+            val partAttrs = PartitionedRelation.getPartitionAttrs(
+              relation.catalogTable.partitionColumnNames, relation)
+            val partitionData = catalog.l

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70368861
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by looking only at
+ * partition-level metadata. This applies when all the columns scanned are partition columns, and
+ * the query has an aggregate operator that satisfies the following conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+    catalog: SessionCatalog,
+    conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.optimizerMetadataOnly) {
+      return plan
+    }
+
+    plan.transform {
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
+        // We only apply this optimization when only partitioned attributes are scanned.
+        if (a.references.subsetOf(partAttrs)) {
+          val aggFunctions = aggExprs.flatMap(_.collect {
+            case agg: AggregateExpression => agg
+          })
+          val isAllDistinctAgg = aggFunctions.forall { agg =>
+            agg.isDistinct || (agg.aggregateFunction match {
+              // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter
+              // they have DISTINCT keyword or not, as the result will be same.
+              case _: Max => true
+              case _: Min => true
+              case _: First => true
+              case _: Last => true
+              case _ => false
+            })
+          }
+          if (isAllDistinctAgg) {
+            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+          } else {
+            a
+          }
+        } else {
+          a
+        }
+    }
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+      child: LogicalPlan,
+      relation: LogicalPlan): LogicalPlan = {
+    child transform {
+      case plan if plan eq relation =>
+        relation match {
+          case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+            val partAttrs = PartitionedRelation.getPartitionAttrs(
--- End diff --

Because the PartitionedRelation object also uses getPartitionAttrs, for now I just define it in PartitionedRelation. If it were defined as a private method in the OptimizeMetadataOnlyQuery class, there would be two identical getPartitionAttrs() functions, one in PartitionedRelation and one in OptimizeMetadataOnlyQuery. That is why PartitionedRelation.getPartitionAttrs is used here.
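
A hedged sketch of that shared placement follows. The `unapply` shape is inferred from the pattern `PartitionedRelation(partAttrs, relation)` and the `subsetOf` call on its first result, not copied from the PR; the real extractor likely also handles projections and filters above the scan.

object PartitionedRelation {

  // Shared helper: map partition column names to output attributes, case-insensitively.
  def getPartitionAttrs(
      partitionColumnNames: Seq[String],
      relation: LogicalPlan): Seq[Attribute] = {
    val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
    relation.output.filter(a => partColumns.contains(a.name.toLowerCase))
  }

  // Recognize a partitioned table scan and return (partition attributes, scan node).
  def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match {
    case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _)
        if fsRelation.partitionSchema.nonEmpty =>
      Some((AttributeSet(getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)), l))

    case relation: CatalogRelation
        if relation.catalogTable.partitionColumnNames.nonEmpty =>
      val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation)
      Some((AttributeSet(partAttrs), relation))

    case _ => None
  }
}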



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70362160
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by looking only at
+ * partition-level metadata. This applies when all the columns scanned are partition columns, and
+ * the query has an aggregate operator that satisfies the following conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+    catalog: SessionCatalog,
+    conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.optimizerMetadataOnly) {
+      return plan
+    }
+
+    plan.transform {
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
+        // We only apply this optimization when only partitioned attributes are scanned.
+        if (a.references.subsetOf(partAttrs)) {
+          val aggFunctions = aggExprs.flatMap(_.collect {
+            case agg: AggregateExpression => agg
+          })
+          val isAllDistinctAgg = aggFunctions.forall { agg =>
+            agg.isDistinct || (agg.aggregateFunction match {
+              // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter
+              // they have DISTINCT keyword or not, as the result will be same.
+              case _: Max => true
+              case _: Min => true
+              case _: First => true
+              case _: Last => true
+              case _ => false
+            })
+          }
+          if (isAllDistinctAgg) {
+            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+          } else {
+            a
+          }
+        } else {
+          a
+        }
+    }
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+      child: LogicalPlan,
+      relation: LogicalPlan): LogicalPlan = {
+    child transform {
+      case plan if plan eq relation =>
+        relation match {
+          case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+            val partAttrs = PartitionedRelation.getPartitionAttrs(
+              fsRelation.partitionSchema.map(_.name), l)
+            val partitionData = fsRelation.location.listFiles(filters = Nil)
+            LocalRelation(partAttrs, partitionData.map(_.values))
+
+          case relation: CatalogRelation =>
+            val partAttrs = PartitionedRelation.getPartitionAttrs(
+              relation.catalogTable.partitionColumnNames, relation)
+            val partitionData = catalog.lis

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70361858
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by looking only at
+ * partition-level metadata. This applies when all the columns scanned are partition columns, and
+ * the query has an aggregate operator that satisfies the following conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+    catalog: SessionCatalog,
+    conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.optimizerMetadataOnly) {
+      return plan
+    }
+
+    plan.transform {
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
+        // We only apply this optimization when only partitioned attributes are scanned.
+        if (a.references.subsetOf(partAttrs)) {
+          val aggFunctions = aggExprs.flatMap(_.collect {
+            case agg: AggregateExpression => agg
+          })
+          val isAllDistinctAgg = aggFunctions.forall { agg =>
+            agg.isDistinct || (agg.aggregateFunction match {
+              // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter
+              // they have DISTINCT keyword or not, as the result will be same.
+              case _: Max => true
+              case _: Min => true
+              case _: First => true
+              case _: Last => true
+              case _ => false
+            })
+          }
+          if (isAllDistinctAgg) {
+            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+          } else {
+            a
+          }
+        } else {
+          a
+        }
+    }
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+      child: LogicalPlan,
+      relation: LogicalPlan): LogicalPlan = {
+    child transform {
+      case plan if plan eq relation =>
+        relation match {
+          case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+            val partAttrs = PartitionedRelation.getPartitionAttrs(
--- End diff --

Does `getPartitionAttrs` need to be a method in `PartitionedRelation`? I think it can just be a private method in the parent class.


[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70316261
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd"))
+      .toDF("col1", "col2", "partcol1", "partcol2")
+    data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart")
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      sql("DROP TABLE IF EXISTS srcpart")
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  private def assertMetadataOnlyQuery(df: DataFrame): Unit = {
+    val localRelations = df.queryExecution.optimizedPlan.collect {
+      case l @ LocalRelation(_, _) => l
+    }
+    assert(localRelations.size == 1)
+  }
+
+  private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = {
+    val localRelations = df.queryExecution.optimizedPlan.collect {
+      case l @ LocalRelation(_, _) => l
+    }
+    assert(localRelations.size == 0)
+  }
+
+  private def testMetadataOnly(name: String, sqls: String*): Unit = {
+    test(name) {
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+        sqls.foreach { case q => assertMetadataOnlyQuery(sql(q)) }
+      }
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+        sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
+      }
+    }
+  }
+
+  private def testNotMetadataOnly(name: String, sqls: String*): Unit = {
--- End diff --

NVM - I am blind





[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70316064
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by looking only at
+ * partition-level metadata. This applies when all the columns scanned are partition columns, and
+ * the query has an aggregate operator that satisfies the following conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+    catalog: SessionCatalog,
+    conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.optimizerMetadataOnly) {
+      return plan
+    }
+
+    plan.transform {
+      case a @ Aggregate(_, aggExprs, child @ PartitionedRelation(partAttrs, relation)) =>
+        // We only apply this optimization when only partitioned attributes are scanned.
+        if (a.references.subsetOf(partAttrs)) {
+          val aggFunctions = aggExprs.flatMap(_.collect {
+            case agg: AggregateExpression => agg
+          })
+          val isAllDistinctAgg = aggFunctions.forall { agg =>
+            agg.isDistinct || (agg.aggregateFunction match {
+              // `Max`, `Min`, `First` and `Last` are always distinct aggregate functions no matter
+              // they have DISTINCT keyword or not, as the result will be same.
+              case _: Max => true
+              case _: Min => true
+              case _: First => true
+              case _: Last => true
+              case _ => false
+            })
+          }
+          if (isAllDistinctAgg) {
+            a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+          } else {
+            a
+          }
+        } else {
+          a
+        }
+    }
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+      child: LogicalPlan,
+      relation: LogicalPlan): LogicalPlan = {
+    child transform {
+      case plan if plan eq relation =>
+        relation match {
+          case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+            val partAttrs = PartitionedRelation.getPartitionAttrs(
+              fsRelation.partitionSchema.map(_.name), l)
+            val partitionData = fsRelation.location.listFiles(filters = Nil)
+            LocalRelation(partAttrs, partitionData.map(_.values))
+
+          case relation: CatalogRelation =>
+            val partAttrs = PartitionedRelation.getPartitionAttrs(
+              relation.catalogTable.partitionColumnNames, relation)
+            val partitionData = catalog.li

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70316221
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd"))
+      .toDF("col1", "col2", "partcol1", "partcol2")
+    data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart")
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      sql("DROP TABLE IF EXISTS srcpart")
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  private def assertMetadataOnlyQuery(df: DataFrame): Unit = {
+    val localRelations = df.queryExecution.optimizedPlan.collect {
+      case l @ LocalRelation(_, _) => l
+    }
+    assert(localRelations.size == 1)
+  }
+
+  private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = {
+    val localRelations = df.queryExecution.optimizedPlan.collect {
+      case l @ LocalRelation(_, _) => l
+    }
+    assert(localRelations.size == 0)
+  }
+
+  private def testMetadataOnly(name: String, sqls: String*): Unit = {
+    test(name) {
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+        sqls.foreach { case q => assertMetadataOnlyQuery(sql(q)) }
+      }
+      withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
+        sqls.foreach { case q => assertNotMetadataOnlyQuery(sql(q)) }
+      }
+    }
+  }
+
+  private def testNotMetadataOnly(name: String, sqls: String*): Unit = {
--- End diff --

When is this one used?





[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70267348
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) "even" else "odd"))
+      .toDF("col1", "col2", "partcol1", "partcol2")
+    data.write.partitionBy("partcol1", "partcol2").mode("append").saveAsTable("srcpart")
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      sql("DROP TABLE IF EXISTS srcpart")
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  private def assertMetadataOnlyQuery(df: DataFrame): Unit = {
+    val localRelations = df.queryExecution.optimizedPlan.collect {
+      case l @ LocalRelation(_, _) => l
+    }
+    assert(localRelations.size == 1)
+  }
+
+  private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = {
+    val localRelations = df.queryExecution.optimizedPlan.collect {
+      case l @ LocalRelation(_, _) => l
+    }
+    assert(localRelations.size == 0)
+  }
+
+  test("OptimizeMetadataOnlyQuery test: aggregate expression is partition columns") {
--- End diff --

Got it, thanks. I will update it.





[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70253642
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by 
looking only at
+ * partition-level metadata. This applies when all the columns scanned are 
partition columns, and
+ * the query has an aggregate operator that satisfies the following 
conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+catalog: SessionCatalog,
+conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+// We only apply this optimization when only partitioned 
attributes are scanned.
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isAllDistinctAgg = aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  // `Max` and `Min` are always distinct aggregate functions 
no matter they have
+  // DISTINCT keyword or not, as the result will be same.
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isAllDistinctAgg) {
+
a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the 
given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+  child: LogicalPlan,
+  relation: LogicalPlan): LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(filters = 
Nil)
+LocalRelation(partAttrs, partitionData.map(_.values))
+
+  case relation: CatalogRelation =>
+val partColumns = 
relation.catalogTable.partitionColumnNames.map(_.toLowerCase).toSet
+val partAttrs = relation.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partiti

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70252043
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlyQuerySuite extends QueryTest with 
SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("col1", "col2", "partcol1", "partcol2")
+data.write.partitionBy("partcol1", 
"partcol2").mode("append").saveAsTable("srcpart")
+  }
+
+  override protected def afterAll(): Unit = {
+try {
+  sql("DROP TABLE IF EXISTS srcpart")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private def assertMetadataOnlyQuery(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 1)
+  }
+
+  private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 0)
+  }
+
+  test("OptimizeMetadataOnlyQuery test: aggregate expression is partition 
columns") {
--- End diff --

That is not what I mean. I'm all for thorough testing; however, the structure is
the same everywhere. You could generalize this (and improve readability), e.g.:
```scala
def testMetadataOnly(name: String, sqls: String*): Unit = {
  test(name) {
    withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
      sqls.foreach(q => assertMetadataOnlyQuery(sql(q)))
    }
    withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "false") {
      sqls.foreach(q => assertNotMetadataOnlyQuery(sql(q)))
    }
  }
}

testMetadataOnly(
  "OptimizeMetadataOnlyQuery test: aggregate expression is partition columns",
  "select partcol1 from srcpart group by partcol1",
  "select partcol2 from srcpart where partcol1 = 0 group by partcol2")
```





[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70249015
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlyQuerySuite extends QueryTest with 
SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("col1", "col2", "partcol1", "partcol2")
+data.write.partitionBy("partcol1", 
"partcol2").mode("append").saveAsTable("srcpart")
+  }
+
+  override protected def afterAll(): Unit = {
+try {
+  sql("DROP TABLE IF EXISTS srcpart")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private def assertMetadataOnlyQuery(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 1)
+  }
+
+  private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 0)
+  }
+
+  test("OptimizeMetadataOnlyQuery test: aggregate expression is partition 
columns") {
--- End diff --

As @rxin mentioned before, dividing this into multiple functions gives a separate
test case for each of the categories documented in OptimizeMetadataOnlyQuery.
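For illustration, a hedged sketch of that layout: one named test per category documented in OptimizeMetadataOnlyQuery's scaladoc, against the suite's `srcpart` table. The test names and queries here are illustrative, not necessarily the merged ones:

```scala
// Category 1: the aggregate expressions are themselves partition columns.
testMetadataOnly(
  "aggregate expression is partition columns",
  "select partcol1 from srcpart group by partcol1")

// Category 2: aggregate functions on partition columns with DISTINCT.
testMetadataOnly(
  "distinct aggregate function on partition columns",
  "select partcol1, count(distinct partcol2) from srcpart group by partcol1")

// Category 3: aggregate functions that return the same result with or
// without DISTINCT, such as max/min.
testMetadataOnly(
  "distinct-insensitive aggregate function on partition columns",
  "select partcol1, max(partcol2) from srcpart group by partcol1")
```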





[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70248199
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by 
looking only at
+ * partition-level metadata. This applies when all the columns scanned are 
partition columns, and
+ * the query has an aggregate operator that satisfies the following 
conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+catalog: SessionCatalog,
+conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+// We only apply this optimization when only partitioned 
attributes are scanned.
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isAllDistinctAgg = aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  // `Max` and `Min` are always distinct aggregate functions 
no matter they have
+  // DISTINCT keyword or not, as the result will be same.
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isAllDistinctAgg) {
+
a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the 
given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+  child: LogicalPlan,
+  relation: LogicalPlan): LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(filters = 
Nil)
+LocalRelation(partAttrs, partitionData.map(_.values))
+
+  case relation: CatalogRelation =>
+val partColumns = 
relation.catalogTable.partitionColumnNames.map(_.toLowerCase).toSet
+val partAttrs = relation.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partit

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70248137
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by 
looking only at
+ * partition-level metadata. This applies when all the columns scanned are 
partition columns, and
+ * the query has an aggregate operator that satisfies the following 
conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+catalog: SessionCatalog,
+conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+// We only apply this optimization when only partitioned 
attributes are scanned.
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isAllDistinctAgg = aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  // `Max` and `Min` are always distinct aggregate functions 
no matter they have
+  // DISTINCT keyword or not, as the result will be same.
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isAllDistinctAgg) {
+
a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the 
given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+  child: LogicalPlan,
+  relation: LogicalPlan): LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(filters = 
Nil)
+LocalRelation(partAttrs, partitionData.map(_.values))
+
+  case relation: CatalogRelation =>
+val partColumns = 
relation.catalogTable.partitionColumnNames.map(_.toLowerCase).toSet
+val partAttrs = relation.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partit

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70247911
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by 
looking only at
+ * partition-level metadata. This applies when all the columns scanned are 
partition columns, and
+ * the query has an aggregate operator that satisfies the following 
conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+catalog: SessionCatalog,
+conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+// We only apply this optimization when only partitioned 
attributes are scanned.
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isAllDistinctAgg = aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  // `Max` and `Min` are always distinct aggregate functions 
no matter they have
+  // DISTINCT keyword or not, as the result will be same.
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isAllDistinctAgg) {
+
a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the 
given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+  child: LogicalPlan,
+  relation: LogicalPlan): LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
--- End diff --

Yes, there is no need to do it. Thanks.



[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70247856
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by 
looking only at
+ * partition-level metadata. This applies when all the columns scanned are 
partition columns, and
+ * the query has an aggregate operator that satisfies the following 
conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+catalog: SessionCatalog,
+conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+// We only apply this optimization when only partitioned 
attributes are scanned.
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isAllDistinctAgg = aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  // `Max` and `Min` are always distinct aggregate functions 
no matter they have
+  // DISTINCT keyword or not, as the result will be same.
+  case _: Max => true
--- End diff --

Yes, we need to handle them.
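A hedged sketch of what handling them could look like; the `First`/`Last` cases are assumptions drawn from this thread, not necessarily the merged code:

```scala
// Sketch: extend the distinct-insensitive whitelist. Max and Min ignore
// duplicates outright; First and Last pick a single value per group, and
// deduplicating the input does not change the set of values they can
// pick from.
val isAllDistinctAgg = aggFunctions.forall { agg =>
  agg.isDistinct || (agg.aggregateFunction match {
    case _: Max => true
    case _: Min => true
    case _: First => true  // assumption per this review comment
    case _: Last => true   // assumption per this review comment
    case _ => false
  })
}
```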





[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70240831
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OptimizeMetadataOnlyQuerySuite extends QueryTest with 
SharedSQLContext {
+  import testImplicits._
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+val data = (1 to 10).map(i => (i, s"data-$i", i % 2, if ((i % 2) == 0) 
"even" else "odd"))
+  .toDF("col1", "col2", "partcol1", "partcol2")
+data.write.partitionBy("partcol1", 
"partcol2").mode("append").saveAsTable("srcpart")
+  }
+
+  override protected def afterAll(): Unit = {
+try {
+  sql("DROP TABLE IF EXISTS srcpart")
+} finally {
+  super.afterAll()
+}
+  }
+
+  private def assertMetadataOnlyQuery(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 1)
+  }
+
+  private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = {
+val localRelations = df.queryExecution.optimizedPlan.collect {
+  case l @ LocalRelation(_, _) => l
+}
+assert(localRelations.size == 0)
+  }
+
+  test("OptimizeMetadataOnlyQuery test: aggregate expression is partition 
columns") {
--- End diff --

MINOR: all these tests have the same structure. We could move this into a
function...





[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70238437
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by 
looking only at
+ * partition-level metadata. This applies when all the columns scanned are 
partition columns, and
+ * the query has an aggregate operator that satisfies the following 
conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+catalog: SessionCatalog,
+conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+// We only apply this optimization when only partitioned 
attributes are scanned.
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isAllDistinctAgg = aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  // `Max` and `Min` are always distinct aggregate functions 
no matter they have
+  // DISTINCT keyword or not, as the result will be same.
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isAllDistinctAgg) {
+
a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the 
given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+  child: LogicalPlan,
+  relation: LogicalPlan): LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
--- End diff --

There is no need to determine the `partAttrs` again; we can just pass them 
as an argument.
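For illustration, a hedged sketch of that refactor; the third parameter and the trimmed branches are hypothetical, not necessarily what was merged:

```scala
// Sketch: thread partAttrs through from the PartitionedRelation
// extractor instead of re-deriving it from the partition schema in
// every branch.
private def replaceTableScanWithPartitionMetadata(
    child: LogicalPlan,
    relation: LogicalPlan,
    partAttrs: Seq[Attribute]): LogicalPlan = {
  child transform {
    case plan if plan eq relation =>
      relation match {
        case LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
          val partitionData = fsRelation.location.listFiles(filters = Nil)
          LocalRelation(partAttrs, partitionData.map(_.values))
        case relation: CatalogRelation =>
          // same idea: build the LocalRelation from partAttrs and the
          // catalog's partition values, without recomputing partAttrs
          ???
      }
  }
}
```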




[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70236613
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by 
looking only at
+ * partition-level metadata. This applies when all the columns scanned are 
partition columns, and
+ * the query has an aggregate operator that satisfies the following 
conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+catalog: SessionCatalog,
+conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+// We only apply this optimization when only partitioned 
attributes are scanned.
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isAllDistinctAgg = aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  // `Max` and `Min` are always distinct aggregate functions 
no matter they have
+  // DISTINCT keyword or not, as the result will be same.
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isAllDistinctAgg) {
+
a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the 
given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+  child: LogicalPlan,
+  relation: LogicalPlan): LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(filters = 
Nil)
+LocalRelation(partAttrs, partitionData.map(_.values))
+
+  case relation: CatalogRelation =>
+val partColumns = 
relation.catalogTable.partitionColumnNames.map(_.toLowerCase).toSet
+val partAttrs = relation.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partiti

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70236562
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by 
looking only at
+ * partition-level metadata. This applies when all the columns scanned are 
partition columns, and
+ * the query has an aggregate operator that satisfies the following 
conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+catalog: SessionCatalog,
+conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+// We only apply this optimization when only partitioned 
attributes are scanned.
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isAllDistinctAgg = aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  // `Max` and `Min` are always distinct aggregate functions 
no matter they have
+  // DISTINCT keyword or not, as the result will be same.
+  case _: Max => true
+  case _: Min => true
+  case _ => false
+})
+  }
+  if (isAllDistinctAgg) {
+
a.withNewChildren(Seq(replaceTableScanWithPartitionMetadata(child, relation)))
+  } else {
+a
+  }
+} else {
+  a
+}
+}
+  }
+
+  /**
+   * Transform the given plan, find its table scan nodes that matches the 
given relation, and then
+   * replace the table scan node with its corresponding partition values.
+   */
+  private def replaceTableScanWithPartitionMetadata(
+  child: LogicalPlan,
+  relation: LogicalPlan): LogicalPlan = {
+child transform {
+  case plan if plan eq relation =>
+relation match {
+  case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
+val partColumns = 
fsRelation.partitionSchema.map(_.name.toLowerCase).toSet
+val partAttrs = l.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partitionData = fsRelation.location.listFiles(filters = 
Nil)
+LocalRelation(partAttrs, partitionData.map(_.values))
+
+  case relation: CatalogRelation =>
+val partColumns = 
relation.catalogTable.partitionColumnNames.map(_.toLowerCase).toSet
+val partAttrs = relation.output.filter(a => 
partColumns.contains(a.name.toLowerCase))
+val partiti

[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r70230056
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule optimizes the execution of queries that can be answered by 
looking only at
+ * partition-level metadata. This applies when all the columns scanned are 
partition columns, and
+ * the query has an aggregate operator that satisfies the following 
conditions:
+ * 1. aggregate expression is partition columns.
+ *  e.g. SELECT col FROM tbl GROUP BY col.
+ * 2. aggregate function on partition columns with DISTINCT.
+ *  e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1.
+ * 3. aggregate function on partition columns which have same result w or 
w/o DISTINCT keyword.
+ *  e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
+ */
+case class OptimizeMetadataOnlyQuery(
+catalog: SessionCatalog,
+conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!conf.optimizerMetadataOnly) {
+  return plan
+}
+
+plan.transform {
+  case a @ Aggregate(_, aggExprs, child @ 
PartitionedRelation(partAttrs, relation)) =>
+// We only apply this optimization when only partitioned 
attributes are scanned.
+if (a.references.subsetOf(partAttrs)) {
+  val aggFunctions = aggExprs.flatMap(_.collect {
+case agg: AggregateExpression => agg
+  })
+  val isAllDistinctAgg = aggFunctions.forall { agg =>
+agg.isDistinct || (agg.aggregateFunction match {
+  // `Max` and `Min` are always distinct aggregate functions 
no matter they have
+  // DISTINCT keyword or not, as the result will be same.
+  case _: Max => true
--- End diff --

First/Last?





[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-07 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69860765
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala 
---
@@ -1689,4 +1689,76 @@ class SQLQuerySuite extends QueryTest with 
SQLTestUtils with TestHiveSingleton {
   )
 }
   }
+
+  test("spark-15752 optimize metadata only query for hive table") {
--- End diff --

I will make them use the same cases. Thanks.





[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69853489
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala 
---
@@ -1689,4 +1689,76 @@ class SQLQuerySuite extends QueryTest with 
SQLTestUtils with TestHiveSingleton {
   )
 }
   }
+
+  test("spark-15752 optimize metadata only query for hive table") {
--- End diff --

Why is this test so different from the one in sql core `SQLQuerySuite`?





[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-06 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69843015
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala 
---
@@ -1689,4 +1689,86 @@ class SQLQuerySuite extends QueryTest with 
SQLTestUtils with TestHiveSingleton {
   )
 }
   }
+
+  test("spark-15752 optimize metadata only query for hive table") {
+withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+  withTable("data_15752", "srcpart_15752", "srctext_15752") {
+val df = Seq((1, "2"), (3, "4")).toDF("key", "value")
+df.createOrReplaceTempView("data_15752")
+sql(
+  """
+|CREATE TABLE srcpart_15752 (col1 INT, col2 STRING)
+|PARTITIONED BY (partcol1 INT, partcol2 STRING) STORED AS 
parquet
+  """.stripMargin)
+for (partcol1 <- Seq(11, 12); partcol2 <- Seq("a", "b")) {
+  sql(
+s"""
+  |INSERT OVERWRITE TABLE srcpart_15752
+  |PARTITION (partcol1='$partcol1', partcol2='$partcol2')
+  |select key, value from data_15752
+""".stripMargin)
+}
+checkAnswer(
+  sql("select partcol1 from srcpart_15752 where partcol1 = 11 
group by partcol1"),
+  Row(11))
+checkAnswer(sql("select max(partcol1) from srcpart_15752"), 
Row(12))
+checkAnswer(sql("select max(partcol1) from srcpart_15752 where 
partcol1 = 11"), Row(11))
+checkAnswer(
+  sql("select max(partcol1) from (select partcol1 from 
srcpart_15752) t"),
+  Row(12))
+checkAnswer(
+  sql("select max(col) from (select partcol1 + 1 as col from 
srcpart_15752 " +
+"where partcol1 = 12) t"),
+  Row(13))
+checkAnswer(sql("select distinct partcol1 from srcpart_15752"), 
Row(11) :: Row(12) :: Nil)
+checkAnswer(sql("select distinct partcol1 from srcpart_15752 where 
partcol1 = 11"), Row(11))
+checkAnswer(
+  sql("select distinct col from (select partcol1 + 1 as col from 
srcpart_15752 " +
+"where partcol1 = 12) t"),
+  Row(13))
+
+// Now donot support metadata only optimizer
--- End diff --

This PR does not handle them; they will be added in follow-up PRs.
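For instance (an illustration inferred from the rule's conditions, not from the elided test lines): a plain `count` over a partition column is not distinct-insensitive, since it would need per-partition row counts that the partition listing does not provide, so the rule leaves such a plan untouched:

```scala
// Hypothetical unsupported case: COUNT without DISTINCT fails the
// isAllDistinctAgg check, so this query still scans the files.
// 2 rows were inserted into each of the 2 partitions with partcol1 = 11.
checkAnswer(
  sql("select count(partcol1) from srcpart_15752 where partcol1 = 11"),
  Row(4))
```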





[GitHub] spark pull request #13494: [SPARK-15752] [SQL] Optimize metadata only query ...

2016-07-06 Thread clockfly
Github user clockfly commented on a diff in the pull request:

https://github.com/apache/spark/pull/13494#discussion_r69830577
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala 
---
@@ -1689,4 +1689,86 @@ class SQLQuerySuite extends QueryTest with 
SQLTestUtils with TestHiveSingleton {
   )
 }
   }
+
+  test("spark-15752 optimize metadata only query for hive table") {
+withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
+  withTable("data_15752", "srcpart_15752", "srctext_15752") {
+val df = Seq((1, "2"), (3, "4")).toDF("key", "value")
+df.createOrReplaceTempView("data_15752")
+sql(
+  """
+|CREATE TABLE srcpart_15752 (col1 INT, col2 STRING)
+|PARTITIONED BY (partcol1 INT, partcol2 STRING) STORED AS 
parquet
+  """.stripMargin)
+for (partcol1 <- Seq(11, 12); partcol2 <- Seq("a", "b")) {
+  sql(
+s"""
+  |INSERT OVERWRITE TABLE srcpart_15752
+  |PARTITION (partcol1='$partcol1', partcol2='$partcol2')
+  |select key, value from data_15752
+""".stripMargin)
+}
+checkAnswer(
+  sql("select partcol1 from srcpart_15752 where partcol1 = 11 
group by partcol1"),
+  Row(11))
+checkAnswer(sql("select max(partcol1) from srcpart_15752"), 
Row(12))
+checkAnswer(sql("select max(partcol1) from srcpart_15752 where 
partcol1 = 11"), Row(11))
+checkAnswer(
+  sql("select max(partcol1) from (select partcol1 from 
srcpart_15752) t"),
+  Row(12))
+checkAnswer(
+  sql("select max(col) from (select partcol1 + 1 as col from 
srcpart_15752 " +
+"where partcol1 = 12) t"),
+  Row(13))
+checkAnswer(sql("select distinct partcol1 from srcpart_15752"), 
Row(11) :: Row(12) :: Nil)
+checkAnswer(sql("select distinct partcol1 from srcpart_15752 where 
partcol1 = 11"), Row(11))
+checkAnswer(
+  sql("select distinct col from (select partcol1 + 1 as col from 
srcpart_15752 " +
+"where partcol1 = 12) t"),
+  Row(13))
+
+// Now donot support metadata only optimizer
--- End diff --

`Now donot support metadata only optimizer`

What does this mean?

