HAWQ-1179. Call Bridge api with profile value read from Fragmenter call.

Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/65e1ed10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/65e1ed10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/65e1ed10

Branch: refs/heads/HAWQ-1177
Commit: 65e1ed10a0399711d7548f5038697c0d624456c6
Parents: 7ec8536
Author: Oleksandr Diachenko <odiache...@pivotal.io>
Authored: Wed Dec 21 15:55:02 2016 -0800
Committer: Oleksandr Diachenko <odiache...@pivotal.io>
Committed: Wed Dec 21 15:55:02 2016 -0800

----------------------------------------------------------------------
 src/backend/access/external/hd_work_mgr.c       |  21 +-
 src/backend/access/external/pxfheaders.c        |  14 +-
 src/backend/access/external/pxfmasterapi.c      |   9 +
 src/backend/access/external/pxfuriparser.c      | 156 +++++-
 .../access/external/test/hd_work_mgr_test.c     | 112 ++++
 .../access/external/test/pxfuriparser_test.c    | 558 ++++++++++++++++++-
 src/bin/gpfusion/gpbridgeapi.c                  |  11 +
 src/include/access/pxfmasterapi.h               |   1 +
 src/include/access/pxfuriparser.h               |   3 +
 9 files changed, 841 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/backend/access/external/hd_work_mgr.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/hd_work_mgr.c 
b/src/backend/access/external/hd_work_mgr.c
index 6829de5..520b5de 100644
--- a/src/backend/access/external/hd_work_mgr.c
+++ b/src/backend/access/external/hd_work_mgr.c
@@ -53,6 +53,7 @@ typedef struct sAllocatedDataFragment
        char *source_name; /* source name */
        char *fragment_md; /* fragment meta data */
        char *user_data; /* additional user data */
+       char *profile; /* recommended profile to work with fragment */
 } AllocatedDataFragment;
 
 /*
@@ -713,6 +714,7 @@ create_allocated_fragment(DataFragment *fragment)
        allocated->source_name = pstrdup(fragment->source_name);
        allocated->fragment_md = (fragment->fragment_md) ? 
pstrdup(fragment->fragment_md) : NULL;
        allocated->user_data = (fragment->user_data) ? 
pstrdup(fragment->user_data) : NULL;
+       allocated->profile = (fragment->profile) ? pstrdup(fragment->profile) : 
NULL;
        return allocated;
 }
 
@@ -782,12 +784,22 @@ make_allocation_output_string(List *segment_fragments)
                appendStringInfo(&fragment_str, "%d", frag->index);
                appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
                if (frag->fragment_md)
+               {
                        appendStringInfo(&fragment_str, "%s", 
frag->fragment_md);
+               }
+
+               appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
                if (frag->user_data)
                {
-                       appendStringInfoChar(&fragment_str, 
SEGWORK_IN_PAIR_DELIM);
                        appendStringInfo(&fragment_str, "%s", frag->user_data);
                }
+               appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
+               if (frag->profile)
+               {
+                       appendStringInfo(&fragment_str, "%s", frag->profile);
+               }
+               appendStringInfoChar(&fragment_str, SEGWORK_IN_PAIR_DELIM);
+
                fragment_size = strlen(fragment_str.data);
 
                appendStringInfo(&segwork, "%d", fragment_size);
@@ -817,6 +829,8 @@ free_allocated_frags(List *segment_fragments)
                        pfree(frag->fragment_md);
                if (frag->user_data)
                        pfree(frag->user_data);
+               if (frag->profile)
+                       pfree(frag->profile);
                pfree(frag);
        }
        list_free(segment_fragments);
@@ -856,6 +870,11 @@ print_fragment_list(List *fragments)
                {
                        appendStringInfo(&log_str, "user data: %s\n", 
frag->user_data);
                }
+
+               if (frag->profile)
+               {
+                       appendStringInfo(&log_str, "profile: %s\n", 
frag->profile);
+               }
        }
 
        elog(FRAGDEBUG, "%s", log_str.data);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/backend/access/external/pxfheaders.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfheaders.c 
b/src/backend/access/external/pxfheaders.c
index fe7cd6a..2381044 100644
--- a/src/backend/access/external/pxfheaders.c
+++ b/src/backend/access/external/pxfheaders.c
@@ -27,12 +27,12 @@
 #include "catalog/pg_exttable.h"
 #include "access/pxfheaders.h"
 #include "access/pxffilters.h"
+#include "utils/formatting.h"
 #include "utils/guc.h"
 
 static void add_alignment_size_httpheader(CHURL_HEADERS headers);
 static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel);
 static void add_location_options_httpheader(CHURL_HEADERS headers, GPHDUri 
*gphduri);
-static char* prepend_x_gp(const char* key);
 static void add_delegation_token_headers(CHURL_HEADERS headers, PxfInputData 
*inputData);
 static void add_remote_credentials(CHURL_HEADERS headers);
 static void add_projection_desc_httpheader(CHURL_HEADERS headers, 
ProjectionInfo *projInfo, List *qualsAttributes);
@@ -303,22 +303,12 @@ static void add_location_options_httpheader(CHURL_HEADERS 
headers, GPHDUri *gphd
        foreach(option, gphduri->options)
        {
                OptionData *data = (OptionData*)lfirst(option);
-               char *x_gp_key = prepend_x_gp(data->key);
+               char *x_gp_key = normalize_key_name(data->key);
                churl_headers_append(headers, x_gp_key, data->value);
                pfree(x_gp_key);
        }
 }
 
-/* Full name of the HEADER KEY expected by the PXF service */
-static char* prepend_x_gp(const char* key)
-{      
-       StringInfoData formatter;
-       initStringInfo(&formatter);
-       appendStringInfo(&formatter, "X-GP-%s", key);
-       
-       return formatter.data;
-}
-
 /*
  * The function will add delegation token headers.
  * Each token information piece will be serialized as HEX string.

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/backend/access/external/pxfmasterapi.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfmasterapi.c 
b/src/backend/access/external/pxfmasterapi.c
index 833218a..4ee90ff 100644
--- a/src/backend/access/external/pxfmasterapi.c
+++ b/src/backend/access/external/pxfmasterapi.c
@@ -358,6 +358,11 @@ parse_get_fragments_response(List *fragments, StringInfo 
rest_buf)
                if (js_user_data)
                        fragment->user_data = 
pstrdup(json_object_get_string(js_user_data));
 
+               /* 5. profile - recommended profile to work with fragment */
+               struct json_object *js_profile = 
json_object_object_get(js_fragment, "profile");
+               if (js_profile)
+                       fragment->profile = 
pstrdup(json_object_get_string(js_profile));
+
                /*
                 * HD-2547:
                 * Ignore fragment if it doesn't contain any host locations,
@@ -400,6 +405,10 @@ void free_fragment(DataFragment *fragment)
 
        if (fragment->user_data)
                pfree(fragment->user_data);
+
+       if (fragment->profile)
+               pfree(fragment->profile);
+
        pfree(fragment);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/backend/access/external/pxfuriparser.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfuriparser.c 
b/src/backend/access/external/pxfuriparser.c
index 9e83714..17d5803 100644
--- a/src/backend/access/external/pxfuriparser.c
+++ b/src/backend/access/external/pxfuriparser.c
@@ -33,7 +33,7 @@ static void  GPHDUri_parse_protocol(GPHDUri *uri, char 
**cursor);
 static void  GPHDUri_parse_authority(GPHDUri *uri, char **cursor);
 static void  GPHDUri_parse_data(GPHDUri *uri, char **cursor);
 static void  GPHDUri_parse_options(GPHDUri *uri, char **cursor);
-static List* GPHDUri_parse_option(char* pair, List* options, const char* uri);
+static List* GPHDUri_parse_option(char* pair, GPHDUri *uri);
 static void  GPHDUri_free_options(GPHDUri *uri);
 static void  GPHDUri_parse_segwork(GPHDUri *uri, const char *uri_str);
 static List* GPHDUri_parse_fragment(char* fragment, List* fragments);
@@ -41,6 +41,7 @@ static void  GPHDUri_free_fragments(GPHDUri *uri);
 static void  GPHDUri_debug_print_options(GPHDUri *uri);
 static void  GPHDUri_debug_print_segwork(GPHDUri *uri);
 static void  GPHDUri_fetch_authority_from_ha_nn(GPHDUri *uri, char 
*nameservice);
+char* normalize_key_name(const char* key);
 
 /* parseGPHDUri
  *
@@ -119,6 +120,8 @@ freeGPHDUri(GPHDUri *uri)
        pfree(uri->host);
        pfree(uri->port);
        pfree(uri->data);
+       if (uri->profile)
+               pfree(uri->profile);
 
        GPHDUri_free_options(uri);
        if (uri->ha_nodes)
@@ -133,6 +136,8 @@ freeGPHDUriForMetadata(GPHDUri *uri)
 
        pfree(uri->host);
        pfree(uri->port);
+       if (uri->profile)
+               pfree(uri->profile);
 
        pfree(uri);
 }
@@ -472,7 +477,7 @@ GPHDUri_parse_options(GPHDUri *uri, char **cursor)
                        pair;
                        pair = strtok_r(NULL, sep, &strtok_context))
        {
-               uri->options = GPHDUri_parse_option(pair, uri->options, 
uri->uri);
+               uri->options = GPHDUri_parse_option(pair, uri);
        }
 
        pfree(dup);
@@ -484,7 +489,7 @@ GPHDUri_parse_options(GPHDUri *uri, char **cursor)
  * to OptionData object (key and value).
  */
 static List*
