[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18421


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r133892835
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes are 
calculated. When `noscan`
+ * is `true`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def getPartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
+val normalizedPartitionSpec =
+  PartitioningUtils.normalizePartitionSpec(partitionSpec, 
table.partitionColumnNames,
+table.identifier.quotedString, conf.resolver);
--- End diff --

Nit: remove `;`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-18 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r133892798
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes are 
calculated. When `noscan`
+ * is `true`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def getPartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
+val normalizedPartitionSpec =
+  PartitioningUtils.normalizePartitionSpec(partitionSpec, 
table.partitionColumnNames,
+table.identifier.quotedString, conf.resolver);
+
+// Report an error if partition columns in partition specification do 
not form
+// a prefix of the list of partition columns defined in the table 
schema
+val isSpecified =
--- End diff --

-> `isNotSpecified`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-17 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r133819737
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes are 
calculated. When `noscan`
+ * is `true`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def getPartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
+val partitionColumnNames = table.partitionColumnNames.toSet
+val partitionSpecWithCase =
--- End diff --

Instead of changing the case, could you call 
`sparkSession.sessionState.conf.resolver`? There are many examples in the code 
base. Thanks! 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-10 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r132493566
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -256,6 +257,201 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", "SELECT '1', 'A' from src UNION ALL 
SELECT '1', 'A' from src")
+  createPartition("2010-01-03", "SELECT '1', 'A' from src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertPartitionStats(
+ds: String,
+hr: String,
+rowCount: Option[BigInt],
+sizeInBytes: BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === rowCount)
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def createPartition(ds: String, hr: Int, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) 
$query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING, hr INT)")
+
+  createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
+  createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", 10, "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", 11,
+"SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = None, 
sizeInBytes = 2000)
+  assert(queryStats("2010-01-02", "10") === None)
+  assert(queryStats("2010-01-02", "11") === None)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "11", rowCount = None, 
sizeInBytes = 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = Some(500), 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = Some(500), 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "11", rowCount = None, 
sizeInBytes = 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-10 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r132490843
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -107,6 +109,7 @@ case class CatalogTablePartition(
 if (parameters.nonEmpty) {
   map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" 
+ p._2).mkString(", ")}}")
 }
+stats.foreach(s => map.put("Partition Statistics", s.simpleString))
--- End diff --

Indeed. It works like you said it would. :-) Adding a test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-09 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r132099901
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -256,6 +257,222 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", "SELECT '1', 'A' from src UNION ALL 
SELECT '1', 'A' from src")
+  createPartition("2010-01-03", "SELECT '1', 'A' from src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertPartitionStats(
+ds: String,
+hr: String,
+rowCount: Option[BigInt],
+sizeInBytes: BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === rowCount)
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def createPartition(ds: String, hr: Int, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) 
$query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING, hr INT)")
+
+  createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
+  createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", 10, "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", 11,
+"SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = None, 
sizeInBytes = 2000)
+  assert(queryStats("2010-01-02", "10") === None)
+  assert(queryStats("2010-01-02", "11") === None)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "11", rowCount = None, 
sizeInBytes = 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = Some(500), 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = Some(500), 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "11", rowCount = None, 
sizeInBytes = 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-09 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r132099286
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -1033,25 +998,126 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 currentFullPath
   }
 
