This is an automated email from the ASF dual-hosted git repository.

maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry-go-libs.git

commit ce8cf9a072c22382bab65ee33f349756d8bad47a
Author: Jamie McAtamney <[email protected]>
AuthorDate: Wed Apr 12 21:57:40 2023 -0700

    Miscellaneous mirror-related enhancements
    
    This commit changes cluster behavior in the following ways in order to
    make it easier to work with mirrors:
    
    1) The ContentIDs array is now deduplicated and sorted in the Cluster
    constructor, so that a cluster with mirrors won't have duplicate values
    in that array and segments can be added in any order without affecting
    the order of ContentIDs.
    
    2) GetSegmentConfiguration can now retrieve only mirror information if
    desired, so that if some code wants to work with primaries and mirrors
    separately it can do so without having to retrieve both primary and
    mirror information at once and then manually separate it out into two
    Clusters.
    
    3) TestExecutor now supports returning different values on subsequent
    calls for both ExecuteLocalCommand and ExecuteClusterCommand, in order
    to make it easier to test functions that might make one call to deal
    with primaries and another to deal with mirrors (or any other use case
    involving multiple calls per function, of course).
---
 cluster/cluster.go      | 28 ++++++++++++++++++-----
 cluster/cluster_test.go | 33 +++++++++++++++++++++++++++
 testhelper/structs.go   | 60 +++++++++++++++++++++++++++++++++++++++++--------
 3 files changed, 107 insertions(+), 14 deletions(-)

diff --git a/cluster/cluster.go b/cluster/cluster.go
index 0fd9135..6f406d8 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -9,6 +9,7 @@ import (
        "bytes"
        "fmt"
        "os/exec"
+       "sort"
        "strings"
 
        "github.com/cloudberrydb/gp-common-go-libs/dbconn"
@@ -218,9 +219,9 @@ func NewCluster(segConfigs []SegConfig) *Cluster {
        cluster.ByContent = make(map[int][]*SegConfig, 0)
        cluster.ByHost = make(map[string][]*SegConfig, 0)
        cluster.Executor = &GPDBExecutor{}
+
        for i := range cluster.Segments {
                segment := &cluster.Segments[i]
-               cluster.ContentIDs = append(cluster.ContentIDs, 
segment.ContentID)
                cluster.ByContent[segment.ContentID] = 
append(cluster.ByContent[segment.ContentID], segment)
                segmentList := cluster.ByContent[segment.ContentID]
                if len(segmentList) == 2 && segmentList[0].Role == "m" {
@@ -236,6 +237,10 @@ func NewCluster(segConfigs []SegConfig) *Cluster {
                        cluster.Hostnames = append(cluster.Hostnames, 
segment.Hostname)
                }
        }
+       for content := range cluster.ByContent {
+               cluster.ContentIDs = append(cluster.ContentIDs, content)
+       }
+       sort.Ints(cluster.ContentIDs)
        return &cluster
 }
 
@@ -493,13 +498,24 @@ func (cluster *Cluster) GetDirsForHost(hostname string) 
[]string {
  * Helper functions
  */
 
+/*
+ * This function accepts up to two booleans:
+ * By default, it retrieves only primary and coordinator information.
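+ * If exactly one boolean is passed and it is true, it also retrieves mirror 
and standby information.
+ * If two booleans are passed and the second is true, it retrieves only mirror 
and standby information, regardless of the value of the first boolean; if the second is false, the first is ignored and only the default (primary and coordinator) information is retrieved.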
+ */
 func GetSegmentConfiguration(connection *dbconn.DBConn, getMirrors ...bool) 
([]SegConfig, error) {
        includeMirrors := len(getMirrors) == 1 && getMirrors[0]
+       includeOnlyMirrors := len(getMirrors) == 2 && getMirrors[1]
        query := ""
        if connection.Version.Before("6") {
-               whereClause := "WHERE s.role = 'p' AND f.fsname = 'pg_system'"
-               if includeMirrors {
-                       whereClause = "WHERE f.fsname = 'pg_system'"
+               whereClause := "WHERE%s f.fsname = 'pg_system'"
+               if includeOnlyMirrors {
+                       whereClause = fmt.Sprintf(whereClause, " s.role = 'm' 
AND")
+               } else if includeMirrors {
+                       whereClause = fmt.Sprintf(whereClause, "")
+               } else {
+                       whereClause = fmt.Sprintf(whereClause, " s.role = 'p' 
AND")
                }
                query = fmt.Sprintf(`
 SELECT
@@ -516,7 +532,9 @@ JOIN pg_filespace f ON e.fsefsoid = f.oid
 ORDER BY s.content, s.role DESC;`, whereClause)
        } else {
                whereClause := "WHERE role = 'p'"
-               if includeMirrors {
+               if includeOnlyMirrors {
+                       whereClause = "WHERE role = 'm'"
+               } else if includeMirrors {
                        whereClause = ""
                }
                query = fmt.Sprintf(`
diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go
index 7d737f9..6572d53 100644
--- a/cluster/cluster_test.go
+++ b/cluster/cluster_test.go
@@ -147,6 +147,39 @@ var _ = Describe("cluster/cluster tests", func() {
                        Expect(results[2].DataDir).To(Equal("/data/gpseg2"))
                        Expect(results[2].Hostname).To(Equal("remotehost"))
                })
+               It("returns mirrors for a single-host, single-segment cluster", 
func() {
+                       fakeResult := 
sqlmock.NewRows(header).AddRow(localSegOne...)
+                       mock.ExpectQuery("SELECT 
(.*)").WillReturnRows(fakeResult)
+                       results, err := 
cluster.GetSegmentConfiguration(connection, true, true)
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(len(results)).To(Equal(1))
+                       Expect(results[0].DataDir).To(Equal("/data/gpseg0"))
+                       Expect(results[0].Hostname).To(Equal("localhost"))
+               })
+               It("returns mirrors for a single-host, multi-segment cluster", 
func() {
+                       fakeResult := 
sqlmock.NewRows(header).AddRow(localSegOne...).AddRow(localSegTwo...)
+                       mock.ExpectQuery("SELECT 
(.*)").WillReturnRows(fakeResult)
+                       results, err := 
cluster.GetSegmentConfiguration(connection, true, true)
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(len(results)).To(Equal(2))
+                       Expect(results[0].DataDir).To(Equal("/data/gpseg0"))
+                       Expect(results[0].Hostname).To(Equal("localhost"))
+                       Expect(results[1].DataDir).To(Equal("/data/gpseg1"))
+                       Expect(results[1].Hostname).To(Equal("localhost"))
+               })
+               It("returns mirrors for a multi-host, multi-segment cluster", 
func() {
+                       fakeResult := 
sqlmock.NewRows(header).AddRow(localSegOne...).AddRow(localSegTwo...).AddRow(remoteSegOne...)
+                       mock.ExpectQuery("SELECT 
(.*)").WillReturnRows(fakeResult)
+                       results, err := 
cluster.GetSegmentConfiguration(connection, true, true)
+                       Expect(err).ToNot(HaveOccurred())
+                       Expect(len(results)).To(Equal(3))
+                       Expect(results[0].DataDir).To(Equal("/data/gpseg0"))
+                       Expect(results[0].Hostname).To(Equal("localhost"))
+                       Expect(results[1].DataDir).To(Equal("/data/gpseg1"))
+                       Expect(results[1].Hostname).To(Equal("localhost"))
+                       Expect(results[2].DataDir).To(Equal("/data/gpseg2"))
+                       Expect(results[2].Hostname).To(Equal("remotehost"))
+               })
        })
 
        Describe("GenerateSSHCommandList", func() {
diff --git a/testhelper/structs.go b/testhelper/structs.go
index e3e4245..6816556 100644
--- a/testhelper/structs.go
+++ b/testhelper/structs.go
@@ -6,6 +6,7 @@ package testhelper
 
 import (
        "github.com/cloudberrydb/gp-common-go-libs/cluster"
+       "github.com/cloudberrydb/gp-common-go-libs/gplog"
        "github.com/jmoiron/sqlx"
 )
 
@@ -42,20 +43,53 @@ func (result TestResult) RowsAffected() (int64, error) {
        return result.Rows, nil
 }
 
+/*
+ * Each output or error type has both a plural form and a singular form.  If 
the plural form is set, it overrides the singular.
+ * The singular form returns the same output and error on each call; the 
plural form returns one output per call (first element on the first call, etc.)
+ * If more calls are made than there are outputs provided a Fatal error is 
raised, unless UseDefaultOutput or UseLastOutput is set.
+ *
+ * The LocalOutputs and LocalErrors arrays are "paired" in that the struct 
doesn't know how "normal" calls and "error" calls will be interleaved, so if
+ * N calls are expected then at least N outputs and N errors must be provided; 
even if UseLastOutput or UseDefaultOutput is set in order to define its
+ * behavior when more than N calls are made and it runs out of outputs and 
errors to return, the two array lengths must still be identical.
+ */
 type TestExecutor struct {
-       LocalOutput     string
-       LocalError      error
-       LocalCommands   []string
+       LocalOutput   string
+       LocalOutputs  []string
+       LocalError    error
+       LocalErrors   []error
+       LocalCommands []string
+
        ClusterOutput   *cluster.RemoteOutput
+       ClusterOutputs  []*cluster.RemoteOutput
        ClusterCommands [][]cluster.ShellCommand
-       ErrorOnExecNum  int // Throw the specified error after this many 
executions of Execute[...]Command(); 0 means always return error
-       NumExecutions   int
+
+       ErrorOnExecNum       int // Return LocalError after this many calls of 
ExecuteLocalCommand (0 means always return error); has no effect for 
ExecuteClusterCommand
+       NumExecutions        int // Total of NumLocalExecutions and 
NumClusterExecutions, for convenience and backwards compatibility
+       NumLocalExecutions   int
+       NumClusterExecutions int
+       UseLastOutput        bool // If we run out of LocalOutputs/LocalErrors 
or ClusterOutputs, default to the final items in those arrays
+       UseDefaultOutput     bool // If we run out of LocalOutputs/LocalErrors 
or ClusterOutputs, default to LocalOutput/LocalError or ClusterOutput
 }
 
 func (executor *TestExecutor) ExecuteLocalCommand(commandStr string) (string, 
error) {
        executor.NumExecutions++
+       executor.NumLocalExecutions++
        executor.LocalCommands = append(executor.LocalCommands, commandStr)
-       if executor.ErrorOnExecNum == 0 || executor.NumExecutions == 
executor.ErrorOnExecNum {
+       if (executor.LocalOutputs == nil && executor.LocalErrors != nil) || 
(executor.LocalOutputs != nil && executor.LocalErrors == nil) {
+               gplog.Fatal(nil, "If one of LocalOutputs or LocalErrors is set, 
both must be set")
+       } else if executor.LocalOutputs != nil && executor.LocalErrors != nil 
&& len(executor.LocalOutputs) != len(executor.LocalErrors) {
+               gplog.Fatal(nil, "Found %d LocalOutputs and %d LocalErrors, but 
one output and one error must be set for each call", 
len(executor.LocalOutputs), len(executor.LocalErrors))
+       }
+       if executor.LocalOutputs != nil {
+               if executor.NumLocalExecutions <= len(executor.LocalOutputs) {
+                       return 
executor.LocalOutputs[executor.NumLocalExecutions-1], 
executor.LocalErrors[executor.NumLocalExecutions-1]
+               } else if executor.UseLastOutput {
+                       return 
executor.LocalOutputs[len(executor.LocalOutputs)-1], 
executor.LocalErrors[len(executor.LocalErrors)-1]
+               } else if executor.UseDefaultOutput {
+                       return executor.LocalOutput, executor.LocalError
+               }
+               gplog.Fatal(nil, "ExecuteLocalCommand called %d times, but only 
%d outputs and errors provided", executor.NumLocalExecutions, 
len(executor.LocalOutputs))
+       } else if executor.ErrorOnExecNum == 0 || executor.NumLocalExecutions 
== executor.ErrorOnExecNum {
                return executor.LocalOutput, executor.LocalError
        }
        return executor.LocalOutput, nil
@@ -63,9 +97,17 @@ func (executor *TestExecutor) ExecuteLocalCommand(commandStr 
string) (string, er
 
 func (executor *TestExecutor) ExecuteClusterCommand(scope cluster.Scope, 
commandList []cluster.ShellCommand) *cluster.RemoteOutput {
        executor.NumExecutions++
+       executor.NumClusterExecutions++
        executor.ClusterCommands = append(executor.ClusterCommands, commandList)
-       if executor.ErrorOnExecNum == 0 || executor.NumExecutions == 
executor.ErrorOnExecNum {
-               return executor.ClusterOutput
+       if executor.ClusterOutputs != nil {
+               if executor.NumClusterExecutions <= 
len(executor.ClusterOutputs) {
+                       return 
executor.ClusterOutputs[executor.NumClusterExecutions-1]
+               } else if executor.UseLastOutput {
+                       return 
executor.ClusterOutputs[len(executor.ClusterOutputs)-1]
+               } else if executor.UseDefaultOutput {
+                       return executor.ClusterOutput
+               }
+               gplog.Fatal(nil, "ExecuteClusterCommand called %d times, but 
only %d ClusterOutputs provided", executor.NumClusterExecutions, 
len(executor.ClusterOutputs))
        }
-       return nil
+       return executor.ClusterOutput
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to