Repository: incubator-hawq
Updated Branches:
  refs/heads/master 9578ab04c -> 816782bd8


HAWQ-1564. Add Pluggable Storage Dependent Information

The added information mainly concerns external URIs, block locations, file splits, and
formatter actions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/816782bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/816782bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/816782bd

Branch: refs/heads/master
Commit: 816782bd84c5ac001a86afa907a81794530e3433
Parents: 9578ab0
Author: Chiyang Wan <chiyang10...@gmail.com>
Authored: Tue Dec 5 09:57:02 2017 +0800
Committer: Chiyang Wan <chiyang10...@gmail.com>
Committed: Wed Dec 6 12:35:34 2017 +0800

----------------------------------------------------------------------
 src/backend/access/external/fileam.c | 26 +++++++++++++------------
 src/backend/access/external/url.c    | 32 +++++++++++++++++++++++++------
 src/backend/nodes/copyfuncs.c        |  1 +
 src/backend/nodes/outfast.c          |  1 +
 src/backend/nodes/outfuncs.c         |  1 +
 src/backend/nodes/readfast.c         |  1 +
 src/include/access/extprotocol.h     | 22 +++++++++++++++++++--
 src/include/access/filesplit.h       |  1 +
 src/include/access/formatter.h       |  7 ++++++-
 src/include/access/relscan.h         | 15 +++++++++++++++
 src/include/access/url.h             |  2 +-
 11 files changed, 87 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/access/external/fileam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/fileam.c 
b/src/backend/access/external/fileam.c
index 099dae5..6a59b95 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -79,7 +79,7 @@
 #include "cdb/cdbutil.h"
 #include "cdb/cdbvars.h"
 
-static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir, 
ExternalSelectDesc desc);
+static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir, 
ExternalSelectDesc desc, ScanState *ss);
 static void InitParseState(CopyState pstate, Relation relation,
                                                   Datum* values, bool* nulls, 
bool writable,
                                                   List *fmtOpts, char fmtType,
@@ -97,7 +97,7 @@ static void 
FunctionCallPrepareFormatter(FunctionCallInfoData*        fcinfo,
 
 static void open_external_readable_source(FileScanDesc scan);
 static void open_external_writable_source(ExternalInsertDesc extInsertDesc);
-static int     external_getdata(URL_FILE *extfile, CopyState pstate, int 
maxread, ExternalSelectDesc desc);
+static int     external_getdata(URL_FILE *extfile, CopyState pstate, int 
maxread, ExternalSelectDesc desc, ScanState *ss);
 static void external_senddata(URL_FILE *extfile, CopyState pstate);
 static void external_scan_error_callback(void *arg);
 void readHeaderLine(CopyState pstate);
@@ -487,6 +487,7 @@ HeapTuple
 external_getnext(FileScanDesc scan, ScanDirection direction, 
ExternalSelectDesc desc)
 {
        HeapTuple       tuple;
+       ScanState *ss = NULL; /* a temporary dummy for the following steps */
 
        if (scan->fs_noop)
                return NULL;
@@ -508,7 +509,7 @@ external_getnext(FileScanDesc scan, ScanDirection 
direction, ExternalSelectDesc
        /* Note: no locking manipulations needed */
        FILEDEBUG_1;
 
-       tuple = externalgettup(scan, direction, desc);
+       tuple = externalgettup(scan, direction, desc, ss);
 
 
        if (tuple == NULL)
@@ -991,7 +992,7 @@ static DataLineStatus parse_next_line(FileScanDesc scan)
 }
 
 static HeapTuple
-externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc)
+externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc, ScanState 
*ss)
 {
                HeapTuple       tuple = NULL;
                CopyState       pstate = scan->fs_pstate;
@@ -1003,7 +1004,7 @@ externalgettup_defined(FileScanDesc scan, 
ExternalSelectDesc desc)
                        /* need to fill our buffer with data? */
                        if (pstate->raw_buf_done)
                        {
-                           pstate->bytesread = 
external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc);
+                           pstate->bytesread = 
external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
                                pstate->begloc = pstate->raw_buf;
                                pstate->raw_buf_done = (pstate->bytesread==0);
                                pstate->raw_buf_index = 0;
@@ -1094,7 +1095,7 @@ externalgettup_defined(FileScanDesc scan, 
ExternalSelectDesc desc)
 }
 
 static HeapTuple
-externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc)
+externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc, ScanState 
*ss)
 {
                HeapTuple       tuple;
                CopyState               pstate = scan->fs_pstate;
@@ -1110,7 +1111,7 @@ externalgettup_custom(FileScanDesc scan, 
ExternalSelectDesc desc)
                        /* need to fill our buffer with data? */
                        if (pstate->raw_buf_done)
                        {
-                               int      bytesread = 
external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc);
+                               int      bytesread = 
external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
                                if ( bytesread > 0 )
                                        
appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
                                pstate->raw_buf_done = false;
@@ -1252,7 +1253,7 @@ externalgettup_custom(FileScanDesc scan, 
ExternalSelectDesc desc)
 */
 static HeapTuple
 externalgettup(FileScanDesc scan,
-                   ScanDirection dir __attribute__((unused)), 
ExternalSelectDesc desc)
+                   ScanDirection dir __attribute__((unused)), 
ExternalSelectDesc desc, ScanState *ss)
 {
 
        CopyState       pstate = scan->fs_pstate;
@@ -1274,9 +1275,9 @@ externalgettup(FileScanDesc scan,
        }
 
        if (!custom)
-               return externalgettup_defined(scan, desc); /* text/csv */
+               return externalgettup_defined(scan, desc, ss); /* text/csv */
        else
-               return externalgettup_custom(scan, desc);  /* custom   */
+               return externalgettup_custom(scan, desc, ss);  /* custom   */
 
 }
 /*
@@ -1769,7 +1770,7 @@ close_external_source(FILE *dataSource, bool failOnError, 
const char *relname)
  * get a chunk of data from the external data file.
  */
 static int
