This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new 31e7cbc refactor: refactor to disk space error while backing up (#474)
31e7cbc is described below
commit 31e7cbc96b289816892bdade4c6db07754676f27
Author: liyao <[email protected]>
AuthorDate: Mon Nov 27 16:36:27 2023 +0800
refactor: refactor to disk space error while backing up (#474)
* fix: detect disk space error
Signed-off-by: mlycore <[email protected]>
* feat: add data node ip and address to error message
Signed-off-by: mlycore <[email protected]>
* fix: remove useless log
Signed-off-by: mlycore <[email protected]>
* feat: add disk space error check to cmd output
Signed-off-by: mlycore <[email protected]>
* fix: fix return error
Signed-off-by: mlycore <[email protected]>
* feat: hide delete backupfiles output
Signed-off-by: mlycore <[email protected]>
* fix: remove comments
Signed-off-by: mlycore <[email protected]>
* chore: add golint comment
Signed-off-by: mlycore <[email protected]>
* fix: fix test
Signed-off-by: mlycore <[email protected]>
---------
Signed-off-by: mlycore <[email protected]>
---
pitr/agent/internal/pkg/opengauss.go | 23 +++++++++++++----------
pitr/agent/pkg/cmds/cmd.go | 36 +++++++++++++++++++++++++++---------
pitr/cli/internal/cmd/backup.go | 15 +++++++--------
pitr/cli/internal/cmd/backup_test.go | 4 ++--
4 files changed, 49 insertions(+), 29 deletions(-)
diff --git a/pitr/agent/internal/pkg/opengauss.go b/pitr/agent/internal/pkg/opengauss.go
index 21872c6..84866d3 100644
--- a/pitr/agent/internal/pkg/opengauss.go
+++ b/pitr/agent/internal/pkg/opengauss.go
@@ -98,6 +98,10 @@ const (
)
func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode string,
threadsNum uint8, dbPort uint16) (string, error) {
+ var (
+ bid string
+ err error
+ )
cmd := fmt.Sprintf(_backupFmt, backupPath, instanceName, backupMode,
og.pgData, threadsNum, dbPort)
outputs, err := cmds.AsyncExec(og.shell, cmd)
if err != nil {
@@ -117,17 +121,15 @@ func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode string, th
return "", output.Error
}
- // get the backup id from the first line
- bid, err := og.getBackupID(output.Message)
- if err != nil {
- og.log.Error(fmt.Sprintf("og.getBackupID[source=%s] return err wrap: %s", output.Message, err))
- return "", err
+ if strings.Contains(output.Message, "INFO: Backup start") {
+ bid, err = og.getBackupID(output.Message)
+ if err != nil {
+ og.log.Error(fmt.Sprintf("og.getBackupID[source=%s] return err wrap: %s", output.Message, err))
+ return "", err
+ }
}
- // ignore other output
- go og.ignore(outputs)
- return bid, nil //nolint
}
- return "", fmt.Errorf("unknow err")
+ return bid, nil //nolint
}
//nolint:dupl
@@ -192,7 +194,7 @@ func (og *openGauss) AddInstance(backupPath, instance
string) error {
if errors.Is(err, cons.CmdOperateFailed) {
og.log.Error(fmt.Sprintf("add instance failure[output=%s], err:
%s, wrap: %s", output, err, cons.InstanceAlreadyExist))
- return err
+ return fmt.Errorf("add instance failure[output=%s], err: %s, wrap: %w", output, err, cons.InstanceAlreadyExist)
}
if err != nil {
og.log.Error(fmt.Sprintf(_CmdErrorFmt, og.shell, cmd,
cons.CmdAddInstanceFailed))
@@ -347,6 +349,7 @@ func (og *openGauss) ShowBackupList(backupPath,
instanceName string) ([]*model.B
return og.showbackup(cmd, instanceName)
}
+//nolint:unused
func (og *openGauss) ignore(outputs chan *cmds.Output) {
defer func() {
_ = recover()
diff --git a/pitr/agent/pkg/cmds/cmd.go b/pitr/agent/pkg/cmds/cmd.go
index 486148b..77a98f5 100644
--- a/pitr/agent/pkg/cmds/cmd.go
+++ b/pitr/agent/pkg/cmds/cmd.go
@@ -49,6 +49,7 @@ func AsyncExec(name string, args ...string) (chan *Output,
error) {
if err != nil {
return nil, fmt.Errorf("can not obtain stdout pipe for
command[args=%+v]:%s", args, err)
}
+
if err = cmd.Start(); err != nil {
return nil, fmt.Errorf("the command is err[args=%+v]:%s", args,
err)
}
@@ -61,11 +62,16 @@ func AsyncExec(name string, args ...string) (chan *Output,
error) {
go func() {
if err = syncutils.NewRecoverFuncWithErrRet("", func() error {
for scanner.Scan() {
- output <- &Output{
+ op := &Output{
LineNo: index,
Message: scanner.Text(),
Error: err,
}
+ if strings.Contains(scanner.Text(), "No space left on device") {
+ op.Error = fmt.Errorf("%s", "No space left on device")
+ }
+
+ output <- op
index++
}
@@ -78,13 +84,14 @@ func AsyncExec(name string, args ...string) (chan *Output,
error) {
if err = cmd.Wait(); err != nil {
if ee, ok := err.(*exec.ExitError); ok {
- logging.Error(fmt.Sprintf("exec
failure[ee=%s], wrap=%s", ee, cons.CmdOperateFailed))
+ output <- &Output{
+ Error: fmt.Errorf("exec failure[ee=%s], wrap=%w", ee, cons.CmdOperateFailed),
+ }
+ } else {
+ output <- &Output{
+ Error: fmt.Errorf("%s err: %s",
cmd.String(), err),
+ }
}
-
- output <- &Output{
- Error: fmt.Errorf("%s err: %s",
cmd.String(), err),
- }
-
}
return nil
})(); err != nil {
@@ -113,6 +120,12 @@ func Exec(name string, args ...string) (string, error) {
if err != nil {
return "", fmt.Errorf("can not obtain stdout pipe for
command[args=%+v]:%s", args, err)
}
+
+ stderr, err := cmd.StderrPipe()
+ if err != nil {
+ return "", fmt.Errorf("can not obtain stderr pipe for cmand[args=%+v]:%s", args, err)
+ }
+
if err = cmd.Start(); err != nil {
return "", fmt.Errorf("the command is err[args=%+v]:%s", args,
err)
}
@@ -122,11 +135,16 @@ func Exec(name string, args ...string) (string, error) {
return "", fmt.Errorf("io.ReadAll return err=%w", err)
}
+ ereader, err := io.ReadAll(stderr)
+ if err != nil {
+ return "", fmt.Errorf("io.ReadAll return err=%w", err)
+ }
+
if err = cmd.Wait(); err != nil {
if ee, ok := err.(*exec.ExitError); ok {
- logging.Error(fmt.Sprintf("exec
failure[ee=%s,stdout=%s]", ee, string(reader)))
+ return "", fmt.Errorf("exec failure[ee=%s,stdout=%s], wrap:%w", ee, string(reader), cons.CmdOperateFailed)
}
- return "", fmt.Errorf("%s err: %s", cmd.String(), err)
+ return "", fmt.Errorf("%s err: %s", cmd.String(),
string(ereader))
}
return string(reader), nil
}
diff --git a/pitr/cli/internal/cmd/backup.go b/pitr/cli/internal/cmd/backup.go
index 1b59e3c..77f19a5 100644
--- a/pitr/cli/internal/cmd/backup.go
+++ b/pitr/cli/internal/cmd/backup.go
@@ -134,12 +134,7 @@ func backup() error {
}
if lsBackup != nil {
- if cancel {
- deleteBackupFiles(ls, lsBackup,
deleteModeQuiet)
- } else {
- logging.Warn("Try to delete backup data
...")
- deleteBackupFiles(ls, lsBackup,
deleteModeNormal)
- }
+ deleteBackupFiles(ls, lsBackup, deleteModeQuiet)
}
}
}()
@@ -310,20 +305,24 @@ func _execBackup(as pkg.IAgentServer, node
*model.StorageNode, dnCh chan *model.
Instance: defaultInstance,
}
backupID, err := as.Backup(in)
+ status := model.SsBackupStatusRunning
if err != nil {
- return xerr.NewCliErr(err.Error())
+ status = model.BackupStatus(err.Error())
}
// update DnList of lsBackup
dn := &model.DataNode{
IP: node.IP,
Port: node.Port,
- Status: model.SsBackupStatusRunning,
+ Status: status,
BackupID: backupID,
StartTime: timeutil.Now().String(),
EndTime: timeutil.Init(),
}
dnCh <- dn
+ if err != nil {
+ return fmt.Errorf("data node %s:%d backup error: %s", node.IP,
node.Port, err)
+ }
return nil
}
diff --git a/pitr/cli/internal/cmd/backup_test.go b/pitr/cli/internal/cmd/backup_test.go
index ee72697..8c7387c 100644
--- a/pitr/cli/internal/cmd/backup_test.go
+++ b/pitr/cli/internal/cmd/backup_test.go
@@ -180,9 +180,9 @@ var _ = Describe("Backup", func() {
as.EXPECT().Backup(gomock.Any()).Return("",
xerr.NewCliErr("backup failed"))
- Expect(_execBackup(as, bak.SsBackup.StorageNodes[0],
dnCh)).ToNot(BeNil())
+ Expect(_execBackup(as, bak.SsBackup.StorageNodes[1],
dnCh)).ToNot(BeNil())
close(dnCh)
- Expect(len(dnCh)).To(Equal(1))
+ Expect(len(dnCh)).To(Equal(2))
})
})