Repository: incubator-hawq Updated Branches: refs/heads/master 9578ab04c -> 816782bd8
HAWQ-1564. Add Pluggable Storage Dependent Information The info added are mainly about external URI, block location, file splits and formatter action. Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/816782bd Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/816782bd Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/816782bd Branch: refs/heads/master Commit: 816782bd84c5ac001a86afa907a81794530e3433 Parents: 9578ab0 Author: Chiyang Wan <chiyang10...@gmail.com> Authored: Tue Dec 5 09:57:02 2017 +0800 Committer: Chiyang Wan <chiyang10...@gmail.com> Committed: Wed Dec 6 12:35:34 2017 +0800 ---------------------------------------------------------------------- src/backend/access/external/fileam.c | 26 +++++++++++++------------ src/backend/access/external/url.c | 32 +++++++++++++++++++++++++------ src/backend/nodes/copyfuncs.c | 1 + src/backend/nodes/outfast.c | 1 + src/backend/nodes/outfuncs.c | 1 + src/backend/nodes/readfast.c | 1 + src/include/access/extprotocol.h | 22 +++++++++++++++++++-- src/include/access/filesplit.h | 1 + src/include/access/formatter.h | 7 ++++++- src/include/access/relscan.h | 15 +++++++++++++++ src/include/access/url.h | 2 +- 11 files changed, 87 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/access/external/fileam.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c index 099dae5..6a59b95 100644 --- a/src/backend/access/external/fileam.c +++ b/src/backend/access/external/fileam.c @@ -79,7 +79,7 @@ #include "cdb/cdbutil.h" #include "cdb/cdbvars.h" -static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir, ExternalSelectDesc desc); +static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir, ExternalSelectDesc desc, ScanState *ss); static void InitParseState(CopyState pstate, Relation relation, Datum* values, bool* nulls, bool writable, List *fmtOpts, char fmtType, @@ -97,7 +97,7 @@ static void FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo, static void open_external_readable_source(FileScanDesc scan); static void open_external_writable_source(ExternalInsertDesc extInsertDesc); -static int external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc); +static int external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc, ScanState *ss); static void external_senddata(URL_FILE *extfile, CopyState pstate); static void external_scan_error_callback(void *arg); void readHeaderLine(CopyState pstate); @@ -487,6 +487,7 @@ HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc) { HeapTuple tuple; + ScanState *ss = NULL; /* a temporary dummy for the following steps */ if (scan->fs_noop) return NULL; @@ -508,7 +509,7 @@ external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc /* Note: no locking manipulations needed */ FILEDEBUG_1; - tuple = externalgettup(scan, direction, desc); + tuple = externalgettup(scan, direction, desc, ss); if (tuple == NULL) @@ -991,7 +992,7 @@ static DataLineStatus parse_next_line(FileScanDesc scan) } static HeapTuple -externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc) +externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss) { HeapTuple tuple = NULL; CopyState pstate = scan->fs_pstate; @@ -1003,7 +1004,7 @@ externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc) /* need to fill our buffer with data? */ if (pstate->raw_buf_done) { - pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc); + pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss); pstate->begloc = pstate->raw_buf; pstate->raw_buf_done = (pstate->bytesread==0); pstate->raw_buf_index = 0; @@ -1094,7 +1095,7 @@ externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc) } static HeapTuple -externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc) +externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss) { HeapTuple tuple; CopyState pstate = scan->fs_pstate; @@ -1110,7 +1111,7 @@ externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc) /* need to fill our buffer with data? */ if (pstate->raw_buf_done) { - int bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc); + int bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss); if ( bytesread > 0 ) appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread); pstate->raw_buf_done = false; @@ -1252,7 +1253,7 @@ externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc) */ static HeapTuple externalgettup(FileScanDesc scan, - ScanDirection dir __attribute__((unused)), ExternalSelectDesc desc) + ScanDirection dir __attribute__((unused)), ExternalSelectDesc desc, ScanState *ss) { CopyState pstate = scan->fs_pstate; @@ -1274,9 +1275,9 @@ externalgettup(FileScanDesc scan, } if (!custom) - return externalgettup_defined(scan, desc); /* text/csv */ + return externalgettup_defined(scan, desc, ss); /* text/csv */ else - return externalgettup_custom(scan, desc); /* custom */ + return externalgettup_custom(scan, desc, ss); /* custom */ } /* @@ -1769,7 +1770,7 @@ close_external_source(FILE *dataSource, bool failOnError, const char *relname) * get a chunk of data from the external data file. */ static int -external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc) +external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc, ScanState *ss) { int bytesread = 0; @@ -1781,7 +1782,8 @@ external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelec */ - bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, pstate, desc); + bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, pstate, desc, + (ss == NULL ? NULL : &(ss->splits))); if (url_feof(extfile, bytesread)) { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/access/external/url.c ---------------------------------------------------------------------- diff --git a/src/backend/access/external/url.c b/src/backend/access/external/url.c index 7f586db..7aadc95 100644 --- a/src/backend/access/external/url.c +++ b/src/backend/access/external/url.c @@ -24,6 +24,7 @@ #include <sys/wait.h> #include "access/fileam.h" +#include "access/filesplit.h" #include "access/heapam.h" #include "access/valid.h" #include "catalog/pg_extprotocol.h" @@ -88,7 +89,8 @@ static int32 InvokeExtProtocol(void *ptr, URL_FILE *file, CopyState pstate, bool last_call, - ExternalSelectDesc desc); + ExternalSelectDesc desc, + List **psplits); void extract_http_domain(char* i_path, char* o_domain, int dlen); @@ -1272,7 +1274,7 @@ url_fclose(URL_FILE *file, bool failOnError, const char *relname) /* last call. let the user close custom resources */ if(file->u.custom.protocol_udf) - (void) InvokeExtProtocol(NULL, 0, file, NULL, true, NULL); + (void) InvokeExtProtocol(NULL, 0, file, NULL, true, NULL, NULL); /* now clean up everything not cleaned by user */ MemoryContextDelete(file->u.custom.protcxt); @@ -1776,7 +1778,7 @@ static size_t curl_fwrite(char *buf, int nbytes, URL_FILE* file, CopyState pstat size_t -url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc) +url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc, List **splits) { size_t want; int n; @@ -1823,7 +1825,7 @@ url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate case CFTYPE_CUSTOM: - want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, desc); + want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, desc, splits); break; default: /* unknown or supported type */ @@ -1861,7 +1863,7 @@ url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstat case CFTYPE_CUSTOM: - want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, NULL); + want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, NULL, NULL); break; default: /* unknown or unsupported type */ @@ -2299,7 +2301,8 @@ InvokeExtProtocol(void *ptr, URL_FILE *file, CopyState pstate, bool last_call, - ExternalSelectDesc desc) + ExternalSelectDesc desc, + List **psplits) { FunctionCallInfoData fcinfo; ExtProtocolData* extprotocol = file->u.custom.extprotocol; @@ -2318,7 +2321,24 @@ InvokeExtProtocol(void *ptr, extprotocol->prot_scanquals = file->u.custom.scanquals; extprotocol->prot_last_call = last_call; extprotocol->desc = desc; + extprotocol->splits = NULL; + if (psplits != NULL && *psplits != NULL) { + /* + * We move to read splits from arg to this context structure, so that + * means we passed split data only the first time this is called. + */ + while( list_length(*psplits)>0 ) + { + FileSplit split = (FileSplit)lfirst(list_head(*psplits)); + elog(LOG, "split %s:" INT64_FORMAT ", " INT64_FORMAT, + split->ext_file_uri_string, + split->offsets, split->lengths); + extprotocol->splits = lappend(extprotocol->splits, split); + *psplits = list_delete_first(*psplits); + } + } + InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo, /* FmgrInfo */ extprotocol_udf, /* nArgs */ 0, http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/copyfuncs.c ---------------------------------------------------------------------- diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index b16db04..b57c139 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4219,6 +4219,7 @@ _copyFileSplitNode(FileSplitNode *from) COPY_SCALAR_FIELD(logiceof); COPY_SCALAR_FIELD(offsets); COPY_SCALAR_FIELD(lengths); + COPY_STRING_FIELD(ext_file_uri_string); return newnode; } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/outfast.c ---------------------------------------------------------------------- diff --git a/src/backend/nodes/outfast.c b/src/backend/nodes/outfast.c index 348bb2c..749e1d3 100644 --- a/src/backend/nodes/outfast.c +++ b/src/backend/nodes/outfast.c @@ -2201,6 +2201,7 @@ _outFileSplitNode(StringInfo str, FileSplitNode *node) WRITE_INT64_FIELD(logiceof); WRITE_INT64_FIELD(offsets); WRITE_INT64_FIELD(lengths); + WRITE_STRING_FIELD(ext_file_uri_string); } static void http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/outfuncs.c ---------------------------------------------------------------------- diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 03a2550..cf6bf04 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -4053,6 +4053,7 @@ _outFileSplitNode(StringInfo str, FileSplitNode *node) WRITE_INT64_FIELD(logiceof); WRITE_INT64_FIELD(offsets); WRITE_INT64_FIELD(lengths); + WRITE_STRING_FIELD(ext_file_uri_string); } static void http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/readfast.c ---------------------------------------------------------------------- diff --git a/src/backend/nodes/readfast.c b/src/backend/nodes/readfast.c index a77a217..1be9620 100644 --- a/src/backend/nodes/readfast.c +++ b/src/backend/nodes/readfast.c @@ -2294,6 +2294,7 @@ _readFileSplitNode(const char **str) READ_INT64_FIELD(logiceof); READ_INT64_FIELD(offsets); READ_INT64_FIELD(lengths); + READ_STRING_FIELD(ext_file_uri_string); READ_DONE(); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/extprotocol.h ---------------------------------------------------------------------- diff --git a/src/include/access/extprotocol.h b/src/include/access/extprotocol.h index c1aa724..674284b 100644 --- a/src/include/access/extprotocol.h +++ b/src/include/access/extprotocol.h @@ -50,7 +50,7 @@ typedef struct ExtProtocolData bool prot_last_call; List *prot_scanquals; ExternalSelectDesc desc; - + List *splits; /* splits to read from external protocol */ } ExtProtocolData; typedef ExtProtocolData *ExtProtocol; @@ -81,6 +81,17 @@ typedef enum ValidatorDirection } ValidatorDirection; /* + * Indicate the validator to validate arguments when creating external table or + * let validator fetch block location information. This design is to avoid + * changing catalog table. + */ +typedef enum ValidatorAction +{ + EXT_VALID_ACT_ARGUMENTS, + EXT_VALID_ACT_GETBLKLOC +} ValidatorAction; + +/* * ExtProtocolValidatorData is the node type that is passed as fmgr "context" info * when a function is called by the External Table protocol manager. */ @@ -91,9 +102,16 @@ typedef struct ExtProtocolValidatorData List *format_opts; ValidatorDirection direction; /* validating read or write? */ char *errmsg; /* the validation error upon return, if any */ - + ValidatorAction action; /* indicate what action should be done. */ + bool forceCreateDir; } ExtProtocolValidatorData; +typedef struct ExtProtocolBlockLocationData +{ + NodeTag type; + List *files; /* List of blocklocation_file*/ +} ExtProtocolBlockLocationData; + typedef ExtProtocolValidatorData *ExtProtocolValidator; #define CALLED_AS_EXTPROTOCOL_VALIDATOR(fcinfo) \ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/filesplit.h ---------------------------------------------------------------------- diff --git a/src/include/access/filesplit.h b/src/include/access/filesplit.h index e627b07..2284d2a 100644 --- a/src/include/access/filesplit.h +++ b/src/include/access/filesplit.h @@ -40,6 +40,7 @@ typedef struct FileSplitNode int64 logiceof; int64 offsets; int64 lengths; + char *ext_file_uri_string; } FileSplitNode; typedef FileSplitNode *FileSplit; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/formatter.h ---------------------------------------------------------------------- diff --git a/src/include/access/formatter.h b/src/include/access/formatter.h index 326002c..f87afc2 100644 --- a/src/include/access/formatter.h +++ b/src/include/access/formatter.h @@ -48,7 +48,7 @@ typedef enum FmtNotification typedef struct FormatterData { NodeTag type; /* see T_FormatterData */ - + /* metadata */ Relation fmt_relation; TupleDesc fmt_tupDesc; @@ -74,6 +74,11 @@ typedef struct FormatterData bool fmt_needs_transcoding; FmgrInfo* fmt_conversion_proc; int fmt_external_encoding; + + /* external url */ + char *fmt_url; + /* splits */ + List *fmt_splits; } FormatterData; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/relscan.h ---------------------------------------------------------------------- diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index bff990a..5fc6c76 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -18,9 +18,14 @@ #include "access/skey.h" #include "access/memtup.h" #include "access/aosegfiles.h" +#include "access/plugstorage_utils.h" +#include "nodes/plannodes.h" #include "storage/bufpage.h" #include "utils/tqual.h" +/* forward declaration from nodes/execnodes.h */ +typedef struct ScanState ScanState; + typedef struct HeapScanDescData { /* scan parameters */ @@ -127,6 +132,16 @@ typedef struct FileScanDescData /* custom data formatter */ FormatterData *fs_formatter; + /* formatter type and name */ + int fs_formatter_type; + char *fs_formatter_name; + + /* current scan information for pluggable format */ + PlugStorageScanFuncs fs_ps_scan_funcs; /* scan functions */ + void *fs_ps_user_data; /* user data */ + ScanState *fs_ps_scan_state; /* support rescan */ + Plan *fs_ps_plan; /* support rescan */ + } FileScanDescData; typedef FileScanDescData *FileScanDesc; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/url.h ---------------------------------------------------------------------- diff --git a/src/include/access/url.h b/src/include/access/url.h index 307656a..b1ec102 100644 --- a/src/include/access/url.h +++ b/src/include/access/url.h @@ -159,7 +159,7 @@ extern URL_FILE *url_fopen(char *url, bool forwrite, extvar_t *ev, CopyState pst extern int url_fclose(URL_FILE *file, bool failOnError, const char *relname); extern bool url_feof(URL_FILE *file, int bytesread); extern bool url_ferror(URL_FILE *file, int bytesread, char *ebuf, int ebuflen); -extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc); +extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc, List **psplits); extern size_t url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate); extern void url_rewind(URL_FILE *file, const char *relname); extern void url_fflush(URL_FILE *file, CopyState pstate);