This is an automated email from the ASF dual-hosted git repository.

huor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hawq.git

commit 33ddcf7aa87eaa47063496319681627158d76de9
Author: tuyu <[email protected]>
AuthorDate: Thu Aug 1 15:29:47 2019 +0800

    HAWQ-1734. Resolve insert issue in external table of orc
---
 src/backend/access/appendonly/appendonlywriter.c | 81 ++++++++++++++++++++++++
 src/backend/cdb/cdbdatalocality.c                | 33 ++++++++--
 src/backend/cdb/cdbquerycontextdispatching.c     | 26 ++++++--
 src/backend/commands/copy.c                      |  4 ++
 src/backend/executor/execMain.c                  | 68 ++++++++++++++++----
 src/include/access/appendonlywriter.h            |  1 +
 6 files changed, 189 insertions(+), 24 deletions(-)

diff --git a/src/backend/access/appendonly/appendonlywriter.c 
b/src/backend/access/appendonly/appendonlywriter.c
index addd33e..629ba3a 100644
--- a/src/backend/access/appendonly/appendonlywriter.c
+++ b/src/backend/access/appendonly/appendonlywriter.c
@@ -1185,6 +1185,72 @@ List *SetSegnoForWrite(List *existing_segnos, Oid relid, 
int segment_num,
 
 }
 