-external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, 
ExternalSelectDesc desc)
+external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, 
ExternalSelectDesc desc, ScanState *ss)
 {
        int                     bytesread = 0;
 
@@ -1781,7 +1782,8 @@ external_getdata(URL_FILE *extfile, CopyState pstate, int 
maxread, ExternalSelec
        */
 
 
-       bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, 
pstate, desc);
+       bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, 
pstate, desc,
+                                                 (ss == NULL ? NULL : 
&(ss->splits)));
 
        if (url_feof(extfile, bytesread))
        {

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/access/external/url.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/url.c 
b/src/backend/access/external/url.c
index 7f586db..7aadc95 100644
--- a/src/backend/access/external/url.c
+++ b/src/backend/access/external/url.c
@@ -24,6 +24,7 @@
 #include <sys/wait.h>
 
 #include "access/fileam.h"
+#include "access/filesplit.h"
 #include "access/heapam.h"
 #include "access/valid.h"
 #include "catalog/pg_extprotocol.h"
@@ -88,7 +89,8 @@ static int32  InvokeExtProtocol(void          *ptr,
                                                                URL_FILE        
*file, 
                                                                CopyState       
pstate,
                                                                bool            
last_call,
-                                                               
ExternalSelectDesc desc);
+                                                               
ExternalSelectDesc desc,
+                                                               List            
**psplits);
 void extract_http_domain(char* i_path, char* o_domain, int dlen);
 
 
@@ -1272,7 +1274,7 @@ url_fclose(URL_FILE *file, bool failOnError, const char 
*relname)
                        
                        /* last call. let the user close custom resources */
                        if(file->u.custom.protocol_udf)
-                               (void) InvokeExtProtocol(NULL, 0, file, NULL, 
true, NULL);
+                               (void) InvokeExtProtocol(NULL, 0, file, NULL, 
true, NULL, NULL);
 
                        /* now clean up everything not cleaned by user */
                        MemoryContextDelete(file->u.custom.protcxt);
