http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/48ff52c9/src/backend/nodes/outfuncs.c ---------------------------------------------------------------------- diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index cf6bf04..5894184 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -2111,15 +2111,18 @@ _outCreateStmt(StringInfo str, CreateStmt *node) { WRITE_NODE_TYPE("CREATESTMT"); - WRITE_NODE_FIELD(relation); - WRITE_NODE_FIELD(tableElts); - WRITE_NODE_FIELD(inhRelations); - WRITE_NODE_FIELD(constraints); - WRITE_NODE_FIELD(options); - WRITE_ENUM_FIELD(oncommit, OnCommitAction); - WRITE_STRING_FIELD(tablespacename); - WRITE_NODE_FIELD(distributedBy); - WRITE_NODE_FIELD(partitionBy); + WRITE_CHAR_FIELD(base.relKind); + WRITE_NODE_FIELD(base.relation); + WRITE_NODE_FIELD(base.tableElts); + WRITE_NODE_FIELD(base.inhRelations); + WRITE_NODE_FIELD(base.constraints); + WRITE_NODE_FIELD(base.options); + WRITE_ENUM_FIELD(base.oncommit, OnCommitAction); + WRITE_STRING_FIELD(base.tablespacename); + WRITE_NODE_FIELD(base.distributedBy); + WRITE_BOOL_FIELD(base.is_part_child); + WRITE_BOOL_FIELD(base.is_add_part); + WRITE_NODE_FIELD(base.partitionBy); WRITE_OID_FIELD(oidInfo.relOid); WRITE_OID_FIELD(oidInfo.comptypeOid); WRITE_OID_FIELD(oidInfo.toastOid); @@ -2131,13 +2134,10 @@ _outCreateStmt(StringInfo str, CreateStmt *node) WRITE_OID_FIELD(oidInfo.aoblkdirOid); WRITE_OID_FIELD(oidInfo.aoblkdirIndexOid); WRITE_OID_FIELD(oidInfo.aoblkdirComptypeOid); - WRITE_CHAR_FIELD(relKind); WRITE_CHAR_FIELD(relStorage); /* policy omitted */ /* postCreate omitted */ WRITE_NODE_FIELD(deferredStmts); - WRITE_BOOL_FIELD(is_part_child); - WRITE_BOOL_FIELD(is_add_part); WRITE_BOOL_FIELD(is_split_part); WRITE_OID_FIELD(ownerid); WRITE_BOOL_FIELD(buildAoBlkdir); @@ -2170,16 +2170,27 @@ _outCreateExternalStmt(StringInfo str, CreateExternalStmt *node) { WRITE_NODE_TYPE("CREATEEXTERNALSTMT"); - WRITE_NODE_FIELD(relation); - WRITE_NODE_FIELD(tableElts); + WRITE_CHAR_FIELD(base.relKind); + WRITE_NODE_FIELD(base.relation); + WRITE_NODE_FIELD(base.tableElts); + WRITE_NODE_FIELD(base.inhRelations); + WRITE_NODE_FIELD(base.constraints); + WRITE_NODE_FIELD(base.options); + WRITE_ENUM_FIELD(base.oncommit, OnCommitAction); + WRITE_STRING_FIELD(base.tablespacename); + WRITE_NODE_FIELD(base.distributedBy); + WRITE_BOOL_FIELD(base.is_part_child); + WRITE_BOOL_FIELD(base.is_add_part); + WRITE_NODE_FIELD(base.partitionBy); WRITE_NODE_FIELD(exttypedesc); WRITE_STRING_FIELD(format); - WRITE_NODE_FIELD(formatOpts); WRITE_BOOL_FIELD(isweb); WRITE_BOOL_FIELD(iswritable); + WRITE_BOOL_FIELD(isexternal); + WRITE_BOOL_FIELD(forceCreateDir); + WRITE_STRING_FIELD(parentPath); WRITE_NODE_FIELD(sreh); WRITE_NODE_FIELD(encoding); - WRITE_NODE_FIELD(distributedBy); } static void
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/48ff52c9/src/backend/nodes/readfast.c ---------------------------------------------------------------------- diff --git a/src/backend/nodes/readfast.c b/src/backend/nodes/readfast.c index f9aee80..2cc7035 100644 --- a/src/backend/nodes/readfast.c +++ b/src/backend/nodes/readfast.c @@ -2033,14 +2033,17 @@ _readCreateStmt(const char ** str) { READ_LOCALS(CreateStmt); - READ_NODE_FIELD(relation); - READ_NODE_FIELD(tableElts); - READ_NODE_FIELD(inhRelations); - READ_NODE_FIELD(constraints); - READ_NODE_FIELD(options); - READ_ENUM_FIELD(oncommit,OnCommitAction); - READ_STRING_FIELD(tablespacename); - READ_NODE_FIELD(distributedBy); + READ_CHAR_FIELD(base.relKind); + READ_NODE_FIELD(base.relation); + READ_NODE_FIELD(base.tableElts); + READ_NODE_FIELD(base.inhRelations); + READ_NODE_FIELD(base.constraints); + READ_NODE_FIELD(base.options); + READ_ENUM_FIELD(base.oncommit,OnCommitAction); + READ_STRING_FIELD(base.tablespacename); + READ_NODE_FIELD(base.distributedBy); + READ_BOOL_FIELD(base.is_part_child); + READ_BOOL_FIELD(base.is_add_part); READ_OID_FIELD(oidInfo.relOid); READ_OID_FIELD(oidInfo.comptypeOid); READ_OID_FIELD(oidInfo.toastOid); @@ -2052,13 +2055,10 @@ _readCreateStmt(const char ** str) READ_OID_FIELD(oidInfo.aoblkdirOid); READ_OID_FIELD(oidInfo.aoblkdirIndexOid); READ_OID_FIELD(oidInfo.aoblkdirComptypeOid); - READ_CHAR_FIELD(relKind); READ_CHAR_FIELD(relStorage); /* policy omitted */ /* postCreate - for analysis, QD only */ /* deferredStmts - for analysis, QD only */ - READ_BOOL_FIELD(is_part_child); - READ_BOOL_FIELD(is_add_part); READ_BOOL_FIELD(is_split_part); READ_OID_FIELD(ownerid); READ_BOOL_FIELD(buildAoBlkdir); @@ -2070,16 +2070,16 @@ _readCreateStmt(const char ** str) * Some extra checks to make sure we didn't get lost * during serialization/deserialization */ - Assert(local_node->relKind == RELKIND_INDEX || - local_node->relKind == RELKIND_RELATION || - local_node->relKind == RELKIND_SEQUENCE || - local_node->relKind == RELKIND_UNCATALOGED || - local_node->relKind == RELKIND_TOASTVALUE || - local_node->relKind == RELKIND_VIEW || - local_node->relKind == RELKIND_COMPOSITE_TYPE || - local_node->relKind == RELKIND_AOSEGMENTS || - local_node->relKind == RELKIND_AOBLOCKDIR); - Assert(local_node->oncommit <= ONCOMMIT_DROP); + Assert(local_node->base.relKind == RELKIND_INDEX || + local_node->base.relKind == RELKIND_RELATION || + local_node->base.relKind == RELKIND_SEQUENCE || + local_node->base.relKind == RELKIND_UNCATALOGED || + local_node->base.relKind == RELKIND_TOASTVALUE || + local_node->base.relKind == RELKIND_VIEW || + local_node->base.relKind == RELKIND_COMPOSITE_TYPE || + local_node->base.relKind == RELKIND_AOSEGMENTS || + local_node->base.relKind == RELKIND_AOBLOCKDIR); + Assert(local_node->base.oncommit <= ONCOMMIT_DROP); READ_DONE(); } @@ -2316,16 +2316,26 @@ _readCreateExternalStmt(const char ** str) { READ_LOCALS(CreateExternalStmt); - READ_NODE_FIELD(relation); - READ_NODE_FIELD(tableElts); + READ_CHAR_FIELD(base.relKind); + READ_NODE_FIELD(base.relation); + READ_NODE_FIELD(base.tableElts); + READ_NODE_FIELD(base.inhRelations); + READ_NODE_FIELD(base.constraints); + READ_NODE_FIELD(base.options); + READ_ENUM_FIELD(base.oncommit,OnCommitAction); + READ_STRING_FIELD(base.tablespacename); + READ_NODE_FIELD(base.distributedBy); + READ_BOOL_FIELD(base.is_part_child); + READ_BOOL_FIELD(base.is_add_part); READ_NODE_FIELD(exttypedesc); READ_STRING_FIELD(format); - READ_NODE_FIELD(formatOpts); READ_BOOL_FIELD(isweb); READ_BOOL_FIELD(iswritable); + READ_BOOL_FIELD(isexternal); + READ_BOOL_FIELD(forceCreateDir); + READ_STRING_FIELD(parentPath); READ_NODE_FIELD(sreh); READ_NODE_FIELD(encoding); - READ_NODE_FIELD(distributedBy); READ_DONE(); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/48ff52c9/src/backend/nodes/readfuncs.c ---------------------------------------------------------------------- diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index cbfbb53..dc7de27 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2154,16 +2154,18 @@ _readCreateStmt(void) { READ_LOCALS(CreateStmt); - READ_NODE_FIELD(relation); - READ_NODE_FIELD(tableElts); - READ_NODE_FIELD(inhRelations); - READ_NODE_FIELD(constraints); - - READ_NODE_FIELD(options); - READ_ENUM_FIELD(oncommit,OnCommitAction); - READ_STRING_FIELD(tablespacename); - READ_NODE_FIELD(distributedBy); - READ_NODE_FIELD(partitionBy); + READ_CHAR_FIELD(base.relKind); + READ_NODE_FIELD(base.relation); + READ_NODE_FIELD(base.tableElts); + READ_NODE_FIELD(base.inhRelations); + READ_NODE_FIELD(base.constraints); + READ_NODE_FIELD(base.options); + READ_ENUM_FIELD(base.oncommit,OnCommitAction); + READ_STRING_FIELD(base.tablespacename); + READ_NODE_FIELD(base.distributedBy); + READ_BOOL_FIELD(base.is_part_child); + READ_BOOL_FIELD(base.is_add_part); + READ_NODE_FIELD(base.partitionBy); READ_OID_FIELD(oidInfo.relOid); READ_OID_FIELD(oidInfo.comptypeOid); READ_OID_FIELD(oidInfo.toastOid); @@ -2175,13 +2177,10 @@ _readCreateStmt(void) READ_OID_FIELD(oidInfo.aoblkdirOid); READ_OID_FIELD(oidInfo.aoblkdirIndexOid); READ_OID_FIELD(oidInfo.aoblkdirComptypeOid); - READ_CHAR_FIELD(relKind); READ_CHAR_FIELD(relStorage); /* policy omitted */ /* postCreate omitted */ READ_NODE_FIELD(deferredStmts); - READ_BOOL_FIELD(is_part_child); - READ_BOOL_FIELD(is_add_part); READ_BOOL_FIELD(is_split_part); READ_OID_FIELD(ownerid); READ_BOOL_FIELD(buildAoBlkdir); @@ -2300,16 +2299,27 @@ _readCreateExternalStmt(void) { READ_LOCALS(CreateExternalStmt); - READ_NODE_FIELD(relation); - READ_NODE_FIELD(tableElts); + READ_CHAR_FIELD(base.relKind); + READ_NODE_FIELD(base.relation); + READ_NODE_FIELD(base.tableElts); + READ_NODE_FIELD(base.inhRelations); + READ_NODE_FIELD(base.constraints); + READ_NODE_FIELD(base.options); + READ_ENUM_FIELD(base.oncommit,OnCommitAction); + READ_STRING_FIELD(base.tablespacename); + READ_NODE_FIELD(base.distributedBy); + READ_BOOL_FIELD(base.is_part_child); + READ_BOOL_FIELD(base.is_add_part); + READ_NODE_FIELD(base.partitionBy); READ_NODE_FIELD(exttypedesc); READ_STRING_FIELD(format); - READ_NODE_FIELD(formatOpts); READ_BOOL_FIELD(isweb); READ_BOOL_FIELD(iswritable); + READ_BOOL_FIELD(isexternal); + READ_BOOL_FIELD(forceCreateDir); + READ_STRING_FIELD(parentPath); READ_NODE_FIELD(sreh); READ_NODE_FIELD(encoding); - READ_NODE_FIELD(distributedBy); local_node->policy = NULL; READ_DONE(); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/48ff52c9/src/backend/optimizer/plan/createplan.c ---------------------------------------------------------------------- diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 7a7f261..cc8fc0e 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -1127,6 +1127,11 @@ bool is_pxf_protocol(Uri *uri) return false; } +bool is_hdfs_protocol(Uri *uri) +{ + return uri->protocol == URI_HDFS; +} + /* * create plan for pxf */ @@ -1441,6 +1446,10 @@ create_externalscan_plan(CreatePlanContext *ctx, Path *best_path, segdb_file_map = create_pxf_plan(segdb_file_map, rel, total_primaries, ctx, scan_relid); } + else if (using_location && is_hdfs_protocol(uri)) + { + // nothing to do + } /* (2) */ else if(using_location && (uri->protocol == URI_GPFDIST || uri->protocol == URI_GPFDISTS || @@ -1839,17 +1848,30 @@ create_externalscan_plan(CreatePlanContext *ctx, Path *best_path, /* data encoding */ encoding = rel->ext_encoding; - scan_plan = make_externalscan(tlist, - scan_clauses, - scan_relid, - filenames, - fmtopts, - rel->fmttype, - ismasteronly, - rejectlimit, - islimitinrows, - fmtErrTblOid, - encoding); + if (using_location && (is_hdfs_protocol(uri))) + scan_plan = make_externalscan(tlist, + scan_clauses, + scan_relid, + rel->locationlist, + fmtopts, + rel->fmttype, + ismasteronly, + rejectlimit, + islimitinrows, + fmtErrTblOid, + encoding); + else + scan_plan = make_externalscan(tlist, + scan_clauses, + scan_relid, + filenames, + fmtopts, + rel->fmttype, + ismasteronly, + rejectlimit, + islimitinrows, + fmtErrTblOid, + encoding); copy_path_costsize(ctx->root, &scan_plan->scan.plan, best_path); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/48ff52c9/src/backend/optimizer/plan/planner.c ---------------------------------------------------------------------- diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index d754df3..e34da95 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -318,11 +318,7 @@ PlannedStmt *refineCachedPlan(PlannedStmt * plannedstmt, /* * Now, we want to allocate resource. */ - allocResult = calculate_planner_segment_num(my_parse, plannedstmt->resource->life, - plannedstmt->rtable, plannedstmt->intoPolicy, - plannedstmt->nMotionNodes + plannedstmt->nInitPlans + 1, - -1); - + allocResult = calculate_planner_segment_num(plannedstmt, my_parse, plannedstmt->resource->life, -1); Assert(allocResult); ppResult->saResult = *allocResult; @@ -628,9 +624,7 @@ static void resource_negotiator(Query *parse, int cursorOptions, /* * Now, we want to allocate resource. */ - allocResult = calculate_planner_segment_num(my_parse, resourceLife, - plannedstmt->rtable, plannedstmt->intoPolicy, - plannedstmt->nMotionNodes + plannedstmt->nInitPlans + 1, -1); + allocResult = calculate_planner_segment_num(plannedstmt, my_parse, resourceLife, -1); Assert(allocResult); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/48ff52c9/src/backend/parser/analyze.c ---------------------------------------------------------------------- diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 5024389..3d953c2 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -41,10 +41,15 @@ *------------------------------------------------------------------------- */ + #include "postgres.h" +#include "port.h" + +#include <uuid/uuid.h> #include "access/heapam.h" #include "access/reloptions.h" +#include "access/plugstorage.h" #include "catalog/catquery.h" #include "catalog/gp_policy.h" #include "catalog/heap.h" @@ -52,6 +57,8 @@ #include "catalog/indexing.h" #include "catalog/namespace.h" #include "catalog/pg_compression.h" +#include "catalog/pg_constraint.h" +#include "catalog/pg_exttable.h" #include "catalog/pg_partition.h" #include "catalog/pg_partition_rule.h" #include "catalog/pg_operator.h" @@ -59,6 +66,7 @@ #include "catalog/pg_type_encoding.h" #include "cdb/cdbpartition.h" #include "cdb/cdbparquetstoragewrite.h" +#include "commands/dbcommands.h" #include "commands/defrem.h" #include "commands/prepare.h" #include "commands/tablecmds.h" @@ -66,10 +74,14 @@ #include "miscadmin.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" +#include "nodes/nodes.h" +#include "nodes/pg_list.h" +#include "nodes/value.h" #include "optimizer/clauses.h" #include "optimizer/plancat.h" #include "optimizer/tlist.h" #include "optimizer/var.h" +#include "optimizer/planmain.h" #include "parser/analyze.h" #include "parser/gramparse.h" #include "parser/parse_agg.h" @@ -88,7 +100,9 @@ #include "utils/builtins.h" #include "utils/datum.h" #include "utils/lsyscache.h" +#include "utils/palloc.h" #include "utils/syscache.h" +#include "utils/uri.h" #include "cdb/cdbappendonlyam.h" #include "cdb/cdbvars.h" @@ -228,6 +242,9 @@ static void transformColumnDefinition(ParseState *pstate, static void transformTableConstraint(ParseState *pstate, CreateStmtContext *cxt, Constraint *constraint); +static void transformExtTableConstraint(ParseState *pstate, + CreateStmtContext *cxt, + Constraint *constraint); static void transformETDistributedBy(ParseState *pstate, CreateStmtContext *cxt, List *distributedBy, GpPolicy **policyp, List *options, List *likeDistributedBy, @@ -235,7 +252,7 @@ static void transformETDistributedBy(ParseState *pstate, CreateStmtContext *cxt, bool iswritable, bool onmaster); static void transformDistributedBy(ParseState *pstate, CreateStmtContext *cxt, - List *distributedBy, GpPolicy ** policy, List *options, + List *distributedBy, GpPolicy ** policy, List *options, List *likeDistributedBy, bool bQuiet); static void transformPartitionBy(ParseState *pstate, @@ -523,7 +540,7 @@ do_parse_analyze(Node *parseTree, ParseState *pstate) */ if (pstate->parentParseState == NULL && query->utilityStmt && IsA(query->utilityStmt, CreateStmt) && - ((CreateStmt *)query->utilityStmt)->partitionBy) + ((CreateStmt *)query->utilityStmt)->base.partitionBy) { /* * We just break the statements into two lists: alter statements and @@ -1572,7 +1589,7 @@ validateColumnStorageEncodingClauses(List *stenc, CreateStmt *stmt) return; /* Generate a hash table for all the columns */ - foreach(lc, stmt->tableElts) + foreach(lc, stmt->base.tableElts) { Node *n = lfirst(lc); @@ -1600,7 +1617,7 @@ validateColumnStorageEncodingClauses(List *stenc, CreateStmt *stmt) cacheFlags = HASH_ELEM; ht = hash_create("column info cache", - list_length(stmt->tableElts), + list_length(stmt->base.tableElts), &cacheInfo, cacheFlags); } @@ -1620,7 +1637,7 @@ validateColumnStorageEncodingClauses(List *stenc, CreateStmt *stmt) errmsg("column \"%s\" duplicated", colname), errOmitLocation(true))); - + } ce->count = 0; } @@ -1715,7 +1732,7 @@ TypeNameGetStorageDirective(TypeName *typname) Datum options; bool isnull; - options = caql_getattr(pcqCtx, + options = caql_getattr(pcqCtx, Anum_pg_type_encoding_typoptions, &isnull); @@ -1730,7 +1747,7 @@ TypeNameGetStorageDirective(TypeName *typname) /* * Make a default column storage directive from a WITH clause - * Ignore options in the WITH clause that don't appear in + * Ignore options in the WITH clause that don't appear in * storage_directives for column-level compression. */ List * @@ -1848,7 +1865,7 @@ transformAttributeEncoding(List *stenc, CreateStmt *stmt, CreateStmtContext cxt) * try and set the same options! */ - if (encodings_overlap(stmt->options, c->encoding, false)) + if (encodings_overlap(stmt->base.options, c->encoding, false)) ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("DEFAULT COLUMN ENCODING clause cannot " @@ -1863,7 +1880,7 @@ transformAttributeEncoding(List *stenc, CreateStmt *stmt, CreateStmtContext cxt) */ if (!deflt) { - tmpenc = form_default_storage_directive(stmt->options); + tmpenc = form_default_storage_directive(stmt->base.options); } else { @@ -1875,7 +1892,7 @@ transformAttributeEncoding(List *stenc, CreateStmt *stmt, CreateStmtContext cxt) deflt = makeNode(ColumnReferenceStorageDirective); deflt->deflt = true; deflt->encoding = transformStorageEncodingClause(tmpenc); - } + } /* * Loop over all columns. If a column has a column reference storage clause @@ -1981,11 +1998,12 @@ transformCreateStmt(ParseState *pstate, CreateStmt *stmt, cxt.stmtType = "CREATE TABLE"; - cxt.relation = stmt->relation; - cxt.inhRelations = stmt->inhRelations; + cxt.isExternalTable = false; + cxt.relation = stmt->base.relation; + cxt.inhRelations = stmt->base.inhRelations; cxt.isalter = false; - cxt.isaddpart = stmt->is_add_part; - cxt.columns = NIL; + cxt.isaddpart = stmt->base.is_add_part; + cxt.columns = NIL; cxt.ckconstraints = NIL; cxt.fkconstraints = NIL; cxt.ixconstraints = NIL; @@ -1994,12 +2012,12 @@ transformCreateStmt(ParseState *pstate, CreateStmt *stmt, cxt.alist = NIL; cxt.dlist = NIL; /* for deferred analysis requiring the created table */ cxt.pkey = NULL; - cxt.hasoids = interpretOidsOption(stmt->options); + cxt.hasoids = interpretOidsOption(stmt->base.options); stmt->policy = NULL; /* Disallow inheritance in combination with partitioning. */ - if (stmt->inhRelations && (stmt->partitionBy || stmt->is_part_child )) + if (stmt->base.inhRelations && (stmt->base.partitionBy || stmt->base.is_part_child )) { ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), @@ -2007,7 +2025,7 @@ transformCreateStmt(ParseState *pstate, CreateStmt *stmt, } /* Only on top-most partitioned tables. */ - if ( stmt->partitionBy && !stmt->is_part_child ) + if ( stmt->base.partitionBy && !stmt->base.is_part_child ) { fixCreateStmtForPartitionedTable(stmt); } @@ -2016,7 +2034,7 @@ transformCreateStmt(ParseState *pstate, CreateStmt *stmt, * Run through each primary element in the table creation clause. Separate * column defs from constraints, and do preliminary analysis. */ - foreach(elements, stmt->tableElts) + foreach(elements, stmt->base.tableElts) { Node *element = lfirst(elements); @@ -2045,8 +2063,8 @@ transformCreateStmt(ParseState *pstate, CreateStmt *stmt, (InhRelation *) element, false); if (Gp_role == GP_ROLE_DISPATCH && isBeginning && - stmt->distributedBy == NIL && - stmt->inhRelations == NIL && + stmt->base.distributedBy == NIL && + stmt->base.inhRelations == NIL && stmt->policy == NULL) { likeDistributedBy = getLikeDistributionPolicy((InhRelation *) element); @@ -2079,7 +2097,7 @@ transformCreateStmt(ParseState *pstate, CreateStmt *stmt, /* * Postprocess constraints that give rise to index definitions. */ - transformIndexConstraints(pstate, &cxt, stmt->is_add_part || stmt->is_split_part); + transformIndexConstraints(pstate, &cxt, stmt->base.is_add_part || stmt->is_split_part); /* * Carry any deferred analysis statements forward. Added for MPP-13750 @@ -2095,7 +2113,7 @@ transformCreateStmt(ParseState *pstate, CreateStmt *stmt, * Postprocess foreign-key constraints. * But don't cascade FK constraints to parts, yet. */ - if ( ! stmt->is_part_child ) + if ( ! stmt->base.is_part_child ) transformFKConstraints(pstate, &cxt, true, false); /* @@ -2113,9 +2131,9 @@ transformCreateStmt(ParseState *pstate, CreateStmt *stmt, * to the partition which the user wants to be non-AO. Just ignore it * instead. */ - if (stmt->is_part_child) + if (stmt->base.is_part_child) { - if (co_explicitly_disabled(stmt->options) || !stenc) + if (co_explicitly_disabled(stmt->base.options) || !stenc) stmt->attr_encodings = NIL; else { @@ -2132,29 +2150,29 @@ transformCreateStmt(ParseState *pstate, CreateStmt *stmt, /* * Postprocess Greenplum Database distribution columns */ - if (stmt->is_part_child || - (stmt->partitionBy && + if (stmt->base.is_part_child || + (stmt->base.partitionBy && ( /* be very quiet if set subpartn template */ - (((PartitionBy *)(stmt->partitionBy))->partQuiet == + (((PartitionBy *)(stmt->base.partitionBy))->partQuiet == PART_VERBO_NOPARTNAME) || ( /* quiet for partitions of depth > 0 */ - (((PartitionBy *)(stmt->partitionBy))->partDepth != 0) && - (((PartitionBy *)(stmt->partitionBy))->partQuiet != + (((PartitionBy *)(stmt->base.partitionBy))->partDepth != 0) && + (((PartitionBy *)(stmt->base.partitionBy))->partQuiet != PART_VERBO_NORMAL) ) ) )) bQuiet = true; /* silence distro messages for partitions */ - transformDistributedBy(pstate, &cxt, stmt->distributedBy, &stmt->policy, stmt->options, + transformDistributedBy(pstate, &cxt, stmt->base.distributedBy, &stmt->policy, stmt->base.options, likeDistributedBy, bQuiet); /* * Process table partitioning clause */ - transformPartitionBy(pstate, &cxt, stmt, stmt->partitionBy, stmt->policy); + transformPartitionBy(pstate, &cxt, stmt, stmt->base.partitionBy, stmt->policy); /* * Output results. @@ -2162,147 +2180,657 @@ transformCreateStmt(ParseState *pstate, CreateStmt *stmt, q = makeNode(Query); q->commandType = CMD_UTILITY; q->utilityStmt = (Node *) stmt; - stmt->tableElts = cxt.columns; - stmt->constraints = cxt.ckconstraints; + stmt->base.tableElts = cxt.columns; + stmt->base.constraints = cxt.ckconstraints; *extras_before = list_concat(*extras_before, cxt.blist); *extras_after = list_concat(cxt.alist, *extras_after); return q; } -static Query * -transformCreateExternalStmt(ParseState *pstate, CreateExternalStmt *stmt, - List **extras_before, List **extras_after) -{ - CreateStmtContext cxt; - Query *q; - ListCell *elements; - ExtTableTypeDesc *exttypeDesc = NULL; - List *likeDistributedBy = NIL; - bool bQuiet = false; /* shut up transformDistributedBy messages */ - bool onmaster = false; - bool iswritable = stmt->iswritable; - - cxt.stmtType = "CREATE EXTERNAL TABLE"; - cxt.relation = stmt->relation; - cxt.inhRelations = NIL; - cxt.hasoids = false; - cxt.isalter = false; - cxt.columns = NIL; - cxt.ckconstraints = NIL; - cxt.fkconstraints = NIL; - cxt.ixconstraints = NIL; - cxt.pkey = NULL; - - cxt.blist = NIL; - cxt.alist = NIL; - - /* - * Run through each primary element in the table creation clause. Separate - * column defs from constraints, and do preliminary analysis. - */ - foreach(elements, stmt->tableElts) - { - Node *element = lfirst(elements); - - switch (nodeTag(element)) - { - case T_ColumnDef: - transformColumnDefinition(pstate, &cxt, - (ColumnDef *) element); - break; - - case T_Constraint: - case T_FkConstraint: - /* should never happen. If it does fix gram.y */ - elog(ERROR, "node type %d not supported for external tables", - (int) nodeTag(element)); - break; - - case T_InhRelation: - { - /* LIKE */ - bool isBeginning = (cxt.columns == NIL); - - transformInhRelation(pstate, &cxt, - (InhRelation *) element, true); - - if (Gp_role == GP_ROLE_DISPATCH && isBeginning && - stmt->distributedBy == NIL && - stmt->policy == NULL && - iswritable /* dont bother if readable table */) - { - likeDistributedBy = getLikeDistributionPolicy((InhRelation *) element); - } - } - break; - - default: - elog(ERROR, "unrecognized node type: %d", - (int) nodeTag(element)); - break; - } - } +enum PreDefinedFormatterOptionVALTYPE { + PREDEF_FMTOPT_VAL_NO, + PREDEF_FMTOPT_VAL_STRING, + PREDEF_FMTOPT_VAL_SIGNEDINTEGER, + PREDEF_FMTOPT_VAL_COLNAMELIST +}; + +enum PreDefinedFormatterOptionID { + PREDEF_FMT_OPT_ID_DELIMITER, + PREDEF_FMT_OPT_ID_NULL, + PREDEF_FMT_OPT_ID_HEADER, + PREDEF_FMT_OPT_ID_QUOTE, + PREDEF_FMT_OPT_ID_ESCAPE, + PREDEF_FMT_OPT_ID_FORCENOTNULL, + PREDEF_FMT_OPT_ID_FORCEQUOTE, + PREDEF_FMT_OPT_ID_FILLMISSINGFIELDS, + PREDEF_FMT_OPT_ID_NEWLINE, + PREDEF_FMT_OPT_ID_UNPREDEFINED, + PREDEF_FMT_OPT_ID_ILLEGAL +}; + +typedef struct PreDefinedFormatterOption { + char keyword[3][32]; + int nKeyword; + bool hasValue; + enum PreDefinedFormatterOptionVALTYPE valueType; + enum PreDefinedFormatterOptionID optID; +} PreDefinedFormatterOption; + +#define PREDEF_FMTOPT_SIZE 9 + +enum PreDefinedFormatterOptionID MatchExternalRelationFormatterOption( + PreDefinedFormatterOption *options, ListCell *head) { + ListCell *p1 = head; + ListCell *p2 = head->next; + ListCell *p3 = p2 == NULL ? NULL : p2->next; + ListCell *p4 = p3 == NULL ? NULL : p3->next; + + DefElem *de1 = (DefElem *)lfirst(p1); + DefElem *de2 = p2 == NULL ? NULL : (DefElem *)lfirst(p2); + DefElem *de3 = p3 == NULL ? NULL : (DefElem *)lfirst(p3); + DefElem *de4 = p4 == NULL ? NULL : (DefElem *)lfirst(p4); + + if (strcmp("#ident", de1->defname) != 0) { + return PREDEF_FMT_OPT_ID_ILLEGAL; /* must start with a #ident elem */ + } + + for (int i = 0; i < PREDEF_FMTOPT_SIZE; ++i) { + PreDefinedFormatterOption *pdOpt = &(options[i]); + if (pdOpt->nKeyword == 1 && + strcasecmp(pdOpt->keyword[0], ((Value *)(de1->arg))->val.str) == 0) { + if (!options[i].hasValue) { + return options[i].optID; /* Got no value option */ + } else if (p2 == NULL) { + return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value field */ + } else if (((strcmp("#string", de2->defname) == 0) && + (options[i].valueType == PREDEF_FMTOPT_VAL_STRING)) || + ((strcmp("#int", de2->defname) == 0) && + (options[i].valueType == PREDEF_FMTOPT_VAL_SIGNEDINTEGER)) || + ((strcmp("#collist", de2->defname) == 0) && + (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST)) || + ((strcmp("#ident", de2->defname) == 0) && + (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST))) { + return options[i].optID; /* Got option having one value */ + } else { + return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value type */ + } + } else if (pdOpt->nKeyword == 2 && de2 != NULL && + strcasecmp(pdOpt->keyword[0], ((Value *)(de1->arg))->val.str) == + 0 && + strcasecmp(pdOpt->keyword[1], ((Value *)(de2->arg))->val.str) == + 0) { + if (!options[i].hasValue) { + return options[i].optID; /* got no value option */ + } else if (de3 == NULL) { + return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value field */ + } else if (((strcmp("#string", de3->defname) == 0) && + (options[i].valueType == PREDEF_FMTOPT_VAL_STRING)) || + ((strcmp("#int", de3->defname) == 0) && + (options[i].valueType == PREDEF_FMTOPT_VAL_SIGNEDINTEGER)) || + ((strcmp("#collist", de3->defname) == 0) && + (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST)) || + ((strcmp("#ident", de3->defname) == 0) && + (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST))) { + return options[i].optID; /* Got option having one value */ + } else { + return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value type */ + } + } else if (pdOpt->nKeyword == 3 && de2 != NULL && de3 != NULL && + strcasecmp(pdOpt->keyword[0], ((Value *)(de1->arg))->val.str) == + 0 && + strcasecmp(pdOpt->keyword[1], ((Value *)(de2->arg))->val.str) == + 0 && + strcasecmp(pdOpt->keyword[2], ((Value *)(de3->arg))->val.str) == + 0) { + if (!options[i].hasValue) { + return options[i].optID; /* got no value option */ + } else if (de4 == NULL) { + return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value field */ + } else if (((strcmp("#string", de4->defname) == 0) && + (options[i].valueType == PREDEF_FMTOPT_VAL_STRING)) || + ((strcmp("#int", de4->defname) == 0) && + (options[i].valueType == PREDEF_FMTOPT_VAL_SIGNEDINTEGER)) || + ((strcmp("#collist", de4->defname) == 0) && + (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST)) || + ((strcmp("#ident", de4->defname) == 0) && + (options[i].valueType == PREDEF_FMTOPT_VAL_COLNAMELIST))) { + return options[i].optID; /* Got option having one value */ + } else { + return PREDEF_FMT_OPT_ID_ILLEGAL; /* no expected value type */ + } + } + } - /* - * Check if this is an EXECUTE ON MASTER table. We'll need this information - * in transformExternalDistributedBy. While at it, we also check if an error - * table is attempted to be used on ON MASTER table and error if so. - */ - if(!iswritable) - { - exttypeDesc = (ExtTableTypeDesc *)stmt->exttypedesc; + /* + * We expect user defined special options which should be consumed + * further by customized formatter. + */ + return PREDEF_FMT_OPT_ID_UNPREDEFINED; +} - if(exttypeDesc->exttabletype == EXTTBL_TYPE_EXECUTE) - { - ListCell *exec_location_opt; +void recognizeExternalRelationFormatterOptions( + CreateExternalStmt *createExtStmt) { + PreDefinedFormatterOption options[PREDEF_FMTOPT_SIZE] = { + {{"delimiter", "", ""}, + 1, + true, + PREDEF_FMTOPT_VAL_STRING, + PREDEF_FMT_OPT_ID_DELIMITER}, + {{"null", "", ""}, + 1, + true, + PREDEF_FMTOPT_VAL_STRING, + PREDEF_FMT_OPT_ID_NULL}, + {{"header", "", ""}, + 1, + false, + PREDEF_FMTOPT_VAL_NO, + PREDEF_FMT_OPT_ID_HEADER}, + {{"quote", "", ""}, + 1, + true, + PREDEF_FMTOPT_VAL_STRING, + PREDEF_FMT_OPT_ID_QUOTE}, + {{"escape", "", ""}, + 1, + true, + PREDEF_FMTOPT_VAL_STRING, + PREDEF_FMT_OPT_ID_ESCAPE}, + {{"force", "not", "null"}, + 3, + true, + PREDEF_FMTOPT_VAL_COLNAMELIST, + PREDEF_FMT_OPT_ID_FORCENOTNULL}, + {{"force", "quote", ""}, + 2, + true, + PREDEF_FMTOPT_VAL_COLNAMELIST, + PREDEF_FMT_OPT_ID_FORCEQUOTE}, + {{"fill", "missing", "fields"}, + 3, + false, + PREDEF_FMTOPT_VAL_NO, + PREDEF_FMT_OPT_ID_FILLMISSINGFIELDS}, + {{"newline", "", ""}, + 1, + true, + PREDEF_FMTOPT_VAL_STRING, + PREDEF_FMT_OPT_ID_NEWLINE}}; + + List *newOpts = NULL; + ListCell *optCell = list_head(createExtStmt->base.options); + + /* Add restriction of error lines */ + if (createExtStmt->sreh != NULL) { + /* Handle error table specification and reject number per segment */ + SingleRowErrorDesc *errDesc = (SingleRowErrorDesc *)createExtStmt->sreh; + if (errDesc->rejectlimit > 0 && errDesc->is_hdfs_protocol_text) { + newOpts = lappend( + newOpts, + makeDefElem("reject_limit", makeInteger(errDesc->rejectlimit))); + if (errDesc->hdfsLoc) + newOpts = + lappend(newOpts, + makeDefElem("err_table", + (Node *)makeString(pstrdup(errDesc->hdfsLoc)))); + } + } + + while (optCell != NULL) { + /* Try a match now. */ + enum PreDefinedFormatterOptionID id = + MatchExternalRelationFormatterOption(options, optCell); + switch (id) { + case PREDEF_FMT_OPT_ID_DELIMITER: { + DefElem *de = (DefElem *)lfirst(optCell->next); + Value *v = (Value *)(de->arg); + DefElem *newde = + makeDefElem("delimiter", (Node *)makeString(v->val.str)); + newOpts = lappend(newOpts, newde); + optCell = optCell->next->next; + break; + } + case PREDEF_FMT_OPT_ID_NULL: { + DefElem *de = (DefElem *)lfirst(optCell->next); + Value *v = (Value *)(de->arg); + DefElem *newde = makeDefElem("null", (Node *)makeString(v->val.str)); + newOpts = lappend(newOpts, newde); + optCell = optCell->next->next; + break; + } + case PREDEF_FMT_OPT_ID_HEADER: { + DefElem *newde = makeDefElem("header", (Node *)makeInteger(TRUE)); + newOpts = lappend(newOpts, newde); + optCell = optCell->next; + break; + } + case PREDEF_FMT_OPT_ID_QUOTE: { + DefElem *de = (DefElem *)lfirst(optCell->next); + Value *v = (Value *)(de->arg); + DefElem *newde = makeDefElem("quote", (Node *)makeString(v->val.str)); + newOpts = lappend(newOpts, newde); + optCell = optCell->next->next; + break; + } + case PREDEF_FMT_OPT_ID_ESCAPE: { + DefElem *de = (DefElem *)lfirst(optCell->next); + Value *v = (Value *)(de->arg); + DefElem *newde = makeDefElem("escape", (Node *)makeString(v->val.str)); + newOpts = lappend(newOpts, newde); + optCell = optCell->next->next; + break; + } + case PREDEF_FMT_OPT_ID_FORCENOTNULL: { + DefElem *newde = NULL; + DefElem *de = (DefElem *)lfirst(optCell->next->next->next); + if (strcmp("#ident", de->defname) == 0) { + /* + * The case there is only one column name which is recognized + * as a ident string. + */ + Value *v = (Value *)(de->arg); + List *collist = list_make1(makeString(v->val.str)); + newde = makeDefElem("force_notnull", (Node *)collist); + + } else { + /* + * There are multiple column names in a list already + * recognized by parser. + */ + List *collist = NULL; + ListCell *colCell = NULL; + foreach (colCell, (List *)(de->arg)) { + collist = lappend(collist, + makeString(((Value *)lfirst(colCell))->val.str)); + elog(LOG, "recognized column list colname:%s", + ((Value *)lfirst(colCell))->val.str); + } + newde = makeDefElem("force_notnull", (Node *)collist); + + /* TODO: check where the old instance is freed */ + } + newOpts = lappend(newOpts, newde); + optCell = optCell->next->next->next->next; + break; + } + case PREDEF_FMT_OPT_ID_FORCEQUOTE: { + DefElem *newde = NULL; + DefElem *de = (DefElem *)lfirst(optCell->next->next); + if (strcmp("#ident", de->defname) == 0) { + /* + * The case there is only one column name which is recognized + * as a ident string. + */ + Value *v = (Value *)(de->arg); + List *collist = list_make1(makeString(v->val.str)); + newde = makeDefElem("force_quote", (Node *)collist); + + } else { + /* + * There are multiple column names in a list already + * recognized by parser. + */ + List *collist = NULL; + ListCell *colCell = NULL; + foreach (colCell, (List *)(de->arg)) { + collist = lappend(collist, + makeString(((Value *)lfirst(colCell))->val.str)); + elog(LOG, "recognized column list colname:%s", + ((Value *)lfirst(colCell))->val.str); + } + newde = makeDefElem("force_quote", (Node *)collist); + + /* TODO: check where the old instance is freed */ + } + newOpts = lappend(newOpts, newde); + optCell = optCell->next->next->next; + break; + } + case PREDEF_FMT_OPT_ID_FILLMISSINGFIELDS: { + DefElem *newde = + makeDefElem("fill_missing_fields", (Node *)makeInteger(TRUE)); + newOpts = lappend(newOpts, newde); + optCell = optCell->next->next->next; + break; + } + case PREDEF_FMT_OPT_ID_NEWLINE: { + DefElem *de = (DefElem *)lfirst(optCell->next); + Value *v = (Value *)(de->arg); + DefElem *newde = makeDefElem("newline", (Node *)makeString(v->val.str)); + newOpts = lappend(newOpts, newde); + optCell = optCell->next->next; + break; + } + case PREDEF_FMT_OPT_ID_UNPREDEFINED: { + /* + * In case it is a user defined option. we combind all continuous + * ident until we see a string constant or a integer constant. + * So this means user defined formatter's user defined option + * values can only be string or integer values. + */ + int c = 0; + int identlength = 0; + ListCell *walkerCell = optCell; + while (walkerCell != NULL && + strcmp("#ident", ((DefElem *)lfirst(walkerCell))->defname) == + 0) { + c++; + Value *v = (Value *)(((DefElem *)lfirst(walkerCell))->arg); + identlength += strlen(v->val.str) + 1; + walkerCell = walkerCell->next; + } - foreach(exec_location_opt, exttypeDesc->on_clause) - { - DefElem *defel = (DefElem *) lfirst(exec_location_opt); + /* Decide the value part */ + Node *value = NULL; + if (walkerCell == NULL) { + /* The case the option without value. we set TRUE for it. */ + value = makeInteger(TRUE); + } else { + DefElem *de = (DefElem *)lfirst(walkerCell); + if (strcmp("#collist", de->defname) == 0) { + /* + * We don't accept column name list value types for + * customized formatter's user defined options. + */ + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg( + "cannot support column name list as an unknown " + "option's value"), + errOmitLocation(true))); + } else if (strcmp("#int", de->defname) == 0) { + value = makeInteger(((Value *)(de->arg))->val.ival); + } else { + value = makeString(((Value *)(de->arg))->val.str); + } + } - if (strcmp(defel->defname, "master") == 0) - { - SingleRowErrorDesc *srehDesc = (SingleRowErrorDesc *)stmt->sreh; + /* Build key part. */ + char *newKey = (char *)palloc0(sizeof(char) * identlength); + ListCell *walkerCell2 = optCell; + int counter = 0; + for (; counter < c; counter++, walkerCell2 = walkerCell2->next) { + Value *v = (Value *)(((DefElem *)lfirst(walkerCell2))->arg); + if (counter > 0) { + strcat(newKey, "_"); + } + strcat(newKey, v->val.str); + } - onmaster = true; + DefElem *newde = makeDefElem(newKey, (Node *)value); + newOpts = lappend(newOpts, newde); + + if (walkerCell) + optCell = walkerCell->next; + else + optCell = NULL; + break; + } + case PREDEF_FMT_OPT_ID_ILLEGAL: { + ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot recognize full formatter option list"), + errOmitLocation(true))); + } + } + } - if(srehDesc && srehDesc->errtable) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("External web table with ON MASTER clause " - "cannot use error tables."))); - } - } - } - } + /* Use new list to replace the old one */ + createExtStmt->base.options = newOpts; +} - /* - * Check if we need to create an error table. If so, add it to the - * before list. - */ - if(stmt->sreh && ((SingleRowErrorDesc *)stmt->sreh)->errtable) - transformSingleRowErrorHandling(pstate, &cxt, - (SingleRowErrorDesc *) stmt->sreh); +static Query *transformCreateExternalStmt(ParseState *pstate, + CreateExternalStmt *stmt, + List **extras_before, + List **extras_after) { + CreateStmtContext cxt; + Query *q; + ListCell *elements; + ExtTableTypeDesc *desc = NULL; + List *likeDistributedBy = NIL; + bool bQuiet = false; /* shut up transformDistributedBy messages */ + bool onmaster = false; + bool iswritable = stmt->iswritable; + bool isPluggableStorage = false; + if (!stmt->forceCreateDir) stmt->forceCreateDir = stmt->iswritable; + + cxt.stmtType = "CREATE EXTERNAL TABLE"; + cxt.isExternalTable = true; + cxt.relation = stmt->base.relation; + cxt.inhRelations = stmt->base.inhRelations; + cxt.isaddpart = stmt->base.is_add_part; + cxt.iswritable = stmt->iswritable; + cxt.exttypedesc = stmt->exttypedesc; + cxt.format = stmt->format; + cxt.parentPath = stmt->parentPath; + cxt.hasoids = false; + cxt.isalter = false; + cxt.columns = NIL; + cxt.ckconstraints = NIL; + cxt.fkconstraints = NIL; + cxt.ixconstraints = NIL; + cxt.inh_indexes = NIL; + cxt.pkey = NULL; + + cxt.blist = NIL; + cxt.alist = NIL; + + /* + * Build type description of internal table in pluggable storage + * framework based on format + */ + desc = (ExtTableTypeDesc *)(stmt->exttypedesc); + if (desc->exttabletype == EXTTBL_TYPE_UNKNOWN) { + if (stmt->format == NULL) { + ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("Internal table must have format specification"), + errhint("Use CREATE TABLE FORMAT instead"), + errOmitLocation(true))); + } - transformETDistributedBy(pstate, &cxt, stmt->distributedBy, &stmt->policy, NULL,/*no WITH options for ET*/ - likeDistributedBy, bQuiet, iswritable, onmaster); + /* orc, text, csv on hdfs */ + else if (pg_strncasecmp(stmt->format, "orc", strlen("orc")) == 0 || + pg_strncasecmp(stmt->format, "text", strlen("text")) == 0 || + pg_strncasecmp(stmt->format, "csv", strlen("csv")) == 0) { + desc->exttabletype = EXTTBL_TYPE_LOCATION; + desc->location_list = NIL; + // desc->location_list = list_make1((Node *) makeString(PROTOCOL_HDFS)); + desc->command_string = NULL; + desc->on_clause = NIL; + } else { + ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("Format \"%s\" for internal table is invalid", + stmt->format))); + } + isPluggableStorage = true; + } + + if (desc->exttabletype == EXTTBL_TYPE_LOCATION) { + + ListCell *loc_cell = list_head(desc->location_list); + if (loc_cell == NIL) { + if (pg_strncasecmp(stmt->format, "orc", strlen("orc")) && + pg_strncasecmp(stmt->format, "text", strlen("text")) && + pg_strncasecmp(stmt->format, "csv", strlen("csv"))) { + ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), + errmsg( + "Internal table on hdfs must be \'orc\', " + "\'text\', or \'csv\' format"))); + } + isPluggableStorage = true; + } else { + Value *loc_val = lfirst(loc_cell); + char *loc_str = pstrdup(loc_val->val.str); + bool is_hdfs_protocol = IS_HDFS_URI(loc_str); + isPluggableStorage = is_hdfs_protocol; + + + if (is_hdfs_protocol && + (pg_strncasecmp(stmt->format, "orc", strlen("orc")) && + pg_strncasecmp(stmt->format, "text", strlen("text")) && + pg_strncasecmp(stmt->format, "csv", strlen("csv")))) { + ereport( + ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg( + "LOCATION using hdfs url \'%s\' does not " + "support \'%s\' format", + loc_str, stmt->format), + errhint("Use \"FORMAT \'orc\', \'text\', or \'csv\'\" instead"), + errOmitLocation(true))); + } + } + } + + // handle error table for text/csv pluggable storage + if (stmt->sreh && isPluggableStorage && + (strcasecmp(stmt->format, "text") == 0 || + strcasecmp(stmt->format, "csv") == 0)) { + SingleRowErrorDesc *errDesc = (SingleRowErrorDesc *)stmt->sreh; + + if (!errDesc->is_limit_in_rows) { + ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), + errmsg( + "Single row error handling with percentage limit is " + "not accepted for pluggable storage"))); + } - Assert(cxt.ckconstraints == NIL); - Assert(cxt.fkconstraints == NIL); - Assert(cxt.ixconstraints == NIL); + errDesc->is_hdfs_protocol_text = true; + if (errDesc->errtable) { + errDesc->hdfsLoc = (char *)palloc0(MAXPGPATH); + char *fileSpacePath = NULL; + GetFilespacePathForTablespace(get_database_dts(MyDatabaseId), + &fileSpacePath); + /* uuid_t uuid; + char buf[1024]; + uuid_generate(uuid); + uuid_unparse(uuid, buf); + sprintf(errDesc->hdfsLoc, "%s/ExtErrTbl/%s", fileSpacePath, buf);*/ + } + } - /* - * Output results. - */ - q = makeNode(Query); - q->commandType = CMD_UTILITY; - q->utilityStmt = (Node *) stmt; - stmt->tableElts = cxt.columns; - *extras_before = list_concat(*extras_before, cxt.blist); - *extras_after = list_concat(cxt.alist, *extras_after); + // Only on top-most partitioned tables + if (stmt->base.partitionBy && !stmt->base.is_part_child) { + if (isPluggableStorage) + fixCreateStmtForPartitionedTable(&stmt->base); + else + elog(ERROR, + "Partition external table only supported for pluggable storage"); + } + + /* + * Run through each primary element in the table creation clause. Separate + * column defs from constraints, and do preliminary analysis. + */ + foreach (elements, stmt->base.tableElts) { + Node *element = lfirst(elements); + + switch (nodeTag(element)) { + case T_ColumnDef: + transformColumnDefinition(pstate, &cxt, (ColumnDef *)element); + break; + + case T_Constraint: + transformExtTableConstraint(pstate, &cxt, (Constraint *)element); + break; + + case T_FkConstraint: + /* should never happen. If it does fix gram.y */ + elog(ERROR, "node type %d not supported for external tables", + (int)nodeTag(element)); + break; + + case T_InhRelation: { + /* LIKE */ + bool isBeginning = (cxt.columns == NIL); + + transformInhRelation(pstate, &cxt, (InhRelation *)element, + !isPluggableStorage); + + if (Gp_role == GP_ROLE_DISPATCH && isBeginning && + stmt->base.distributedBy == NIL && stmt->policy == NULL && + iswritable /* dont bother if readable table */) { + likeDistributedBy = getLikeDistributionPolicy((InhRelation *)element); + } + } break; - return q; + default: + elog(ERROR, "unrecognized node type: %d", (int)nodeTag(element)); + break; + } + } + + /* + * transformIndexConstraints wants cxt.alist to contain only index + * statements, so transfer anything we already have into extras_after + * immediately. + */ + *extras_after = list_concat(cxt.alist, *extras_after); + cxt.alist = NIL; + + /* + * Postprocess constraints that give rise to index definitions. + */ + transformIndexConstraints(pstate, &cxt, false); + + /* + * Check if this is an EXECUTE ON MASTER table. We'll need this information + * in transformExternalDistributedBy. While at it, we also check if an error + * table is attempted to be used on ON MASTER table and error if so. + */ + if (!iswritable) { + desc = (ExtTableTypeDesc *)stmt->exttypedesc; + + if (desc->exttabletype == EXTTBL_TYPE_EXECUTE) { + ListCell *exec_location_opt; + + foreach (exec_location_opt, desc->on_clause) { + DefElem *defel = (DefElem *)lfirst(exec_location_opt); + + if (strcmp(defel->defname, "master") == 0) { + SingleRowErrorDesc *srehDesc = (SingleRowErrorDesc *)stmt->sreh; + + onmaster = true; + + if (srehDesc && srehDesc->errtable) + ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg( + "External web table with ON MASTER clause " + "cannot use error tables."))); + } + } + } + } + + /* + * Check if we need to create an error table. If so, add it to the + * before list. + */ + if (stmt->sreh && ((SingleRowErrorDesc *)stmt->sreh)->errtable) + transformSingleRowErrorHandling(pstate, &cxt, + (SingleRowErrorDesc *)stmt->sreh); + + transformETDistributedBy(pstate, &cxt, stmt->base.distributedBy, + &stmt->policy, NULL, /*no WITH options for ET*/ + likeDistributedBy, bQuiet, iswritable, onmaster); + + // Process table partitioning clause + if (isPluggableStorage) + transformPartitionBy(pstate, &cxt, &stmt->base, stmt->base.partitionBy, + stmt->policy); + + /* + * Output results. + */ + q = makeNode(Query); + q->commandType = CMD_UTILITY; + q->utilityStmt = (Node *)stmt; + stmt->base.tableElts = cxt.columns; + stmt->base.constraints = cxt.ckconstraints; + stmt->pkey = cxt.pkey; + *extras_before = list_concat(*extras_before, cxt.blist); + *extras_after = list_concat(cxt.alist, *extras_after); + + return q; } static Query * @@ -2314,8 +2842,9 @@ transformCreateForeignStmt(ParseState *pstate, CreateForeignStmt *stmt, ListCell *elements; cxt.stmtType = "CREATE FOREIGN TABLE"; - cxt.relation = stmt->relation; - cxt.inhRelations = NIL; + cxt.isExternalTable = false; + cxt.relation = stmt->relation; + cxt.inhRelations = NIL; cxt.hasoids = false; cxt.isalter = false; cxt.columns = NIL; @@ -2572,9 +3101,9 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("multiple default values specified for column \"%s\" of table \"%s\"", column->colname, cxt->relation->relname))); - /* - * Note: DEFAULT NULL maps to constraint->raw_expr == NULL - * + /* + * Note: DEFAULT NULL maps to constraint->raw_expr == NULL + * * We lose the knowledge that the user specified DEFAULT NULL at * this point, so we record it in default_is_null */ @@ -2643,7 +3172,23 @@ transformTableConstraint(ParseState *pstate, CreateStmtContext *cxt, } } - +static void transformExtTableConstraint(ParseState *pstate, + CreateStmtContext *cxt, + Constraint *constraint) { + switch (constraint->contype) { + case CONSTR_PRIMARY: + cxt->ixconstraints = lappend(cxt->ixconstraints, constraint); + break; + + case CONSTR_CHECK: + cxt->ckconstraints = lappend(cxt->ckconstraints, constraint); + break; + + default: + elog(ERROR, "unrecognized constraint type: %d", constraint->contype); + break; + } +} /* * transformETDistributedBy - transform DISTRIBUTED BY clause for @@ -2658,75 +3203,63 @@ transformTableConstraint(ParseState *pstate, CreateStmtContext *cxt, * this is an EXECUTE table with ON MASTER specified, in which case * we create no policy so that the master will be accessed. */ -static void -transformETDistributedBy(ParseState *pstate, CreateStmtContext *cxt, - List *distributedBy, GpPolicy **policyp, List *options, - List *likeDistributedBy, - bool bQuiet, - bool iswritable, - bool onmaster) -{ - int maxattrs = 200; - GpPolicy* p = NULL; - - /* - * utility mode creates can't have a policy. Only the QD can have policies - */ - if (Gp_role != GP_ROLE_DISPATCH) - { - *policyp = NULL; - return; - } - - if(!iswritable && list_length(distributedBy) > 0) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("Readable external tables can\'t specify a DISTRIBUTED BY clause."))); - - if(iswritable) - { - /* WET */ - - if(distributedBy == NIL && likeDistributedBy == NIL) - { - /* defaults to DISTRIBUTED RANDOMLY */ - p = (GpPolicy *) palloc(sizeof(GpPolicy) + maxattrs * - sizeof(p->attrs[0])); - p->ptype = POLICYTYPE_PARTITIONED; - p->nattrs = 0; - p->bucketnum = GetRelOpt_bucket_num_fromOptions(options, GetExternalTablePartitionNum()); - p->attrs[0] = 1; - - *policyp = p; - } - else - { - /* regular DISTRIBUTED BY transformation */ - transformDistributedBy(pstate, cxt, distributedBy, policyp, options, - likeDistributedBy, bQuiet); - } - } - else - { - /* RET */ - - if(onmaster) - { - p = NULL; - } - else - { - /* defaults to DISTRIBUTED RANDOMLY */ - p = (GpPolicy *) palloc(sizeof(GpPolicy) + maxattrs * - sizeof(p->attrs[0])); - p->ptype = POLICYTYPE_PARTITIONED; - p->nattrs = 0; - p->bucketnum = GetRelOpt_bucket_num_fromOptions(options, GetExternalTablePartitionNum()); - p->attrs[0] = 1; - } +static void transformETDistributedBy(ParseState *pstate, CreateStmtContext *cxt, + List *distributedBy, GpPolicy **policyp, + List *options, List *likeDistributedBy, + bool bQuiet, bool iswritable, + bool onmaster) { + int maxattrs = 200; + GpPolicy *p = NULL; + + /* + * utility mode creates can't have a policy. Only the QD can have policies + */ + if (Gp_role != GP_ROLE_DISPATCH) { + *policyp = NULL; + return; + } + + if (!iswritable && list_length(distributedBy) > 0) + ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg( + "Readable external tables can\'t specify a DISTRIBUTED " + "BY clause."))); + + if (iswritable) { + /* WET */ + + if (distributedBy == NIL && likeDistributedBy == NIL) { + /* defaults to DISTRIBUTED RANDOMLY */ + p = (GpPolicy *)palloc(sizeof(GpPolicy) + maxattrs * sizeof(p->attrs[0])); + p->ptype = POLICYTYPE_PARTITIONED; + p->nattrs = 0; + p->bucketnum = GetRelOpt_bucket_num_fromOptions( + options, GetExternalTablePartitionNum()); + p->attrs[0] = 1; + + *policyp = p; + } else { + /* regular DISTRIBUTED BY transformation */ + transformDistributedBy(pstate, cxt, distributedBy, policyp, options, + likeDistributedBy, bQuiet); + } + } else { + /* RET */ + + if (onmaster) { + p = NULL; + } else { + /* defaults to DISTRIBUTED RANDOMLY */ + p = (GpPolicy *)palloc(sizeof(GpPolicy) + maxattrs * sizeof(p->attrs[0])); + p->ptype = POLICYTYPE_PARTITIONED; + p->nattrs = 0; + p->bucketnum = GetRelOpt_bucket_num_fromOptions( + options, GetExternalTablePartitionNum()); + p->attrs[0] = 1; + } - *policyp = p; - } + *policyp = p; + } } /****************stmt->policy*********************/ @@ -3424,7 +3957,7 @@ make_prule_catalog(ParseState *pstate, char newVals[10000]; { - List *coldefs = stmt->tableElts; + List *coldefs = stmt->base.tableElts; ListCell *lc = NULL; StringInfoData sid; int colcnt = 0; @@ -3583,7 +4116,7 @@ make_prule_rulestmt(ParseState *pstate, if (1) { - List *coldefs = stmt->tableElts; + List *coldefs = stmt->base.tableElts; ListCell *lc = NULL; List *vl1 = NULL; @@ -6007,7 +6540,7 @@ validate_list_partition(partValidationState *vstate) foreach(lc2, already) { List *item = lfirst(lc2); - + Assert( IsA(item, List) && list_length(item) == nvals ); /* @@ -6132,11 +6665,11 @@ merge_partition_encoding(ParseState *pstate, PartitionElem *elem, List *penc) ListCell *lc; AlterPartitionCmd *pc; - /* + /* * First of all, we shouldn't proceed if this partition isn't AOCO */ - /* + /* * Yes, I am as surprised as you are that this is how we represent the WITH * clause here. */ @@ -6153,7 +6686,7 @@ merge_partition_encoding(ParseState *pstate, PartitionElem *elem, List *penc) return; /* nothing more to do */ } - /* + /* * If the specific partition has no specific column encoding, just * set it to the partition level default and we're done. */ @@ -6162,7 +6695,7 @@ merge_partition_encoding(ParseState *pstate, PartitionElem *elem, List *penc) elem->colencs = penc; return; } - + /* * Fixup the actual column encoding clauses for this specific partition * element. @@ -7299,11 +7832,11 @@ merge_part_column_encodings(CreateStmt *cs, List *stenc) if (!stenc) return; - /* + /* * First, split the table elements into column reference storage directives * and everything else. */ - foreach(lc, cs->tableElts) + foreach(lc, cs->base.tableElts) { Node *n = lfirst(lc); @@ -7335,7 +7868,7 @@ merge_part_column_encodings(CreateStmt *cs, List *stenc) foreach(lc2, finalencs) { ColumnReferenceStorageDirective *f = lfirst(lc2); - + if (f->deflt) continue; @@ -7379,7 +7912,7 @@ merge_part_column_encodings(CreateStmt *cs, List *stenc) } } - cs->tableElts = list_concat(others, finalencs); + cs->base.tableElts = list_concat(others, finalencs); } static void @@ -7403,9 +7936,9 @@ make_child_node(CreateStmt *stmt, CreateStmtContext *cxt, char *relname, child_tab_name->relname = relname; child_tab_name->location = -1; - child_tab_stmt->relation = child_tab_name; - child_tab_stmt->is_part_child = true; - child_tab_stmt->is_add_part = stmt->is_add_part; + child_tab_stmt->base.relation = child_tab_name; + child_tab_stmt->base.is_part_child = true; + child_tab_stmt->base.is_add_part = stmt->base.is_add_part; if (!bQuiet) ereport(NOTICE, @@ -7415,7 +7948,7 @@ make_child_node(CreateStmt *stmt, CreateStmtContext *cxt, char *relname, cxt->relation->relname))); /* set the "Post Create" rule if it exists */ - child_tab_stmt->postCreate = pPostCreate; + child_tab_stmt->base.postCreate = pPostCreate; /* * Deep copy the parent's table elements. @@ -7430,7 +7963,7 @@ make_child_node(CreateStmt *stmt, CreateStmtContext *cxt, char *relname, * user-specified constraint names, so we don't do one here * any more. */ - child_tab_stmt->tableElts = copyObject(stmt->tableElts); + child_tab_stmt->base.tableElts = copyObject(stmt->base.tableElts); merge_part_column_encodings(child_tab_stmt, stenc); @@ -7438,8 +7971,8 @@ make_child_node(CreateStmt *stmt, CreateStmtContext *cxt, char *relname, if (pConstraint && ((enable_partition_rules && curPby->partType == PARTTYP_HASH) || curPby->partType != PARTTYP_HASH)) - child_tab_stmt->tableElts = - lappend(child_tab_stmt->tableElts, + child_tab_stmt->base.tableElts = + lappend(child_tab_stmt->base.tableElts, pConstraint); /* @@ -7449,10 +7982,10 @@ make_child_node(CreateStmt *stmt, CreateStmtContext *cxt, char *relname, * the create child table */ /*child_tab_stmt->inhRelations = list_make1(parent_tab_name); */ - child_tab_stmt->inhRelations = list_copy(stmt->inhRelations); + child_tab_stmt->base.inhRelations = list_copy(stmt->base.inhRelations); - child_tab_stmt->constraints = copyObject(stmt->constraints); - child_tab_stmt->options = stmt->options; + child_tab_stmt->base.constraints = copyObject(stmt->base.constraints); + child_tab_stmt->base.options = stmt->base.options; /* allow WITH clause for appendonly tables */ if ( pStoreAttr ) @@ -7461,24 +7994,24 @@ make_child_node(CreateStmt *stmt, CreateStmtContext *cxt, char *relname, /* Options */ if ( psa_apc->arg1 ) - child_tab_stmt->options = (List *)psa_apc->arg1; + child_tab_stmt->base.options = (List *)psa_apc->arg1; /* Tablespace from parent (input CreateStmt)... */ if ( psa_apc->arg2 && *strVal(psa_apc->arg2) ) - child_tab_stmt->tablespacename = strVal(psa_apc->arg2); + child_tab_stmt->base.tablespacename = strVal(psa_apc->arg2); } /* ...or tablespace from root. */ - if ( !child_tab_stmt->tablespacename && stmt->tablespacename ) - child_tab_stmt->tablespacename = stmt->tablespacename; + if ( !child_tab_stmt->base.tablespacename && stmt->base.tablespacename ) + child_tab_stmt->base.tablespacename = stmt->base.tablespacename; - child_tab_stmt->oncommit = stmt->oncommit; - child_tab_stmt->distributedBy = stmt->distributedBy; + child_tab_stmt->base.oncommit = stmt->base.oncommit; + child_tab_stmt->base.distributedBy = stmt->base.distributedBy; /* use the newSub as the partitionBy if the current * partition elem had an inline subpartition declaration */ - child_tab_stmt->partitionBy = (Node *)newSub; + child_tab_stmt->base.partitionBy = (Node *)newSub; - child_tab_stmt->relKind = RELKIND_RELATION; + child_tab_stmt->base.relKind = RELKIND_RELATION; /* * Adjust tablespace name for the CREATE TABLE via ADD PARTITION. (MPP-8047) @@ -7489,18 +8022,18 @@ make_child_node(CreateStmt *stmt, CreateStmtContext *cxt, char *relname, * Ultimately, we take the tablespace as specified in the command, or, if none * was specified, the one from the root paritioned table. */ - if ( ! child_tab_stmt->tablespacename ) + if ( ! child_tab_stmt->base.tablespacename ) { Oid poid = RangeVarGetRelid(cxt->relation, true, false /*allowHcatalog*/); /* parent branch */ if ( ! poid ) { - poid = RangeVarGetRelid(stmt->relation, true, false /*alloweHcatalog*/); /* whole partitioned table */ + poid = RangeVarGetRelid(stmt->base.relation, true, false /*alloweHcatalog*/); /* whole partitioned table */ } if ( poid ) { Relation prel = RelationIdGetRelation(poid); - child_tab_stmt->tablespacename = get_tablespace_name(prel->rd_rel->reltablespace); + child_tab_stmt->base.tablespacename = get_tablespace_name(prel->rd_rel->reltablespace); RelationClose(prel); } } @@ -7732,7 +8265,7 @@ transformPartitionBy(ParseState *pstate, CreateStmtContext *cxt, at_depth = at_buf; } else - pBy->parentRel = copyObject(stmt->relation); + pBy->parentRel = copyObject(stmt->base.relation); /* set the depth for the immediate subpartition */ if (pBy->subPart) @@ -8408,8 +8941,8 @@ transformPartitionBy(ParseState *pstate, CreateStmtContext *cxt, if ((pBy->partDepth > 0) && (pBy->bKeepMe != true)) { /* we don't need this any more */ - stmt->partitionBy = NULL; - stmt->is_part_child = true; + stmt->base.partitionBy = NULL; + stmt->base.is_part_child = true; } } /* end transformPartitionBy */ @@ -8589,7 +9122,7 @@ transformIndexStmt(ParseState *pstate, IndexStmt *stmt, partrel = heap_open(PartitionRuleRelationId, AccessShareLock); tuple = caql_getfirst( - caql_addrel(cqclr(&cqc), partrel), + caql_addrel(cqclr(&cqc), partrel), cql("SELECT * FROM pg_partition_rule " " WHERE parchildrelid = :1 ", ObjectIdGetDatum(relid))); @@ -8607,7 +9140,7 @@ transformIndexStmt(ParseState *pstate, IndexStmt *stmt, partrel = heap_open(PartitionRelationId, AccessShareLock); tuple = caql_getfirst( - caql_addrel(cqclr(&cqc), partrel), + caql_addrel(cqclr(&cqc), partrel), cql("SELECT parlevel FROM pg_partition " " WHERE oid = :1 ", ObjectIdGetDatum(paroid))); @@ -11015,7 +11548,7 @@ transformAlterTable_all_PartitionStmt( if (atc1->subtype != AT_PartAlter) { rv = makeRangeVar( - NULL /*catalogname*/, + NULL /*catalogname*/, get_namespace_name( RelationGetNamespace(rel)), pstrdup(RelationGetRelationName(rel)), -1); @@ -11107,7 +11640,7 @@ transformAlterTable_all_PartitionStmt( * the new partition is LIKE the parent and it * inherits from it */ - ct->tableElts = lappend(ct->tableElts, inh); + ct->base.tableElts = lappend(ct->base.tableElts, inh); cl = list_make1(ct); @@ -11154,11 +11687,12 @@ transformAlterTableStmt(ParseState *pstate, AlterTableStmt *stmt, bool skipValidation = true; AlterTableCmd *newcmd; - cxt.stmtType = "ALTER TABLE"; + cxt.stmtType = "ALTER TABLE"; + cxt.isExternalTable = false; cxt.relation = stmt->relation; cxt.inhRelations = NIL; cxt.isalter = true; - cxt.hasoids = false; /* need not be right */ + cxt.hasoids = false; /* need not be right */ cxt.columns = NIL; cxt.ckconstraints = NIL; cxt.fkconstraints = NIL; @@ -11433,7 +11967,7 @@ transformDeclareCursorStmt(ParseState *pstate, DeclareCursorStmt *stmt) * - has no LIMIT/OFFSET * - references only one range table (i.e. no joins, self-joins) * - this range table must itself be updatable - * + * */ static bool isSimplyUpdatableQuery(Query *query) @@ -12037,7 +12571,7 @@ analyzeCreateSchemaStmt(CreateSchemaStmt *stmt) { CreateStmt *elp = (CreateStmt *) element; - setSchemaName(cxt.schemaname, &elp->relation->schemaname); + setSchemaName(cxt.schemaname, &elp->base.relation->schemaname); /* * XXX todo: deal with constraints @@ -12050,7 +12584,7 @@ analyzeCreateSchemaStmt(CreateSchemaStmt *stmt) { CreateExternalStmt *elp = (CreateExternalStmt *) element; - setSchemaName(cxt.schemaname, &elp->relation->schemaname); + setSchemaName(cxt.schemaname, &elp->base.relation->schemaname); cxt.tables = lappend(cxt.tables, element); } @@ -12395,17 +12929,17 @@ transformSingleRowErrorHandling(ParseState *pstate, CreateStmtContext *cxt, attrList = lappend(attrList, coldef); } - createStmt->relation = sreh->errtable; - createStmt->tableElts = attrList; - createStmt->inhRelations = NIL; - createStmt->constraints = NIL; - createStmt->options = list_make2(makeDefElem("errortable", (Node *) makeString("true")), + createStmt->base.relation = sreh->errtable; + createStmt->base.tableElts = attrList; + createStmt->base.inhRelations = NIL; + createStmt->base.constraints = NIL; + createStmt->base.options = list_make2(makeDefElem("errortable", (Node *) makeString("true")), makeDefElem("appendonly", (Node *) makeString("true"))); - createStmt->oncommit = ONCOMMIT_NOOP; - createStmt->tablespacename = NULL; - createStmt->relKind = RELKIND_RELATION; + createStmt->base.oncommit = ONCOMMIT_NOOP; + createStmt->base.tablespacename = NULL; + createStmt->base.relKind = RELKIND_RELATION; createStmt->relStorage = RELSTORAGE_AOROWS; - createStmt->distributedBy = list_make1(NULL); /* DISTRIBUTED RANDOMLY */ + createStmt->base.distributedBy = list_make1(NULL); /* DISTRIBUTED RANDOMLY */ cxt->blist = lappend(cxt->blist, createStmt); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/48ff52c9/src/backend/parser/gram.y ---------------------------------------------------------------------- diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index dc0e13b..b443bca 100755 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -242,10 +242,10 @@ static Node *makeIsNotDistinctFromNode(Node *expr, int position); %type <dbehavior> opt_drop_behavior %type <list> createdb_opt_list alterdb_opt_list copy_opt_list - ext_on_clause_list format_opt format_opt_list format_def_list transaction_mode_list + ext_on_clause_list format_opt format_opt_list transaction_mode_list ext_opt_encoding_list %type <defelt> createdb_opt_item alterdb_opt_item copy_opt_item - ext_on_clause_item format_opt_item format_def_item transaction_mode_item + ext_on_clause_item format_opt_item transaction_mode_item ext_opt_encoding_item %type <ival> opt_lock lock_type cast_context @@ -301,7 +301,7 @@ static Node *makeIsNotDistinctFromNode(Node *expr, int position); aggr_args aggr_args_list old_aggr_definition old_aggr_list oper_argtypes RuleActionList RuleActionMulti cdb_string_list - opt_column_list columnList opt_name_list exttab_auth_list keyvalue_list + opt_column_list columnList columnListPlus opt_name_list exttab_auth_list keyvalue_list opt_inherited_column_list sort_clause opt_sort_clause sortby_list index_params name_list from_clause from_list opt_array_bounds @@ -431,7 +431,7 @@ static Node *makeIsNotDistinctFromNode(Node *expr, int position); %type <node> var_value zone_value %type <keyword> unreserved_keyword func_name_keyword -%type <keyword> col_name_keyword reserved_keyword +%type <keyword> col_name_keyword reserved_keyword format_opt_keyword %type <keyword> keywords_ok_in_alias_no_as %type <node> TableConstraint TableLikeClause @@ -3310,24 +3310,25 @@ CreateStmt: CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')' OptTabPartitionBy { CreateStmt *n = makeNode(CreateStmt); + $4->istemp = $2; - n->relation = $4; - n->tableElts = $6; - n->inhRelations = $8; - n->constraints = NIL; - n->options = $9; - n->oncommit = $10; - n->tablespacename = $11; - n->distributedBy = $12; - n->partitionBy = $13; + n->base.relation = $4; + n->base.tableElts = $6; + n->base.inhRelations = $8; + n->base.constraints = NIL; + n->base.options = $9; + n->base.oncommit = $10; + n->base.tablespacename = $11; + n->base.distributedBy = $12; + n->base.partitionBy = $13; n->oidInfo.relOid = 0; n->oidInfo.comptypeOid = 0; n->oidInfo.toastOid = 0; n->oidInfo.toastIndexOid = 0; n->oidInfo.toastComptypeOid = 0; - n->relKind = RELKIND_RELATION; + n->base.relKind = RELKIND_RELATION; n->policy = 0; - n->postCreate = NULL; + n->base.postCreate = NULL; $$ = (Node *)n; } @@ -3339,23 +3340,23 @@ CreateStmt: CREATE OptTemp TABLE qualified_name '(' OptTableElementList ')' */ CreateStmt *n = makeNode(CreateStmt); $4->istemp = $2; - n->relation = $4; - n->tableElts = $8; - n->inhRelations = list_make1($6); - n->constraints = NIL; - n->options = $10; - n->oncommit = $11; - n->tablespacename = $12; - n->distributedBy = $13; - n->partitionBy = $14; + n->base.relation = $4; + n->base.tableElts = $8; + n->base.inhRelations = list_make1($6); + n->base.constraints = NIL; + n->base.options = $10; + n->base.oncommit = $11; + n->base.tablespacename = $12; + n->base.distributedBy = $13; + n->base.partitionBy = $14; n->oidInfo.relOid = 0; n->oidInfo.comptypeOid = 0; n->oidInfo.toastOid = 0; n->oidInfo.toastIndexOid = 0; n->oidInfo.toastComptypeOid = 0; - n->relKind = RELKIND_RELATION; + n->base.relKind = RELKIND_RELATION; n->policy = 0; - n->postCreate = NULL; + n->base.postCreate = NULL; $$ = (Node *)n; } @@ -3754,6 +3755,17 @@ columnList: columnElem { $$ = list_make1($1); } | columnList ',' columnElem { $$ = lappend($1, $3); } ; +columnListPlus: + columnElem ',' columnElem + { + $$ = list_make1($1); + $$ = lappend($$, $3); + } + | columnListPlus ',' columnElem + { + $$ = lappend($1, $3); + } + ; columnElem: ColId { @@ -4584,16 +4596,17 @@ CreateExternalStmt: CREATE OptWritable EXTERNAL OptWeb OptTemp TABLE qualified_n { CreateExternalStmt *n = makeNode(CreateExternalStmt); n->iswritable = $2; + n->isexternal = TRUE; n->isweb = $4; $7->istemp = $5; - n->relation = $7; - n->tableElts = $9; + n->base.relation = $7; + n->base.tableElts = $9; n->exttypedesc = $11; n->format = $13; - n->formatOpts = $14; + n->base.options = $14; n->encoding = $15; n->sreh = $16; - n->distributedBy = $17; + n->base.distributedBy = $17; n->policy = 0; /* various syntax checks for EXECUTE external table */ @@ -4694,13 +4707,12 @@ ext_on_clause_item: format_opt: '(' format_opt_list ')' { $$ = $2; } - | '(' format_def_list ')' { $$ = $2; } | '(' ')' { $$ = NIL; } | /*EMPTY*/ { $$ = NIL; } ; format_opt_list: - format_opt_item + format_opt_item { $$ = list_make1($1); } @@ -4710,67 +4722,43 @@ format_opt_list: } ; -format_def_list: - format_def_item - { - $$ = list_make1($1); - } - | format_def_list ',' format_def_item - { - $$ = lappend($1, $3); - } - -format_def_item: - ColLabel '=' def_arg - { - $$ = makeDefElem($1, $3); - } - | ColLabel '=' '(' columnList ')' - { - $$ = makeDefElem($1, (Node *) $4); - } +format_opt_keyword: + AS + | DELIMITER + | NULL_P + | CSV + | HEADER_P + | QUOTE + | ESCAPE + | FORCE + | NOT + | FILL + | MISSING + | FIELDS + | NEWLINE + ; format_opt_item: - DELIMITER opt_as Sconst + IDENT { - $$ = makeDefElem("delimiter", (Node *)makeString($3)); + $$ = makeDefElem("#ident", (Node *)makeString($1)); } - | NULL_P opt_as Sconst - { - $$ = makeDefElem("null", (Node *)makeString($3)); - } - | CSV - { - $$ = makeDefElem("csv", (Node *)makeInteger(TRUE)); - } - | HEADER_P - { - $$ = makeDefElem("header", (Node *)makeInteger(TRUE)); - } - | QUOTE opt_as Sconst - { - $$ = makeDefElem("quote", (Node *)makeString($3)); - } - | ESCAPE opt_as Sconst - { - $$ = makeDefElem("escape", (Node *)makeString($3)); - } - | FORCE NOT NULL_P columnList + | Sconst { - $$ = makeDefElem("force_notnull", (Node *)$4); + $$ = makeDefElem("#string", (Node *)makeString($1)); } - | FORCE QUOTE columnList + | SignedIconst { - $$ = makeDefElem("force_quote", (Node *)$3); + $$ = makeDefElem("#int", (Node *)makeInteger($1)); } - | FILL MISSING FIELDS + | format_opt_keyword { - $$ = makeDefElem("fill_missing_fields", (Node *)makeInteger(TRUE)); + $$ = makeDefElem("#ident", (Node *)makeString($1)); } - | NEWLINE opt_as Sconst + | columnListPlus { - $$ = makeDefElem("newline", (Node *)makeString($3)); + $$ = makeDefElem("#collist", (Node *)$1); } ; @@ -13258,28 +13246,28 @@ makeAddPartitionCreateStmt(Node *n, Node *subSpec) CreateStmt *ct = makeNode(CreateStmt); PartitionBy *pBy = NULL; - ct->relation = makeRangeVar(NULL /*catalogname*/, NULL, "fake_partition_name", -1); + ct->base.relation = makeRangeVar(NULL /*catalogname*/, NULL, "fake_partition_name", -1); /* in analyze.c, fill in tableelts with a list of inhrelation of the partition parent table, and fill in inhrelations with copy of rangevar for parent table */ - ct->tableElts = NIL; /* fill in later */ - ct->inhRelations = NIL; /* fill in later */ + ct->base.tableElts = NIL; /* fill in later */ + ct->base.inhRelations = NIL; /* fill in later */ - ct->constraints = NIL; + ct->base.constraints = NIL; if (pc_StAttr) - ct->options = (List *)pc_StAttr->arg1; + ct->base.options = (List *)pc_StAttr->arg1; else - ct->options = NIL; + ct->base.options = NIL; - ct->oncommit = ONCOMMIT_NOOP; + ct->base.oncommit = ONCOMMIT_NOOP; if (pc_StAttr && pc_StAttr->arg2) - ct->tablespacename = strVal(pc_StAttr->arg2); + ct->base.tablespacename = strVal(pc_StAttr->arg2); else - ct->tablespacename = NULL; + ct->base.tablespacename = NULL; if (subSpec) /* treat subspec as partition by... */ { @@ -13290,19 +13278,19 @@ makeAddPartitionCreateStmt(Node *n, Node *subSpec) pBy->partQuiet = PART_VERBO_NODISTRO; pBy->location = -1; pBy->partDefault = NULL; - pBy->parentRel = copyObject(ct->relation); + pBy->parentRel = copyObject(ct->base.relation); } - ct->distributedBy = NULL; - ct->partitionBy = (Node *)pBy; + ct->base.distributedBy = NULL; + ct->base.partitionBy = (Node *)pBy; ct->oidInfo.relOid = 0; ct->oidInfo.comptypeOid = 0; ct->oidInfo.toastOid = 0; ct->oidInfo.toastIndexOid = 0; ct->oidInfo.toastComptypeOid = 0; - ct->relKind = RELKIND_RELATION; + ct->base.relKind = RELKIND_RELATION; ct->policy = 0; - ct->postCreate = NULL; + ct->base.postCreate = NULL; return (Node *)ct; } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/48ff52c9/src/backend/tcop/utility.c ---------------------------------------------------------------------- diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index af2d12c..bd6e2b6 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -15,12 +15,15 @@ *------------------------------------------------------------------------- */ #include "postgres.h" +#include "port.h" #include "access/twophase.h" #include "access/xact.h" +#include "access/fileam.h" #include "catalog/catalog.h" #include "catalog/catquery.h" #include "catalog/namespace.h" +#include "catalog/pg_exttable.h" #include "catalog/toasting.h" #include "catalog/aoseg.h" #include "commands/alter.h" @@ -50,6 +53,7 @@ #include "commands/vacuum.h" #include "commands/view.h" #include "miscadmin.h" +#include "nodes/value.h" #include "postmaster/checkpoint.h" #include "rewrite/rewriteDefine.h" #include "rewrite/rewriteRemove.h" @@ -61,6 +65,7 @@ #include "utils/guc.h" #include "utils/syscache.h" #include "utils/lsyscache.h" +#include "utils/uri.h" #include "lib/stringinfo.h" #include "cdb/cdbcat.h" @@ -71,6 +76,7 @@ #include "cdb/dispatcher.h" #include "resourcemanager/resqueuecommand.h" +#include "catalog/pg_exttable.h" /* * Error-checking support for DROP commands */ @@ -270,10 +276,32 @@ CheckDropRelStorage(RangeVar *rel, ObjectType removeType) classform = (Form_pg_class) GETSTRUCT(tuple); - if ((removeType == OBJECT_EXTTABLE && classform->relstorage != RELSTORAGE_EXTERNAL) || - (removeType == OBJECT_FOREIGNTABLE && classform->relstorage != RELSTORAGE_FOREIGN) || - (removeType == OBJECT_TABLE && (classform->relstorage == RELSTORAGE_EXTERNAL || - classform->relstorage == RELSTORAGE_FOREIGN))) + bool is_internal = false; + if (classform->relstorage == RELSTORAGE_EXTERNAL) + { + ExtTableEntry *entry = GetExtTableEntry(relOid); + List *entry_locations = entry->locations; + Assert(entry_locations); + ListCell *entry_location = list_head(entry_locations); + char *url = ((Value*)lfirst(entry_location))->val.str; + char *category = getExtTblCategoryInFmtOptsStr(entry->fmtopts); + + if ((IS_HDFS_URI(url)) && + (category != NULL && pg_strncasecmp(category, "internal", strlen("internal")) == 0)) + { + is_internal = true; + } + + if (category) + { + pfree(category); + } + } + + if ((removeType == OBJECT_EXTTABLE && (classform->relstorage != RELSTORAGE_EXTERNAL || is_internal)) || + (removeType == OBJECT_FOREIGNTABLE && classform->relstorage != RELSTORAGE_FOREIGN) || + (removeType == OBJECT_TABLE && (classform->relstorage == RELSTORAGE_EXTERNAL && (!is_internal) || + classform->relstorage == RELSTORAGE_FOREIGN))) { /* we have a mismatch. format an error string and shoot */ @@ -287,7 +315,7 @@ CheckDropRelStorage(RangeVar *rel, ObjectType removeType) else want_type = pstrdup("a base"); - if (classform->relstorage == RELSTORAGE_EXTERNAL) + if (classform->relstorage == RELSTORAGE_EXTERNAL && !is_internal) hint = pstrdup("Use DROP EXTERNAL TABLE to remove an external table"); else if (classform->relstorage == RELSTORAGE_FOREIGN) hint = pstrdup("Use DROP FOREIGN TABLE to remove a foreign table"); @@ -447,7 +475,7 @@ check_xact_readonly(Node *parsetree) createStmt = (CreateStmt *) parsetree; - if (createStmt->relation->istemp) + if (createStmt->base.relation->istemp) return; // Permit creation of TEMPORARY tables in read-only mode. ereport(ERROR, @@ -912,8 +940,8 @@ ProcessUtility(Node *parsetree, Assert (gp_upgrade_mode || Gp_role != GP_ROLE_EXECUTE); - relOid = DefineRelation((CreateStmt *) parsetree, - relKind, relStorage); + relOid = DefineRelation((CreateStmt *) parsetree, relKind, + relStorage, NonCustomFormatType); /* * Let AlterTableCreateToastTable decide if this one needs a @@ -936,13 +964,13 @@ ProcessUtility(Node *parsetree, ((CreateStmt *) parsetree)->oidInfo.toastOid, ((CreateStmt *) parsetree)->oidInfo.toastIndexOid, &(((CreateStmt *) parsetree)->oidInfo.toastComptypeOid), - ((CreateStmt *)parsetree)->is_part_child); + ((CreateStmt *)parsetree)->base.is_part_child); AlterTableCreateAoSegTableWithOid(relOid, ((CreateStmt *) parsetree)->oidInfo.aosegOid, ((CreateStmt *) parsetree)->oidInfo.aosegIndexOid, &(((CreateStmt *) parsetree)->oidInfo.aosegComptypeOid), - ((CreateStmt *) parsetree)->is_part_child); + ((CreateStmt *) parsetree)->base.is_part_child); } CommandCounterIncrement(); /* http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/48ff52c9/src/backend/utils/misc/uriparser.c ---------------------------------------------------------------------- diff --git a/src/backend/utils/misc/uriparser.c b/src/backend/utils/misc/uriparser.c index 5489c17..e94408d 100644 --- a/src/backend/utils/misc/uriparser.c +++ b/src/backend/utils/misc/uriparser.c @@ -78,6 +78,11 @@ ParseExternalTableUri(const char *uri_str) uri->protocol = URI_GPFDISTS; protocol_len = strlen(PROTOCOL_GPFDISTS); } + else if (IS_HDFS_URI(uri_str)) + { + uri->protocol = URI_HDFS; + protocol_len = strlen(PROTOCOL_HDFS); + } else /* not recognized. treat it as a custom protocol */ { @@ -200,7 +205,10 @@ ParseExternalTableUri(const char *uri_str) } else { - uri->port = -1; /* no port was indicated. will use default if needed */ + if (IS_HDFS_URI(uri_str)) /* means nameservice format */ + uri->port = 0; + else + uri->port = -1; /* no port was indicated. will use default if needed */ } } } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/48ff52c9/src/include/access/fileam.h ---------------------------------------------------------------------- diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h index 777698a..5d4871d 100644 --- a/src/include/access/fileam.h +++ b/src/include/access/fileam.h @@ -104,10 +104,9 @@ extern bool external_getnext(FileScanDesc scan, ScanState *ss, TupleTableSlot *slot); -extern ExternalInsertDesc external_insert_init(Relation rel, - int errAosegno, - ExternalTableType formatterType, - char *formatterName); +extern ExternalInsertDesc external_insert_init(Relation rel, int errAosegno, + int formatterType, char *formatterName, PlannedStmt* plannedstmt); + extern Oid external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot); extern void external_insert_finish(ExternalInsertDesc extInsertDesc); @@ -115,6 +114,12 @@ extern void external_set_env_vars(extvar_t *extvar, char* uri, bool csv, char* e extern void AtAbort_ExtTables(void); char* linenumber_atoi(char buffer[20],int64 linenumber); +extern bool hasErrTblInFmtOpts(List *fmtOpts); +extern char getExtTblFormatterTypeInFmtOpts(List *fmtOpts); +extern void external_populate_formatter_actionmask(struct CopyStateData *pstate, + FormatterData *formatter); + +extern char *getExtTblCategoryInFmtOptsStr(char *fmtStr); extern char *getExtTblFormatterTypeInFmtOptsStr(char *fmtStr); extern char *getExtTblFormatterTypeInFmtOptsList(List *fmtOpts); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/48ff52c9/src/include/access/formatter.h ---------------------------------------------------------------------- diff --git a/src/include/access/formatter.h b/src/include/access/formatter.h index f87afc2..7b46d3d 100644 --- a/src/include/access/formatter.h +++ b/src/include/access/formatter.h @@ -36,10 +36,19 @@ typedef enum FmtNotification { FMT_NONE, + FMT_DONE, FMT_NEED_MORE_DATA } FmtNotification; +typedef enum FmtActionMask +{ + FMT_UNSET = 0, + FMT_SET = 1, + FMT_NEEDEXTBUFF = 2, + FMT_WRITE_END = 4 +} FmtActionMask; + /* * FormatterData is the node type that is passed as fmgr "context" info * when a function is called by the External Table Formatter manager. @@ -49,6 +58,7 @@ typedef struct FormatterData { NodeTag type; /* see T_FormatterData */ + FmtActionMask fmt_mask; /* metadata */ Relation fmt_relation; TupleDesc fmt_tupDesc; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/48ff52c9/src/include/access/plugstorage.h ---------------------------------------------------------------------- diff --git a/src/include/access/plugstorage.h b/src/include/access/plugstorage.h index 48c5fde..f904f81 100644 --- a/src/include/access/plugstorage.h +++ b/src/include/access/plugstorage.h @@ -45,6 +45,7 @@ #include "executor/tuptable.h" /* From src/include/access/fileam.h */ +extern char *getExtTblCategoryInFmtOptsStr(char *fmtStr); extern char *getExtTblFormatterTypeInFmtOptsStr(char *fmtStr); extern char *getExtTblFormatterTypeInFmtOptsList(List *fmtOpts); @@ -98,6 +99,7 @@ typedef struct PlugStorageData bool ps_has_tuple; Oid ps_tuple_oid; TupleTableSlot *ps_tuple_table_slot; + int ps_segno; } PlugStorageData; @@ -179,7 +181,9 @@ void InvokePlugStorageFormatStopScan(FmgrInfo *func, ExternalInsertDesc InvokePlugStorageFormatInsertInit(FmgrInfo *func, Relation relation, int formatterType, - char *formatterName); + char *formatterName, + PlannedStmt* plannedstmt, + int segno); Oid InvokePlugStorageFormatInsert(FmgrInfo *func, ExternalInsertDesc extInsertDesc, http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/48ff52c9/src/include/catalog/pg_exttable.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/pg_exttable.h b/src/include/catalog/pg_exttable.h index 3256bb9..11f053c 100644 --- a/src/include/catalog/pg_exttable.h +++ b/src/include/catalog/pg_exttable.h @@ -164,9 +164,10 @@ GetExtTableEntry(Oid relid); extern void RemoveExtTableEntry(Oid relid); -#define CustomFormatType 'b' -#define TextFormatType 't' -#define CsvFormatType 'c' +#define CustomFormatType 'b' +#define TextFormatType 't' +#define CsvFormatType 'c' +#define NonCustomFormatType 'n' /* PXF formats*/ #define GpdbWritableFormatName "GPDBWritable"