This is an automated email from the ASF dual-hosted git repository.

djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry-pxf.git


The following commit(s) were added to refs/heads/main by this push:
     new e991beb1 feat: Re-adapt cloudberry/pxf_fdw to Cloudberry (#31)
e991beb1 is described below

commit e991beb1b3d692aa742c2729e47773e3c778f14e
Author: liuxiaoyu <[email protected]>
AuthorDate: Wed Jan 28 11:05:08 2026 +0800

    feat: Re-adapt cloudberry/pxf_fdw to Cloudberry (#31)
    
    Co-authored-by: Nikolay Antonov <[email protected]>
---
 .github/workflows/pxf-ci.yml                       |   2 +
 Makefile                                           |  48 ++++---
 .../features/writable/HdfsWritableTextTest.java    |   2 +
 .../ubuntu/script/build_cloudberrry.sh             |   2 +-
 .../ubuntu/script/build_cloudberry_deb.sh          |   2 +-
 ci/docker/pxf-cbdb-dev/ubuntu/script/run_tests.sh  |  32 ++++-
 fdw/README.md                                      |   8 +-
 fdw/libchurl.c                                     |  22 +--
 fdw/pxf_bridge.c                                   |  24 ----
 fdw/pxf_bridge.h                                   |  17 +--
 fdw/pxf_fdw.c                                      | 155 ++++++---------------
 fdw/pxf_fdw.h                                      |   5 -
 fdw/pxf_filter.c                                   |   4 +-
 fdw/pxf_header.c                                   |   5 -
 fdw/pxf_option.c                                   |  12 +-
 15 files changed, 117 insertions(+), 223 deletions(-)

diff --git a/.github/workflows/pxf-ci.yml b/.github/workflows/pxf-ci.yml
index a3b55681..795345ad 100644
--- a/.github/workflows/pxf-ci.yml
+++ b/.github/workflows/pxf-ci.yml
@@ -103,6 +103,7 @@ jobs:
         test_group:
           - cli
           - external-table
+          - fdw
           - server
           - sanity
           - smoke
@@ -118,6 +119,7 @@ jobs:
           - s3
           - features
           - gpdb
+          - gpdb_fdw
           - load
           - pxf_extension
     steps:
diff --git a/Makefile b/Makefile
index 1014689f..489a3c8a 100644
--- a/Makefile
+++ b/Makefile
@@ -6,8 +6,8 @@ export PXF_MODULES
 PXF_VERSION ?= $(shell cat version)
 export PXF_VERSION
 
-FDW_SUPPORT = $(shell $(PG_CONFIG) --version | egrep "PostgreSQL 1[2-5]")
-FDW_SUPPORT =
+export SKIP_EXTERNAL_TABLE_BUILD_REASON
+export SKIP_FDW_BUILD_REASON
 
 SOURCE_EXTENSION_DIR = external-table
 TARGET_EXTENSION_DIR = gpextable
@@ -24,29 +24,31 @@ all: extensions cli server
 
 extensions: external-table fdw
 
-external-table cli server:
+external-table:
+ifeq ($(SKIP_EXTERNAL_TABLE_BUILD_REASON),)
        @echo "===> Compiling [$@] module <==="
        make -C $@
+else
+       @echo "Skipping building external-table extension because $(SKIP_EXTERNAL_TABLE_BUILD_REASON)"
+endif
 
-fdw:
-ifneq ($(FDW_SUPPORT),)
+ifeq ($(SKIP_FDW_BUILD_REASON),)
+       @echo "===> Compiling [$@] module <==="
        make -C fdw
+else
+       @echo "Skipping building FDW extension because $(SKIP_FDW_BUILD_REASON)"
 endif
 
-cli:
-       make -C cli
-
-server:
-       make -C server
+cli server:
+       @echo "===> Compiling [$@] module <==="
+       make -C $@
 
 clean:
        rm -rf build
        make -C $(SOURCE_EXTENSION_DIR) clean-all
        make -C cli clean
        make -C server clean
-ifneq ($(FDW_SUPPORT),)
        make -C fdw clean
-endif
 
 test:
 ifeq ($(SKIP_FDW_BUILD_REASON),)
@@ -61,18 +63,30 @@ it:
        make -C automation TEST=$(TEST)
 
 install:
-       make -C $(SOURCE_EXTENSION_DIR) install
-       make -C cli install
-       make -C server install
-ifneq ($(FDW_SUPPORT),)
-       make -C fdw install
+ifneq ($(SKIP_EXTERNAL_TABLE_BUILD_REASON),)
+       @echo "Skipping installing FDW extension because $(SKIP_EXTERNAL_TABLE_BUILD_REASON)"
+       $(eval PXF_MODULES := $(filter-out external-table,$(PXF_MODULES)))
+endif
+ifneq ($(SKIP_FDW_BUILD_REASON),)
+       @echo "Skipping installing FDW extension because $(SKIP_FDW_BUILD_REASON)"
+       $(eval PXF_MODULES := $(filter-out fdw,$(PXF_MODULES)))
 endif
+       set -e ;\
+       for module in $${PXF_MODULES[@]}; do \
+               echo "===> Installing [$${module}] module <===" ;\
+               make -C $${module} install ;\
+       done ;\
+       echo "===> PXF installation is complete <==="
 
 install-server:
        make -C server install-server
 
 stage:
        rm -rf build/stage
+ifneq ($(SKIP_EXTERNAL_TABLE_PACKAGE_REASON),)
+       @echo "Skipping staging FDW extension because $(SKIP_EXTERNAL_TABLE_PACKAGE_REASON)"
+       $(eval PXF_MODULES := $(filter-out external-table,$(PXF_MODULES)))
+endif
 ifneq ($(SKIP_FDW_PACKAGE_REASON),)
        @echo "Skipping staging FDW extension because $(SKIP_FDW_PACKAGE_REASON)"
        $(eval PXF_MODULES := $(filter-out fdw,$(PXF_MODULES)))
diff --git a/automation/src/test/java/org/greenplum/pxf/automation/features/writable/HdfsWritableTextTest.java b/automation/src/test/java/org/greenplum/pxf/automation/features/writable/HdfsWritableTextTest.java
index fc21a472..22ad01f9 100755
--- a/automation/src/test/java/org/greenplum/pxf/automation/features/writable/HdfsWritableTextTest.java
+++ b/automation/src/test/java/org/greenplum/pxf/automation/features/writable/HdfsWritableTextTest.java
@@ -1,5 +1,6 @@
 package org.greenplum.pxf.automation.features.writable;
 
+import annotations.FailsWithFDW;
 import annotations.SkipForFDW;
 import annotations.WorksWithFDW;
 import org.apache.commons.lang.StringUtils;
@@ -75,6 +76,7 @@ public class HdfsWritableTextTest extends BaseWritableFeature {
      *
      * @throws Exception if test fails to run
      */
+    @FailsWithFDW
     @Test(groups = {"features", "gpdb", "security"})
     public void textFormatInsertNoProfile() throws Exception {
 
diff --git a/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberrry.sh b/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberrry.sh
index f0caad50..993a2100 100755
--- a/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberrry.sh
+++ b/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberrry.sh
@@ -100,7 +100,7 @@ cd ~/workspace/cloudberry
             --enable-orafce \
             --enable-orca \
             --disable-pax \
-            --enable-pxf \
+            --disable-pxf \
             --enable-tap-tests \
             --with-gssapi \
             --with-ldap \
diff --git a/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberry_deb.sh b/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberry_deb.sh
index eb3b8d87..d48720b7 100755
--- a/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberry_deb.sh
+++ b/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberry_deb.sh
@@ -28,7 +28,7 @@ cd "${CLOUDBERRY_SRC}"
             --enable-orafce \
             --enable-orca \
             --disable-pax \
-            --enable-pxf \
+            --disable-pxf \
             --enable-tap-tests \
             --with-gssapi \
             --with-ldap \
diff --git a/ci/docker/pxf-cbdb-dev/ubuntu/script/run_tests.sh b/ci/docker/pxf-cbdb-dev/ubuntu/script/run_tests.sh
index d57b69c5..4f5c6e35 100755
--- a/ci/docker/pxf-cbdb-dev/ubuntu/script/run_tests.sh
+++ b/ci/docker/pxf-cbdb-dev/ubuntu/script/run_tests.sh
@@ -549,6 +549,7 @@ feature_test(){
 }
 
 gpdb_test() {
+  local use_fdw="$1"
   export PROTOCOL=HDFS
   export PXF_HOME=${PXF_HOME:-/usr/local/pxf}
   export PATH="${PXF_HOME}/bin:${PATH}"
@@ -570,10 +571,20 @@ gpdb_test() {
   # Ensure PXF points to local HDFS/Hive/HBase configs
   configure_pxf_default_hdfs_server
 
-  echo "[run_tests] Starting GROUP=gpdb"
-  make GROUP="gpdb" || true
-  save_test_reports "gpdb"
-  echo "[run_tests] GROUP=gpdb finished"
+  local extra_args=""
+  if [[ "$use_fdw" == "true" ]]; then
+    extra_args="USE_FDW=true"
+  else
+    extra_args="USE_FDW=false"
+  fi
+  echo "[run_tests] Starting GROUP=gpdb $extra_args"
+  make GROUP="gpdb" $extra_args || true
+  if [[ "$use_fdw" == "true" ]]; then
+      save_test_reports "gpdb_fdw"
+  else
+      save_test_reports "gpdb"
+  fi
+  echo "[run_tests] GROUP=gpdb $extra_args finished"
 }
 
 pxf_extension_test(){
@@ -709,7 +720,7 @@ generate_test_summary() {
 
     local group=$(basename "$group_dir")
     # Skip if it's not a test group directory
-    [[ "$group" =~ ^(smoke|hcatalog|hcfs|hdfs|hive|gpdb|sanity|hbase|profile|jdbc|proxy|unused|s3|features|load|performance|pxfExtensionVersion2|pxfExtensionVersion2_1|pxfFdwExtensionVersion1|pxfFdwExtensionVersion2)$ ]] || continue
+    [[ "$group" =~ ^(smoke|hcatalog|hcfs|hdfs|hive|gpdb|sanity|hbase|profile|jdbc|proxy|unused|s3|features|load|performance|pxfExtensionVersion2|pxfExtensionVersion2_1|pxfFdwExtensionVersion1|pxfFdwExtensionVersion2|fdw|gpdb_fdw)$ ]] || continue
 
     echo "Processing $group test reports from $group_dir"
 
@@ -841,6 +852,13 @@ run_single_group() {
       cd "${REPO_ROOT}/external-table"
       make installcheck
       ;;
+    fdw)
+      cd "${REPO_ROOT}/fdw"
+      make test
+      ;;
+    gpdb_fdw)
+      gpdb_test "true"
+      ;;
     server)
       cd "${REPO_ROOT}/server"
       ./gradlew test
@@ -873,7 +891,7 @@ run_single_group() {
       feature_test
       ;;
     gpdb)
-      gpdb_test
+      gpdb_test "false"
       ;;
     pxf_extension)
       pxf_extension_test
@@ -898,7 +916,7 @@ run_single_group() {
       ;;
     *)
       echo "Unknown test group: $group"
-      echo "Available groups: cli, external-table, server, sanity, smoke, hdfs, hcatalog, hcfs, hive, hbase, profile, jdbc, proxy, unused, s3, features, gpdb, load, performance, bench, pxf_extension"
+      echo "Available groups: cli, external-table, fdw, server, sanity, smoke, hdfs, hcatalog, hcfs, hive, hbase, profile, jdbc, proxy, unused, s3, features, gpdb, gpdb_fdw, load, performance, bench, pxf_extension"
       exit 1
       ;;
   esac
diff --git a/fdw/README.md b/fdw/README.md
index a10f15c3..a0e546c5 100644
--- a/fdw/README.md
+++ b/fdw/README.md
@@ -1,6 +1,6 @@
-# PXF Foreign Data Wrapper for Greenplum and PostgreSQL
+# PXF Foreign Data Wrapper for Cloudberry
 
-This Greenplum extension implements a Foreign Data Wrapper (FDW) for PXF.
+This Cloudberry extension implements a Foreign Data Wrapper (FDW) for PXF.
 
 PXF is a query federation engine that accesses data residing in external systems
 such as Hadoop, Hive, HBase, relational databases, S3, Google Cloud Storage,
@@ -10,9 +10,9 @@ among other external systems.
 
 ## Compile
 
-To compile the PXF foreign data wrapper, we need a Greenplum 6+ installation and libcurl.
+To compile the PXF foreign data wrapper, we need a Cloudberry installation and libcurl.
 
-    export PATH=/usr/local/greenplum-db/bin/:$PATH
+    export PATH=/usr/local/cloudberry-db/bin/:$PATH
 
     make
 
diff --git a/fdw/libchurl.c b/fdw/libchurl.c
index 6b2fa93a..3003d4fe 100644
--- a/fdw/libchurl.c
+++ b/fdw/libchurl.c
@@ -16,19 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#if PG_VERSION_NUM >= 90600
 #include "postgres.h"
-#endif
-
 #include "libchurl.h"
-#if PG_VERSION_NUM >= 90600
 #include "lib/stringinfo.h"
-#endif
 #include "miscadmin.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
-#include "utils/jsonapi.h"
+#include "common/jsonapi.h"
+#include "mb/pg_wchar.h"
 
 /* include libcurl without typecheck.
  * This allows wrapping curl_easy_setopt to be wrapped
@@ -98,13 +94,6 @@ typedef struct churl_settings
        struct curl_slist *headers;
 } churl_settings;
 
-/* the null action object used for pure validation */
-static JsonSemAction nullSemAction =
-{
-       NULL, NULL, NULL, NULL, NULL,
-       NULL, NULL, NULL, NULL, NULL
-};
-
 churl_context *churl_new_context(void);
 static void            create_curl_handle(churl_context *context);
 static void            set_curl_option(churl_context *context, CURLoption 
option, const void *data);
@@ -864,13 +853,8 @@ fill_internal_buffer(churl_context *context, int want)
 
                        if (errno == EINTR || errno == EAGAIN)
                                continue;
-#if PG_VERSION_NUM >= 90600
                        elog(ERROR, "internal error: select failed on 
curl_multi_fdset (maxfd %d) (%d - %m)",
                                 maxfd, save_errno);
-#else
-                       elog(ERROR, "internal error: select failed on 
curl_multi_fdset (maxfd %d) (%d - %s)",
-                                maxfd, save_errno, strerror(errno));
-#endif
                }
                multi_perform(context);
        }
@@ -1056,7 +1040,7 @@ IsValidJson(text *json)
        PG_TRY();
        {
                /* validate it */
-               lex = makeJsonLexContext(json, false);
+               lex = makeJsonLexContextCstringLen(text_to_cstring(json), VARSIZE_ANY_EXHDR(json), GetDatabaseEncoding(), false);
                pg_parse_json(lex, &nullSemAction);
        }
        PG_CATCH();
diff --git a/fdw/pxf_bridge.c b/fdw/pxf_bridge.c
index b56a2510..567665ac 100644
--- a/fdw/pxf_bridge.c
+++ b/fdw/pxf_bridge.c
@@ -27,11 +27,7 @@
 /* helper function declarations */
 static void BuildUriForRead(PxfFdwScanState *pxfsstate);
 static void BuildUriForWrite(PxfFdwModifyState *pxfmstate);
-#if PG_VERSION_NUM >= 90600
 static size_t FillBuffer(PxfFdwScanState *pxfsstate, char *start, int minlen, int maxlen);
-#else
-static size_t FillBuffer(PxfFdwScanState *pxfsstate, char *start, size_t size);
-#endif
 
 /*
  * Clean up churl related data structures from the PXF FDW modify state.
@@ -102,20 +98,12 @@ PxfBridgeExportStart(PxfFdwModifyState *pxfmstate)
  * Reads data from the PXF server into the given buffer of a given size
  */
 int
-#if PG_VERSION_NUM >= 90600
 PxfBridgeRead(void *outbuf, int minlen, int maxlen, void *extra)
-#else
-PxfBridgeRead(void *outbuf, int datasize, void *extra)
-#endif
 {
        size_t          n = 0;
        PxfFdwScanState *pxfsstate = (PxfFdwScanState *) extra;
 
-#if PG_VERSION_NUM >= 90600
        n = FillBuffer(pxfsstate, outbuf, minlen, maxlen);
-#else
-       n = FillBuffer(pxfsstate, outbuf, datasize);
-#endif
 
        if (n == 0)
        {
@@ -176,28 +164,16 @@ BuildUriForWrite(PxfFdwModifyState *pxfmstate)
  * Read data from churl until the buffer is full or there is no more data to 
be read
  */
 static size_t
-#if PG_VERSION_NUM >= 90600
 FillBuffer(PxfFdwScanState *pxfsstate, char *start, int minlen, int maxlen)
-#else
-FillBuffer(PxfFdwScanState *pxfsstate, char *start, size_t size)
-#endif
 {
        size_t          n = 0;
        char       *ptr = start;
-#if PG_VERSION_NUM >= 90600
        char       *minend = ptr + minlen;
        char       *maxend = ptr + maxlen;
 
        while (ptr < minend)
        {
                n = churl_read(pxfsstate->churl_handle, ptr, maxend - ptr);
-#else
-       char       *end = ptr + size;
-
-       while (ptr < end)
-       {
-               n = churl_read(pxfsstate->churl_handle, ptr, end - ptr);
-#endif
                if (n == 0)
                        break;
 
diff --git a/fdw/pxf_bridge.h b/fdw/pxf_bridge.h
index b8a866a4..e5e53610 100644
--- a/fdw/pxf_bridge.h
+++ b/fdw/pxf_bridge.h
@@ -44,14 +44,10 @@ typedef struct PxfFdwScanState
        StringInfoData uri;
        Relation        relation;
        char       *filter_str;
-#if PG_VERSION_NUM >= 90600
        ExprState  *quals;
-#else
-       List       *quals;
-#endif
        List       *retrieved_attrs;
        PxfOptions *options;
-       CopyState       cstate;
+       CopyFromState   cstate;
        ProjectionInfo *projectionInfo;
 } PxfFdwScanState;
 
@@ -60,18 +56,13 @@ typedef struct PxfFdwScanState
  */
 typedef struct PxfFdwModifyState
 {
-       CopyState       cstate;                 /* state of writing to PXF */
+       CopyToState     cstate;                 /* state of writing to PXF */
 
        CHURL_HANDLE churl_handle;      /* curl handle */
        CHURL_HEADERS churl_headers;    /* curl headers */
        StringInfoData uri;                     /* rest endpoint URI for modify 
*/
        Relation        relation;
        PxfOptions *options;            /* FDW options */
-
-#if PG_VERSION_NUM < 90600
-       Datum      *values;                     /* List of values exported for 
the row */
-       bool       *nulls;                      /* List of null fields for the 
exported row */
-#endif
 } PxfFdwModifyState;
 
 /* Clean up churl related data structures from the context */
@@ -84,11 +75,7 @@ void         PxfBridgeImportStart(PxfFdwScanState 
*pxfsstate);
 void           PxfBridgeExportStart(PxfFdwModifyState *pxfmstate);
 
 /* Reads data from the PXF server into the given buffer of a given size */
-#if PG_VERSION_NUM >= 90600
 int                    PxfBridgeRead(void *outbuf, int minlen, int maxlen, 
void *extra);
-#else
-int                    PxfBridgeRead(void *outbuf, int datasize, void *extra);
-#endif
 
 /* Writes data from the given buffer of a given size to the PXF server */
 int                    PxfBridgeWrite(PxfFdwModifyState *context, char 
*databuf, int datalen);
diff --git a/fdw/pxf_fdw.c b/fdw/pxf_fdw.c
index a4a1cc10..645a1abc 100644
--- a/fdw/pxf_fdw.c
+++ b/fdw/pxf_fdw.c
@@ -13,27 +13,22 @@
 #include "pxf_filter.h"
 
 #include "access/reloptions.h"
-#if PG_VERSION_NUM >= 90600
 #include "access/table.h"
-#endif
 #include "cdb/cdbsreh.h"
 #include "cdb/cdbvars.h"
 #include "commands/copy.h"
+#include "commands/copyfrom_internal.h"
+#include "commands/copyto_internal.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "foreign/fdwapi.h"
 #include "foreign/foreign.h"
 #include "nodes/pg_list.h"
-#if PG_VERSION_NUM >= 90600
 #include "optimizer/optimizer.h"
-#endif
 #include "optimizer/paths.h"
 #include "optimizer/pathnode.h"
 #include "optimizer/planmain.h"
 #include "optimizer/restrictinfo.h"
-#if PG_VERSION_NUM < 90600
-#include "optimizer/var.h"
-#endif
 #include "parser/parsetree.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -61,15 +56,7 @@ PG_FUNCTION_INFO_V1(pxf_fdw_handler);
 static void pxfGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid 
foreigntableid);
 
 static void pxfGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid 
foreigntableid);
-
-#if (PG_VERSION_NUM <= 90500)
-
-static ForeignScan *pxfGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, 
Oid foreigntableid, ForeignPath *best_path, List *tlist,
-                                                                         List 
*scan_clauses);
-
-#else
 static ForeignScan *pxfGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan);
-#endif
 
 static void pxfExplainForeignScan(ForeignScanState *node, ExplainState *es);
 
@@ -101,7 +88,8 @@ static PxfFdwModifyState *InitForeignModify(Relation 
relation);
 static void FinishForeignModify(PxfFdwModifyState *pxfmstate);
 static void InitCopyState(PxfFdwScanState *pxfsstate);
 static void InitCopyStateForModify(PxfFdwModifyState *pxfmstate);
-static CopyState BeginCopyTo(Relation forrel, List *options);
+static CopyToState BeginCopyToModify(Relation forrel, List *options);
+static void EndCopyModify(CopyToState cstate);
 static void PxfBeginScanErrorCallback(void *arg);
 static void PxfCopyFromErrorCallback(void *arg);
 
@@ -146,9 +134,7 @@ pxf_fdw_handler(PG_FUNCTION_ARGS)
         * taken
         */
        fdw_routine->PlanForeignModify = NULL;
-#if PG_VERSION_NUM >= 120000
        fdw_routine->BeginForeignInsert = pxfBeginForeignInsert;
-#endif
        fdw_routine->BeginForeignModify = pxfBeginForeignModify;
        fdw_routine->ExecForeignInsert = pxfExecForeignInsert;
 
@@ -158,9 +144,7 @@ pxf_fdw_handler(PG_FUNCTION_ARGS)
         */
        fdw_routine->ExecForeignUpdate = NULL;
        fdw_routine->ExecForeignDelete = NULL;
-#if PG_VERSION_NUM >= 120000
        fdw_routine->EndForeignInsert = pxfEndForeignInsert;
-#endif
        fdw_routine->EndForeignModify = pxfEndForeignModify;
        fdw_routine->IsForeignRelUpdatable = pxfIsForeignRelUpdatable;
 
@@ -224,11 +208,7 @@ pxfGetForeignRelSize(PlannerInfo *root, RelOptInfo 
*baserel, Oid foreigntableid)
         */
        RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
 
-#if PG_VERSION_NUM >= 90600
        rel = table_open(rte->relid, NoLock);
-#else
-       rel = heap_open(rte->relid, NoLock);
-#endif
 
        /*
         * Identify which baserestrictinfo clauses can be sent to the remote
@@ -241,11 +221,7 @@ pxfGetForeignRelSize(PlannerInfo *root, RelOptInfo 
*baserel, Oid foreigntableid)
         * Identify which attributes will need to be retrieved from the remote
         * server
         */
-#if (PG_VERSION_NUM <= 90500)
-       pull_varattnos((Node *) baserel->reltargetlist, baserel->relid, 
&fpinfo->attrs_used);
-#else
        pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid, 
&fpinfo->attrs_used);
-#endif
        foreach(lc, fpinfo->local_conds)
        {
                RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
@@ -255,7 +231,7 @@ pxfGetForeignRelSize(PlannerInfo *root, RelOptInfo 
*baserel, Oid foreigntableid)
 
        deparseTargetList(rel, fpinfo->attrs_used, &fpinfo->retrieved_attrs);
 
-       heap_close(rel, NoLock);
+       table_close(rel, NoLock);
 
        /* Use an artificial number of estimated rows */
        baserel->rows = 1000;
@@ -279,19 +255,14 @@ pxfGetForeignPaths(PlannerInfo *root,
 
        elog(DEBUG5, "pxf_fdw: pxfGetForeignPaths starts on segment: %d", 
PXF_SEGMENT_ID);
 
-
        path = create_foreignscan_path(root, baserel,
-#if PG_VERSION_NUM >= 90600
                                                                   NULL,        
/* default pathtarget */
-#endif
                                                                   
baserel->rows,
                                                                   
DEFAULT_PXF_FDW_STARTUP_COST,
                                                                   total_cost,
                                                                   NIL, /* no 
pathkeys */
                                                                   NULL,        
/* no outer rel either */
-#if PG_VERSION_NUM >= 90500
                                                                   NULL,        
/* no extra plan */
-#endif
                                                                   
fpinfo->retrieved_attrs);
 
 
@@ -299,7 +270,7 @@ pxfGetForeignPaths(PlannerInfo *root,
        /*
         * Create a ForeignPath node and add it as only possible path.
         */
-       add_path(baserel, (Path *) path);
+       add_path(baserel, (Path *) path, root);
 
        elog(DEBUG5, "pxf_fdw: pxfGetForeignPaths ends on segment: %d", 
PXF_SEGMENT_ID);
 }
@@ -308,7 +279,6 @@ pxfGetForeignPaths(PlannerInfo *root,
  * GetForeignPlan
  *             create a ForeignScan plan node
  */
-#if PG_VERSION_NUM >= 90500
 static ForeignScan *
 pxfGetForeignPlan(PlannerInfo *root,
                                  RelOptInfo *baserel,
@@ -317,15 +287,6 @@ pxfGetForeignPlan(PlannerInfo *root,
                                  List *tlist,
                                  List *scan_clauses,
                                  Plan *outer_plan)
-#else
-static ForeignScan *
-pxfGetForeignPlan(PlannerInfo *root,
-                                 RelOptInfo *baserel,
-                                 Oid foreigntableid,
-                                 ForeignPath *best_path,
-                                 List *tlist,  /* target list */
-                                 List *scan_clauses)
-#endif
 {
        char                       *where_clauses_str = NULL;
        List                       *fdw_private;
@@ -363,12 +324,10 @@ pxfGetForeignPlan(PlannerInfo *root,
                                                        scan_clauses,
                                                        scan_relid,
                                                        NIL,    /* no 
expressions to evaluate */
-                                                       fdw_private
-#if PG_VERSION_NUM >= 90500
-                                                       ,NIL
-                                                       ,NIL
-                                                       ,outer_plan
-#endif
+                                                       fdw_private,
+                                                       NIL,
+                                                       NIL,
+                                                       outer_plan
                );
 
 }
@@ -403,11 +362,7 @@ pxfBeginForeignScan(ForeignScanState *node, int eflags)
        if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
                return;
 
-#if PG_VERSION_NUM >= 90600
        ExprState  *quals             = node->ss.ps.qual;
-#else
-       List       *quals             = node->ss.ps.qual;
-#endif
        Oid                     foreigntableid = 
RelationGetRelid(node->ss.ss_currentRelation);
        PxfFdwScanState *pxfsstate    = NULL;
        Relation        relation          = node->ss.ss_currentRelation;
@@ -493,15 +448,8 @@ pxfIterateForeignScan(ForeignScanState *node)
 
        found = NextCopyFrom(pxfsstate->cstate,
                                                 NULL,
-#if PG_VERSION_NUM >= 90600
                                                 slot->tts_values,
-                                                slot->tts_isnull
-#else
-                                                slot_get_values(slot),
-                                                slot_get_isnull(slot),
-                                                NULL
-#endif
-                                                );
+                                                slot->tts_isnull);
 
        if (found)
        {
@@ -644,10 +592,6 @@ InitForeignModify(Relation relation)
        initStringInfo(&pxfmstate->uri);
        pxfmstate->relation = relation;
        pxfmstate->options = options;
-#if PG_VERSION_NUM < 90600
-       pxfmstate->values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
-       pxfmstate->nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
-#endif
 
        InitCopyStateForModify(pxfmstate);
 
@@ -678,22 +622,11 @@ pxfExecForeignInsert(EState *estate,
                resultRelInfo->ri_FdwState = pxfmstate;
        }
 
-       CopyState       cstate = pxfmstate->cstate;
-#if PG_VERSION_NUM < 90600
-       Relation        relation = resultRelInfo->ri_RelationDesc;
-       TupleDesc       tupDesc = RelationGetDescr(relation);
-       HeapTuple       tuple = ExecMaterializeSlot(slot);
-       Datum      *values = pxfmstate->values;
-       bool       *nulls = pxfmstate->nulls;
-
-       heap_deform_tuple(tuple, tupDesc, values, nulls);
-       CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
-#else
+       CopyToState     cstate = pxfmstate->cstate;
 
        /* TEXT or CSV */
        slot_getallattrs(slot);
        CopyOneRowTo(cstate, slot);
-#endif
        CopySendEndOfRow(cstate);
 
        StringInfo      fe_msgbuf = cstate->fe_msgbuf;
@@ -754,7 +687,7 @@ FinishForeignModify(PxfFdwModifyState *pxfmstate)
        if (pxfmstate == NULL)
                return;
 
-       EndCopyFrom(pxfmstate->cstate);
+       EndCopyModify(pxfmstate->cstate);
        pxfmstate->cstate = NULL;
        PxfBridgeCleanup(pxfmstate);
 
@@ -781,7 +714,7 @@ pxfIsForeignRelUpdatable(Relation rel)
 static void
 InitCopyState(PxfFdwScanState *pxfsstate)
 {
-       CopyState       cstate;
+       CopyFromState   cstate;
 
        PxfBridgeImportStart(pxfsstate);
 
@@ -790,19 +723,15 @@ InitCopyState(PxfFdwScanState *pxfsstate)
         * as to match the expected ScanTupleSlot signature.
         */
        cstate = BeginCopyFrom(
-#if PG_VERSION_NUM >= 90600
                                                   NULL,
-#endif
                                                   pxfsstate->relation,
                                                   NULL,
+                                                  NULL,
                                                   false,       /* is_program */
                                                   &PxfBridgeRead,      /* 
data_source_cb */
                                                   pxfsstate,   /* 
data_source_cb_extra */
                                                   NIL, /* attnamelist */
                                                   
pxfsstate->options->copy_options     /* copy options */
-#if PG_VERSION_NUM < 90600
-                                                  ,NIL /* ao_segnos */
-#endif
                                                   );
 
 
@@ -825,11 +754,7 @@ InitCopyState(PxfFdwScanState *pxfsstate)
                                                                          
pxfsstate->options->is_reject_limit_rows,
                                                                          
pxfsstate->options->resource,
                                                                          (char 
*) cstate->cur_relname,
-#if PG_VERSION_NUM >= 90600
                                                                          
pxfsstate->options->log_errors ? LOG_ERRORS_ENABLE : LOG_ERRORS_DISABLE);
-#else
-                                                                         
pxfsstate->options->log_errors);
-#endif
 
                cstate->cdbsreh->relid = RelationGetRelid(pxfsstate->relation);
        }
@@ -859,7 +784,7 @@ static void
 InitCopyStateForModify(PxfFdwModifyState *pxfmstate)
 {
        List       *copy_options;
-       CopyState       cstate;
+       CopyToState     cstate;
 
        copy_options = pxfmstate->options->copy_options;
 
@@ -868,16 +793,12 @@ InitCopyStateForModify(PxfFdwModifyState *pxfmstate)
        /*
         * Create CopyState from FDW options.  We always acquire all columns to 
match the expected ScanTupleSlot signature.
         */
-       cstate = BeginCopyTo(pxfmstate->relation, copy_options);
+       cstate = BeginCopyToModify(pxfmstate->relation, copy_options);
 
        /* Initialize 'out_functions', like CopyTo() would. */
 
        TupleDesc       tupDesc = RelationGetDescr(pxfmstate->relation);
-#if PG_VERSION_NUM >= 90600
        Form_pg_attribute attr = tupDesc->attrs;
-#else
-       Form_pg_attribute *attr = tupDesc->attrs;
-#endif
        int                     num_phys_attrs = tupDesc->natts;
 
        cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * 
sizeof(FmgrInfo));
@@ -889,11 +810,7 @@ InitCopyStateForModify(PxfFdwModifyState *pxfmstate)
                Oid                     out_func_oid;
                bool            isvarlena;
 
-#if PG_VERSION_NUM >= 90600
                getTypeOutputInfo(attr[attnum - 1].atttypid,
-#else
-               getTypeOutputInfo(attr[attnum - 1]->atttypid,
-#endif
                                                  &out_func_oid,
                                                  &isvarlena);
                fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
@@ -920,14 +837,14 @@ InitCopyStateForModify(PxfFdwModifyState *pxfmstate)
 /*
  * Set up CopyState for writing to a foreign table.
  */
-static CopyState
-BeginCopyTo(Relation forrel, List *options)
+static CopyToState
+BeginCopyToModify(Relation forrel, List *options)
 {
-       CopyState       cstate;
+       CopyToState     cstate;
 
        Assert(forrel->rd_rel->relkind == RELKIND_FOREIGN_TABLE);
 
-       cstate = BeginCopyToForeignTable(forrel, options);
+       cstate = BeginCopy(NULL, forrel, NULL, forrel->rd_id, NIL, options, NULL);
        cstate->dispatch_mode = COPY_DIRECT;
 
        /*
@@ -940,16 +857,29 @@ BeginCopyTo(Relation forrel, List *options)
         * Some more initialization, that in the normal COPY TO codepath, is done
         * in CopyTo() itself.
         */
-       cstate->null_print_client = cstate->null_print; /* default */
+       cstate->opts.null_print_client = cstate->opts.null_print; /* default */
        if (cstate->need_transcoding)
-               cstate->null_print_client = pg_server_to_custom(cstate->null_print,
-                                                                                                               cstate->null_print_len,
-                                                                                                               cstate->file_encoding,
-                                                                                                               cstate->enc_conversion_proc);
+               cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+                                                                                                                 cstate->opts.null_print_len,
+                                                                                                                 cstate->opts.file_encoding);
 
        return cstate;
 }
 
+/*
+ * Clean up storage and release resources for COPY TO.
+ */
+static void
+EndCopyModify(CopyToState cstate)
+{
+       /* No COPY TO related resources except memory. */
+       Assert(!cstate->is_program);
+       Assert(cstate->filename == NULL);
+
+       MemoryContextDelete(cstate->copycontext);
+       pfree(cstate);
+}
+
 /*
  * PXF specific error context callback for "begin foreign scan" operation.
  * It replaces the "COPY" term in the error message context with
@@ -985,13 +915,13 @@ void
 PxfCopyFromErrorCallback(void *arg)
 {
     PxfFdwScanState *pxfsstate = (PxfFdwScanState *) arg;
-    CopyState  cstate = pxfsstate->cstate;
+       CopyFromState   cstate = pxfsstate->cstate;
     char               curlineno_str[32];
 
     snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
              cstate->cur_lineno);
 
-    if (cstate->binary)
+    if (cstate->opts.binary)
     {
         /* can't usefully display the data */
         if (cstate->cur_attname)
@@ -1051,8 +981,7 @@ PxfCopyFromErrorCallback(void *arg)
              * it, and at present there's no way to regurgitate it without
              * conversion. So we have to punt and just report the line number.
              */
-            else if (cstate->line_buf_valid &&
-                (cstate->line_buf_converted || !cstate->need_transcoding))
+            else if (cstate->line_buf_valid && !cstate->need_transcoding)
             {
                 char      *lineval;
 
diff --git a/fdw/pxf_fdw.h b/fdw/pxf_fdw.h
index c7067cc9..baae7a24 100644
--- a/fdw/pxf_fdw.h
+++ b/fdw/pxf_fdw.h
@@ -10,13 +10,8 @@
 
 #include "access/formatter.h"
 #include "commands/copy.h"
-#if PG_VERSION_NUM >= 90600
 #include "nodes/pathnodes.h"
-#endif
 #include "nodes/pg_list.h"
-#if PG_VERSION_NUM < 90600
-#include "nodes/relation.h"
-#endif
 #include "utils/rel.h"
 
 #ifndef PXF_FDW_H
diff --git a/fdw/pxf_filter.c b/fdw/pxf_filter.c
index 79b90b9b..b57575cf 100644
--- a/fdw/pxf_filter.c
+++ b/fdw/pxf_filter.c
@@ -178,7 +178,7 @@ dbop_pxfop_map pxf_supported_opr_op_expr[] =
        {85 /* boolne */ , PXFOP_NE},
 
        /* bpchar */
-       {BPCharEqualOperator /* bpchareq */ , PXFOP_EQ},
+       {BpcharEqualOperator /* bpchareq */ , PXFOP_EQ},
        {1058 /* bpcharlt */ , PXFOP_LT},
        {1060 /* bpchargt */ , PXFOP_GT},
        {1059 /* bpcharle */ , PXFOP_LE},
@@ -247,7 +247,7 @@ dbop_pxfop_array_map pxf_supported_opr_scalar_array_op_expr[] =
        {1120 /* float48eq */ , PXFOP_IN, true},
 
        /* bpchar */
-       {BPCharEqualOperator /* bpchareq */ , PXFOP_IN,
+       {BpcharEqualOperator /* bpchareq */ , PXFOP_IN,
        true},
 };
 
diff --git a/fdw/pxf_header.c b/fdw/pxf_header.c
index b0175d25..1d018a84 100644
--- a/fdw/pxf_header.c
+++ b/fdw/pxf_header.c
@@ -20,13 +20,8 @@
 #include "pxf_filter.h"
 #include "pxf_header.h"
 
-#if PG_VERSION_NUM >= 90600
 #include "access/external.h"
 #include "access/url.h"
-#else
-#include "access/fileam.h"
-#include "catalog/pg_exttable.h"
-#endif
 #include "cdb/cdbvars.h"
 #include "commands/defrem.h"
 #include "catalog/pg_namespace.h"
diff --git a/fdw/pxf_option.c b/fdw/pxf_option.c
index a0d62055..13ccfe92 100644
--- a/fdw/pxf_option.c
+++ b/fdw/pxf_option.c
@@ -379,11 +379,7 @@ ValidateCopyOptions(List *options_list, Oid catalog)
        /*
         * Apply the core COPY code's validation logic for more checks.
         */
-#if PG_VERSION_NUM >= 90600
-       ProcessCopyOptions(NULL, NULL, true, copy_options);
-#else
-       ProcessCopyOptions(NULL, true, copy_options, 0, true);
-#endif
+       ProcessCopyOptions(NULL, NULL, true, copy_options, InvalidOid);
 
        PG_RETURN_VOID();
 }
@@ -491,7 +487,7 @@ PxfGetOptions(Oid foreigntableid)
        /*
         * The source/target encoding is the same for TEXT/CSV wire format
         */
-       opt->data_encoding = encoding;
+       opt->data_encoding = encoding ? pstrdup(encoding) : NULL;
        opt->database_encoding = GetDatabaseEncodingName();
 
        /* The profile corresponds to protocol[:format] */
@@ -506,11 +502,7 @@ PxfGetOptions(Oid foreigntableid)
                /* default wire_format is CSV */
                wireFormat = (Node *)makeString(FDW_OPTION_WIRE_FORMAT_CSV);
 
-#if PG_VERSION_NUM >= 90600
        copy_options = lappend(copy_options, makeDefElem(FDW_COPY_OPTION_FORMAT, wireFormat, -1));
-#else
-       copy_options = lappend(copy_options, makeDefElem(FDW_COPY_OPTION_FORMAT, wireFormat));
-#endif
 
        opt->copy_options = copy_options;
        opt->options = other_options;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to