@@ -1776,7 +1778,7 @@ static size_t curl_fwrite(char *buf, int nbytes, 
URL_FILE* file, CopyState pstat
 
 
 size_t
-url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState 
pstate, ExternalSelectDesc desc)
+url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState 
pstate, ExternalSelectDesc desc, List **splits)
 {
     size_t     want;
        int     n;
@@ -1823,7 +1825,7 @@ url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE 
*file, CopyState pstate
 
                case CFTYPE_CUSTOM:
                        
-                       want = (size_t) InvokeExtProtocol(ptr, nmemb * size, 
file, pstate, false, desc);
+                       want = (size_t) InvokeExtProtocol(ptr, nmemb * size, 
file, pstate, false, desc, splits);
                        break;
                                
                default: /* unknown or supported type */
@@ -1861,7 +1863,7 @@ url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE 
*file, CopyState pstat
                
                case CFTYPE_CUSTOM:
                                                
-                       want = (size_t) InvokeExtProtocol(ptr, nmemb * size, 
file, pstate, false, NULL);
+                       want = (size_t) InvokeExtProtocol(ptr, nmemb * size, 
file, pstate, false, NULL, NULL);
                        break;
                        
                default: /* unknown or unsupported type */
@@ -2299,7 +2301,8 @@ InvokeExtProtocol(void                    *ptr,
                                  URL_FILE              *file, 
                                  CopyState      pstate,
                                  bool                   last_call,
-                                 ExternalSelectDesc desc)
+                                 ExternalSelectDesc desc,
+                                 List             **psplits)
 {
        FunctionCallInfoData    fcinfo;
        ExtProtocolData*                extprotocol = 
file->u.custom.extprotocol;
@@ -2318,7 +2321,24 @@ InvokeExtProtocol(void                   *ptr,
        extprotocol->prot_scanquals = file->u.custom.scanquals;
        extprotocol->prot_last_call = last_call;
        extprotocol->desc = desc;
+       extprotocol->splits = NULL;
        
+       if (psplits != NULL && *psplits != NULL) {
+               /*
+                * We move to read splits from arg to this context structure, 
so that
+                * means we passed split data only the first time this is 
called.
+                */
+               while( list_length(*psplits)>0 )
+               {
+                       FileSplit split = 
(FileSplit)lfirst(list_head(*psplits));
+                       elog(LOG, "split %s:" INT64_FORMAT ", " INT64_FORMAT,
+                                         split->ext_file_uri_string,
+                                         split->offsets, split->lengths);
+                       extprotocol->splits = lappend(extprotocol->splits, 
split);
+                       *psplits = list_delete_first(*psplits);
+               }
+       }
+
        InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo, 
                                                         /* FmgrInfo */ 
extprotocol_udf, 
                                                         /* nArgs */ 0, 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/copyfuncs.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index b16db04..b57c139 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4219,6 +4219,7 @@ _copyFileSplitNode(FileSplitNode *from)
   COPY_SCALAR_FIELD(logiceof);
   COPY_SCALAR_FIELD(offsets);
   COPY_SCALAR_FIELD(lengths);
+  COPY_STRING_FIELD(ext_file_uri_string);
 
   return newnode;
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/outfast.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/outfast.c b/src/backend/nodes/outfast.c
index 348bb2c..749e1d3 100644
--- a/src/backend/nodes/outfast.c
+++ b/src/backend/nodes/outfast.c
@@ -2201,6 +2201,7 @@ _outFileSplitNode(StringInfo str, FileSplitNode *node)
        WRITE_INT64_FIELD(logiceof);
        WRITE_INT64_FIELD(offsets);
        WRITE_INT64_FIELD(lengths);
+       WRITE_STRING_FIELD(ext_file_uri_string);
 }
 
 static void

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/outfuncs.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 03a2550..cf6bf04 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -4053,6 +4053,7 @@ _outFileSplitNode(StringInfo str, FileSplitNode *node)
        WRITE_INT64_FIELD(logiceof);
        WRITE_INT64_FIELD(offsets);
        WRITE_INT64_FIELD(lengths);
+       WRITE_STRING_FIELD(ext_file_uri_string);
 }
 
 static void

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/readfast.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/readfast.c b/src/backend/nodes/readfast.c
index a77a217..1be9620 100644
--- a/src/backend/nodes/readfast.c
+++ b/src/backend/nodes/readfast.c
@@ -2294,6 +2294,7 @@ _readFileSplitNode(const char **str)
        READ_INT64_FIELD(logiceof);
        READ_INT64_FIELD(offsets);
        READ_INT64_FIELD(lengths);
+       READ_STRING_FIELD(ext_file_uri_string);
        READ_DONE();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/extprotocol.h
----------------------------------------------------------------------
diff --git a/src/include/access/extprotocol.h b/src/include/access/extprotocol.h
index c1aa724..674284b 100644
--- a/src/include/access/extprotocol.h
+++ b/src/include/access/extprotocol.h
@@ -50,7 +50,7 @@ typedef struct ExtProtocolData
        bool                    prot_last_call;
        List               *prot_scanquals;
        ExternalSelectDesc desc;
-
+       List               *splits;                     /* splits to read from 
external protocol */
 } ExtProtocolData;
 
 typedef ExtProtocolData *ExtProtocol;
@@ -81,6 +81,17 @@ typedef enum ValidatorDirection
 } ValidatorDirection;
 
 /*
+ * Tells an external protocol's validator which action to perform: validate the
+ * table arguments (when creating an external table) or fetch block location
+ * information. Carrying the action here avoids changing the catalog table.
+ */
+typedef enum ValidatorAction
+{
+       EXT_VALID_ACT_ARGUMENTS,        /* validate arguments when creating an external table */
+       EXT_VALID_ACT_GETBLKLOC         /* fetch block location information */
+} ValidatorAction;
+
+/*
  * ExtProtocolValidatorData is the node type that is passed as fmgr "context" 
info
  * when a function is called by the External Table protocol manager.
  */
