This is an automated email from the ASF dual-hosted git repository.

huor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hawq.git

commit f62d3f4f2f9eb6a0c9e9ab15af3b7acd1eade971
Author: oushu1tuyu1 <[email protected]>
AuthorDate: Mon Apr 29 11:35:24 2019 +0800

    HAWQ-1710. Add ORC reader implementation in HAWQ
---
 contrib/orc/orc.c        | 875 ++++++++++++++++++++++++++++++++++++++++++++---
 contrib/orc/orc_init.sql |   4 +-
 2 files changed, 834 insertions(+), 45 deletions(-)

diff --git a/contrib/orc/orc.c b/contrib/orc/orc.c
index 0d03813..8109e62 100644
--- a/contrib/orc/orc.c
+++ b/contrib/orc/orc.c
@@ -32,6 +32,7 @@
 #include "storage/cwrapper/orc-format-c.h"
 #include "storage/cwrapper/hdfs-file-system-c.h"
 #include "cdb/cdbvars.h"
+
 #define ORC_TIMESTAMP_EPOCH_JDATE 2457024 /* == date2j(2015, 1, 1) */
 #define MAX_ORC_ARRAY_DIMS        10000
 #define ORC_NUMERIC_MAX_PRECISION 38
@@ -86,38 +87,40 @@ typedef struct ORCFormatUserData
   char **colNames;
   int *colDatatypes;
   int64_t *colDatatypeMods;
-  int numberOfColumns;
+  int32_t numberOfColumns;
   char **colRawValues;
   Datum *colValues;
   uint64_t *colValLength;
   bits8 **colValNullBitmap;
   int **colValDims;
   char **colAddresses;
-  bool *colIsNulls;
   bool *colToReads;
-  bool *colSpeedUpPossible;
-  bool *colSpeedUp;
-
-  bool *nulls;
-  Datum *datums;
-  int reserved;
-
-  TimestampType *colTimestamp;
 
   int nSplits;
   ORCFormatFileSplit *splits;
+
+  // for write only
+  TimestampType *colTimestamp;
 } ORCFormatUserData;
 
 static FmgrInfo *get_orc_function(char *formatter_name, char *function_name);
+static void get_scan_functions(FileScanDesc file_scan_desc);
 static void get_insert_functions(ExternalInsertDesc ext_insert_desc);
+static void init_format_user_data_for_read(TupleDesc tup_desc,
+    ORCFormatUserData *user_data);
 static void init_format_user_data_for_write(TupleDesc tup_desc,
     ORCFormatUserData *user_data);
 static void build_options_in_json(List *fmt_opts_defelem, int encoding,
     char **json_str, TupleDesc tupDesc);
 static ORCFormatC *create_formatter_instance(List *fmt_opts_defelem,
     int encoding, int segno, TupleDesc tupDesc);
+static void build_file_splits(Uri *uri, ScanState *scan_state,
+    ORCFormatUserData *user_data);
+static void build_tuple_descrition_for_read(Plan *plan, Relation relation,
+    ORCFormatUserData *user_data);
 static void build_tuple_descrition_for_write(Relation relation,
     ORCFormatUserData *user_data);
