spark git commit: [SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata
Repository: spark
Updated Branches:
  refs/heads/master 440ea31b7 -> c42c3fc7f

[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai <yh...@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Cheng Lian <l...@databricks.com>

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

b0e1a42 [Yin Huai] Address comments.
83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c42c3fc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c42c3fc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c42c3fc7

Branch: refs/heads/master
Commit: c42c3fc7f7b79a1f6ce990d39b5d9d14ab19fcf0
Parents: 440ea31
Author: Yin Huai <yh...@databricks.com>
Authored: Fri Apr 3 14:40:36 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Fri Apr 3 14:40:36 2015 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 28 ++--
 .../spark/sql/hive/execution/commands.scala   |  5 ++--
 .../apache/spark/sql/hive/parquetSuites.scala |  2 --
 3 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/c42c3fc7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 76d329a..c4da34a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,8 +116,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    // refresh table does not eagerly reload the cache. It just invalidates the cache.
+    // refreshTable does not eagerly reload the cache. It just invalidates the cache.
     // Next time when we use the table, it will be populated in the cache.
+    // Since we also cache ParquetRelations converted from Hive Parquet tables, and
+    // adding converted ParquetRelations to the cache is not defined in the load function
+    // of the cache (instead, we add the cache entry in convertToParquetRelation),
+    // it is better to invalidate the cache here to avoid confusing warning logs from the
+    // cache loader (e.g. "cannot find data source provider", which is only defined for
+    // data source tables).
     invalidateTable(databaseName, tableName)
   }
 
@@ -242,21 +248,27 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
 
     def getCached(
-      tableIdentifier: QualifiedTableName,
-      pathsInMetastore: Seq[String],
-      schemaInMetastore: StructType,
-      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+        tableIdentifier: QualifiedTableName,
+        pathsInMetastore: Seq[String],
+        schemaInMetastore: StructType,
+        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+        case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
-            parquetRelation.paths == pathsInMetastore &&
+            parquetRelation.paths.toSet == pathsInMetastore.toSet &&
             logical.schema.sameType(metastoreSchema) &&
             parquetRelation.maybePartitionSpec == partitionSpecInMetastore
 
-          if (useCached) Some(logical) else None
+          if (useCached) {
+            Some(logical)
+          } else {
+            // If the cached relation is not updated, we invalidate it right away.
+            cachedDataSourceTables.invalidate(tableIdentifier)
+            None
+          }
         case other =>
           logWarning(
             s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +

http://git-wip-us.apache.org/repos/asf/spark/blob/c42c3fc7/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git
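----------------------------------------------------------------------
The refresh-versus-invalidate distinction drawn in the comment above comes from Guava's LoadingCache, which cachedDataSourceTables is built on. The following minimal sketch (not Spark code; the String keys and values are stand-ins) shows why refresh() would send a manually inserted entry back through the loader, while invalidate() simply drops it:

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object RefreshVsInvalidate extends App {
  // A loader that only knows how to build entries for "data source" tables.
  // Entries for converted Parquet tables are inserted manually via put(),
  // mirroring how convertToParquetRelation adds cache entries outside the loader.
  val loader = new CacheLoader[String, String] {
    override def load(key: String): String = {
      println(s"cache loader invoked for $key") // source of the confusing warnings
      s"loaded:$key"
    }
  }
  val cache: LoadingCache[String, String] = CacheBuilder.newBuilder().build(loader)

  cache.put("db.parquet_table", "converted-parquet-relation") // manual entry

  // refresh() eagerly re-runs the loader, even for manually inserted entries.
  cache.refresh("db.parquet_table") // prints "cache loader invoked ..."

  // invalidate() just drops the entry; the loader only runs on the next get().
  cache.put("db.parquet_table", "converted-parquet-relation")
  cache.invalidate("db.parquet_table") // no loader call here
}

Because convertToParquetRelation put()s converted relations into the cache directly, the loader has no recipe for them, which is why routing them through refresh() produced the confusing warnings the patch avoids.
----------------------------------------------------------------------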
spark git commit: [SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 ac705aa83 -> 0c1b78b72

[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai <yh...@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Cheng Lian <l...@databricks.com>

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

b0e1a42 [Yin Huai] Address comments.
83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.

(cherry picked from commit c42c3fc7f7b79a1f6ce990d39b5d9d14ab19fcf0)
Signed-off-by: Cheng Lian <l...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c1b78b7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c1b78b7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c1b78b7

Branch: refs/heads/branch-1.3
Commit: 0c1b78b72bfb5ef806a6c5258a4e5b021b8d1912
Parents: ac705aa
Author: Yin Huai <yh...@databricks.com>
Authored: Fri Apr 3 14:40:36 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Fri Apr 3 14:40:53 2015 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 28 ++--
 .../spark/sql/hive/execution/commands.scala   |  5 ++--
 .../apache/spark/sql/hive/parquetSuites.scala |  2 --
 3 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
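----------------------------------------------------------------------
One subtle fix carried by this patch is comparing the relation's paths as sets rather than sequences (parquetRelation.paths.toSet == pathsInMetastore.toSet). A short illustration, with made-up partition directories, of why the order-insensitive comparison is the right staleness test:

// Hypothetical partition directories; the metastore and the cached relation
// may enumerate the same paths in different orders.
val cachedPaths = Seq("/warehouse/t/part=2", "/warehouse/t/part=1")
val metastorePaths = Seq("/warehouse/t/part=1", "/warehouse/t/part=2")

assert(cachedPaths != metastorePaths)             // Seq equality is order-sensitive
assert(cachedPaths.toSet == metastorePaths.toSet) // Set equality is not

Without the toSet conversion, a mere reordering of paths would look like a change and needlessly evict an up-to-date cached relation.
----------------------------------------------------------------------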
spark git commit: [SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata
Repository: spark
Updated Branches:
  refs/heads/master 45134ec92 -> 4b82bd730

[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai <yh...@databricks.com>

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b82bd73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b82bd73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b82bd73

Branch: refs/heads/master
Commit: 4b82bd730a24f96d94dfea87420cfaa4253a5ccb
Parents: 45134ec
Author: Yin Huai <yh...@databricks.com>
Authored: Thu Apr 2 20:23:08 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Thu Apr 2 20:23:08 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  61 +-
 .../apache/spark/sql/hive/parquetSuites.scala | 112 +++
 2 files changed, 167 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/4b82bd73/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index bbd920a..76d329a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,7 +116,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
+    // refresh table does not eagerly reload the cache. It just invalidates the cache.
+    // Next time when we use the table, it will be populated in the cache.
+    invalidateTable(databaseName, tableName)
   }
 
   def invalidateTable(databaseName: String, tableName: String): Unit = {
@@ -229,13 +231,42 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
     val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
     val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
-    val parquetOptions = Map(
-      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
-      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
 
     // NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
     // serialize the Metastore schema to JSON and pass it as a data source option because of the
     // evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+    val parquetOptions = Map(
+      ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+      ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+    val tableIdentifier =
+      QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
+
+    def getCached(
+      tableIdentifier: QualifiedTableName,
+      pathsInMetastore: Seq[String],
+      schemaInMetastore: StructType,
+      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+      cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+        case null => None // Cache miss
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+          // If we have the same paths, same schema, and same partition spec,
+          // we will use the cached Parquet Relation.
+          val useCached =
+            parquetRelation.paths == pathsInMetastore &&
+            logical.schema.sameType(metastoreSchema) &&
+            parquetRelation.maybePartitionSpec == partitionSpecInMetastore
+
+          if (useCached) Some(logical) else None
+        case other =>
+          logWarning(
+            s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " +
+              s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
+              s"This cached entry will be invalidated.")
+          cachedDataSourceTables.invalidate(tableIdentifier)
+          None
+      }
+    }
+
     if (metastoreRelation.hiveQlTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val partitionColumnDataTypes = partitionSchema.map(_.dataType)
@@ -248,10 +279,28 @@ private[hive] class HiveMetastoreCatalog(hive:
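----------------------------------------------------------------------
The heart of this commit is getCached: a cache probe that validates a hit against the metastore's current view of the table before trusting it. Below is a condensed, self-contained sketch of that pattern using a plain Guava Cache; QualifiedTableName and CachedRelation are simplified stand-ins (a JSON String in place of StructType, a String partition spec in place of PartitionSpec), not Spark's actual types:

import com.google.common.cache.{Cache, CacheBuilder}

case class QualifiedTableName(database: String, name: String)
case class CachedRelation(paths: Seq[String], schemaJson: String, partitionSpec: Option[String])

object GetCachedSketch {
  private val cache: Cache[QualifiedTableName, CachedRelation] =
    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, CachedRelation]()

  // Return the cached relation only while the metastore's view of the table
  // (paths, schema, partition spec) still matches it; otherwise drop the
  // stale entry so it is rebuilt on the next access.
  def getCached(
      key: QualifiedTableName,
      pathsInMetastore: Seq[String],
      schemaInMetastore: String,
      partitionSpecInMetastore: Option[String]): Option[CachedRelation] = {
    Option(cache.getIfPresent(key)) match {
      case None => None // cache miss
      case Some(relation) =>
        val useCached =
          relation.paths.toSet == pathsInMetastore.toSet &&
            relation.schemaJson == schemaInMetastore &&
            relation.partitionSpec == partitionSpecInMetastore
        if (useCached) {
          Some(relation)
        } else {
          // Stale entry: invalidate eagerly (the behavior commit c42c3fc7 above adds).
          cache.invalidate(key)
          None
        }
    }
  }
}

The design choice here is that the cache is an optimization, never an authority: every hit is cross-checked against the metastore, so a stale entry costs one rebuild rather than a wrong query plan.
----------------------------------------------------------------------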
spark git commit: [SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 0ef46b2d8 -> 0c1c0fb90

[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai <yh...@databricks.com>

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.

(cherry picked from commit 4b82bd730a24f96d94dfea87420cfaa4253a5ccb)
Signed-off-by: Michael Armbrust <mich...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c1c0fb9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c1c0fb9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c1c0fb9

Branch: refs/heads/branch-1.3
Commit: 0c1c0fb90d025e60c5ab74bd80a7c36482070b80
Parents: 0ef46b2
Author: Yin Huai <yh...@databricks.com>
Authored: Thu Apr 2 20:23:08 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Thu Apr 2 20:23:16 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  61 +-
 .../apache/spark/sql/hive/parquetSuites.scala | 112 +++
 2 files changed, 167 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
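----------------------------------------------------------------------
For context, this is the user-facing behavior the fix preserves, sketched against the Spark 1.3-era HiveContext API. The table name parquet_tab and the local-mode setup are made up for illustration; the refreshTable call is the public entry point into the invalidation path patched above:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object RefreshTableDemo extends App {
  val sc = new SparkContext(new SparkConf().setAppName("refresh-demo").setMaster("local[*]"))
  val hiveContext = new HiveContext(sc)

  // First access converts the Hive Parquet table and caches the converted relation.
  hiveContext.sql("SELECT COUNT(*) FROM parquet_tab").show()

  // ... suppose another process now writes new files into the table's directory ...

  // refreshTable invalidates the cached metadata; it is reloaded lazily,
  // so the next query sees the new files.
  hiveContext.refreshTable("parquet_tab")
  hiveContext.sql("SELECT COUNT(*) FROM parquet_tab").show()
}
----------------------------------------------------------------------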