HAWQ-465. Implement stored procedure to return fields metainfo from PXF.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/ecb85e5c Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/ecb85e5c Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/ecb85e5c Branch: refs/heads/HAWQ-546 Commit: ecb85e5c9d9a8c330b6b87380c436cabdd259e99 Parents: 8cdab61 Author: Oleksandr Diachenko <odiache...@pivotal.io> Authored: Mon Mar 7 17:00:00 2016 -0800 Committer: Oleksandr Diachenko <odiache...@pivotal.io> Committed: Wed Mar 30 17:23:29 2016 -0700 ---------------------------------------------------------------------- src/all_src_files.txt | 4 + src/backend/access/external/hd_work_mgr.c | 18 +- src/backend/access/external/pxfmasterapi.c | 52 +- src/backend/access/external/pxfuriparser.c | 6 +- src/backend/catalog/Makefile | 2 +- src/backend/catalog/external/Makefile | 32 + src/backend/catalog/external/externalmd.c | 586 +++++++++++++++++++ src/backend/catalog/hcatalog/Makefile | 32 - src/backend/catalog/hcatalog/externalmd.c | 562 ------------------ src/backend/catalog/namespace.c | 2 +- src/backend/utils/adt/Makefile | 2 +- src/backend/utils/adt/pxf_functions.c | 186 ++++++ src/include/access/hd_work_mgr.h | 4 +- src/include/access/pxfmasterapi.h | 4 +- src/include/access/pxfuriparser.h | 4 +- src/include/catalog/external/externalmd.h | 35 ++ src/include/catalog/external/itemmd.h | 71 +++ src/include/catalog/hcatalog/externalmd.h | 35 -- src/include/catalog/hcatalog/hcatalogmd.h | 68 --- src/include/catalog/pg_proc.h | 6 +- src/include/catalog/pg_proc.sql | 2 + src/include/utils/builtins.h | 3 + .../data/hcatalog/invalid_numeric_range.json | 2 +- .../hcatalog/invalid_typemod_timestamp.json | 2 +- src/test/regress/data/hcatalog/multi_table.json | 2 +- .../data/hcatalog/multi_table_duplicates.json | 2 +- .../regress/data/hcatalog/single_table.json | 2 +- src/test/regress/input/hcatalog_lookup.source | 4 + src/test/regress/json_utils.c | 11 +- 
src/test/regress/output/hcatalog_lookup.source | 7 + src/test/regress/output/json_load.source | 2 +- 31 files changed, 1005 insertions(+), 745 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/all_src_files.txt ---------------------------------------------------------------------- diff --git a/src/all_src_files.txt b/src/all_src_files.txt index feabe2a..f85326a 100644 --- a/src/all_src_files.txt +++ b/src/all_src_files.txt @@ -138,6 +138,7 @@ backend/catalog/pg_proc_callback.c backend/catalog/pg_shdepend.c backend/catalog/pg_type.c backend/catalog/toasting.c +backend/catalog/external/externalmd.c backend/cdb/cdbanalyze.c backend/cdb/cdbappendonlystorage.c backend/cdb/cdbappendonlystorageformat.c @@ -662,6 +663,7 @@ backend/utils/adt/percentile.c backend/utils/adt/pg_locale.c backend/utils/adt/pg_lzcompress.c backend/utils/adt/pgstatfuncs.c +backend/utils/adt/pxf_functions.c backend/utils/adt/pivot.c backend/utils/adt/pseudotypes.c backend/utils/adt/quote.c @@ -934,6 +936,8 @@ include/catalog/pg_type_encoding.h include/catalog/pg_user_mapping.h include/catalog/pg_window.h include/catalog/toasting.h +include/catalog/external/externalmd.h +include/catalog/external/itemmd.h include/cdb/cdbanalyze.h include/cdb/cdbaocsam.h include/cdb/cdbappendonlyam.h http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/backend/access/external/hd_work_mgr.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/hd_work_mgr.c b/src/backend/access/external/hd_work_mgr.c index 61fd40e..9660d9c 100644 --- a/src/backend/access/external/hd_work_mgr.c +++ b/src/backend/access/external/hd_work_mgr.c @@ -949,22 +949,22 @@ static void cancel_delegation_token(PxfInputData *inputData) } /* - * Fetches meatdata for the relation from hcatalog using pxf - * Create the necessary hadoop uri for the hcat proxy and the 
clientcontext - * Returns the list of metadata for hcat tables + * Fetches metadata for the item from PXF + * Returns the list of metadata for PXF items + * Caches data if dboid is not NULL * */ -List *get_pxf_hcat_metadata(char *relation_location) +List *get_pxf_item_metadata(char *profile, char *pattern, Oid dboid) { ClientContext client_context; /* holds the communication info */ PxfInputData inputData = {0}; - List *hcat_tables = NIL; + List *objects = NIL; /* Define pxf service url address */ StringInfoData uri; initStringInfo(&uri); appendStringInfo(&uri, "%s/", pxf_service_address); - GPHDUri* hadoop_uri = parseGPHDUriForHCAT(uri.data); + GPHDUri* hadoop_uri = parseGPHDUriForMetadata(uri.data); pfree(uri.data); init_client_context(&client_context); @@ -982,9 +982,9 @@ List *get_pxf_hcat_metadata(char *relation_location) generate_delegation_token(&inputData); build_http_header(&inputData); - hcat_tables = get_hcat_metadata(hadoop_uri, relation_location, &client_context); + objects = get_external_metadata(hadoop_uri, profile, pattern, &client_context, dboid); - freeGPHDUriForHCAT(hadoop_uri); + freeGPHDUriForMetadata(hadoop_uri); cancel_delegation_token(&inputData); - return hcat_tables; + return objects; } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/backend/access/external/pxfmasterapi.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/pxfmasterapi.c b/src/backend/access/external/pxfmasterapi.c index 17dbc65..e07277e 100644 --- a/src/backend/access/external/pxfmasterapi.c +++ b/src/backend/access/external/pxfmasterapi.c @@ -27,7 +27,7 @@ */ #include <json-c/json.h> #include "access/pxfmasterapi.h" -#include "catalog/hcatalog/externalmd.h" +#include "catalog/external/externalmd.h" static List* parse_datanodes_response(List *rest_srvrs, StringInfo rest_buf); static PxfFragmentStatsElem *parse_get_frag_stats_response(StringInfo rest_buf); @@ -35,7 +35,7 @@ static float4 
normalize_size(long size, char* unit); static List* parse_get_fragments_response(List* fragments, StringInfo rest_buf); static void ha_failover(GPHDUri *hadoop_uri, ClientContext *client_context, char* rest_msg); static void rest_request(GPHDUri *hadoop_uri, ClientContext* client_context, char *rest_msg); -static char* concat(char *body, char *tail); +static char* concat(int num_args, ...); /* * Obtain the datanode REST servers host/port data @@ -145,7 +145,7 @@ void free_datanode_rest_server(PxfServer* srv) PxfFragmentStatsElem *get_fragments_statistics(GPHDUri* hadoop_uri, ClientContext *client_context) { - char *restMsg = concat("http://%s:%s/%s/%s/Fragmenter/getFragmentsStats?path=", hadoop_uri->data); + char *restMsg = concat(2, "http://%s:%s/%s/%s/Fragmenter/getFragmentsStats?path=", hadoop_uri->data); /* send the request. The response will exist in rest_buf.data */ rest_request(hadoop_uri, client_context, restMsg); @@ -258,14 +258,20 @@ ha_failover(GPHDUri *hadoop_uri, } } -/* Concatenate two literal strings using stringinfo */ -char* concat(char *body, char *tail) +/* Concatenate multiple literal strings using stringinfo */ +char* concat(int num_args, ...) 
{ + va_list ap; StringInfoData str; initStringInfo(&str); - appendStringInfoString(&str, body); - appendStringInfoString(&str, tail); + va_start(ap, num_args); + + for (int i = 0; i < num_args; i++) { + appendStringInfoString(&str, va_arg(ap, char*)); + } + va_end(ap); + return str.data; } @@ -282,7 +288,7 @@ get_data_fragment_list(GPHDUri *hadoop_uri, ClientContext *client_context) { List *data_fragments = NIL; - char *restMsg = concat("http://%s:%s/%s/%s/Fragmenter/getFragments?path=",hadoop_uri->data); + char *restMsg = concat(2, "http://%s:%s/%s/%s/Fragmenter/getFragments?path=", hadoop_uri->data); rest_request(hadoop_uri, client_context, restMsg); @@ -398,23 +404,35 @@ void free_fragment(DataFragment *fragment) } /* - * get_hcat_metadata + * get_external_metadata * - * Request hcatalog metadata from the PXF MetadataResource - * Process the hcat response - * TODO: The hcat response processing might be pushed out of this function's context + * Request item metadata from the PXF MetadataResource + * Process the PXF response + * Cache data to dboid if it's not NULL + * TODO: The PXF response processing might be pushed out of this function's context * */ -List* get_hcat_metadata(GPHDUri* hadoop_uri, char *location, ClientContext *client_context) + +List* get_external_metadata(GPHDUri* hadoop_uri, char *profile, char *pattern, ClientContext *client_context, Oid dboid) { - List *hcat_tables = NIL; - char *restMsg = concat("http://%s:%s/%s/%s/Metadata/getTableMetadata?table=", location); + List *objects = NIL; + char *restMsg = concat(4, "http://%s:%s/%s/%s/Metadata/getMetadata?profile=", profile, "&pattern=", pattern); rest_request(hadoop_uri, client_context, restMsg); /* parse the JSON response and form a fragments list to return */ - hcat_tables = ParseHCatalogEntries(&(client_context->the_rest_buf)); + objects = ParsePxfEntries(&(client_context->the_rest_buf), profile, dboid); + + return objects; +} + - return hcat_tables; +List* 
get_and_cache_external_metadata(GPHDUri* hadoop_uri, char *profile, char *pattern, ClientContext *client_context, Oid dboid) +{ + return get_external_metadata(hadoop_uri, profile, pattern, client_context, dboid); } +List* get_no_cache_external_metadata(GPHDUri* hadoop_uri, char *profile, char *pattern, ClientContext *client_context) +{ + return get_external_metadata(hadoop_uri, profile, pattern, client_context, NULL); +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/backend/access/external/pxfuriparser.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/pxfuriparser.c b/src/backend/access/external/pxfuriparser.c index 0ce78bd..9e83714 100644 --- a/src/backend/access/external/pxfuriparser.c +++ b/src/backend/access/external/pxfuriparser.c @@ -84,7 +84,7 @@ parseGPHDUri(const char *uri_str) return uri; } -/* parseGPHDUriForHCAT +/* parseGPHDUriForMetadata * * Go over a URI string and parse it into its various components while * verifying valid structure given a specific target protocol. @@ -100,7 +100,7 @@ parseGPHDUri(const char *uri_str) * a parsed uri as a GPHDUri structure, or reports a format error. 
*/ GPHDUri* -parseGPHDUriForHCAT(char *uri_str) +parseGPHDUriForMetadata(char *uri_str) { GPHDUri *uri = (GPHDUri *)palloc0(sizeof(GPHDUri)); @@ -128,7 +128,7 @@ freeGPHDUri(GPHDUri *uri) } void -freeGPHDUriForHCAT(GPHDUri *uri) +freeGPHDUriForMetadata(GPHDUri *uri) { pfree(uri->host); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/backend/catalog/Makefile ---------------------------------------------------------------------- diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index 1654a53..b453d38 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -22,7 +22,7 @@ OBJS = catalog.o dependency.o heap.o index.o indexing.o namespace.o aclchk.o \ pg_type.o toasting.o aoseg.o \ pg_attribute_encoding.o pg_compression.o $(QUICKLZ_COMPRESSION) -SUBDIRS = caql core hcatalog +SUBDIRS = caql core external BKIFILES = postgres.bki postgres.description postgres.shdescription http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/backend/catalog/external/Makefile ---------------------------------------------------------------------- diff --git a/src/backend/catalog/external/Makefile b/src/backend/catalog/external/Makefile new file mode 100644 index 0000000..3956c8d --- /dev/null +++ b/src/backend/catalog/external/Makefile @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for catalog/external +# +#------------------------------------------------------------------------- + +subdir = src/backend/catalog/external +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = externalmd.o + +include $(top_srcdir)/src/backend/common.mk + http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/backend/catalog/external/externalmd.c ---------------------------------------------------------------------- diff --git a/src/backend/catalog/external/externalmd.c b/src/backend/catalog/external/externalmd.c new file mode 100644 index 0000000..e65d741 --- /dev/null +++ b/src/backend/catalog/external/externalmd.c @@ -0,0 +1,586 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * externalmd.c + * + * Created on: Mar 6, 2015 + * Author: antova + * + * + * Utilities for loading external PXF metadata + * + */ + + +#include "postgres.h" +#include <json-c/json.h> + +#include "miscadmin.h" +#include "access/transam.h" +#include "catalog/catquery.h" +#include "catalog/external/externalmd.h" +#include "catalog/pg_database.h" +#include "catalog/pg_exttable.h" +#include "catalog/pg_namespace.h" +#include "catalog/namespace.h" +#include "commands/typecmds.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/lsyscache.h" +#include "utils/numeric.h" +#include "utils/guc.h" + + +List *ParsePxfEntries(StringInfo json, char *profile, Oid dboid); +static PxfItem *ParsePxfItem(struct json_object *pxfMD, char* profile); +static void LoadPxfItem(PxfItem *pxfItem, Oid dboid); +static Oid LoadNamespace(const char *namespaceName, Oid dboid); +static void LoadTable(Oid namespaceOid, PxfItem *pxfItem); +static void LoadType(Oid relid, Oid reltypeoid, NameData relname, Oid relnamespaceoid); +static void LoadDistributionPolicy(Oid relid, PxfItem *pxfItem); +static void LoadExtTable(Oid relid, PxfItem *pxfItem); +static void LoadColumns(Oid relid, List *columns); +static int ComputeTypeMod(Oid typeOid, const char *colname, int *typemod, int nTypeMod); + +const int maxNumTypeModifiers = 2; +/* + * Parse a json response containing PXF metadata and load it in the in-memory heap tables, + * to database with Oid=dboid if dboid is not NULL + * Return the list of the parsed tables + */ +List *ParsePxfEntries(StringInfo json, char *profile, Oid dboid) +{ + struct json_object *jsonObj = json_tokener_parse(json->data); + if ((NULL == jsonObj ) || is_error(jsonObj)) + { + return NIL; + } + + List *tables = NIL; + struct json_object *jsonItems = json_object_object_get(jsonObj, "PXFMetadata"); + if ((jsonItems == NULL) || 
is_error(jsonItems)) + { + return NIL; + } + + const int numItems = json_object_array_length(jsonItems); + for (int i = 0; i < numItems; i++) + { + struct json_object *jsonItem = json_object_array_get_idx(jsonItems, i); + PxfItem *pxfItem = ParsePxfItem(jsonItem, profile); + if (dboid != NULL) + LoadPxfItem(pxfItem, dboid); + tables = lappend(tables, pxfItem); + } + + return tables; +} + +/* + * ParsePxfItem + * Parse the given json object representing a single PXF item into the internal + * representation + */ +static PxfItem *ParsePxfItem(struct json_object *pxfMD, char* profile) +{ + PxfItem *pxfItem = palloc0(sizeof(PxfItem)); + + /* parse item name */ + struct json_object *jsonItem = json_object_object_get(pxfMD, "item"); + char *itemPath = pstrdup(json_object_get_string(json_object_object_get(jsonItem, "path"))); + char *itemName = pstrdup(json_object_get_string(json_object_object_get(jsonItem, "name"))); + + pxfItem->profile = profile; + pxfItem->path = itemPath; + pxfItem->name = itemName; + + elog(DEBUG1, "Parsed item %s, namespace %s", itemName, itemPath); + + /* parse columns */ + struct json_object *jsonFields = json_object_object_get(pxfMD, "fields"); + const int numFields = json_object_array_length(jsonFields); + for (int i = 0; i < numFields; i++) + { + PxfField *pxfField = palloc0(sizeof(PxfField)); + struct json_object *jsonCol = json_object_array_get_idx(jsonFields, i); + + struct json_object *fieldName = json_object_object_get(jsonCol, "name"); + pxfField->name = pstrdup(json_object_get_string(fieldName)); + + struct json_object *fieldType = json_object_object_get(jsonCol, "type"); + pxfField->type = pstrdup(json_object_get_string(fieldType)); + pxfField->nTypeModifiers = 0; + + elog(DEBUG1, "Parsing field %s, type %s", pxfField->name, pxfField->type); + + struct json_object *jsonModifiers = json_object_object_get(jsonCol, "modifiers"); + if (NULL != jsonModifiers) + { + const int numModifiers = json_object_array_length(jsonModifiers); + Assert(2 
>= numModifiers); + + pxfField->nTypeModifiers = numModifiers; + for (int j = 0; j < numModifiers; j++) + { + struct json_object *jsonMod = json_object_array_get_idx(jsonModifiers, j); + pxfField->typeModifiers[j] = json_object_get_int(jsonMod); + + elog(DEBUG1, "modifier[%d]: %d", j, pxfField->typeModifiers[j]); + } + } + pxfItem->fields = lappend(pxfItem->fields, pxfField); + } + + return pxfItem; +} + +/* + * LoadPxfItem + * Load the given PXF item into in-memory heap tables + */ +static void LoadPxfItem(PxfItem *pxfItem, Oid dboid) +{ + Oid namespaceOid = LookupNamespaceId(pxfItem->path, dboid); + + if (!OidIsValid(namespaceOid)) + { + /* external database name has not been mapped to a namespace yet: create it */ + namespaceOid = LoadNamespace(pxfItem->path, dboid); + elog(DEBUG1, "No namespace found: %s. Generated new namespace oid: %u", pxfItem->path, namespaceOid); + } + + LoadTable(namespaceOid, pxfItem); +} + +/* + * LoadNamespace + * Create an entry for the given PXF namespace in the in-memory heap tables and + * return the reserved namespace oid + */ +static Oid LoadNamespace(const char *namespaceName, Oid dboid) +{ + Assert(OidIsValid(dboid)); + + Oid namespaceOid = GetNewExternalObjectId(); + + bool nulls[Natts_pg_namespace] = {false}; + Datum values[Natts_pg_namespace] = {(Datum) 0}; + + NameData name; + namestrcpy(&name, namespaceName); + values[Anum_pg_namespace_nspname - 1] = NameGetDatum(&name); + values[Anum_pg_namespace_nspdboid - 1] = ObjectIdGetDatum((Oid) dboid); + values[Anum_pg_namespace_nspowner - 1] = ObjectIdGetDatum(GetUserId()); + nulls[Anum_pg_namespace_nspacl - 1] = true; + + cqContext *pcqCtx = caql_beginscan( + NULL, + cql("INSERT INTO pg_namespace", NULL)); + + HeapTuple tup = caql_form_tuple(pcqCtx, values, nulls); + HeapTupleSetOid(tup, namespaceOid); + + caql_insert_inmem(pcqCtx, tup); + caql_endscan(pcqCtx); + + return namespaceOid; +} + +/* + * LoadTable + * Load the metadata for an PXF table to pg_class and related catalog 
tables. + */ +static void LoadTable(Oid namespaceOid, PxfItem *pxfItem) +{ + /* + * assert entry is not already loaded in pg_class + */ + Oid relid = caql_getoid_only( + NULL, + NULL, + cql("SELECT oid FROM pg_class " + " WHERE relname = :1 and relnamespace = :2", + CStringGetDatum(pxfItem->name), ObjectIdGetDatum(namespaceOid))); + if (InvalidOid != relid) + { + + HeapTuple tup; + Form_pg_namespace namespace; + + tup = caql_getfirst( + NULL, + cql("SELECT * FROM pg_namespace " + " WHERE oid = :1 ", + ObjectIdGetDatum(namespaceOid))); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "could not find tuple for namespace with oid=%u", + namespaceOid); + + namespace = (Form_pg_namespace) GETSTRUCT(tup); + + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("relation \"%s.%s.%s\" already exists", + NameStr(namespace->nspname), pxfItem->path, pxfItem->name), + errOmitLocation(true))); + } + + /* generate a new relid for the table */ + relid = GetNewExternalObjectId(); + elog(DEBUG1, "Generated new relation oid: %u", relid); + + /* generate new oid for pg_type entry */ + Oid reltypeoid = GetNewExternalObjectId(); + elog(DEBUG1, "Generated new reltype oid: %u", reltypeoid); + + Datum values[Natts_pg_class]; + bool nulls[Natts_pg_class]; + for (int i = 0; i < Natts_pg_class; i++) + { + nulls[i] = false; + values[i] = (Datum) 0; + } + + NameData name; + namestrcpy(&name, pxfItem->name); + + values[Anum_pg_class_relname - 1] = NameGetDatum(&name); + values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(namespaceOid); + values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(reltypeoid); // TODO: Jun 05, 2015 - nhorn: check + values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(GetUserId()); + values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(InvalidOid); /* access method for indexes */ + values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(InvalidOid); /* physical storage file id */ + values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(InvalidOid); + 
values[Anum_pg_class_relpages - 1] = Int32GetDatum(1); /* TODO: Mar 13, 2015 - lantova: get table statistics from HCatalog */ + values[Anum_pg_class_reltuples - 1] = 1; /* TODO: Mar 13, 2015 - lantova: get table statistics from HCatalog */ + values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(InvalidOid); + values[Anum_pg_class_reltoastidxid - 1] = ObjectIdGetDatum(InvalidOid); + values[Anum_pg_class_relaosegrelid - 1] = ObjectIdGetDatum(InvalidOid); + values[Anum_pg_class_relaosegidxid - 1] = ObjectIdGetDatum(InvalidOid); + values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(false); + values[Anum_pg_class_relisshared - 1] = BoolGetDatum(false); + values[Anum_pg_class_relkind - 1] = CharGetDatum(RELKIND_RELATION); + values[Anum_pg_class_relstorage - 1] = CharGetDatum(RELSTORAGE_EXTERNAL); + values[Anum_pg_class_relnatts - 1] = Int16GetDatum(list_length(pxfItem->fields)); + values[Anum_pg_class_relchecks - 1] = Int16GetDatum(0); + values[Anum_pg_class_reltriggers - 1] = Int16GetDatum(0); + values[Anum_pg_class_relukeys - 1] = Int16GetDatum(0); + values[Anum_pg_class_relfkeys - 1] = Int16GetDatum(0); + values[Anum_pg_class_relrefs - 1] = Int16GetDatum(0); + values[Anum_pg_class_relhasoids - 1] = BoolGetDatum(false); + values[Anum_pg_class_relhaspkey - 1] = BoolGetDatum(false); + values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(false); + values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(false); + values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(InvalidTransactionId); + nulls[Anum_pg_class_relacl - 1] = true; + nulls[Anum_pg_class_reloptions - 1] = true; + + cqContext *pcqCtx = caql_beginscan( + NULL, + cql("INSERT INTO pg_class", NULL)); + + HeapTuple tup = caql_form_tuple(pcqCtx, values, nulls); + HeapTupleSetOid(tup, relid); + + caql_insert_inmem(pcqCtx, tup); + caql_endscan(pcqCtx); + + LoadType(relid, reltypeoid, name, namespaceOid); + LoadDistributionPolicy(relid, pxfItem); + LoadExtTable(relid, pxfItem); + LoadColumns(relid, 
pxfItem->fields); +} + +/* + * LoadType + * Load the metadata for an PXF table to pg_type + */ +static void LoadType(Oid relid, Oid reltypeoid, NameData relname, Oid relnamespaceoid) +{ + Datum values[Natts_pg_type]; + bool nulls[Natts_pg_type]; + for (int i = 0; i < Natts_pg_type; i++) + { + nulls[i] = false; + values[i] = (Datum) 0; + } + + values[Anum_pg_type_typname - 1] = NameGetDatum(&relname); + values[Anum_pg_type_typnamespace - 1] = ObjectIdGetDatum(relnamespaceoid); + values[Anum_pg_type_typowner - 1] = ObjectIdGetDatum(GetUserId()); + values[Anum_pg_type_typlen - 1] = Int16GetDatum(-1); + values[Anum_pg_type_typbyval - 1] = BoolGetDatum(false); + values[Anum_pg_type_typtype - 1] = CharGetDatum(TYPTYPE_COMPOSITE); + values[Anum_pg_type_typisdefined - 1] = BoolGetDatum(true); + values[Anum_pg_type_typdelim - 1] = CharGetDatum(DEFAULT_TYPDELIM); + values[Anum_pg_type_typrelid - 1] = ObjectIdGetDatum(relid); + values[Anum_pg_type_typelem - 1] = ObjectIdGetDatum(0); /* not an array */ + values[Anum_pg_type_typinput - 1] = ObjectIdGetDatum(F_RECORD_IN); + values[Anum_pg_type_typoutput - 1] = ObjectIdGetDatum(F_RECORD_OUT); ; + values[Anum_pg_type_typreceive - 1] = ObjectIdGetDatum(F_RECORD_RECV); + values[Anum_pg_type_typsend - 1] = ObjectIdGetDatum(F_RECORD_SEND); + values[Anum_pg_type_typanalyze - 1] = ObjectIdGetDatum(0); + values[Anum_pg_type_typalign - 1] = CharGetDatum('d'); /* DOUBLE alignment */ + values[Anum_pg_type_typstorage - 1] = CharGetDatum('x'); /* EXTENDED storage */ + values[Anum_pg_type_typnotnull - 1] = BoolGetDatum(false); + values[Anum_pg_type_typbasetype - 1] = ObjectIdGetDatum(0); + values[Anum_pg_type_typtypmod - 1] = Int32GetDatum(-1); /* this type is not a domain */ + values[Anum_pg_type_typndims - 1] = Int32GetDatum(0); + nulls[Anum_pg_type_typdefaultbin - 1] = true; + nulls[Anum_pg_type_typdefault - 1] = true; + + cqContext *pcqCtx = caql_beginscan( + NULL, + cql("INSERT INTO pg_type", NULL)); + + HeapTuple tup = 
caql_form_tuple(pcqCtx, values, nulls); + HeapTupleSetOid(tup, reltypeoid); + + caql_insert_inmem(pcqCtx, tup); + caql_endscan(pcqCtx); +} + +/* + * LoadDistributionPolicy + * Load the metadata for an PXF table to gp_distribution_policy + */ +static void LoadDistributionPolicy(Oid relid, PxfItem *pxfItem) +{ + Datum values[Natts_gp_policy]; + bool nulls[Natts_gp_policy]; + for (int i = 0; i < Natts_gp_policy; i++) + { + nulls[i] = false; + values[i] = (Datum) 0; + } + + values[Anum_gp_policy_localoid - 1] = ObjectIdGetDatum(relid); + nulls[Anum_gp_policy_attrnums - 1] = true; /* default distribution */ + + cqContext *pcqCtx = caql_beginscan( + NULL, + cql("INSERT INTO gp_distribution_policy", NULL)); + + HeapTuple tup = caql_form_tuple(pcqCtx, values, nulls); + + caql_insert_inmem(pcqCtx, tup); + caql_endscan(pcqCtx); +} + +/* + * LoadExtTable + * Load the metadata for an PXF table to pg_exttable + */ +static void LoadExtTable(Oid relid, PxfItem *pxfItem) +{ + Datum values[Natts_pg_exttable]; + bool nulls[Natts_pg_exttable]; + for (int i = 0; i < Natts_pg_exttable; i++) + { + nulls[i] = false; + values[i] = (Datum) 0; + } + + /* location - should be an array of text with one element: + * pxf://<ip:port/namaservice>/<hive db>.<hive table>?Profile=Hive */ + StringInfoData locationStr; + initStringInfo(&locationStr); + appendStringInfo(&locationStr, "pxf://%s/%s.%s?Profile=%s", + pxf_service_address, pxfItem->path, pxfItem->name, pxfItem->profile); + Size len = VARHDRSZ + locationStr.len; + /* +1 leaves room for sprintf's trailing null */ + text *t = (text *) palloc(len + 1); + SET_VARSIZE(t, len); + sprintf((char *) VARDATA(t), "%s", locationStr.data); + ArrayBuildState *astate = NULL; + astate = accumArrayResult(astate, PointerGetDatum(t), + false, TEXTOID, + CurrentMemoryContext); + pfree(locationStr.data); + Assert(NULL != astate); + Datum location = makeArrayResult(astate, CurrentMemoryContext); + + /* format options - should be "formatter 'pxfwritable_import'" 
*/ + StringInfoData formatStr; + initStringInfo(&formatStr); + appendStringInfo(&formatStr, "formatter 'pxfwritable_import'"); + Datum format_opts = DirectFunctionCall1(textin, CStringGetDatum(formatStr.data)); + pfree(formatStr.data); + + values[Anum_pg_exttable_reloid - 1] = ObjectIdGetDatum(relid); + values[Anum_pg_exttable_location - 1] = location; + values[Anum_pg_exttable_fmttype - 1] = CharGetDatum('b' /* binary */); + values[Anum_pg_exttable_fmtopts - 1] = format_opts; + nulls[Anum_pg_exttable_command - 1] = true; + nulls[Anum_pg_exttable_rejectlimit - 1] = true; + nulls[Anum_pg_exttable_rejectlimittype - 1] = true; + nulls[Anum_pg_exttable_fmterrtbl - 1] = true; + values[Anum_pg_exttable_encoding - 1] = Int32GetDatum(pg_get_client_encoding() /* default encoding */); + values[Anum_pg_exttable_writable - 1] = BoolGetDatum(false /* not writable */); + + cqContext *pcqCtx = caql_beginscan( + NULL, + cql("INSERT INTO pg_exttable", NULL)); + + HeapTuple tup = caql_form_tuple(pcqCtx, values, nulls); + + caql_insert_inmem(pcqCtx, tup); + caql_endscan(pcqCtx); +} + +/* + * LoadPxfColumns + * Load the column metadata for an PXF table to pg_attribute + */ +static void LoadColumns(Oid relid, List *columns) +{ + Assert(OidIsValid(relid)); + Assert(NULL != columns); + + cqContext *pcqCtx = caql_beginscan( + NULL, + cql("INSERT INTO pg_attribute", NULL)); + + ListCell *lc = NULL; + AttrNumber attno = 1; + foreach(lc, columns) + { + PxfField *field = lfirst(lc); + Oid typeOid = + caql_getoid_only( + NULL, + NULL, + cql("SELECT oid FROM pg_type " + " WHERE typname = :1 and typnamespace = :2", + CStringGetDatum(field->type), ObjectIdGetDatum((Oid) PG_CATALOG_NAMESPACE))); + + if (!OidIsValid(typeOid)) + { + elog(ERROR, "Unsupported type %s for imported column %s", field->type, field->name); + } + + FormData_pg_attribute attributeD; + HeapTuple attributeTuple = heap_addheader(Natts_pg_attribute, + false, + ATTRIBUTE_TUPLE_SIZE, + (void *) &attributeD); + + Form_pg_attribute 
attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple); + + int16 typlen = 0; + bool typbyval = false; + char typealign = 'c'; /* CHAR/no alignment by default */ + + get_typlenbyvalalign(typeOid, &typlen, &typbyval, &typealign); + + attribute->attrelid = relid; + namestrcpy(&(attribute->attname), field->name); + attribute->atttypid = typeOid; + attribute->attstattarget = 0; /* no stats collection for column */ + attribute->attlen = typlen; + attribute->attcacheoff = -1; + attribute->atttypmod = ComputeTypeMod(typeOid, field->name, field->typeModifiers, field->nTypeModifiers); + attribute->attnum = attno; + attribute->attbyval = typbyval; + attribute->attndims = 0; /* array types not supported */ + attribute->attstorage = get_typstorage(typeOid); + attribute->attalign = typealign; + attribute->attnotnull = false; + attribute->atthasdef = false; + attribute->attisdropped = false; + attribute->attislocal = true; + attribute->attinhcount = 0; + + caql_insert_inmem(pcqCtx, attributeTuple); + + attno++; + } + + caql_endscan(pcqCtx); + +} + +/* + * get_typemod + * Compute the type modifiers for a column of the given type + */ +static int ComputeTypeMod(Oid typeOid, const char *colname, int *typemod, int nTypeMod) +{ + Assert(0 <= nTypeMod && nTypeMod <= maxNumTypeModifiers); + if (0 == nTypeMod) + { + if (BPCHAROID == typeOid) + { + /* "char" without length corresponds to "char(1)" */ + return VARHDRSZ + 1; + } + + return -1; + } + + if (BPCHAROID == typeOid || VARCHAROID == typeOid) + { + /* typemod specifies the length */ + if (1 != nTypeMod) + { + elog(ERROR, "Invalid typemod for imported column %s", colname); + } + + return VARHDRSZ + typemod[0]; + } + + if (NUMERICOID != typeOid) + { + /* TODO: Mar 16, 2015 - lantova: do we need to support typemod with types other than BPCHAR, VARCHAR, NUMERIC? 
*/ + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Invalid typemod for imported column %s", + colname))); + } + + /* pack the scale and precision for NUMERIC into a single number to store in the typmod field in pg_attribute */ + int precision = typemod[0]; + if (1 > precision || NUMERIC_MAX_PRECISION < precision) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Invalid typemod for imported column %s. NUMERIC precision %d must be between 1 and %d", + colname, precision, NUMERIC_MAX_PRECISION))); + } + + int result = precision << 16; + + if (maxNumTypeModifiers == nTypeMod) + { + int scale = typemod[1]; + + if (0 > scale || precision < scale) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("Invalid typemod for imported column %s. NUMERIC scale %d must be between 0 and precision %d", + colname, scale, precision))); + } + result = result | scale; + } + + return VARHDRSZ + result; +} + http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/backend/catalog/hcatalog/Makefile ---------------------------------------------------------------------- diff --git a/src/backend/catalog/hcatalog/Makefile b/src/backend/catalog/hcatalog/Makefile deleted file mode 100644 index 6d1dfde..0000000 --- a/src/backend/catalog/hcatalog/Makefile +++ /dev/null @@ -1,32 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -#------------------------------------------------------------------------- -# -# Makefile-- -# Makefile for catalog/hcatalog -# -#------------------------------------------------------------------------- - -subdir = src/backend/catalog/hcatalog -top_builddir = ../../../.. -include $(top_builddir)/src/Makefile.global - -OBJS = externalmd.o - -include $(top_srcdir)/src/backend/common.mk - http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/backend/catalog/hcatalog/externalmd.c ---------------------------------------------------------------------- diff --git a/src/backend/catalog/hcatalog/externalmd.c b/src/backend/catalog/hcatalog/externalmd.c deleted file mode 100644 index 3a7ebf7..0000000 --- a/src/backend/catalog/hcatalog/externalmd.c +++ /dev/null @@ -1,562 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * externalmd.c - * - * Created on: Mar 6, 2015 - * Author: antova - * - * - * Utilities for loading external hcatalog metadata - * - */ - -#include "postgres.h" -#include <json-c/json.h> - -#include "miscadmin.h" -#include "access/transam.h" -#include "catalog/catquery.h" -#include "catalog/pg_database.h" -#include "catalog/pg_exttable.h" -#include "catalog/pg_namespace.h" -#include "catalog/namespace.h" -#include "catalog/hcatalog/externalmd.h" -#include "commands/typecmds.h" -#include "utils/builtins.h" -#include "utils/fmgroids.h" -#include "utils/lsyscache.h" -#include "utils/numeric.h" -#include "utils/guc.h" - -static HCatalogTable *ParseHCatalogTable(struct json_object *hcatalogMD); -static void LoadHCatalogEntry(HCatalogTable *hcatalogTable); -static Oid LoadHCatalogNamespace(const char *namespaceName); -static void LoadHCatalogTable(Oid namespaceOid, HCatalogTable *hcatalogTable); -static void LoadHCatalogType(Oid relid, Oid reltypeoid, NameData relname, Oid relnamespaceoid); -static void LoadHCatalogDistributionPolicy(Oid relid, HCatalogTable *hcatalogTable); -static void LoadHCatalogExtTable(Oid relid, HCatalogTable *hcatalogTable); -static void LoadHCatalogColumns(Oid relid, List *columns); -int ComputeTypeMod(Oid typeOid, const char *colname, int *typemod, int nTypeMod); - -const int maxNumTypeModifiers = 2; -/* - * Parse a json response containing HCatalog metadata and load it in the in-memory heap tables, - * Return the list of the parsed tables - */ -List *ParseHCatalogEntries(StringInfo json) -{ - struct json_object *jsonObj = json_tokener_parse(json->data); - if ((NULL == jsonObj ) || is_error(jsonObj)) - { - return NIL; - } - - List *tables = NIL; - struct json_object *jsonTables = json_object_object_get(jsonObj, "PXFMetadata"); - if ((jsonTables == NULL) || is_error(jsonTables)) - { - return NIL; - } - - const int numTables = 
json_object_array_length(jsonTables); - for (int i = 0; i < numTables; i++) - { - struct json_object *jsonTable = json_object_array_get_idx(jsonTables, i); - HCatalogTable *hcatalogTable = ParseHCatalogTable(jsonTable); - LoadHCatalogEntry(hcatalogTable); - tables = lappend(tables, hcatalogTable); - } - - return tables; -} - -/* - * ParseHcatalogTable - * Parse the given json object representing a single HCatalog table into the internal - * representation - */ -HCatalogTable *ParseHCatalogTable(struct json_object *hcatalogMD) -{ - HCatalogTable *hcatalogTable = palloc0(sizeof(HCatalogTable)); - - /* parse table name */ - struct json_object *jsonTable = json_object_object_get(hcatalogMD, "table"); - char *dbName = pstrdup(json_object_get_string(json_object_object_get(jsonTable, "dbName"))); - char *tableName = pstrdup(json_object_get_string(json_object_object_get(jsonTable, "tableName"))); - - hcatalogTable->dbName = dbName; - hcatalogTable->tableName = tableName; - - elog(DEBUG1, "Parsed table %s, namespace %s", tableName, dbName); - - /* parse columns */ - struct json_object *jsonColumns = json_object_object_get(hcatalogMD, "fields"); - const int numColumns = json_object_array_length(jsonColumns); - for (int i = 0; i < numColumns; i++) - { - HCatalogColumn *hcatalogCol = palloc0(sizeof(HCatalogColumn)); - struct json_object *jsonCol = json_object_array_get_idx(jsonColumns, i); - - struct json_object *colName = json_object_object_get(jsonCol, "name"); - hcatalogCol->colName = pstrdup(json_object_get_string(colName)); - - struct json_object *colType = json_object_object_get(jsonCol, "type"); - hcatalogCol->typeName = pstrdup(json_object_get_string(colType)); - hcatalogCol->nTypeModifiers = 0; - - elog(DEBUG1, "Parsing column %s, type %s", hcatalogCol->colName, hcatalogCol->typeName); - - struct json_object *jsonModifiers = json_object_object_get(jsonCol, "modifiers"); - if (NULL != jsonModifiers) - { - const int numModifiers = 
json_object_array_length(jsonModifiers); - Assert(2 >= numModifiers); - - hcatalogCol->nTypeModifiers = numModifiers; - for (int j = 0; j < numModifiers; j++) - { - struct json_object *jsonMod = json_object_array_get_idx(jsonModifiers, j); - hcatalogCol->typeModifiers[j] = json_object_get_int(jsonMod); - - elog(DEBUG1, "modifier[%d]: %d", j, hcatalogCol->typeModifiers[j]); - } - } - hcatalogTable->columns = lappend(hcatalogTable->columns, hcatalogCol); - } - - return hcatalogTable; -} - -/* - * LoadHcatalogTable - * Load the given hcatalog table into in-memory heap tables - */ -void LoadHCatalogEntry(HCatalogTable *hcatalogTable) -{ - Oid namespaceOid = LookupNamespaceId(hcatalogTable->dbName, HcatalogDbOid); - - if (!OidIsValid(namespaceOid)) - { - /* hcatalog database name has not been mapped to a namespace yet: create it */ - namespaceOid = LoadHCatalogNamespace(hcatalogTable->dbName); - elog(DEBUG1, "No namespace found: %s. Generated new namespace oid: %u", hcatalogTable->dbName, namespaceOid); - } - - LoadHCatalogTable(namespaceOid, hcatalogTable); -} - -/* - * CreateHCatalogNamespace - * Create an entry for the given HCatalog namespace in the in-memory heap tables and - * return the reserved namespace oid - */ -Oid LoadHCatalogNamespace(const char *namespaceName) -{ - Oid namespaceOid = GetNewExternalObjectId(); - - bool nulls[Natts_pg_namespace] = {false}; - Datum values[Natts_pg_namespace] = {(Datum) 0}; - - NameData name; - namestrcpy(&name, namespaceName); - values[Anum_pg_namespace_nspname - 1] = NameGetDatum(&name); - values[Anum_pg_namespace_nspdboid - 1] = ObjectIdGetDatum((Oid) HcatalogDbOid); - values[Anum_pg_namespace_nspowner - 1] = ObjectIdGetDatum(GetUserId()); - nulls[Anum_pg_namespace_nspacl - 1] = true; - - cqContext *pcqCtx = caql_beginscan( - NULL, - cql("INSERT INTO pg_namespace", NULL)); - - HeapTuple tup = caql_form_tuple(pcqCtx, values, nulls); - HeapTupleSetOid(tup, namespaceOid); - - caql_insert_inmem(pcqCtx, tup); - 
caql_endscan(pcqCtx); - - return namespaceOid; -} - -/* - * LoadHCatalogTable - * Load the metadata for an HCatalog table to pg_class and related catalog tables. - */ -void LoadHCatalogTable(Oid namespaceOid, HCatalogTable *hcatalogTable) -{ - /* - * assert entry is not already loaded in pg_class - */ - Oid relid = caql_getoid_only( - NULL, - NULL, - cql("SELECT oid FROM pg_class " - " WHERE relname = :1 and relnamespace = :2", - CStringGetDatum(hcatalogTable->tableName), ObjectIdGetDatum(namespaceOid))); - if (InvalidOid != relid) - { - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_TABLE), - errmsg("relation \"hcatalog.%s.%s\" already exists", - hcatalogTable->dbName, hcatalogTable->tableName), - errOmitLocation(true))); - } - - /* generate a new relid for the table */ - relid = GetNewExternalObjectId(); - elog(DEBUG1, "Generated new relation oid: %u", relid); - - /* generate new oid for pg_type entry */ - Oid reltypeoid = GetNewExternalObjectId(); - elog(DEBUG1, "Generated new reltype oid: %u", reltypeoid); - - Datum values[Natts_pg_class]; - bool nulls[Natts_pg_class]; - for (int i = 0; i < Natts_pg_class; i++) - { - nulls[i] = false; - values[i] = (Datum) 0; - } - - NameData name; - namestrcpy(&name, hcatalogTable->tableName); - - values[Anum_pg_class_relname - 1] = NameGetDatum(&name); - values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(namespaceOid); - values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(reltypeoid); // TODO: Jun 05, 2015 - nhorn: check - values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(GetUserId()); - values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(InvalidOid); /* access method for indexes */ - values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(InvalidOid); /* physical storage file id */ - values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(InvalidOid); - values[Anum_pg_class_relpages - 1] = Int32GetDatum(1); /* TODO: Mar 13, 2015 - lantova: get table statistics from HCatalog */ - values[Anum_pg_class_reltuples - 
1] = 1; /* TODO: Mar 13, 2015 - lantova: get table statistics from HCatalog */ - values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(InvalidOid); - values[Anum_pg_class_reltoastidxid - 1] = ObjectIdGetDatum(InvalidOid); - values[Anum_pg_class_relaosegrelid - 1] = ObjectIdGetDatum(InvalidOid); - values[Anum_pg_class_relaosegidxid - 1] = ObjectIdGetDatum(InvalidOid); - values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(false); - values[Anum_pg_class_relisshared - 1] = BoolGetDatum(false); - values[Anum_pg_class_relkind - 1] = CharGetDatum(RELKIND_RELATION); - values[Anum_pg_class_relstorage - 1] = CharGetDatum(RELSTORAGE_EXTERNAL); - values[Anum_pg_class_relnatts - 1] = Int16GetDatum(list_length(hcatalogTable->columns)); - values[Anum_pg_class_relchecks - 1] = Int16GetDatum(0); - values[Anum_pg_class_reltriggers - 1] = Int16GetDatum(0); - values[Anum_pg_class_relukeys - 1] = Int16GetDatum(0); - values[Anum_pg_class_relfkeys - 1] = Int16GetDatum(0); - values[Anum_pg_class_relrefs - 1] = Int16GetDatum(0); - values[Anum_pg_class_relhasoids - 1] = BoolGetDatum(false); - values[Anum_pg_class_relhaspkey - 1] = BoolGetDatum(false); - values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(false); - values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(false); - values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(InvalidTransactionId); - nulls[Anum_pg_class_relacl - 1] = true; - nulls[Anum_pg_class_reloptions - 1] = true; - - cqContext *pcqCtx = caql_beginscan( - NULL, - cql("INSERT INTO pg_class", NULL)); - - HeapTuple tup = caql_form_tuple(pcqCtx, values, nulls); - HeapTupleSetOid(tup, relid); - - caql_insert_inmem(pcqCtx, tup); - caql_endscan(pcqCtx); - - LoadHCatalogType(relid, reltypeoid, name, namespaceOid); - LoadHCatalogDistributionPolicy(relid, hcatalogTable); - LoadHCatalogExtTable(relid, hcatalogTable); - LoadHCatalogColumns(relid, hcatalogTable->columns); -} - -/* - * LoadHCatalogType - * Load the metadata for an HCatalog table to pg_type - */ 
-static void LoadHCatalogType(Oid relid, Oid reltypeoid, NameData relname, Oid relnamespaceoid) -{ - Datum values[Natts_pg_type]; - bool nulls[Natts_pg_type]; - for (int i = 0; i < Natts_pg_type; i++) - { - nulls[i] = false; - values[i] = (Datum) 0; - } - - values[Anum_pg_type_typname - 1] = NameGetDatum(&relname); - values[Anum_pg_type_typnamespace - 1] = ObjectIdGetDatum(relnamespaceoid); - values[Anum_pg_type_typowner - 1] = ObjectIdGetDatum(GetUserId()); - values[Anum_pg_type_typlen - 1] = Int16GetDatum(-1); - values[Anum_pg_type_typbyval - 1] = BoolGetDatum(false); - values[Anum_pg_type_typtype - 1] = CharGetDatum(TYPTYPE_COMPOSITE); - values[Anum_pg_type_typisdefined - 1] = BoolGetDatum(true); - values[Anum_pg_type_typdelim - 1] = CharGetDatum(DEFAULT_TYPDELIM); - values[Anum_pg_type_typrelid - 1] = ObjectIdGetDatum(relid); - values[Anum_pg_type_typelem - 1] = ObjectIdGetDatum(0); /* not an array */ - values[Anum_pg_type_typinput - 1] = ObjectIdGetDatum(F_RECORD_IN); - values[Anum_pg_type_typoutput - 1] = ObjectIdGetDatum(F_RECORD_OUT); ; - values[Anum_pg_type_typreceive - 1] = ObjectIdGetDatum(F_RECORD_RECV); - values[Anum_pg_type_typsend - 1] = ObjectIdGetDatum(F_RECORD_SEND); - values[Anum_pg_type_typanalyze - 1] = ObjectIdGetDatum(0); - values[Anum_pg_type_typalign - 1] = CharGetDatum('d'); /* DOUBLE alignment */ - values[Anum_pg_type_typstorage - 1] = CharGetDatum('x'); /* EXTENDED storage */ - values[Anum_pg_type_typnotnull - 1] = BoolGetDatum(false); - values[Anum_pg_type_typbasetype - 1] = ObjectIdGetDatum(0); - values[Anum_pg_type_typtypmod - 1] = Int32GetDatum(-1); /* this type is not a domain */ - values[Anum_pg_type_typndims - 1] = Int32GetDatum(0); - nulls[Anum_pg_type_typdefaultbin - 1] = true; - nulls[Anum_pg_type_typdefault - 1] = true; - - cqContext *pcqCtx = caql_beginscan( - NULL, - cql("INSERT INTO pg_type", NULL)); - - HeapTuple tup = caql_form_tuple(pcqCtx, values, nulls); - HeapTupleSetOid(tup, reltypeoid); - - caql_insert_inmem(pcqCtx, 
tup); - caql_endscan(pcqCtx); -} - -/* - * LoadHCatalogDistributionPolicy - * Load the metadata for an HCatalog table to gp_distribution_policy - */ -void LoadHCatalogDistributionPolicy(Oid relid, HCatalogTable *hcatalogTable) -{ - Datum values[Natts_gp_policy]; - bool nulls[Natts_gp_policy]; - for (int i = 0; i < Natts_gp_policy; i++) - { - nulls[i] = false; - values[i] = (Datum) 0; - } - - values[Anum_gp_policy_localoid - 1] = ObjectIdGetDatum(relid); - nulls[Anum_gp_policy_attrnums - 1] = true; /* default distribution */ - - cqContext *pcqCtx = caql_beginscan( - NULL, - cql("INSERT INTO gp_distribution_policy", NULL)); - - HeapTuple tup = caql_form_tuple(pcqCtx, values, nulls); - - caql_insert_inmem(pcqCtx, tup); - caql_endscan(pcqCtx); -} - -/* - * LoadHCatalogExtTable - * Load the metadata for an HCatalog table to pg_exttable - */ -void LoadHCatalogExtTable(Oid relid, HCatalogTable *hcatalogTable) -{ - Datum values[Natts_pg_exttable]; - bool nulls[Natts_pg_exttable]; - for (int i = 0; i < Natts_pg_exttable; i++) - { - nulls[i] = false; - values[i] = (Datum) 0; - } - - /* location - should be an array of text with one element: - * pxf://<ip:port/namaservice>/<hive db>.<hive table>?Profile=Hive */ - StringInfoData locationStr; - initStringInfo(&locationStr); - appendStringInfo(&locationStr, "pxf://%s/%s.%s?Profile=Hive", - pxf_service_address, hcatalogTable->dbName, hcatalogTable->tableName); - Size len = VARHDRSZ + locationStr.len; - /* +1 leaves room for sprintf's trailing null */ - text *t = (text *) palloc(len + 1); - SET_VARSIZE(t, len); - sprintf((char *) VARDATA(t), "%s", locationStr.data); - ArrayBuildState *astate = NULL; - astate = accumArrayResult(astate, PointerGetDatum(t), - false, TEXTOID, - CurrentMemoryContext); - pfree(locationStr.data); - Assert(NULL != astate); - Datum location = makeArrayResult(astate, CurrentMemoryContext); - - /* format options - should be "formatter 'pxfwritable_import'" */ - StringInfoData formatStr; - 
initStringInfo(&formatStr); - appendStringInfo(&formatStr, "formatter 'pxfwritable_import'"); - Datum format_opts = DirectFunctionCall1(textin, CStringGetDatum(formatStr.data)); - pfree(formatStr.data); - - values[Anum_pg_exttable_reloid - 1] = ObjectIdGetDatum(relid); - values[Anum_pg_exttable_location - 1] = location; - values[Anum_pg_exttable_fmttype - 1] = CharGetDatum('b' /* binary */); - values[Anum_pg_exttable_fmtopts - 1] = format_opts; - nulls[Anum_pg_exttable_command - 1] = true; - nulls[Anum_pg_exttable_rejectlimit - 1] = true; - nulls[Anum_pg_exttable_rejectlimittype - 1] = true; - nulls[Anum_pg_exttable_fmterrtbl - 1] = true; - values[Anum_pg_exttable_encoding - 1] = Int32GetDatum(pg_get_client_encoding() /* default encoding */); - values[Anum_pg_exttable_writable - 1] = BoolGetDatum(false /* not writable */); - - cqContext *pcqCtx = caql_beginscan( - NULL, - cql("INSERT INTO pg_exttable", NULL)); - - HeapTuple tup = caql_form_tuple(pcqCtx, values, nulls); - - caql_insert_inmem(pcqCtx, tup); - caql_endscan(pcqCtx); -} - -/* - * LoadHCatalogColumns - * Load the column metadata for an HCatalog table to pg_attribute - */ -void LoadHCatalogColumns(Oid relid, List *columns) -{ - Assert(OidIsValid(relid)); - Assert(NULL != columns); - - cqContext *pcqCtx = caql_beginscan( - NULL, - cql("INSERT INTO pg_attribute", NULL)); - - ListCell *lc = NULL; - AttrNumber attno = 1; - foreach(lc, columns) - { - HCatalogColumn *hcatCol = lfirst(lc); - Oid typeOid = - caql_getoid_only( - NULL, - NULL, - cql("SELECT oid FROM pg_type " - " WHERE typname = :1 and typnamespace = :2", - CStringGetDatum(hcatCol->typeName), ObjectIdGetDatum((Oid) PG_CATALOG_NAMESPACE))); - - if (!OidIsValid(typeOid)) - { - elog(ERROR, "Unsupported type %s for imported column %s", hcatCol->typeName, hcatCol->colName); - } - - FormData_pg_attribute attributeD; - HeapTuple attributeTuple = heap_addheader(Natts_pg_attribute, - false, - ATTRIBUTE_TUPLE_SIZE, - (void *) &attributeD); - - 
Form_pg_attribute attribute = (Form_pg_attribute) GETSTRUCT(attributeTuple); - - int16 typlen = 0; - bool typbyval = false; - char typealign = 'c'; /* CHAR/no alignment by default */ - - get_typlenbyvalalign(typeOid, &typlen, &typbyval, &typealign); - - attribute->attrelid = relid; - namestrcpy(&(attribute->attname), hcatCol->colName); - attribute->atttypid = typeOid; - attribute->attstattarget = 0; /* no stats collection for column */ - attribute->attlen = typlen; - attribute->attcacheoff = -1; - attribute->atttypmod = ComputeTypeMod(typeOid, hcatCol->colName, hcatCol->typeModifiers, hcatCol->nTypeModifiers); - attribute->attnum = attno; - attribute->attbyval = typbyval; - attribute->attndims = 0; /* array types not supported */ - attribute->attstorage = get_typstorage(typeOid); - attribute->attalign = typealign; - attribute->attnotnull = false; - attribute->atthasdef = false; - attribute->attisdropped = false; - attribute->attislocal = true; - attribute->attinhcount = 0; - - caql_insert_inmem(pcqCtx, attributeTuple); - - attno++; - } - - caql_endscan(pcqCtx); - -} - -/* - * get_typemod - * Compute the type modifiers for a column of the given type - */ -int ComputeTypeMod(Oid typeOid, const char *colname, int *typemod, int nTypeMod) -{ - Assert(0 <= nTypeMod && nTypeMod <= maxNumTypeModifiers); - if (0 == nTypeMod) - { - if (BPCHAROID == typeOid) - { - /* "char" without length corresponds to "char(1)" */ - return VARHDRSZ + 1; - } - - return -1; - } - - if (BPCHAROID == typeOid || VARCHAROID == typeOid) - { - /* typemod specifies the length */ - if (1 != nTypeMod) - { - elog(ERROR, "Invalid typemod for imported column %s", colname); - } - - return VARHDRSZ + typemod[0]; - } - - if (NUMERICOID != typeOid) - { - /* TODO: Mar 16, 2015 - lantova: do we need to support typemod with types other than BPCHAR, VARCHAR, NUMERIC? 
*/ - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("Invalid typemod for imported column %s", - colname))); - } - - /* pack the scale and precision for NUMERIC into a single number to store in the typmod field in pg_attribute */ - int precision = typemod[0]; - if (1 > precision || NUMERIC_MAX_PRECISION < precision) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("Invalid typemod for imported column %s. NUMERIC precision %d must be between 1 and %d", - colname, precision, NUMERIC_MAX_PRECISION))); - } - - int result = precision << 16; - - if (maxNumTypeModifiers == nTypeMod) - { - int scale = typemod[1]; - - if (0 > scale || precision < scale) - { - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("Invalid typemod for imported column %s. NUMERIC scale %d must be between 0 and precision %d", - colname, scale, precision))); - } - result = result | scale; - } - - return VARHDRSZ + result; -} - http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/backend/catalog/namespace.c ---------------------------------------------------------------------- diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index 5056327..0de311f 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -263,7 +263,7 @@ RangeVarGetRelid(const RangeVar *relation, bool failOK, bool allowHcatalog) appendStringInfo(&location, "%s.%s", relation->schemaname, relation->relname); // TODO: May 29, 2015 - shivram: revisit returning the hcat tables here - List *hcat_tables = get_pxf_hcat_metadata(location.data); + List *hcat_tables = get_pxf_item_metadata(HiveProfileName, location.data, HcatalogDbOid); Assert(hcat_tables != NIL); elog(DEBUG2, "Retrieved %d tables from HCatalog for \"%s.%s\"", list_length(hcat_tables), relation->schemaname, relation->relname); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/backend/utils/adt/Makefile 
---------------------------------------------------------------------- diff --git a/src/backend/utils/adt/Makefile b/src/backend/utils/adt/Makefile index a840804..11b0c6c 100644 --- a/src/backend/utils/adt/Makefile +++ b/src/backend/utils/adt/Makefile @@ -25,7 +25,7 @@ OBJS = acl.o array_userfuncs.o arrayfuncs.o arrayutils.o ascii.o \ int8.o like.o lockfuncs.o mac.o matrix.o misc.o nabstime.o name.o \ network.o not_in.o numeric.o numutils.o oid.o oracle_compat.o \ percentile.o pg_locale.o pg_lzcompress.o pgstatfuncs.o pivot.o \ - pseudotypes.o quote.o regexp.o regproc.o ri_triggers.o rowtypes.o \ + pseudotypes.o pxf_functions.o quote.o regexp.o regproc.o ri_triggers.o rowtypes.o \ ruleutils.o selfuncs.o tid.o timestamp.o varbit.o varchar.o varlena.o \ version.o xid.o xml.o http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/backend/utils/adt/pxf_functions.c ---------------------------------------------------------------------- diff --git a/src/backend/utils/adt/pxf_functions.c b/src/backend/utils/adt/pxf_functions.c new file mode 100644 index 0000000..1b455e0 --- /dev/null +++ b/src/backend/utils/adt/pxf_functions.c @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+#include "postgres.h"
+
+#include "access/hd_work_mgr.h"		/* get_pxf_item_metadata() */
+#include "catalog/external/externalmd.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+
+/* number of columns returned by pxf_get_item_fields(); must match pg_proc */
+#define PXF_ITEM_FIELDS_NATTS 4
+
+/*
+ * Cursor over the (item, field) pairs returned by PXF. It lives in the SRF's
+ * multi_call_memory_ctx so that it survives across calls.
+ */
+typedef struct ItemContext
+{
+	ListCell   *current_item;	/* cell in the list of PxfItem * */
+	ListCell   *current_field;	/* cell in the current item's fields list;
+								 * NULL until the first field is visited */
+} ItemContext;
+
+static ListCell *pxf_item_fields_enum_start(text *profile, text *pattern);
+static ItemContext *pxf_item_fields_enum_next(ItemContext *item_context);
+static void pxf_item_fields_enum_end(void);
+static void pxf_set_text_value(Datum *values, bool *nulls, int attno, const char *str);
+
+/*
+ * pxf_item_fields_enum_start
+ *		Ask PXF for the metadata of all items matching pattern under profile.
+ *
+ * Returns the first cell of the item list, or NULL when no item matched.
+ */
+static ListCell *
+pxf_item_fields_enum_start(text *profile, text *pattern)
+{
+	char	   *profile_cstr = text_to_cstring(profile);
+	char	   *pattern_cstr = text_to_cstring(pattern);
+
+	/* dboid is an Oid, not a pointer: no database oid applies here */
+	List	   *items = get_pxf_item_metadata(profile_cstr, pattern_cstr, InvalidOid);
+
+	return (items == NIL) ? NULL : list_head(items);
+}
+
+/*
+ * pxf_item_fields_enum_next
+ *		Advance the cursor to the next field, moving on to later items (and
+ *		skipping items that have no fields) as needed.
+ *
+ * Returns item_context positioned on a valid (item, field) pair, or NULL
+ * once every field of every item has been visited.
+ */
+static ItemContext *
+pxf_item_fields_enum_next(ItemContext *item_context)
+{
+	if (item_context == NULL || item_context->current_item == NULL)
+		return NULL;
+
+	if (item_context->current_field != NULL)
+		/* next field of the same item */
+		item_context->current_field = lnext(item_context->current_field);
+	else
+		/* first call: start on the first field of the first item */
+		item_context->current_field =
+			list_head(((PxfItem *) lfirst(item_context->current_item))->fields);
+
+	/* current item exhausted (or empty): move to the next item with fields */
+	while (item_context->current_field == NULL)
+	{
+		item_context->current_item = lnext(item_context->current_item);
+		if (item_context->current_item == NULL)
+			return NULL;		/* no items, no fields left */
+
+		item_context->current_field =
+			list_head(((PxfItem *) lfirst(item_context->current_item))->fields);
+	}
+
+	return item_context;
+}
+
+static void
+pxf_item_fields_enum_end(void)
+{
+	/* cleanup */
+}
+
+/*
+ * pxf_set_text_value
+ *		Store str as a text Datum in values[attno], or mark that column NULL
+ *		when str is NULL (CStringGetTextDatum would dereference it).
+ */
+static void
+pxf_set_text_value(Datum *values, bool *nulls, int attno, const char *str)
+{
+	if (str == NULL)
+	{
+		values[attno] = (Datum) 0;
+		nulls[attno] = true;
+	}
+	else
+	{
+		values[attno] = CStringGetTextDatum(str);
+		nulls[attno] = false;
+	}
+}
+
+/*
+ * pxf_get_item_fields
+ *		Set-returning function producing one row (path, itemname, fieldname,
+ *		fieldtype) for every field of every item that PXF reports for the
+ *		given profile (argument 1) and pattern (argument 2).
+ *
+ * Raises ERROR when hcatalog_enable is off.
+ */
+Datum
+pxf_get_item_fields(PG_FUNCTION_ARGS)
+{
+	MemoryContext oldcontext;
+	FuncCallContext *funcctx;
+	ItemContext *item_context;
+	HeapTuple	tuple;
+	Datum		result;
+	Datum		values[PXF_ITEM_FIELDS_NATTS];
+	bool		nulls[PXF_ITEM_FIELDS_NATTS];
+
+	if (!hcatalog_enable)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("HCatalog querying is not enabled")));
+	}
+
+	/* stuff done only on the first call of the function */
+	if (SRF_IS_FIRSTCALL())
+	{
+		text	   *profile = PG_GETARG_TEXT_P(0);
+		text	   *pattern = PG_GETARG_TEXT_P(1);
+		TupleDesc	tupdesc;
+		ListCell   *items_cell;
+
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/* everything allocated below must survive until the last call */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		/* initialize item fields metadata scanning code */
+		items_cell = pxf_item_fields_enum_start(profile, pattern);
+
+		if (items_cell != NULL)
+		{
+			item_context = (ItemContext *) palloc0(sizeof(ItemContext));
+			item_context->current_item = items_cell;
+		}
+		else
+			item_context = NULL;	/* nothing to return; handled below */
+		funcctx->user_fctx = (void *) item_context;
+
+		/*
+		 * build tupdesc for result tuples. This must match this function's
+		 * pg_proc entry!
+		 */
+		tupdesc = CreateTemplateTupleDesc(PXF_ITEM_FIELDS_NATTS, false);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 1, "path",
+						   TEXTOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 2, "itemname",
+						   TEXTOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 3, "fieldname",
+						   TEXTOID, -1, 0);
+		TupleDescInitEntry(tupdesc, (AttrNumber) 4, "fieldtype",
+						   TEXTOID, -1, 0);
+
+		funcctx->tuple_desc = BlessTupleDesc(tupdesc);
+
+		/*
+		 * Always switch back before any SRF_RETURN_*: the SRF machinery frees
+		 * multi_call_memory_ctx once the function is done, so returning from
+		 * inside it would leave CurrentMemoryContext dangling.
+		 */
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	/* stuff done on every call of the function */
+	funcctx = SRF_PERCALL_SETUP();
+
+	/* advance to the next (item, field) pair */
+	oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+	item_context = pxf_item_fields_enum_next((ItemContext *) funcctx->user_fctx);
+	funcctx->user_fctx = (void *) item_context;
+	MemoryContextSwitchTo(oldcontext);
+
+	if (item_context == NULL)
+	{
+		pxf_item_fields_enum_end();
+		SRF_RETURN_DONE(funcctx);
+	}
+
+	PxfItem    *item = (PxfItem *) lfirst(item_context->current_item);
+	PxfField   *field = (PxfField *) lfirst(item_context->current_field);
+
+	pxf_set_text_value(values, nulls, 0, item->path);
+	pxf_set_text_value(values, nulls, 1, item->name);
+	pxf_set_text_value(values, nulls, 2, field->name);
+	pxf_set_text_value(values, nulls, 3, field->type);
+
+	tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
+	result = HeapTupleGetDatum(tuple);
+
+	SRF_RETURN_NEXT(funcctx, result);
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/include/access/hd_work_mgr.h ---------------------------------------------------------------------- diff --git a/src/include/access/hd_work_mgr.h b/src/include/access/hd_work_mgr.h index 0153d8c..cab8ca7 100644 --- a/src/include/access/hd_work_mgr.h +++ b/src/include/access/hd_work_mgr.h @@ -45,6 +45,8 @@ typedef struct sPxfFragmentStatsElem } PxfFragmentStatsElem; PxfFragmentStatsElem *get_pxf_fragments_statistics(char *uri, Relation rel); -List *get_pxf_hcat_metadata(char *relation_location); +List *get_pxf_item_metadata(char *profile, char *pattern, Oid dboid); + +#define HiveProfileName "Hive" #endif /* HDWORKMGR_H */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/include/access/pxfmasterapi.h ---------------------------------------------------------------------- diff --git a/src/include/access/pxfmasterapi.h b/src/include/access/pxfmasterapi.h index a746a4d..4b9ecd9 100644 --- a/src/include/access/pxfmasterapi.h +++ b/src/include/access/pxfmasterapi.h @@ -73,7 +73,9 @@ extern void free_datanode_rest_server(PxfServer* srv); extern PxfFragmentStatsElem *get_fragments_statistics(GPHDUri* hadoop_uri, ClientContext *cl_context); extern List* get_data_fragment_list(GPHDUri *hadoop_uri, ClientContext* client_context); extern void free_fragment(DataFragment *fragment); -extern List* get_hcat_metadata(GPHDUri* hadoop_uri, char *location, ClientContext *client_context); +extern List* get_external_metadata(GPHDUri* hadoop_uri, char *profile, char *pattern,
ClientContext *client_context, Oid dboid); +extern List* get_and_cache_external_metadata(GPHDUri* hadoop_uri, char *profile, char *pattern, ClientContext *client_context, Oid dboid); +extern List* get_no_cache_external_metadata(GPHDUri* hadoop_uri, char *profile, char *pattern, ClientContext *client_context); #endif //_PXF_NAMENODE_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/include/access/pxfuriparser.h ---------------------------------------------------------------------- diff --git a/src/include/access/pxfuriparser.h b/src/include/access/pxfuriparser.h index 2c63ea5..d050325 100644 --- a/src/include/access/pxfuriparser.h +++ b/src/include/access/pxfuriparser.h @@ -79,9 +79,9 @@ typedef struct GPHDUri } GPHDUri; GPHDUri *parseGPHDUri(const char *uri_str); -GPHDUri *parseGPHDUriForHCAT(char *uri_str); +GPHDUri *parseGPHDUriForMetadata(char *uri_str); void freeGPHDUri(GPHDUri *uri); -void freeGPHDUriForHCAT(GPHDUri *uri); +void freeGPHDUriForMetadata(GPHDUri *uri); char *GPHDUri_dup_without_segwork(const char* uri); void GPHDUri_debug_print(GPHDUri *uri); int GPHDUri_get_value_for_opt(GPHDUri *uri, char *key, char **val, bool emit_error); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/include/catalog/external/externalmd.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/external/externalmd.h b/src/include/catalog/external/externalmd.h new file mode 100644 index 0000000..493257a --- /dev/null +++ b/src/include/catalog/external/externalmd.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * externalmd.h + * + * Created on: Mar 6, 2015 + * Author: antova + */ + +#ifndef EXTERNALMD_H_ +#define EXTERNALMD_H_ + +#include "itemmd.h" +#include "postgres.h" +#include "nodes/parsenodes.h" + +extern List *ParsePxfEntries(StringInfo json, char *profile, Oid dboid); + +#endif /* EXTERNALMD_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/include/catalog/external/itemmd.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/external/itemmd.h b/src/include/catalog/external/itemmd.h new file mode 100644 index 0000000..a841d63 --- /dev/null +++ b/src/include/catalog/external/itemmd.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/* + * itemmd.h + * + * Representation of PXF item metadata + * Created on: Mar 6, 2015 + * Author: antova + */ + +#ifndef ITEMMD_H_ +#define ITEMMD_H_ + +#include "postgres.h" +#include "nodes/parsenodes.h" + +/* + * Metadata for fields in PXF items + */ +typedef struct PxfField +{ + /* field (column) name */ + char *name; + + /* type name as reported by PXF, e.g. "int4", "varchar", "numeric" */ + char *type; + + /* type modifiers, e.g. varchar max length, or numeric precision and scale */ + int typeModifiers[2]; + + /* number of valid entries in typeModifiers (the array holds at most 2) */ + int nTypeModifiers; +} PxfField; + +/* + * Metadata for PXF items + */ +typedef struct PxfItem +{ + /* PXF profile the item was looked up with, e.g. "Hive" */ + char *profile; + + /* item path; for the Hive profile this is the database name */ + char *path; + + /* item name; for the Hive profile this is the table name */ + char *name; + + /* fields of the item (presumably a List of PxfField *) */ + List *fields; +} PxfItem; + + + +#endif /* ITEMMD_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/include/catalog/hcatalog/externalmd.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/hcatalog/externalmd.h b/src/include/catalog/hcatalog/externalmd.h deleted file mode 100644 index f756c0a..0000000 --- a/src/include/catalog/hcatalog/externalmd.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
- */ -/* - * externalmd.h - * - * Created on: Mar 6, 2015 - * Author: antova - */ - -#ifndef EXTERNALMD_H_ -#define EXTERNALMD_H_ - -#include "postgres.h" -#include "catalog/hcatalog/hcatalogmd.h" -#include "nodes/parsenodes.h" - -extern List *ParseHCatalogEntries(StringInfo json); - -#endif /* EXTERNALMD_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/include/catalog/hcatalog/hcatalogmd.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/hcatalog/hcatalogmd.h b/src/include/catalog/hcatalog/hcatalogmd.h deleted file mode 100644 index 4b72d9e..0000000 --- a/src/include/catalog/hcatalog/hcatalogmd.h +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * hcatalogmd.h - * - * Representation of hcatalog table metadata - * Created on: Mar 6, 2015 - * Author: antova - */ - -#ifndef HCATALOGMD_H_ -#define HCATALOGMD_H_ - -#include "postgres.h" -#include "nodes/parsenodes.h" - -/* - * Metadata for columns in HCatalog tables - */ -typedef struct HCatalogColumn -{ - /* column name */ - char *colName; - - /* type name */ - char *typeName; - - /* type modifiers, e.g. 
max length or precision */ - int typeModifiers[2]; - - /* number of type modifiers */ - int nTypeModifiers; -} HCatalogColumn; - -/* - * Metadata for HCatalog tables - */ -typedef struct HCatalogTable -{ - /* database name */ - char *dbName; - - /* table name */ - char *tableName; - - /* columns */ - List *columns; -} HCatalogTable; - - - -#endif /* HCATALOGMD_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/include/catalog/pg_proc.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index a6f8970..f3c5e77 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -10129,8 +10129,12 @@ DESCR("bitmap(internal)"); DATA(insert OID = 3011 ( bmoptions PGNSP PGUID 12 f f t f s 2 17 f "1009 16" _null_ _null_ _null_ bmoptions - _null_ n )); DESCR("btree(internal)"); +/* pxf_get_item_fields(text, text, OUT text, OUT text, OUT text, OUT text) => SETOF pg_catalog.record */ +DATA(insert OID = 9996 ( pxf_get_item_fields PGNSP PGUID 12 f f t t v 2 2249 f "25 25" "{25,25,25,25,25,25}" "{i,i,o,o,o,o}" "{profile,pattern,path,itemname,fieldname,fieldtype}" pxf_get_item_fields - _null_ r )); +DESCR("Returns the metadata fields of external object from PXF"); + /* raises deprecation error */ -/* gp_deprecated() => void */ +/* gp_deprecated() => void */ DATA(insert OID = 9997 ( gp_deprecated PGNSP PGUID 12 f f f f i 0 2278 f "" _null_ _null_ _null_ gp_deprecated - _null_ n )); DESCR("raises function deprecation error"); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/include/catalog/pg_proc.sql ---------------------------------------------------------------------- diff --git a/src/include/catalog/pg_proc.sql b/src/include/catalog/pg_proc.sql index df3fca8..fc475e2 100644 --- a/src/include/catalog/pg_proc.sql +++ b/src/include/catalog/pg_proc.sql @@ -5348,6 +5348,8 @@ CREATE FUNCTION bmoptions(_text, bool) RETURNS bytea 
LANGUAGE internal STABLE STRICT AS 'bmoptions' WITH (OID=3011, DESCRIPTION="btree(internal)"); + CREATE FUNCTION pxf_get_item_fields(IN profile text, IN pattern text, OUT path text, OUT itemname text, OUT fieldname text, OUT fieldtype text) RETURNS SETOF pg_catalog.record LANGUAGE internal VOLATILE STRICT AS 'pxf_get_item_fields' WITH (OID=9996, DESCRIPTION="Returns the metadata fields of external object from PXF"); + -- raises deprecation error CREATE FUNCTION gp_deprecated() RETURNS void LANGUAGE internal IMMUTABLE AS 'gp_deprecated' WITH (OID=9997, DESCRIPTION="raises function deprecation error"); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/include/utils/builtins.h ---------------------------------------------------------------------- diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index 55ff100..af381c4 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -1221,4 +1221,7 @@ extern Datum gp_metadata_cache_exists(PG_FUNCTION_ARGS); extern Datum gp_metadata_cache_info(PG_FUNCTION_ARGS); extern Datum gp_metadata_cache_put_entry_for_test(PG_FUNCTION_ARGS); +/* PXF functions */ +extern Datum pxf_get_item_fields(PG_FUNCTION_ARGS); + #endif /* BUILTINS_H */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/test/regress/data/hcatalog/invalid_numeric_range.json ---------------------------------------------------------------------- diff --git a/src/test/regress/data/hcatalog/invalid_numeric_range.json b/src/test/regress/data/hcatalog/invalid_numeric_range.json index fe525c8..1888777 100644 --- a/src/test/regress/data/hcatalog/invalid_numeric_range.json +++ b/src/test/regress/data/hcatalog/invalid_numeric_range.json @@ -1 +1 @@ -{"PXFMetadata":[{"table":{"dbName":"default","tableName":"mytable2"},"fields":[{"name":"n5","type":"numeric","modifiers":["30","40"]}]}]}
+{"PXFMetadata":[{"item":{"path":"default","name":"mytable2"},"fields":[{"name":"n5","type":"numeric","modifiers":["30","40"]}]}]} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/test/regress/data/hcatalog/invalid_typemod_timestamp.json ---------------------------------------------------------------------- diff --git a/src/test/regress/data/hcatalog/invalid_typemod_timestamp.json b/src/test/regress/data/hcatalog/invalid_typemod_timestamp.json index 3c6e5a5..8e3c570 100644 --- a/src/test/regress/data/hcatalog/invalid_typemod_timestamp.json +++ b/src/test/regress/data/hcatalog/invalid_typemod_timestamp.json @@ -1 +1 @@ -{"PXFMetadata":[{"table":{"dbName":"default","tableName":"mytable3"},"fields":[{"name":"n5","type":"timestamp","modifiers":["30","40"]}]}]} +{"PXFMetadata":[{"item":{"path":"default","name":"mytable3"},"fields":[{"name":"n5","type":"timestamp","modifiers":["30","40"]}]}]} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/test/regress/data/hcatalog/multi_table.json ---------------------------------------------------------------------- diff --git a/src/test/regress/data/hcatalog/multi_table.json b/src/test/regress/data/hcatalog/multi_table.json index d73d129..5d5c0ff 100644 --- a/src/test/regress/data/hcatalog/multi_table.json +++ b/src/test/regress/data/hcatalog/multi_table.json @@ -1 +1 @@ -{"PXFMetadata":[{"table":{"dbName":"db1","tableName":"ht1"},"fields":[{"name":"c1","type":"bpchar","modifiers":["3"]},{"name":"vc2","type":"varchar","modifiers":["3"]}]},{"table":{"dbName":"db2","tableName":"ht1"},"fields":[{"name":"c1","type":"bpchar","modifiers":["3"]},{"name":"vc2","type":"varchar","modifiers":["3"]}]},{"table":{"dbName":"db2","tableName":"ht2"},"fields":[{"name":"c1","type":"bpchar","modifiers":["3"]},{"name":"vc2","type":"varchar","modifiers":["3"]}]}]} 
+{"PXFMetadata":[{"item":{"path":"db1","name":"ht1"},"fields":[{"name":"c1","type":"bpchar","modifiers":["3"]},{"name":"vc2","type":"varchar","modifiers":["3"]}]},{"item":{"path":"db2","name":"ht1"},"fields":[{"name":"c1","type":"bpchar","modifiers":["3"]},{"name":"vc2","type":"varchar","modifiers":["3"]}]},{"item":{"path":"db2","name":"ht2"},"fields":[{"name":"c1","type":"bpchar","modifiers":["3"]},{"name":"vc2","type":"varchar","modifiers":["3"]}]}]} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/test/regress/data/hcatalog/multi_table_duplicates.json ---------------------------------------------------------------------- diff --git a/src/test/regress/data/hcatalog/multi_table_duplicates.json b/src/test/regress/data/hcatalog/multi_table_duplicates.json index 0a11345..45db598 100644 --- a/src/test/regress/data/hcatalog/multi_table_duplicates.json +++ b/src/test/regress/data/hcatalog/multi_table_duplicates.json @@ -1 +1 @@ -{"PXFMetadata":[{"table":{"dbName":"db","tableName":"t"},"fields":[{"name":"c1","type":"bpchar","modifiers":["3"]},{"name":"vc2","type":"varchar","modifiers":["3"]}]},{"table":{"dbName":"db","tableName":"t"},"fields":[{"name":"c1","type":"bpchar","modifiers":["3"]},{"name":"vc2","type":"varchar","modifiers":["3"]}]}]} +{"PXFMetadata":[{"item":{"path":"db","name":"t"},"fields":[{"name":"c1","type":"bpchar","modifiers":["3"]},{"name":"vc2","type":"varchar","modifiers":["3"]}]},{"item":{"path":"db","name":"t"},"fields":[{"name":"c1","type":"bpchar","modifiers":["3"]},{"name":"vc2","type":"varchar","modifiers":["3"]}]}]} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/test/regress/data/hcatalog/single_table.json ---------------------------------------------------------------------- diff --git a/src/test/regress/data/hcatalog/single_table.json b/src/test/regress/data/hcatalog/single_table.json index d5e8336..7df3427 100644 --- a/src/test/regress/data/hcatalog/single_table.json +++ 
b/src/test/regress/data/hcatalog/single_table.json @@ -1 +1 @@ -{"PXFMetadata":[{"table":{"dbName":"default","tableName":"mytable"},"fields":[{"name":"s1","type":"text"},{"name":"s2","type":"text"},{"name":"n1","type":"int4"},{"name":"d1","type":"float8"},{"name":"dc1","type":"numeric","modifiers":["38","18"]},{"name":"tm","type":"timestamp"},{"name":"f","type":"float4"},{"name":"bg","type":"int8"},{"name":"b","type":"bool"},{"name":"tn","type":"int2"},{"name":"sml","type":"int2"},{"name":"dt","type":"date"},{"name":"vc1","type":"varchar","modifiers":["5"]},{"name":"c1","type":"bpchar","modifiers":["3"]},{"name":"bin","type":"bytea"}]}]} +{"PXFMetadata":[{"item":{"path":"default","name":"mytable"},"fields":[{"name":"s1","type":"text"},{"name":"s2","type":"text"},{"name":"n1","type":"int4"},{"name":"d1","type":"float8"},{"name":"dc1","type":"numeric","modifiers":["38","18"]},{"name":"tm","type":"timestamp"},{"name":"f","type":"float4"},{"name":"bg","type":"int8"},{"name":"b","type":"bool"},{"name":"tn","type":"int2"},{"name":"sml","type":"int2"},{"name":"dt","type":"date"},{"name":"vc1","type":"varchar","modifiers":["5"]},{"name":"c1","type":"bpchar","modifiers":["3"]},{"name":"bin","type":"bytea"}]}]} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/test/regress/input/hcatalog_lookup.source ---------------------------------------------------------------------- diff --git a/src/test/regress/input/hcatalog_lookup.source b/src/test/regress/input/hcatalog_lookup.source index 11be8b9..3d7279f 100644 --- a/src/test/regress/input/hcatalog_lookup.source +++ b/src/test/regress/input/hcatalog_lookup.source @@ -6,9 +6,13 @@ SET hcatalog_enable = false; SELECT * from hcatalog.db.t; +SELECT * FROM pxf_get_item_fields('Hive', '*'); + -- enable GUC SET hcatalog_enable = true; +SELECT * FROM pxf_get_item_fields('Hive', '*abc*abc*'); + -- Create function to insert and scan in-memory data to pg_class CREATE OR REPLACE FUNCTION 
convert_to_hcatalog_schema(schemaName text) RETURNS text AS '@abs_builddir@/regress@DLSUFFIX@', 'convert_to_hcatalog_schema' http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/test/regress/json_utils.c ---------------------------------------------------------------------- diff --git a/src/test/regress/json_utils.c b/src/test/regress/json_utils.c index c89f131..77dc0b5 100644 --- a/src/test/regress/json_utils.c +++ b/src/test/regress/json_utils.c @@ -5,11 +5,12 @@ * Author: antova */ #include "postgres.h" +#include "access/hd_work_mgr.h" #include "funcapi.h" #include "catalog/catquery.h" +#include "catalog/external/externalmd.h" #include "catalog/gp_policy.h" -#include "catalog/hcatalog/externalmd.h" #include "catalog/namespace.h" #include "catalog/pg_database.h" #include "catalog/pg_exttable.h" @@ -68,16 +69,16 @@ load_json_data(PG_FUNCTION_ARGS) initStringInfo(&buf); appendStringInfo(&buf, "%s", pcbuf); - List *hcatalog_tables = ParseHCatalogEntries(&buf); + List *items = ParsePxfEntries(&buf, HiveProfileName, HcatalogDbOid); pfree(buf.data); StringInfoData tblNames; initStringInfo(&tblNames); ListCell *lc = NULL; - foreach (lc, hcatalog_tables) + foreach (lc, items) { - HCatalogTable *hcatalogTable = (HCatalogTable *) lfirst(lc); - appendStringInfo(&tblNames, "%s.%s ", hcatalogTable->dbName, hcatalogTable->tableName); + PxfItem *item = (PxfItem *) lfirst(lc); + appendStringInfo(&tblNames, "%s.%s ", item->path, item->name); } PG_RETURN_TEXT_P(cstring_to_text(tblNames.data)); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/test/regress/output/hcatalog_lookup.source ---------------------------------------------------------------------- diff --git a/src/test/regress/output/hcatalog_lookup.source b/src/test/regress/output/hcatalog_lookup.source index a5a2231..ed4a7ef 100644 --- a/src/test/regress/output/hcatalog_lookup.source +++ b/src/test/regress/output/hcatalog_lookup.source @@ -7,8 +7,15 @@ SELECT * from hcatalog.db.t;
ERROR: HCatalog querying is not enabled, query for "hcatalog.db.t" is not allowed in this context LINE 1: SELECT * from hcatalog.db.t; ^ +SELECT * FROM pxf_get_item_fields('Hive', '*'); +ERROR: HCatalog querying is not enabled -- enable GUC SET hcatalog_enable = true; +SELECT * FROM pxf_get_item_fields('Hive', '*abc*abc*'); + path | itemname | fieldname | fieldtype +------+----------+-----------+----------- +(0 rows) + -- Create function to insert and scan in-memory data to pg_class CREATE OR REPLACE FUNCTION convert_to_hcatalog_schema(schemaName text) RETURNS text AS '@abs_builddir@/regress@DLSUFFIX@', 'convert_to_hcatalog_schema' http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/ecb85e5c/src/test/regress/output/json_load.source ---------------------------------------------------------------------- diff --git a/src/test/regress/output/json_load.source b/src/test/regress/output/json_load.source index a67f383..3e7bea2 100644 --- a/src/test/regress/output/json_load.source +++ b/src/test/regress/output/json_load.source @@ -184,7 +184,7 @@ END TRANSACTION; -- negative test: duplicated tables BEGIN TRANSACTION; SELECT load_json_data('@abs_builddir@/data/hcatalog/multi_table_duplicates.json'); -ERROR: relation "hcatalog.db.t" already exists +ERROR: relation "db.db.t" already exists END TRANSACTION; -- negative test case: invalid numeric typemod BEGIN TRANSACTION;