+static void orc_scan_error_callback(void *arg);
 static void orc_parse_format_string(CopyState pstate, char *fmtstr);
 static char *orc_strtokx2(const char *s, const char *whitespace,
     const char *delim, const char *quote, char escape, bool e_strings,
@@ -139,7 +142,7 @@ Datum orc_validate_interfaces(PG_FUNCTION_ARGS)
   if (pg_strncasecmp(psv_interface->format_name, "orc", strlen("orc")) != 0)
   {
     ereport(ERROR,
-        (errcode(ERRCODE_SYNTAX_ERROR), errmsg("orc_validate_interface : 
incorrect format name \'%s\'", psv_interface->format_name)));
+        (errcode(ERRCODE_SYNTAX_ERROR), errmsg("ORC: incorrect format name 
\'%s\'", psv_interface->format_name)));
   }
 
   PG_RETURN_VOID() ;
@@ -259,7 +262,7 @@ Datum orc_validate_options(PG_FUNCTION_ARGS)
         && strncasecmp(key, "category", strlen("category")))
     {
       ereport(ERROR,
-          (errcode(ERRCODE_SYNTAX_ERROR), errmsg("Option \"%s\" for ORC table 
is invalid", key), errhint("Format options for ORC table must be either " 
"formatter, compresstype, bloomfilter or dicthreshold"), 
errOmitLocation(true)));
+          (errcode(ERRCODE_SYNTAX_ERROR), errmsg("Option \"%s\" for ORC table 
is invalid", key), errOmitLocation(true)));
     }
 
     sprintf((char * ) format_str + len, "%s '%s' ", key, val);
@@ -301,7 +304,7 @@ Datum orc_validate_encodings(PG_FUNCTION_ARGS)
   if (strncasecmp(encoding_name, "utf8", strlen("utf8")))
   {
     ereport(ERROR,
-        (errcode(ERRCODE_SYNTAX_ERROR), errmsg("\"%s\" is not a valid encoding 
for ORC external table. " "Encoding for ORC external table must be UTF8.", 
encoding_name), errOmitLocation(true)));
+        (errcode(ERRCODE_SYNTAX_ERROR), errmsg("\"%s\" is not a valid encoding 
for ORC external table. ", encoding_name), errOmitLocation(true)));
   }
 
   PG_RETURN_VOID() ;
@@ -320,22 +323,676 @@ Datum orc_validate_datatypes(PG_FUNCTION_ARGS)
   {
     int32_t datatype =
         (int32_t) (((Form_pg_attribute) (tup_desc->attrs[i]))->atttypid);
+    int4  typmod = ((Form_pg_attribute) (tup_desc->attrs[i]))->atttypmod;
 
     if (checkORCUnsupportedDataType(datatype))
     {
       ereport(ERROR,
           (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unsupported data types %d 
for columns of external ORC table is specified.", datatype), 
errOmitLocation(true)));
     }
+    if (HAWQ_TYPE_NUMERIC == datatype)
+    {
+      int4 tmp_typmod = typmod - VARHDRSZ;
+      int precision = (tmp_typmod >> 16) & 0xffff;
+      int scale = tmp_typmod & 0xffff;
+      if (precision < 1 || 38 < precision)
+        ereport(ERROR,
+            (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("ORC DECIMAL 
precision must be between 1 and 38")));
+      if (scale == 0)
+        ereport(NOTICE, (errmsg("Using a scale of zero for ORC DECIMAL")));
+    }
+  }
+
+  PG_RETURN_VOID() ;
+}
+
+/* Set up an ORC external-table scan: build the FileScanDesc, parse state, formatter and file splits, then start the formatter.
+ * FileScanDesc
+ * orc_beginscan(ExternalScan *extScan,
+ *               ScanState *scanState,
+ *               Relation relation,
+ *               int formatterType,
+ *               char *formatterName)  -- all read from the PlugStorage passed in fcinfo->context
+ * Returns the new FileScanDesc (also saved in ps->ps_file_scan_desc); raises ERROR if the formatter reports a failure. */
+Datum orc_beginscan(PG_FUNCTION_ARGS)
+{
+  PlugStorage ps = (PlugStorage) (fcinfo->context);
+  ExternalScan *ext_scan = ps->ps_ext_scan;
+  ScanState *scan_state = ps->ps_scan_state;
+  Relation relation = ps->ps_relation;
+  int formatterType = ps->ps_formatter_type;
+  char *formatterName = ps->ps_formatter_name;
+
+  Index scan_rel_id = ext_scan->scan.scanrelid;
+  uint32 scan_counter = ext_scan->scancounter;
+  List *uri_list = ext_scan->uriList;
+  List *fmt_opts = ext_scan->fmtOpts;
+  int fmt_encoding = ext_scan->encoding;
+  List *scan_quals = ext_scan->scan.plan.qual;
+
+  /* 1. Increment relation reference count while scanning relation */
+  /*
+   * This is just to make really sure the relcache entry won't go away while
+   * the scan has a pointer to it.  Caller should be holding the rel open
+   * anyway, so this is redundant in all normal scenarios...
+   */
+  RelationIncrementReferenceCount(relation);
+
+  /* 2. Allocate and initialize the select descriptor */
+  FileScanDesc file_scan_desc = palloc(sizeof(FileScanDescData));
+  file_scan_desc->fs_inited = false;
+  file_scan_desc->fs_ctup.t_data = NULL;
+  ItemPointerSetInvalid(&file_scan_desc->fs_ctup.t_self);
+  file_scan_desc->fs_cbuf = InvalidBuffer;
+  file_scan_desc->fs_rd = relation;
+  file_scan_desc->fs_scanrelid = scan_rel_id;
+  file_scan_desc->fs_scancounter = scan_counter;
+  file_scan_desc->fs_scanquals = scan_quals;
+  file_scan_desc->fs_noop = false;
+  file_scan_desc->fs_file = NULL;
+  file_scan_desc->fs_formatter = NULL;
+  file_scan_desc->fs_formatter_type = formatterType;
+  file_scan_desc->fs_formatter_name = formatterName;
+
+  /* 2.1 Setup scan functions */
+  get_scan_functions(file_scan_desc);
+
+  /* 2.2 Get URI for the scan */
+  /*
+   * get the external URI assigned to us.
+   *
+   * The URI assigned for this segment is normally in the uriList list
+   * at the index of this segment id. However, if we are executing on
+   * MASTER ONLY the (one and only) entry which is destined for the master
+   * will be at the first entry of the uriList list.
+   */
+  char *uri_str = NULL;
+  int segindex = GetQEIndex(); /* only used in the DEBUG3 message below */
+
+  Value *v = NULL;
+
+  v = (Value *) list_nth(uri_list, 0); /* NOTE(review): always entry 0, although the comment above describes indexing by segment id -- confirm every segment is meant to share one URI */
+
+  if (v->type == T_Null)
+    uri_str = NULL;
+  else
+    uri_str = (char *) strVal(v);
+
+  /*
+   * If a uri is assigned to us - get a reference to it. Some executors
+   * don't have a uri to scan (if # of uri's < # of primary segdbs).
+   * in which case uri will be NULL. If that's the case for this
+   * segdb set to no-op.
+   */
+  if (uri_str)
+  {
+    /* set external source (uri) */
+    file_scan_desc->fs_uri = uri_str;
+    ereport(DEBUG3, (errmsg_internal("fs_uri (%d) is set as %s", segindex, 
uri_str)));
+    /* NOTE: we delay actually opening the data source until 
external_getnext() */
+  }
+  else
+  {
+    /* segdb has no work to do. set to no-op */
+    file_scan_desc->fs_noop = true;
+    file_scan_desc->fs_uri = NULL;
+  }
+
+  /* 2.3 Allocate values and nulls structure */
+  TupleDesc tup_desc = RelationGetDescr(relation);
+  file_scan_desc->fs_tupDesc = tup_desc;
+  file_scan_desc->attr = tup_desc->attrs;
+  file_scan_desc->num_phys_attrs = tup_desc->natts;
+
+  file_scan_desc->values = (Datum *) palloc(
+      file_scan_desc->num_phys_attrs * sizeof(Datum));
+  file_scan_desc->nulls = (bool *) palloc(
+      file_scan_desc->num_phys_attrs * sizeof(bool));
+
+  /* 2.5 Allocate and initialize the structure which track data parsing state 
*/
+  file_scan_desc->fs_pstate = (CopyStateData *) palloc0(
+      sizeof(CopyStateData));
+
+  /* 2.5.1 Initialize basic information */
+  CopyState pstate = file_scan_desc->fs_pstate;
+  pstate->fe_eof = false;
+  pstate->eol_type = EOL_UNKNOWN;
+  pstate->eol_str = NULL;
+  pstate->cur_relname = RelationGetRelationName(relation);
+  pstate->cur_lineno = 0;
+  pstate->err_loc_type = ROWNUM_ORIGINAL;
+  pstate->cur_attname = NULL;
+  pstate->raw_buf_done = true; /* true so we will read data in first run */
+  pstate->line_done = true;
+  pstate->bytesread = 0;
+  pstate->custom = false; /* NOTE(review): overwritten with true a few lines below */
+  pstate->header_line = false;
+  pstate->fill_missing = false;
+  pstate->line_buf_converted = false;
+  pstate->raw_buf_index = 0;
+  pstate->processed = 0;
+  pstate->filename = uri_str;
+  pstate->copy_dest = COPY_EXTERNAL_SOURCE;
+  pstate->missing_bytes = 0;
+  pstate->csv_mode = false;
+  pstate->custom = true;
+  pstate->custom_formatter_func = NULL;
+  pstate->custom_formatter_name = NULL;
+  pstate->rel = relation;
+
+  /* 2.5.2 Setup encoding information */
+  /*
+   * Set up encoding conversion info.  Even if the client and server
+   * encodings are the same, we must apply pg_client_to_server() to validate
+   * data in multibyte encodings.
+   *
+   * Each external table specifies the encoding of its external data. We will
+   * therefore set a client encoding and client-to-server conversion procedure
+   * in here (server-to-client in WET) and these will be used in the data
+   * conversion routines (in copy.c CopyReadLineXXX(), etc).
+   */
+  Insist(PG_VALID_ENCODING(fmt_encoding));
+  pstate->client_encoding = fmt_encoding;
+  Oid conversion_proc = FindDefaultConversionProc(fmt_encoding,
+      GetDatabaseEncoding());
+
+  if (OidIsValid(conversion_proc))
+  {
+    /* conversion proc found */
+    pstate->enc_conversion_proc = palloc(sizeof(FmgrInfo));
+    fmgr_info(conversion_proc, pstate->enc_conversion_proc);
+  }
+  else
+  {
+    /* no conversion function (both encodings are probably the same) */
+    pstate->enc_conversion_proc = NULL;
+  }
+
+  pstate->need_transcoding = pstate->client_encoding != GetDatabaseEncoding();
+  pstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(
+      pstate->client_encoding);
+
+  /* 2.5.3 Parse the data format options */
+  char *format_str = pstrdup((char *) strVal(linitial(fmt_opts))); /* assumes fmt_opts has at least one element -- TODO confirm */
+
+  orc_parse_format_string(pstate, format_str);
+
+  /* 2.5.4 Generate or convert list of attributes to process */
+  pstate->attr_offsets = (int *) palloc(tup_desc->natts * sizeof(int));
+  pstate->attnumlist = CopyGetAttnums(tup_desc, relation, NIL);
+
+  /* 2.5.5 Convert FORCE NOT NULL name list to per-column flags, check 
validity */
+  pstate->force_notnull_flags = (bool *) palloc0(
+      tup_desc->natts * sizeof(bool));
+  if (pstate->force_notnull)
+  {
+    List *attnums;
+    ListCell *cur;
+
+    attnums = CopyGetAttnums(tup_desc, relation, pstate->force_notnull);
+
+    foreach(cur, attnums)
+    {
+      int attnum = lfirst_int(cur);
+      pstate->force_notnull_flags[attnum - 1] = true;
+    }
+  }
+
+  /* 2.5.6 Take care of state that is RET specific */
+  initStringInfo(&pstate->attribute_buf);
+  initStringInfo(&pstate->line_buf);
+
+  /* Set up data buffer to hold a chunk of data */
+  MemSet(pstate->raw_buf, ' ', RAW_BUF_SIZE * sizeof(char));
+  pstate->raw_buf[RAW_BUF_SIZE] = '\0';
+
+  /* 2.5.7 Create temporary memory context for per row process */
+  /*
+   * Create a temporary memory context that we can reset once per row to
+   * recover palloc'd memory.  This avoids any problems with leaks inside
+   * datatype input or output routines, and should be faster than retail
+   * pfree's anyway.
+   */
+  pstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
+      "ExtTableMemCxt",
+      ALLOCSET_DEFAULT_MINSIZE,
+      ALLOCSET_DEFAULT_INITSIZE,
+      ALLOCSET_DEFAULT_MAXSIZE);
+
+  /* 2.6 Setup formatter information */
+  file_scan_desc->fs_formatter = (FormatterData *) palloc0(
+      sizeof(FormatterData));
+  initStringInfo(&file_scan_desc->fs_formatter->fmt_databuf);
+  file_scan_desc->fs_formatter->fmt_perrow_ctx =
+      file_scan_desc->fs_pstate->rowcontext;
+  file_scan_desc->fs_formatter->fmt_user_ctx = NULL;
+
+  /* 2.7 Set up callback to identify error line number */
+  file_scan_desc->errcontext.callback = orc_scan_error_callback;
+  file_scan_desc->errcontext.arg = (void *) file_scan_desc->fs_pstate;
+  file_scan_desc->errcontext.previous = error_context_stack; /* NOTE(review): errcontext is not pushed onto error_context_stack in this function -- confirm the caller pushes it */
+
+  /* 3. Setup user data */
+  /* 3.1 Initialize user data */
+  ORCFormatUserData *user_data = palloc0(sizeof(ORCFormatUserData));
+  init_format_user_data_for_read(tup_desc, user_data);
+
+  /* 3.2 Create formatter instance */
+  List *fmt_opts_defelem = pstate->custom_formatter_params; /* presumably populated by orc_parse_format_string above -- confirm */
+  user_data->fmt = create_formatter_instance(fmt_opts_defelem, fmt_encoding,
+      ps->ps_segno, tup_desc);
+
+  /* 3.3 Build file splits */
+  Uri *uri = ParseExternalTableUri(uri_str); /* NOTE(review): uri_str is NULL on the no-op path above -- confirm ParseExternalTableUri tolerates NULL */
+
+  if (enable_secure_filesystem) /* the credential token is set before build_file_splits runs */
+  {
+    char *token = find_filesystem_credential_with_uri(uri_str);
+    SetToken(uri_str, token);
+  }
+  file_scan_desc->fs_ps_scan_state = scan_state; /* for orc rescan */
+  build_file_splits(uri, scan_state, user_data);
+
+  FreeExternalTableUri(uri);
+
+  /* 3.4 Build tuple description */
+  Plan *plan = &(ext_scan->scan.plan);
+  file_scan_desc->fs_ps_plan = plan;
+  build_tuple_descrition_for_read(plan, relation, user_data);
+
+  /* 3.5 Save user data */
+  file_scan_desc->fs_ps_user_data = (void *) user_data;
+  /* 4. Begin scan with the formatter */
+  ORCFormatBeginORCFormatC(user_data->fmt, user_data->splits,
+      user_data->nSplits, user_data->colToReads, user_data->colNames,
+      user_data->colDatatypes, user_data->colDatatypeMods,
+      user_data->numberOfColumns);
+
+  ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt); /* wrapper failures are reported through this error slot, not a return value */
+  if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
+  {
+    ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
+  }
+
+  /* 5. Save file_scan_desc */
+  ps->ps_file_scan_desc = file_scan_desc;
+
+  PG_RETURN_POINTER(file_scan_desc);
+}
+
+/* getnext_init callback of the ORC scan interface (registered in get_scan_functions).
+ * ExternalSelectDesc
+ * orc_getnext_init(PlanState *planState,
+ *                  ExternalScanState *extScanState)
+ * Currently always returns NULL (also stored in ps->ps_ext_select_desc); the descriptor setup below is commented out. */
+Datum orc_getnext_init(PG_FUNCTION_ARGS)
+{
+  PlugStorage ps = (PlugStorage) (fcinfo->context);
+  PlanState *plan_state = ps->ps_plan_state; /* NOTE(review): plan_state and ext_scan_state are only referenced by the commented-out code below */
+  ExternalScanState *ext_scan_state = ps->ps_ext_scan_state;
+
+  ExternalSelectDesc ext_select_desc = NULL;
+  /* Disabled: would allocate the descriptor and copy projInfo and the parent agg_type into it.
+   ExternalSelectDesc ext_select_desc = (ExternalSelectDesc)palloc0(
+   sizeof(ExternalSelectDescData));
+
+   Plan *rootPlan = NULL;
+
+   if (plan_state != NULL)
+   {
+   ext_select_desc->projInfo = plan_state->ps_ProjInfo;
+
+   // If we have an agg type then our parent is an Agg node
+   rootPlan = plan_state->state->es_plannedstmt->planTree;
+   if (IsA(rootPlan, Agg) && ext_scan_state->parent_agg_type)
+   {
+   ext_select_desc->agg_type = ext_scan_state->parent_agg_type;
+   }
+   }
+   */
+
+  ps->ps_ext_select_desc = ext_select_desc;
+
+  PG_RETURN_POINTER(ext_select_desc);
+}
+
+Datum orc_getnext(PG_FUNCTION_ARGS) { /* Return true with the next row stored in ps->ps_tuple_table_slot, or false at end of data after releasing user_data; raises ERROR on formatter failure. */
+  PlugStorage ps = (PlugStorage)(fcinfo->context);
+  FileScanDesc fsd = ps->ps_file_scan_desc;
+  ORCFormatUserData *user_data = (ORCFormatUserData *)(fsd->fs_ps_user_data);
+  TupleTableSlot *slot = ps->ps_tuple_table_slot;
+  bool *nulls = slot_get_isnull(slot);
+  memset(nulls, true, user_data->numberOfColumns); /* start all-NULL; columns still flagged after the read are skipped below (not read, or null) */
+
+  bool res = ORCFormatNextORCFormatC(user_data->fmt, user_data->colRawValues,
+                                     user_data->colValLength, nulls);
+  if (res) {
+    for (int32_t i = 0; i < user_data->numberOfColumns; ++i) {
+      // Column not to read or column is null
+      if (nulls[i]) continue;
+
+      switch (fsd->attr[i]->atttypid) {
+        case HAWQ_TYPE_BOOL: {
+          user_data->colValues[i] =
+              BoolGetDatum(*(bool *)(user_data->colRawValues[i]));
+          break;
+        }
+        case HAWQ_TYPE_INT2: {
+          user_data->colValues[i] =
+              Int16GetDatum(*(int16_t *)(user_data->colRawValues[i]));
+          break;
+        }
+        case HAWQ_TYPE_INT4: {
+          user_data->colValues[i] =
+              Int32GetDatum(*(int32_t *)(user_data->colRawValues[i]));
+          break;
+        }
+        case HAWQ_TYPE_INT8:
+        case HAWQ_TYPE_TIME:
+        case HAWQ_TYPE_TIMESTAMP:
+        case HAWQ_TYPE_TIMESTAMPTZ: { /* assumes 64-bit integer datetime storage -- TODO confirm */
+          user_data->colValues[i] =
+              Int64GetDatum(*(int64_t *)(user_data->colRawValues[i]));
+          break;
+        }
+        case HAWQ_TYPE_FLOAT4: {
+          user_data->colValues[i] =
+              Float4GetDatum(*(float *)(user_data->colRawValues[i]));
+          break;
+        }
+        case HAWQ_TYPE_FLOAT8: {
+          user_data->colValues[i] =
+              Float8GetDatum(*(double *)(user_data->colRawValues[i]));
+          break;
+        }
+        case HAWQ_TYPE_VARCHAR:
+        case HAWQ_TYPE_TEXT:
+        case HAWQ_TYPE_BPCHAR:
+        case HAWQ_TYPE_BYTE:
+        case HAWQ_TYPE_NUMERIC: {
+          SET_VARSIZE((struct varlena *)(user_data->colRawValues[i]), /* NOTE(review): assumes the formatter left VARHDRSZ bytes in front of the data and colValLength counts them -- confirm */
+                      user_data->colValLength[i]);
+          user_data->colValues[i] = 
PointerGetDatum(user_data->colRawValues[i]);
+          break;
+        }
+        case HAWQ_TYPE_DATE: {
+          user_data->colValues[i] =
+              Int32GetDatum(*(int32_t *)(user_data->colRawValues[i]) - /* rebase a day count from the Unix epoch to the Postgres epoch */
+                            POSTGRES_EPOCH_JDATE + UNIX_EPOCH_JDATE);
+          break;
+        }
+        default: {
+          ereport(ERROR, (errmsg_internal("ORC:%d", fsd->attr[i]->atttypid))); /* NOTE(review): atttypid is an Oid (unsigned); %u would match its type */
+
+          break;
+        }
+      }
+    }
+
+    ps->ps_has_tuple = true;
+    slot->PRIVATE_tts_values = user_data->colValues; /* the slot points at colValues, which the next call overwrites */
+    TupSetVirtualTupleNValid(slot, user_data->numberOfColumns);
+    PG_RETURN_BOOL(true);
+  }
+
+  ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
+  if (e->errCode == ERRCODE_SUCCESSFUL_COMPLETION) { /* no row and no error: end of data, so finish and release everything */
+    ORCFormatEndORCFormatC(user_data->fmt);
+    e = ORCFormatGetErrorORCFormatC(user_data->fmt);
+    if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
+      ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
+    }
+
+    ORCFormatFreeORCFormatC(&(user_data->fmt));
+
+    pfree(user_data->colRawValues); /* NOTE(review): this release sequence is duplicated in orc_stopscan; a shared helper would keep the two in sync */
+    pfree(user_data->colValues);
+    pfree(user_data->colToReads);
+    pfree(user_data->colValLength);
+    if (user_data->splits != NULL) {
+      for (int i = 0; i < user_data->nSplits; ++i) {
+        pfree(user_data->splits[i].fileName);
+      }
+      pfree(user_data->splits);
+    }
+    for (int i = 0; i < user_data->numberOfColumns; ++i) {
+      pfree(user_data->colNames[i]);
+    }
+
+    pfree(user_data->colNames);
+    pfree(user_data->colDatatypes); /* NOTE(review): colDatatypeMods is allocated in init_format_user_data_for_read but not freed here */
+    pfree(user_data);
+    fsd->fs_ps_user_data = NULL; /* orc_rescan rebuilds from NULL; orc_stopscan returns early on NULL */
+
+    ps->ps_has_tuple = false;
+    slot->PRIVATE_tts_values = NULL;
+    ExecClearTuple(slot);
+  } else {
+    ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
+  }
+  PG_RETURN_BOOL(false);
+}
+
+/* Restart the scan: rebuild user_data if a previous pass already freed it, otherwise call the formatter's rescan entry point.
+ * void
+ * orc_rescan(FileScanDesc scan)
+ * In both cases the parse-state counters are reset; raises ERROR on formatter failure. */
+Datum orc_rescan(PG_FUNCTION_ARGS)
+{
+  PlugStorage ps = (PlugStorage) (fcinfo->context);
+  FileScanDesc fsd = ps->ps_file_scan_desc;
+  Relation relation = fsd->fs_rd;
+  TupleDesc tup_desc = RelationGetDescr(relation);
+
+  ORCFormatUserData *user_data = (ORCFormatUserData *) (fsd->fs_ps_user_data);
+
+  if (user_data == NULL) /* released by orc_getnext / orc_stopscan after an earlier pass */
+  {
+    /* 1 Initialize user data */
+    user_data = palloc0(sizeof(ORCFormatUserData));
+    init_format_user_data_for_read(fsd->fs_tupDesc, user_data);
+
+    /* 2 Create formatter instance */
+    List *fmt_opts_defelem = fsd->fs_pstate->custom_formatter_params;
+    int fmt_encoding = fsd->fs_pstate->client_encoding;
+    user_data->fmt = create_formatter_instance(fmt_opts_defelem,
+        fmt_encoding, ps->ps_segno, tup_desc);
+
+    /* 3 Build file splits */
+    Uri *uri = ParseExternalTableUri(fsd->fs_uri);
+    build_file_splits(uri, fsd->fs_ps_scan_state, user_data); /* NOTE(review): orc_beginscan calls SetToken before build_file_splits; here it happens afterwards -- confirm the order does not matter */
+
+    /* 4 Build tuple description */
+    Plan *plan = fsd->fs_ps_plan;
+    build_tuple_descrition_for_read(plan, fsd->fs_rd, user_data);
+
+    /* 5 Save user data */
+    fsd->fs_ps_user_data = (void *) user_data;
+
+    if (enable_secure_filesystem)
+    {
+      char *token = find_filesystem_credential_with_uri(fsd->fs_uri);
+      SetToken(fsd->fs_uri, token);
+    }
+    FreeExternalTableUri(uri);
+    /* 6 Begin scan with the formatter */
+    ORCFormatBeginORCFormatC(user_data->fmt, user_data->splits,
+        user_data->nSplits, user_data->colToReads, user_data->colNames,
+        user_data->colDatatypes, user_data->colDatatypeMods,
+        user_data->numberOfColumns);
+
+    ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
+    if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
+    {
+      ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
+    }
+  } else {
+    ORCFormatRescanORCFormatC(user_data->fmt); /* formatter still open: restart it through the wrapper's rescan call */
+    ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
+    if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
+    {
+      ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
+    }
+  }
+
+  /* reset some parse state variables */
+  fsd->fs_pstate->fe_eof = false;
+  fsd->fs_pstate->cur_lineno = 0;
+  fsd->fs_pstate->cur_attname = NULL;
+  fsd->fs_pstate->raw_buf_done = true; /* true so we will read data in first 
run */
+  fsd->fs_pstate->line_done = true;
+  fsd->fs_pstate->bytesread = 0;
+
+  PG_RETURN_VOID() ;
+}
+
+/* Release the scan-descriptor state allocated in orc_beginscan; user_data is not touched here (orc_getnext / orc_stopscan free it).
+ * void
+ * orc_endscan(FileScanDesc scan)
+ */
+Datum orc_endscan(PG_FUNCTION_ARGS)
+{
+  PlugStorage ps = (PlugStorage) (fcinfo->context);
+  FileScanDesc fsd = ps->ps_file_scan_desc;
+
+  /* Clean up scan descriptor */
+  char *relname = pstrdup(RelationGetRelationName(fsd->fs_rd)); /* NOTE(review): relname is only freed at the end and never used */
+
+  if (fsd->fs_pstate != NULL) /* fs_pstate is set to NULL further down, so a repeated call will not decrement twice */
+  {
     /*
-     * TODO(wshao): additional check for orc decimal type
-     * orc format currently does not support decimal precision larger than 38
+     * decrement relation reference count and free scan descriptor storage
      */
+    RelationDecrementReferenceCount(fsd->fs_rd); /* pairs with RelationIncrementReferenceCount in orc_beginscan */
   }
 
+  if (fsd->values)
+  {
+    pfree(fsd->values);
+    fsd->values = NULL;
+  }
+  if (fsd->nulls)
+  {
+    pfree(fsd->nulls);
+    fsd->nulls = NULL;
+  }
+
+  if (fsd->fs_pstate != NULL && fsd->fs_pstate->rowcontext != NULL)
+  {
+    /*
+     * delete the row context
+     */
+    MemoryContextDelete(fsd->fs_pstate->rowcontext);
+    fsd->fs_pstate->rowcontext = NULL;
+  }
+
+  if (fsd->fs_formatter)
+  {
+    /* TODO: check if this space is automatically freed.
+     * if not, then see what about freeing the user context */
+    if (fsd->fs_formatter->fmt_databuf.data)
+      pfree(fsd->fs_formatter->fmt_databuf.data);
+    pfree(fsd->fs_formatter);
+    fsd->fs_formatter = NULL;
+  }
+
+  /*
+   * free formatter information
+   */
+  if (fsd->fs_formatter_name) /* NOTE(review): orc_beginscan copies this pointer from ps->ps_formatter_name rather than allocating it -- confirm the scan owns it before freeing */
+  {
+    pfree(fsd->fs_formatter_name);
+    fsd->fs_formatter_name = NULL;
+  }
+
+  /*
+   * free parse state memory
+   */
+  if (fsd->fs_pstate != NULL)
+  {
+    if (fsd->fs_pstate->attribute_buf.data)
+      pfree(fsd->fs_pstate->attribute_buf.data);
+    if (fsd->fs_pstate->line_buf.data)
+      pfree(fsd->fs_pstate->line_buf.data);
+    if (fsd->fs_pstate->attr_offsets)
+      pfree(fsd->fs_pstate->attr_offsets);
+    if (fsd->fs_pstate->force_quote_flags)
+      pfree(fsd->fs_pstate->force_quote_flags);
+    if (fsd->fs_pstate->force_notnull_flags)
+      pfree(fsd->fs_pstate->force_notnull_flags);
+
+    pfree(fsd->fs_pstate);
+    fsd->fs_pstate = NULL;
+  }
+
+  /* clean up error context */
+  error_context_stack = fsd->errcontext.previous; /* NOTE(review): orc_beginscan never pushes errcontext, so this resets the stack to the value captured at begin time -- confirm */
+
+  pfree(relname);
+
   PG_RETURN_VOID() ;
 }
 
 /*
+ * void  -- stop the scan early: end and free the formatter and user_data if they are still alive
+ * orc_stopscan(FileScanDesc scan)
+ */
+Datum orc_stopscan(PG_FUNCTION_ARGS)
+{
+  PlugStorage ps = (PlugStorage)(fcinfo->context);
+  FileScanDesc fsd = ps->ps_file_scan_desc;
+  ORCFormatUserData *user_data = (ORCFormatUserData *)(fsd->fs_ps_user_data);
+  TupleTableSlot *tts = ps->ps_tuple_table_slot;
+
+  if (!user_data) PG_RETURN_VOID(); /* nothing left to release, e.g. orc_getnext already freed it at end of data */
+
+  /* If there is no error caught, it should be an end of reading split */
+  ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
+  if (e->errCode == ERRCODE_SUCCESSFUL_COMPLETION) {
+    ORCFormatEndORCFormatC(user_data->fmt);
+    e = ORCFormatGetErrorORCFormatC(user_data->fmt);
+    if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION) {
+      ereport(LOG, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage))); /* logged rather than raised, unlike the same failure in orc_getnext */
+    }
+
+    ORCFormatFreeORCFormatC(&(user_data->fmt));
+
+    pfree(user_data->colRawValues); /* NOTE(review): duplicates orc_getnext's release sequence (which frees colToReads without this NULL check) */
+    pfree(user_data->colValues);
+    if (user_data->colToReads) {
+      pfree(user_data->colToReads);
+      user_data->colToReads = NULL;
+    }
+    pfree(user_data->colValLength);
+    if (user_data->splits != NULL) {
+      for (int i = 0; i < user_data->nSplits; ++i) {
+        pfree(user_data->splits[i].fileName);
+      }
+      pfree(user_data->splits);
+    }
+    for (int i = 0; i < user_data->numberOfColumns; ++i) {
+      pfree(user_data->colNames[i]);
+    }
+
+    pfree(user_data->colNames);
+    pfree(user_data->colDatatypes); /* NOTE(review): colDatatypeMods is not freed */
+    pfree(user_data);
+    fsd->fs_ps_user_data = NULL;
+
+    /* form empty tuple */
+    ps->ps_has_tuple = false;
+
+    tts->PRIVATE_tts_values = NULL;
+    tts->PRIVATE_tts_isnull = NULL; /* NOTE(review): orc_getnext leaves this pointer alone at end of data -- confirm clearing it here is intended */
+    ExecClearTuple(tts);
+  } else {
+    ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
+  }
+
+  PG_RETURN_VOID();
+}
+
+/*
  * ExternalInsertDesc
  * orc_insert_init(Relation relation,
  *                 int formatterType,
@@ -582,8 +1239,7 @@ Datum orc_insert_init(PG_FUNCTION_ARGS)
   ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
   if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
   {
-    elog(ERROR, "ORC: failed to begin insert: %s (%d)",
-    e->errMessage, e->errCode);
+    ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
   }
 
   /* 4. Save the result */
@@ -606,7 +1262,7 @@ Datum orc_insert(PG_FUNCTION_ARGS)
   ORCFormatUserData *user_data = (ORCFormatUserData *) (eid->ext_ps_user_data);
 
   user_data->colValues = slot_get_values(tts);
-  user_data->colIsNulls = slot_get_isnull(tts);
+  bool *nulls = slot_get_isnull(tts);
 
   static bool DUMMY_BOOL = true;
   static int8_t DUMMY_INT8 = 0;
@@ -647,7 +1303,7 @@ Datum orc_insert(PG_FUNCTION_ARGS)
     user_data->colRawValues[i] = NULL;
     user_data->colValNullBitmap[i] = NULL;
 
-    if (user_data->colIsNulls[i])
+    if (nulls[i])
     {
       if (dataType == HAWQ_TYPE_CHAR)
       {
@@ -692,7 +1348,8 @@ Datum orc_insert(PG_FUNCTION_ARGS)
       {
         user_data->colRawValues[i] = (char *) (&DUMMY_TIME);
       }
-      else if (dataType == HAWQ_TYPE_TIMESTAMP)
+      else if (dataType == HAWQ_TYPE_TIMESTAMP
+          || dataType == HAWQ_TYPE_TIMESTAMPTZ)
       {
         user_data->colRawValues[i] = (char *) (&DUMMY_TIMESTAMP);
       }
@@ -718,12 +1375,11 @@ Datum orc_insert(PG_FUNCTION_ARGS)
       }
       else if (dataType == HAWQ_TYPE_INVALID)
       {
-        elog(ERROR, "HAWQ data type with id %d is invalid", dataType);
+        ereport(ERROR, (errmsg_internal("HAWQ data type with id %d is 
invalid", dataType)));
       }
       else
       {
-        elog(
-            ERROR, "HAWQ data type with id %d is not supported yet", dataType);
+        ereport(ERROR, (errmsg_internal("HAWQ data type with id %d is not 
supported yet", dataType)));
       }
 
       continue;
@@ -736,14 +1392,18 @@ Datum orc_insert(PG_FUNCTION_ARGS)
     {
       user_data->colRawValues[i] = (char *) (&(user_data->colValues[i]));
     }
-    else if (dataType == HAWQ_TYPE_TIMESTAMP)
+    else if (dataType == HAWQ_TYPE_TIMESTAMP || dataType == 
HAWQ_TYPE_TIMESTAMPTZ)
     {
       int64_t *timestamp = (int64_t *) (&(user_data->colValues[i]));
       user_data->colTimestamp[i].second = *timestamp / 1000000
           + (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * 60 * 60 * 24;
       user_data->colTimestamp[i].nanosecond = *timestamp % 1000000 * 1000;
-      if (user_data->colTimestamp[i].nanosecond < 0)
+      int64_t days = user_data->colTimestamp[i].second / 60 / 60 / 24;
+      if (user_data->colTimestamp[i].nanosecond < 0 &&
+          (days > POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE || days < 0))
         user_data->colTimestamp[i].nanosecond += 1000000000;
+      
if(user_data->colTimestamp[i].second<0&&user_data->colTimestamp[i].nanosecond)
+        user_data->colTimestamp[i].second-=1;
       user_data->colRawValues[i] =
           (char *) (&(user_data->colTimestamp[i]));
     }
@@ -791,8 +1451,8 @@ Datum orc_insert(PG_FUNCTION_ARGS)
       // Now we only support 1 dimension array
       if (ARR_NDIM(arr) > 1)
       {
-        elog(ERROR, "Now we only support 1 dimension array in orc format,"
-            " your array dimension is %d", ARR_NDIM(arr));
+        ereport(ERROR, (errmsg_internal("Now we only support 1 dimension array 
in orc format,"
+            " your array dimension is %d", ARR_NDIM(arr))));
       }
       else if (ARR_NDIM(arr) == 1)
       {
@@ -805,12 +1465,12 @@ Datum orc_insert(PG_FUNCTION_ARGS)
     }
     else if (dataType == HAWQ_TYPE_INVALID)
     {
-      elog(ERROR, "HAWQ data type with id %d is invalid", dataType);
+      ereport(ERROR, (errmsg_internal("HAWQ data type with id %d is invalid", 
dataType)));
     }
     else
     {
-      elog(
-      ERROR, "HAWQ data type with id %d is not supported yet", dataType);
+      ereport(ERROR, (errmsg_internal("HAWQ data type with id %d is not 
supported yet", dataType)));
+
     }
   }
 
@@ -818,20 +1478,19 @@ Datum orc_insert(PG_FUNCTION_ARGS)
   ORCFormatInsertORCFormatC(user_data->fmt, user_data->colDatatypes,
       user_data->colRawValues, user_data->colValLength,
       user_data->colValNullBitmap, user_data->colValDims,
-      user_data->colIsNulls);
+      nulls);
 
   ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
   if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
   {
-    elog(ERROR, "orc_insert: failed to insert: %s(%d)",
-    e->errMessage, e->errCode);
+    ereport(ERROR,(errcode(e->errCode),errmsg("ORC::%s", e->errMessage)));
   }
 
   for (int i = 0; i < user_data->numberOfColumns; ++i)
   {
     int dataType = (int) (tupdesc->attrs[i]->atttypid);
 
-    if (user_data->colIsNulls[i])
+    if (nulls[i])
     {
       continue;
     }
@@ -869,8 +1528,7 @@ Datum orc_insert_finish(PG_FUNCTION_ARGS)
   ORCFormatCatchedError *e = ORCFormatGetErrorORCFormatC(user_data->fmt);
   if (e->errCode != ERRCODE_SUCCESSFUL_COMPLETION)
   {
-    elog(ERROR, "ORC: failed to finish insert: %s(%d)",
-    e->errMessage, e->errCode);
+    ereport(ERROR, (errcode(e->errCode), errmsg("ORC:%s", e->errMessage)));
   }
 
   ORCFormatFreeORCFormatC(&(user_data->fmt));
@@ -878,7 +1536,6 @@ Datum orc_insert_finish(PG_FUNCTION_ARGS)
   pfree(user_data->colDatatypes);
   pfree(user_data->colRawValues);
   pfree(user_data->colValLength);
-  pfree(user_data->colAddresses);
   for (int i = 0; i < user_data->numberOfColumns; ++i)
   {
     pfree(user_data->colNames[i]);
@@ -913,13 +1570,33 @@ static FmgrInfo *get_orc_function(char *formatter_name, 
char *function_name)
   }
   else
   {
-    elog(ERROR, "%s_%s function was not found for pluggable storage",
-    formatter_name, function_name);
+    ereport(ERROR, (errmsg_internal("%s_%s function was not found for 
pluggable storage",
+                                    formatter_name, function_name)));
   }
 
   return procInfo;
 }
 
+/*
+ * Look up every ORC pluggable-storage scan callback through get_orc_function
+ * (formatter "orc" plus the callback name, e.g. orc_beginscan) and store the
+ * resulting FmgrInfo pointers in file_scan_desc->fs_ps_scan_funcs.
+ * Lookups happen in a fixed order; a missing function raises ERROR inside
+ * get_orc_function.
+ */
+static void get_scan_functions(FileScanDesc file_scan_desc)
+{
+  char *formatter = "orc";
+
+  file_scan_desc->fs_ps_scan_funcs.beginscan =
+      get_orc_function(formatter, "beginscan");
+  file_scan_desc->fs_ps_scan_funcs.getnext_init =
+      get_orc_function(formatter, "getnext_init");
+  file_scan_desc->fs_ps_scan_funcs.getnext =
+      get_orc_function(formatter, "getnext");
+  file_scan_desc->fs_ps_scan_funcs.rescan =
+      get_orc_function(formatter, "rescan");
+  file_scan_desc->fs_ps_scan_funcs.endscan =
+      get_orc_function(formatter, "endscan");
+  file_scan_desc->fs_ps_scan_funcs.stopscan =
+      get_orc_function(formatter, "stopscan");
+}
+
 static void get_insert_functions(ExternalInsertDesc ext_insert_desc)
 {
   ext_insert_desc->ext_ps_insert_funcs.insert_init = get_orc_function("orc",
@@ -932,6 +1609,33 @@ static void get_insert_functions(ExternalInsertDesc 
ext_insert_desc)
       "insert_finish");
 }
 
+/*
+ * Allocate the per-column buffers used by the ORC read path.
+ *
+ * numberOfColumns is taken from tup_desc->natts and every array below is
+ * sized by it.  palloc0 zero-fills the arrays, so column names, raw values
+ * and value lengths start out as NULL / 0 and the Datum slots as (Datum) 0.
+ * Each column additionally gets its own zeroed null-bitmap buffer of
+ * MAX_ORC_ARRAY_DIMS bytes.
+ */
+static void init_format_user_data_for_read(TupleDesc tup_desc,
+    ORCFormatUserData *user_data)
+{
+  int32_t ncols = tup_desc->natts;
+
+  user_data->numberOfColumns = ncols;
+  user_data->colNames = palloc0(sizeof(char *) * ncols);
+  user_data->colDatatypes = palloc0(sizeof(int) * ncols);
+  user_data->colDatatypeMods = palloc0(sizeof(int64_t) * ncols);
+  user_data->colValues = palloc0(sizeof(Datum) * ncols);
+  user_data->colRawValues = palloc0(sizeof(char *) * ncols);
+  user_data->colValLength = palloc0(sizeof(uint64_t) * ncols);
+  user_data->colValNullBitmap = palloc0(sizeof(bits8 *) * ncols);
+
+  /*
+   * Everything above is already zeroed, so only the per-column null bitmaps
+   * need storage of their own.  Note colValues holds Datums (integers): it
+   * must never be assigned the pointer constant NULL.
+   */
+  for (int i = 0; i < ncols; i++)
+  {
+    user_data->colValNullBitmap[i] = (bits8 *) palloc0(
+        sizeof(bits8) * MAX_ORC_ARRAY_DIMS);
+  }
+}
+
 static void init_format_user_data_for_write(TupleDesc tup_desc,
     ORCFormatUserData *user_data)
 {
@@ -947,8 +1651,6 @@ static void init_format_user_data_for_write(TupleDesc 
tup_desc,
   user_data->colValNullBitmap = palloc0(
       sizeof(bits8 *) * user_data->numberOfColumns);
   user_data->colValDims = palloc0(sizeof(int *) * user_data->numberOfColumns);
-  user_data->colAddresses = palloc0(
-      sizeof(char *) * user_data->numberOfColumns);
   user_data->colTimestamp = palloc0(
       sizeof(TimestampType) * user_data->numberOfColumns);
 }
@@ -1016,7 +1718,7 @@ static void build_options_in_json(List *fmt_opts_defelem, 
int encoding,
     strcpy(*json_str, str);
     json_object_put(opt_json_object);
 
-    elog(DEBUG3, "formatter options are %s", *json_str);
+    ereport(DEBUG3, (errmsg_internal("formatter options are %s", *json_str)));
   }
 }
 
@@ -1037,6 +1739,80 @@ static ORCFormatC *create_formatter_instance(List 
*fmt_opts_defelem,
   return orc_format_c;
 }
 
+/*
+ * Translate the scan's FileSplit list into ORCFormatFileSplit entries.
+ *
+ * For every split the byte range (offsets/lengths) is copied and a full
+ * "hdfs://<host>:<port><path>" URL is built from uri->hostname, uri->port
+ * and the split's ext_file_uri_string.  For URI_HIVE locations the host and
+ * port are taken from scan_state (hivehost/hiveport) and written back into
+ * *uri.  Without a scan_state the split list is left empty.
+ *
+ * Raises ERROR if at least one URL must be built but no host name is known.
+ */
+static void build_file_splits(Uri *uri, ScanState *scan_state,
+    ORCFormatUserData *user_data)
+{
+  /* longest "%d" rendering of an int, sign included: "-2147483648" */
+  const size_t max_port_len = 11;
+
+  if (scan_state == NULL)
+  {
+    user_data->nSplits = 0;
+    user_data->splits = NULL;
+    return;
+  }
+
+  user_data->nSplits = list_length(scan_state->splits);
+  user_data->splits = palloc0(
+      sizeof(ORCFormatFileSplit) * user_data->nSplits);
+  if (user_data->nSplits == 0)
+    return;
+
+  /*
+   * Hive tables take the namenode address from the scan state.  It is the
+   * same for every split, so resolve it once rather than pstrdup'ing
+   * hivehost again (and leaking the previous copy) on each iteration.
+   */
+  if (uri->protocol == URI_HIVE)
+  {
+    uri->hostname =
+        (scan_state->hivehost) ? pstrdup(scan_state->hivehost) : NULL;
+    uri->port = scan_state->hiveport;
+  }
+
+  /* strlen()/"%s" on a NULL host would crash the backend; fail cleanly */
+  if (uri->hostname == NULL)
+    ereport(ERROR, (errmsg_internal("ORC: no host name available to build "
+                                    "the file path of a split")));
+
+  ListCell *cell = NULL;
+  int i = 0;
+  foreach(cell, scan_state->splits)
+  {
+    FileSplit origFS = (FileSplit) lfirst(cell);
+    user_data->splits[i].len = origFS->lengths;
+    user_data->splits[i].start = origFS->offsets;
+
+    /* upper bound for "hdfs://" + host + ":" + port + path + '\0' */
+    size_t fileNameLen = strlen("hdfs://") + strlen(uri->hostname) + 1
+        + max_port_len + strlen(origFS->ext_file_uri_string) + 1;
+
+    user_data->splits[i].fileName = palloc(fileNameLen);
+    snprintf(user_data->splits[i].fileName, fileNameLen, "%s://%s:%d%s",
+        "hdfs", uri->hostname, uri->port, origFS->ext_file_uri_string);
+    ereport(LOG,
+        (errmsg_internal("fileinformation:%s", user_data->splits[i].fileName)));
+
+    i++;
+  }
+}
+
+/*
+ * Fill in per-column read metadata from the relation's tuple descriptor:
+ * column name, type id mapped via map_hawq_type_to_common_plan, type
+ * modifier, and the colToReads projection mask.
+ *
+ * Without a plan every column is marked for reading.  With a plan, only the
+ * columns that GetNeededColumnsForScan finds in the plan's target list or
+ * qual are marked.
+ */
+static void build_tuple_descrition_for_read(Plan *plan, Relation relation,
+    ORCFormatUserData *user_data)
+{
+  user_data->colToReads = palloc0(sizeof(bool) * user_data->numberOfColumns);
+
+  for (int i = 0; i < user_data->numberOfColumns; ++i)
+  {
+    Form_pg_attribute attr = relation->rd_att->attrs[i];
+
+    /* no plan means no projection information: read every column */
+    user_data->colToReads[i] = (plan == NULL);
+
+    /* size from the actual name, not a hard-coded NAMEDATALEN guess */
+    user_data->colNames[i] = pstrdup(NameStr(attr->attname));
+
+    user_data->colDatatypes[i] =
+        map_hawq_type_to_common_plan((int) attr->atttypid);
+    user_data->colDatatypeMods[i] = attr->atttypmod;
+  }
+
+  if (plan)
+  {
+    /* mark the columns referenced by the projection and by the filter */
+    GetNeededColumnsForScan((Node *) plan->targetlist,
+        user_data->colToReads, user_data->numberOfColumns);
+
+    GetNeededColumnsForScan((Node *) plan->qual, user_data->colToReads,
+        user_data->numberOfColumns);
+  }
+}
+
 static void build_tuple_descrition_for_write(Relation relation,
     ORCFormatUserData *user_data)
 {
@@ -1051,10 +1827,23 @@ static void build_tuple_descrition_for_write(Relation 
relation,
     user_data->colDatatypes[i] = map_hawq_type_to_common_plan(
         (int) (relation->rd_att->attrs[i]->atttypid));
 
-    user_data->colDatatypeMods[i] = relation->rd_att->attrs[i]->atttypmod;
+    if (user_data->colDatatypes[i] == CHARID &&
+        relation->rd_att->attrs[i]->atttypmod == -1) {
+      user_data->colDatatypeMods[i] =
+      strlen(relation->rd_att->attrs[i]->attname.data) + VARHDRSZ;
+    } else {
+      user_data->colDatatypeMods[i] = relation->rd_att->attrs[i]->atttypmod;
+    }
   }
 }
 
+/*
+ * Error-context callback for ORC scans: adds "External table <relname>" to
+ * the context of a reported error.  arg is interpreted as a CopyState whose
+ * cur_relname names the table being scanned.
+ */
+static void orc_scan_error_callback(void *arg)
+{
+  errcontext("External table %s", ((CopyState) arg)->cur_relname);
+}
+
 static void orc_parse_format_string(CopyState pstate, char *fmtstr)
 {
   char *token;
diff --git a/contrib/orc/orc_init.sql b/contrib/orc/orc_init.sql
index cae2318..e3773ef 100644
--- a/contrib/orc/orc_init.sql
+++ b/contrib/orc/orc_init.sql
@@ -21,7 +21,7 @@ LANGUAGE C STABLE;
 CREATE OR REPLACE FUNCTION pg_catalog.orc_validate_datatypes() RETURNS void
 AS '$libdir/orc.so', 'orc_validate_datatypes'
 LANGUAGE C STABLE;
-/*
+
 CREATE OR REPLACE FUNCTION pg_catalog.orc_beginscan() RETURNS bytea
 AS '$libdir/orc.so', 'orc_beginscan'
 LANGUAGE C STABLE;
@@ -45,7 +45,7 @@ LANGUAGE C STABLE;
 CREATE OR REPLACE FUNCTION pg_catalog.orc_stopscan() RETURNS void
 AS '$libdir/orc.so', 'orc_stopscan'
 LANGUAGE C STABLE;
-*/
+
 
 CREATE OR REPLACE FUNCTION pg_catalog.orc_insert_init() RETURNS bytea
 AS '$libdir/orc.so', 'orc_insert_init'

Reply via email to