This is an automated email from the ASF dual-hosted git repository.
djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry-pxf.git
The following commit(s) were added to refs/heads/main by this push:
new e991beb1 feat: Re-adapt cloudberry/pxf_fdw to Cloudberry (#31)
e991beb1 is described below
commit e991beb1b3d692aa742c2729e47773e3c778f14e
Author: liuxiaoyu <[email protected]>
AuthorDate: Wed Jan 28 11:05:08 2026 +0800
feat: Re-adapt cloudberry/pxf_fdw to Cloudberry (#31)
Co-authored-by: Nikolay Antonov <[email protected]>
---
.github/workflows/pxf-ci.yml | 2 +
Makefile | 48 ++++---
.../features/writable/HdfsWritableTextTest.java | 2 +
.../ubuntu/script/build_cloudberrry.sh | 2 +-
.../ubuntu/script/build_cloudberry_deb.sh | 2 +-
ci/docker/pxf-cbdb-dev/ubuntu/script/run_tests.sh | 32 ++++-
fdw/README.md | 8 +-
fdw/libchurl.c | 22 +--
fdw/pxf_bridge.c | 24 ----
fdw/pxf_bridge.h | 17 +--
fdw/pxf_fdw.c | 155 ++++++---------------
fdw/pxf_fdw.h | 5 -
fdw/pxf_filter.c | 4 +-
fdw/pxf_header.c | 5 -
fdw/pxf_option.c | 12 +-
15 files changed, 117 insertions(+), 223 deletions(-)
diff --git a/.github/workflows/pxf-ci.yml b/.github/workflows/pxf-ci.yml
index a3b55681..795345ad 100644
--- a/.github/workflows/pxf-ci.yml
+++ b/.github/workflows/pxf-ci.yml
@@ -103,6 +103,7 @@ jobs:
test_group:
- cli
- external-table
+ - fdw
- server
- sanity
- smoke
@@ -118,6 +119,7 @@ jobs:
- s3
- features
- gpdb
+ - gpdb_fdw
- load
- pxf_extension
steps:
diff --git a/Makefile b/Makefile
index 1014689f..489a3c8a 100644
--- a/Makefile
+++ b/Makefile
@@ -6,8 +6,8 @@ export PXF_MODULES
PXF_VERSION ?= $(shell cat version)
export PXF_VERSION
-FDW_SUPPORT = $(shell $(PG_CONFIG) --version | egrep "PostgreSQL 1[2-5]")
-FDW_SUPPORT =
+export SKIP_EXTERNAL_TABLE_BUILD_REASON
+export SKIP_FDW_BUILD_REASON
SOURCE_EXTENSION_DIR = external-table
TARGET_EXTENSION_DIR = gpextable
@@ -24,29 +24,31 @@ all: extensions cli server
extensions: external-table fdw
-external-table cli server:
+external-table:
+ifeq ($(SKIP_EXTERNAL_TABLE_BUILD_REASON),)
@echo "===> Compiling [$@] module <==="
make -C $@
+else
+ @echo "Skipping building external-table extension because
$(SKIP_EXTERNAL_TABLE_BUILD_REASON)"
+endif
-fdw:
-ifneq ($(FDW_SUPPORT),)
+ifeq ($(SKIP_FDW_BUILD_REASON),)
+ @echo "===> Compiling [$@] module <==="
make -C fdw
+else
+ @echo "Skipping building FDW extension because $(SKIP_FDW_BUILD_REASON)"
endif
-cli:
- make -C cli
-
-server:
- make -C server
+cli server:
+ @echo "===> Compiling [$@] module <==="
+ make -C $@
clean:
rm -rf build
make -C $(SOURCE_EXTENSION_DIR) clean-all
make -C cli clean
make -C server clean
-ifneq ($(FDW_SUPPORT),)
make -C fdw clean
-endif
test:
ifeq ($(SKIP_FDW_BUILD_REASON),)
@@ -61,18 +63,30 @@ it:
make -C automation TEST=$(TEST)
install:
- make -C $(SOURCE_EXTENSION_DIR) install
- make -C cli install
- make -C server install
-ifneq ($(FDW_SUPPORT),)
- make -C fdw install
+ifneq ($(SKIP_EXTERNAL_TABLE_BUILD_REASON),)
+ @echo "Skipping installing external-table extension because $(SKIP_EXTERNAL_TABLE_BUILD_REASON)"
+ $(eval PXF_MODULES := $(filter-out external-table,$(PXF_MODULES)))
+endif
+ifneq ($(SKIP_FDW_BUILD_REASON),)
+ @echo "Skipping installing FDW extension because $(SKIP_FDW_BUILD_REASON)"
+ $(eval PXF_MODULES := $(filter-out fdw,$(PXF_MODULES)))
endif
+ set -e ;\
+ for module in $${PXF_MODULES[@]}; do \
+ echo "===> Installing [$${module}] module <===" ;\
+ make -C $${module} install ;\
+ done ;\
+ echo "===> PXF installation is complete <==="
install-server:
make -C server install-server
stage:
rm -rf build/stage
+ifneq ($(SKIP_EXTERNAL_TABLE_PACKAGE_REASON),)
+ @echo "Skipping staging external-table extension because $(SKIP_EXTERNAL_TABLE_PACKAGE_REASON)"
+ $(eval PXF_MODULES := $(filter-out external-table,$(PXF_MODULES)))
+endif
ifneq ($(SKIP_FDW_PACKAGE_REASON),)
@echo "Skipping staging FDW extension because
$(SKIP_FDW_PACKAGE_REASON)"
$(eval PXF_MODULES := $(filter-out fdw,$(PXF_MODULES)))
diff --git
a/automation/src/test/java/org/greenplum/pxf/automation/features/writable/HdfsWritableTextTest.java
b/automation/src/test/java/org/greenplum/pxf/automation/features/writable/HdfsWritableTextTest.java
index fc21a472..22ad01f9 100755
---
a/automation/src/test/java/org/greenplum/pxf/automation/features/writable/HdfsWritableTextTest.java
+++
b/automation/src/test/java/org/greenplum/pxf/automation/features/writable/HdfsWritableTextTest.java
@@ -1,5 +1,6 @@
package org.greenplum.pxf.automation.features.writable;
+import annotations.FailsWithFDW;
import annotations.SkipForFDW;
import annotations.WorksWithFDW;
import org.apache.commons.lang.StringUtils;
@@ -75,6 +76,7 @@ public class HdfsWritableTextTest extends BaseWritableFeature
{
*
* @throws Exception if test fails to run
*/
+ @FailsWithFDW
@Test(groups = {"features", "gpdb", "security"})
public void textFormatInsertNoProfile() throws Exception {
diff --git a/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberrry.sh
b/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberrry.sh
index f0caad50..993a2100 100755
--- a/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberrry.sh
+++ b/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberrry.sh
@@ -100,7 +100,7 @@ cd ~/workspace/cloudberry
--enable-orafce \
--enable-orca \
--disable-pax \
- --enable-pxf \
+ --disable-pxf \
--enable-tap-tests \
--with-gssapi \
--with-ldap \
diff --git a/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberry_deb.sh
b/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberry_deb.sh
index eb3b8d87..d48720b7 100755
--- a/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberry_deb.sh
+++ b/ci/docker/pxf-cbdb-dev/ubuntu/script/build_cloudberry_deb.sh
@@ -28,7 +28,7 @@ cd "${CLOUDBERRY_SRC}"
--enable-orafce \
--enable-orca \
--disable-pax \
- --enable-pxf \
+ --disable-pxf \
--enable-tap-tests \
--with-gssapi \
--with-ldap \
diff --git a/ci/docker/pxf-cbdb-dev/ubuntu/script/run_tests.sh
b/ci/docker/pxf-cbdb-dev/ubuntu/script/run_tests.sh
index d57b69c5..4f5c6e35 100755
--- a/ci/docker/pxf-cbdb-dev/ubuntu/script/run_tests.sh
+++ b/ci/docker/pxf-cbdb-dev/ubuntu/script/run_tests.sh
@@ -549,6 +549,7 @@ feature_test(){
}
gpdb_test() {
+ local use_fdw="$1"
export PROTOCOL=HDFS
export PXF_HOME=${PXF_HOME:-/usr/local/pxf}
export PATH="${PXF_HOME}/bin:${PATH}"
@@ -570,10 +571,20 @@ gpdb_test() {
# Ensure PXF points to local HDFS/Hive/HBase configs
configure_pxf_default_hdfs_server
- echo "[run_tests] Starting GROUP=gpdb"
- make GROUP="gpdb" || true
- save_test_reports "gpdb"
- echo "[run_tests] GROUP=gpdb finished"
+ local extra_args=""
+ if [[ "$use_fdw" == "true" ]]; then
+ extra_args="USE_FDW=true"
+ else
+ extra_args="USE_FDW=false"
+ fi
+ echo "[run_tests] Starting GROUP=gpdb $extra_args"
+ make GROUP="gpdb" $extra_args || true
+ if [[ "$use_fdw" == "true" ]]; then
+ save_test_reports "gpdb_fdw"
+ else
+ save_test_reports "gpdb"
+ fi
+ echo "[run_tests] GROUP=gpdb $extra_args finished"
}
pxf_extension_test(){
@@ -709,7 +720,7 @@ generate_test_summary() {
local group=$(basename "$group_dir")
# Skip if it's not a test group directory
- [[ "$group" =~
^(smoke|hcatalog|hcfs|hdfs|hive|gpdb|sanity|hbase|profile|jdbc|proxy|unused|s3|features|load|performance|pxfExtensionVersion2|pxfExtensionVersion2_1|pxfFdwExtensionVersion1|pxfFdwExtensionVersion2)$
]] || continue
+ [[ "$group" =~
^(smoke|hcatalog|hcfs|hdfs|hive|gpdb|sanity|hbase|profile|jdbc|proxy|unused|s3|features|load|performance|pxfExtensionVersion2|pxfExtensionVersion2_1|pxfFdwExtensionVersion1|pxfFdwExtensionVersion2|fdw|gpdb_fdw)$
]] || continue
echo "Processing $group test reports from $group_dir"
@@ -841,6 +852,13 @@ run_single_group() {
cd "${REPO_ROOT}/external-table"
make installcheck
;;
+ fdw)
+ cd "${REPO_ROOT}/fdw"
+ make test
+ ;;
+ gpdb_fdw)
+ gpdb_test "true"
+ ;;
server)
cd "${REPO_ROOT}/server"
./gradlew test
@@ -873,7 +891,7 @@ run_single_group() {
feature_test
;;
gpdb)
- gpdb_test
+ gpdb_test "false"
;;
pxf_extension)
pxf_extension_test
@@ -898,7 +916,7 @@ run_single_group() {
;;
*)
echo "Unknown test group: $group"
- echo "Available groups: cli, external-table, server, sanity, smoke,
hdfs, hcatalog, hcfs, hive, hbase, profile, jdbc, proxy, unused, s3, features,
gpdb, load, performance, bench, pxf_extension"
+ echo "Available groups: cli, external-table, fdw, server, sanity, smoke,
hdfs, hcatalog, hcfs, hive, hbase, profile, jdbc, proxy, unused, s3, features,
gpdb, gpdb_fdw, load, performance, bench, pxf_extension"
exit 1
;;
esac
diff --git a/fdw/README.md b/fdw/README.md
index a10f15c3..a0e546c5 100644
--- a/fdw/README.md
+++ b/fdw/README.md
@@ -1,6 +1,6 @@
-# PXF Foreign Data Wrapper for Greenplum and PostgreSQL
+# PXF Foreign Data Wrapper for Cloudberry
-This Greenplum extension implements a Foreign Data Wrapper (FDW) for PXF.
+This Cloudberry extension implements a Foreign Data Wrapper (FDW) for PXF.
PXF is a query federation engine that accesses data residing in external
systems
such as Hadoop, Hive, HBase, relational databases, S3, Google Cloud Storage,
@@ -10,9 +10,9 @@ among other external systems.
## Compile
-To compile the PXF foreign data wrapper, we need a Greenplum 6+ installation
and libcurl.
+To compile the PXF foreign data wrapper, we need a Cloudberry installation and
libcurl.
- export PATH=/usr/local/greenplum-db/bin/:$PATH
+ export PATH=/usr/local/cloudberry-db/bin/:$PATH
make
diff --git a/fdw/libchurl.c b/fdw/libchurl.c
index 6b2fa93a..3003d4fe 100644
--- a/fdw/libchurl.c
+++ b/fdw/libchurl.c
@@ -16,19 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-#if PG_VERSION_NUM >= 90600
#include "postgres.h"
-#endif
-
#include "libchurl.h"
-#if PG_VERSION_NUM >= 90600
#include "lib/stringinfo.h"
-#endif
#include "miscadmin.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
-#include "utils/jsonapi.h"
+#include "common/jsonapi.h"
+#include "mb/pg_wchar.h"
/* include libcurl without typecheck.
* This allows wrapping curl_easy_setopt to be wrapped
@@ -98,13 +94,6 @@ typedef struct churl_settings
struct curl_slist *headers;
} churl_settings;
-/* the null action object used for pure validation */
-static JsonSemAction nullSemAction =
-{
- NULL, NULL, NULL, NULL, NULL,
- NULL, NULL, NULL, NULL, NULL
-};
-
churl_context *churl_new_context(void);
static void create_curl_handle(churl_context *context);
static void set_curl_option(churl_context *context, CURLoption
option, const void *data);
@@ -864,13 +853,8 @@ fill_internal_buffer(churl_context *context, int want)
if (errno == EINTR || errno == EAGAIN)
continue;
-#if PG_VERSION_NUM >= 90600
elog(ERROR, "internal error: select failed on
curl_multi_fdset (maxfd %d) (%d - %m)",
maxfd, save_errno);
-#else
- elog(ERROR, "internal error: select failed on
curl_multi_fdset (maxfd %d) (%d - %s)",
- maxfd, save_errno, strerror(errno));
-#endif
}
multi_perform(context);
}
@@ -1056,7 +1040,7 @@ IsValidJson(text *json)
PG_TRY();
{
/* validate it */
- lex = makeJsonLexContext(json, false);
+ lex = makeJsonLexContextCstringLen(text_to_cstring(json),
VARSIZE_ANY_EXHDR(json), GetDatabaseEncoding(), false);
pg_parse_json(lex, &nullSemAction);
}
PG_CATCH();
diff --git a/fdw/pxf_bridge.c b/fdw/pxf_bridge.c
index b56a2510..567665ac 100644
--- a/fdw/pxf_bridge.c
+++ b/fdw/pxf_bridge.c
@@ -27,11 +27,7 @@
/* helper function declarations */
static void BuildUriForRead(PxfFdwScanState *pxfsstate);
static void BuildUriForWrite(PxfFdwModifyState *pxfmstate);
-#if PG_VERSION_NUM >= 90600
static size_t FillBuffer(PxfFdwScanState *pxfsstate, char *start, int minlen,
int maxlen);
-#else
-static size_t FillBuffer(PxfFdwScanState *pxfsstate, char *start, size_t size);
-#endif
/*
* Clean up churl related data structures from the PXF FDW modify state.
@@ -102,20 +98,12 @@ PxfBridgeExportStart(PxfFdwModifyState *pxfmstate)
* Reads data from the PXF server into the given buffer of a given size
*/
int
-#if PG_VERSION_NUM >= 90600
PxfBridgeRead(void *outbuf, int minlen, int maxlen, void *extra)
-#else
-PxfBridgeRead(void *outbuf, int datasize, void *extra)
-#endif
{
size_t n = 0;
PxfFdwScanState *pxfsstate = (PxfFdwScanState *) extra;
-#if PG_VERSION_NUM >= 90600
n = FillBuffer(pxfsstate, outbuf, minlen, maxlen);
-#else
- n = FillBuffer(pxfsstate, outbuf, datasize);
-#endif
if (n == 0)
{
@@ -176,28 +164,16 @@ BuildUriForWrite(PxfFdwModifyState *pxfmstate)
* Read data from churl until the buffer is full or there is no more data to
be read
*/
static size_t
-#if PG_VERSION_NUM >= 90600
FillBuffer(PxfFdwScanState *pxfsstate, char *start, int minlen, int maxlen)
-#else
-FillBuffer(PxfFdwScanState *pxfsstate, char *start, size_t size)
-#endif
{
size_t n = 0;
char *ptr = start;
-#if PG_VERSION_NUM >= 90600
char *minend = ptr + minlen;
char *maxend = ptr + maxlen;
while (ptr < minend)
{
n = churl_read(pxfsstate->churl_handle, ptr, maxend - ptr);
-#else
- char *end = ptr + size;
-
- while (ptr < end)
- {
- n = churl_read(pxfsstate->churl_handle, ptr, end - ptr);
-#endif
if (n == 0)
break;
diff --git a/fdw/pxf_bridge.h b/fdw/pxf_bridge.h
index b8a866a4..e5e53610 100644
--- a/fdw/pxf_bridge.h
+++ b/fdw/pxf_bridge.h
@@ -44,14 +44,10 @@ typedef struct PxfFdwScanState
StringInfoData uri;
Relation relation;
char *filter_str;
-#if PG_VERSION_NUM >= 90600
ExprState *quals;
-#else
- List *quals;
-#endif
List *retrieved_attrs;
PxfOptions *options;
- CopyState cstate;
+ CopyFromState cstate;
ProjectionInfo *projectionInfo;
} PxfFdwScanState;
@@ -60,18 +56,13 @@ typedef struct PxfFdwScanState
*/
typedef struct PxfFdwModifyState
{
- CopyState cstate; /* state of writing to PXF */
+ CopyToState cstate; /* state of writing to PXF */
CHURL_HANDLE churl_handle; /* curl handle */
CHURL_HEADERS churl_headers; /* curl headers */
StringInfoData uri; /* rest endpoint URI for modify
*/
Relation relation;
PxfOptions *options; /* FDW options */
-
-#if PG_VERSION_NUM < 90600
- Datum *values; /* List of values exported for
the row */
- bool *nulls; /* List of null fields for the
exported row */
-#endif
} PxfFdwModifyState;
/* Clean up churl related data structures from the context */
@@ -84,11 +75,7 @@ void PxfBridgeImportStart(PxfFdwScanState
*pxfsstate);
void PxfBridgeExportStart(PxfFdwModifyState *pxfmstate);
/* Reads data from the PXF server into the given buffer of a given size */
-#if PG_VERSION_NUM >= 90600
int PxfBridgeRead(void *outbuf, int minlen, int maxlen,
void *extra);
-#else
-int PxfBridgeRead(void *outbuf, int datasize, void *extra);
-#endif
/* Writes data from the given buffer of a given size to the PXF server */
int PxfBridgeWrite(PxfFdwModifyState *context, char
*databuf, int datalen);
diff --git a/fdw/pxf_fdw.c b/fdw/pxf_fdw.c
index a4a1cc10..645a1abc 100644
--- a/fdw/pxf_fdw.c
+++ b/fdw/pxf_fdw.c
@@ -13,27 +13,22 @@
#include "pxf_filter.h"
#include "access/reloptions.h"
-#if PG_VERSION_NUM >= 90600
#include "access/table.h"
-#endif
#include "cdb/cdbsreh.h"
#include "cdb/cdbvars.h"
#include "commands/copy.h"
+#include "commands/copyfrom_internal.h"
+#include "commands/copyto_internal.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
#include "nodes/pg_list.h"
-#if PG_VERSION_NUM >= 90600
#include "optimizer/optimizer.h"
-#endif
#include "optimizer/paths.h"
#include "optimizer/pathnode.h"
#include "optimizer/planmain.h"
#include "optimizer/restrictinfo.h"
-#if PG_VERSION_NUM < 90600
-#include "optimizer/var.h"
-#endif
#include "parser/parsetree.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -61,15 +56,7 @@ PG_FUNCTION_INFO_V1(pxf_fdw_handler);
static void pxfGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid
foreigntableid);
static void pxfGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid
foreigntableid);
-
-#if (PG_VERSION_NUM <= 90500)
-
-static ForeignScan *pxfGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel,
Oid foreigntableid, ForeignPath *best_path, List *tlist,
- List
*scan_clauses);
-
-#else
static ForeignScan *pxfGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel,
Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses,
Plan *outer_plan);
-#endif
static void pxfExplainForeignScan(ForeignScanState *node, ExplainState *es);
@@ -101,7 +88,8 @@ static PxfFdwModifyState *InitForeignModify(Relation
relation);
static void FinishForeignModify(PxfFdwModifyState *pxfmstate);
static void InitCopyState(PxfFdwScanState *pxfsstate);
static void InitCopyStateForModify(PxfFdwModifyState *pxfmstate);
-static CopyState BeginCopyTo(Relation forrel, List *options);
+static CopyToState BeginCopyToModify(Relation forrel, List *options);
+static void EndCopyModify(CopyToState cstate);
static void PxfBeginScanErrorCallback(void *arg);
static void PxfCopyFromErrorCallback(void *arg);
@@ -146,9 +134,7 @@ pxf_fdw_handler(PG_FUNCTION_ARGS)
* taken
*/
fdw_routine->PlanForeignModify = NULL;
-#if PG_VERSION_NUM >= 120000
fdw_routine->BeginForeignInsert = pxfBeginForeignInsert;
-#endif
fdw_routine->BeginForeignModify = pxfBeginForeignModify;
fdw_routine->ExecForeignInsert = pxfExecForeignInsert;
@@ -158,9 +144,7 @@ pxf_fdw_handler(PG_FUNCTION_ARGS)
*/
fdw_routine->ExecForeignUpdate = NULL;
fdw_routine->ExecForeignDelete = NULL;
-#if PG_VERSION_NUM >= 120000
fdw_routine->EndForeignInsert = pxfEndForeignInsert;
-#endif
fdw_routine->EndForeignModify = pxfEndForeignModify;
fdw_routine->IsForeignRelUpdatable = pxfIsForeignRelUpdatable;
@@ -224,11 +208,7 @@ pxfGetForeignRelSize(PlannerInfo *root, RelOptInfo
*baserel, Oid foreigntableid)
*/
RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
-#if PG_VERSION_NUM >= 90600
rel = table_open(rte->relid, NoLock);
-#else
- rel = heap_open(rte->relid, NoLock);
-#endif
/*
* Identify which baserestrictinfo clauses can be sent to the remote
@@ -241,11 +221,7 @@ pxfGetForeignRelSize(PlannerInfo *root, RelOptInfo
*baserel, Oid foreigntableid)
* Identify which attributes will need to be retrieved from the remote
* server
*/
-#if (PG_VERSION_NUM <= 90500)
- pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
&fpinfo->attrs_used);
-#else
pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
&fpinfo->attrs_used);
-#endif
foreach(lc, fpinfo->local_conds)
{
RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
@@ -255,7 +231,7 @@ pxfGetForeignRelSize(PlannerInfo *root, RelOptInfo
*baserel, Oid foreigntableid)
deparseTargetList(rel, fpinfo->attrs_used, &fpinfo->retrieved_attrs);
- heap_close(rel, NoLock);
+ table_close(rel, NoLock);
/* Use an artificial number of estimated rows */
baserel->rows = 1000;
@@ -279,19 +255,14 @@ pxfGetForeignPaths(PlannerInfo *root,
elog(DEBUG5, "pxf_fdw: pxfGetForeignPaths starts on segment: %d",
PXF_SEGMENT_ID);
-
path = create_foreignscan_path(root, baserel,
-#if PG_VERSION_NUM >= 90600
NULL,
/* default pathtarget */
-#endif
baserel->rows,
DEFAULT_PXF_FDW_STARTUP_COST,
total_cost,
NIL, /* no
pathkeys */
NULL,
/* no outer rel either */
-#if PG_VERSION_NUM >= 90500
NULL,
/* no extra plan */
-#endif
fpinfo->retrieved_attrs);
@@ -299,7 +270,7 @@ pxfGetForeignPaths(PlannerInfo *root,
/*
* Create a ForeignPath node and add it as only possible path.
*/
- add_path(baserel, (Path *) path);
+ add_path(baserel, (Path *) path, root);
elog(DEBUG5, "pxf_fdw: pxfGetForeignPaths ends on segment: %d",
PXF_SEGMENT_ID);
}
@@ -308,7 +279,6 @@ pxfGetForeignPaths(PlannerInfo *root,
* GetForeignPlan
* create a ForeignScan plan node
*/
-#if PG_VERSION_NUM >= 90500
static ForeignScan *
pxfGetForeignPlan(PlannerInfo *root,
RelOptInfo *baserel,
@@ -317,15 +287,6 @@ pxfGetForeignPlan(PlannerInfo *root,
List *tlist,
List *scan_clauses,
Plan *outer_plan)
-#else
-static ForeignScan *
-pxfGetForeignPlan(PlannerInfo *root,
- RelOptInfo *baserel,
- Oid foreigntableid,
- ForeignPath *best_path,
- List *tlist, /* target list */
- List *scan_clauses)
-#endif
{
char *where_clauses_str = NULL;
List *fdw_private;
@@ -363,12 +324,10 @@ pxfGetForeignPlan(PlannerInfo *root,
scan_clauses,
scan_relid,
NIL, /* no
expressions to evaluate */
- fdw_private
-#if PG_VERSION_NUM >= 90500
- ,NIL
- ,NIL
- ,outer_plan
-#endif
+ fdw_private,
+ NIL,
+ NIL,
+ outer_plan
);
}
@@ -403,11 +362,7 @@ pxfBeginForeignScan(ForeignScanState *node, int eflags)
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
return;
-#if PG_VERSION_NUM >= 90600
ExprState *quals = node->ss.ps.qual;
-#else
- List *quals = node->ss.ps.qual;
-#endif
Oid foreigntableid =
RelationGetRelid(node->ss.ss_currentRelation);
PxfFdwScanState *pxfsstate = NULL;
Relation relation = node->ss.ss_currentRelation;
@@ -493,15 +448,8 @@ pxfIterateForeignScan(ForeignScanState *node)
found = NextCopyFrom(pxfsstate->cstate,
NULL,
-#if PG_VERSION_NUM >= 90600
slot->tts_values,
- slot->tts_isnull
-#else
- slot_get_values(slot),
- slot_get_isnull(slot),
- NULL
-#endif
- );
+ slot->tts_isnull);
if (found)
{
@@ -644,10 +592,6 @@ InitForeignModify(Relation relation)
initStringInfo(&pxfmstate->uri);
pxfmstate->relation = relation;
pxfmstate->options = options;
-#if PG_VERSION_NUM < 90600
- pxfmstate->values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
- pxfmstate->nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
-#endif
InitCopyStateForModify(pxfmstate);
@@ -678,22 +622,11 @@ pxfExecForeignInsert(EState *estate,
resultRelInfo->ri_FdwState = pxfmstate;
}
- CopyState cstate = pxfmstate->cstate;
-#if PG_VERSION_NUM < 90600
- Relation relation = resultRelInfo->ri_RelationDesc;
- TupleDesc tupDesc = RelationGetDescr(relation);
- HeapTuple tuple = ExecMaterializeSlot(slot);
- Datum *values = pxfmstate->values;
- bool *nulls = pxfmstate->nulls;
-
- heap_deform_tuple(tuple, tupDesc, values, nulls);
- CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
-#else
+ CopyToState cstate = pxfmstate->cstate;
/* TEXT or CSV */
slot_getallattrs(slot);
CopyOneRowTo(cstate, slot);
-#endif
CopySendEndOfRow(cstate);
StringInfo fe_msgbuf = cstate->fe_msgbuf;
@@ -754,7 +687,7 @@ FinishForeignModify(PxfFdwModifyState *pxfmstate)
if (pxfmstate == NULL)
return;
- EndCopyFrom(pxfmstate->cstate);
+ EndCopyModify(pxfmstate->cstate);
pxfmstate->cstate = NULL;
PxfBridgeCleanup(pxfmstate);
@@ -781,7 +714,7 @@ pxfIsForeignRelUpdatable(Relation rel)
static void
InitCopyState(PxfFdwScanState *pxfsstate)
{
- CopyState cstate;
+ CopyFromState cstate;
PxfBridgeImportStart(pxfsstate);
@@ -790,19 +723,15 @@ InitCopyState(PxfFdwScanState *pxfsstate)
* as to match the expected ScanTupleSlot signature.
*/
cstate = BeginCopyFrom(
-#if PG_VERSION_NUM >= 90600
NULL,
-#endif
pxfsstate->relation,
NULL,
+ NULL,
false, /* is_program */
&PxfBridgeRead, /*
data_source_cb */
pxfsstate, /*
data_source_cb_extra */
NIL, /* attnamelist */
pxfsstate->options->copy_options /* copy options */
-#if PG_VERSION_NUM < 90600
- ,NIL /* ao_segnos */
-#endif
);
@@ -825,11 +754,7 @@ InitCopyState(PxfFdwScanState *pxfsstate)
pxfsstate->options->is_reject_limit_rows,
pxfsstate->options->resource,
(char
*) cstate->cur_relname,
-#if PG_VERSION_NUM >= 90600
pxfsstate->options->log_errors ? LOG_ERRORS_ENABLE : LOG_ERRORS_DISABLE);
-#else
-
pxfsstate->options->log_errors);
-#endif
cstate->cdbsreh->relid = RelationGetRelid(pxfsstate->relation);
}
@@ -859,7 +784,7 @@ static void
InitCopyStateForModify(PxfFdwModifyState *pxfmstate)
{
List *copy_options;
- CopyState cstate;
+ CopyToState cstate;
copy_options = pxfmstate->options->copy_options;
@@ -868,16 +793,12 @@ InitCopyStateForModify(PxfFdwModifyState *pxfmstate)
/*
* Create CopyState from FDW options. We always acquire all columns to
match the expected ScanTupleSlot signature.
*/
- cstate = BeginCopyTo(pxfmstate->relation, copy_options);
+ cstate = BeginCopyToModify(pxfmstate->relation, copy_options);
/* Initialize 'out_functions', like CopyTo() would. */
TupleDesc tupDesc = RelationGetDescr(pxfmstate->relation);
-#if PG_VERSION_NUM >= 90600
Form_pg_attribute attr = tupDesc->attrs;
-#else
- Form_pg_attribute *attr = tupDesc->attrs;
-#endif
int num_phys_attrs = tupDesc->natts;
cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs *
sizeof(FmgrInfo));
@@ -889,11 +810,7 @@ InitCopyStateForModify(PxfFdwModifyState *pxfmstate)
Oid out_func_oid;
bool isvarlena;
-#if PG_VERSION_NUM >= 90600
getTypeOutputInfo(attr[attnum - 1].atttypid,
-#else
- getTypeOutputInfo(attr[attnum - 1]->atttypid,
-#endif
&out_func_oid,
&isvarlena);
fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
@@ -920,14 +837,14 @@ InitCopyStateForModify(PxfFdwModifyState *pxfmstate)
/*
* Set up CopyState for writing to a foreign table.
*/
-static CopyState
-BeginCopyTo(Relation forrel, List *options)
+static CopyToState
+BeginCopyToModify(Relation forrel, List *options)
{
- CopyState cstate;
+ CopyToState cstate;
Assert(forrel->rd_rel->relkind == RELKIND_FOREIGN_TABLE);
- cstate = BeginCopyToForeignTable(forrel, options);
+ cstate = BeginCopy(NULL, forrel, NULL, forrel->rd_id, NIL, options,
NULL);
cstate->dispatch_mode = COPY_DIRECT;
/*
@@ -940,16 +857,29 @@ BeginCopyTo(Relation forrel, List *options)
* Some more initialization, that in the normal COPY TO codepath, is
done
* in CopyTo() itself.
*/
- cstate->null_print_client = cstate->null_print; /* default */
+ cstate->opts.null_print_client = cstate->opts.null_print; /* default */
if (cstate->need_transcoding)
- cstate->null_print_client =
pg_server_to_custom(cstate->null_print,
-
cstate->null_print_len,
-
cstate->file_encoding,
-
cstate->enc_conversion_proc);
+ cstate->opts.null_print_client =
pg_server_to_any(cstate->opts.null_print,
+
cstate->opts.null_print_len,
+
cstate->opts.file_encoding);
return cstate;
}
+/*
+ * Clean up storage and release resources for COPY TO.
+ */
+static void
+EndCopyModify(CopyToState cstate)
+{
+ /* No COPY TO related resources except memory. */
+ Assert(!cstate->is_program);
+ Assert(cstate->filename == NULL);
+
+ MemoryContextDelete(cstate->copycontext);
+ pfree(cstate);
+}
+
/*
* PXF specific error context callback for "begin foreign scan" operation.
* It replaces the "COPY" term in the error message context with
@@ -985,13 +915,13 @@ void
PxfCopyFromErrorCallback(void *arg)
{
PxfFdwScanState *pxfsstate = (PxfFdwScanState *) arg;
- CopyState cstate = pxfsstate->cstate;
+ CopyFromState cstate = pxfsstate->cstate;
char curlineno_str[32];
snprintf(curlineno_str, sizeof(curlineno_str), UINT64_FORMAT,
cstate->cur_lineno);
- if (cstate->binary)
+ if (cstate->opts.binary)
{
/* can't usefully display the data */
if (cstate->cur_attname)
@@ -1051,8 +981,7 @@ PxfCopyFromErrorCallback(void *arg)
* it, and at present there's no way to regurgitate it without
* conversion. So we have to punt and just report the line number.
*/
- else if (cstate->line_buf_valid &&
- (cstate->line_buf_converted || !cstate->need_transcoding))
+ else if (cstate->line_buf_valid && !cstate->need_transcoding)
{
char *lineval;
diff --git a/fdw/pxf_fdw.h b/fdw/pxf_fdw.h
index c7067cc9..baae7a24 100644
--- a/fdw/pxf_fdw.h
+++ b/fdw/pxf_fdw.h
@@ -10,13 +10,8 @@
#include "access/formatter.h"
#include "commands/copy.h"
-#if PG_VERSION_NUM >= 90600
#include "nodes/pathnodes.h"
-#endif
#include "nodes/pg_list.h"
-#if PG_VERSION_NUM < 90600
-#include "nodes/relation.h"
-#endif
#include "utils/rel.h"
#ifndef PXF_FDW_H
diff --git a/fdw/pxf_filter.c b/fdw/pxf_filter.c
index 79b90b9b..b57575cf 100644
--- a/fdw/pxf_filter.c
+++ b/fdw/pxf_filter.c
@@ -178,7 +178,7 @@ dbop_pxfop_map pxf_supported_opr_op_expr[] =
{85 /* boolne */ , PXFOP_NE},
/* bpchar */
- {BPCharEqualOperator /* bpchareq */ , PXFOP_EQ},
+ {BpcharEqualOperator /* bpchareq */ , PXFOP_EQ},
{1058 /* bpcharlt */ , PXFOP_LT},
{1060 /* bpchargt */ , PXFOP_GT},
{1059 /* bpcharle */ , PXFOP_LE},
@@ -247,7 +247,7 @@ dbop_pxfop_array_map
pxf_supported_opr_scalar_array_op_expr[] =
{1120 /* float48eq */ , PXFOP_IN, true},
/* bpchar */
- {BPCharEqualOperator /* bpchareq */ , PXFOP_IN,
+ {BpcharEqualOperator /* bpchareq */ , PXFOP_IN,
true},
};
diff --git a/fdw/pxf_header.c b/fdw/pxf_header.c
index b0175d25..1d018a84 100644
--- a/fdw/pxf_header.c
+++ b/fdw/pxf_header.c
@@ -20,13 +20,8 @@
#include "pxf_filter.h"
#include "pxf_header.h"
-#if PG_VERSION_NUM >= 90600
#include "access/external.h"
#include "access/url.h"
-#else
-#include "access/fileam.h"
-#include "catalog/pg_exttable.h"
-#endif
#include "cdb/cdbvars.h"
#include "commands/defrem.h"
#include "catalog/pg_namespace.h"
diff --git a/fdw/pxf_option.c b/fdw/pxf_option.c
index a0d62055..13ccfe92 100644
--- a/fdw/pxf_option.c
+++ b/fdw/pxf_option.c
@@ -379,11 +379,7 @@ ValidateCopyOptions(List *options_list, Oid catalog)
/*
* Apply the core COPY code's validation logic for more checks.
*/
-#if PG_VERSION_NUM >= 90600
- ProcessCopyOptions(NULL, NULL, true, copy_options);
-#else
- ProcessCopyOptions(NULL, true, copy_options, 0, true);
-#endif
+ ProcessCopyOptions(NULL, NULL, true, copy_options, InvalidOid);
PG_RETURN_VOID();
}
@@ -491,7 +487,7 @@ PxfGetOptions(Oid foreigntableid)
/*
* The source/target encoding is the same for TEXT/CSV wire format
*/
- opt->data_encoding = encoding;
+ opt->data_encoding = encoding ? pstrdup(encoding) : NULL;
opt->database_encoding = GetDatabaseEncodingName();
/* The profile corresponds to protocol[:format] */
@@ -506,11 +502,7 @@ PxfGetOptions(Oid foreigntableid)
/* default wire_format is CSV */
wireFormat = (Node *)makeString(FDW_OPTION_WIRE_FORMAT_CSV);
-#if PG_VERSION_NUM >= 90600
copy_options = lappend(copy_options,
makeDefElem(FDW_COPY_OPTION_FORMAT, wireFormat, -1));
-#else
- copy_options = lappend(copy_options,
makeDefElem(FDW_COPY_OPTION_FORMAT, wireFormat));
-#endif
opt->copy_options = copy_options;
opt->options = other_options;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]