This is an automated email from the ASF dual-hosted git repository.
djwang pushed a commit to branch sync-with-upstream
in repository https://gitbox.apache.org/repos/asf/cloudberry-gpbackup.git
The following commit(s) were added to refs/heads/sync-with-upstream by this push:
new 59f10ac6 Merge gpbackup-s3-plugin-archive into plugins/s3plugin directory
59f10ac6 is described below
commit 59f10ac645528e9c94d963bf3eaca466a30e9ad2
Author: Robert Mu <[email protected]>
AuthorDate: Sun Sep 28 20:58:48 2025 +0800
Merge gpbackup-s3-plugin-archive into plugins/s3plugin directory
* Integrate S3 storage plugin functionality into main repository
* Add gpbackup_s3_plugin executable target to Makefile
* Update build system to include S3 plugin compilation and
installation
* Include comprehensive S3 plugin configuration and usage
documentation
* Add unit tests for S3 plugin functionality
The S3 plugin allows users to back up and restore Cloudberry and
Greenplum databases to Amazon S3-compatible storage systems. This
integration eliminates the need for a separate plugin repository
and simplifies deployment and maintenance.
Previously located at:
https://github.com/greenplum-db/gpbackup-s3-plugin-archive
---
Makefile | 16 +-
go.mod | 7 +
go.sum | 40 +++-
gpbackup_s3_plugin.go | 140 ++++++++++++
plugins/README.md | 2 +-
plugins/s3plugin/README.md | 74 +++++++
plugins/s3plugin/backup.go | 222 +++++++++++++++++++
plugins/s3plugin/restore.go | 357 ++++++++++++++++++++++++++++++
plugins/s3plugin/s3plugin.go | 455 ++++++++++++++++++++++++++++++++++++++
plugins/s3plugin/s3plugin_test.go | 243 ++++++++++++++++++++
10 files changed, 1548 insertions(+), 8 deletions(-)
diff --git a/Makefile b/Makefile
index 50731331..431b7ab6 100644
--- a/Makefile
+++ b/Makefile
@@ -8,15 +8,17 @@ endif
BACKUP=gpbackup
RESTORE=gprestore
HELPER=gpbackup_helper
+S3PLUGIN=gpbackup_s3_plugin
BIN_DIR=$(shell echo $${GOPATH:-~/go} | awk -F':' '{ print $$1 "/bin"}')
GINKGO_FLAGS := -r --keep-going --randomize-suites --randomize-all --no-color
GIT_VERSION := $(shell git describe --tags | perl -pe
's/(.*)-([0-9]*)-(g[0-9a-f]*)/\1+dev.\2.\3/')
BACKUP_VERSION_STR=github.com/apache/cloudberry-backup/backup.version=$(GIT_VERSION)
RESTORE_VERSION_STR=github.com/apache/cloudberry-backup/restore.version=$(GIT_VERSION)
HELPER_VERSION_STR=github.com/apache/cloudberry-backup/helper.version=$(GIT_VERSION)
+S3PLUGIN_VERSION_STR=github.com/apache/cloudberry-backup/plugins/s3plugin.version=$(GIT_VERSION)
# note that /testutils is not a production directory, but has unit tests to
validate testing tools
-SUBDIRS_HAS_UNIT=backup/ filepath/ history/ helper/ options/ report/ restore/
toc/ utils/ testutils/
+SUBDIRS_HAS_UNIT=backup/ filepath/ history/ helper/ options/ report/ restore/
toc/ utils/ testutils/ plugins/s3plugin/
SUBDIRS_ALL=$(SUBDIRS_HAS_UNIT) integration/ end_to_end/
GOLANG_LINTER=$(GOPATH)/bin/golangci-lint
GINKGO=$(GOPATH)/bin/ginkgo
@@ -26,6 +28,7 @@ DEBUG=-gcflags=all="-N -l"
CUSTOM_BACKUP_DIR ?= "/tmp"
helper_path ?= $(BIN_DIR)/$(HELPER)
+s3plugin_path ?= $(BIN_DIR)/$(S3PLUGIN)
# Prefer gpsync as the newer utility, fall back to gpscp if not present (older
installs)
ifeq (, $(shell which gpsync))
@@ -82,26 +85,29 @@ build : $(GOSQLITE)
CGO_ENABLED=1 $(GO_BUILD) -tags '$(BACKUP)' -o
$(BIN_DIR)/$(BACKUP) --ldflags '-X $(BACKUP_VERSION_STR)'
CGO_ENABLED=1 $(GO_BUILD) -tags '$(RESTORE)' -o
$(BIN_DIR)/$(RESTORE) --ldflags '-X $(RESTORE_VERSION_STR)'
CGO_ENABLED=1 $(GO_BUILD) -tags '$(HELPER)' -o
$(BIN_DIR)/$(HELPER) --ldflags '-X $(HELPER_VERSION_STR)'
+ CGO_ENABLED=1 $(GO_BUILD) -tags '$(S3PLUGIN)' -o
$(BIN_DIR)/$(S3PLUGIN) --ldflags '-X $(S3PLUGIN_VERSION_STR)'
debug :
CGO_ENABLED=1 $(GO_BUILD) -tags '$(BACKUP)' -o
$(BIN_DIR)/$(BACKUP) -ldflags "-X $(BACKUP_VERSION_STR)" $(DEBUG)
CGO_ENABLED=1 $(GO_BUILD) -tags '$(RESTORE)' -o
$(BIN_DIR)/$(RESTORE) -ldflags "-X $(RESTORE_VERSION_STR)" $(DEBUG)
CGO_ENABLED=1 $(GO_BUILD) -tags '$(HELPER)' -o
$(BIN_DIR)/$(HELPER) -ldflags "-X $(HELPER_VERSION_STR)" $(DEBUG)
+ CGO_ENABLED=1 $(GO_BUILD) -tags '$(S3PLUGIN)' -o
$(BIN_DIR)/$(S3PLUGIN) -ldflags "-X $(S3PLUGIN_VERSION_STR)" $(DEBUG)
build_linux :
env GOOS=linux GOARCH=amd64 $(GO_BUILD) -tags '$(BACKUP)' -o
$(BACKUP) -ldflags "-X $(BACKUP_VERSION_STR)"
env GOOS=linux GOARCH=amd64 $(GO_BUILD) -tags '$(RESTORE)' -o
$(RESTORE) -ldflags "-X $(RESTORE_VERSION_STR)"
env GOOS=linux GOARCH=amd64 $(GO_BUILD) -tags '$(HELPER)' -o
$(HELPER) -ldflags "-X $(HELPER_VERSION_STR)"
+ env GOOS=linux GOARCH=amd64 $(GO_BUILD) -tags '$(S3PLUGIN)' -o
$(S3PLUGIN) -ldflags "-X $(S3PLUGIN_VERSION_STR)"
install :
cp $(BIN_DIR)/$(BACKUP) $(BIN_DIR)/$(RESTORE) $(GPHOME)/bin
@psql -X -t -d template1 -c 'select distinct hostname from
gp_segment_configuration where content != -1' > /tmp/seg_hosts 2>/dev/null; \
if [ $$? -eq 0 ]; then \
- $(COPYUTIL) -f /tmp/seg_hosts $(helper_path)
=:$(GPHOME)/bin/$(HELPER); \
+ $(COPYUTIL) -f /tmp/seg_hosts $(helper_path)
$(s3plugin_path) =:$(GPHOME)/bin/; \
if [ $$? -eq 0 ]; then \
- echo 'Successfully copied gpbackup_helper to
$(GPHOME) on all segments'; \
+ echo 'Successfully copied gpbackup_helper and
gpbackup_s3_plugin to $(GPHOME) on all segments'; \
else \
- echo 'Failed to copy gpbackup_helper to
$(GPHOME)'; \
+ echo 'Failed to copy gpbackup_helper and
gpbackup_s3_plugin to $(GPHOME)'; \
exit 1; \
fi; \
else \
@@ -112,7 +118,7 @@ install :
clean :
# Build artifacts
- rm -f $(BIN_DIR)/$(BACKUP) $(BACKUP) $(BIN_DIR)/$(RESTORE)
$(RESTORE) $(BIN_DIR)/$(HELPER) $(HELPER)
+ rm -f $(BIN_DIR)/$(BACKUP) $(BACKUP) $(BIN_DIR)/$(RESTORE)
$(RESTORE) $(BIN_DIR)/$(HELPER) $(HELPER) $(BIN_DIR)/$(S3PLUGIN) $(S3PLUGIN)
# Test artifacts
rm -rf /tmp/go-build* /tmp/gexec_artifacts* /tmp/ginkgo*
docker stop s3-minio # stop minio before removing its data
directories
diff --git a/go.mod b/go.mod
index 405dfb04..3adaf23e 100644
--- a/go.mod
+++ b/go.mod
@@ -5,20 +5,24 @@ go 1.21
require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/apache/cloudberry-go-libs
v1.0.12-0.20250910014224-fc376e8a1056
+ github.com/aws/aws-sdk-go v1.44.257
github.com/blang/semver v3.5.1+incompatible
github.com/blang/vfs v1.0.0
+ github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf
github.com/jackc/pgconn v1.14.3
github.com/jmoiron/sqlx v1.3.5
github.com/klauspost/compress v1.15.15
github.com/lib/pq v1.10.7
github.com/mattn/go-sqlite3 v1.14.19
github.com/nightlyone/lockfile v1.0.0
+ github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo/v2 v2.13.0
github.com/onsi/gomega v1.27.10
github.com/pkg/errors v0.9.1
github.com/sergi/go-diff v1.3.1
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
+ github.com/urfave/cli v1.22.13
golang.org/x/sys v0.18.0
golang.org/x/tools v0.12.0
gopkg.in/cheggaaa/pb.v1 v1.0.28
@@ -26,6 +30,7 @@ require (
)
require (
+ github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 //
indirect
@@ -39,9 +44,11 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a //
indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/pgx/v4 v4.18.2 // indirect
+ github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
+ github.com/russross/blackfriday/v2 v2.1.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.23.0 // indirect
diff --git a/go.sum b/go.sum
index e7789e23..ca12ca24 100644
--- a/go.sum
+++ b/go.sum
@@ -1,10 +1,13 @@
github.com/BurntSushi/toml v0.3.1/go.mod
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/BurntSushi/toml v1.2.1/go.mod
h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/DATA-DOG/go-sqlmock v1.5.0
h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod
h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/Masterminds/semver/v3 v3.1.1
h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc=
github.com/Masterminds/semver/v3 v3.1.1/go.mod
h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs=
github.com/apache/cloudberry-go-libs v1.0.12-0.20250910014224-fc376e8a1056
h1:ycrFztmYATpidbSAU1rw60XuhuDxgBHtLD3Sueu947c=
github.com/apache/cloudberry-go-libs
v1.0.12-0.20250910014224-fc376e8a1056/go.mod
h1:lfHWkNYsno/lV+Nee0OoCmlOlBz5yvT6EW8WQEOUI5c=
+github.com/aws/aws-sdk-go v1.44.257
h1:HwelXYZZ8c34uFFhgVw3ybu2gB5fkk8KLj2idTvzZb8=
+github.com/aws/aws-sdk-go v1.44.257/go.mod
h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/blang/semver v3.5.1+incompatible
h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ=
github.com/blang/semver v3.5.1+incompatible/go.mod
h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/vfs v1.0.0 h1:AUZUgulCDzbaNjTRWEP45X7m/J10brAptZpSRKRZBZc=
@@ -16,6 +19,7 @@ github.com/cockroachdb/apd v1.1.0
h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I
github.com/cockroachdb/apd v1.1.0/go.mod
h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod
h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod
h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
+github.com/cpuguy83/go-md2man/v2 v2.0.2
h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod
h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.7/go.mod
h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/davecgh/go-spew v1.1.0/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -45,6 +49,8 @@ github.com/google/renameio v0.1.0/go.mod
h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod
h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.1
h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc=
github.com/inconshreveable/mousetrap v1.0.1/go.mod
h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
+github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf
h1:FtEj8sfIcaaBfAKrE1Cwb61YDtYq9JxChK1c7AKce7s=
+github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf/go.mod
h1:yrqSXGoD/4EKfF26AOGzscPOgTTJcyAwM2rpixWT+t4=
github.com/jackc/chunkreader v1.0.0/go.mod
h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo=
github.com/jackc/chunkreader/v2 v2.0.0/go.mod
h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/chunkreader/v2 v2.0.1
h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
@@ -92,6 +98,10 @@ github.com/jackc/pgx/v4 v4.18.2/go.mod
h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod
h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod
h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.3/go.mod
h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
+github.com/jmespath/go-jmespath v0.4.0
h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
+github.com/jmespath/go-jmespath v0.4.0/go.mod
h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1
h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod
h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod
h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/kisielk/gotool v1.0.0/go.mod
h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
@@ -121,6 +131,7 @@ github.com/mattn/go-isatty v0.0.12/go.mod
h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-isatty v0.0.16/go.mod
h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.17
h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod
h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
+github.com/mattn/go-runewidth v0.0.9/go.mod
h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.13
h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-runewidth v0.0.13/go.mod
h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-sqlite3 v1.14.6/go.mod
h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
@@ -128,6 +139,8 @@ github.com/mattn/go-sqlite3 v1.14.19
h1:fhGleo2h1p8tVChob4I9HpmVFIAkKGpiukdrgQbW
github.com/mattn/go-sqlite3 v1.14.19/go.mod
h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/nightlyone/lockfile v1.0.0
h1:RHep2cFKK4PonZJDdEl4GmkabuhbsRMgk/k3uAmxBiA=
github.com/nightlyone/lockfile v1.0.0/go.mod
h1:rywoIealpdNse2r832aiD9jRk8ErCatROs6LzC841CI=
+github.com/olekukonko/tablewriter v0.0.5
h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
+github.com/olekukonko/tablewriter v0.0.5/go.mod
h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo/v2 v2.13.0
h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4=
github.com/onsi/ginkgo/v2 v2.13.0/go.mod
h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o=
github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
@@ -143,6 +156,7 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod
h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.13.0/go.mod
h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
github.com/rs/zerolog v1.15.0/go.mod
h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
+github.com/russross/blackfriday/v2 v2.1.0
h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod
h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/satori/go.uuid v1.2.0/go.mod
h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
@@ -159,14 +173,21 @@ github.com/spf13/pflag v1.0.5/go.mod
h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod
h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
+github.com/stretchr/objx v0.4.0/go.mod
h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/objx v0.5.0/go.mod
h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.8.1
h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
-github.com/stretchr/testify v1.8.1/go.mod
h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.7.1/go.mod
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.2
h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
+github.com/stretchr/testify v1.8.2/go.mod
h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/urfave/cli v1.22.13 h1:wsLILXG8qCJNse/qAgLNf23737Cx05GflHg/PJGe1Ok=
+github.com/urfave/cli v1.22.13/go.mod
h1:VufqObjsMTF2BBwKawpx9R8eAneNEWhoO0yx8Vd+FkE=
+github.com/yuin/goldmark v1.4.13/go.mod
h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zenazn/goji v0.9.0/go.mod
h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.uber.org/atomic v1.3.2/go.mod
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
@@ -188,11 +209,13 @@ golang.org/x/crypto
v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod
h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod
h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod
h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod
h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod
h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod
h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod
h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod
h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod
h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
@@ -200,9 +223,12 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod
h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod
h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod
h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -216,16 +242,23 @@ golang.org/x/sys
v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod
h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod
h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod
h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -235,7 +268,9 @@ golang.org/x/tools
v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod
h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
+golang.org/x/tools v0.1.12/go.mod
h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.12.0 h1:YW6HUoUmYBpwSgyaGaZq1fHjrBjX1rlpZ54T6mu2kss=
golang.org/x/tools v0.12.0/go.mod
h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -254,6 +289,7 @@ gopkg.in/cheggaaa/pb.v1 v1.0.28/go.mod
h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qS
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod
h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/gpbackup_s3_plugin.go b/gpbackup_s3_plugin.go
new file mode 100644
index 00000000..b699e715
--- /dev/null
+++ b/gpbackup_s3_plugin.go
@@ -0,0 +1,140 @@
+//go:build gpbackup_s3_plugin
+// +build gpbackup_s3_plugin
+
+package main
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/apache/cloudberry-backup/plugins/s3plugin"
+ "github.com/apache/cloudberry-go-libs/gplog"
+ "github.com/urfave/cli"
+)
+
+func main() {
+ gplog.InitializeLogging("gpbackup_s3_plugin", "")
+ app := cli.NewApp()
+ cli.VersionFlag = cli.BoolFlag{
+ Name: "version",
+ Usage: "print version of gpbackup_s3_plugin",
+ }
+ app.Version = s3plugin.Version
+ app.Usage = ""
+ app.UsageText = "Not supported as a standalone utility. " +
+ "This plugin must be used in conjunction with gpbackup and
gprestore."
+
+ app.Commands = []cli.Command{
+ {
+ Name: "setup_plugin_for_backup",
+ Action: s3plugin.SetupPluginForBackup,
+ Before: buildBeforeFunc(3, 4),
+ },
+ {
+ Name: "setup_plugin_for_restore",
+ Action: s3plugin.SetupPluginForRestore,
+ Before: buildBeforeFunc(3, 4),
+ },
+ {
+ Name: "cleanup_plugin_for_backup",
+ Action: s3plugin.CleanupPlugin,
+ Before: buildBeforeFunc(3, 4),
+ },
+ {
+ Name: "cleanup_plugin_for_restore",
+ Action: s3plugin.CleanupPlugin,
+ Before: buildBeforeFunc(3, 4),
+ },
+ {
+ Name: "backup_file",
+ Action: s3plugin.BackupFile,
+ Before: buildBeforeFunc(2),
+ },
+ {
+ Name: "backup_directory",
+ Action: s3plugin.BackupDirectory,
+ Before: buildBeforeFunc(2, 3),
+ Hidden: true,
+ },
+ {
+ Name: "backup_directory_parallel",
+ Action: s3plugin.BackupDirectoryParallel,
+ Before: buildBeforeFunc(2, 3),
+ Hidden: true,
+ },
+ {
+ Name: "restore_file",
+ Action: s3plugin.RestoreFile,
+ Before: buildBeforeFunc(2),
+ },
+ {
+ Name: "restore_directory",
+ Action: s3plugin.RestoreDirectory,
+ Before: buildBeforeFunc(2, 3),
+ Hidden: true,
+ },
+ {
+ Name: "restore_directory_parallel",
+ Action: s3plugin.RestoreDirectoryParallel,
+ Before: buildBeforeFunc(2, 3),
+ Hidden: true,
+ },
+ {
+ Name: "backup_data",
+ Action: s3plugin.BackupData,
+ Before: buildBeforeFunc(2),
+ },
+ {
+ Name: "restore_data",
+ Action: s3plugin.RestoreData,
+ Before: buildBeforeFunc(2),
+ },
+ {
+ Name: "plugin_api_version",
+ Action: s3plugin.GetAPIVersion,
+ Before: buildBeforeFunc(0),
+ },
+ {
+ Name: "delete_backup",
+ Action: s3plugin.DeleteBackup,
+ Before: buildBeforeFunc(2),
+ },
+ {
+ Name: "delete_directory",
+ Action: s3plugin.DeleteDirectory,
+ Before: buildBeforeFunc(2),
+ },
+ {
+ Name: "list_directory",
+ Action: s3plugin.ListDirectory,
+ Before: buildBeforeFunc(1, 2),
+ },
+ }
+
+ err := app.Run(os.Args)
+ if err != nil {
+ gplog.Error(err.Error())
+ os.Exit(1)
+ }
+}
+
+func buildBeforeFunc(expectedNArgs ...int) (beforeFunc cli.BeforeFunc) {
+ beforeFunc = func(context *cli.Context) error {
+ actualNArg := context.NArg()
+ argMatched := false
+ for _, expectedNArg := range expectedNArgs {
+ if actualNArg == expectedNArg {
+ argMatched = true
+ break
+ }
+ }
+ if !argMatched {
+ return fmt.Errorf("ERROR: Invalid number of arguments
to plugin command. "+
+ "Expected %v arguments. Got %d arguments",
expectedNArgs, actualNArg)
+ } else {
+ return nil
+ }
+
+ }
+ return beforeFunc
+}
diff --git a/plugins/README.md b/plugins/README.md
index 4fe80fc9..4e285e4b 100644
--- a/plugins/README.md
+++ b/plugins/README.md
@@ -32,7 +32,7 @@ options:
```
## Available plugins
-[gpbackup_s3_plugin](https://github.com/apache/cloudberry-gpbackup-s3-plugin): Allows users to back up their Apache Cloudberry to Amazon S3.
+**gpbackup_s3_plugin**: S3 storage plugin located in the [`s3plugin/`](s3plugin/) directory; allows users to back up their Apache Cloudberry and Greenplum databases to Amazon S3. See [s3plugin/README.md](s3plugin/README.md) for detailed configuration and usage instructions.
## Developing plugins
diff --git a/plugins/s3plugin/README.md b/plugins/s3plugin/README.md
new file mode 100644
index 00000000..6619b206
--- /dev/null
+++ b/plugins/s3plugin/README.md
@@ -0,0 +1,74 @@
+## Using the S3 Storage Plugin with gpbackup and gprestore
+The S3 plugin lets you use an Amazon Simple Storage Service (Amazon S3) location to store and retrieve backups when you run gpbackup and gprestore.
+
+To use the S3 plugin, you specify the location of the plugin, the AWS login, and the backup location in a configuration file. When you run gpbackup or gprestore, you specify the configuration file with the option --plugin-config.
+
+If you perform a backup operation with the gpbackup option --plugin-config, you must also specify the --plugin-config option when you restore the backup with gprestore.
+
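+For example, a backup created with the --plugin-config option must be restored with the same plugin configuration. A minimal sketch, using the example configuration file shown later in this document (the gprestore timestamp value is illustrative):
+
+```
+gpbackup --dbname demo --plugin-config /home/gpadmin/s3-test-config.yaml
+# restore the same backup; the timestamp below is illustrative
+gprestore --timestamp 20250928205848 --plugin-config /home/gpadmin/s3-test-config.yaml
+```
+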
+The S3 plugin supports both AWS and custom storage servers that implement the S3 interface.
+
+## S3 Storage Plugin Configuration File Format
+The configuration file specifies the absolute path to the gpbackup_s3_plugin executable, AWS connection credentials, and S3 location.
+
+The configuration file must be a valid YAML document in the following format:
+
+```
+executablepath: <absolute-path-to-gpbackup_s3_plugin>
+options:
+ region: <aws-region>
+ endpoint: <s3-endpoint>
+ aws_access_key_id: <aws-user-id>
+ aws_secret_access_key: <aws-user-id-key>
+ bucket: <s3-bucket>
+ folder: <s3-location>
+ encryption: [on|off]
+ http_proxy: <http-proxy>
+ ```
+
+`executablepath` is the absolute path to the plugin executable (e.g., use the fully expanded path of $GPHOME/bin/gpbackup_s3_plugin).
+
+Below are the S3 plugin options:
+
+| Option Name | Description |
+| --- | --- |
+| `region` | AWS region (ignored if `endpoint` is specified) |
+| `endpoint` | endpoint of a server implementing the S3 interface |
+| `aws_access_key_id` | AWS S3 ID to access the S3 bucket location that stores backup files |
+| `aws_secret_access_key` | AWS S3 passcode for the S3 ID to access the S3 bucket location |
+| `bucket` | name of the S3 bucket. The bucket must exist with the necessary permissions |
+| `folder` | S3 location for backups. During a backup operation, the plugin creates the S3 location if it does not exist in the S3 bucket. |
+| `encryption` | Enable or disable SSL encryption to connect to S3. Valid values are `on` and `off`; `on` by default |
+| `http_proxy` | HTTP proxy URL |
+| `backup_max_concurrent_requests` | concurrency level for any file's backup request |
+| `backup_multipart_chunksize` | maximum buffer/chunk size for multipart transfers during backup |
+| `restore_max_concurrent_requests` | concurrency level for any file's restore request |
+| `restore_multipart_chunksize` | maximum buffer/chunk size for multipart transfers during restore |
+
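+The four tuning options are optional; when set, they override the plugin defaults. A minimal sketch (the values shown are illustrative, not recommendations):
+
+```
+options:
+  # region, credentials, bucket, and folder as shown in the format above
+  # the values below are illustrative, not recommendations
+  backup_max_concurrent_requests: 6
+  backup_multipart_chunksize: 100MB
+  restore_max_concurrent_requests: 6
+  restore_multipart_chunksize: 100MB
+```
+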
+## Example
+This is an example S3 storage plugin configuration file that is used in the next gpbackup example command. The name of the file is s3-test-config.yaml.
+
+```
+executablepath: $GPHOME/bin/gpbackup_s3_plugin
+options:
+ region: us-west-2
+ aws_access_key_id: test-s3-user
+ aws_secret_access_key: asdf1234asdf
+ bucket: gpdb-backup
+ folder: test/backup3
+```
+
+This gpbackup example backs up the database demo using the S3 storage plugin. The absolute path to the S3 storage plugin configuration file is /home/gpadmin/s3-test-config.yaml.
+
+```
+gpbackup --dbname demo --single-data-file --plugin-config /home/gpadmin/s3-test-config.yaml
+```
+The S3 storage plugin writes the backup files to this S3 location in the AWS region us-west-2.
+
+```
+gpdb-backup/test/backup3/backups/YYYYMMDD/YYYYMMDDHHMMSS/
+```
+
+## Notes
+The S3 storage plugin application must be in the same location on every Cloudberry/Greenplum host. The configuration file is required only on the coordinator host.
+
+Using Amazon S3 to back up and restore data requires an Amazon AWS account with access to the Amazon S3 bucket. The Amazon S3 bucket permissions required are Upload/Delete for the S3 user ID that uploads the files, and Open/Download and View for the S3 user ID that accesses the files.
diff --git a/plugins/s3plugin/backup.go b/plugins/s3plugin/backup.go
new file mode 100644
index 00000000..a2fcbea5
--- /dev/null
+++ b/plugins/s3plugin/backup.go
@@ -0,0 +1,222 @@
+package s3plugin
+
+import (
+ "bufio"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/apache/cloudberry-go-libs/gplog"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+ "github.com/pkg/errors"
+ "github.com/urfave/cli"
+)
+
+func SetupPluginForBackup(c *cli.Context) error {
+ scope := (Scope)(c.Args().Get(2))
+ if scope != Master && scope != Coordinator && scope != SegmentHost {
+ return nil
+ }
+ config, sess, err := readConfigAndStartSession(c)
+ if err != nil {
+ return err
+ }
+ localBackupDir := c.Args().Get(1)
+ _, timestamp := filepath.Split(localBackupDir)
+ testFileName := fmt.Sprintf("gpbackup_%s_report", timestamp)
+ testFilePath := fmt.Sprintf("%s/%s", localBackupDir, testFileName)
+ fileKey := GetS3Path(config.Options.Folder, testFilePath)
+ file, err := os.Create("/tmp/" + testFileName) // dummy empty reader
for probe
+ defer file.Close()
+ if err != nil {
+ return err
+ }
+ _, _, err = uploadFile(sess, config, fileKey, file)
+ return err
+}
+
+func BackupFile(c *cli.Context) error {
+ config, sess, err := readConfigAndStartSession(c)
+ if err != nil {
+ return err
+ }
+ fileName := c.Args().Get(1)
+ fileKey := GetS3Path(config.Options.Folder, fileName)
+ file, err := os.Open(fileName)
+ defer file.Close()
+ if err != nil {
+ return err
+ }
+ bytes, elapsed, err := uploadFile(sess, config, fileKey, file)
+ if err != nil {
+ return err
+ }
+
+ gplog.Info("Uploaded %d bytes for %s in %v", bytes,
filepath.Base(fileKey),
+ elapsed.Round(time.Millisecond))
+ return nil
+}
+
+func BackupDirectory(c *cli.Context) error {
+ start := time.Now()
+ totalBytes := int64(0)
+ config, sess, err := readConfigAndStartSession(c)
+ if err != nil {
+ return err
+ }
+ dirName := c.Args().Get(1)
+ gplog.Verbose("Restore Directory '%s' from S3", dirName)
+ gplog.Verbose("S3 Location = s3://%s/%s", config.Options.Bucket,
dirName)
+ gplog.Info("dirKey = %s\n", dirName)
+
+ // Populate a list of files to be backed up
+ fileList := make([]string, 0)
+ _ = filepath.Walk(dirName, func(path string, f os.FileInfo, err error)
error {
+ isDir, _ := isDirectoryGetSize(path)
+ if !isDir {
+ fileList = append(fileList, path)
+ }
+ return nil
+ })
+
+ // Process the files sequentially
+ for _, fileName := range fileList {
+ file, err := os.Open(fileName)
+ if err != nil {
+ return err
+ }
+ bytes, elapsed, err := uploadFile(sess, config, fileName, file)
+ _ = file.Close()
+ if err != nil {
+ return err
+ }
+
+ totalBytes += bytes
+ gplog.Debug("Uploaded %d bytes for %s in %v", bytes,
+ filepath.Base(fileName),
elapsed.Round(time.Millisecond))
+ }
+
+ gplog.Info("Uploaded %d files (%d bytes) in %v\n", len(fileList),
+ totalBytes, time.Since(start).Round(time.Millisecond))
+ return nil
+}
+
+func BackupDirectoryParallel(c *cli.Context) error {
+ start := time.Now()
+ totalBytes := int64(0)
+ parallel := 5
+ config, sess, err := readConfigAndStartSession(c)
+ if err != nil {
+ return err
+ }
+ dirName := c.Args().Get(1)
+ if len(c.Args()) == 3 {
+ parallel, _ = strconv.Atoi(c.Args().Get(2))
+ }
+ gplog.Verbose("Backup Directory '%s' to S3", dirName)
+ gplog.Verbose("S3 Location = s3://%s/%s", config.Options.Bucket,
dirName)
+ gplog.Info("dirKey = %s\n", dirName)
+
+ // Populate a list of files to be backed up
+ fileList := make([]string, 0)
+ _ = filepath.Walk(dirName, func(path string, f os.FileInfo, err error)
error {
+ isDir, _ := isDirectoryGetSize(path)
+ if !isDir {
+ fileList = append(fileList, path)
+ }
+ return nil
+ })
+
+ var wg sync.WaitGroup
+ var finalErr error
+ // Create jobs using a channel
+ fileChannel := make(chan string, len(fileList))
+ for _, fileKey := range fileList {
+ wg.Add(1)
+ fileChannel <- fileKey
+ }
+ close(fileChannel)
+ // Process the files in parallel
+ for i := 0; i < parallel; i++ {
+ go func(jobs chan string) {
+ for fileKey := range jobs {
+ file, err := os.Open(fileKey)
+ if err != nil {
+ finalErr = err
+ return
+ }
+ bytes, elapsed, err := uploadFile(sess, config,
fileKey, file)
+ if err == nil {
+ totalBytes += bytes
+ msg := fmt.Sprintf("Uploaded %d bytes
for %s in %v", bytes,
+ filepath.Base(fileKey),
elapsed.Round(time.Millisecond))
+ gplog.Verbose(msg)
+ fmt.Println(msg)
+ } else {
+ finalErr = err
+ gplog.FatalOnError(err)
+ }
+ _ = file.Close()
+ wg.Done()
+ }
+ }(fileChannel)
+ }
+ // Wait for jobs to be done
+ wg.Wait()
+
+ gplog.Info("Uploaded %d files (%d bytes) in %v\n",
+ len(fileList), totalBytes,
time.Since(start).Round(time.Millisecond))
+ return finalErr
+}
+
+func BackupData(c *cli.Context) error {
+ config, sess, err := readConfigAndStartSession(c)
+ if err != nil {
+ return err
+ }
+ dataFile := c.Args().Get(1)
+ fileKey := GetS3Path(config.Options.Folder, dataFile)
+
+ bytes, elapsed, err := uploadFile(sess, config, fileKey, os.Stdin)
+ if err != nil {
+ return err
+ }
+
+ gplog.Debug("Uploaded %d bytes for file %s in %v", bytes,
+ filepath.Base(fileKey), elapsed.Round(time.Millisecond))
+ return nil
+}
+
+func uploadFile(sess *session.Session, config *PluginConfig, fileKey string,
+ file *os.File) (int64, time.Duration, error) {
+
+ start := time.Now()
+ bucket := config.Options.Bucket
+ uploadChunkSize := config.Options.UploadChunkSize
+ uploadConcurrency := config.Options.UploadConcurrency
+
+ uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
+ u.PartSize = uploadChunkSize
+ u.Concurrency = uploadConcurrency
+ })
+ gplog.Debug("Uploading file %s with chunksize %d and concurrency %d",
+ filepath.Base(fileKey), uploader.PartSize, uploader.Concurrency)
+ _, err := uploader.Upload(&s3manager.UploadInput{
+ Bucket: aws.String(bucket),
+ Key: aws.String(fileKey),
+ // This will cause memory issues if
+ // segment_per_host*uploadChunkSize*uploadConcurreny is larger
than
+ // the amount of ram a system has.
+ Body: bufio.NewReaderSize(file,
int(uploadChunkSize)*uploadConcurrency),
+ })
+ if err != nil {
+ return 0, -1, errors.Wrap(err, fmt.Sprintf("Error while
uploading %s", fileKey))
+ }
+ bytes, err := getFileSize(uploader.S3, bucket, fileKey)
+ return bytes, time.Since(start), err
+}
diff --git a/plugins/s3plugin/restore.go b/plugins/s3plugin/restore.go
new file mode 100644
index 00000000..b7f31f23
--- /dev/null
+++ b/plugins/s3plugin/restore.go
@@ -0,0 +1,357 @@
+package s3plugin
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/apache/cloudberry-go-libs/gplog"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+ "github.com/pkg/errors"
+ "github.com/urfave/cli"
+)
+
+func SetupPluginForRestore(c *cli.Context) error {
+ scope := (Scope)(c.Args().Get(2))
+ if scope != Master && scope != Coordinator && scope != SegmentHost {
+ return nil
+ }
+ _, err := readAndValidatePluginConfig(c.Args().Get(0))
+ return err
+}
+
+func RestoreFile(c *cli.Context) error {
+ config, sess, err := readConfigAndStartSession(c)
+ if err != nil {
+ return err
+ }
+ fileName := c.Args().Get(1)
+ bucket := config.Options.Bucket
+ fileKey := GetS3Path(config.Options.Folder, fileName)
+ file, err := os.Create(fileName)
+ defer file.Close()
+ if err != nil {
+ return err
+ }
+ bytes, elapsed, err := downloadFile(sess, config, bucket, fileKey, file)
+ if err != nil {
+ fileErr := os.Remove(fileName)
+ if fileErr != nil {
+ gplog.Error(fileErr.Error())
+ }
+ return err
+ }
+
+ gplog.Info("Downloaded %d bytes for %s in %v", bytes,
+ filepath.Base(fileKey), elapsed.Round(time.Millisecond))
+ return err
+}
+
+func RestoreDirectory(c *cli.Context) error {
+ start := time.Now()
+ totalBytes := int64(0)
+ config, sess, err := readConfigAndStartSession(c)
+ if err != nil {
+ return err
+ }
+ dirName := c.Args().Get(1)
+ bucket := config.Options.Bucket
+ gplog.Verbose("Restore Directory '%s' from S3", dirName)
+ gplog.Verbose("S3 Location = s3://%s/%s", bucket, dirName)
+ gplog.Info("dirKey = %s\n", dirName)
+
+ _ = os.MkdirAll(dirName, 0775)
+ client := s3.New(sess)
+ params := &s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &dirName}
+ bucketObjectsList, _ := client.ListObjectsV2(params)
+
+ numFiles := 0
+ for _, key := range bucketObjectsList.Contents {
+ var filename string
+ if strings.HasSuffix(*key.Key, "/") {
+ // Got a directory
+ continue
+ }
+ if strings.Contains(*key.Key, "/") {
+ // split
+ s3FileFullPathList := strings.Split(*key.Key, "/")
+ filename = s3FileFullPathList[len(s3FileFullPathList)-1]
+ }
+ filePath := dirName + "/" + filename
+ file, err := os.Create(filePath)
+ if err != nil {
+ return err
+ }
+
+ bytes, elapsed, err := downloadFile(sess, config, bucket,
*key.Key, file)
+ _ = file.Close()
+ if err != nil {
+ fileErr := os.Remove(filename)
+ if fileErr != nil {
+ gplog.Error(fileErr.Error())
+ }
+ return err
+ }
+
+ totalBytes += bytes
+ numFiles++
+ gplog.Info("Downloaded %d bytes for %s in %v", bytes,
+ filepath.Base(*key.Key),
elapsed.Round(time.Millisecond))
+ }
+
+ gplog.Info("Downloaded %d files (%d bytes) in %v\n",
+ numFiles, totalBytes, time.Since(start).Round(time.Millisecond))
+ return nil
+}
+
+func RestoreDirectoryParallel(c *cli.Context) error {
+ start := time.Now()
+ totalBytes := int64(0)
+ parallel := 5
+ config, sess, err := readConfigAndStartSession(c)
+ if err != nil {
+ return err
+ }
+ dirName := c.Args().Get(1)
+ if len(c.Args()) == 3 {
+ parallel, _ = strconv.Atoi(c.Args().Get(2))
+ }
+ bucket := config.Options.Bucket
+ gplog.Verbose("Restore Directory Parallel '%s' from S3", dirName)
+ gplog.Verbose("S3 Location = s3://%s/%s", bucket, dirName)
+ fmt.Printf("dirKey = %s\n", dirName)
+
+ _ = os.MkdirAll(dirName, 0775)
+ client := s3.New(sess)
+ params := &s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &dirName}
+ bucketObjectsList, _ := client.ListObjectsV2(params)
+
+ // Create a list of files to be restored
+ numFiles := 0
+ fileList := make([]string, 0)
+ for _, key := range bucketObjectsList.Contents {
+ gplog.Verbose("File '%s' = %d bytes", filepath.Base(*key.Key),
*key.Size)
+ if strings.HasSuffix(*key.Key, "/") {
+ // Got a directory
+ continue
+ }
+ fileList = append(fileList, *key.Key)
+ }
+
+ var wg sync.WaitGroup
+ var finalErr error
+ // Create jobs using a channel
+ fileChannel := make(chan string, len(fileList))
+ for _, fileKey := range fileList {
+ wg.Add(1)
+ fileChannel <- fileKey
+ }
+ close(fileChannel)
+ // Process the files in parallel
+ for i := 0; i < parallel; i++ {
+ go func(jobs chan string) {
+ for fileKey := range jobs {
+ fileName := fileKey
+ if strings.Contains(fileKey, "/") {
+ fileName = filepath.Base(fileKey)
+ }
+ // construct local file name
+ filePath := dirName + "/" + fileName
+ file, err := os.Create(filePath)
+ if err != nil {
+ finalErr = err
+ return
+ }
+ bytes, elapsed, err := downloadFile(sess,
config, bucket, fileKey, file)
+ if err == nil {
+ totalBytes += bytes
+ numFiles++
+ msg := fmt.Sprintf("Downloaded %d bytes
for %s in %v", bytes,
+ filepath.Base(fileKey),
elapsed.Round(time.Millisecond))
+ gplog.Verbose(msg)
+ fmt.Println(msg)
+ } else {
+ finalErr = err
+ gplog.FatalOnError(err)
+ _ = os.Remove(filePath)
+ }
+ _ = file.Close()
+ wg.Done()
+ }
+ }(fileChannel)
+ }
+ // Wait for jobs to be done
+ wg.Wait()
+
+ fmt.Printf("Downloaded %d files (%d bytes) in %v\n",
+ numFiles, totalBytes, time.Since(start).Round(time.Millisecond))
+ return finalErr
+}
+
+func RestoreData(c *cli.Context) error {
+ config, sess, err := readConfigAndStartSession(c)
+ if err != nil {
+ return err
+ }
+ dataFile := c.Args().Get(1)
+ bucket := config.Options.Bucket
+ fileKey := GetS3Path(config.Options.Folder, dataFile)
+ bytes, elapsed, err := downloadFile(sess, config, bucket, fileKey,
os.Stdout)
+ if err != nil {
+ return err
+ }
+
+ gplog.Verbose("Downloaded %d bytes for file %s in %v", bytes,
+ filepath.Base(fileKey), elapsed.Round(time.Millisecond))
+ return nil
+}
+
+type chunk struct {
+ chunkIndex int
+ startByte int64
+ endByte int64
+}
+
+func downloadFile(sess *session.Session, config *PluginConfig, bucket string,
fileKey string,
+ file *os.File) (int64, time.Duration, error) {
+
+ start := time.Now()
+ downloader := s3manager.NewDownloader(sess, func(u
*s3manager.Downloader) {
+ u.PartSize = config.Options.DownloadChunkSize
+ })
+
+ totalBytes, err := getFileSize(downloader.S3, bucket, fileKey)
+ if err != nil {
+ return 0, -1, errors.Wrap(err, fmt.Sprintf("Error getting file
size for %s in bucket %s", fileKey, bucket))
+ }
+ gplog.Verbose("File %s size = %d bytes", filepath.Base(fileKey),
totalBytes)
+ if totalBytes <= config.Options.DownloadChunkSize {
+ buffer := &aws.WriteAtBuffer{}
+ if _, err = downloader.Download(
+ buffer,
+ &s3.GetObjectInput{
+ Bucket: aws.String(bucket),
+ Key: aws.String(fileKey),
+ }); err != nil {
+ return 0, -1, errors.Wrap(err, fmt.Sprintf("Error while
downloading %s", fileKey))
+ }
+ if _, err = file.Write(buffer.Bytes()); err != nil {
+ return 0, -1, err
+ }
+ } else {
+ return downloadFileInParallel(sess,
config.Options.DownloadConcurrency, config.Options.DownloadChunkSize,
totalBytes, bucket, fileKey, file)
+ }
+ return totalBytes, time.Since(start), err
+}
+
+/*
+ * Performs ranged requests for the file while exploiting parallelism between
the copy and download tasks
+ */
+func downloadFileInParallel(sess *session.Session, downloadConcurrency int,
downloadChunkSize int64,
+ totalBytes int64, bucket string, fileKey string, file *os.File) (int64,
time.Duration, error) {
+
+ var finalErr error
+ start := time.Now()
+ waitGroup := sync.WaitGroup{}
+ numberOfChunks := int((totalBytes + downloadChunkSize - 1) /
downloadChunkSize)
+ bufferPointers := make([]*[]byte, numberOfChunks)
+ copyChannel := make([]chan int, numberOfChunks)
+ jobs := make(chan chunk, numberOfChunks)
+ for i := 0; i < numberOfChunks; i++ {
+ copyChannel[i] = make(chan int)
+ }
+
+ startByte := int64(0)
+ endByte := int64(-1)
+ done := false
+ // Create jobs based on the number of chunks to be downloaded
+ for chunkIndex := 0; chunkIndex < numberOfChunks && !done; chunkIndex++
{
+ startByte = endByte + 1
+ endByte += downloadChunkSize
+ if endByte >= totalBytes {
+ endByte = totalBytes - 1
+ done = true
+ }
+ jobs <- chunk{chunkIndex, startByte, endByte}
+ waitGroup.Add(1)
+ }
+
+ // Create a pool of download workers (based on concurrency)
+ numberOfWorkers := downloadConcurrency
+ if numberOfChunks < downloadConcurrency {
+ numberOfWorkers = numberOfChunks
+ }
+ downloadBuffers := make(chan []byte, numberOfWorkers)
+ for i := 0; i < cap(downloadBuffers); i++ {
+ buffer := make([]byte, downloadChunkSize)
+ downloadBuffers <- buffer
+ }
+ // Download concurrency is handled on our end hence we don't need to
set concurrency
+ downloader := s3manager.NewDownloader(sess, func(u
*s3manager.Downloader) {
+ u.PartSize = downloadChunkSize
+ u.Concurrency = 1
+ })
+ gplog.Debug("Downloading file %s with chunksize %d and concurrency %d",
+ filepath.Base(fileKey), downloadChunkSize, numberOfWorkers)
+
+ for i := 0; i < numberOfWorkers; i++ {
+ go func(id int) {
+ for j := range jobs {
+ buffer := <-downloadBuffers
+ chunkStart := time.Now()
+ byteRange := fmt.Sprintf("bytes=%d-%d",
j.startByte, j.endByte)
+ if j.endByte-j.startByte+1 != downloadChunkSize
{
+ buffer = make([]byte,
j.endByte-j.startByte+1)
+ }
+ bufferPointers[j.chunkIndex] = &buffer
+ gplog.Debug("Worker %d (chunk %d) for %s with
partsize %d and concurrency %d",
+ id, j.chunkIndex,
filepath.Base(fileKey),
+ downloader.PartSize,
downloader.Concurrency)
+ chunkBytes, err := downloader.Download(
+ aws.NewWriteAtBuffer(buffer),
+ &s3.GetObjectInput{
+ Bucket: aws.String(bucket),
+ Key: aws.String(fileKey),
+ Range: aws.String(byteRange),
+ })
+ if err != nil {
+ finalErr = err
+ }
+ gplog.Debug("Worker %d Downloaded %d bytes
(chunk %d) for %s in %v",
+ id, chunkBytes, j.chunkIndex,
filepath.Base(fileKey),
+
time.Since(chunkStart).Round(time.Millisecond))
+ copyChannel[j.chunkIndex] <- j.chunkIndex
+ }
+ }(i)
+ }
+
+ // Copy data from download buffers into the output stream sequentially
+ go func() {
+ for i := range copyChannel {
+ currentChunk := <-copyChannel[i]
+ chunkStart := time.Now()
+ numBytes, err :=
file.Write(*bufferPointers[currentChunk])
+ if err != nil {
+ finalErr = err
+ }
+ gplog.Debug("Copied %d bytes (chunk %d) for %s in %v",
+ numBytes, currentChunk, filepath.Base(fileKey),
+ time.Since(chunkStart).Round(time.Millisecond))
+ // Deallocate buffer
+ downloadBuffers <- *bufferPointers[currentChunk]
+ bufferPointers[currentChunk] = nil
+ waitGroup.Done()
+ close(copyChannel[i])
+ }
+ }()
+
+ waitGroup.Wait()
+ return totalBytes, time.Since(start), errors.Wrap(finalErr,
fmt.Sprintf("Error while downloading %s", fileKey))
+}
diff --git a/plugins/s3plugin/s3plugin.go b/plugins/s3plugin/s3plugin.go
new file mode 100644
index 00000000..9d9f42f7
--- /dev/null
+++ b/plugins/s3plugin/s3plugin.go
@@ -0,0 +1,455 @@
+package s3plugin
+
+import (
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "os"
+ "path/filepath"
+ "regexp"
+ "strconv"
+ "strings"
+
+ "github.com/apache/cloudberry-go-libs/gplog"
+ "github.com/apache/cloudberry-go-libs/operating"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/client"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/request"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/aws/aws-sdk-go/service/s3/s3iface"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+ "github.com/inhies/go-bytesize"
+ "github.com/olekukonko/tablewriter"
+ "github.com/urfave/cli"
+ "gopkg.in/yaml.v2"
+)
+
+var Version string
+
+const apiVersion = "0.5.0"
+const Mebibyte = 1024 * 1024
+const DefaultConcurrency = 6
+const DefaultUploadChunkSize = int64(Mebibyte) * 500 // default 500MB
+const DefaultDownloadChunkSize = int64(Mebibyte) * 500 // default 500MB
+
+type Scope string
+
+const (
+ Master Scope = "master"
+ Coordinator Scope = "coordinator"
+ SegmentHost Scope = "segment_host"
+ Segment Scope = "segment"
+)
+
+type PluginConfig struct {
+ ExecutablePath string `yaml:"executablepath"`
+ Options PluginOptions `yaml:"options"`
+}
+
+type PluginOptions struct {
+ AwsAccessKeyId string `yaml:"aws_access_key_id"`
+ AwsSecretAccessKey string `yaml:"aws_secret_access_key"`
+ BackupMaxConcurrentRequests string
`yaml:"backup_max_concurrent_requests"`
+ BackupMultipartChunksize string `yaml:"backup_multipart_chunksize"`
+ Bucket string `yaml:"bucket"`
+ Encryption string `yaml:"encryption"`
+ Endpoint string `yaml:"endpoint"`
+ Folder string `yaml:"folder"`
+ HttpProxy string `yaml:"http_proxy"`
+ Region string `yaml:"region"`
+ RemoveDuplicateBucket string `yaml:"remove_duplicate_bucket"`
+ RestoreMaxConcurrentRequests string
`yaml:"restore_max_concurrent_requests"`
+ RestoreMultipartChunksize string `yaml:"restore_multipart_chunksize"`
+ PgPort string `yaml:"pgport"`
+ BackupPluginVersion string `yaml:"backup_plugin_version"`
+
+ UploadChunkSize int64
+ UploadConcurrency int
+ DownloadChunkSize int64
+ DownloadConcurrency int
+}
+
+func CleanupPlugin(c *cli.Context) error {
+ return nil
+}
+
+func GetAPIVersion(c *cli.Context) {
+ fmt.Println(apiVersion)
+}
+
+/*
+ * Helper Functions
+ */
+
+func readAndValidatePluginConfig(configFile string) (*PluginConfig, error) {
+ config := &PluginConfig{}
+ contents, err := ioutil.ReadFile(configFile)
+ if err != nil {
+ return nil, err
+ }
+ if err = yaml.UnmarshalStrict(contents, config); err != nil {
+ return nil, fmt.Errorf("Yaml failures encountered reading
config file %s. Error: %s", configFile, err.Error())
+ }
+ if err = InitializeAndValidateConfig(config); err != nil {
+ return nil, err
+ }
+ return config, nil
+}
+
+func InitializeAndValidateConfig(config *PluginConfig) error {
+ var err error
+ var errTxt string
+ opt := &config.Options
+
+ // Initialize defaults
+ if opt.Region == "" {
+ opt.Region = "unused"
+ }
+ if opt.Encryption == "" {
+ opt.Encryption = "on"
+ }
+ if opt.RemoveDuplicateBucket == "" {
+ opt.RemoveDuplicateBucket = "false"
+ }
+ opt.UploadChunkSize = DefaultUploadChunkSize
+ opt.UploadConcurrency = DefaultConcurrency
+ opt.DownloadChunkSize = DefaultDownloadChunkSize
+ opt.DownloadConcurrency = DefaultConcurrency
+
+ // Validate configurations and overwrite defaults
+ if config.ExecutablePath == "" {
+ errTxt += fmt.Sprintf("executable_path must exist and cannot be
empty in plugin configuration file\n")
+ }
+ if opt.Bucket == "" {
+ errTxt += fmt.Sprintf("bucket must exist and cannot be empty in
plugin configuration file\n")
+ }
+ if opt.Folder == "" {
+ errTxt += fmt.Sprintf("folder must exist and cannot be empty in
plugin configuration file\n")
+ }
+ if opt.AwsAccessKeyId == "" {
+ if opt.AwsSecretAccessKey != "" {
+ errTxt += fmt.Sprintf("aws_access_key_id must exist in
plugin configuration file if aws_secret_access_key does\n")
+ }
+ } else if opt.AwsSecretAccessKey == "" {
+ errTxt += fmt.Sprintf("aws_secret_access_key must exist in
plugin configuration file if aws_access_key_id does\n")
+ }
+ if opt.Region == "unused" && opt.Endpoint == "" {
+ errTxt += fmt.Sprintf("region or endpoint must exist in plugin
configuration file\n")
+ }
+ if opt.Encryption != "on" && opt.Encryption != "off" {
+ errTxt += fmt.Sprintf("Invalid encryption configuration. Valid
choices are on or off.\n")
+ }
+ if opt.RemoveDuplicateBucket != "true" && opt.RemoveDuplicateBucket !=
"false" {
+ errTxt += fmt.Sprintf("Invalid value for
remove_duplicate_bucket. Valid choices are true or false.\n")
+ }
+ if opt.BackupMultipartChunksize != "" {
+ chunkSize, err := bytesize.Parse(opt.BackupMultipartChunksize)
+ if err != nil {
+ errTxt += fmt.Sprintf("Invalid
backup_multipart_chunksize. Err: %s\n", err)
+ }
+ // Chunk size is being converted from uint64 to int64. This is
safe as
+ // long as chunksize smaller than math.MaxInt64 bytes (~9223
Petabytes)
+ opt.UploadChunkSize = int64(chunkSize)
+ }
+ if opt.BackupMaxConcurrentRequests != "" {
+ opt.UploadConcurrency, err =
strconv.Atoi(opt.BackupMaxConcurrentRequests)
+ if err != nil {
+ errTxt += fmt.Sprintf("Invalid
backup_max_concurrent_requests. Err: %s\n", err)
+ }
+ }
+ if opt.RestoreMultipartChunksize != "" {
+ chunkSize, err := bytesize.Parse(opt.RestoreMultipartChunksize)
+ if err != nil {
+ errTxt += fmt.Sprintf("Invalid
restore_multipart_chunksize. Err: %s\n", err)
+ }
+ // Chunk size is being converted from uint64 to int64. This is
safe as
+ // long as chunksize smaller than math.MaxInt64 bytes (~9223
Petabytes)
+ opt.DownloadChunkSize = int64(chunkSize)
+ }
+ if opt.RestoreMaxConcurrentRequests != "" {
+ opt.DownloadConcurrency, err =
strconv.Atoi(opt.RestoreMaxConcurrentRequests)
+ if err != nil {
+ errTxt += fmt.Sprintf("Invalid
restore_max_concurrent_requests. Err: %s\n", err)
+ }
+ }
+
+ if errTxt != "" {
+ return errors.New(errTxt)
+ }
+ return nil
+}
+
+// CustomRetryer wraps the SDK's built in DefaultRetryer
+type CustomRetryer struct {
+ client.DefaultRetryer
+}
+
+// ShouldRetry overrides the SDK's built in DefaultRetryer
+func (r CustomRetryer) ShouldRetry(req *request.Request) bool {
+ if r.NumMaxRetries == 0 {
+ return false
+ }
+
+ willRetry := false
+ if req.Error != nil && strings.Contains(req.Error.Error(), "connection
reset by peer") {
+ willRetry = true
+ } else if req.HTTPResponse.StatusCode == 404 &&
strings.Contains(req.Error.Error(), "NoSuchKey") {
+ // 404 NoSuchKey error is possible due to AWS's eventual
consistency
+ // when attempting to inspect or get a file too quickly after
it was
+ // uploaded. The s3 plugin does exactly this to determine the
amount of
+ // bytes uploaded. For this reason we retry 404 errors.
+ willRetry = true
+ } else {
+ willRetry = r.DefaultRetryer.ShouldRetry(req)
+ }
+
+ if willRetry {
+ // While its possible to let the AWS client log for us, it
doesn't seem
+ // possible to set it up to only log errors. To prevent our log
from
+ // filling up with debug logs of successful https requests and
+ // response, we'll only log when retries are attempted.
+ if req.Error != nil {
+ gplog.Debug("Https request attempt %d failed. Next
attempt in %v. %s\n", req.RetryCount, r.RetryRules(req), req.Error.Error())
+ } else {
+ gplog.Debug("Https request attempt %d failed. Next
attempt in %v.\n", req.RetryCount, r.RetryRules(req))
+ }
+ return true
+ }
+
+ return false
+}
+
+func readConfigAndStartSession(c *cli.Context) (*PluginConfig,
*session.Session, error) {
+ configPath := c.Args().Get(0)
+ config, err := readAndValidatePluginConfig(configPath)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ disableSSL := !ShouldEnableEncryption(config.Options.Encryption)
+
+ awsConfig := request.WithRetryer(aws.NewConfig(),
CustomRetryer{DefaultRetryer: client.DefaultRetryer{NumMaxRetries: 10}}).
+ WithRegion(config.Options.Region).
+ WithEndpoint(config.Options.Endpoint).
+ WithS3ForcePathStyle(true).
+ WithDisableSSL(disableSSL).
+ WithUseDualStack(true)
+
+ // Will use default credential chain if none provided
+ if config.Options.AwsAccessKeyId != "" {
+ awsConfig = awsConfig.WithCredentials(
+ credentials.NewStaticCredentials(
+ config.Options.AwsAccessKeyId,
+ config.Options.AwsSecretAccessKey, ""))
+ }
+
+ if config.Options.HttpProxy != "" {
+ httpclient := &http.Client{
+ Transport: &http.Transport{
+ Proxy: func(*http.Request) (*url.URL, error) {
+ return
url.Parse(config.Options.HttpProxy)
+ },
+ },
+ }
+ awsConfig.WithHTTPClient(httpclient)
+ }
+
+ sess, err := session.NewSession(awsConfig)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ if config.Options.RemoveDuplicateBucket == "true" {
+ sess.Handlers.Build.PushFront(removeBucketFromPath)
+ }
+ return config, sess, nil
+}
+
+func ShouldEnableEncryption(encryption string) bool {
+ isOff := strings.EqualFold(encryption, "off")
+ return !isOff
+}
+
+func isDirectoryGetSize(path string) (bool, int64) {
+ fd, err := os.Stat(path)
+ if err != nil {
+ gplog.FatalOnError(err)
+ }
+ switch mode := fd.Mode(); {
+ case mode.IsDir():
+ return true, 0
+ case mode.IsRegular():
+ return false, fd.Size()
+ }
+ gplog.FatalOnError(errors.New(fmt.Sprintf("INVALID file %s", path)))
+ return false, 0
+}
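+
+// Illustrative behavior: a directory yields (true, 0), a regular file yields
+// (false, <size in bytes>), and anything else (or a failing os.Stat) is fatal
+// via gplog.FatalOnError.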
+
+func getFileSize(S3 s3iface.S3API, bucket string, fileKey string) (int64,
error) {
+ req, resp := S3.HeadObjectRequest(&s3.HeadObjectInput{
+ Bucket: aws.String(bucket),
+ Key: aws.String(fileKey),
+ })
+ err := req.Send()
+
+ if err != nil {
+ return 0, err
+ }
+ return *resp.ContentLength, nil
+}
+
+func GetS3Path(folder string, path string) string {
+ /*
+ A typical path for an already-backed-up file lives under a segment's
+ parent directory and, beneath that, under a datestamp/timestamp hierarchy.
+ We assume the incoming path is a long absolute one.
+ For example, from the test bench:
+ testdir_for_del="/tmp/testseg/backups/$current_date_for_del/$time_second_for_del"
+ testfile_for_del="$testdir_for_del/testfile_$time_second_for_del.txt"
+
+ Therefore, only the last four segments of the incoming path are relevant
+ to S3: the file, its two date/timestamp parents, and the grandparent
+ "backups" directory.
+ */
+ pathArray := strings.Split(path, "/")
+ lastFour := strings.Join(pathArray[(len(pathArray)-4):], "/")
+ return fmt.Sprintf("%s/%s", folder, lastFour)
+}
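+
+// Example (mirrors the unit test in s3plugin_test.go): with folder "s3/Dir" and path
+// "/a/b/c/tmp/datadir/gpseg-1/backups/20180101/20180101082233/backup_file",
+// the result is "s3/Dir/backups/20180101/20180101082233/backup_file".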
+
+func DeleteBackup(c *cli.Context) error {
+ timestamp := c.Args().Get(1)
+ if timestamp == "" {
+ return errors.New("delete requires a <timestamp>")
+ }
+
+ if !IsValidTimestamp(timestamp) {
+ msg := fmt.Sprintf("delete requires a <timestamp> with format "+
+ "YYYYMMDDHHMMSS, but received: %s", timestamp)
+ return fmt.Errorf(msg)
+ }
+
+ date := timestamp[0:8]
+ // Note that the "backups" directory is an artifact of how we save: we keep
+ // the 3 parent directories of the source file, which yields
+ // <s3folder>/backups/<date>/<timestamp>
+ config, sess, err := readConfigAndStartSession(c)
+ if err != nil {
+ return err
+ }
+ deletePath := filepath.Join(config.Options.Folder, "backups", date,
timestamp)
+ bucket := config.Options.Bucket
+ gplog.Debug("Delete location = s3://%s/%s", bucket, deletePath)
+
+ service := s3.New(sess)
+ iter := s3manager.NewDeleteListIterator(service, &s3.ListObjectsInput{
+ Bucket: aws.String(bucket),
+ Prefix: aws.String(deletePath),
+ })
+
+ batchClient := s3manager.NewBatchDeleteWithClient(service)
+ return batchClient.Delete(aws.BackgroundContext(), iter)
+}
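+
+// For example, given timestamp 20180101082233, DeleteBackup removes every
+// object under s3://<bucket>/<folder>/backups/20180101/20180101082233/.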
+
+func ListDirectory(c *cli.Context) error {
+ var err error
+ config, sess, err := readConfigAndStartSession(c)
+ if err != nil {
+ return err
+ }
+ bucket := config.Options.Bucket
+
+ var listPath string
+ if len(c.Args()) == 2 {
+ listPath = c.Args().Get(1)
+ } else {
+ listPath = config.Options.Folder
+ }
+
+ client := s3.New(sess)
+ params := &s3.ListObjectsV2Input{Bucket: &bucket, Prefix: &listPath}
+ bucketObjectsList, err := client.ListObjectsV2(params)
+ if err != nil {
+ return err
+ }
+ fileSizes := make([][]string, 0)
+
+ gplog.Verbose("Retrieving file information from directory %s in S3",
listPath)
+ for _, key := range bucketObjectsList.Contents {
+ if strings.HasSuffix(*key.Key, "/") {
+ // Got a directory
+ continue
+ }
+
+ downloader := s3manager.NewDownloader(sess, func(u
*s3manager.Downloader) {
+ u.PartSize = config.Options.DownloadChunkSize
+ })
+
+ totalBytes, err := getFileSize(downloader.S3, bucket, *key.Key)
+ if err != nil {
+ return err
+ }
+
+ fileSizes = append(fileSizes, []string{*key.Key,
fmt.Sprint(totalBytes)})
+ }
+
+ // Render the data as a table
+ table := tablewriter.NewWriter(operating.System.Stdout)
+ columns := []string{"NAME", "SIZE(bytes)"}
+ table.SetHeader(columns)
+
+ colors := make([]tablewriter.Colors, len(columns))
+ for i := range colors {
+ colors[i] = tablewriter.Colors{tablewriter.Bold}
+ }
+
+ table.SetHeaderColor(colors...)
+ table.SetCenterSeparator(" ")
+ table.SetColumnSeparator(" ")
+ table.SetRowSeparator(" ")
+ table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
+ table.SetHeaderLine(true)
+ table.SetAutoFormatHeaders(false)
+ table.SetBorders(tablewriter.Border{Left: true, Right: true, Bottom:
false, Top: false})
+ table.AppendBulk(fileSizes)
+ table.Render()
+
+ return err
+}
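+
+// Sample output sketch (illustrative values): ListDirectory prints a
+// two-column table to stdout, e.g.
+//
+//   NAME                                                       SIZE(bytes)
+//   folder_name/backups/20180101/20180101082233/backup_file   1048576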
+
+func DeleteDirectory(c *cli.Context) error {
+ config, sess, err := readConfigAndStartSession(c)
+ if err != nil {
+ return err
+ }
+ deletePath := c.Args().Get(1)
+ bucket := config.Options.Bucket
+ gplog.Verbose("Deleting directory s3://%s/%s", bucket, deletePath)
+ service := s3.New(sess)
+ iter := s3manager.NewDeleteListIterator(service, &s3.ListObjectsInput{
+ Bucket: aws.String(bucket),
+ Prefix: aws.String(deletePath),
+ })
+ batchClient := s3manager.NewBatchDeleteWithClient(service)
+ return batchClient.Delete(aws.BackgroundContext(), iter)
+}
+
+func IsValidTimestamp(timestamp string) bool {
+ timestampFormat := regexp.MustCompile(`^([0-9]{14})$`)
+ return timestampFormat.MatchString(timestamp)
+}
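+
+// Examples: "20240101120000" is valid (exactly 14 digits); "2024-01-01",
+// "badformat", and a 13-digit string are not.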
+
+// The AWS SDK automatically prepends "/BucketName/" to a request's path, which
+// breaks placement of all objects when doing backups or restores with an
+// Endpoint URL that already directs requests to the correct bucket. To
+// circumvent this, we manually remove the initial Bucket reference from the
+// path in that case. NOTE: this does not happen if an IP address is used
+// directly, so we attempt to parse IP addresses and do not invoke this
+// removal if one is found.
+func removeBucketFromPath(req *request.Request) {
+ req.Operation.HTTPPath = strings.Replace(req.Operation.HTTPPath,
"/{Bucket}", "", -1)
+ if !strings.HasPrefix(req.Operation.HTTPPath, "/") {
+ req.Operation.HTTPPath = "/" + req.Operation.HTTPPath
+ }
+ req.HTTPRequest.URL.Path = strings.Replace(req.HTTPRequest.URL.Path,
"/{Bucket}", "", -1)
+ if !strings.HasPrefix(req.HTTPRequest.URL.Path, "/") {
+ req.HTTPRequest.URL.Path = "/" + req.HTTPRequest.URL.Path
+ }
+}
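+
+// Illustrative effect when the RemoveDuplicateBucket option is "true": an
+// operation path template such as "/{Bucket}/{Key+}" is rewritten to
+// "/{Key+}" before the request is built and signed, so the bucket is not
+// duplicated when the endpoint already routes to it.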
diff --git a/plugins/s3plugin/s3plugin_test.go
b/plugins/s3plugin/s3plugin_test.go
new file mode 100644
index 00000000..486a4808
--- /dev/null
+++ b/plugins/s3plugin/s3plugin_test.go
@@ -0,0 +1,243 @@
+package s3plugin_test
+
+import (
+ "errors"
+ "flag"
+ "net/http"
+ "testing"
+
+ "github.com/apache/cloudberry-backup/plugins/s3plugin"
+ "github.com/apache/cloudberry-go-libs/testhelper"
+ "github.com/aws/aws-sdk-go/aws/awserr"
+ "github.com/aws/aws-sdk-go/aws/client"
+ "github.com/aws/aws-sdk-go/aws/request"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/urfave/cli"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+)
+
+func TestCluster(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "s3_plugin tests")
+}
+
+var _ = Describe("s3_plugin tests", func() {
+ var pluginConfig *s3plugin.PluginConfig
+ var opts *s3plugin.PluginOptions
+ BeforeEach(func() {
+ pluginConfig = &s3plugin.PluginConfig{
+ ExecutablePath: "/tmp/location",
+ Options: s3plugin.PluginOptions{
+ AwsAccessKeyId: "12345",
+ AwsSecretAccessKey: "6789",
+ BackupMaxConcurrentRequests: "5",
+ BackupMultipartChunksize: "7MB",
+ Bucket: "bucket_name",
+ Endpoint: "endpoint_name",
+ Folder: "folder_name",
+ Region: "region_name",
+ RestoreMaxConcurrentRequests: "5",
+ RestoreMultipartChunksize: "7MB",
+ },
+ }
+ opts = &pluginConfig.Options
+ })
+ Describe("GetS3Path", func() {
+ It("combines the folder with the last four segments of the file path (the file plus its 3 parent directories)", func() {
+ folder := "s3/Dir"
+ path :=
"/a/b/c/tmp/datadir/gpseg-1/backups/20180101/20180101082233/backup_file"
+ newPath := s3plugin.GetS3Path(folder, path)
+ expectedPath :=
"s3/Dir/backups/20180101/20180101082233/backup_file"
+ Expect(newPath).To(Equal(expectedPath))
+ })
+ })
+ Describe("ShouldEnableEncryption", func() {
+ It("returns true when no encryption in config", func() {
+ result := s3plugin.ShouldEnableEncryption("")
+ Expect(result).To(BeTrue())
+ })
+ It("returns true when encryption set to 'on' in config", func()
{
+ result := s3plugin.ShouldEnableEncryption("on")
+ Expect(result).To(BeTrue())
+ })
+ It("returns false when encryption set to 'off' in config",
func() {
+ result := s3plugin.ShouldEnableEncryption("off")
+ Expect(result).To(BeFalse())
+ })
+ It("returns true when encryption set to anything else in
config", func() {
+ result := s3plugin.ShouldEnableEncryption("random_test")
+ Expect(result).To(BeTrue())
+ })
+ })
+ Describe("InitializeAndValidateConfig", func() {
+ Context("Sets defaults", func() {
+ It("sets region to unused when endpoint is used instead
of region", func() {
+ opts.Region = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(BeNil())
+ Expect(opts.Region).To(Equal("unused"))
+ })
+ It(`sets encryption to default value "on" if none is
specified`, func() {
+ opts.Encryption = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(BeNil())
+ Expect(opts.Encryption).To(Equal("on"))
+ })
+ It("sets backup upload chunk size to default if
BackupMultipartChunkSize is not specified", func() {
+ opts.BackupMultipartChunksize = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(BeNil())
+
Expect(opts.UploadChunkSize).To(Equal(s3plugin.DefaultUploadChunkSize))
+ })
+ It("sets backup upload concurrency to default if
BackupMaxConcurrentRequests is not specified", func() {
+ opts.BackupMaxConcurrentRequests = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(BeNil())
+
Expect(opts.UploadConcurrency).To(Equal(s3plugin.DefaultConcurrency))
+ })
+ It("sets restore download chunk size to default if
RestoreMultipartChunkSize is not specified", func() {
+ opts.RestoreMultipartChunksize = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(BeNil())
+
Expect(opts.DownloadChunkSize).To(Equal(s3plugin.DefaultDownloadChunkSize))
+ })
+ It("sets restore download concurrency to default if RestoreMaxConcurrentRequests is not specified", func() {
+ opts.RestoreMaxConcurrentRequests = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(BeNil())
+
Expect(opts.DownloadConcurrency).To(Equal(s3plugin.DefaultConcurrency))
+ })
+ })
+ It("succeeds when all fields in config filled", func() {
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(BeNil())
+ })
+ It("succeeds when all fields except endpoint filled in config",
func() {
+ opts.Endpoint = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(BeNil())
+ })
+ It("succeeds when all fields except region filled in config",
func() {
+ opts.Region = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(BeNil())
+ })
+ It("succeeds when all fields except aws_access_key_id and
aws_secret_access_key in config", func() {
+ opts.AwsAccessKeyId = ""
+ opts.AwsSecretAccessKey = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(BeNil())
+ })
+ It("returns error when neither region nor endpoint in config",
func() {
+ opts.Region = ""
+ opts.Endpoint = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(HaveOccurred())
+ })
+ It("returns error when no aws_access_key_id in config", func() {
+ opts.AwsAccessKeyId = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(HaveOccurred())
+ })
+ It("returns error when no aws_secret_access_key in config",
func() {
+ opts.AwsSecretAccessKey = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(HaveOccurred())
+ })
+ It("returns error when no bucket in config", func() {
+ opts.Bucket = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(HaveOccurred())
+ })
+ It("returns error when no folder in config", func() {
+ opts.Folder = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(HaveOccurred())
+ })
+ It("returns error when the encryption value is invalid", func()
{
+ opts.Encryption = "invalid_value"
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(HaveOccurred())
+ })
+ It("returns error if executable path is missing", func() {
+ pluginConfig.ExecutablePath = ""
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(HaveOccurred())
+ })
+ It("correctly parses upload params from config", func() {
+ opts.BackupMultipartChunksize = "10MB"
+ opts.BackupMaxConcurrentRequests = "10"
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(BeNil())
+ Expect(opts.UploadChunkSize).To(Equal(int64(10 * 1024 *
1024)))
+ Expect(opts.UploadConcurrency).To(Equal(10))
+ })
+ It("correctly parses download params from config", func() {
+ opts.RestoreMultipartChunksize = "10GB"
+ opts.RestoreMaxConcurrentRequests = "10"
+ err :=
s3plugin.InitializeAndValidateConfig(pluginConfig)
+ Expect(err).To(BeNil())
+ Expect(opts.DownloadChunkSize).To(Equal(int64(10 * 1024
* 1024 * 1024)))
+ Expect(opts.DownloadConcurrency).To(Equal(10))
+ })
+ })
+ Describe("Delete", func() {
+ var flags *flag.FlagSet
+
+ BeforeEach(func() {
+ flags = flag.NewFlagSet("testing flagset",
flag.PanicOnError)
+ })
+ It("returns error when timestamp is not provided", func() {
+ err := flags.Parse([]string{"myconfigfilepath"})
+ Expect(err).ToNot(HaveOccurred())
+ context := cli.NewContext(nil, flags, nil)
+
+ err = s3plugin.DeleteBackup(context)
+
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(Equal("delete requires a
<timestamp>"))
+ })
+ It("returns error when timestamp does not parse", func() {
+ err := flags.Parse([]string{"myconfigfilepath",
"badformat"})
+ Expect(err).ToNot(HaveOccurred())
+ context := cli.NewContext(nil, flags, nil)
+
+ err = s3plugin.DeleteBackup(context)
+
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(Equal("delete requires a
<timestamp> with format YYYYMMDDHHMMSS, but received: badformat"))
+ })
+ })
+ Describe("CustomRetryer", func() {
+ DescribeTable("validate retryer on different http status codes",
+ func(httpStatusCode int, testError error,
expectedRetryValue bool) {
+ _, _, _ = testhelper.SetupTestLogger()
+ var awsErr awserr.RequestFailure
+ if testError != nil {
+ awsErr =
awserr.NewRequestFailure(awserr.New(request.ErrCodeRequestError,
testError.Error(), testError), httpStatusCode, "")
+ }
+ req := &request.Request{
+ HTTPResponse:
&http.Response{StatusCode: httpStatusCode},
+ Error: awsErr,
+ }
+ retryer :=
s3plugin.CustomRetryer{DefaultRetryer: client.DefaultRetryer{NumMaxRetries: 5}}
+ retryValue := retryer.ShouldRetry(req)
+ if retryValue != expectedRetryValue {
+ Fail("unexpected retry behavior")
+ }
+ },
+ Entry("status OK", 200, nil, false),
+ Entry("connection reset", 104, errors.New("connection
reset by peer"), true),
+ Entry("NoSuchKey", 404, awserr.New(s3.ErrCodeNoSuchKey,
"No Such Key", nil), true),
+ Entry("NoSuchBucket", 404,
awserr.New(s3.ErrCodeNoSuchBucket, "No Such Bucket", nil), false),
+ Entry("NotFound", 404, awserr.New("NotFound", "Not
Found", nil), false),
+ )
+ })
+})
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]