+/*
+ * SetSegnoForExternalWrite
+ *
+ * Choose the segment file numbers used to write an external relation.
+ * The result depends only on Gp_role:
+ *   GP_ROLE_EXECUTE:  existing_segnos is returned unchanged.
+ *   GP_ROLE_UTILITY:  a one-element list holding RESERVED_SEGNO.
+ *   GP_ROLE_DISPATCH: segment_num consecutive numbers, starting at 1 when
+ *                     forNewRel is set and at 0 otherwise.
+ *   anything else:    Assert(false), then NIL.
+ *
+ * reuse_segfilenum_in_same_xid and keepHash are not consulted; they keep the
+ * signature identical to SetSegnoForWrite().
+ */
+List *SetSegnoForExternalWrite(List *existing_segnos, Oid relid, int segment_num,
+        bool forNewRel, bool reuse_segfilenum_in_same_xid,
+        bool keepHash)
+{
+    int first_segno;
+    int i;
+
+    /* NOTE(review): the original stored this result in a local it never
+     * read.  The call is kept in case its XID-assignment side effect is
+     * relied upon by callers -- confirm, and drop it if not. */
+    (void) GetTopTransactionId();
+
+    switch (Gp_role)
+    {
+        case GP_ROLE_EXECUTE:
+            Assert(existing_segnos != NIL);
+            Assert(list_length(existing_segnos) == segment_num);
+            return existing_segnos;
+
+        case GP_ROLE_UTILITY:
+            Assert(existing_segnos == NIL);
+            Assert(segment_num == 1);
+            return list_make1_int(RESERVED_SEGNO);
+
+        case GP_ROLE_DISPATCH:
+            Assert(existing_segnos == NIL);
+            Assert(segment_num > 0);
+
+            if (!forNewRel && Debug_appendonly_print_segfile_choice)
+            {
+                ereport(LOG, (errmsg("SetSegnoForExternalWrite: Choosing a segno for external "
+                                "relation \"%s\" (%u) ",
+                                get_rel_name(relid), relid)));
+            }
+
+            /* New relations are numbered from 1, existing ones from 0. */
+            first_segno = forNewRel ? 1 : 0;
+            for (i = 0; i < segment_num; i++)
+            {
+                existing_segnos = lappend_int(existing_segnos, first_segno + i);
+            }
+            Assert(list_length(existing_segnos) == segment_num);
+            return existing_segnos;
+
+        default:
+            /* fix this for dispatch agent. for now it's broken anyway. */
+            Assert(false);
+            return NIL;
+    }
+}
+
+
 /*
  * assignPerRelSegno
  *
@@ -1231,6 +1297,21 @@ List *assignPerRelSegno(List *all_relids, int 
segment_num)
                        }
 
                }
+               else if (RelationIsExternal(rel))
+               {
+                       SegfileMapNode *n;
+
+                       n = makeNode(SegfileMapNode);
+                       n->relid = cur_relid;
+
+                       n->segnos = SetSegnoForExternalWrite(NIL, cur_relid, 
segment_num,
+                                       false, true, true);
+
+                       Assert(n->relid != InvalidOid);
+                       Assert(n->segnos != NIL);
+
+                       mapping = lappend(mapping, n);
+               }
 
         /*
          * hold RowExclusiveLock until the end of transaction
diff --git a/src/backend/cdb/cdbdatalocality.c 
b/src/backend/cdb/cdbdatalocality.c
index 55f4ac0..67fe51d 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -4125,15 +4125,34 @@ run_allocation_algorithm(SplitAllocResult *result, List 
*virtual_segments, Query
                int fileCountInRelation = list_length(rel_data->files);
                bool FileCountBucketNumMismatch = false;
                if (targetPolicy->bucketnum > 0) {
-                 FileCountBucketNumMismatch = fileCountInRelation %
-                   targetPolicy->bucketnum == 0 ? false : true;
+                       Relation rel = heap_open(rel_data->relid, NoLock);
+                       targetPolicy->bucketnum == 0 ? false : true;
+                       if (!RelationIsExternal(rel))
+                       {
+                               FileCountBucketNumMismatch = 
fileCountInRelation %
+                                               targetPolicy->bucketnum == 0 ? 
false : true;
+                       }
+                       else
+                       {
+                               ListCell *lc_file;
+                               int maxsegno = 0;
+                               foreach(lc_file, rel_data->files)
+                               {
+                                       Relation_File *rel_file = 
(Relation_File *) lfirst(lc_file);
+                                       if (rel_file->segno > maxsegno)
+                                               maxsegno = rel_file->segno;
+                               }
+                               FileCountBucketNumMismatch =
+                               maxsegno > targetPolicy->bucketnum ? true : 
false;
+                       }
+                       heap_close(rel, NoLock);
                }
                if (isRelationHash && FileCountBucketNumMismatch && 
!allow_file_count_bucket_num_mismatch) {
-                 elog(ERROR, "file count %d in catalog is not in proportion to 
the bucket "
-                     "number %d of hash table with oid=%u, some data may be 
lost, if you "
-                     "still want to continue the query by considering the 
table as random, set GUC "
-                     "allow_file_count_bucket_num_mismatch to on and try 
again.",
-                     fileCountInRelation, targetPolicy->bucketnum, myrelid);
+                       elog(ERROR, "file count %d in catalog is not in 
proportion to the bucket "
+                               "number %d of hash table with oid=%u, some data 
may be lost, if you "
+                               "still want to continue the query by 
considering the table as random, set GUC "
+                               "allow_file_count_bucket_num_mismatch to on and 
try again.",
+                               fileCountInRelation, targetPolicy->bucketnum, 
myrelid);
                }
                /* change the virtual segment order when keep hash.
                 * order of idMap should also be changed.
diff --git a/src/backend/cdb/cdbquerycontextdispatching.c 
b/src/backend/cdb/cdbquerycontextdispatching.c
index 88d4f44..167af2a 100644
--- a/src/backend/cdb/cdbquerycontextdispatching.c
+++ b/src/backend/cdb/cdbquerycontextdispatching.c
@@ -2955,10 +2955,6 @@ fetchSegFileInfos(Oid relid, List *segnos)
                }
 
                storageChar = get_relation_storage_type(relid);
-               /*
-                * Get pg_appendonly information for this table.
-                */
-               aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);
 
                /*
                 * Based on the pg_appendonly information, fetch
@@ -2967,13 +2963,31 @@ fetchSegFileInfos(Oid relid, List *segnos)
                 */
                if (RELSTORAGE_AOROWS == storageChar)
                {
+                       /*
+                        * Get pg_appendonly information for this table.
+                        */
+                       aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);
                        AOFetchSegFileInfo(aoEntry, result, SnapshotNow);
                }