@@ -91,9 +102,16 @@ typedef struct ExtProtocolValidatorData
        List                            *format_opts;
        ValidatorDirection       direction;  /* validating read or write? */
        char                            *errmsg;                  /* the 
validation error upon return, if any */
-       
+       ValidatorAction          action;        /* indicate what action should 
be done. */
+       bool                            forceCreateDir;
 } ExtProtocolValidatorData;
 
+typedef struct ExtProtocolBlockLocationData
+{
+       NodeTag                          type;   /* NOTE(review): this commit adds no matching T_* tag (nodes.h is not in the diffstat) -- confirm */
+       List                            *files;         /* List of 
blocklocation_file*/
+} ExtProtocolBlockLocationData;
+
 typedef ExtProtocolValidatorData *ExtProtocolValidator;
 
 #define CALLED_AS_EXTPROTOCOL_VALIDATOR(fcinfo) \

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/filesplit.h
----------------------------------------------------------------------
diff --git a/src/include/access/filesplit.h b/src/include/access/filesplit.h
index e627b07..2284d2a 100644
--- a/src/include/access/filesplit.h
+++ b/src/include/access/filesplit.h
@@ -40,6 +40,7 @@ typedef struct FileSplitNode
        int64 logiceof;
        int64 offsets;
        int64 lengths;
+       char *ext_file_uri_string;
 } FileSplitNode;
 
 typedef FileSplitNode *FileSplit;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/formatter.h
----------------------------------------------------------------------
diff --git a/src/include/access/formatter.h b/src/include/access/formatter.h
index 326002c..f87afc2 100644
--- a/src/include/access/formatter.h
+++ b/src/include/access/formatter.h
@@ -48,7 +48,7 @@ typedef enum FmtNotification
 typedef struct FormatterData
 {
        NodeTag                 type;                 /* see T_FormatterData */
-       
+
        /* metadata */
        Relation        fmt_relation;
        TupleDesc       fmt_tupDesc;
@@ -74,6 +74,11 @@ typedef struct FormatterData
        bool                    fmt_needs_transcoding;
        FmgrInfo*               fmt_conversion_proc;
        int                             fmt_external_encoding;
+
+       /* external url */
+       char               *fmt_url;
+       /* splits */
+       List               *fmt_splits;
                
 } FormatterData;
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/relscan.h
----------------------------------------------------------------------
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index bff990a..5fc6c76 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -18,9 +18,14 @@
 #include "access/skey.h"
 #include "access/memtup.h"
 #include "access/aosegfiles.h"
+#include "access/plugstorage_utils.h"
+#include "nodes/plannodes.h"
 #include "storage/bufpage.h"
 #include "utils/tqual.h"
 
+/* forward declaration from nodes/execnodes.h; NOTE(review): if both headers typedef ScanState, the repeat is only valid from C11 on -- confirm -std */
+typedef struct ScanState ScanState;
+
 typedef struct HeapScanDescData
 {
        /* scan parameters */
@@ -127,6 +132,16 @@ typedef struct FileScanDescData
        /* custom data formatter */
        FormatterData *fs_formatter;
        
+       /* formatter type and name */
+       int fs_formatter_type;
+       char *fs_formatter_name;
+
+       /* current scan information for pluggable format */
+       PlugStorageScanFuncs fs_ps_scan_funcs;   /* scan functions */
+       void *fs_ps_user_data;                   /* user data */
+       ScanState *fs_ps_scan_state;             /* support rescan */
+       Plan *fs_ps_plan;                        /* support rescan */
+
 }      FileScanDescData;
 
 typedef FileScanDescData *FileScanDesc;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/url.h
----------------------------------------------------------------------
diff --git a/src/include/access/url.h b/src/include/access/url.h
index 307656a..b1ec102 100644
--- a/src/include/access/url.h
+++ b/src/include/access/url.h
@@ -159,7 +159,7 @@ extern URL_FILE *url_fopen(char *url, bool forwrite, 
extvar_t *ev, CopyState pst
 extern int url_fclose(URL_FILE *file, bool failOnError, const char *relname);
 extern bool url_feof(URL_FILE *file, int bytesread);
 extern bool url_ferror(URL_FILE *file, int bytesread, char *ebuf, int ebuflen);
-extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, 
CopyState pstate, ExternalSelectDesc desc);
+extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, 
CopyState pstate, ExternalSelectDesc desc, List **psplits);
 extern size_t url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, 
CopyState pstate);
 extern void url_rewind(URL_FILE *file, const char *relname);
 extern void url_fflush(URL_FILE *file, CopyState pstate);

Reply via email to