+  private def statsToProperties(
+  stats: CatalogStatistics,
+  schema: StructType): Map[String, String] = {
+
+var statsProperties: Map[String, String] =
+  Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
+if (stats.rowCount.isDefined) {
+  statsProperties += STATISTICS_NUM_ROWS -> 
stats.rowCount.get.toString()
+}
+
+val colNameTypeMap: Map[String, DataType] =
+  schema.fields.map(f => (f.name, f.dataType)).toMap
+stats.colStats.foreach { case (colName, colStat) =>
+  colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, 
v) =>
+statsProperties += (columnStatKeyPropName(colName, k) -> v)
+  }
+}
+
+statsProperties
+  }
+
+  private def statsFromProperties(
+  properties: Map[String, String],
+  table: String,
+  schema: StructType): Option[CatalogStatistics] = {
+
+val statsProps = properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+if (statsProps.isEmpty) {
+  None
+} else {
+
+  val colStats = new mutable.HashMap[String, ColumnStat]
+
+  // For each column, recover its column stats. Note that this is 
currently a O(n^2) operation,
+  // but given the number of columns it usually not enormous, this is 
probably OK as a start.
+  // If we want to map this a linear operation, we'd need a stronger 
contract between the
+  // naming convention used for serialization.
+  schema.foreach { field =>
+if (statsProps.contains(columnStatKeyPropName(field.name, 
ColumnStat.KEY_VERSION))) {
+  // If "version" field is defined, then the column stat is 
defined.
+  val keyPrefix = columnStatKeyPropName(field.name, "")
+  val colStatMap = 
statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
+(k.drop(keyPrefix.length), v)
+  }
+
+  ColumnStat.fromMap(table, field, colStatMap).foreach {
+colStat => colStats += field.name -> colStat
+  }
+}
+  }
+
+  Some(CatalogStatistics(
+sizeInBytes = BigInt(statsProps(STATISTICS_TOTAL_SIZE)),
+rowCount = statsProps.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
+colStats = colStats.toMap))
+}
+  }
+
   override def alterPartitions(
   db: String,
   table: String,
   newParts: Seq[CatalogTablePartition]): Unit = withClient {
 val lowerCasedParts = newParts.map(p => p.copy(spec = 
lowerCasePartitionSpec(p.spec)))
+
+val rawTable = getRawTable(db, table)
+
+// For datasource tables and hive serde tables created by spark 2.1 or 
higher,
+// the data schema is stored in the table properties.
+val schema = restoreTableMetadata(rawTable).schema
+
+// convert partition statistics to properties so that we can persist 
them through hive api
+val withStatsProps = lowerCasedParts.map(p => {
+  if (p.stats.isDefined) {
+val statsProperties = statsToProperties(p.stats.get, schema)
+p.copy(parameters = p.parameters ++ statsProperties)
+  } else {
+p
+  }
+})
+
 // Note: Before altering table partitions in Hive, you *must* set the 
current database
 // to the one that contains the table of interest. Otherwise you will 
end up with the
 // most helpful error message ever: "Unable to alter partition. alter 
is not possible."
 // See HIVE-2742 for more detail.
 client.setCurrentDatabase(db)
-client.alterPartitions(db, table, lowerCasedParts)
+client.alterPartitions(db, table, withStatsProps)
   }
 
   override def getPartition(
   db: String,
   table: String,
   spec: TablePartitionSpec): CatalogTablePartition = withClient {
 val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
-part.copy(spec = restorePartitionSpec(part.spec, getTable(db, 
table).partitionColumnNames))
+restorePartitionMetadata(part, getTable(db, table))
+  }
+
+  /**
+   * Restores partition metadata from the partition properties.
+   *
+   * Reads partition-level statistics from partition properties, puts these
+   * into [[CatalogTablePartition#stats]] and removes these special entries
+   * from the partition properties.
  

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-08 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131993682
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -256,6 +257,201 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", "SELECT '1', 'A' from src UNION ALL 
SELECT '1', 'A' from src")
+  createPartition("2010-01-03", "SELECT '1', 'A' from src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertPartitionStats(
+ds: String,
+hr: String,
+rowCount: Option[BigInt],
+sizeInBytes: BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === rowCount)
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def createPartition(ds: String, hr: Int, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) 
$query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING, hr INT)")
+
+  createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
+  createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", 10, "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", 11,
+"SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = None, 
sizeInBytes = 2000)
+  assert(queryStats("2010-01-02", "10") === None)
+  assert(queryStats("2010-01-02", "11") === None)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "11", rowCount = None, 
sizeInBytes = 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = Some(500), 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = Some(500), 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "11", rowCount = None, 
sizeInBytes = 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-08 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131993628
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -107,6 +109,7 @@ case class CatalogTablePartition(
 if (parameters.nonEmpty) {
   map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" 
+ p._2).mkString(", ")}}")
 }
+stats.foreach(s => map.put("Partition Statistics", s.simpleString))
--- End diff --

Yes, please add it. Try it and we should expose it to the external users.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-08 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131957187
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -256,6 +257,201 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", "SELECT '1', 'A' from src UNION ALL 
SELECT '1', 'A' from src")
+  createPartition("2010-01-03", "SELECT '1', 'A' from src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertPartitionStats(
+ds: String,
+hr: String,
+rowCount: Option[BigInt],
+sizeInBytes: BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === rowCount)
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def createPartition(ds: String, hr: Int, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) 
$query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING, hr INT)")
+
+  createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
+  createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", 10, "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", 11,
+"SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = None, 
sizeInBytes = 2000)
+  assert(queryStats("2010-01-02", "10") === None)
+  assert(queryStats("2010-01-02", "11") === None)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "11", rowCount = None, 
sizeInBytes = 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = Some(500), 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = Some(500), 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "11", rowCount = None, 
sizeInBytes = 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-08 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131957033
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes is calculated. 
When `noscan`
+ * is `false`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def validatePartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
+val partitionColumnNames = table.partitionColumnNames.toSet
+val invalidColumnNames = 
partitionSpec.keys.filterNot(partitionColumnNames.contains(_))
+if (invalidColumnNames.nonEmpty) {
+  val tableId = table.identifier
+  throw new AnalysisException(s"Partition specification for table 
'${tableId.table}' " +
+s"in database '${tableId.database.get}' refers to unknown 
partition column(s): " +
+invalidColumnNames.mkString(","))
+}
+
+val filteredSpec = partitionSpec.filter(_._2.isDefined)
--- End diff --

@gatorsmile , this PR doesn't impose this requirement. Hive docs also 
doesn't list it: https://cwiki.apache.org/confluence/display/Hive/StatsDev


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-08 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131953998
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -107,6 +109,7 @@ case class CatalogTablePartition(
 if (parameters.nonEmpty) {
   map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" 
+ p._2).mkString(", ")}}")
 }
+stats.foreach(s => map.put("Partition Statistics", s.simpleString))
--- End diff --

@gatorsmile , this code doesn't make partition stats appear in the output 
of DESC command. It only adds stats into to CatalogTablePartition.toString 
output (similar to CatalogTable.toString). Do you still want me to add some 
tests for this functionality? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131733047
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, 
CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType}
--- End diff --

Nit: `import org.apache.spark.sql.catalyst.catalog.CatalogTableType`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131732850
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -37,31 +37,19 @@ case class AnalyzeTableCommand(
 if (tableMeta.tableType == CatalogTableType.VIEW) {
   throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
 }
-val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
 
-val oldTotalSize = 
tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(-1L)
-val oldRowCount = 
tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
-var newStats: Option[CatalogStatistics] = None
-if (newTotalSize >= 0 && newTotalSize != oldTotalSize) {
-  newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
-}
-// We only set rowCount when noscan is false, because otherwise:
-// 1. when total size is not changed, we don't need to alter the table;
-// 2. when total size is changed, `oldRowCount` becomes invalid.
-// This is to make sure that we only record the right statistics.
-if (!noscan) {
-  val newRowCount = sparkSession.table(tableIdentWithDB).count()
-  if (newRowCount >= 0 && newRowCount != oldRowCount) {
-newStats = if (newStats.isDefined) {
-  newStats.map(_.copy(rowCount = Some(BigInt(newRowCount
-} else {
-  Some(CatalogStatistics(
-sizeInBytes = oldTotalSize, rowCount = 
Some(BigInt(newRowCount
-}
+// Compute stats for the whole table
+val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
+val newRowCount =
+  if (noscan) {
+None
+  } else {
+Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
--- End diff --

Let us shorten it to a single line.
```Scala
val newRowCount =
  if (noscan) None else 
Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131717623
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -256,6 +257,201 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", "SELECT '1', 'A' from src UNION ALL 
SELECT '1', 'A' from src")
+  createPartition("2010-01-03", "SELECT '1', 'A' from src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertPartitionStats(
+ds: String,
+hr: String,
+rowCount: Option[BigInt],
+sizeInBytes: BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === rowCount)
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def createPartition(ds: String, hr: Int, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) 
$query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING, hr INT)")
+
+  createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
+  createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", 10, "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", 11,
+"SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = None, 
sizeInBytes = 2000)
+  assert(queryStats("2010-01-02", "10") === None)
+  assert(queryStats("2010-01-02", "11") === None)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "11", rowCount = None, 
sizeInBytes = 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = Some(500), 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = Some(500), 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "11", rowCount = None, 
sizeInBytes = 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131707260
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes is calculated. 
When `noscan`
+ * is `false`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def validatePartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
+val partitionColumnNames = table.partitionColumnNames.toSet
+val invalidColumnNames = 
partitionSpec.keys.filterNot(partitionColumnNames.contains(_))
+if (invalidColumnNames.nonEmpty) {
+  val tableId = table.identifier
+  throw new AnalysisException(s"Partition specification for table 
'${tableId.table}' " +
+s"in database '${tableId.database.get}' refers to unknown 
partition column(s): " +
+invalidColumnNames.mkString(","))
+}
+
+val filteredSpec = partitionSpec.filter(_._2.isDefined)
--- End diff --

In the table insert with the mixed partition spec, we have the following 
requirement:
> In part_spec, the static partition keys must come before the dynamic 
partition keys. That means, all partition columns having constant values need 
to appear before other partition columns that do not have an assigned constant 
value.

Do we have similar requirement in analyze table partitions? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131706188
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes is calculated. 
When `noscan`
+ * is `false`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def validatePartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
--- End diff --

`validatePartitionSpec ` -> `getDynamicPartitionSpec`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131704801
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes is calculated. 
When `noscan`
+ * is `false`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def validatePartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
+val partitionColumnNames = table.partitionColumnNames.toSet
+val invalidColumnNames = 
partitionSpec.keys.filterNot(partitionColumnNames.contains(_))
--- End diff --

Here, we also need to consider the conf `spark.sql.caseSensitive`. There 
are multiple examples for the similar issues.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131703032
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes is calculated. 
When `noscan`
+ * is `false`, only total size in bytes is computed.
--- End diff --

`false` -> `true`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131702852
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes is calculated. 
When `noscan`
--- End diff --

Nit: `is` -> `are`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131701675
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -90,30 +90,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Create an [[AnalyzeTableCommand]] command, or an 
[[AnalyzePartitionCommand]]
+   * or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing table or a set of partitions :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], 
partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
* }}}
+   *
* Example SQL for analyzing columns :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS 
column1, column2;
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
+if (ctx.identifier != null &&
+ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+  throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
 }
-if (ctx.identifier != null) {
-  if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
+
+val table = visitTableIdentifier(ctx.tableIdentifier)
+if (ctx.identifierSeq() == null) {
+  if (ctx.partitionSpec != null) {
+AnalyzePartitionCommand(table, 
visitPartitionSpec(ctx.partitionSpec),
+  noscan = ctx.identifier != null)
+  } else {
+AnalyzeTableCommand(table, noscan = ctx.identifier != null)
   }
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
-} else if (ctx.identifierSeq() == null) {
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), 
noscan = false)
 } else {
+  if (ctx.partitionSpec != null) {
+logWarning("Partition specification is ignored when collecting 
column statistics: " +
+  ctx.partitionSpec.getText)
--- End diff --

Ok. Let us keep it unchanged,


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-08-07 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r131700550
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -107,6 +109,7 @@ case class CatalogTablePartition(
 if (parameters.nonEmpty) {
   map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" 
+ p._2).mkString(", ")}}")
 }
+stats.foreach(s => map.put("Partition Statistics", s.simpleString))
--- End diff --

This needs a test case. Could you add a new test file to [this test 
suite](https://github.com/apache/spark/tree/4ecc648ad713f9d618adf0406b5d39981779059d/sql/core/src/test/resources/sql-tests/inputs)
 for analyzing and desc table partitions? You can generate the result file by 
running the command:

> SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/test-only *SQLQueryTestSuite"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-31 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r130408151
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes is calculated. 
When `noscan`
+ * is `false`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def validatePartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
+val partitionColumnNames = table.partitionColumnNames.toSet
+val invalidColumnNames = 
partitionSpec.keys.filterNot(partitionColumnNames.contains(_))
+if (invalidColumnNames.nonEmpty) {
+  val tableId = table.identifier
+  throw new AnalysisException(s"Partition specification for table 
'${tableId.table}' " +
+s"in database '${tableId.database}' refers to unknown partition 
column(s): " +
+invalidColumnNames.mkString(","))
+}
+
+val filteredSpec = partitionSpec.filter(_._2.isDefined)
+if (filteredSpec.isEmpty) {
+  None
+} else {
+  Some(filteredSpec.mapValues(_.get))
+}
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+if (tableMeta.tableType == CatalogTableType.VIEW) {
+  throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
+}
+
+val partitionValueSpec = validatePartitionSpec(tableMeta)
+
+val partitions = 
sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec)
+
+if (partitions.isEmpty) {
+  if (partitionValueSpec.isDefined) {
+throw new NoSuchPartitionException(db, tableIdent.table, 
partitionValueSpec.get)
+  } else {
+// the user requested to analyze all partitions for a table which 
has no partitions
+// return normally, since there is nothing to do
+return Seq.empty[Row]
+  }
+}
+
+// Compute statistics for individual partitions
+val rowCounts: Map[TablePartitionSpec, BigInt] =
+  if (noscan) {
+Map.empty
+  } else {
+calculateRowCountsPerPartition(sparkSession, tableMeta, 
partitionValueSpec)
+  }
+
+// Update the metastore if newly computed statistics are different 
from those
+// recorded in the metastore.
+val 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-31 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r130404020
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -90,30 +90,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Create an [[AnalyzeTableCommand]] command, or an 
[[AnalyzePartitionCommand]]
+   * or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing table or a set of partitions :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], 
partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
* }}}
+   *
* Example SQL for analyzing columns :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS 
column1, column2;
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
+if (ctx.identifier != null &&
+ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+  throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
 }
-if (ctx.identifier != null) {
-  if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
+
+val table = visitTableIdentifier(ctx.tableIdentifier)
+if (ctx.identifierSeq() == null) {
+  if (ctx.partitionSpec != null) {
+AnalyzePartitionCommand(table, 
visitPartitionSpec(ctx.partitionSpec),
+  noscan = ctx.identifier != null)
+  } else {
+AnalyzeTableCommand(table, noscan = ctx.identifier != null)
   }
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
-} else if (ctx.identifierSeq() == null) {
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), 
noscan = false)
 } else {
+  if (ctx.partitionSpec != null) {
+logWarning("Partition specification is ignored when collecting 
column statistics: " +
+  ctx.partitionSpec.getText)
--- End diff --

This is the existing behavoir. ANALYZE TABLE command simply ignores 
PARTITION clause. This PR is adding support for PARTITION clause for COMPUTE 
STATISTICS, but keeps COMPUTE STATISTICS FOR COLUMNS as is. I'm planning to add 
partition support to FOR COLUMN In a follow-up PR.

Changing this code to throw an exception in this PR will *break* existing 
uses if folks are relying on a WARN.

Would it be OK to keep this functionality unchanged in the PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-31 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r130402344
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -112,4 +112,29 @@ object CommandUtils extends Logging {
 size
   }
 
+  def compareAndGetNewStats(
+  oldStats: Option[CatalogStatistics],
+  newTotalSize: BigInt,
+  newRowCount: Option[BigInt]): Option[CatalogStatistics] = {
+val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(0L)
+val oldRowCount = 
oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+var newStats: Option[CatalogStatistics] = None
+if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
--- End diff --

Indeed. I fixed this to use -1 and >=0.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128313551
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes is calculated. 
When `noscan`
+ * is `false`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def validatePartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
+val partitionColumnNames = table.partitionColumnNames.toSet
+val invalidColumnNames = 
partitionSpec.keys.filterNot(partitionColumnNames.contains(_))
+if (invalidColumnNames.nonEmpty) {
+  val tableId = table.identifier
+  throw new AnalysisException(s"Partition specification for table 
'${tableId.table}' " +
+s"in database '${tableId.database}' refers to unknown partition 
column(s): " +
+invalidColumnNames.mkString(","))
+}
+
+val filteredSpec = partitionSpec.filter(_._2.isDefined)
+if (filteredSpec.isEmpty) {
+  None
+} else {
+  Some(filteredSpec.mapValues(_.get))
+}
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+if (tableMeta.tableType == CatalogTableType.VIEW) {
+  throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
+}
+
+val partitionValueSpec = validatePartitionSpec(tableMeta)
+
+val partitions = 
sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec)
+
+if (partitions.isEmpty) {
+  if (partitionValueSpec.isDefined) {
+throw new NoSuchPartitionException(db, tableIdent.table, 
partitionValueSpec.get)
+  } else {
+// the user requested to analyze all partitions for a table which 
has no partitions
+// return normally, since there is nothing to do
+return Seq.empty[Row]
+  }
+}
+
+// Compute statistics for individual partitions
+val rowCounts: Map[TablePartitionSpec, BigInt] =
+  if (noscan) {
+Map.empty
+  } else {
+calculateRowCountsPerPartition(sparkSession, tableMeta, 
partitionValueSpec)
+  }
+
+// Update the metastore if newly computed statistics are different 
from those
+// recorded in the metastore.
+val 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128306842
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes is calculated. 
When `noscan`
+ * is `false`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def validatePartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
+val partitionColumnNames = table.partitionColumnNames.toSet
+val invalidColumnNames = 
partitionSpec.keys.filterNot(partitionColumnNames.contains(_))
+if (invalidColumnNames.nonEmpty) {
+  val tableId = table.identifier
+  throw new AnalysisException(s"Partition specification for table 
'${tableId.table}' " +
+s"in database '${tableId.database}' refers to unknown partition 
column(s): " +
+invalidColumnNames.mkString(","))
+}
+
+val filteredSpec = partitionSpec.filter(_._2.isDefined)
+if (filteredSpec.isEmpty) {
+  None
+} else {
+  Some(filteredSpec.mapValues(_.get))
+}
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+if (tableMeta.tableType == CatalogTableType.VIEW) {
+  throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
+}
+
+val partitionValueSpec = validatePartitionSpec(tableMeta)
+
+val partitions = 
sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec)
+
+if (partitions.isEmpty) {
+  if (partitionValueSpec.isDefined) {
+throw new NoSuchPartitionException(db, tableIdent.table, 
partitionValueSpec.get)
+  } else {
+// the user requested to analyze all partitions for a table which 
has no partitions
+// return normally, since there is nothing to do
+return Seq.empty[Row]
+  }
+}
+
+// Compute statistics for individual partitions
+val rowCounts: Map[TablePartitionSpec, BigInt] =
+  if (noscan) {
+Map.empty
+  } else {
+calculateRowCountsPerPartition(sparkSession, tableMeta, 
partitionValueSpec)
+  }
+
+// Update the metastore if newly computed statistics are different 
from those
+// recorded in the metastore.
+val 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128306520
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes is calculated. 
When `noscan`
+ * is `false`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def validatePartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
+val partitionColumnNames = table.partitionColumnNames.toSet
+val invalidColumnNames = 
partitionSpec.keys.filterNot(partitionColumnNames.contains(_))
+if (invalidColumnNames.nonEmpty) {
+  val tableId = table.identifier
+  throw new AnalysisException(s"Partition specification for table 
'${tableId.table}' " +
+s"in database '${tableId.database}' refers to unknown partition 
column(s): " +
+invalidColumnNames.mkString(","))
+}
+
+val filteredSpec = partitionSpec.filter(_._2.isDefined)
+if (filteredSpec.isEmpty) {
+  None
+} else {
+  Some(filteredSpec.mapValues(_.get))
+}
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+if (tableMeta.tableType == CatalogTableType.VIEW) {
+  throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
+}
+
+val partitionValueSpec = validatePartitionSpec(tableMeta)
+
+val partitions = 
sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec)
+
+if (partitions.isEmpty) {
+  if (partitionValueSpec.isDefined) {
+throw new NoSuchPartitionException(db, tableIdent.table, 
partitionValueSpec.get)
+  } else {
+// the user requested to analyze all partitions for a table which 
has no partitions
+// return normally, since there is nothing to do
+return Seq.empty[Row]
+  }
+}
+
+// Compute statistics for individual partitions
+val rowCounts: Map[TablePartitionSpec, BigInt] =
+  if (noscan) {
+Map.empty
+  } else {
+calculateRowCountsPerPartition(sparkSession, tableMeta, 
partitionValueSpec)
+  }
+
+// Update the metastore if newly computed statistics are different 
from those
+// recorded in the metastore.
+val 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128306573
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes is calculated. 
When `noscan`
+ * is `false`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def validatePartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
+val partitionColumnNames = table.partitionColumnNames.toSet
+val invalidColumnNames = 
partitionSpec.keys.filterNot(partitionColumnNames.contains(_))
+if (invalidColumnNames.nonEmpty) {
+  val tableId = table.identifier
+  throw new AnalysisException(s"Partition specification for table 
'${tableId.table}' " +
+s"in database '${tableId.database}' refers to unknown partition 
column(s): " +
+invalidColumnNames.mkString(","))
+}
+
+val filteredSpec = partitionSpec.filter(_._2.isDefined)
+if (filteredSpec.isEmpty) {
+  None
+} else {
+  Some(filteredSpec.mapValues(_.get))
+}
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+if (tableMeta.tableType == CatalogTableType.VIEW) {
+  throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
+}
+
+val partitionValueSpec = validatePartitionSpec(tableMeta)
+
+val partitions = 
sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec)
+
+if (partitions.isEmpty) {
+  if (partitionValueSpec.isDefined) {
+throw new NoSuchPartitionException(db, tableIdent.table, 
partitionValueSpec.get)
+  } else {
+// the user requested to analyze all partitions for a table which 
has no partitions
+// return normally, since there is nothing to do
+return Seq.empty[Row]
+  }
+}
+
+// Compute statistics for individual partitions
+val rowCounts: Map[TablePartitionSpec, BigInt] =
+  if (noscan) {
+Map.empty
+  } else {
+calculateRowCountsPerPartition(sparkSession, tableMeta, 
partitionValueSpec)
+  }
+
+// Update the metastore if newly computed statistics are different 
from those
+// recorded in the metastore.
+val 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128306201
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes is calculated. 
When `noscan`
+ * is `false`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def validatePartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
+val partitionColumnNames = table.partitionColumnNames.toSet
+val invalidColumnNames = 
partitionSpec.keys.filterNot(partitionColumnNames.contains(_))
+if (invalidColumnNames.nonEmpty) {
+  val tableId = table.identifier
+  throw new AnalysisException(s"Partition specification for table 
'${tableId.table}' " +
+s"in database '${tableId.database}' refers to unknown partition 
column(s): " +
+invalidColumnNames.mkString(","))
+}
+
+val filteredSpec = partitionSpec.filter(_._2.isDefined)
+if (filteredSpec.isEmpty) {
+  None
+} else {
+  Some(filteredSpec.mapValues(_.get))
+}
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+if (tableMeta.tableType == CatalogTableType.VIEW) {
+  throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
+}
+
+val partitionValueSpec = validatePartitionSpec(tableMeta)
+
+val partitions = 
sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec)
+
+if (partitions.isEmpty) {
+  if (partitionValueSpec.isDefined) {
+throw new NoSuchPartitionException(db, tableIdent.table, 
partitionValueSpec.get)
+  } else {
+// the user requested to analyze all partitions for a table which 
has no partitions
+// return normally, since there is nothing to do
+return Seq.empty[Row]
+  }
+}
+
+// Compute statistics for individual partitions
+val rowCounts: Map[TablePartitionSpec, BigInt] =
+  if (noscan) {
+Map.empty
+  } else {
+calculateRowCountsPerPartition(sparkSession, tableMeta, 
partitionValueSpec)
+  }
+
+// Update the metastore if newly computed statistics are different 
from those
+// recorded in the metastore.
+val 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128306062
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are 
collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all 
partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an 
`AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes is calculated. 
When `noscan`
+ * is `false`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+tableIdent: TableIdentifier,
+partitionSpec: Map[String, Option[String]],
+noscan: Boolean = true) extends RunnableCommand {
+
+  private def validatePartitionSpec(table: CatalogTable): 
Option[TablePartitionSpec] = {
+val partitionColumnNames = table.partitionColumnNames.toSet
+val invalidColumnNames = 
partitionSpec.keys.filterNot(partitionColumnNames.contains(_))
+if (invalidColumnNames.nonEmpty) {
+  val tableId = table.identifier
+  throw new AnalysisException(s"Partition specification for table 
'${tableId.table}' " +
+s"in database '${tableId.database}' refers to unknown partition 
column(s): " +
+invalidColumnNames.mkString(","))
+}
+
+val filteredSpec = partitionSpec.filter(_._2.isDefined)
+if (filteredSpec.isEmpty) {
+  None
+} else {
+  Some(filteredSpec.mapValues(_.get))
+}
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+val sessionState = sparkSession.sessionState
+val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+if (tableMeta.tableType == CatalogTableType.VIEW) {
+  throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
+}
+
+val partitionValueSpec = validatePartitionSpec(tableMeta)
+
+val partitions = 
sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec)
+
+if (partitions.isEmpty) {
+  if (partitionValueSpec.isDefined) {
+throw new NoSuchPartitionException(db, tableIdent.table, 
partitionValueSpec.get)
+  } else {
+// the user requested to analyze all partitions for a table which 
has no partitions
+// return normally, since there is nothing to do
+return Seq.empty[Row]
+  }
+}
+
+// Compute statistics for individual partitions
+val rowCounts: Map[TablePartitionSpec, BigInt] =
+  if (noscan) {
+Map.empty
+  } else {
+calculateRowCountsPerPartition(sparkSession, tableMeta, 
partitionValueSpec)
+  }
+
+// Update the metastore if newly computed statistics are different 
from those
+// recorded in the metastore.
+val 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128300439
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -90,30 +90,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Create an [[AnalyzeTableCommand]] command, or an 
[[AnalyzePartitionCommand]]
+   * or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing table or a set of partitions :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], 
partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
* }}}
+   *
* Example SQL for analyzing columns :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS 
column1, column2;
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
+if (ctx.identifier != null &&
+ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+  throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
 }
-if (ctx.identifier != null) {
-  if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
+
+val table = visitTableIdentifier(ctx.tableIdentifier)
+if (ctx.identifierSeq() == null) {
+  if (ctx.partitionSpec != null) {
+AnalyzePartitionCommand(table, 
visitPartitionSpec(ctx.partitionSpec),
+  noscan = ctx.identifier != null)
+  } else {
+AnalyzeTableCommand(table, noscan = ctx.identifier != null)
   }
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
-} else if (ctx.identifierSeq() == null) {
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), 
noscan = false)
 } else {
+  if (ctx.partitionSpec != null) {
+logWarning("Partition specification is ignored when collecting 
column statistics: " +
+  ctx.partitionSpec.getText)
--- End diff --

We should issue an exception here; otherwise, users might get confused 
since it has different behaviors. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128296684
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -182,6 +183,189 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", "SELECT '1', 'A' from src UNION ALL 
SELECT '1', 'A' from src")
+  createPartition("2010-01-03", "SELECT '1', 'A' from src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertPartitionStats(
+ds: String,
+hr: String,
+rowCount: Option[BigInt],
+sizeInBytes: BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === rowCount)
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def createPartition(ds: String, hr: Int, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) 
$query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING, hr INT)")
+
+  createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
+  createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", 10, "SELECT '1', 'A' from src")
+  createPartition("2010-01-02", 11,
+"SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = None, 
sizeInBytes = 2000)
+  assert(queryStats("2010-01-02", "10") === None)
+  assert(queryStats("2010-01-02", "11") === None)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "11", rowCount = None, 
sizeInBytes = 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  assertPartitionStats("2010-01-01", "10", rowCount = Some(500), 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-01", "11", rowCount = Some(500), 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "10", rowCount = None, 
sizeInBytes = 2000)
+  assertPartitionStats("2010-01-02", "11", rowCount = None, 
sizeInBytes = 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128293867
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -90,30 +90,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Create an [[AnalyzeTableCommand]] command, or an 
[[AnalyzePartitionCommand]]
+   * or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing table or a set of partitions :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], 
partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
* }}}
+   *
* Example SQL for analyzing columns :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS 
column1, column2;
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
+if (ctx.identifier != null &&
+ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+  throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
 }
-if (ctx.identifier != null) {
-  if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
+
+val table = visitTableIdentifier(ctx.tableIdentifier)
+if (ctx.identifierSeq() == null) {
+  if (ctx.partitionSpec != null) {
+AnalyzePartitionCommand(table, 
visitPartitionSpec(ctx.partitionSpec),
+  noscan = ctx.identifier != null)
+  } else {
+AnalyzeTableCommand(table, noscan = ctx.identifier != null)
   }
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
-} else if (ctx.identifierSeq() == null) {
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), 
noscan = false)
 } else {
+  if (ctx.partitionSpec != null) {
+logWarning("Partition specification is ignored when collecting 
column statistics: " +
+  ctx.partitionSpec.getText)
--- End diff --

Why?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128291908
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -90,30 +90,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Create an [[AnalyzeTableCommand]] command, or an 
[[AnalyzePartitionCommand]]
+   * or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing table or a set of partitions :
--- End diff --

`table or a set of partitions :` -> `a table or a set of partitions:`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128274136
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -37,31 +37,17 @@ case class AnalyzeTableCommand(
 if (tableMeta.tableType == CatalogTableType.VIEW) {
   throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
 }
-val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
 
-val oldTotalSize = 
tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(-1L)
-val oldRowCount = 
tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
-var newStats: Option[CatalogStatistics] = None
-if (newTotalSize >= 0 && newTotalSize != oldTotalSize) {
-  newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
-}
-// We only set rowCount when noscan is false, because otherwise:
-// 1. when total size is not changed, we don't need to alter the table;
-// 2. when total size is changed, `oldRowCount` becomes invalid.
-// This is to make sure that we only record the right statistics.
-if (!noscan) {
-  val newRowCount = sparkSession.table(tableIdentWithDB).count()
-  if (newRowCount >= 0 && newRowCount != oldRowCount) {
-newStats = if (newStats.isDefined) {
-  newStats.map(_.copy(rowCount = Some(BigInt(newRowCount
-} else {
-  Some(CatalogStatistics(
-sizeInBytes = oldTotalSize, rowCount = 
Some(BigInt(newRowCount
-}
+// Compute stats for the whole table
+val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
+val newRowCount =
+  if (noscan) {
+None
+  } else {
+Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
   }
-}
-// Update the metastore if the above statistics of the table are 
different from those
-// recorded in the metastore.
--- End diff --

add it back?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128272119
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -112,4 +112,29 @@ object CommandUtils extends Logging {
 size
   }
 
+  def compareAndGetNewStats(
+  oldStats: Option[CatalogStatistics],
+  newTotalSize: BigInt,
+  newRowCount: Option[BigInt]): Option[CatalogStatistics] = {
+val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(0L)
--- End diff --

The latest code has a change here. Update the initial value to `-1L`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128269600
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -1028,25 +994,115 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 currentFullPath
   }
 
+  private def statsToProperties(
+  stats: CatalogStatistics,
+  schema: StructType): Map[String, String] = {
+
+var statsProperties: Map[String, String] =
+  Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
+if (stats.rowCount.isDefined) {
+  statsProperties += STATISTICS_NUM_ROWS -> 
stats.rowCount.get.toString()
+}
+
+val colNameTypeMap: Map[String, DataType] =
+  schema.fields.map(f => (f.name, f.dataType)).toMap
+stats.colStats.foreach { case (colName, colStat) =>
+  colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, 
v) =>
+statsProperties += (columnStatKeyPropName(colName, k) -> v)
+  }
+}
+
+statsProperties
+  }
+
+  private def statsFromProperties(
+  properties: Map[String, String],
+  table: String,
+  schema: StructType): Option[CatalogStatistics] = {
+
+val statsProps = properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+if (statsProps.isEmpty) {
+  None
+} else {
+
+  val colStats = new mutable.HashMap[String, ColumnStat]
+
+  // For each column, recover its column stats. Note that this is 
currently a O(n^2) operation,
+  // but given the number of columns it usually not enormous, this is 
probably OK as a start.
+  // If we want to map this a linear operation, we'd need a stronger 
contract between the
+  // naming convention used for serialization.
+  schema.foreach { field =>
+if (statsProps.contains(columnStatKeyPropName(field.name, 
ColumnStat.KEY_VERSION))) {
+  // If "version" field is defined, then the column stat is 
defined.
+  val keyPrefix = columnStatKeyPropName(field.name, "")
+  val colStatMap = 
statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
+(k.drop(keyPrefix.length), v)
+  }
+
+  ColumnStat.fromMap(table, field, colStatMap).foreach {
+colStat => colStats += field.name -> colStat
+  }
+}
+  }
+
+  Some(CatalogStatistics(
+sizeInBytes = BigInt(statsProps(STATISTICS_TOTAL_SIZE)),
+rowCount = statsProps.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
+colStats = colStats.toMap))
+}
+  }
+
   override def alterPartitions(
   db: String,
   table: String,
   newParts: Seq[CatalogTablePartition]): Unit = withClient {
 val lowerCasedParts = newParts.map(p => p.copy(spec = 
lowerCasePartitionSpec(p.spec)))
+
+val rawTable = getRawTable(db, table)
+
+// convert partition statistics to properties so that we can persist 
them through hive api
+val withStatsProps = lowerCasedParts.map(p => {
+  if (p.stats.isDefined) {
+val statsProperties = statsToProperties(p.stats.get, 
rawTable.schema)
+p.copy(parameters = p.parameters ++ statsProperties)
+  } else {
+p
+  }
+})
+
 // Note: Before altering table partitions in Hive, you *must* set the 
current database
 // to the one that contains the table of interest. Otherwise you will 
end up with the
 // most helpful error message ever: "Unable to alter partition. alter 
is not possible."
 // See HIVE-2742 for more detail.
 client.setCurrentDatabase(db)
-client.alterPartitions(db, table, lowerCasedParts)
+client.alterPartitions(db, table, withStatsProps)
   }
 
   override def getPartition(
   db: String,
   table: String,
   spec: TablePartitionSpec): CatalogTablePartition = withClient {
 val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
-part.copy(spec = restorePartitionSpec(part.spec, getTable(db, 
table).partitionColumnNames))
+restorePartitionMetadata(part, getTable(db, table))
+  }
+
+  private def restorePartitionMetadata(
+  partition: CatalogTablePartition,
+  table: CatalogTable): CatalogTablePartition = {
+val restoredSpec = restorePartitionSpec(partition.spec, 
table.partitionColumnNames)
+
+// construct Spark's statistics from information in Hive metastore
--- End diff --

Here, we only respect Spark own statistics. Please also clarify it here. 


---
If your project is set up for it, you can reply to this email and 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128268309
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -1028,25 +994,115 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 currentFullPath
   }
 
+  private def statsToProperties(
+  stats: CatalogStatistics,
+  schema: StructType): Map[String, String] = {
+
+var statsProperties: Map[String, String] =
+  Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
+if (stats.rowCount.isDefined) {
+  statsProperties += STATISTICS_NUM_ROWS -> 
stats.rowCount.get.toString()
+}
+
+val colNameTypeMap: Map[String, DataType] =
+  schema.fields.map(f => (f.name, f.dataType)).toMap
+stats.colStats.foreach { case (colName, colStat) =>
+  colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, 
v) =>
+statsProperties += (columnStatKeyPropName(colName, k) -> v)
+  }
+}
+
+statsProperties
+  }
+
+  private def statsFromProperties(
+  properties: Map[String, String],
+  table: String,
+  schema: StructType): Option[CatalogStatistics] = {
+
+val statsProps = properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+if (statsProps.isEmpty) {
+  None
+} else {
+
+  val colStats = new mutable.HashMap[String, ColumnStat]
+
+  // For each column, recover its column stats. Note that this is 
currently a O(n^2) operation,
+  // but given the number of columns it usually not enormous, this is 
probably OK as a start.
+  // If we want to map this a linear operation, we'd need a stronger 
contract between the
+  // naming convention used for serialization.
+  schema.foreach { field =>
+if (statsProps.contains(columnStatKeyPropName(field.name, 
ColumnStat.KEY_VERSION))) {
+  // If "version" field is defined, then the column stat is 
defined.
+  val keyPrefix = columnStatKeyPropName(field.name, "")
+  val colStatMap = 
statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
+(k.drop(keyPrefix.length), v)
+  }
+
+  ColumnStat.fromMap(table, field, colStatMap).foreach {
+colStat => colStats += field.name -> colStat
+  }
+}
+  }
+
+  Some(CatalogStatistics(
+sizeInBytes = BigInt(statsProps(STATISTICS_TOTAL_SIZE)),
+rowCount = statsProps.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
+colStats = colStats.toMap))
+}
+  }
+
   override def alterPartitions(
   db: String,
   table: String,
   newParts: Seq[CatalogTablePartition]): Unit = withClient {
 val lowerCasedParts = newParts.map(p => p.copy(spec = 
lowerCasePartitionSpec(p.spec)))
+
+val rawTable = getRawTable(db, table)
+
+// convert partition statistics to properties so that we can persist 
them through hive api
+val withStatsProps = lowerCasedParts.map(p => {
+  if (p.stats.isDefined) {
+val statsProperties = statsToProperties(p.stats.get, 
rawTable.schema)
+p.copy(parameters = p.parameters ++ statsProperties)
+  } else {
+p
+  }
+})
+
 // Note: Before altering table partitions in Hive, you *must* set the 
current database
 // to the one that contains the table of interest. Otherwise you will 
end up with the
 // most helpful error message ever: "Unable to alter partition. alter 
is not possible."
 // See HIVE-2742 for more detail.
 client.setCurrentDatabase(db)
-client.alterPartitions(db, table, lowerCasedParts)
+client.alterPartitions(db, table, withStatsProps)
   }
 
   override def getPartition(
   db: String,
   table: String,
   spec: TablePartitionSpec): CatalogTablePartition = withClient {
 val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
-part.copy(spec = restorePartitionSpec(part.spec, getTable(db, 
table).partitionColumnNames))
+restorePartitionMetadata(part, getTable(db, table))
+  }
+
+  private def restorePartitionMetadata(
--- End diff --

Like what we did for `restoreTableMetadata`, please add the function 
description.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r128166394
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -112,4 +112,29 @@ object CommandUtils extends Logging {
 size
   }
 
+  def compareAndGetNewStats(
+  oldStats: Option[CatalogStatistics],
+  newTotalSize: BigInt,
+  newRowCount: Option[BigInt]): Option[CatalogStatistics] = {
+val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(0L)
+val oldRowCount = 
oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+var newStats: Option[CatalogStatistics] = None
+if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
--- End diff --

Is that possible the new one is equal to zero?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-11 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126725750
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -181,6 +182,151 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  private val SELECT_FROM_SRC = "SELECT '1', 'A' from src"
+
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", SELECT_FROM_SRC)
+  createPartition("2010-01-02", s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC")
+  createPartition("2010-01-03", SELECT_FROM_SRC)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertStats(ds: String, hr: String, rowCount: BigInt, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === Some(rowCount))
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def assertSizeInBytesStats(ds: String, hr: String, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === None)
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def createPartition(ds: String, hr: Int, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) 
$query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING, hr INT)")
+
+  createPartition("2010-01-01", 10, SELECT_FROM_SRC)
+  createPartition("2010-01-01", 11, SELECT_FROM_SRC)
+  createPartition("2010-01-02", 10, SELECT_FROM_SRC)
+  createPartition("2010-01-02", 11, s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  assertSizeInBytesStats("2010-01-01", "10", 2000)
+  assertSizeInBytesStats("2010-01-01", "11", 2000)
+  assert(queryStats("2010-01-02", "10") === None)
+  assert(queryStats("2010-01-02", "11") === None)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assertSizeInBytesStats("2010-01-01", "10", 2000)
+  assertSizeInBytesStats("2010-01-01", "11", 2000)
+  assertSizeInBytesStats("2010-01-02", "10", 2000)
+  assertSizeInBytesStats("2010-01-02", "11", 2*2000)
--- End diff --

I've added checks for all 4 partitions after *every* analyze statement to 
make sure there are no unintended side effects, e.g. analyzing partition A 
doesn't change stats for partition B. Would it be ok to keep this logic or do 
you still think I should remove it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-11 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126724336
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -181,6 +182,151 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  private val SELECT_FROM_SRC = "SELECT '1', 'A' from src"
+
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", SELECT_FROM_SRC)
+  createPartition("2010-01-02", s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC")
+  createPartition("2010-01-03", SELECT_FROM_SRC)
--- End diff --

@wzhfy, I'm using multiple partitions to protect against bugs like the 
following:

- ANALYZE PARTITION collects stats for the whole table instead of a single 
partition (hence, test table needs to have 2+ partitions)
- ANALYZE PARTITION collects stats for the *first* partition instead of the 
partition specified in the SQL command (hence, need to analyze a couple of 
different partitions)

Using "analyze single partition" for the test name I was hoping to 
communicate the intent of testing analyzing of a single partition as opposed to 
a set of partitions. Does this make sense? Do you still want me to reduce the 
test to a single-partition table?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-10 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126452142
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -93,30 +93,50 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Create an [[AnalyzeTableCommand]] command, or an 
[[AnalyzePartitionCommand]]
+   * or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing table or a set of partitions :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], 
partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
* }}}
+   *
* Example SQL for analyzing columns :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS 
column1, column2;
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
+if (ctx.identifier != null &&
+ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+  throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
 }
-if (ctx.identifier != null) {
-  if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
+
+val partitionSpec =
+  if (ctx.partitionSpec != null) {
+val filteredSpec = 
visitPartitionSpec(ctx.partitionSpec).filter(_._2.isDefined)
+if (filteredSpec.isEmpty) {
+  None
+} else {
+  Some(filteredSpec.mapValues(_.get))
+}
+  } else {
+None
+  }
+
+val table = visitTableIdentifier(ctx.tableIdentifier)
+if (ctx.identifierSeq() == null) {
+  if (partitionSpec.isDefined) {
+AnalyzePartitionCommand(table, partitionSpec.get, noscan = 
ctx.identifier != null)
+  } else {
+AnalyzeTableCommand(table, noscan = ctx.identifier != null)
--- End diff --

@wzhfy , good catch. Thank you! I'll fix this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273805
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -93,30 +93,50 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Create an [[AnalyzeTableCommand]] command, or an 
[[AnalyzePartitionCommand]]
+   * or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing table or a set of partitions :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], 
partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
* }}}
+   *
* Example SQL for analyzing columns :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS 
column1, column2;
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
+if (ctx.identifier != null &&
+ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+  throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
 }
-if (ctx.identifier != null) {
-  if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
+
+val partitionSpec =
+  if (ctx.partitionSpec != null) {
+val filteredSpec = 
visitPartitionSpec(ctx.partitionSpec).filter(_._2.isDefined)
+if (filteredSpec.isEmpty) {
+  None
+} else {
+  Some(filteredSpec.mapValues(_.get))
+}
+  } else {
+None
+  }
+
+val table = visitTableIdentifier(ctx.tableIdentifier)
+if (ctx.identifierSeq() == null) {
+  if (partitionSpec.isDefined) {
+AnalyzePartitionCommand(table, partitionSpec.get, noscan = 
ctx.identifier != null)
+  } else {
+AnalyzeTableCommand(table, noscan = ctx.identifier != null)
--- End diff --

@mbasmanova IIUC, the logic is wrong here. For example, when analyzing 
partition (ds, hr), we should not remove them in parser. Currently we parse it 
to AnalyzeTableCommand, which collects table-level stats. But what we need to 
do is to collect partition-level stats for all partitions.
Check hive's behavior here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273773
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -201,6 +202,193 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS").collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS").collect()
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze single partition noscan") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze a set of partitions") {
--- End diff --

@mbasmanova IIUC, the logic is wrong here. For example, when analyzing 
partition (ds, hr), we should not remove them in parser. Currently we parse it 
to `AnalyzeTableCommand`, which collects table-level stats. But what we need to 
do is to collect **partition-level stats for all partitions**.
Check hive's behavior 
[here](https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-Examples)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273150
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -17,14 +17,17 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, 
CatalogTableType}
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, 
CatalogTable, CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, 
Expression, Literal}
 
 
 /**
- * Analyzes the given table to generate statistics, which will be used in 
query optimizations.
+ * Analyzes the given table to generate statistics, which will be used in
+ * query optimizations.
--- End diff --

unnecessary change


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273394
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -181,6 +182,151 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  private val SELECT_FROM_SRC = "SELECT '1', 'A' from src"
+
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", SELECT_FROM_SRC)
+  createPartition("2010-01-02", s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC")
+  createPartition("2010-01-03", SELECT_FROM_SRC)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertStats(ds: String, hr: String, rowCount: BigInt, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === Some(rowCount))
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def assertSizeInBytesStats(ds: String, hr: String, sizeInBytes: 
BigInt): Unit = {
--- End diff --

We can merge this method with `assertStats` by adding `rowCount: 
Option[BigInt]` as a paramter.
Also please rename `assertStats` to `assertPartitionStats`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273439
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -181,6 +182,151 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  private val SELECT_FROM_SRC = "SELECT '1', 'A' from src"
+
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", SELECT_FROM_SRC)
+  createPartition("2010-01-02", s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC")
+  createPartition("2010-01-03", SELECT_FROM_SRC)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertStats(ds: String, hr: String, rowCount: BigInt, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === Some(rowCount))
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def assertSizeInBytesStats(ds: String, hr: String, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === None)
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def createPartition(ds: String, hr: Int, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) 
$query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING, hr INT)")
+
+  createPartition("2010-01-01", 10, SELECT_FROM_SRC)
+  createPartition("2010-01-01", 11, SELECT_FROM_SRC)
+  createPartition("2010-01-02", 10, SELECT_FROM_SRC)
+  createPartition("2010-01-02", 11, s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  assertSizeInBytesStats("2010-01-01", "10", 2000)
+  assertSizeInBytesStats("2010-01-01", "11", 2000)
+  assert(queryStats("2010-01-02", "10") === None)
+  assert(queryStats("2010-01-02", "11") === None)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assertSizeInBytesStats("2010-01-01", "10", 2000)
+  assertSizeInBytesStats("2010-01-01", "11", 2000)
+  assertSizeInBytesStats("2010-01-02", "10", 2000)
+  assertSizeInBytesStats("2010-01-02", "11", 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  assertStats("2010-01-01", "10", 500, 2000)
+  assertStats("2010-01-01", "11", 500, 2000)
+  assertSizeInBytesStats("2010-01-02", "10", 2000)
+  assertSizeInBytesStats("2010-01-02", "11", 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assertStats("2010-01-01", "10", 500, 2000)
+  assertStats("2010-01-01", "11", 500, 2000)
+ 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273560
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -181,6 +182,151 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  private val SELECT_FROM_SRC = "SELECT '1', 'A' from src"
--- End diff --

we can just use `SELECT '1', 'A'` and remove this val.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273102
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -93,30 +93,50 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Create an [[AnalyzeTableCommand]] command, or an 
[[AnalyzePartitionCommand]]
+   * or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing table or a set of partitions :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], 
partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
* }}}
+   *
* Example SQL for analyzing columns :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS 
column1, column2;
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
+if (ctx.identifier != null &&
+ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+  throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
 }
-if (ctx.identifier != null) {
-  if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
+
+val partitionSpec =
--- End diff --

`partitionSpec` ->`partitionValueSpec`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273584
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -181,6 +182,151 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  private val SELECT_FROM_SRC = "SELECT '1', 'A' from src"
+
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", SELECT_FROM_SRC)
+  createPartition("2010-01-02", s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC")
+  createPartition("2010-01-03", SELECT_FROM_SRC)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertStats(ds: String, hr: String, rowCount: BigInt, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === Some(rowCount))
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def assertSizeInBytesStats(ds: String, hr: String, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === None)
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def createPartition(ds: String, hr: Int, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) 
$query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING, hr INT)")
+
+  createPartition("2010-01-01", 10, SELECT_FROM_SRC)
+  createPartition("2010-01-01", 11, SELECT_FROM_SRC)
+  createPartition("2010-01-02", 10, SELECT_FROM_SRC)
+  createPartition("2010-01-02", 11, s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  assertSizeInBytesStats("2010-01-01", "10", 2000)
+  assertSizeInBytesStats("2010-01-01", "11", 2000)
+  assert(queryStats("2010-01-02", "10") === None)
+  assert(queryStats("2010-01-02", "11") === None)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assertSizeInBytesStats("2010-01-01", "10", 2000)
+  assertSizeInBytesStats("2010-01-01", "11", 2000)
+  assertSizeInBytesStats("2010-01-02", "10", 2000)
+  assertSizeInBytesStats("2010-01-02", "11", 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  assertStats("2010-01-01", "10", 500, 2000)
+  assertStats("2010-01-01", "11", 500, 2000)
+  assertSizeInBytesStats("2010-01-02", "10", 2000)
+  assertSizeInBytesStats("2010-01-02", "11", 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assertStats("2010-01-01", "10", 500, 2000)
+  assertStats("2010-01-01", "11", 500, 2000)
+ 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273555
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -181,6 +182,151 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  private val SELECT_FROM_SRC = "SELECT '1', 'A' from src"
+
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", SELECT_FROM_SRC)
+  createPartition("2010-01-02", s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC")
+  createPartition("2010-01-03", SELECT_FROM_SRC)
--- End diff --

Since it's a test for `analyze single partition`, we only need one 
partition here, the others seems redundant.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273031
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -93,30 +93,50 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Create an [[AnalyzeTableCommand]] command, or an 
[[AnalyzePartitionCommand]]
+   * or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing table or a set of partitions :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], 
partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
* }}}
+   *
* Example SQL for analyzing columns :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS 
column1, column2;
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
+if (ctx.identifier != null &&
+ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+  throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
 }
-if (ctx.identifier != null) {
-  if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
+
+val partitionSpec =
+  if (ctx.partitionSpec != null) {
+val filteredSpec = 
visitPartitionSpec(ctx.partitionSpec).filter(_._2.isDefined)
+if (filteredSpec.isEmpty) {
+  None
+} else {
+  Some(filteredSpec.mapValues(_.get))
+}
+  } else {
+None
+  }
+
+val table = visitTableIdentifier(ctx.tableIdentifier)
+if (ctx.identifierSeq() == null) {
+  if (partitionSpec.isDefined) {
+AnalyzePartitionCommand(table, partitionSpec.get, noscan = 
ctx.identifier != null)
+  } else {
+AnalyzeTableCommand(table, noscan = ctx.identifier != null)
   }
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
-} else if (ctx.identifierSeq() == null) {
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), 
noscan = false)
 } else {
+  if (partitionSpec.isDefined) {
+logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
--- End diff --

Partition specification is ignored when collecting column statistics


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273588
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -181,6 +182,151 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  private val SELECT_FROM_SRC = "SELECT '1', 'A' from src"
+
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", SELECT_FROM_SRC)
+  createPartition("2010-01-02", s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC")
+  createPartition("2010-01-03", SELECT_FROM_SRC)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertStats(ds: String, hr: String, rowCount: BigInt, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === Some(rowCount))
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def assertSizeInBytesStats(ds: String, hr: String, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === None)
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def createPartition(ds: String, hr: Int, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) 
$query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING, hr INT)")
+
+  createPartition("2010-01-01", 10, SELECT_FROM_SRC)
+  createPartition("2010-01-01", 11, SELECT_FROM_SRC)
+  createPartition("2010-01-02", 10, SELECT_FROM_SRC)
+  createPartition("2010-01-02", 11, s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  assertSizeInBytesStats("2010-01-01", "10", 2000)
+  assertSizeInBytesStats("2010-01-01", "11", 2000)
+  assert(queryStats("2010-01-02", "10") === None)
+  assert(queryStats("2010-01-02", "11") === None)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assertSizeInBytesStats("2010-01-01", "10", 2000)
+  assertSizeInBytesStats("2010-01-01", "11", 2000)
+  assertSizeInBytesStats("2010-01-02", "10", 2000)
+  assertSizeInBytesStats("2010-01-02", "11", 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  assertStats("2010-01-01", "10", 500, 2000)
--- End diff --

`assertStats("2010-01-01", "10", rowCount = 500, sizeInBytes = 2000)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273094
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -93,30 +93,50 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Create an [[AnalyzeTableCommand]] command, or an 
[[AnalyzePartitionCommand]]
+   * or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing table or a set of partitions :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], 
partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
* }}}
+   *
* Example SQL for analyzing columns :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS 
column1, column2;
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
+if (ctx.identifier != null &&
+ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+  throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
 }
-if (ctx.identifier != null) {
-  if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
+
+val partitionSpec =
+  if (ctx.partitionSpec != null) {
+val filteredSpec = 
visitPartitionSpec(ctx.partitionSpec).filter(_._2.isDefined)
+if (filteredSpec.isEmpty) {
+  None
+} else {
+  Some(filteredSpec.mapValues(_.get))
+}
+  } else {
+None
+  }
+
+val table = visitTableIdentifier(ctx.tableIdentifier)
+if (ctx.identifierSeq() == null) {
+  if (partitionSpec.isDefined) {
+AnalyzePartitionCommand(table, partitionSpec.get, noscan = 
ctx.identifier != null)
+  } else {
+AnalyzeTableCommand(table, noscan = ctx.identifier != null)
   }
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
-} else if (ctx.identifierSeq() == null) {
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), 
noscan = false)
 } else {
+  if (partitionSpec.isDefined) {
--- End diff --

should use `if (ctx.partitionSpec != null)`, currently we don't support 
partition-level column stats.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273582
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -181,6 +182,151 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  private val SELECT_FROM_SRC = "SELECT '1', 'A' from src"
+
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+def createPartition(ds: String, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  createPartition("2010-01-01", SELECT_FROM_SRC)
+  createPartition("2010-01-02", s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC")
+  createPartition("2010-01-03", SELECT_FROM_SRC)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS")
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertStats(ds: String, hr: String, rowCount: BigInt, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === Some(rowCount))
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def assertSizeInBytesStats(ds: String, hr: String, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === None)
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def createPartition(ds: String, hr: Int, query: String): Unit = {
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) 
$query")
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING, hr INT)")
+
+  createPartition("2010-01-01", 10, SELECT_FROM_SRC)
+  createPartition("2010-01-01", 11, SELECT_FROM_SRC)
+  createPartition("2010-01-02", 10, SELECT_FROM_SRC)
+  createPartition("2010-01-02", 11, s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+
+  assertSizeInBytesStats("2010-01-01", "10", 2000)
+  assertSizeInBytesStats("2010-01-01", "11", 2000)
+  assert(queryStats("2010-01-02", "10") === None)
+  assert(queryStats("2010-01-02", "11") === None)
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+
+  assertSizeInBytesStats("2010-01-01", "10", 2000)
+  assertSizeInBytesStats("2010-01-01", "11", 2000)
+  assertSizeInBytesStats("2010-01-02", "10", 2000)
+  assertSizeInBytesStats("2010-01-02", "11", 2*2000)
--- End diff --

Checks for `ds='2010-01-02'` has no difference with `ds='2010-01-01'`, we 
can remove this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-07 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r126273260
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -72,3 +61,81 @@ case class AnalyzeTableCommand(
 Seq.empty[Row]
   }
 }
+
+/**
+ * Analyzes a given set of partitions to generate per-partition 
statistics, which will be used in
+ * query optimizations.
+ */
+case class AnalyzePartitionCommand(
--- End diff --

shall we create a new file for this class?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-05 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125772533
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -201,6 +202,193 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS").collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS").collect()
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze single partition noscan") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze a set of partitions") {
--- End diff --

@wzhfy, given that these cases are covered in SparkSqlParserSuite, is it 
still necessary to cover them again in StatisticsSuite? Partition columns 
without values are removed at parsing stage so that AnalyzePartitionCommand 
always receives partition column along with a value.

re: (ds, hr='10') - 
https://cwiki.apache.org/confluence/display/Hive/StatsDev suggests that it is 
allowed; this PR supports this syntax.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-05 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125763666
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -201,6 +202,193 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS").collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS").collect()
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze single partition noscan") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertStats(ds: String, hr: String, rowCount: BigInt, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === Some(rowCount))
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def assertNoStats(ds: String, hr: String): Unit = {
+  assert(queryStats(ds, hr) === None)
+}
+
+def createPartition(ds: String, hr: Int, query: String): Unit = {
--- End diff --

I created this method to make the test code more concise. For example, this 
method helps avoid multi-line methods calls like this:

sql(
s"""
   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02', hr=11)
   |SELECT * FROM src
   |UNION ALL
   |SELECT * FROM src
 """.stripMargin)

becomes 

createPartition("2010-01-02", 11, "SELECT * FROM SRC UNION ALL SELECT * 
FROM SRC")


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, 

[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-05 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125688658
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
---
@@ -986,6 +987,7 @@ private[hive] object HiveClientImpl {
 tpart.setTableName(ht.getTableName)
 tpart.setValues(partValues.asJava)
 tpart.setSd(storageDesc)
+tpart.setParameters(mutable.Map(p.parameters.toSeq: _*).asJava)
--- End diff --

p.parameters.asJava produces as immutable map and causes 
java.lang.UnsupportedOperationException: remove;


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125152195
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -17,18 +17,24 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, 
CatalogTableType}
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, 
UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, 
CatalogTable, CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, 
Expression, Literal}
 
 
 /**
- * Analyzes the given table to generate statistics, which will be used in 
query optimizations.
+ * Analyzes the given table or partition to generate statistics, which 
will be used in
+ * query optimizations.
+ *
+ * If certain partition specs are specified, then statistics are gathered 
for only those partitions.
  */
 case class AnalyzeTableCommand(
 tableIdent: TableIdentifier,
-noscan: Boolean = true) extends RunnableCommand {
+noscan: Boolean = true,
+partitionSpec: Option[TablePartitionSpec] = None) extends 
RunnableCommand {
--- End diff --

How about adding a new `AnalyzePartitionCommand`? We can put all 
partition-level logic there (including partition-level column stats in the 
future). I think that would make the logic clearer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125156014
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -201,6 +202,193 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS").collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS").collect()
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze single partition noscan") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze a set of partitions") {
--- End diff --

Please also check the following three cases:
```
ANALYZE TABLE tab PARTITION(ds='2010-01-01', hr) COMPUTE STATISTICS; -- 
analyze two partitions
ANALYZE TABLE tab PARTITION(ds, hr) COMPUTE STATISTICS; -- analyze four 
partitions
ANALYZE TABLE tab PARTITION(ds, hr='10') COMPUTE STATISTICS; -- Is this 
allowed in hive?
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125152166
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -91,29 +91,44 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
 
   /**
* Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Example SQL for analyzing table or a set of partitions :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], 
partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
* }}}
+   *
* Example SQL for analyzing columns :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS 
column1, column2;
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
+if (ctx.identifier != null &&
+ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+  throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
 }
-if (ctx.identifier != null) {
-  if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
+
+val partitionSpec =
+  if (ctx.partitionSpec != null) {
+val filteredSpec = visitPartitionSpec(ctx.partitionSpec).filter(x 
=> x._2.isDefined)
--- End diff --

`_._2.isDefined`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125152350
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -38,10 +44,92 @@ case class AnalyzeTableCommand(
 if (tableMeta.tableType == CatalogTableType.VIEW) {
   throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
 }
-val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
 
-val oldTotalSize = 
tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
-val oldRowCount = 
tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+if (!partitionSpec.isDefined) {
+  // Compute stats for the whole table
+  val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
+  val newRowCount =
+if (noscan) {
+  None
+} else {
+  Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
+}
+
+  def updateStats(newStats: CatalogStatistics): Unit = {
+sessionState.catalog.alterTableStats(tableIdentWithDB, 
Some(newStats))
+// Refresh the cached data source table in the catalog.
+sessionState.catalog.refreshTable(tableIdentWithDB)
+  }
+
+  calculateAndUpdateStats(tableMeta.stats, newTotalSize, newRowCount, 
updateStats)
+} else {
+  val partitions = 
sessionState.catalog.listPartitions(tableMeta.identifier, partitionSpec)
+
+  if (partitionSpec.isDefined && partitions.isEmpty) {
+throw new NoSuchPartitionException(db, tableIdent.table, 
partitionSpec.get)
+  }
+
+  // Compute stats for individual partitions
+  val rowCounts: Map[TablePartitionSpec, BigInt] =
+if (noscan) {
+  Map.empty
+} else {
+  calculateRowCountsPerPartition(sparkSession, tableMeta)
+}
+
+  partitions.foreach(p => {
+val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta, p)
+val newRowCount = rowCounts.get(p.spec)
+
+def updateStats(newStats: CatalogStatistics): Unit = {
+  sessionState.catalog.alterPartitions(tableMeta.identifier,
+List(p.copy(stats = Some(newStats
+}
+
+calculateAndUpdateStats(p.stats, newTotalSize, newRowCount, 
updateStats)
+  })
+}
+
+Seq.empty[Row]
+  }
+
+  private def calculateRowCountsPerPartition(
+  sparkSession: SparkSession,
+  tableMeta: CatalogTable): Map[TablePartitionSpec, BigInt] = {
+val filters = partitionSpec.get.map {
+  case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), 
Literal(value))
+}
+val filter = filters match {
+  case head :: tail =>
+if (tail.isEmpty) head
+else tail.foldLeft(head: Expression)((a, b) => And(a, b))
+}
+
+val partitionColumns = tableMeta.partitionColumnNames.map(c => 
Column(c))
+
+val df = 
sparkSession.table(tableMeta.identifier).filter(Column(filter))
+  .groupBy(partitionColumns: _*).count()
+
+val numPartitionColumns = partitionColumns.size
+val partitionColumnIndexes = 0 to (numPartitionColumns - 1)
+
+df.collect().map(r => {
--- End diff --

`.map { row =>`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125152274
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -38,10 +44,92 @@ case class AnalyzeTableCommand(
 if (tableMeta.tableType == CatalogTableType.VIEW) {
   throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
 }
-val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
 
-val oldTotalSize = 
tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
-val oldRowCount = 
tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+if (!partitionSpec.isDefined) {
+  // Compute stats for the whole table
+  val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
+  val newRowCount =
+if (noscan) {
+  None
+} else {
+  Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
+}
+
+  def updateStats(newStats: CatalogStatistics): Unit = {
+sessionState.catalog.alterTableStats(tableIdentWithDB, 
Some(newStats))
+// Refresh the cached data source table in the catalog.
+sessionState.catalog.refreshTable(tableIdentWithDB)
+  }
+
+  calculateAndUpdateStats(tableMeta.stats, newTotalSize, newRowCount, 
updateStats)
+} else {
+  val partitions = 
sessionState.catalog.listPartitions(tableMeta.identifier, partitionSpec)
+
+  if (partitionSpec.isDefined && partitions.isEmpty) {
+throw new NoSuchPartitionException(db, tableIdent.table, 
partitionSpec.get)
+  }
+
+  // Compute stats for individual partitions
+  val rowCounts: Map[TablePartitionSpec, BigInt] =
+if (noscan) {
+  Map.empty
+} else {
+  calculateRowCountsPerPartition(sparkSession, tableMeta)
+}
+
+  partitions.foreach(p => {
+val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta, p)
+val newRowCount = rowCounts.get(p.spec)
+
+def updateStats(newStats: CatalogStatistics): Unit = {
+  sessionState.catalog.alterPartitions(tableMeta.identifier,
--- End diff --

Can we collect all partitions with new stats and alter them together? Now 
`alterPartitions` is called per partition.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125153463
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -1028,25 +994,115 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 currentFullPath
   }
 
+  private def statsToHiveProperties(
--- End diff --

How about `statsToProperties` and `statsFromProperties`? the current names 
seem related to hive stats, it's a little ambiguous.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125153290
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -201,6 +202,193 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS").collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS").collect()
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze single partition noscan") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertStats(ds: String, hr: String, rowCount: BigInt, sizeInBytes: 
BigInt): Unit = {
--- End diff --

can we make this a common method `checkPartitionStats`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125152564
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -38,10 +44,92 @@ case class AnalyzeTableCommand(
 if (tableMeta.tableType == CatalogTableType.VIEW) {
   throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
 }
-val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
 
-val oldTotalSize = 
tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
-val oldRowCount = 
tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+if (!partitionSpec.isDefined) {
+  // Compute stats for the whole table
+  val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
+  val newRowCount =
+if (noscan) {
+  None
+} else {
+  Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
+}
+
+  def updateStats(newStats: CatalogStatistics): Unit = {
+sessionState.catalog.alterTableStats(tableIdentWithDB, 
Some(newStats))
+// Refresh the cached data source table in the catalog.
+sessionState.catalog.refreshTable(tableIdentWithDB)
+  }
+
+  calculateAndUpdateStats(tableMeta.stats, newTotalSize, newRowCount, 
updateStats)
+} else {
+  val partitions = 
sessionState.catalog.listPartitions(tableMeta.identifier, partitionSpec)
+
+  if (partitionSpec.isDefined && partitions.isEmpty) {
+throw new NoSuchPartitionException(db, tableIdent.table, 
partitionSpec.get)
+  }
+
+  // Compute stats for individual partitions
+  val rowCounts: Map[TablePartitionSpec, BigInt] =
+if (noscan) {
+  Map.empty
+} else {
+  calculateRowCountsPerPartition(sparkSession, tableMeta)
+}
+
+  partitions.foreach(p => {
+val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta, p)
+val newRowCount = rowCounts.get(p.spec)
+
+def updateStats(newStats: CatalogStatistics): Unit = {
+  sessionState.catalog.alterPartitions(tableMeta.identifier,
+List(p.copy(stats = Some(newStats
+}
+
+calculateAndUpdateStats(p.stats, newTotalSize, newRowCount, 
updateStats)
+  })
+}
+
+Seq.empty[Row]
+  }
+
+  private def calculateRowCountsPerPartition(
+  sparkSession: SparkSession,
+  tableMeta: CatalogTable): Map[TablePartitionSpec, BigInt] = {
+val filters = partitionSpec.get.map {
+  case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), 
Literal(value))
+}
+val filter = filters match {
+  case head :: tail =>
+if (tail.isEmpty) head
+else tail.foldLeft(head: Expression)((a, b) => And(a, b))
+}
--- End diff --

`val filter = filters.reduce(And)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125153400
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
---
@@ -986,6 +987,7 @@ private[hive] object HiveClientImpl {
 tpart.setTableName(ht.getTableName)
 tpart.setValues(partValues.asJava)
 tpart.setSd(storageDesc)
+tpart.setParameters(mutable.Map(p.parameters.toSeq: _*).asJava)
--- End diff --

`tpart.setParameters(p.parameters.asJava)` and remove unnecessary import.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125153275
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -201,6 +202,193 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS").collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS").collect()
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze single partition noscan") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertStats(ds: String, hr: String, rowCount: BigInt, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === Some(rowCount))
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def assertNoStats(ds: String, hr: String): Unit = {
--- End diff --

shall we remove this one line method?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125152161
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -91,29 +91,44 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
 
   /**
* Create an [[AnalyzeTableCommand]] command or an 
[[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Example SQL for analyzing table or a set of partitions :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], 
partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
* }}}
+   *
* Example SQL for analyzing columns :
* {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS 
column1, column2;
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
+if (ctx.identifier != null &&
+ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+  throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
 }
-if (ctx.identifier != null) {
-  if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
+
+val partitionSpec =
+  if (ctx.partitionSpec != null) {
+val filteredSpec = visitPartitionSpec(ctx.partitionSpec).filter(x 
=> x._2.isDefined)
+if (filteredSpec.isEmpty) {
+  None
+} else {
+  Some(filteredSpec.mapValues(v => v.get))
--- End diff --

`mapValues(_.get)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125156045
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -201,6 +202,193 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
--- End diff --

How about using SELECT '1', 'A' in these tests? We don't need to make the 
table this big.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125153486
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 ---
@@ -52,6 +52,13 @@ object CommandUtils extends Logging {
 }
   }
 
+  def calculateTotalSize(
+  sessionState: SessionState,
+  catalogTable: CatalogTable,
+  partition: CatalogTablePartition): Long = {
+calculateLocationSize(sessionState, catalogTable.identifier, 
partition.storage.locationUri)
--- End diff --

Shall we remove this method? It's only one line and can be called directly 
using `calculateLocationSize`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125153235
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -201,6 +202,193 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS").collect()
--- End diff --

no need to call `collect()` here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125153266
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -201,6 +202,193 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS").collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS").collect()
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze single partition noscan") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
--- End diff --

make this a common method `getPartitionStats`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125152313
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -38,10 +44,92 @@ case class AnalyzeTableCommand(
 if (tableMeta.tableType == CatalogTableType.VIEW) {
   throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
 }
-val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
 
-val oldTotalSize = 
tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
-val oldRowCount = 
tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+if (!partitionSpec.isDefined) {
+  // Compute stats for the whole table
+  val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
+  val newRowCount =
+if (noscan) {
+  None
+} else {
+  Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
+}
+
+  def updateStats(newStats: CatalogStatistics): Unit = {
+sessionState.catalog.alterTableStats(tableIdentWithDB, 
Some(newStats))
+// Refresh the cached data source table in the catalog.
+sessionState.catalog.refreshTable(tableIdentWithDB)
+  }
+
+  calculateAndUpdateStats(tableMeta.stats, newTotalSize, newRowCount, 
updateStats)
+} else {
+  val partitions = 
sessionState.catalog.listPartitions(tableMeta.identifier, partitionSpec)
+
+  if (partitionSpec.isDefined && partitions.isEmpty) {
+throw new NoSuchPartitionException(db, tableIdent.table, 
partitionSpec.get)
+  }
+
+  // Compute stats for individual partitions
+  val rowCounts: Map[TablePartitionSpec, BigInt] =
+if (noscan) {
+  Map.empty
+} else {
+  calculateRowCountsPerPartition(sparkSession, tableMeta)
+}
+
+  partitions.foreach(p => {
+val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta, p)
+val newRowCount = rowCounts.get(p.spec)
+
+def updateStats(newStats: CatalogStatistics): Unit = {
+  sessionState.catalog.alterPartitions(tableMeta.identifier,
+List(p.copy(stats = Some(newStats
+}
+
+calculateAndUpdateStats(p.stats, newTotalSize, newRowCount, 
updateStats)
+  })
+}
+
+Seq.empty[Row]
+  }
+
+  private def calculateRowCountsPerPartition(
+  sparkSession: SparkSession,
+  tableMeta: CatalogTable): Map[TablePartitionSpec, BigInt] = {
+val filters = partitionSpec.get.map {
+  case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), 
Literal(value))
+}
+val filter = filters match {
+  case head :: tail =>
+if (tail.isEmpty) head
+else tail.foldLeft(head: Expression)((a, b) => And(a, b))
+}
+
+val partitionColumns = tableMeta.partitionColumnNames.map(c => 
Column(c))
--- End diff --

`.map(Column(_))`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125153437
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -201,6 +202,193 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS").collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS").collect()
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze single partition noscan") {
--- End diff --

We can combine test cases for noscan and non-noscan by first analyze with 
noscan and check, then analyze without noscan and check. Currently there are 
many redundant codes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125153279
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -201,6 +202,193 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
 }
   }
 
+  test("analyze single partition") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS").collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS").collect()
+
+  assert(queryStats("2010-01-01").rowCount.get === 500)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount.get === 2*500)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze single partition noscan") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String): CatalogStatistics = {
+  val partition =
+
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> 
ds))
+  partition.stats.get
+}
+
+withTable(tableName) {
+  sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') 
SELECT * FROM src")
+  sql(
+s"""
+   |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02')
+   |SELECT * FROM src
+   |UNION ALL
+   |SELECT * FROM src
+ """.stripMargin)
+  sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') 
SELECT * FROM src")
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE 
STATISTICS NOSCAN")
+.collect()
+
+  assert(queryStats("2010-01-01").rowCount === None)
+  assert(queryStats("2010-01-01").sizeInBytes === 5812)
+
+  assert(queryStats("2010-01-02").rowCount === None)
+  assert(queryStats("2010-01-02").sizeInBytes === 2*5812)
+}
+  }
+
+  test("analyze a set of partitions") {
+val tableName = "analyzeTable_part"
+
+def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+  val tableId = TableIdentifier(tableName)
+  val partition =
+spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, 
"hr" -> hr))
+  partition.stats
+}
+
+def assertStats(ds: String, hr: String, rowCount: BigInt, sizeInBytes: 
BigInt): Unit = {
+  val stats = queryStats(ds, hr).get
+  assert(stats.rowCount === Some(rowCount))
+  assert(stats.sizeInBytes === sizeInBytes)
+}
+
+def assertNoStats(ds: String, hr: String): Unit = {
+  assert(queryStats(ds, hr) === None)
+}
+
+def createPartition(ds: String, hr: Int, query: String): Unit = {
--- End diff --

same here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125152278
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -38,10 +44,92 @@ case class AnalyzeTableCommand(
 if (tableMeta.tableType == CatalogTableType.VIEW) {
   throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
 }
-val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
 
-val oldTotalSize = 
tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
-val oldRowCount = 
tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+if (!partitionSpec.isDefined) {
+  // Compute stats for the whole table
+  val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
+  val newRowCount =
+if (noscan) {
+  None
+} else {
+  Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
+}
+
+  def updateStats(newStats: CatalogStatistics): Unit = {
+sessionState.catalog.alterTableStats(tableIdentWithDB, 
Some(newStats))
+// Refresh the cached data source table in the catalog.
+sessionState.catalog.refreshTable(tableIdentWithDB)
+  }
+
+  calculateAndUpdateStats(tableMeta.stats, newTotalSize, newRowCount, 
updateStats)
+} else {
+  val partitions = 
sessionState.catalog.listPartitions(tableMeta.identifier, partitionSpec)
+
+  if (partitionSpec.isDefined && partitions.isEmpty) {
+throw new NoSuchPartitionException(db, tableIdent.table, 
partitionSpec.get)
+  }
+
+  // Compute stats for individual partitions
+  val rowCounts: Map[TablePartitionSpec, BigInt] =
+if (noscan) {
+  Map.empty
+} else {
+  calculateRowCountsPerPartition(sparkSession, tableMeta)
+}
+
+  partitions.foreach(p => {
--- End diff --

`partitions.foreach { p =>`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-07-01 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r125152408
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -38,10 +44,92 @@ case class AnalyzeTableCommand(
 if (tableMeta.tableType == CatalogTableType.VIEW) {
   throw new AnalysisException("ANALYZE TABLE is not supported on 
views.")
 }
-val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
 
-val oldTotalSize = 
tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
-val oldRowCount = 
tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+if (!partitionSpec.isDefined) {
+  // Compute stats for the whole table
+  val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta)
+  val newRowCount =
+if (noscan) {
+  None
+} else {
+  Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
+}
+
+  def updateStats(newStats: CatalogStatistics): Unit = {
+sessionState.catalog.alterTableStats(tableIdentWithDB, 
Some(newStats))
+// Refresh the cached data source table in the catalog.
+sessionState.catalog.refreshTable(tableIdentWithDB)
+  }
+
+  calculateAndUpdateStats(tableMeta.stats, newTotalSize, newRowCount, 
updateStats)
+} else {
+  val partitions = 
sessionState.catalog.listPartitions(tableMeta.identifier, partitionSpec)
+
+  if (partitionSpec.isDefined && partitions.isEmpty) {
+throw new NoSuchPartitionException(db, tableIdent.table, 
partitionSpec.get)
+  }
+
+  // Compute stats for individual partitions
+  val rowCounts: Map[TablePartitionSpec, BigInt] =
+if (noscan) {
+  Map.empty
+} else {
+  calculateRowCountsPerPartition(sparkSession, tableMeta)
+}
+
+  partitions.foreach(p => {
+val newTotalSize = CommandUtils.calculateTotalSize(sessionState, 
tableMeta, p)
+val newRowCount = rowCounts.get(p.spec)
+
+def updateStats(newStats: CatalogStatistics): Unit = {
+  sessionState.catalog.alterPartitions(tableMeta.identifier,
+List(p.copy(stats = Some(newStats
+}
+
+calculateAndUpdateStats(p.stats, newTotalSize, newRowCount, 
updateStats)
+  })
+}
+
+Seq.empty[Row]
+  }
+
+  private def calculateRowCountsPerPartition(
+  sparkSession: SparkSession,
+  tableMeta: CatalogTable): Map[TablePartitionSpec, BigInt] = {
+val filters = partitionSpec.get.map {
+  case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), 
Literal(value))
+}
+val filter = filters match {
+  case head :: tail =>
+if (tail.isEmpty) head
+else tail.foldLeft(head: Expression)((a, b) => And(a, b))
+}
+
+val partitionColumns = tableMeta.partitionColumnNames.map(c => 
Column(c))
+
+val df = 
sparkSession.table(tableMeta.identifier).filter(Column(filter))
+  .groupBy(partitionColumns: _*).count()
+
+val numPartitionColumns = partitionColumns.size
+val partitionColumnIndexes = 0 to (numPartitionColumns - 1)
+
+df.collect().map(r => {
+  val partitionColumnValues = partitionColumnIndexes.map(i => 
r.get(i).toString)
--- End diff --

`val partitionColumnValues = partitionColumns.indices.map(row.getString)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-29 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124871491
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
 ---
@@ -239,18 +239,20 @@ class SparkSqlParserSuite extends AnalysisTest {
   AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
 assertEqual("analyze table t compute statistics noscan",
   AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
-assertEqual("analyze table t partition (a) compute statistics nOscAn",
+assertEqual("analyze table t compute statistics nOscAn",
   AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
 
-// Partitions specified - we currently parse them but don't do 
anything with it
+// Partitions specified
 assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE 
STATISTICS",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
+  AnalyzeTableCommand(TableIdentifier("t"), noscan = false,
+partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11"
 assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE 
STATISTICS noscan",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
-assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
-assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS 
noscan",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+  AnalyzeTableCommand(TableIdentifier("t"), noscan = true,
+partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11"
+intercept("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
--- End diff --

@wzhfy, I expect syntax change to be quite small and incremental. 
Currently, it is necessary to specify all partition columns along with values. 
The change will be to allow only a subset of partition columns and allow 
partition columns without values.

`PARTITION (partcol1=val1,...) -> PARTITION (partcol1[=val1],...)`

That said, I want to try to allow partial partition specs in this PR. Let 
me spend some time on it and report back my findings.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-28 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124703434
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
 ---
@@ -239,18 +239,20 @@ class SparkSqlParserSuite extends AnalysisTest {
   AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
 assertEqual("analyze table t compute statistics noscan",
   AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
-assertEqual("analyze table t partition (a) compute statistics nOscAn",
+assertEqual("analyze table t compute statistics nOscAn",
   AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
 
-// Partitions specified - we currently parse them but don't do 
anything with it
+// Partitions specified
 assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE 
STATISTICS",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
+  AnalyzeTableCommand(TableIdentifier("t"), noscan = false,
+partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11"
 assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE 
STATISTICS noscan",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
-assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
-assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS 
noscan",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+  AnalyzeTableCommand(TableIdentifier("t"), noscan = true,
+partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11"
+intercept("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
--- End diff --

If we first supports only exact partition spec, in the near future we will 
change (replace) the syntax and also the related implementation. I mean we are 
not doing it incrementally, so I prefer support the right syntax in this single 
pr. Would support that syntax require lots of work?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-28 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124546295
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -95,25 +95,32 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
* {{{
*   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
* }}}
+   * Example SQL for analyzing a single partition :
+   * {{{
+   *   ANALYZE TABLE table PARTITION (key=value,..) COMPUTE STATISTICS 
[NOSCAN];
--- End diff --

I'm seeing that visitNonOptionalPartitionSpec detects unset partition 
columns and throws an exception: Found an empty partition key '$key'. Is this 
sufficient or do you have something else in mind?

/**
   * Create a partition specification map without optional values.
   */
  protected def visitNonOptionalPartitionSpec(
  ctx: PartitionSpecContext): Map[String, String] = withOrigin(ctx) {
visitPartitionSpec(ctx).map {
  case (key, None) => throw new ParseException(s"Found an empty 
partition key '$key'.", ctx)
  case (key, Some(value)) => key -> value
}
  }



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124457964
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -95,25 +95,32 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
* {{{
*   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
--- End diff --

Also here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-28 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124457890
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -95,25 +95,32 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
* {{{
*   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
* }}}
+   * Example SQL for analyzing a single partition :
+   * {{{
+   *   ANALYZE TABLE table PARTITION (key=value,..) COMPUTE STATISTICS 
[NOSCAN];
--- End diff --

The existing syntax is not good. Could you improve them?
```SQL
ANALYZE TABLE [db_name.]tablename [PARTITION(partcol1[=val1], 
partcol2[=val2], ...)] 
COMPUTE STATISTICS [NOSCAN]
```

In addition, since we have a restriction, please do not call 
`visitNonOptionalPartitionSpec`. Instead, we can capture the non-set partition 
column and issue a more user-friendly exception message.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-27 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124457412
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
 ---
@@ -239,18 +239,20 @@ class SparkSqlParserSuite extends AnalysisTest {
   AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
 assertEqual("analyze table t compute statistics noscan",
   AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
-assertEqual("analyze table t partition (a) compute statistics nOscAn",
+assertEqual("analyze table t compute statistics nOscAn",
   AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
 
-// Partitions specified - we currently parse them but don't do 
anything with it
+// Partitions specified
 assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE 
STATISTICS",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
+  AnalyzeTableCommand(TableIdentifier("t"), noscan = false,
+partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11"
 assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE 
STATISTICS noscan",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
-assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
-assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS 
noscan",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+  AnalyzeTableCommand(TableIdentifier("t"), noscan = true,
+partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11"
+intercept("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
--- End diff --

I am fine about this. Please write a comment in your PR description, the 
class description of `visitAnalyze` and `AnalyzeTableCommand `


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-27 Thread mbasmanova
Github user mbasmanova commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124448971
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
 ---
@@ -239,18 +239,20 @@ class SparkSqlParserSuite extends AnalysisTest {
   AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
 assertEqual("analyze table t compute statistics noscan",
   AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
-assertEqual("analyze table t partition (a) compute statistics nOscAn",
+assertEqual("analyze table t compute statistics nOscAn",
   AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
 
-// Partitions specified - we currently parse them but don't do 
anything with it
+// Partitions specified
 assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE 
STATISTICS",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
+  AnalyzeTableCommand(TableIdentifier("t"), noscan = false,
+partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11"
 assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE 
STATISTICS noscan",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
-assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
-assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS 
noscan",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+  AnalyzeTableCommand(TableIdentifier("t"), noscan = true,
+partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11"
+intercept("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
--- End diff --

I see. Thanks for pointing that out. Currently, this PR supports only exact 
partition spec. It doesn't support partial partition specs describing the 
document above. My preference would be to keep it simple for this PR and 
support only exact spec and add support for partial specs in a follow up PR. 
What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124178671
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
 ---
@@ -239,18 +239,20 @@ class SparkSqlParserSuite extends AnalysisTest {
   AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
 assertEqual("analyze table t compute statistics noscan",
   AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
-assertEqual("analyze table t partition (a) compute statistics nOscAn",
+assertEqual("analyze table t compute statistics nOscAn",
   AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
 
-// Partitions specified - we currently parse them but don't do 
anything with it
+// Partitions specified
 assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE 
STATISTICS",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
+  AnalyzeTableCommand(TableIdentifier("t"), noscan = false,
+partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11"
 assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE 
STATISTICS noscan",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
-assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
-assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS 
noscan",
-  AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+  AnalyzeTableCommand(TableIdentifier("t"), noscan = true,
+partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11"
+intercept("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
--- End diff --

This should be legal based on the description of Hive? 

https://cwiki.apache.org/confluence/display/Hive/StatsDev


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124178194
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 ---
@@ -24,18 +24,23 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, 
CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, 
CatalogTable, CatalogTablePartition, CatalogTableType}
+import 
org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, 
Expression, Literal}
 import org.apache.spark.sql.internal.SessionState
 
 
 /**
- * Analyzes the given table to generate statistics, which will be used in 
query optimizations.
+ * Analyzes the given table or partition to generate statistics, which 
will be used in
+ * query optimizations.
--- End diff --

Could you please add the description about `partitionSpec`? 
>  If certain partition specs are specified, then statistics are gathered 
for only those partitions. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124177691
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -101,19 +101,30 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
* }}}
--- End diff --

Please also update the above description with newly supported the partition 
spec. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124177638
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -101,19 +101,30 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
-}
-if (ctx.identifier != null) {
+val noscan = if (ctx.identifier != null) {
   if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
 throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
   }
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
-} else if (ctx.identifierSeq() == null) {
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), 
noscan = false)
+  true
+} else {
+  false
+}
+
+val partitionSpec = if (ctx.partitionSpec != null) {
+  Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
 } else {
+  None
+}
--- End diff --

```val partitionSpec = 
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124177416
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -101,19 +101,30 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
-}
-if (ctx.identifier != null) {
+val noscan = if (ctx.identifier != null) {
   if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
 throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
   }
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
-} else if (ctx.identifierSeq() == null) {
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), 
noscan = false)
+  true
+} else {
+  false
+}
--- End diff --

```Scala
if (ctx.identifier != null &&
ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
  throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124177318
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala ---
@@ -101,19 +101,30 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder(conf) {
* }}}
*/
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = 
withOrigin(ctx) {
-if (ctx.partitionSpec != null) {
-  logWarning(s"Partition specification is ignored: 
${ctx.partitionSpec.getText}")
-}
-if (ctx.identifier != null) {
+val noscan = if (ctx.identifier != null) {
   if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
 throw new ParseException(s"Expected `NOSCAN` instead of 
`${ctx.identifier.getText}`", ctx)
   }
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
-} else if (ctx.identifierSeq() == null) {
-  AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), 
noscan = false)
+  true
+} else {
+  false
+}
+
+val partitionSpec = if (ctx.partitionSpec != null) {
+  Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
 } else {
+  None
+}
+
+val table = visitTableIdentifier(ctx.tableIdentifier)
+if (ctx.identifierSeq() == null) {
+  AnalyzeTableCommand(table, noscan, partitionSpec)
--- End diff --

`AnalyzeTableCommand(table, noscan = ctx.identifier != null, partitionSpec)`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-26 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18421#discussion_r124175975
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -96,7 +96,8 @@ object CatalogStorageFormat {
 case class CatalogTablePartition(
 spec: CatalogTypes.TablePartitionSpec,
 storage: CatalogStorageFormat,
-parameters: Map[String, String] = Map.empty) {
+parameters: Map[String, String] = Map.empty,
+stats: Option[CatalogStatistics] = None) {
--- End diff --

Add this part to the description of this case class. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18421: [SPARK-21213][SQL] Support collecting partition-l...

2017-06-26 Thread mbasmanova
GitHub user mbasmanova opened a pull request:

https://github.com/apache/spark/pull/18421

[SPARK-21213][SQL] Support collecting partition-level statistics: row…

…Count and sizeInBytes

## What changes were proposed in this pull request?

Added support for ANALYZE TABLE table PARTITION (key=value,..) COMPUTE 
STATISTICS [NOSCAN] SQL command to calculate total number of rows and size in 
bytes for the specified partition. Calculated statistics are stores in Hive 
Metastore as user-defined properties attached to the partition object. Property 
names are the same as the ones used to store table-level row count and size in 
bytes. Specifically, spark.sql.statistics.totalSize and 
spark.sql.statistics.numRows.

## How was this patch tested?

Added tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mbasmanova/spark mbasmanova-analyze-partition

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18421.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18421


commit e18f144a2c6b318a285fbb184bb5c78408f06039
Author: Masha Basmanova 
Date:   2017-06-12T20:36:28Z

[SPARK-21213][SQL] Support collecting partition-level statistics: rowCount 
and sizeInBytes




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org