HAWQ-1179. Call Bridge API with profile value read from Fragmenter call.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/65e1ed10 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/65e1ed10 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/65e1ed10 Branch: refs/heads/HAWQ-1177 Commit: 65e1ed10a0399711d7548f5038697c0d624456c6 Parents: 7ec8536 Author: Oleksandr Diachenko <odiache...@pivotal.io> Authored: Wed Dec 21 15:55:02 2016 -0800 Committer: Oleksandr Diachenko <odiache...@pivotal.io> Committed: Wed Dec 21 15:55:02 2016 -0800 ---------------------------------------------------------------------- src/backend/access/external/hd_work_mgr.c | 21 +- src/backend/access/external/pxfheaders.c | 14 +- src/backend/access/external/pxfmasterapi.c | 9 + src/backend/access/external/pxfuriparser.c | 156 +++++- .../access/external/test/hd_work_mgr_test.c | 112 ++++ .../access/external/test/pxfuriparser_test.c | 558 ++++++++++++++++++- src/bin/gpfusion/gpbridgeapi.c | 11 + src/include/access/pxfmasterapi.h | 1 + src/include/access/pxfuriparser.h | 3 + 9 files changed, 841 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/backend/access/external/hd_work_mgr.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/hd_work_mgr.c b/src/backend/access/external/hd_work_mgr.c index 6829de5..520b5de 100644 --- a/src/backend/access/external/hd_work_mgr.c +++ b/src/backend/access/external/hd_work_mgr.c @@ -53,6 +53,7 @@ typedef struct sAllocatedDataFragment char *source_name; /* source name */ char *fragment_md; /* fragment meta data */ char *user_data; /* additional user data */ + char *profile; /* recommended profile to work with fragment */ } AllocatedDataFragment; /* @@ -713,6 +714,7 @@ create_allocated_fragment(DataFragment *fragment) allocated->source_name = 
pstrdup(fragment->source_name); allocated->fragment_md = (fragment->fragment_md) ? pstrdup(fragment->fragment_md) : NULL; allocated->user_data = (fragment->user_data) ? pstrdup(fragment->user_data) : NULL; + allocated->profile = (fragment->profile) ? pstrdup(fragment->profile) : NULL; return allocated; } @@ -782,12 +784,22 @@ make_allocation_output_string(List *segment_fragments) appendStringInfo(&fragment_str, "%d", frag->index); appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM); if (frag->fragment_md) + { appendStringInfo(&fragment_str, "%s", frag->fragment_md); + } + + appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM); if (frag->user_data) { - appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM); appendStringInfo(&fragment_str, "%s", frag->user_data); } + appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM); + if (frag->profile) + { + appendStringInfo(&fragment_str, "%s", frag->profile); + } + appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM); + fragment_size = strlen(fragment_str.data); appendStringInfo(&segwork, "%d", fragment_size); @@ -817,6 +829,8 @@ free_allocated_frags(List *segment_fragments) pfree(frag->fragment_md); if (frag->user_data) pfree(frag->user_data); + if (frag->profile) + pfree(frag->profile); pfree(frag); } list_free(segment_fragments); @@ -856,6 +870,11 @@ print_fragment_list(List *fragments) { appendStringInfo(&log_str, "user data: %s\n", frag->user_data); } + + if (frag->profile) + { + appendStringInfo(&log_str, "profile: %s\n", frag->profile); + } } elog(FRAGDEBUG, "%s", log_str.data); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/backend/access/external/pxfheaders.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/pxfheaders.c b/src/backend/access/external/pxfheaders.c index fe7cd6a..2381044 100644 --- a/src/backend/access/external/pxfheaders.c +++ b/src/backend/access/external/pxfheaders.c @@ -27,12 +27,12 @@ 
#include "catalog/pg_exttable.h" #include "access/pxfheaders.h" #include "access/pxffilters.h" +#include "utils/formatting.h" #include "utils/guc.h" static void add_alignment_size_httpheader(CHURL_HEADERS headers); static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel); static void add_location_options_httpheader(CHURL_HEADERS headers, GPHDUri *gphduri); -static char* prepend_x_gp(const char* key); static void add_delegation_token_headers(CHURL_HEADERS headers, PxfInputData *inputData); static void add_remote_credentials(CHURL_HEADERS headers); static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo *projInfo, List *qualsAttributes); @@ -303,22 +303,12 @@ static void add_location_options_httpheader(CHURL_HEADERS headers, GPHDUri *gphd foreach(option, gphduri->options) { OptionData *data = (OptionData*)lfirst(option); - char *x_gp_key = prepend_x_gp(data->key); + char *x_gp_key = normalize_key_name(data->key); churl_headers_append(headers, x_gp_key, data->value); pfree(x_gp_key); } } -/* Full name of the HEADER KEY expected by the PXF service */ -static char* prepend_x_gp(const char* key) -{ - StringInfoData formatter; - initStringInfo(&formatter); - appendStringInfo(&formatter, "X-GP-%s", key); - - return formatter.data; -} - /* * The function will add delegation token headers. * Each token information piece will be serialized as HEX string. 
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/backend/access/external/pxfmasterapi.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/pxfmasterapi.c b/src/backend/access/external/pxfmasterapi.c index 833218a..4ee90ff 100644 --- a/src/backend/access/external/pxfmasterapi.c +++ b/src/backend/access/external/pxfmasterapi.c @@ -358,6 +358,11 @@ parse_get_fragments_response(List *fragments, StringInfo rest_buf) if (js_user_data) fragment->user_data = pstrdup(json_object_get_string(js_user_data)); + /* 5. profile - recommended profile to work with fragment */ + struct json_object *js_profile = json_object_object_get(js_fragment, "profile"); + if (js_profile) + fragment->profile = pstrdup(json_object_get_string(js_profile)); + /* * HD-2547: * Ignore fragment if it doesn't contain any host locations, @@ -400,6 +405,10 @@ void free_fragment(DataFragment *fragment) if (fragment->user_data) pfree(fragment->user_data); + + if (fragment->profile) + pfree(fragment->profile); + pfree(fragment); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/backend/access/external/pxfuriparser.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/pxfuriparser.c b/src/backend/access/external/pxfuriparser.c index 9e83714..17d5803 100644 --- a/src/backend/access/external/pxfuriparser.c +++ b/src/backend/access/external/pxfuriparser.c @@ -33,7 +33,7 @@ static void GPHDUri_parse_protocol(GPHDUri *uri, char **cursor); static void GPHDUri_parse_authority(GPHDUri *uri, char **cursor); static void GPHDUri_parse_data(GPHDUri *uri, char **cursor); static void GPHDUri_parse_options(GPHDUri *uri, char **cursor); -static List* GPHDUri_parse_option(char* pair, List* options, const char* uri); +static List* GPHDUri_parse_option(char* pair, GPHDUri *uri); static void GPHDUri_free_options(GPHDUri *uri); static void 
GPHDUri_parse_segwork(GPHDUri *uri, const char *uri_str); static List* GPHDUri_parse_fragment(char* fragment, List* fragments); @@ -41,6 +41,7 @@ static void GPHDUri_free_fragments(GPHDUri *uri); static void GPHDUri_debug_print_options(GPHDUri *uri); static void GPHDUri_debug_print_segwork(GPHDUri *uri); static void GPHDUri_fetch_authority_from_ha_nn(GPHDUri *uri, char *nameservice); +char* normalize_key_name(const char* key); /* parseGPHDUri * @@ -119,6 +120,8 @@ freeGPHDUri(GPHDUri *uri) pfree(uri->host); pfree(uri->port); pfree(uri->data); + if (uri->profile) + pfree(uri->profile); GPHDUri_free_options(uri); if (uri->ha_nodes) @@ -133,6 +136,8 @@ freeGPHDUriForMetadata(GPHDUri *uri) pfree(uri->host); pfree(uri->port); + if (uri->profile) + pfree(uri->profile); pfree(uri); } @@ -472,7 +477,7 @@ GPHDUri_parse_options(GPHDUri *uri, char **cursor) pair; pair = strtok_r(NULL, sep, &strtok_context)) { - uri->options = GPHDUri_parse_option(pair, uri->options, uri->uri); + uri->options = GPHDUri_parse_option(pair, uri); } pfree(dup); @@ -484,7 +489,7 @@ GPHDUri_parse_options(GPHDUri *uri, char **cursor) * to OptionData object (key and value). 
*/ static List* -GPHDUri_parse_option(char* pair, List* options, const char* uri) +GPHDUri_parse_option(char* pair, GPHDUri *uri) { char *sep; @@ -498,33 +503,40 @@ GPHDUri_parse_option(char* pair, List* options, const char* uri) if (sep == NULL) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("Invalid URI %s: option '%s' missing '='", uri, pair))); + errmsg("Invalid URI %s: option '%s' missing '='", uri->uri, pair))); } if (strchr(sep + 1, '=') != NULL) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("Invalid URI %s: option '%s' contains duplicate '='", uri, pair))); + errmsg("Invalid URI %s: option '%s' contains duplicate '='", uri->uri, pair))); } key_len = sep - pair; if (key_len == 0) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("Invalid URI %s: option '%s' missing key before '='", uri, pair))); + errmsg("Invalid URI %s: option '%s' missing key before '='", uri->uri, pair))); } value_len = pair_len - key_len + 1; if (value_len == EMPTY_VALUE_LEN) { ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("Invalid URI %s: option '%s' missing value after '='", uri, pair))); + errmsg("Invalid URI %s: option '%s' missing value after '='", uri->uri, pair))); } option_data->key = pnstrdup(pair,key_len); option_data->value = pnstrdup(sep + 1, value_len); - return lappend(options, option_data); + char *x_gp_key = normalize_key_name(option_data->key); + if (strcmp(x_gp_key, "X-GP-PROFILE") == 0) + { + uri->profile = pstrdup(option_data->value); + } + pfree(x_gp_key); + + return lappend(uri->options, option_data); } /* @@ -562,6 +574,11 @@ GPHDUri_debug_print(GPHDUri *uri) uri->port, uri->data); + if (uri->profile) + { + elog(NOTICE, "Profile: %s", uri->profile); + } + GPHDUri_debug_print_options(uri); GPHDUri_debug_print_segwork(uri); } @@ -611,6 +628,8 @@ GPHDUri_debug_print_segwork(GPHDUri *uri) data->fragment_md ? 
data->fragment_md : "NULL"); if (data->user_data) appendStringInfo(&fragment_data, ", user data : %s", data->user_data); + if (data->profile) + appendStringInfo(&fragment_data, ", profile : %s", data->profile); elog(NOTICE, "%s", fragment_data.data); ++count; resetStringInfo(&fragment_data); @@ -671,66 +690,123 @@ GPHDUri_parse_segwork(GPHDUri *uri, const char *uri_str) static List* GPHDUri_parse_fragment(char* fragment, List* fragments) { + if (!fragment) + { + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment.fragment string is null."))); + } char *dup_frag = pstrdup(fragment); char *value_start; char *value_end; - bool has_user_data = false; - StringInfoData formatter; + StringInfoData authority_formatter; FragmentData* fragment_data; fragment_data = palloc0(sizeof(FragmentData)); - initStringInfo(&formatter); + initStringInfo(&authority_formatter); value_start = dup_frag; + /* expect ip */ value_end = strchr(value_start, segwork_separator); - Assert(value_end != NULL); + if (value_end == NULL) + { + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid."))); + } *value_end = '\0'; - appendStringInfo(&formatter, "%s:", value_start); + appendStringInfo(&authority_formatter, "%s:", value_start); value_start = value_end + 1; + /* expect port */ value_end = strchr(value_start, segwork_separator); - Assert(value_end != NULL); + if (value_end == NULL) + { + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. 
Fragment string is invalid."))); + } *value_end = '\0'; - appendStringInfo(&formatter, "%s", value_start); - fragment_data->authority = formatter.data; + appendStringInfo(&authority_formatter, "%s", value_start); + fragment_data->authority = pstrdup(authority_formatter.data); + pfree(authority_formatter.data); value_start = value_end + 1; + /* expect source name */ value_end = strchr(value_start, segwork_separator); - Assert(value_end != NULL); + if (value_end == NULL) + { + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid."))); + } *value_end = '\0'; fragment_data->source_name = pstrdup(value_start); value_start = value_end + 1; + /* expect index */ value_end = strchr(value_start, segwork_separator); - Assert(value_end != NULL); + if (value_end == NULL) + { + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid."))); + } *value_end = '\0'; fragment_data->index = pstrdup(value_start); value_start = value_end + 1; + /* expect fragment metadata */ Assert(value_start); - - /* check for user data */ value_end = strchr(value_start, segwork_separator); - if (value_end != NULL) + if (value_end == NULL) { - has_user_data = true; - *value_end = '\0'; + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid."))); } + *value_end = '\0'; fragment_data->fragment_md = pstrdup(value_start); + value_start = value_end + 1; + + /* expect user data */ + Assert(value_start); + value_end = strchr(value_start, segwork_separator); + if (value_end == NULL) + { + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. 
Fragment string is invalid."))); + } + *value_end = '\0'; + fragment_data->user_data = pstrdup(value_start); + value_start = value_end + 1; - /* read user data */ - if (has_user_data) + /* expect for profile */ + Assert(value_start); + value_end = strchr(value_start, segwork_separator); + if (value_end == NULL) { - fragment_data->user_data = pstrdup(value_end + 1); + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid."))); } + *value_end = '\0'; + if (strlen(value_start) > 0) + fragment_data->profile = pstrdup(value_start); return lappend(fragments, fragment_data); } /* + * Free fragment data + */ +static void +GPHDUri_free_fragment(FragmentData *data) +{ + if (data->authority) + pfree(data->authority); + if (data->fragment_md) + pfree(data->fragment_md); + if (data->index) + pfree(data->index); + if (data->profile) + pfree(data->profile); + if (data->source_name) + pfree(data->source_name); + if (data->user_data) + pfree(data->user_data); + pfree(data); +} + +/* * Free fragments list */ static void @@ -741,10 +817,7 @@ GPHDUri_free_fragments(GPHDUri *uri) foreach(fragment, uri->fragments) { FragmentData *data = (FragmentData*)lfirst(fragment); - pfree(data->authority); - pfree(data->index); - pfree(data->source_name); - pfree(data); + GPHDUri_free_fragment(data); } list_free(uri->fragments); uri->fragments = NIL; @@ -823,3 +896,26 @@ bool RelationIsExternalPxfReadOnly(Relation rel, StringInfo location) return false; } + +/* + * Full name of the HEADER KEY expected by the PXF service + * Converts input string to upper case and prepends "X-GP-" string + * + */ +char* normalize_key_name(const char* key) +{ + if (!key || strlen(key) == 0) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("internal error in pxfheaders.c:normalize_key_name. 
Parameter key is null or empty."))); + } + + StringInfoData formatter; + initStringInfo(&formatter); + char* upperCasedKey = str_toupper(pstrdup(key), strlen(key)); + appendStringInfo(&formatter, "X-GP-%s", upperCasedKey); + pfree(upperCasedKey); + + return formatter.data; +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/backend/access/external/test/hd_work_mgr_test.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/test/hd_work_mgr_test.c b/src/backend/access/external/test/hd_work_mgr_test.c index c39b2ac..759b461 100644 --- a/src/backend/access/external/test/hd_work_mgr_test.c +++ b/src/backend/access/external/test/hd_work_mgr_test.c @@ -28,12 +28,124 @@ #include "hd_work_mgr_allocate_fragments_to_datanodes_test.c" #include "hd_work_mgr_distribute_work_2_gp_segments_test.c" +/* + * Serialize output string when all fields of AllocatedDataFragment are passed + */ +void +test__make_allocation_output_string(void **state) +{ + + List *segment_fragments = NIL; + AllocatedDataFragment* frag = palloc0(sizeof(AllocatedDataFragment)); + + frag->index = 42; + frag->host = "HOST"; + frag->rest_port = 1312; + frag->source_name = "TABLE_NAME"; + frag->fragment_md = "FRAGMENT_METADATA"; + frag->user_data = "USER_DATA"; + frag->profile = "PROFILE"; + + segment_fragments = lappend(segment_fragments, frag); + char *output_str = make_allocation_output_string(segment_fragments); + + assert_string_equal(output_str, "segwork=60@HOST@1312@TABLE_NAME@42@FRAGMENT_METADATA@USER_DATA@PROFILE@"); + + pfree(frag); + pfree(output_str); + list_free(segment_fragments); +} + +/* + * Serialize output string when profile is empty + */ +void +test__make_allocation_output_string__empty_profile(void **state) +{ + + List *segment_fragments = NIL; + AllocatedDataFragment* frag = palloc0(sizeof(AllocatedDataFragment)); + + frag->index = 42; + frag->host = "HOST"; + frag->rest_port = 1312; + frag->source_name = 
"TABLE_NAME"; + frag->fragment_md = "FRAGMENT_METADATA"; + frag->user_data = "USER_DATA"; + + segment_fragments = lappend(segment_fragments, frag); + char *output_str = make_allocation_output_string(segment_fragments); + + assert_string_equal(output_str, "segwork=53@HOST@1312@TABLE_NAME@42@FRAGMENT_METADATA@USER_DATA@@"); + + pfree(frag); + pfree(output_str); + list_free(segment_fragments); +} + +/* + * Serialize output string when user data is empty + */ +void +test__make_allocation_output_string__empty_user_data(void **state) +{ + + List *segment_fragments = NIL; + AllocatedDataFragment* frag = palloc0(sizeof(AllocatedDataFragment)); + + frag->index = 42; + frag->host = "HOST"; + frag->rest_port = 1312; + frag->source_name = "TABLE_NAME"; + frag->fragment_md = "FRAGMENT_METADATA"; + frag->profile = "PROFILE"; + + segment_fragments = lappend(segment_fragments, frag); + char *output_str = make_allocation_output_string(segment_fragments); + + assert_string_equal(output_str, "segwork=51@HOST@1312@TABLE_NAME@42@FRAGMENT_METADATA@@PROFILE@"); + + pfree(frag); + pfree(output_str); + list_free(segment_fragments); +} + +/* + * Serialize output string when profile and user data are empty + */ +void +test__make_allocation_output_string__empty_user_data_profile(void **state) +{ + + List *segment_fragments = NIL; + AllocatedDataFragment* frag = palloc0(sizeof(AllocatedDataFragment)); + + frag->index = 42; + frag->host = "HOST"; + frag->rest_port = 1312; + frag->source_name = "TABLE_NAME"; + frag->fragment_md = "FRAGMENT_METADATA"; + + segment_fragments = lappend(segment_fragments, frag); + char *output_str = make_allocation_output_string(segment_fragments); + + assert_string_equal(output_str, "segwork=44@HOST@1312@TABLE_NAME@42@FRAGMENT_METADATA@@@"); + + pfree(frag); + pfree(output_str); + list_free(segment_fragments); +} + int main(int argc, char* argv[]) { cmockery_parse_arguments(argc, argv); const UnitTest tests[] = { + unit_test(test__make_allocation_output_string), + 
unit_test(test__make_allocation_output_string__empty_profile), + unit_test(test__make_allocation_output_string__empty_user_data), + unit_test(test__make_allocation_output_string__empty_user_data_profile), unit_test(test__do_segment_clustering_by_host__10SegmentsOn3Hosts), unit_test(test__get_dn_processing_load), unit_test(test__create_allocated_fragment__NoUserData), http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/backend/access/external/test/pxfuriparser_test.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/test/pxfuriparser_test.c b/src/backend/access/external/test/pxfuriparser_test.c index b97cef5..1912d2b 100644 --- a/src/backend/access/external/test/pxfuriparser_test.c +++ b/src/backend/access/external/test/pxfuriparser_test.c @@ -72,6 +72,7 @@ test__parseGPHDUri__ValidURI(void **state) assert_string_equal(option->value, "SomeAnalyzer"); assert_true(parsed->fragments == NULL); + assert_true(parsed->profile == NULL); freeGPHDUri(parsed); } @@ -180,6 +181,7 @@ test__parseGPHDUri__NegativeTestNoProtocol(void **state) assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR); assert_true(edata->elevel == ERROR); assert_string_equal(edata->message, "Invalid URI pxf:/1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=HdfsDataFragmenter"); + elog_dismiss(INFO); return; } PG_END_TRY(); @@ -210,6 +212,7 @@ test__parseGPHDUri__NegativeTestNoOptions(void **state) assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR); assert_true(edata->elevel == ERROR); assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl: missing options section"); + elog_dismiss(INFO); return; } PG_END_TRY(); @@ -240,6 +243,7 @@ test__parseGPHDUri__NegativeTestMissingEqual(void **state) assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR); assert_true(edata->elevel == ERROR); assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER: 
option 'FRAGMENTER' missing '='"); + elog_dismiss(INFO); return; } PG_END_TRY(); @@ -270,6 +274,7 @@ test__parseGPHDUri__NegativeTestDuplicateEquals(void **state) assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR); assert_true(edata->elevel == ERROR); assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=HdfsDataFragmenter=DuplicateFragmenter: option 'FRAGMENTER=HdfsDataFragmenter=DuplicateFragmenter' contains duplicate '='"); + elog_dismiss(INFO); return; } PG_END_TRY(); @@ -300,6 +305,7 @@ test__parseGPHDUri__NegativeTestMissingKey(void **state) assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR); assert_true(edata->elevel == ERROR); assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl?=HdfsDataFragmenter: option '=HdfsDataFragmenter' missing key before '='"); + elog_dismiss(INFO); return; } PG_END_TRY(); @@ -330,6 +336,7 @@ test__parseGPHDUri__NegativeTestMissingValue(void **state) assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR); assert_true(edata->elevel == ERROR); assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=: option 'FRAGMENTER=' missing value after '='"); + elog_dismiss(INFO); return; } PG_END_TRY(); @@ -348,6 +355,7 @@ test__GPHDUri_verify_no_duplicate_options__ValidURI(void **state) /* Setting the test -- code omitted -- */ GPHDUri* parsed = parseGPHDUri(valid_uri); GPHDUri_verify_no_duplicate_options(parsed); + assert_string_equal(parsed->profile, "a"); freeGPHDUri(parsed); } @@ -375,6 +383,7 @@ test__GPHDUri_verify_no_duplicate_options__NegativeTestDuplicateOpts(void **stat assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR); assert_true(edata->elevel == ERROR); assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl?Profile=a&Analyzer=b&PROFILE=c: Duplicate option(s): PROFILE"); + elog_dismiss(INFO); return; } PG_END_TRY(); @@ -424,6 +433,7 @@ 
test__GPHDUri_verify_core_options_exist__NegativeTestMissingCoreOpts(void **stat assert_true(edata->elevel == ERROR); assert_string_equal(edata->message, "Invalid URI pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=a: PROFILE or ACCESSOR and RESOLVER option(s) missing"); list_free(coreOptions); + elog_dismiss(INFO); return; } PG_END_TRY(); @@ -431,9 +441,541 @@ test__GPHDUri_verify_core_options_exist__NegativeTestMissingCoreOpts(void **stat assert_true(false); } +/* + * Test GPHDUri_parse_fragment when fragment string is valid and all parameters are passed + */ +void +test__GPHDUri_parse_fragment__ValidFragment(void **state) { + + char* fragment = "HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@USER_DATA@PROFILE@"; + + List *fragments = NIL; + + fragments = GPHDUri_parse_fragment(fragment, fragments); + + ListCell *fragment_cell = list_head(fragments); + FragmentData *fragment_data = (FragmentData*) lfirst(fragment_cell); + + assert_string_equal(fragment_data->authority, "HOST:REST_PORT"); + assert_string_equal(fragment_data->fragment_md, "FRAGMENT_METADATA"); + assert_string_equal(fragment_data->index, "INDEX"); + assert_string_equal(fragment_data->profile, "PROFILE"); + assert_string_equal(fragment_data->source_name, "TABLE_NAME"); + assert_string_equal(fragment_data->user_data, "USER_DATA"); + + GPHDUri_free_fragment(fragment_data); + list_free(fragments); + +} + +/* + * Test GPHDUri_parse_fragment when fragment string doesn't have profile + */ +void +test__GPHDUri_parse_fragment__EmptyProfile(void **state) { + char* fragment = "HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@USER_DATA@@"; + + List *fragments = NIL; + + fragments = GPHDUri_parse_fragment(fragment, fragments); + + ListCell *fragment_cell = list_head(fragments); + FragmentData *fragment_data = (FragmentData*) lfirst(fragment_cell); + + assert_string_equal(fragment_data->authority, "HOST:REST_PORT"); + assert_string_equal(fragment_data->fragment_md, "FRAGMENT_METADATA"); + 
assert_string_equal(fragment_data->index, "INDEX"); + assert_true(!fragment_data->profile); + assert_string_equal(fragment_data->source_name, "TABLE_NAME"); + assert_string_equal(fragment_data->user_data, "USER_DATA"); + + GPHDUri_free_fragment(fragment_data); + list_free(fragments); + +} + +/* + * Test GPHDUri_parse_fragment when fragment string is null + */ +void +test__GPHDUri_parse_fragment__NullFragment(void **state) { + + char* fragment = NULL; + + List *fragments = NIL; + + StringInfo err_msg = makeStringInfo(); + appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment.fragment string is null."); + + /* Expect error */ + PG_TRY(); + { + /* This will throw a ereport(ERROR).*/ + fragments = GPHDUri_parse_fragment(fragment, fragments); + } + PG_CATCH(); + { + CurrentMemoryContext = 1; + ErrorData *edata = CopyErrorData(); + + /* Validate the type of expected error */ + assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR); + assert_true(edata->elevel == ERROR); + assert_string_equal(edata->message, err_msg->data); + + list_free(fragments); + pfree(err_msg->data); + pfree(err_msg); + elog_dismiss(INFO); + return; + } + PG_END_TRY(); + + /* should not reach here*/ + assert_true(false); + +} + + +/* + * Test GPHDUri_parse_fragment when fragment string is empty + */ +void +test__GPHDUri_parse_fragment__EmptyString(void **state) { + + char* fragment = ""; + + List *fragments = NIL; + + StringInfo err_msg = makeStringInfo(); + appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. 
Fragment string is invalid."); + + /* Expect error */ + PG_TRY(); + { + /* This will throw a ereport(ERROR).*/ + fragments = GPHDUri_parse_fragment(fragment, fragments); + } + PG_CATCH(); + { + CurrentMemoryContext = 1; + ErrorData *edata = CopyErrorData(); + + /* Validate the type of expected error */ + assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR); + assert_true(edata->elevel == ERROR); + assert_string_equal(edata->message, err_msg->data); + + list_free(fragments); + pfree(err_msg->data); + pfree(err_msg); + elog_dismiss(INFO); + return; + } + PG_END_TRY(); + + /* should not reach here*/ + assert_true(false); + +} + +/* + * Test GPHDUri_parse_fragment when fragment string has less tokens then expected + */ +void +test__GPHDUri_parse_fragment__MissingIpHost(void **state) { + + char* fragment = "@"; + + List *fragments = NIL; + + StringInfo err_msg = makeStringInfo(); + appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid."); + + /* Expect error */ + PG_TRY(); + { + /* This will throw a ereport(ERROR).*/ + fragments = GPHDUri_parse_fragment(fragment, fragments); + } + PG_CATCH(); + { + CurrentMemoryContext = 1; + ErrorData *edata = CopyErrorData(); + + /* Validate the type of expected error */ + assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR); + assert_true(edata->elevel == ERROR); + assert_string_equal(edata->message, err_msg->data); + + list_free(fragments); + pfree(err_msg->data); + pfree(err_msg); + elog_dismiss(INFO); + return; + } + PG_END_TRY(); + + /* should not reach here*/ + assert_true(false); + +} + +/* + * Test GPHDUri_parse_fragment when fragment string has less tokens then expected + */ +void +test__GPHDUri_parse_fragment__MissingPort(void **state) { + + char* fragment = "@HOST@"; + + List *fragments = NIL; + + StringInfo err_msg = makeStringInfo(); + appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. 
Fragment string is invalid."); + + /* Expect error */ + PG_TRY(); + { + /* This will throw a ereport(ERROR).*/ + fragments = GPHDUri_parse_fragment(fragment, fragments); + } + PG_CATCH(); + { + CurrentMemoryContext = 1; + ErrorData *edata = CopyErrorData(); + + /* Validate the type of expected error */ + assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR); + assert_true(edata->elevel == ERROR); + assert_string_equal(edata->message, err_msg->data); + + list_free(fragments); + pfree(err_msg->data); + pfree(err_msg); + + elog_dismiss(INFO); + return; + } + PG_END_TRY(); + + /* should not reach here*/ + assert_true(false); + +} + +/* + * Test GPHDUri_parse_fragment when fragment string has less tokens then expected + */ +void +test__GPHDUri_parse_fragment__MissingSourceName(void **state) { + + char* fragment = "@HOST@PORT@"; + + List *fragments = NIL; + + StringInfo err_msg = makeStringInfo(); + appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid."); + + /* Expect error */ + PG_TRY(); + { + /* This will throw a ereport(ERROR).*/ + fragments = GPHDUri_parse_fragment(fragment, fragments); + } + PG_CATCH(); + { + CurrentMemoryContext = 1; + ErrorData *edata = CopyErrorData(); + + /* Validate the type of expected error */ + assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR); + assert_true(edata->elevel == ERROR); + assert_string_equal(edata->message, err_msg->data); + + list_free(fragments); + pfree(err_msg->data); + pfree(err_msg); + elog_dismiss(INFO); + return; + } + PG_END_TRY(); + + /* should not reach here*/ + assert_true(false); + +} + +/* + * Test GPHDUri_parse_fragment when fragment string has less tokens then expected + */ +void +test__GPHDUri_parse_fragment__MissingIndex(void **state) { + + + char* fragment = "@HOST@PORT@SOURCE_NAME@"; + + List *fragments = NIL; + + StringInfo err_msg = makeStringInfo(); + appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. 
Fragment string is invalid."); + + /* Expect error */ + PG_TRY(); + { + /* This will throw a ereport(ERROR).*/ + fragments = GPHDUri_parse_fragment(fragment, fragments); + } + PG_CATCH(); + { + CurrentMemoryContext = 1; + ErrorData *edata = CopyErrorData(); + + /* Validate the type of expected error */ + assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR); + assert_true(edata->elevel == ERROR); + assert_string_equal(edata->message, err_msg->data); + + list_free(fragments); + pfree(err_msg->data); + pfree(err_msg); + elog_dismiss(INFO); + return; + } + PG_END_TRY(); + + /* should not reach here*/ + assert_true(false); + +} + + +/* + * Test GPHDUri_parse_fragment when fragment string has less tokens then expected + */ +void +test__GPHDUri_parse_fragment__MissingFragmentMetadata(void **state) { + + char* fragment = "@HOST@PORT@SOURCE_NAME@42@"; + + List *fragments = NIL; + + StringInfo err_msg = makeStringInfo(); + appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. 
Fragment string is invalid."); + + /* Expect error */ + PG_TRY(); + { + /* This will throw a ereport(ERROR).*/ + fragments = GPHDUri_parse_fragment(fragment, fragments); + } + PG_CATCH(); + { + CurrentMemoryContext = 1; + ErrorData *edata = CopyErrorData(); + + /* Validate the type of expected error */ + assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR); + assert_true(edata->elevel == ERROR); + assert_string_equal(edata->message, err_msg->data); + + list_free(fragments); + pfree(err_msg->data); + pfree(err_msg); + elog_dismiss(INFO); + return; + } + PG_END_TRY(); + + /* should not reach here*/ + assert_true(false); + +} + +/* + * Test GPHDUri_parse_fragment when fragment string has less tokens then expected + */ +void +test__GPHDUri_parse_fragment__MissingUserData(void **state) { + + char* fragment = "HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@"; + + List *fragments = NIL; + + StringInfo err_msg = makeStringInfo(); + appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. 
Fragment string is invalid."); + + /* Expect error */ + PG_TRY(); + { + /* This will throw a ereport(ERROR).*/ + fragments = GPHDUri_parse_fragment(fragment, fragments); + } + PG_CATCH(); + { + CurrentMemoryContext = 1; + ErrorData *edata = CopyErrorData(); + + /* Validate the type of expected error */ + assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR); + assert_true(edata->elevel == ERROR); + assert_string_equal(edata->message, err_msg->data); + + list_free(fragments); + pfree(err_msg->data); + pfree(err_msg); + elog_dismiss(INFO); + return; + } + PG_END_TRY(); + + /* should not reach here*/ + assert_true(false); + +} + + +/* + * Test GPHDUri_parse_fragment when fragment string has less tokens then expected + */ +void +test__GPHDUri_parse_fragment__MissingProfile(void **state) { + + char* fragment = "HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@USER_METADATA@"; + + List *fragments = NIL; + + StringInfo err_msg = makeStringInfo(); + appendStringInfo(err_msg, "internal error in pxfuriparser.c:GPHDUri_parse_fragment. 
Fragment string is invalid."); + + /* Expect error */ + PG_TRY(); + { + /* This will throw a ereport(ERROR).*/ + fragments = GPHDUri_parse_fragment(fragment, fragments); + } + PG_CATCH(); + { + CurrentMemoryContext = 1; + ErrorData *edata = CopyErrorData(); + + /* Validate the type of expected error */ + assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR); + assert_true(edata->elevel == ERROR); + assert_string_equal(edata->message, err_msg->data); + + list_free(fragments); + pfree(err_msg->data); + pfree(err_msg); + elog_dismiss(INFO); + return; + } + PG_END_TRY(); + + /* should not reach here*/ + assert_true(false); + +} + + +void +test__normalize_key_name_Positive(void **state) +{ + char *input_key = strdup("mIxEdCaSeVaLuE"); + char *normalized_key = normalize_key_name(input_key); + assert_string_equal(normalized_key, "X-GP-MIXEDCASEVALUE"); + + pfree(input_key); + pfree(normalized_key); +} + +void +test__normalize_key_name_PositiveUpperCase(void **state) +{ + char *input_key = strdup("ALREADY_UPPER_CASE"); + char *normalized_key = normalize_key_name(input_key); + assert_string_equal(normalized_key, "X-GP-ALREADY_UPPER_CASE"); + + pfree(input_key); + pfree(normalized_key); +} + +void +test__normalize_key_name_Negative__key_is_null(void **state) +{ + char *input_key = NULL; + + StringInfo err_msg = makeStringInfo(); + appendStringInfo(err_msg, "internal error in pxfheaders.c:normalize_key_name. 
Parameter key is null or empty."); + + /* Expect error */ + PG_TRY(); + { + /* This will throw a ereport(ERROR).*/ + char *normalized_key = normalize_key_name(input_key); + } + PG_CATCH(); + { + CurrentMemoryContext = 1; + ErrorData *edata = CopyErrorData(); + + /* Validate the type of expected error */ + assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR); + assert_true(edata->elevel == ERROR); + assert_string_equal(edata->message, err_msg->data); + + pfree(err_msg->data); + pfree(err_msg); + + return; + } + PG_END_TRY(); + + /* should not reach here*/ + assert_true(false); + +} + +void +test__normalize_key_name_Negative__key_is_empty(void **state) +{ + char *input_key = ""; + + StringInfo err_msg = makeStringInfo(); + appendStringInfo(err_msg, "internal error in pxfheaders.c:normalize_key_name. Parameter key is null or empty."); + + /* Expect error */ + PG_TRY(); + { + /* This will throw a ereport(ERROR).*/ + char *normalized_key = normalize_key_name(input_key); + } + PG_CATCH(); + { + CurrentMemoryContext = 1; + ErrorData *edata = CopyErrorData(); + + /* Validate the type of expected error */ + assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR); + assert_true(edata->elevel == ERROR); + assert_string_equal(edata->message, err_msg->data); + + pfree(err_msg->data); + pfree(err_msg); + + return; + } + PG_END_TRY(); + + /* should not reach here*/ + assert_true(false); + +} + int main(int argc, char* argv[]) { + cmockery_parse_arguments(argc, argv); const UnitTest tests[] = { @@ -449,7 +991,21 @@ main(int argc, char* argv[]) unit_test(test__GPHDUri_verify_no_duplicate_options__ValidURI), unit_test(test__GPHDUri_verify_no_duplicate_options__NegativeTestDuplicateOpts), unit_test(test__GPHDUri_verify_core_options_exist__ValidURI), - unit_test(test__GPHDUri_verify_core_options_exist__NegativeTestMissingCoreOpts) + unit_test(test__GPHDUri_verify_core_options_exist__NegativeTestMissingCoreOpts), + unit_test(test__GPHDUri_parse_fragment__EmptyProfile), + 
unit_test(test__GPHDUri_parse_fragment__ValidFragment), + unit_test(test__GPHDUri_parse_fragment__EmptyString), + unit_test(test__GPHDUri_parse_fragment__MissingIpHost), + unit_test(test__GPHDUri_parse_fragment__MissingPort), + unit_test(test__GPHDUri_parse_fragment__MissingSourceName), + unit_test(test__GPHDUri_parse_fragment__MissingIndex), + unit_test(test__GPHDUri_parse_fragment__MissingFragmentMetadata), + unit_test(test__GPHDUri_parse_fragment__MissingUserData), + unit_test(test__GPHDUri_parse_fragment__MissingProfile), + unit_test(test__normalize_key_name_Positive), + unit_test(test__normalize_key_name_PositiveUpperCase), + unit_test(test__normalize_key_name_Negative__key_is_null), + unit_test(test__normalize_key_name_Negative__key_is_empty) }; return run_tests(tests); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/bin/gpfusion/gpbridgeapi.c ---------------------------------------------------------------------- diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c index 2751b1b..b524df8 100644 --- a/src/bin/gpfusion/gpbridgeapi.c +++ b/src/bin/gpfusion/gpbridgeapi.c @@ -226,6 +226,17 @@ void set_current_fragment_headers(gphadoop_context* context) churl_headers_remove(context->churl_headers, "X-GP-FRAGMENT-USER-DATA", true); } + /* if current fragment has optimal profile set it*/ + if (frag_data->profile) + { + churl_headers_override(context->churl_headers, "X-GP-PROFILE", frag_data->profile); + } else if (context->gphd_uri->profile) + { + /* if current fragment doesn't have any optimal profile, set to use profile from url */ + churl_headers_override(context->churl_headers, "X-GP-PROFILE", context->gphd_uri->profile); + } + /* if there is no profile passed in url, we expect to have accessor+fragmenter+resolver so no action needed by this point */ + } void gpbridge_import_start(PG_FUNCTION_ARGS) http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/include/access/pxfmasterapi.h 
---------------------------------------------------------------------- diff --git a/src/include/access/pxfmasterapi.h b/src/include/access/pxfmasterapi.h index 4b9ecd9..b487b41 100644 --- a/src/include/access/pxfmasterapi.h +++ b/src/include/access/pxfmasterapi.h @@ -56,6 +56,7 @@ typedef struct sDataFragment List *replicas; char *fragment_md; /* fragment meta data (start, end, length, etc.) */ char *user_data; + char *profile; } DataFragment; /* http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/include/access/pxfuriparser.h ---------------------------------------------------------------------- diff --git a/src/include/access/pxfuriparser.h b/src/include/access/pxfuriparser.h index d050325..ac614c5 100644 --- a/src/include/access/pxfuriparser.h +++ b/src/include/access/pxfuriparser.h @@ -46,6 +46,7 @@ typedef struct FragmentData char *source_name; char *fragment_md; char *user_data; + char *profile; } FragmentData; typedef struct OptionData @@ -66,6 +67,7 @@ typedef struct GPHDUri char *host; /* host name str */ char *port; /* port number as string */ char *data; /* data location (path) */ + char *profile; /* profile option */ List *fragments; /* list of FragmentData */ /* options */ @@ -88,5 +90,6 @@ int GPHDUri_get_value_for_opt(GPHDUri *uri, char *key, char **val, bool emit_e bool RelationIsExternalPxfReadOnly(Relation rel, StringInfo location); void GPHDUri_verify_no_duplicate_options(GPHDUri *uri); void GPHDUri_verify_core_options_exist(GPHDUri *uri, List *coreOptions); +char* normalize_key_name(const char* key); #endif // _PXF_URIPARSER_H_