spark git commit: [SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

2015-04-03 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 440ea31b7 -> c42c3fc7f


[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai yh...@databricks.com

This patch had conflicts when merged, resolved by
Committer: Cheng Lian l...@databricks.com

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

b0e1a42 [Yin Huai] Address comments.
83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c42c3fc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c42c3fc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c42c3fc7

Branch: refs/heads/master
Commit: c42c3fc7f7b79a1f6ce990d39b5d9d14ab19fcf0
Parents: 440ea31
Author: Yin Huai yh...@databricks.com
Authored: Fri Apr 3 14:40:36 2015 +0800
Committer: Cheng Lian l...@databricks.com
Committed: Fri Apr 3 14:40:36 2015 +0800

--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 28 ++--
 .../spark/sql/hive/execution/commands.scala |  5 ++--
 .../apache/spark/sql/hive/parquetSuites.scala   |  2 --
 3 files changed, 23 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c42c3fc7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 76d329a..c4da34a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,8 +116,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    // refresh table does not eagerly reload the cache. It just invalidates the cache.
+    // refreshTable does not eagerly reload the cache. It just invalidates the cache.
     // Next time when we use the table, it will be populated in the cache.
+    // Since we also cache ParquetRelations converted from Hive Parquet tables, and
+    // adding converted ParquetRelations into the cache is not defined in the load function
+    // of the cache (instead, we add the cache entry in convertToParquetRelation),
+    // it is better to invalidate the cache here to avoid confusing warning logs from the
+    // cache loader (e.g. cannot find data source provider, which is only defined for
+    // data source tables).
     invalidateTable(databaseName, tableName)
   }
 
@@ -242,21 +248,27 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
 
     def getCached(
-      tableIdentifier: QualifiedTableName,
-      pathsInMetastore: Seq[String],
-      schemaInMetastore: StructType,
-      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+        tableIdentifier: QualifiedTableName,
+        pathsInMetastore: Seq[String],
+        schemaInMetastore: StructType,
+        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+        case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
-            parquetRelation.paths == pathsInMetastore &&
+            parquetRelation.paths.toSet == pathsInMetastore.toSet &&
             logical.schema.sameType(metastoreSchema) &&
             parquetRelation.maybePartitionSpec == partitionSpecInMetastore
 
-          if (useCached) Some(logical) else None
+          if (useCached) {
+            Some(logical)
+          } else {
+            // If the cached relation is not updated, we invalidate it right away.
+            cachedDataSourceTables.invalidate(tableIdentifier)
+            None
+          }
         case other =>
           logWarning(
             s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +

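For reference, here is a minimal standalone sketch of the check-and-invalidate pattern that getCached implements above. It assumes a Guava cache keyed by table name; the TableKey and CachedRelation types and all other names are illustrative, not Spark's actual API.

import com.google.common.cache.{Cache, CacheBuilder}

case class TableKey(database: String, table: String)              // hypothetical key type
case class CachedRelation(paths: Set[String], schemaJson: String) // hypothetical cached value

object CacheCheckSketch {
  val cache: Cache[TableKey, CachedRelation] =
    CacheBuilder.newBuilder().maximumSize(1000).build[TableKey, CachedRelation]()

  // Return the cached relation only if it still matches what the metastore
  // reports; otherwise evict the stale entry immediately, as the patch does,
  // so the next lookup rebuilds it from scratch.
  def getCached(
      key: TableKey,
      pathsInMetastore: Seq[String],
      schemaJsonInMetastore: String): Option[CachedRelation] = {
    Option(cache.getIfPresent(key)).flatMap { cached =>
      val useCached =
        cached.paths == pathsInMetastore.toSet && // order-insensitive, like paths.toSet above
        cached.schemaJson == schemaJsonInMetastore
      if (useCached) {
        Some(cached)
      } else {
        cache.invalidate(key) // stale entry: drop it right away
        None
      }
    }
  }
}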
http://git-wip-us.apache.org/repos/asf/spark/blob/c42c3fc7/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
--
diff --git 

spark git commit: [SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

2015-04-03 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 ac705aa83 -> 0c1b78b72


[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai yh...@databricks.com

This patch had conflicts when merged, resolved by
Committer: Cheng Lian l...@databricks.com

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

b0e1a42 [Yin Huai] Address comments.
83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.

(cherry picked from commit c42c3fc7f7b79a1f6ce990d39b5d9d14ab19fcf0)
Signed-off-by: Cheng Lian l...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c1b78b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c1b78b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c1b78b7

Branch: refs/heads/branch-1.3
Commit: 0c1b78b72bfb5ef806a6c5258a4e5b021b8d1912
Parents: ac705aa
Author: Yin Huai yh...@databricks.com
Authored: Fri Apr 3 14:40:36 2015 +0800
Committer: Cheng Lian l...@databricks.com
Committed: Fri Apr 3 14:40:53 2015 +0800

--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 28 ++--
 .../spark/sql/hive/execution/commands.scala |  5 ++--
 .../apache/spark/sql/hive/parquetSuites.scala   |  2 --
 3 files changed, 23 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0c1b78b7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 76d329a..c4da34a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,8 +116,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    // refresh table does not eagerly reload the cache. It just invalidates the cache.
+    // refreshTable does not eagerly reload the cache. It just invalidates the cache.
     // Next time when we use the table, it will be populated in the cache.
+    // Since we also cache ParquetRelations converted from Hive Parquet tables, and
+    // adding converted ParquetRelations into the cache is not defined in the load function
+    // of the cache (instead, we add the cache entry in convertToParquetRelation),
+    // it is better to invalidate the cache here to avoid confusing warning logs from the
+    // cache loader (e.g. cannot find data source provider, which is only defined for
+    // data source tables).
     invalidateTable(databaseName, tableName)
   }
 
@@ -242,21 +248,27 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
 
     def getCached(
-      tableIdentifier: QualifiedTableName,
-      pathsInMetastore: Seq[String],
-      schemaInMetastore: StructType,
-      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+        tableIdentifier: QualifiedTableName,
+        pathsInMetastore: Seq[String],
+        schemaInMetastore: StructType,
+        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+        case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
-            parquetRelation.paths == pathsInMetastore &&
+            parquetRelation.paths.toSet == pathsInMetastore.toSet &&
             logical.schema.sameType(metastoreSchema) &&
             parquetRelation.maybePartitionSpec == partitionSpecInMetastore
 
-          if (useCached) Some(logical) else None
+          if (useCached) {
+            Some(logical)
+          } else {
+            // If the cached relation is not updated, we invalidate it right away.
+            cachedDataSourceTables.invalidate(tableIdentifier)
+            None
+          }
         case other =>
           logWarning(
             s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +


spark git commit: [SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

2015-04-02 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 45134ec92 -> 4b82bd730


[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai yh...@databricks.com

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b82bd73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b82bd73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b82bd73

Branch: refs/heads/master
Commit: 4b82bd730a24f96d94dfea87420cfaa4253a5ccb
Parents: 45134ec
Author: Yin Huai yh...@databricks.com
Authored: Thu Apr 2 20:23:08 2015 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Thu Apr 2 20:23:08 2015 -0700

--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  61 +-
 .../apache/spark/sql/hive/parquetSuites.scala   | 112 +++
 2 files changed, 167 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4b82bd73/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index bbd920a..76d329a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,7 +116,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
+    // refresh table does not eagerly reload the cache. It just invalidates the cache.
+    // Next time when we use the table, it will be populated in the cache.
+    invalidateTable(databaseName, tableName)
   }
 
   def invalidateTable(databaseName: String, tableName: String): Unit = {
@@ -229,13 +231,42 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
-    val parquetOptions = Map(
-      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
 
     // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
     // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+    val parquetOptions = Map(
+      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+    val tableIdentifier =
+      QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
+
+    def getCached(
+      tableIdentifier: QualifiedTableName,
+      pathsInMetastore: Seq[String],
+      schemaInMetastore: StructType,
+      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+      cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+        case null => None // Cache miss
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+          // If we have the same paths, same schema, and same partition spec,
+          // we will use the cached Parquet Relation.
+          val useCached =
+            parquetRelation.paths == pathsInMetastore &&
+            logical.schema.sameType(metastoreSchema) &&
+            parquetRelation.maybePartitionSpec == partitionSpecInMetastore
+
+          if (useCached) Some(logical) else None
+        case other =>
+          logWarning(
+            s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
+              s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
+              s"This cached entry will be invalidated.")
+          cachedDataSourceTables.invalidate(tableIdentifier)
+          None
+      }
+    }
+
     if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
@@ -248,10 +279,28 @@ private[hive] class HiveMetastoreCatalog(hive:

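To illustrate the distinction drawn in the commit above, here is a standalone sketch (not Spark's actual code) of refresh versus invalidate on a Guava LoadingCache: refresh() eagerly re-runs the loader, which only knows how to build one kind of entry, while invalidate() simply drops an entry that was put() in from outside the loader. All names are illustrative.

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object RefreshVsInvalidate {
  val cache: LoadingCache[String, String] =
    CacheBuilder.newBuilder().build(new CacheLoader[String, String] {
      // The loader only knows one way to build entries; in Spark's case this
      // path produced the confusing warnings the commit comment mentions.
      override def load(key: String): String = {
        println(s"loader invoked for $key")
        s"loaded:$key"
      }
    })

  def main(args: Array[String]): Unit = {
    cache.put("t1", "converted-parquet-relation") // inserted outside the loader
    cache.refresh("t1")                           // eagerly re-runs the loader for t1
    cache.invalidate("t1")                        // just drops t1; loads nothing
    println(cache.get("t1"))                      // loader now runs lazily: loaded:t1
  }
}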
spark git commit: [SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

2015-04-02 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 0ef46b2d8 -> 0c1c0fb90


[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai yh...@databricks.com

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.

(cherry picked from commit 4b82bd730a24f96d94dfea87420cfaa4253a5ccb)
Signed-off-by: Michael Armbrust mich...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c1c0fb9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c1c0fb9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c1c0fb9

Branch: refs/heads/branch-1.3
Commit: 0c1c0fb90d025e60c5ab74bd80a7c36482070b80
Parents: 0ef46b2
Author: Yin Huai yh...@databricks.com
Authored: Thu Apr 2 20:23:08 2015 -0700
Committer: Michael Armbrust mich...@databricks.com
Committed: Thu Apr 2 20:23:16 2015 -0700

--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  61 +-
 .../apache/spark/sql/hive/parquetSuites.scala   | 112 +++
 2 files changed, 167 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0c1c0fb9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index bbd920a..76d329a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,7 +116,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
+    // refresh table does not eagerly reload the cache. It just invalidates the cache.
+    // Next time when we use the table, it will be populated in the cache.
+    invalidateTable(databaseName, tableName)
   }
 
   def invalidateTable(databaseName: String, tableName: String): Unit = {
@@ -229,13 +231,42 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
-    val parquetOptions = Map(
-      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
 
     // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
     // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+    val parquetOptions = Map(
+      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+    val tableIdentifier =
+      QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
+
+    def getCached(
+      tableIdentifier: QualifiedTableName,
+      pathsInMetastore: Seq[String],
+      schemaInMetastore: StructType,
+      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+      cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+        case null => None // Cache miss
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+          // If we have the same paths, same schema, and same partition spec,
+          // we will use the cached Parquet Relation.
+          val useCached =
+            parquetRelation.paths == pathsInMetastore &&
+            logical.schema.sameType(metastoreSchema) &&
+            parquetRelation.maybePartitionSpec == partitionSpecInMetastore
+
+          if (useCached) Some(logical) else None
+        case other =>
+          logWarning(
+            s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
+              s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
+              s"This cached entry will be invalidated.")
+          cachedDataSourceTables.invalidate(tableIdentifier)
+          None
+      }
+    }
+
     if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val
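The NOTE in the hunk above explains why the metastore schema travels to ParquetRelation2 as JSON inside parquetOptions. Here is a small sketch of that round-trip using Spark SQL's public types API; the table layout is made up for illustration.

import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}

object SchemaJsonRoundTrip {
  def main(args: Array[String]): Unit = {
    val metastoreSchema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("Name", StringType)))         // mixed-case name: the "evil" part

    val asJson: String = metastoreSchema.json   // serialize, as parquetOptions does
    val restored = DataType.fromJson(asJson).asInstanceOf[StructType]
    assert(restored == metastoreSchema)         // the round-trip preserves the schema
  }
}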