-GPHDUri_parse_option(char* pair, List* options, const char* uri)
+GPHDUri_parse_option(char* pair, GPHDUri *uri)
 {
 
        char    *sep;
@@ -498,33 +503,40 @@ GPHDUri_parse_option(char* pair, List* options, const 
char* uri)
        if (sep == NULL) {
                ereport(ERROR,
                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                errmsg("Invalid URI %s: option '%s' missing 
'='", uri, pair)));
+                                errmsg("Invalid URI %s: option '%s' missing 
'='", uri->uri, pair)));
        }
 
        if (strchr(sep + 1, '=') != NULL) {
                ereport(ERROR,
                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                errmsg("Invalid URI %s: option '%s' contains 
duplicate '='", uri, pair)));
+                                errmsg("Invalid URI %s: option '%s' contains 
duplicate '='", uri->uri, pair)));
        }
 
        key_len = sep - pair;
        if (key_len == 0) {
                ereport(ERROR,
                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                errmsg("Invalid URI %s: option '%s' missing 
key before '='", uri, pair)));
+                                errmsg("Invalid URI %s: option '%s' missing 
key before '='", uri->uri, pair)));
        }
        
        value_len = pair_len - key_len + 1;
        if (value_len == EMPTY_VALUE_LEN) {
                ereport(ERROR,
                                (errcode(ERRCODE_SYNTAX_ERROR),
-                                errmsg("Invalid URI %s: option '%s' missing 
value after '='", uri, pair)));
+                                errmsg("Invalid URI %s: option '%s' missing 
value after '='", uri->uri, pair)));
        }
     
        option_data->key = pnstrdup(pair,key_len);
        option_data->value = pnstrdup(sep + 1, value_len);
 
-       return lappend(options, option_data);
+       char *x_gp_key = normalize_key_name(option_data->key);
+       if (strcmp(x_gp_key, "X-GP-PROFILE") == 0)
+       {
+               uri->profile = pstrdup(option_data->value);
+       }
+       pfree(x_gp_key);
+
+       return lappend(uri->options, option_data);
 }
 
 /*
@@ -562,6 +574,11 @@ GPHDUri_debug_print(GPHDUri *uri)
                 uri->port,
                 uri->data);
 
+       if (uri->profile)
+       {
+               elog(NOTICE, "Profile: %s", uri->profile);
+       }
+
        GPHDUri_debug_print_options(uri);
        GPHDUri_debug_print_segwork(uri);
 }
@@ -611,6 +628,8 @@ GPHDUri_debug_print_segwork(GPHDUri *uri)
                                                 data->fragment_md ? 
data->fragment_md : "NULL");
                if (data->user_data)
                        appendStringInfo(&fragment_data, ", user data : %s", 
data->user_data);
+               if (data->profile)
+                       appendStringInfo(&fragment_data, ", profile : %s", 
data->profile);
                elog(NOTICE, "%s", fragment_data.data);
                ++count;
                resetStringInfo(&fragment_data);
@@ -671,66 +690,123 @@ GPHDUri_parse_segwork(GPHDUri *uri, const char *uri_str)
 static List*
 GPHDUri_parse_fragment(char* fragment, List* fragments)
 {
+       if (!fragment)
+       {
+               ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), 
errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment.fragment string 
is null.")));
+       }
 
        char    *dup_frag = pstrdup(fragment);
        char    *value_start;
        char    *value_end;
-       bool    has_user_data = false;
 
-       StringInfoData formatter;
+       StringInfoData authority_formatter;
        FragmentData* fragment_data;
 
        fragment_data = palloc0(sizeof(FragmentData));
-       initStringInfo(&formatter);
+       initStringInfo(&authority_formatter);
 
        value_start = dup_frag;
+
        /* expect ip */
        value_end = strchr(value_start, segwork_separator);
-       Assert(value_end != NULL);
+       if (value_end == NULL)
+       {
+               ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), 
errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment 
string is invalid.")));
+       }
        *value_end = '\0';
-       appendStringInfo(&formatter, "%s:", value_start);
+       appendStringInfo(&authority_formatter, "%s:", value_start);
        value_start = value_end + 1;
+
        /* expect port */
        value_end = strchr(value_start, segwork_separator);
-       Assert(value_end != NULL);
+       if (value_end == NULL)
+       {
+               ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), 
errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment 
string is invalid.")));
+       }
        *value_end = '\0';
-       appendStringInfo(&formatter, "%s", value_start);
-       fragment_data->authority = formatter.data;
+       appendStringInfo(&authority_formatter, "%s", value_start);
+       fragment_data->authority = pstrdup(authority_formatter.data);
+       pfree(authority_formatter.data);
        value_start = value_end + 1;
+
        /* expect source name */
        value_end = strchr(value_start, segwork_separator);
-       Assert(value_end != NULL);
+       if (value_end == NULL)
+       {
+               ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), 
errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment 
string is invalid.")));
+       }
        *value_end = '\0';
        fragment_data->source_name = pstrdup(value_start);
        value_start = value_end + 1;
+
        /* expect index */
        value_end = strchr(value_start, segwork_separator);
-       Assert(value_end != NULL);
+       if (value_end == NULL)
+       {
+               ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), 
errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment 
string is invalid.")));
+       }
        *value_end = '\0';
        fragment_data->index = pstrdup(value_start);
        value_start = value_end + 1;
+
        /* expect fragment metadata */
        Assert(value_start);
-
-       /* check for user data */
        value_end = strchr(value_start, segwork_separator);
-       if (value_end != NULL)
+       if (value_end == NULL)
        {
-               has_user_data = true;
-               *value_end = '\0';
+               ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), 
errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment 
string is invalid.")));
        }
+       *value_end = '\0';
        fragment_data->fragment_md = pstrdup(value_start);
+       value_start = value_end + 1;
+
+       /* expect user data */
+       Assert(value_start);
+       value_end = strchr(value_start, segwork_separator);
+       if (value_end == NULL)
+       {
+               ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), 
errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment 
string is invalid.")));
+       }
+       *value_end = '\0';
+       fragment_data->user_data = pstrdup(value_start);
+       value_start = value_end + 1;
 
-       /* read user data */
-       if (has_user_data)
+       /* expect profile */
+       Assert(value_start);
+       value_end = strchr(value_start, segwork_separator);
+       if (value_end == NULL)
        {
-               fragment_data->user_data = pstrdup(value_end + 1);
+               ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), 
errmsg("internal error in pxfuriparser.c:GPHDUri_parse_fragment. Fragment 
string is invalid.")));
        }
+       *value_end = '\0';
+       if (strlen(value_start) > 0)
+               fragment_data->profile = pstrdup(value_start);
 
        return lappend(fragments, fragment_data);
 }
 
 /*
+ * Free a single FragmentData along with every string field it owns
+ */
+static void
+GPHDUri_free_fragment(FragmentData *data)
+{
+       if (data->authority)
+               pfree(data->authority);
+       if (data->fragment_md)
+               pfree(data->fragment_md);
+       if (data->index)
+               pfree(data->index);
+       if (data->profile)
+               pfree(data->profile);
+       if (data->source_name)
+               pfree(data->source_name);
+       if (data->user_data)
+               pfree(data->user_data);
+       pfree(data);
+}
+
+/*
  * Free fragments list
  */
 static void 
@@ -741,10 +817,7 @@ GPHDUri_free_fragments(GPHDUri *uri)
        foreach(fragment, uri->fragments)
        {
                FragmentData *data = (FragmentData*)lfirst(fragment);
-               pfree(data->authority);
-               pfree(data->index);
-               pfree(data->source_name);
-               pfree(data);
+               GPHDUri_free_fragment(data);
        }
        list_free(uri->fragments);
        uri->fragments = NIL;
@@ -823,3 +896,26 @@ bool RelationIsExternalPxfReadOnly(Relation rel, 
StringInfo location)
 
        return false;
 }
+
+/*
+ * Build the full name of the HEADER KEY expected by the PXF service:
+ * converts the input string to upper case and prepends "X-GP-".
+ * Raises an ERROR if key is NULL or empty; callers pfree the result.
+ */
+char* normalize_key_name(const char* key)
+{
+       if (!key || strlen(key) == 0)
+       {
+               ereport(ERROR,
+                       (errcode(ERRCODE_INTERNAL_ERROR),
+                        errmsg("internal error in 
pxfheaders.c:normalize_key_name. Parameter key is null or empty.")));
+       }
+
+       StringInfoData formatter;
+       initStringInfo(&formatter);
+       char* upperCasedKey = str_toupper(pstrdup(key), strlen(key));
+       appendStringInfo(&formatter, "X-GP-%s", upperCasedKey);
+       pfree(upperCasedKey);
+
+       return formatter.data;
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/backend/access/external/test/hd_work_mgr_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/hd_work_mgr_test.c 
b/src/backend/access/external/test/hd_work_mgr_test.c
index c39b2ac..759b461 100644
--- a/src/backend/access/external/test/hd_work_mgr_test.c
+++ b/src/backend/access/external/test/hd_work_mgr_test.c
@@ -28,12 +28,124 @@
 #include "hd_work_mgr_allocate_fragments_to_datanodes_test.c"
 #include "hd_work_mgr_distribute_work_2_gp_segments_test.c"
 
+/*
+ * Serialize output string when all fields of AllocatedDataFragment are passed
+ */
+void
+test__make_allocation_output_string(void **state)
+{
+
+       List *segment_fragments = NIL;
+       AllocatedDataFragment* frag = palloc0(sizeof(AllocatedDataFragment));
+
+       frag->index = 42;
+       frag->host = "HOST";
+       frag->rest_port = 1312;
+       frag->source_name = "TABLE_NAME";
+       frag->fragment_md = "FRAGMENT_METADATA";
+       frag->user_data = "USER_DATA";
+       frag->profile = "PROFILE";
+
+       segment_fragments = lappend(segment_fragments, frag);
+       char *output_str = make_allocation_output_string(segment_fragments);
+
+       assert_string_equal(output_str, 
"segwork=60@HOST@1312@TABLE_NAME@42@FRAGMENT_METADATA@USER_DATA@PROFILE@");
+
+       pfree(frag);
+       pfree(output_str);
+       list_free(segment_fragments);
+}
+
+/*
+ * Serialize output string when profile is empty
+ */
+void
+test__make_allocation_output_string__empty_profile(void **state)
+{
+
+       List *segment_fragments = NIL;
+       AllocatedDataFragment* frag = palloc0(sizeof(AllocatedDataFragment));
+
+       frag->index = 42;
+       frag->host = "HOST";
+       frag->rest_port = 1312;
+       frag->source_name = "TABLE_NAME";
+       frag->fragment_md = "FRAGMENT_METADATA";
+       frag->user_data = "USER_DATA";
+
+       segment_fragments = lappend(segment_fragments, frag);
+       char *output_str = make_allocation_output_string(segment_fragments);
+
+       assert_string_equal(output_str, 
"segwork=53@HOST@1312@TABLE_NAME@42@FRAGMENT_METADATA@USER_DATA@@");
+
+       pfree(frag);
+       pfree(output_str);
+       list_free(segment_fragments);
+}
+
+/*
+ * Serialize output string when user data is empty
+ */
+void
+test__make_allocation_output_string__empty_user_data(void **state)
+{
+
+       List *segment_fragments = NIL;
+       AllocatedDataFragment* frag = palloc0(sizeof(AllocatedDataFragment));
+
+       frag->index = 42;
+       frag->host = "HOST";
+       frag->rest_port = 1312;
+       frag->source_name = "TABLE_NAME";
+       frag->fragment_md = "FRAGMENT_METADATA";
+       frag->profile = "PROFILE";
+
+       segment_fragments = lappend(segment_fragments, frag);
+       char *output_str = make_allocation_output_string(segment_fragments);
+
+       assert_string_equal(output_str, 
"segwork=51@HOST@1312@TABLE_NAME@42@FRAGMENT_METADATA@@PROFILE@");
+
+       pfree(frag);
+       pfree(output_str);
+       list_free(segment_fragments);
+}
+
+/*
+ * Serialize output string when profile and user data are empty
+ */
+void
+test__make_allocation_output_string__empty_user_data_profile(void **state)
+{
+
+       List *segment_fragments = NIL;
+       AllocatedDataFragment* frag = palloc0(sizeof(AllocatedDataFragment));
+
+       frag->index = 42;
+       frag->host = "HOST";
+       frag->rest_port = 1312;
+       frag->source_name = "TABLE_NAME";
+       frag->fragment_md = "FRAGMENT_METADATA";
+
+       segment_fragments = lappend(segment_fragments, frag);
+       char *output_str = make_allocation_output_string(segment_fragments);
+
+       assert_string_equal(output_str, 
"segwork=44@HOST@1312@TABLE_NAME@42@FRAGMENT_METADATA@@@");
+
+       pfree(frag);
+       pfree(output_str);
+       list_free(segment_fragments);
+}
+
 int 
 main(int argc, char* argv[]) 
 {
        cmockery_parse_arguments(argc, argv);
 
        const UnitTest tests[] = {
+                       unit_test(test__make_allocation_output_string),
+                       
unit_test(test__make_allocation_output_string__empty_profile),
+                       
unit_test(test__make_allocation_output_string__empty_user_data),
+                       
unit_test(test__make_allocation_output_string__empty_user_data_profile),
                        
unit_test(test__do_segment_clustering_by_host__10SegmentsOn3Hosts),
                        unit_test(test__get_dn_processing_load),
                        unit_test(test__create_allocated_fragment__NoUserData),

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/backend/access/external/test/pxfuriparser_test.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/test/pxfuriparser_test.c 
b/src/backend/access/external/test/pxfuriparser_test.c
index b97cef5..1912d2b 100644
--- a/src/backend/access/external/test/pxfuriparser_test.c
+++ b/src/backend/access/external/test/pxfuriparser_test.c
@@ -72,6 +72,7 @@ test__parseGPHDUri__ValidURI(void **state)
        assert_string_equal(option->value, "SomeAnalyzer");
 
        assert_true(parsed->fragments == NULL);
+       assert_true(parsed->profile == NULL);
 
        freeGPHDUri(parsed);
 }
@@ -180,6 +181,7 @@ test__parseGPHDUri__NegativeTestNoProtocol(void **state)
                assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
                assert_true(edata->elevel == ERROR);
                assert_string_equal(edata->message, "Invalid URI 
pxf:/1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=HdfsDataFragmenter");
+               elog_dismiss(INFO);
                return;
        }
        PG_END_TRY();
@@ -210,6 +212,7 @@ test__parseGPHDUri__NegativeTestNoOptions(void **state)
                assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
                assert_true(edata->elevel == ERROR);
                assert_string_equal(edata->message, "Invalid URI 
pxf://1.2.3.4:5678/some/path/and/table.tbl: missing options section");
+               elog_dismiss(INFO);
                return;
        }
        PG_END_TRY();
@@ -240,6 +243,7 @@ test__parseGPHDUri__NegativeTestMissingEqual(void **state)
                assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
                assert_true(edata->elevel == ERROR);
                assert_string_equal(edata->message, "Invalid URI 
pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER: option 'FRAGMENTER' 
missing '='");
+               elog_dismiss(INFO);
                return;
        }
        PG_END_TRY();
@@ -270,6 +274,7 @@ test__parseGPHDUri__NegativeTestDuplicateEquals(void 
**state)
                assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
                assert_true(edata->elevel == ERROR);
                assert_string_equal(edata->message, "Invalid URI 
pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=HdfsDataFragmenter=DuplicateFragmenter:
 option 'FRAGMENTER=HdfsDataFragmenter=DuplicateFragmenter' contains duplicate 
'='");
+               elog_dismiss(INFO);
                return;
        }
        PG_END_TRY();
@@ -300,6 +305,7 @@ test__parseGPHDUri__NegativeTestMissingKey(void **state)
                assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
                assert_true(edata->elevel == ERROR);
                assert_string_equal(edata->message, "Invalid URI 
pxf://1.2.3.4:5678/some/path/and/table.tbl?=HdfsDataFragmenter: option 
'=HdfsDataFragmenter' missing key before '='");
+               elog_dismiss(INFO);
                return;
        }
        PG_END_TRY();
@@ -330,6 +336,7 @@ test__parseGPHDUri__NegativeTestMissingValue(void **state)
                assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
                assert_true(edata->elevel == ERROR);
                assert_string_equal(edata->message, "Invalid URI 
pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=: option 'FRAGMENTER=' 
missing value after '='");
+               elog_dismiss(INFO);
                return;
        }
        PG_END_TRY();
@@ -348,6 +355,7 @@ test__GPHDUri_verify_no_duplicate_options__ValidURI(void 
**state)
        /* Setting the test -- code omitted -- */
        GPHDUri* parsed = parseGPHDUri(valid_uri);
        GPHDUri_verify_no_duplicate_options(parsed);
+       assert_string_equal(parsed->profile, "a");
        freeGPHDUri(parsed);
 }
 
@@ -375,6 +383,7 @@ 
test__GPHDUri_verify_no_duplicate_options__NegativeTestDuplicateOpts(void **stat
                assert_true(edata->sqlerrcode == ERRCODE_SYNTAX_ERROR);
                assert_true(edata->elevel == ERROR);
                assert_string_equal(edata->message, "Invalid URI 
pxf://1.2.3.4:5678/some/path/and/table.tbl?Profile=a&Analyzer=b&PROFILE=c: 
Duplicate option(s): PROFILE");
+               elog_dismiss(INFO);
                return;
        }
        PG_END_TRY();
@@ -424,6 +433,7 @@ 
test__GPHDUri_verify_core_options_exist__NegativeTestMissingCoreOpts(void **stat
                assert_true(edata->elevel == ERROR);
                assert_string_equal(edata->message, "Invalid URI 
pxf://1.2.3.4:5678/some/path/and/table.tbl?FRAGMENTER=a: PROFILE or ACCESSOR 
and RESOLVER option(s) missing");
                list_free(coreOptions);
+               elog_dismiss(INFO);
                return;
        }
        PG_END_TRY();
@@ -431,9 +441,541 @@ 
test__GPHDUri_verify_core_options_exist__NegativeTestMissingCoreOpts(void **stat
        assert_true(false);
 }
 
+/*
+ * GPHDUri_parse_fragment, happy path: the input carries all seven
+ * '@'-terminated fields, and the first FragmentData in the returned list
+ * must expose each of them (host and port show up joined as "HOST:REST_PORT"
+ * in the authority member).
+ */
+void
+test__GPHDUri_parse_fragment__ValidFragment(void **state)
+{
+       char *serialized = "HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@USER_DATA@PROFILE@";
+       List *parsed = GPHDUri_parse_fragment(serialized, NIL);
+       FragmentData *entry = (FragmentData *) lfirst(list_head(parsed));
+
+       /* every member must match its token from the input string */
+       assert_string_equal(entry->authority, "HOST:REST_PORT");
+       assert_string_equal(entry->fragment_md, "FRAGMENT_METADATA");
+       assert_string_equal(entry->index, "INDEX");
+       assert_string_equal(entry->profile, "PROFILE");
+       assert_string_equal(entry->source_name, "TABLE_NAME");
+       assert_string_equal(entry->user_data, "USER_DATA");
+
+       /* free the parsed entry, then the list that held it */
+       GPHDUri_free_fragment(entry);
+       list_free(parsed);
+}
+
+/*
+ * GPHDUri_parse_fragment with an empty profile field ("...@USER_DATA@@"):
+ * the remaining members must still be populated and the profile member of
+ * the first FragmentData must be NULL.
+ */
+void
+test__GPHDUri_parse_fragment__EmptyProfile(void **state)
+{
+       char *serialized = "HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@USER_DATA@@";
+       List *parsed = GPHDUri_parse_fragment(serialized, NIL);
+       FragmentData *entry = (FragmentData *) lfirst(list_head(parsed));
+
+       /* all members except profile must match their input tokens */
+       assert_string_equal(entry->authority, "HOST:REST_PORT");
+       assert_string_equal(entry->fragment_md, "FRAGMENT_METADATA");
+       assert_string_equal(entry->index, "INDEX");
+       assert_true(!entry->profile);
+       assert_string_equal(entry->source_name, "TABLE_NAME");
+       assert_string_equal(entry->user_data, "USER_DATA");
+
+       /* free the parsed entry, then the list that held it */
+       GPHDUri_free_fragment(entry);
+       list_free(parsed);
+}
+
+/*
+ * Test GPHDUri_parse_fragment when fragment string is null.
+ *
+ * Passes a NULL fragment string and expects the call to raise an ERROR whose
+ * sqlerrcode is ERRCODE_INTERNAL_ERROR and whose message equals the text
+ * built in err_msg. The test fails if the call returns normally.
+ */
+void
+test__GPHDUri_parse_fragment__NullFragment(void **state) {
+
+       char* fragment = NULL;
+
+       List *fragments = NIL;
+
+       StringInfo err_msg = makeStringInfo(); /* holds the expected error text */
+       appendStringInfo(err_msg, "internal error in 
pxfuriparser.c:GPHDUri_parse_fragment.fragment string is null.");
+
+       /* Expect error */
+       PG_TRY();
+       {
+               /* Expected to ereport(ERROR), transferring control to PG_CATCH. */
+               fragments = GPHDUri_parse_fragment(fragment, fragments);
+       }
+       PG_CATCH();
+       {
+               CurrentMemoryContext = 1; /* NOTE(review): placeholder non-NULL value, presumably so CopyErrorData() can run in the test harness -- confirm */
+               ErrorData *edata = CopyErrorData();
+
+               /* Validate the code, level and message of the caught error */
+               assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+               assert_true(edata->elevel == ERROR);
+               assert_string_equal(edata->message, err_msg->data);
+
+               list_free(fragments); /* still NIL when the call raised before returning */
+               pfree(err_msg->data);
+               pfree(err_msg);
+               elog_dismiss(INFO); /* presumably discards the caught error state -- confirm */
+               return;
+       }
+       PG_END_TRY();
+
+       /* Reaching this point means no error was raised: fail the test. */
+       assert_true(false);
+
+}
+
+
+/*
+ * Test GPHDUri_parse_fragment when fragment string is empty.
+ *
+ * Passes "" and expects the call to raise an ERROR whose sqlerrcode is
+ * ERRCODE_INTERNAL_ERROR and whose message equals the text built in
+ * err_msg. The test fails if the call returns normally.
+ */
+void
+test__GPHDUri_parse_fragment__EmptyString(void **state) {
+
+       char* fragment = "";
+
+       List *fragments = NIL;
+
+       StringInfo err_msg = makeStringInfo(); /* holds the expected error text */
+       appendStringInfo(err_msg, "internal error in 
pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
+
+       /* Expect error */
+       PG_TRY();
+       {
+               /* Expected to ereport(ERROR), transferring control to PG_CATCH. */
+               fragments = GPHDUri_parse_fragment(fragment, fragments);
+       }
+       PG_CATCH();
+       {
+               CurrentMemoryContext = 1; /* NOTE(review): placeholder non-NULL value, presumably so CopyErrorData() can run in the test harness -- confirm */
+               ErrorData *edata = CopyErrorData();
+
+               /* Validate the code, level and message of the caught error */
+               assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+               assert_true(edata->elevel == ERROR);
+               assert_string_equal(edata->message, err_msg->data);
+
+               list_free(fragments); /* still NIL when the call raised before returning */
+               pfree(err_msg->data);
+               pfree(err_msg);
+               elog_dismiss(INFO); /* presumably discards the caught error state -- confirm */
+               return;
+       }
+       PG_END_TRY();
+
+       /* Reaching this point means no error was raised: fail the test. */
+       assert_true(false);
+
+}
+
+/*
+ * Test GPHDUri_parse_fragment when the fragment string has fewer tokens than
+ * expected.
+ *
+ * The input is a lone "@" delimiter. The call must raise an ERROR whose
+ * sqlerrcode is ERRCODE_INTERNAL_ERROR and whose message equals the text
+ * built in err_msg. The test fails if the call returns normally.
+ */
+void
+test__GPHDUri_parse_fragment__MissingIpHost(void **state) {
+
+       char* fragment = "@";
+
+       List *fragments = NIL;
+
+       StringInfo err_msg = makeStringInfo(); /* holds the expected error text */
+       appendStringInfo(err_msg, "internal error in 
pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
+
+       /* Expect error */
+       PG_TRY();
+       {
+               /* Expected to ereport(ERROR), transferring control to PG_CATCH. */
+               fragments = GPHDUri_parse_fragment(fragment, fragments);
+       }
+       PG_CATCH();
+       {
+               CurrentMemoryContext = 1; /* NOTE(review): placeholder non-NULL value, presumably so CopyErrorData() can run in the test harness -- confirm */
+               ErrorData *edata = CopyErrorData();
+
+               /* Validate the code, level and message of the caught error */
+               assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+               assert_true(edata->elevel == ERROR);
+               assert_string_equal(edata->message, err_msg->data);
+
+               list_free(fragments); /* still NIL when the call raised before returning */
+               pfree(err_msg->data);
+               pfree(err_msg);
+               elog_dismiss(INFO); /* presumably discards the caught error state -- confirm */
+               return;
+       }
+       PG_END_TRY();
+
+       /* Reaching this point means no error was raised: fail the test. */
+       assert_true(false);
+
+}
+
+/*
+ * Test GPHDUri_parse_fragment when the fragment string has fewer tokens than
+ * expected.
+ *
+ * The input is "@HOST@". The call must raise an ERROR whose sqlerrcode is
+ * ERRCODE_INTERNAL_ERROR and whose message equals the text built in
+ * err_msg. The test fails if the call returns normally.
+ *
+ * NOTE(review): the input begins with '@', unlike the input used by the
+ * valid-fragment test -- confirm the leading delimiter is intentional.
+ */
+void
+test__GPHDUri_parse_fragment__MissingPort(void **state) {
+
+       char* fragment = "@HOST@";
+
+       List *fragments = NIL;
+
+       StringInfo err_msg = makeStringInfo(); /* holds the expected error text */
+       appendStringInfo(err_msg, "internal error in 
pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
+
+       /* Expect error */
+       PG_TRY();
+       {
+               /* Expected to ereport(ERROR), transferring control to PG_CATCH. */
+               fragments = GPHDUri_parse_fragment(fragment, fragments);
+       }
+       PG_CATCH();
+       {
+               CurrentMemoryContext = 1; /* NOTE(review): placeholder non-NULL value, presumably so CopyErrorData() can run in the test harness -- confirm */
+               ErrorData *edata = CopyErrorData();
+
+               /* Validate the code, level and message of the caught error */
+               assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+               assert_true(edata->elevel == ERROR);
+               assert_string_equal(edata->message, err_msg->data);
+
+               list_free(fragments); /* still NIL when the call raised before returning */
+               pfree(err_msg->data);
+               pfree(err_msg);
+
+               elog_dismiss(INFO); /* presumably discards the caught error state -- confirm */
+               return;
+       }
+       PG_END_TRY();
+
+       /* Reaching this point means no error was raised: fail the test. */
+       assert_true(false);
+
+}
+
+/*
+ * Test GPHDUri_parse_fragment when the fragment string has fewer tokens than
+ * expected.
+ *
+ * The input is "@HOST@PORT@". The call must raise an ERROR whose sqlerrcode
+ * is ERRCODE_INTERNAL_ERROR and whose message equals the text built in
+ * err_msg. The test fails if the call returns normally.
+ *
+ * NOTE(review): the input begins with '@', unlike the input used by the
+ * valid-fragment test -- confirm the leading delimiter is intentional.
+ */
+void
+test__GPHDUri_parse_fragment__MissingSourceName(void **state) {
+
+       char* fragment = "@HOST@PORT@";
+
+       List *fragments = NIL;
+
+       StringInfo err_msg = makeStringInfo(); /* holds the expected error text */
+       appendStringInfo(err_msg, "internal error in 
pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
+
+       /* Expect error */
+       PG_TRY();
+       {
+               /* Expected to ereport(ERROR), transferring control to PG_CATCH. */
+               fragments = GPHDUri_parse_fragment(fragment, fragments);
+       }
+       PG_CATCH();
+       {
+               CurrentMemoryContext = 1; /* NOTE(review): placeholder non-NULL value, presumably so CopyErrorData() can run in the test harness -- confirm */
+               ErrorData *edata = CopyErrorData();
+
+               /* Validate the code, level and message of the caught error */
+               assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+               assert_true(edata->elevel == ERROR);
+               assert_string_equal(edata->message, err_msg->data);
+
+               list_free(fragments); /* still NIL when the call raised before returning */
+               pfree(err_msg->data);
+               pfree(err_msg);
+               elog_dismiss(INFO); /* presumably discards the caught error state -- confirm */
+               return;
+       }
+       PG_END_TRY();
+
+       /* Reaching this point means no error was raised: fail the test. */
+       assert_true(false);
+
+}
+
+/*
+ * Test GPHDUri_parse_fragment when the fragment string has fewer tokens than
+ * expected.
+ *
+ * The input is "@HOST@PORT@SOURCE_NAME@". The call must raise an ERROR whose
+ * sqlerrcode is ERRCODE_INTERNAL_ERROR and whose message equals the text
+ * built in err_msg. The test fails if the call returns normally.
+ *
+ * NOTE(review): the input begins with '@', unlike the input used by the
+ * valid-fragment test -- confirm the leading delimiter is intentional.
+ */
+void
+test__GPHDUri_parse_fragment__MissingIndex(void **state) {
+
+
+       char* fragment = "@HOST@PORT@SOURCE_NAME@";
+
+       List *fragments = NIL;
+
+       StringInfo err_msg = makeStringInfo(); /* holds the expected error text */
+       appendStringInfo(err_msg, "internal error in 
pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
+
+       /* Expect error */
+       PG_TRY();
+       {
+               /* Expected to ereport(ERROR), transferring control to PG_CATCH. */
+               fragments = GPHDUri_parse_fragment(fragment, fragments);
+       }
+       PG_CATCH();
+       {
+               CurrentMemoryContext = 1; /* NOTE(review): placeholder non-NULL value, presumably so CopyErrorData() can run in the test harness -- confirm */
+               ErrorData *edata = CopyErrorData();
+
+               /* Validate the code, level and message of the caught error */
+               assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+               assert_true(edata->elevel == ERROR);
+               assert_string_equal(edata->message, err_msg->data);
+
+               list_free(fragments); /* still NIL when the call raised before returning */
+               pfree(err_msg->data);
+               pfree(err_msg);
+               elog_dismiss(INFO); /* presumably discards the caught error state -- confirm */
+               return;
+       }
+       PG_END_TRY();
+
+       /* Reaching this point means no error was raised: fail the test. */
+       assert_true(false);
+
+}
+
+
+/*
+ * Test GPHDUri_parse_fragment when the fragment string has fewer tokens than
+ * expected.
+ *
+ * The input is "@HOST@PORT@SOURCE_NAME@42@". The call must raise an ERROR
+ * whose sqlerrcode is ERRCODE_INTERNAL_ERROR and whose message equals the
+ * text built in err_msg. The test fails if the call returns normally.
+ *
+ * NOTE(review): the input begins with '@', unlike the input used by the
+ * valid-fragment test -- confirm the leading delimiter is intentional.
+ */
+void
+test__GPHDUri_parse_fragment__MissingFragmentMetadata(void **state) {
+
+       char* fragment = "@HOST@PORT@SOURCE_NAME@42@";
+
+       List *fragments = NIL;
+
+       StringInfo err_msg = makeStringInfo(); /* holds the expected error text */
+       appendStringInfo(err_msg, "internal error in 
pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
+
+       /* Expect error */
+       PG_TRY();
+       {
+               /* Expected to ereport(ERROR), transferring control to PG_CATCH. */
+               fragments = GPHDUri_parse_fragment(fragment, fragments);
+       }
+       PG_CATCH();
+       {
+               CurrentMemoryContext = 1; /* NOTE(review): placeholder non-NULL value, presumably so CopyErrorData() can run in the test harness -- confirm */
+               ErrorData *edata = CopyErrorData();
+
+               /* Validate the code, level and message of the caught error */
+               assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+               assert_true(edata->elevel == ERROR);
+               assert_string_equal(edata->message, err_msg->data);
+
+               list_free(fragments); /* still NIL when the call raised before returning */
+               pfree(err_msg->data);
+               pfree(err_msg);
+               elog_dismiss(INFO); /* presumably discards the caught error state -- confirm */
+               return;
+       }
+       PG_END_TRY();
+
+       /* Reaching this point means no error was raised: fail the test. */
+       assert_true(false);
+
+}
+
+/*
+ * Test GPHDUri_parse_fragment when the fragment string has fewer tokens than
+ * expected.
+ *
+ * The input stops after the FRAGMENT_METADATA field. The call must raise an
+ * ERROR whose sqlerrcode is ERRCODE_INTERNAL_ERROR and whose message equals
+ * the text built in err_msg. The test fails if the call returns normally.
+ */
+void
+test__GPHDUri_parse_fragment__MissingUserData(void **state) {
+
+       char* fragment = "HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@";
+
+       List *fragments = NIL;
+
+       StringInfo err_msg = makeStringInfo(); /* holds the expected error text */
+       appendStringInfo(err_msg, "internal error in 
pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
+
+       /* Expect error */
+       PG_TRY();
+       {
+               /* Expected to ereport(ERROR), transferring control to PG_CATCH. */
+               fragments = GPHDUri_parse_fragment(fragment, fragments);
+       }
+       PG_CATCH();
+       {
+               CurrentMemoryContext = 1; /* NOTE(review): placeholder non-NULL value, presumably so CopyErrorData() can run in the test harness -- confirm */
+               ErrorData *edata = CopyErrorData();
+
+               /* Validate the code, level and message of the caught error */
+               assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+               assert_true(edata->elevel == ERROR);
+               assert_string_equal(edata->message, err_msg->data);
+
+               list_free(fragments); /* still NIL when the call raised before returning */
+               pfree(err_msg->data);
+               pfree(err_msg);
+               elog_dismiss(INFO); /* presumably discards the caught error state -- confirm */
+               return;
+       }
+       PG_END_TRY();
+
+       /* Reaching this point means no error was raised: fail the test. */
+       assert_true(false);
+
+}
+
+
+/*
+ * Test GPHDUri_parse_fragment when the fragment string has fewer tokens than
+ * expected.
+ *
+ * The input stops after the user-data field (no profile field, not even an
+ * empty one). The call must raise an ERROR whose sqlerrcode is
+ * ERRCODE_INTERNAL_ERROR and whose message equals the text built in
+ * err_msg. The test fails if the call returns normally.
+ */
+void
+test__GPHDUri_parse_fragment__MissingProfile(void **state) {
+
+       char* fragment = 
"HOST@REST_PORT@TABLE_NAME@INDEX@FRAGMENT_METADATA@USER_METADATA@";
+
+       List *fragments = NIL;
+
+       StringInfo err_msg = makeStringInfo(); /* holds the expected error text */
+       appendStringInfo(err_msg, "internal error in 
pxfuriparser.c:GPHDUri_parse_fragment. Fragment string is invalid.");
+
+       /* Expect error */
+       PG_TRY();
+       {
+               /* Expected to ereport(ERROR), transferring control to PG_CATCH. */
+               fragments = GPHDUri_parse_fragment(fragment, fragments);
+       }
+       PG_CATCH();
+       {
+               CurrentMemoryContext = 1; /* NOTE(review): placeholder non-NULL value, presumably so CopyErrorData() can run in the test harness -- confirm */
+               ErrorData *edata = CopyErrorData();
+
+               /* Validate the code, level and message of the caught error */
+               assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+               assert_true(edata->elevel == ERROR);
+               assert_string_equal(edata->message, err_msg->data);
+
+               list_free(fragments); /* still NIL when the call raised before returning */
+               pfree(err_msg->data);
+               pfree(err_msg);
+               elog_dismiss(INFO); /* presumably discards the caught error state -- confirm */
+               return;
+       }
+       PG_END_TRY();
+
+       /* Reaching this point means no error was raised: fail the test. */
+       assert_true(false);
+
+}
+
+
+/*
+ * normalize_key_name on a mixed-case key: the result must be the key
+ * upper-cased and prefixed with "X-GP-".
+ *
+ * The key is passed as a string literal (the function takes const char*),
+ * so only the returned string needs to be released. The earlier version
+ * allocated the key with strdup() and released it with pfree(), mixing
+ * malloc-allocated memory with the palloc allocator.
+ */
+void
+test__normalize_key_name_Positive(void **state)
+{
+       const char *input_key = "mIxEdCaSeVaLuE";
+       char *normalized_key = normalize_key_name(input_key);
+       assert_string_equal(normalized_key, "X-GP-MIXEDCASEVALUE");
+
+       pfree(normalized_key);
+}
+
+/*
+ * normalize_key_name on a key that is already upper case: the result must
+ * be the unchanged key prefixed with "X-GP-".
+ *
+ * The key is passed as a string literal (the function takes const char*),
+ * so only the returned string needs to be released. The earlier version
+ * allocated the key with strdup() and released it with pfree(), mixing
+ * malloc-allocated memory with the palloc allocator.
+ */
+void
+test__normalize_key_name_PositiveUpperCase(void **state)
+{
+       const char *input_key = "ALREADY_UPPER_CASE";
+       char *normalized_key = normalize_key_name(input_key);
+       assert_string_equal(normalized_key, "X-GP-ALREADY_UPPER_CASE");
+
+       pfree(normalized_key);
+}
+
+/*
+ * normalize_key_name with a NULL key: the call must raise an ERROR whose
+ * sqlerrcode is ERRCODE_INTERNAL_ERROR and whose message equals the text
+ * built in err_msg. The test fails if the call returns normally.
+ */
+void
+test__normalize_key_name_Negative__key_is_null(void **state)
+{
+       char *input_key = NULL;
+
+       StringInfo err_msg = makeStringInfo();
+       appendStringInfo(err_msg, "internal error in 
pxfheaders.c:normalize_key_name. Parameter key is null or empty.");
+
+       /* Expect error */
+       PG_TRY();
+       {
+               /* Expected to ereport(ERROR); the return value is not needed. */
+               (void) normalize_key_name(input_key);
+       }
+       PG_CATCH();
+       {
+               CurrentMemoryContext = 1;
+               ErrorData *edata = CopyErrorData();
+
+               /* Validate the type of expected error */
+               assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+               assert_true(edata->elevel == ERROR);
+               assert_string_equal(edata->message, err_msg->data);
+
+               pfree(err_msg->data);
+               pfree(err_msg);
+               /* dismiss the caught error, as the other negative tests in this file do */
+               elog_dismiss(INFO);
+               return;
+       }
+       PG_END_TRY();
+
+       /* should not reach here */
+       assert_true(false);
+
+}
+
+/*
+ * normalize_key_name with an empty key: the call must raise an ERROR whose
+ * sqlerrcode is ERRCODE_INTERNAL_ERROR and whose message equals the text
+ * built in err_msg. The test fails if the call returns normally.
+ */
+void
+test__normalize_key_name_Negative__key_is_empty(void **state)
+{
+       char *input_key = "";
+
+       StringInfo err_msg = makeStringInfo();
+       appendStringInfo(err_msg, "internal error in 
pxfheaders.c:normalize_key_name. Parameter key is null or empty.");
+
+       /* Expect error */
+       PG_TRY();
+       {
+               /* Expected to ereport(ERROR); the return value is not needed. */
+               (void) normalize_key_name(input_key);
+       }
+       PG_CATCH();
+       {
+               CurrentMemoryContext = 1;
+               ErrorData *edata = CopyErrorData();
+
+               /* Validate the type of expected error */
+               assert_true(edata->sqlerrcode == ERRCODE_INTERNAL_ERROR);
+               assert_true(edata->elevel == ERROR);
+               assert_string_equal(edata->message, err_msg->data);
+
+               pfree(err_msg->data);
+               pfree(err_msg);
+               /* dismiss the caught error, as the other negative tests in this file do */
+               elog_dismiss(INFO);
+               return;
+       }
+       PG_END_TRY();
+
+       /* should not reach here */
+       assert_true(false);
+
+}
+
 int 
 main(int argc, char* argv[]) 
 {
+
        cmockery_parse_arguments(argc, argv);
 
        const UnitTest tests[] = {
@@ -449,7 +991,21 @@ main(int argc, char* argv[])
                        
unit_test(test__GPHDUri_verify_no_duplicate_options__ValidURI),
                        
unit_test(test__GPHDUri_verify_no_duplicate_options__NegativeTestDuplicateOpts),
                        
unit_test(test__GPHDUri_verify_core_options_exist__ValidURI),
-                       
unit_test(test__GPHDUri_verify_core_options_exist__NegativeTestMissingCoreOpts)
+                       
unit_test(test__GPHDUri_verify_core_options_exist__NegativeTestMissingCoreOpts),
+                       unit_test(test__GPHDUri_parse_fragment__EmptyProfile),
+                       unit_test(test__GPHDUri_parse_fragment__ValidFragment),
+                       /* the NULL-input test is defined in this file and must be registered to run */
+                       unit_test(test__GPHDUri_parse_fragment__NullFragment),
+                       unit_test(test__GPHDUri_parse_fragment__EmptyString),
+                       unit_test(test__GPHDUri_parse_fragment__MissingIpHost),
+                       unit_test(test__GPHDUri_parse_fragment__MissingPort),
+                       
unit_test(test__GPHDUri_parse_fragment__MissingSourceName),
+                       unit_test(test__GPHDUri_parse_fragment__MissingIndex),
+                       
unit_test(test__GPHDUri_parse_fragment__MissingFragmentMetadata),
+                       
unit_test(test__GPHDUri_parse_fragment__MissingUserData),
+                       unit_test(test__GPHDUri_parse_fragment__MissingProfile),
+                       unit_test(test__normalize_key_name_Positive),
+                       unit_test(test__normalize_key_name_PositiveUpperCase),
+                       
unit_test(test__normalize_key_name_Negative__key_is_null),
+                       
unit_test(test__normalize_key_name_Negative__key_is_empty)
        };
        return run_tests(tests);
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index 2751b1b..b524df8 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -226,6 +226,17 @@ void set_current_fragment_headers(gphadoop_context* 
context)
                churl_headers_remove(context->churl_headers, 
"X-GP-FRAGMENT-USER-DATA", true);
        }
 
+       /* if current fragment has optimal profile set it*/
+       if (frag_data->profile)
+       {
+               churl_headers_override(context->churl_headers, "X-GP-PROFILE", 
frag_data->profile);
+       } else if (context->gphd_uri->profile)
+       {
+               /* if current fragment doesn't have any optimal profile, set to 
use profile from url */
+               churl_headers_override(context->churl_headers, "X-GP-PROFILE", 
context->gphd_uri->profile);
+       }
+       /* if there is no profile passed in url, we expect to have 
accessor+fragmenter+resolver so no action needed by this point */
+
 }
 
 void gpbridge_import_start(PG_FUNCTION_ARGS)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/include/access/pxfmasterapi.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfmasterapi.h 
b/src/include/access/pxfmasterapi.h
index 4b9ecd9..b487b41 100644
--- a/src/include/access/pxfmasterapi.h
+++ b/src/include/access/pxfmasterapi.h
@@ -56,6 +56,7 @@ typedef struct sDataFragment
        List *replicas;
        char *fragment_md; /* fragment meta data (start, end, length, etc.) */
        char *user_data;
+       char *profile;
 } DataFragment;
 
 /*

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/65e1ed10/src/include/access/pxfuriparser.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfuriparser.h 
b/src/include/access/pxfuriparser.h
index d050325..ac614c5 100644
--- a/src/include/access/pxfuriparser.h
+++ b/src/include/access/pxfuriparser.h
@@ -46,6 +46,7 @@ typedef struct FragmentData
        char     *source_name;
        char     *fragment_md;
        char     *user_data;
+       char     *profile;
 } FragmentData;
 
 typedef struct OptionData
@@ -66,6 +67,7 @@ typedef struct GPHDUri
        char                    *host;          /* host name str                
        */
        char                    *port;          /* port number as string        
*/
        char                    *data;      /* data location (path)     */
+       char                    *profile;   /* profile option                   
*/
        List                    *fragments; /* list of FragmentData             
*/
 
        /* options */
@@ -88,5 +90,6 @@ int            GPHDUri_get_value_for_opt(GPHDUri *uri, char 
*key, char **val, bool emit_e
 bool    RelationIsExternalPxfReadOnly(Relation rel, StringInfo location);
 void    GPHDUri_verify_no_duplicate_options(GPHDUri *uri);
 void    GPHDUri_verify_core_options_exist(GPHDUri *uri, List *coreOptions);
+char* normalize_key_name(const char* key);
 
 #endif // _PXF_URIPARSER_H_


Reply via email to