Repository: incubator-hawq Updated Branches: refs/heads/master c577d622a -> f08b5e6c6
HAWQ-372. Fix single row insert and COPY hang in highly concurrent workloads Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/f08b5e6c Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/f08b5e6c Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/f08b5e6c Branch: refs/heads/master Commit: f08b5e6c64387bc6067f1a45ceec1c50b3b1ce8d Parents: c577d62 Author: Ruilong Huo <r...@pivotal.io> Authored: Tue Feb 2 02:44:49 2016 -0800 Committer: Ruilong Huo <r...@pivotal.io> Committed: Tue Feb 2 02:44:49 2016 -0800 ---------------------------------------------------------------------- src/backend/commands/analyze.c | 436 ++++++++++++++++++++---------------- src/backend/commands/copy.c | 22 +- src/include/commands/vacuum.h | 2 +- 3 files changed, 265 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f08b5e6c/src/backend/commands/analyze.c ---------------------------------------------------------------------- diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c index d0e2523..4dac537 100644 --- a/src/backend/commands/analyze.c +++ b/src/backend/commands/analyze.c @@ -256,20 +256,14 @@ void analyzeStatement(VacuumStmt *stmt, List *relids, int preferred_seg_num) GpAutoStatsModeValue autostatInFunctionsvalBackup = gp_autostats_mode_in_functions; bool optimizerBackup = optimizer; int target_seg_num = (preferred_seg_num > 0) ? 
preferred_seg_num : GetUtilPartitionNum(); - QueryResource *resource = AllocateResource(QRL_ONCE, 1, 0, target_seg_num, target_seg_num, NULL, 0); - QueryResource *savedResource = NULL; gp_autostats_mode = GP_AUTOSTATS_NONE; gp_autostats_mode_in_functions = GP_AUTOSTATS_NONE; optimizer = false; - - savedResource = GetActiveQueryResource(); - SetActiveQueryResource(resource); - PG_TRY(); { - analyzeStmt(stmt, relids); + analyzeStmt(stmt, relids, target_seg_num); gp_autostats_mode = autostatvalBackup; gp_autostats_mode_in_functions = autostatInFunctionsvalBackup; optimizer = optimizerBackup; @@ -281,18 +275,12 @@ void analyzeStatement(VacuumStmt *stmt, List *relids, int preferred_seg_num) gp_autostats_mode = autostatvalBackup; gp_autostats_mode_in_functions = autostatInFunctionsvalBackup; optimizer = optimizerBackup; - FreeResource(resource); - UnsetActiveQueryResource(); - SetActiveQueryResource(savedResource); + /* Carry on with error handling. */ PG_RE_THROW(); } PG_END_TRY(); - FreeResource(resource); - UnsetActiveQueryResource(); - SetActiveQueryResource(savedResource); - Assert(gp_autostats_mode == autostatvalBackup); Assert(gp_autostats_mode_in_functions == autostatInFunctionsvalBackup); Assert(optimizer == optimizerBackup); @@ -304,7 +292,7 @@ void analyzeStatement(VacuumStmt *stmt, List *relids, int preferred_seg_num) * vacstmt - Vacuum statement. * relids - Usually NULL except when called by autovacuum. 
*/ -void analyzeStmt(VacuumStmt *stmt, List *relids) +void analyzeStmt(VacuumStmt *stmt, List *relids, int target_seg_num) { List *lRelOids = NIL; MemoryContext callerContext = NULL; @@ -478,237 +466,301 @@ void analyzeStmt(VacuumStmt *stmt, List *relids) MemoryContextSwitchTo(analyzeStatementContext); } + /** + * We open relations with appropreciate locks + */ + List *candidateRelations = NIL; + foreach (le1, lRelOids) { Oid candidateOid = InvalidOid; Relation candidateRelation = NULL; - bool bTemp; - - bTemp = false; Assert(analyzeStatementContext == CurrentMemoryContext); - if (bUseOwnXacts) - { - /** - * We use a different transaction per relation so that we - * may release locks on relations as soon as possible. - */ - StartTransactionCommand(); - ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); - MemoryContextSwitchTo(analyzeStatementContext); - } - candidateOid = lfirst_oid(le1); candidateRelation = - try_relation_open(candidateOid, ShareUpdateExclusiveLock, false); + try_relation_open(candidateOid, ShareUpdateExclusiveLock, false); if (candidateRelation) { - /** - * We got a lock on the relation. Good! - */ - if (analyzePermitted(RelationGetRelid(candidateRelation))) - { - StringInfoData ext_uri; + candidateRelations = lappend(candidateRelations, candidateRelation); + } + else + { + elog(ERROR, "Cannot open and lock relation %s for analyze", + RelationGetRelationName(candidateRelation)); + } + } - /* - * We have permission to ANALYZE. 
- */ + /** + * We allocate query resource for analyze + */ + QueryResource *resource = AllocateResource(QRL_ONCE, 1, 0, target_seg_num, target_seg_num, NULL, 0); + QueryResource *savedResource = NULL; - /* MPP-7576: don't track internal namespace tables */ - switch (candidateRelation->rd_rel->relnamespace) - { - case PG_CATALOG_NAMESPACE: - /* MPP-7773: don't track objects in system namespace - * if modifying system tables (eg during upgrade) - */ - if (allowSystemTableModsDDL) - bTemp = true; - break; - - case PG_TOAST_NAMESPACE: - case PG_BITMAPINDEX_NAMESPACE: - case PG_AOSEGMENT_NAMESPACE: - bTemp = true; - break; - default: - break; - } + savedResource = GetActiveQueryResource(); + SetActiveQueryResource(resource); - /* MPP-7572: Don't track metadata if table in any - * temporary namespace + /** + * We do actual analyze + */ + PG_TRY(); + { + foreach (le1, candidateRelations) + { + Relation candidateRelation = NULL; + bool bTemp = false; + + Assert(analyzeStatementContext == CurrentMemoryContext); + + if (bUseOwnXacts) + { + /** + * We use a different transaction per relation so that we + * may release locks on relations as soon as possible. */ - if (!bTemp) - bTemp = isAnyTempNamespace( - candidateRelation->rd_rel->relnamespace); + StartTransactionCommand(); + ActiveSnapshot = CopySnapshot(GetTransactionSnapshot()); + MemoryContextSwitchTo(analyzeStatementContext); + } - initStringInfo(&ext_uri); + candidateRelation = (Relation)lfirst(le1); - if (candidateRelation->rd_rel->relkind != RELKIND_RELATION) - { - /** - * Is the relation the right kind? - */ - ereport(WARNING, - (errmsg("skipping \"%s\" --- cannot analyze indexes, views, external tables or special system tables", - RelationGetRelationName(candidateRelation)))); - relation_close(candidateRelation, ShareUpdateExclusiveLock); - } - else if (isOtherTempNamespace(RelationGetNamespace(candidateRelation))) - { - /* Silently ignore tables that are temp tables of other backends. 
*/ - relation_close(candidateRelation, ShareUpdateExclusiveLock); - } - else if (RelationIsExternalPxfReadOnly(candidateRelation, &ext_uri) && - !pxf_enable_stat_collection) - { - /* PXF supports ANALYZE, but only when the GUC is on */ - ereport(WARNING, - (errmsg("skipping \"%s\" --- analyze for PXF tables is turned off by 'pxf_enable_stat_collection'", - RelationGetRelationName(candidateRelation)))); - relation_close(candidateRelation, ShareUpdateExclusiveLock); - } - else + if (candidateRelation) + { + /** + * We got a lock on the relation. Good! + */ + if (analyzePermitted(RelationGetRelid(candidateRelation))) { - List *lAttNames = NIL; + StringInfoData ext_uri; - /* Switch to per relation context */ - MemoryContextSwitchTo(analyzeRelationContext); + /* + * We have permission to ANALYZE. + */ - if (stmt->va_cols) + /* MPP-7576: don't track internal namespace tables */ + switch (candidateRelation->rd_rel->relnamespace) { - /** - * Column names have been provided. Should have specified relation name as well. 
- */ - Assert(stmt->relation && "Column names specified but not relation name"); - lAttNames = buildExplicitAttributeNames(RelationGetRelid(candidateRelation), stmt); + case PG_CATALOG_NAMESPACE: + /* MPP-7773: don't track objects in system namespace + * if modifying system tables (eg during upgrade) + */ + if (allowSystemTableModsDDL) + { + bTemp = true; + } + break; + + case PG_TOAST_NAMESPACE: + case PG_BITMAPINDEX_NAMESPACE: + case PG_AOSEGMENT_NAMESPACE: + bTemp = true; + break; + + default: + break; } - else + + /* MPP-7572: Don't track metadata if table in any + * temporary namespace + */ + if (!bTemp) { - lAttNames = analyzableAttributes(candidateRelation); + bTemp = isAnyTempNamespace( + candidateRelation->rd_rel->relnamespace); } - /* Start a sub-transaction for each analyzed table */ - MemoryContext oldcontext = CurrentMemoryContext; - ResourceOwner oldowner = CurrentResourceOwner; - BeginInternalSubTransaction(NULL); - MemoryContextSwitchTo(oldcontext); + initStringInfo(&ext_uri); - PG_TRY(); + if (candidateRelation->rd_rel->relkind != RELKIND_RELATION) { - analyzeRelation(candidateRelation, lAttNames, stmt->rootonly); - -#ifdef FAULT_INJECTOR - FaultInjector_InjectFaultIfSet( - AnalyzeSubxactError, - DDLNotSpecified, - "", /* databaseName */ - ""); /* tableName */ -#endif /* FAULT_INJECTOR */ + /** + * Is the relation the right kind? 
+ */ + ereport(WARNING, + (errmsg("skipping \"%s\" --- cannot analyze indexes, views, external tables or special system tables", + RelationGetRelationName(candidateRelation)))); - ReleaseCurrentSubTransaction(); - MemoryContextSwitchTo(oldcontext); - CurrentResourceOwner = oldowner; - successCount += 1; + relation_close(candidateRelation, ShareUpdateExclusiveLock); } - PG_CATCH(); + else if (isOtherTempNamespace(RelationGetNamespace(candidateRelation))) { - ErrorData *edata; - - /* Save error info */ - MemoryContextSwitchTo(oldcontext); - edata = CopyErrorData(); - FlushErrorState(); - - elog(WARNING, "skipping \"%s\" --- error returned: %s", - RelationGetRelationName(candidateRelation), - edata->message); - failCount += 1; - appendStringInfo(&failNames, "%s", failCount == 1 ? "(" : ", "); - appendStringInfo(&failNames, "%s", RelationGetRelationName(candidateRelation)); + /* Silently ignore tables that are temp tables of other backends. */ + relation_close(candidateRelation, ShareUpdateExclusiveLock); + } + else if (RelationIsExternalPxfReadOnly(candidateRelation, &ext_uri) && + (!pxf_enable_stat_collection)) + { + /* PXF supports ANALYZE, but only when the GUC is on */ + ereport(WARNING, + (errmsg("skipping \"%s\" --- analyze for PXF tables is turned off by 'pxf_enable_stat_collection'", + RelationGetRelationName(candidateRelation)))); + relation_close(candidateRelation, ShareUpdateExclusiveLock); + } + else + { + List *lAttNames = NIL; - /* rollback this table's sub-transaction */ - RollbackAndReleaseCurrentSubTransaction(); - MemoryContextSwitchTo(oldcontext); - CurrentResourceOwner = oldowner; + /* Switch to per relation context */ + MemoryContextSwitchTo(analyzeRelationContext); - /* Cancel from user should result in canceling ANALYZE, not just this table */ - if (edata->sqlerrcode == ERRCODE_QUERY_CANCELED) + if (stmt->va_cols) { - ReThrowError(edata); + /** + * Column names have been provided. Should have specified relation name as well. 
+ */ + Assert(stmt->relation && "Column names specified but not relation name"); + lAttNames = buildExplicitAttributeNames(RelationGetRelid(candidateRelation), stmt); } else { - /* release error state */ - FreeErrorData(edata); + lAttNames = analyzableAttributes(candidateRelation); } - } - PG_END_TRY(); - - /* Switch back to statement context and reset relation context */ - MemoryContextSwitchTo(analyzeStatementContext); - MemoryContextResetAndDeleteChildren(analyzeRelationContext); - /* - * Close source relation now, but keep lock so - * that no one deletes it before we commit. (If - * someone did, they'd fail to clean up the - * entries we made in pg_statistic. Also, - * releasing the lock before commit would expose - * us to concurrent-update failures.) - */ + /* Start a sub-transaction for each analyzed table */ + MemoryContext oldcontext = CurrentMemoryContext; + ResourceOwner oldowner = CurrentResourceOwner; + BeginInternalSubTransaction(NULL); + MemoryContextSwitchTo(oldcontext); - relation_close(candidateRelation, NoLock); + PG_TRY(); + { + analyzeRelation(candidateRelation, lAttNames, stmt->rootonly); - /* MPP-6929: metadata tracking */ - if (!bTemp && (Gp_role == GP_ROLE_DISPATCH)) - { - char *asubtype = ""; +#ifdef FAULT_INJECTOR + FaultInjector_InjectFaultIfSet( + AnalyzeSubxactError, + DDLNotSpecified, + "", /* databaseName */ + ""); /* tableName */ +#endif /* FAULT_INJECTOR */ - if (IsAutoVacuumProcess()) - asubtype = "AUTO"; + ReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + successCount += 1; + } + PG_CATCH(); + { + ErrorData *edata; + + /* Save error info */ + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + + elog(WARNING, "skipping \"%s\" --- error returned: %s", + RelationGetRelationName(candidateRelation), + edata->message); + failCount += 1; + appendStringInfo(&failNames, "%s", failCount == 1 ? 
"(" : ", "); + appendStringInfo(&failNames, "%s", RelationGetRelationName(candidateRelation)); + + /* rollback this table's sub-transaction */ + RollbackAndReleaseCurrentSubTransaction(); + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + /* Cancel from user should result in canceling ANALYZE, not just this table */ + if (edata->sqlerrcode == ERRCODE_QUERY_CANCELED) + { + ReThrowError(edata); + } + else + { + /* release error state */ + FreeErrorData(edata); + } + } + PG_END_TRY(); + + /* Switch back to statement context and reset relation context */ + MemoryContextSwitchTo(analyzeStatementContext); + MemoryContextResetAndDeleteChildren(analyzeRelationContext); + + /* + * Close source relation now, but keep lock so + * that no one deletes it before we commit. (If + * someone did, they'd fail to clean up the + * entries we made in pg_statistic. Also, + * releasing the lock before commit would expose + * us to concurrent-update failures.) + */ + relation_close(candidateRelation, NoLock); - MetaTrackUpdObject(RelationRelationId, - RelationGetRelid(candidateRelation), - GetUserId(), - "ANALYZE", - asubtype - ); + /* MPP-6929: metadata tracking */ + if (!bTemp && (Gp_role == GP_ROLE_DISPATCH)) + { + char *asubtype = ""; + + if (IsAutoVacuumProcess()) + { + asubtype = "AUTO"; + } + + MetaTrackUpdObject(RelationRelationId, + RelationGetRelid(candidateRelation), + GetUserId(), + "ANALYZE", + asubtype + ); + } } } + else + { + /** + * We don't have permissions to ANALYZE the relation. Print warning and move on + * to the next relation. + */ + ereport(WARNING, + (errmsg("Skipping \"%s\" --- only table or database owner can analyze it", + RelationGetRelationName(candidateRelation)))); + + relation_close(candidateRelation, ShareUpdateExclusiveLock); + } /* if (analyzePermitted(RelationGetRelid(candidateRelation))) */ } else { + /* + * Relation may have been dropped out from under us. + * TODO: should we print a warning here? 
Do we print it during + * ANALYZE DB or AutoVacuum? + */ + } /* if (candidateRelation) */ + + if (bUseOwnXacts) + { /** - * We don't have permissions to ANALYZE the relation. Print warning and move on - * to the next relation. + * We commit the transaction so that locks on the relation may be released. */ - ereport(WARNING, - (errmsg("Skipping \"%s\" --- only table or database owner can analyze it", - RelationGetRelationName(candidateRelation)))); - relation_close(candidateRelation, ShareUpdateExclusiveLock); - } /* if (analyzePermitted(RelationGetRelid(candidateRelation))) */ + CommitTransactionCommand(); + MemoryContextSwitchTo(analyzeStatementContext); + } } - else - { - /* - * Relation may have been dropped out from under us. - * TODO: should we print a warning here? Do we print it during - * ANALYZE DB or AutoVacuum? - */ - } /* if (candidateRelation) */ + } /* End of PG_TRY() */ + PG_CATCH(); + { + FreeResource(resource); + resource = NULL; + UnsetActiveQueryResource(); + SetActiveQueryResource(savedResource); - if (bUseOwnXacts) - { - /** - * We commit the transaction so that locks on the relation may be released. - */ - CommitTransactionCommand(); - MemoryContextSwitchTo(analyzeStatementContext); - } + /* Carry on with error handling. 
*/ + PG_RE_THROW(); } + PG_END_TRY(); + + /** + * We now free query resource + */ + FreeResource(resource); + resource = NULL; + UnsetActiveQueryResource(); + SetActiveQueryResource(savedResource); if (bUseOwnXacts) { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f08b5e6c/src/backend/commands/copy.c ---------------------------------------------------------------------- diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index ce7483a..acf303c 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -1695,6 +1695,26 @@ DoCopy(const CopyStmt *stmt, const char *queryString) if(cstate->cdbsreh) destroyCdbSreh(cstate->cdbsreh); + /** + * Query resource allocation for COPY probably contains four parts: + * 1) query resource for COPY itself + * 2) query resource for AO/Parquet segment file on HDFS + * 3) query resource for ANALYZE + * 4) query resource for several SPI for ANALYZE + * Query resource in 2 inherits from 1, and that in 4 inherits from 3. + * + * To prevent query resource "deadlock" that many concurrent + * COPY transactions hold query resource in 1 and 2, and thus + * makes some COPY transactions pending to get query resource + * for 3 and 4, here we return query resource for 1 and 2 as + * soon as possible. 
+ */ + if (cstate->resource) + { + FreeResource(cstate->resource); + cstate->resource = NULL; + } + /* Clean up storage (probably not really necessary) */ processed = cstate->processed; @@ -1724,8 +1744,6 @@ DoCopy(const CopyStmt *stmt, const char *queryString) pfree(cstate->attribute_buf.data); pfree(cstate->line_buf.data); - if (cstate->resource) - FreeResource(cstate->resource); pfree(cstate); return processed; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/f08b5e6c/src/include/commands/vacuum.h ---------------------------------------------------------------------- diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 74a433a..4a2b551 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -181,5 +181,5 @@ extern List *get_oids_for_bitmap(List *all_extra_oids, Relation Irel, Relation o /* in commands/analyze.c */ extern void analyzeStatement(VacuumStmt *vacstmt, List *relids, int preferred_seg_num); -extern void analyzeStmt(VacuumStmt *vacstmt, List *relids); +extern void analyzeStmt(VacuumStmt *vacstmt, List *relids, int target_seg_num); #endif /* VACUUM_H */