-               else
+               else if (RELSTORAGE_PARQUET == storageChar)
                {
-                       Assert(RELSTORAGE_PARQUET == storageChar);
+                       /*
+                        * Get pg_appendonly information for this table.
+                        */
+                       aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);
                        ParquetFetchSegFileInfo(aoEntry, result, SnapshotNow);
                }
+               else
+               {
+                       /*
+                        * Get range info for current magma hash table, which 
is already
+                        * available when get block location and do datalocality
+                        */
+                       /*
+                        * TODO(Zongtian): vsegno -> rg list -> {range list1, 
..., range list N}
+                        */
+                       Assert(RELSTORAGE_EXTERNAL == storageChar);
+               }
        }
        return result;
 }
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9c21148..dbeda09 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -4331,6 +4331,9 @@ CopyFrom(CopyState cstate)
                                                                                
                resultRelInfo->ri_aosegfileinfos,
                                                                                
                GetQEIndex());
 
+                                                       PlannedStmt* 
plannedstmt = palloc(sizeof(PlannedStmt));
+                                                       
memset(plannedstmt,0,sizeof(PlannedStmt));
+                                                       
plannedstmt->scantable_splits = cstate->splits;
                                                        
resultRelInfo->ri_extInsertDesc =
                                                                        
InvokePlugStorageFormatInsertInit(insertInitFunc,
                                                                                
      resultRelInfo->ri_RelationDesc,
@@ -4340,6 +4343,7 @@ CopyFrom(CopyState cstate)
                                                                                
                                                  segfileinfo->segno);
 
                                                        pfree(insertInitFunc);
+                                                       pfree(plannedstmt);
                                                }
                                                else
                                                {
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 81f68f2..e21aeb6 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -875,8 +875,9 @@ ExecutorStart(QueryDesc *queryDesc, int eflags)
                         result_segfileinfos = 
GetResultRelSegFileInfos(RelationGetRelid(relinfo->ri_RelationDesc),
                                                                        
estate->es_result_aosegnos, result_segfileinfos);
                     }
+
                     plannedstmt->result_segfileinfos = result_segfileinfos;
-                    
+
                     if (plannedstmt->intoClause != NULL)
                     {
                         List *segment_segnos = SetSegnoForWrite(NIL, 0, 
GetQEGangNum(), true, true, false);
@@ -1791,7 +1792,6 @@ InitializeResultRelations(PlannedStmt *plannedstmt, 
EState *estate, CmdType oper
                }
 
        }
-
        estate->es_partition_state = NULL;
        if (estate->es_result_partitions)
        {
@@ -2432,7 +2432,7 @@ CreateAppendOnlyParquetSegFileOnMaster(Oid relid, List 
*mapping)
        Relation rel = heap_open(relid, AccessShareLock);
 
        /* only relevant for AO relations */
-       if(!RelationIsAoRows(rel)  && !RelationIsParquet(rel))
+       if(!RelationIsAoRows(rel)  && !RelationIsParquet(rel) && 
!RelationIsExternal(rel))
        {
                heap_close(rel, AccessShareLock);
                return;
@@ -2466,6 +2466,45 @@ CreateAppendOnlyParquetSegFileOnMaster(Oid relid, List 
*mapping)
        Assert(found);
 }
 
+/*
+ * Queue one shared-storage task per entry of segnos for external relation
+ * rel.  Each task records rel's RelFileNode, the segno and a palloc'd copy
+ * of the relation name.  addTasks->tasks is doubled whenever it is full.
+ *
+ * NOTE(review): the only call site visible in this change is commented out.
+ */
+static void CreateExternalSegFileForRelationOnMaster(Relation rel, List *segnos,
+               SharedStorageOpTasks *addTasks)
+{
+       ListCell *cell;
+       char *relname = RelationGetRelationName(rel);
+
+       Assert(RelationIsExternal(rel));
+       Assert(NULL != addTasks);
+
+       foreach(cell, segnos)
+       {
+               SharedStorageOpTask *task;
+
+               Assert(addTasks->sizeTasks >= addTasks->numTasks);
+               if (addTasks->sizeTasks == addTasks->numTasks)
+               {
+                       addTasks->tasks = repalloc(addTasks->tasks,
+                                       addTasks->sizeTasks * sizeof(SharedStorageOpTask) * 2);
+                       addTasks->sizeTasks *= 2;
+               }
+
+               task = &addTasks->tasks[addTasks->numTasks];
+               task->node.dbNode = rel->rd_node.dbNode;
+               task->node.relNode = rel->rd_node.relNode;
+               task->node.spcNode = rel->rd_node.spcNode;
+               task->segno = lfirst_int(cell);
+               task->relname = pstrdup(relname);
+
+               addTasks->numTasks++;
+       }
+}
+
        static void
 CreaateAoRowSegFileForRelationOnMaster(Relation rel,
                AppendOnlyEntry * aoEntry, List *segnos, SharedStorageOpTasks 
*addTask, SharedStorageOpTasks *overwriteTask)
@@ -2647,13 +2686,20 @@ 
CreateAppendOnlyParquetSegFileForRelationOnMaster(Relation rel, List *segnos)
                        CreateParquetSegFileForRelationOnMaster(rel, aoEntry, 
segnos, addTasks, overwriteTasks);
 
                pfree(aoEntry);
+
+               PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile);
+               PostPerformSharedStorageOpTasks(addTasks);
+               PerformSharedStorageOpTasks(overwriteTasks, 
Op_OverWriteSegFile);
        }
+       // TODO: Should we create empty files on orc hash distribution table?
+       // else if (RelationIsExternal(rel))
+       // {
+       //      CreateExternalSegFileForRelationOnMaster(rel, segnos, addTasks);
+       //      PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile);
+       // }
 
-       PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile);
-       PostPerformSharedStorageOpTasks(addTasks);
-  PerformSharedStorageOpTasks(overwriteTasks, Op_OverWriteSegFile);
        DropSharedStorageOpTasks(addTasks);
-  DropSharedStorageOpTasks(overwriteTasks);
+       DropSharedStorageOpTasks(overwriteTasks);
 }
 
 /*
@@ -2712,10 +2758,10 @@ ResultRelInfoSetSegFileInfo(ResultRelInfo 
*resultRelInfo, List *mapping)
        /*
         * Only relevant for AO relations.
         */
-       if (!relstorage_is_ao(RelinfoGetStorage(resultRelInfo)))
-       {
-               return;
-       }
+//     if (!relstorage_is_ao(RelinfoGetStorage(resultRelInfo)))
+//     {
+//             return;
+//     }
 
        Assert(mapping);
        Assert(resultRelInfo->ri_RelationDesc);
diff --git a/src/include/access/appendonlywriter.h 
b/src/include/access/appendonlywriter.h
index a943bcb..38d9354 100755
--- a/src/include/access/appendonlywriter.h
+++ b/src/include/access/appendonlywriter.h
@@ -129,6 +129,7 @@ extern void InitAppendOnlyWriter(void);
 extern Size AppendOnlyWriterShmemSize(void);
 extern bool TestCurrentTspSupportTruncate(Oid tsp);
 extern List *SetSegnoForWrite(List *existing_segnos, Oid relid, int 
segment_num, bool forNewRel, bool reuse_segfilenum_in_same_xid, bool keepHash);
+extern List *SetSegnoForExternalWrite(List *existing_segnos, Oid relid, int 
segment_num, bool forNewRel, bool reuse_segfilenum_in_same_xid, bool keepHash);
 extern List *assignPerRelSegno(List *all_rels, int segment_num);
 extern void UpdateMasterAosegTotals(Relation parentrel,
                                                                        int 
segno, 

Reply via email to