This is an automated email from the ASF dual-hosted git repository. zenlin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push: new 11eed33 Add new servicecenter plugin supported eureka new dad8f29 Merge pull request #574 from zenlint/syncer-eureka 11eed33 is described below commit 11eed33811468dfa9bb4a46f7b8fbeec13f024f5 Author: Zen Lin <linzhi...@huawei.com> AuthorDate: Thu Aug 22 11:11:24 2019 +0800 Add new servicecenter plugin supported eureka --- syncer/cmd/daemon.go | 3 + syncer/config/config.go | 1 + syncer/plugins/eureka/eureka.go | 113 ++++++++++++++++ syncer/plugins/eureka/instance.go | 108 +++++++++++++++ syncer/plugins/eureka/service.go | 38 ++++++ syncer/plugins/eureka/transform.go | 218 ++++++++++++++++++++++++++++++ syncer/plugins/eureka/type.go | 155 +++++++++++++++++++++ syncer/plugins/servicecenter/transform.go | 6 +- 8 files changed, 641 insertions(+), 1 deletion(-) diff --git a/syncer/cmd/daemon.go b/syncer/cmd/daemon.go index dde9836..70b65da 100644 --- a/syncer/cmd/daemon.go +++ b/syncer/cmd/daemon.go @@ -61,6 +61,9 @@ func init() { syncerCmd.Flags().IntVar(&conf.ClusterPort, "cluster-port", conf.ClusterPort, "port to communicate between cluster members") + + syncerCmd.Flags().StringVar(&conf.ServicecenterPlugin, "sc-plugin", conf.ServicecenterPlugin, + "plugin name of servicecenter") } // runSyncer Runs the Syncer service. diff --git a/syncer/config/config.go b/syncer/config/config.go index b194d45..723eab8 100644 --- a/syncer/config/config.go +++ b/syncer/config/config.go @@ -25,6 +25,7 @@ import ( "github.com/apache/servicecomb-service-center/pkg/log" "github.com/apache/servicecomb-service-center/syncer/etcd" "github.com/apache/servicecomb-service-center/syncer/pkg/utils" + _ "github.com/apache/servicecomb-service-center/syncer/plugins/eureka" "github.com/apache/servicecomb-service-center/syncer/plugins/servicecenter" "github.com/apache/servicecomb-service-center/syncer/serf" ) diff --git a/syncer/plugins/eureka/eureka.go b/syncer/plugins/eureka/eureka.go new file mode 100644 index 0000000..71039e2 --- /dev/null +++ b/syncer/plugins/eureka/eureka.go @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package eureka + +import ( + "context" + "encoding/json" + "errors" + "io/ioutil" + "net/http" + + "github.com/apache/servicecomb-service-center/pkg/client/sc" + "github.com/apache/servicecomb-service-center/pkg/util" + "github.com/apache/servicecomb-service-center/syncer/plugins" + pb "github.com/apache/servicecomb-service-center/syncer/proto" +) + +const ( + PluginName = "eureka" + + apiApplications = "/apps" +) + +func init() { + // Register self as a repository plugin + plugins.RegisterPlugin(&plugins.Plugin{ + Kind: plugins.PluginServicecenter, + Name: PluginName, + New: New, + }) +} + +type adaptor struct{} + +func New() plugins.PluginInstance { + return &adaptor{} +} + +// New repository with endpoints +func (*adaptor) New(endpoints []string) (plugins.Servicecenter, error) { + cfg := sc.Config{Endpoints: endpoints} + client, err := sc.NewLBClient(cfg.Endpoints, cfg.Merge()) + if err != nil { + return nil, err + } + return &Client{LBClient: client, Cfg: cfg}, nil +} + +type Client struct { + *sc.LBClient + Cfg sc.Config +} + +// GetAll get and transform eureka data to SyncData +func (c *Client) GetAll(ctx context.Context) (*pb.SyncData, error) { + method := http.MethodGet + headers := c.CommonHeaders(method) + resp, err := c.RestDoWithContext(ctx, method, apiApplications, headers, nil) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, c.toError(body) + } + + data := &eurekaData{} + err = json.Unmarshal(body, data) + if err != nil { + return nil, err + } + + return toSyncData(data), nil +} + +// CommonHeaders Set the common header of the request +func (c *Client) CommonHeaders(method string) http.Header { + var headers = make(http.Header) + if len(c.Cfg.Token) > 0 { + headers.Set("X-Auth-Token", c.Cfg.Token) + } + headers.Set("Accept", " application/json") + headers.Set("Content-Type", "application/json") + if method == http.MethodPut { + headers.Set("x-netflix-discovery-replication", "true") + } + return headers +} + +// toError response body to error +func (c *Client) toError(body []byte) error { + return errors.New(util.BytesToStringWithNoCopy(body)) +} diff --git a/syncer/plugins/eureka/instance.go b/syncer/plugins/eureka/instance.go new file mode 100644 index 0000000..4e4806c --- /dev/null +++ b/syncer/plugins/eureka/instance.go @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package eureka + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + + pb "github.com/apache/servicecomb-service-center/syncer/proto" +) + +const ( + apiInstances = "/apps/%s" + apiInstance = "/apps/%s/%s" +) + +// RegisterInstance register instance to servicecenter +func (c *Client) RegisterInstance(ctx context.Context, domainProject, serviceId string, syncInstance *pb.SyncInstance) (string, error) { + instance := toInstance(serviceId, syncInstance) + method := http.MethodPost + headers := c.CommonHeaders(method) + body, err := json.Marshal(&InstanceRequest{Instance: instance}) + if err != nil { + return "", err + } + + apiURL := fmt.Sprintf(apiInstances, serviceId) + resp, err := c.RestDoWithContext(ctx, method, apiURL, headers, body) + if err != nil { + return "", err + } + defer resp.Body.Close() + + body, err = ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + if resp.StatusCode != http.StatusNoContent { + return "", c.toError(body) + } + + return instance.InstanceId, nil +} + +// UnregisterInstance unregister instance from servicecenter +func (c *Client) UnregisterInstance(ctx context.Context, domainProject, serviceId, instanceId string) error { + method := http.MethodDelete + headers := c.CommonHeaders(method) + + apiURL := fmt.Sprintf(apiInstance, serviceId, instanceId) + resp, err := c.RestDoWithContext(ctx, method, apiURL, headers, nil) + if err != nil { + return err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusOK { + return c.toError(body) + } + + return nil +} + +// Heartbeat sends heartbeat to servicecenter +func (c *Client) Heartbeat(ctx context.Context, domainProject, serviceId, instanceId string) error { + method := http.MethodPut + headers := c.CommonHeaders(method) + apiURL := fmt.Sprintf(apiInstance, serviceId, instanceId) + "?" + url.Values{"status": {UP}}.Encode() + resp, err := c.RestDoWithContext(ctx, method, apiURL, headers, nil) + if err != nil { + return err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusOK { + return c.toError(body) + } + + return nil +} diff --git a/syncer/plugins/eureka/service.go b/syncer/plugins/eureka/service.go new file mode 100644 index 0000000..69c17c2 --- /dev/null +++ b/syncer/plugins/eureka/service.go @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package eureka + +import ( + "context" + + pb "github.com/apache/servicecomb-service-center/syncer/proto" +) + +// CreateService Eureka's application is created with instance and does not need to be processed here. +func (c *Client) CreateService(ctx context.Context, domainProject string, syncService *pb.SyncService) (string, error) { + return syncService.Name, nil +} + +// DeleteService Eureka's application is created with instance and does not need to be processed here. +func (c *Client) DeleteService(ctx context.Context, domainProject, serviceId string) error { + return nil +} + +// ServiceExistence Eureka's application is created with instance and does not need to be processed here. +func (c *Client) ServiceExistence(ctx context.Context, domainProject string, syncService *pb.SyncService) (string, error) { + return syncService.Name, nil +} diff --git a/syncer/plugins/eureka/transform.go b/syncer/plugins/eureka/transform.go new file mode 100644 index 0000000..686dbef --- /dev/null +++ b/syncer/plugins/eureka/transform.go @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package eureka + +import ( + "encoding/json" + "errors" + "fmt" + "net/url" + "strconv" + "strings" + + "github.com/apache/servicecomb-service-center/pkg/log" + pb "github.com/apache/servicecomb-service-center/syncer/proto" +) + +const ( + eurekaInstanceFormat = "%s:%s:%d" + + defaultApp = "eureka" + + // The name is limited to "Netflix" or "Amazon" or "MyOwn" + // by the enumeration type in the interface of eureka + // "com.netflix.appinfo.DataCenterInfo". + // Here, "MyOwn" is used as the default. + defaultDataCenterInfoName = "MyOwn" + + // Class is used the static variable "MY_DATA_CENTER_INFO_TYPE_MARKER" + // defined in eureka "com.netflix.discovery.converters.jackson", + // that is kept for backward compatibility + defaultDataCenterInfoClass = "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo" +) + +// toSyncData transform eureka service cache to SyncData +func toSyncData(eureka *eurekaData) (data *pb.SyncData) { + apps := eureka.APPS.Applications + data = &pb.SyncData{ + Services: make([]*pb.SyncService, 0, len(apps)), + Instances: make([]*pb.SyncInstance, 0, 10), + } + + for _, app := range apps { + service := toSyncService(app) + + instances := toSyncInstances(service.ServiceId, app.Instances) + if len(instances) == 0 { + continue + } + + data.Services = append(data.Services, service) + data.Instances = append(data.Instances, instances...) + } + return +} + +// toSyncService transform eureka application to SyncService +func toSyncService(app *Application) (service *pb.SyncService) { + appName := strings.ToLower(app.Name) + service = &pb.SyncService{ + ServiceId: appName, + Name: appName, + App: defaultApp, + Version: "0.0.1", + DomainProject: "default/default", + Status: pb.SyncService_UP, + PluginName: PluginName, + } + return +} + +// toSyncInstances transform eureka instances to SyncInstances +func toSyncInstances(serviceID string, instances []*Instance) []*pb.SyncInstance { + instList := make([]*pb.SyncInstance, 0, len(instances)) + for _, inst := range instances { + if inst.Status != UP { + continue + } + + instList = append(instList, toSyncInstance(serviceID, inst)) + } + return instList +} + +// toSyncInstance transform eureka instance to SyncInstance +func toSyncInstance(serviceID string, instance *Instance) (syncInstance *pb.SyncInstance) { + syncInstance = &pb.SyncInstance{ + InstanceId: instance.InstanceId, + ServiceId: serviceID, + Endpoints: make([]string, 0, 2), + HostName: instance.HostName, + PluginName: PluginName, + Version: "latest", + } + + switch instance.Status { + case UP: + syncInstance.Status = pb.SyncInstance_UP + case DOWN: + syncInstance.Status = pb.SyncInstance_DOWN + case STARTING: + syncInstance.Status = pb.SyncInstance_STARTING + case OUTOFSERVICE: + syncInstance.Status = pb.SyncInstance_OUTOFSERVICE + default: + syncInstance.Status = pb.SyncInstance_UNKNOWN + } + + if instance.Port.Enabled.Bool() { + syncInstance.Endpoints = append(syncInstance.Endpoints, fmt.Sprintf("http://%s:%d", instance.IPAddr, instance.Port.Port)) + } + + if instance.SecurePort.Enabled.Bool() { + syncInstance.Endpoints = append(syncInstance.Endpoints, fmt.Sprintf("https://%s:%d", instance.IPAddr, instance.SecurePort.Port)) + } + + if instance.HealthCheckUrl != "" { + syncInstance.HealthCheck = &pb.HealthCheck{ + Interval: 30, + Times: 3, + Url: instance.HealthCheckUrl, + } + } + + expansion, err := json.Marshal(instance) + if err != nil { + log.Errorf(err, "transform sc service to syncer service failed: %s", err) + return + } + syncInstance.Expansion = expansion + return +} + +// toInstance transform SyncInstance to eureka instance +func toInstance(serviceID string, syncInstance *pb.SyncInstance) (instance *Instance) { + instance = &Instance{} + if syncInstance.PluginName == PluginName && syncInstance.Expansion != nil { + err := json.Unmarshal(syncInstance.Expansion, instance) + if err == nil { + return + } + log.Errorf(err, "proto unmarshal %s instance, instanceID = %s, content = %v failed", + PluginName, syncInstance.InstanceId, syncInstance.Expansion) + } + instance.InstanceId = syncInstance.InstanceId + instance.APP = serviceID + instance.Status = pb.SyncInstance_Status_name[int32(syncInstance.Status)] + instance.OverriddenStatus = UNKNOWN + instance.DataCenterInfo = &DataCenterInfo{ + Name: defaultDataCenterInfoName, + Class: defaultDataCenterInfoClass, + } + + instance.Metadata = &MetaData{ + Map: map[string]string{"instanceId": syncInstance.InstanceId}, + } + + ipAddr := "" + for _, ep := range syncInstance.Endpoints { + addr, err := url.Parse(ep) + if err != nil { + log.Error("parse the endpoint of eureka`s instance failed", err) + continue + } + hostname := addr.Hostname() + if ipAddr != "" || ipAddr != hostname { + log.Error("eureka`s ipAddr must be unique in endpoints", err) + } + ipAddr = hostname + + port := &Port{} + port.Port, err = strconv.Atoi(addr.Port()) + if err != nil { + log.Error("Illegal value of port", err) + continue + } + port.Enabled.Set(true) + + switch addr.Scheme { + case "http": + instance.Port = port + instance.VipAddress = serviceID + case "https": + instance.SecurePort = port + instance.SecureVipAddress = ep + } + } + log.Error(ipAddr,errors.New("sc to eureka hostname failed")) + instance.IPAddr = ipAddr + instance.HostName = ipAddr + + if syncInstance.HealthCheck != nil { + instance.HealthCheckUrl = syncInstance.HealthCheck.Url + } + + instArr := strings.Split(syncInstance.InstanceId, ":") + if len(instArr) != 3 { + if instance.Port != nil && instance.Port.Enabled.Bool() { + instance.InstanceId = fmt.Sprintf(eurekaInstanceFormat, ipAddr, instance.APP, instance.Port.Port) + } else if instance.SecurePort != nil && instance.SecurePort.Enabled.Bool() { + instance.InstanceId = fmt.Sprintf(eurekaInstanceFormat, ipAddr, instance.APP, instance.SecurePort.Port) + } + } + return +} diff --git a/syncer/plugins/eureka/type.go b/syncer/plugins/eureka/type.go new file mode 100644 index 0000000..3388165 --- /dev/null +++ b/syncer/plugins/eureka/type.go @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package eureka + +import ( + "encoding/json" + "strconv" +) + +const ( + UNKNOWN = "UNKNOWN" + UP = "UP" + DOWN = "DOWN" + STARTING = "STARTING" + OUTOFSERVICE = "OUTOFSERVICE" +) + +type eurekaData struct { + APPS *Applications `json:"applications"` +} + +type Applications struct { + VersionsDelta string `json:"versions__delta"` + AppsHashcode string `json:"apps__hashcode"` + Applications []*Application `json:"application,omitempty"` +} + +type Application struct { + Name string `json:"name"` + Instances []*Instance `json:"instance"` +} + +type InstanceRequest struct { + Instance *Instance `json:"instance"` +} + +type Instance struct { + InstanceId string `json:"instanceId"` + HostName string `json:"hostName"` + APP string `json:"app"` + IPAddr string `json:"ipAddr"` + Status string `json:"status"` + OverriddenStatus string `json:"overriddenStatus,omitempty"` + Port *Port `json:"port,omitempty"` + SecurePort *Port `json:"securePort,omitempty"` + CountryId int `json:"countryId,omitempty"` + DataCenterInfo *DataCenterInfo `json:"dataCenterInfo"` + LeaseInfo *LeaseInfo `json:"leaseInfo,omitempty"` + Metadata *MetaData `json:"metadata,omitempty"` + HomePageUrl string `json:"homePageUrl,omitempty"` + StatusPageUrl string `json:"statusPageUrl,omitempty"` + HealthCheckUrl string `json:"healthCheckUrl,omitempty"` + VipAddress string `json:"vipAddress,omitempty"` + SecureVipAddress string `json:"secureVipAddress,omitempty"` + IsCoordinatingDiscoveryServer BoolString `json:"isCoordinatingDiscoveryServer,omitempty"` + LastUpdatedTimestamp string `json:"lastUpdatedTimestamp,omitempty"` + LastDirtyTimestamp string `json:"lastDirtyTimestamp,omitempty"` + ActionType string `json:"actionType,omitempty"` +} + +type DataCenterInfo struct { + Name string `json:"name"` + Class string `json:"@class"` + Metadata *DataCenterMetadata `json:"metadata,omitempty"` +} + +type DataCenterMetadata struct { + AmiLaunchIndex string `json:"ami-launch-index,omitempty"` + LocalHostname string `json:"local-hostname,omitempty"` + AvailabilityZone string `json:"availability-zone,omitempty"` + InstanceId string `json:"instance-id,omitempty"` + PublicIpv4 string `json:"public-ipv4,omitempty"` + PublicHostname string `json:"public-hostname,omitempty"` + AmiManifestPath string `json:"ami-manifest-path,omitempty"` + LocalIpv4 string `json:"local-ipv4,omitempty"` + Hostname string `json:"hostname,omitempty"` + AmiId string `json:"ami-id,omitempty"` + InstanceType string `json:"instance-type,omitempty"` +} + +type LeaseInfo struct { + RenewalIntervalInSecs int `json:"renewalIntervalInSecs,omitempty"` + DurationInSecs int `json:"durationInSecs,omitempty"` + RegistrationTimestamp int `json:"registrationTimestamp,omitempty"` + LastRenewalTimestamp int `json:"lastRenewalTimestamp,omitempty"` + EvictionDurationInSecs uint `json:"evictionDurationInSecs,omitempty"` + EvictionTimestamp int `json:"evictionTimestamp,omitempty"` + ServiceUpTimestamp int `json:"serviceUpTimestamp,omitempty"` +} + +type Port struct { + Port int `json:"$"` + Enabled BoolString `json:"@enabled"` +} + +type MetaData struct { + Map map[string]string + Class string +} + +func (s *MetaData) MarshalJSON() ([]byte, error) { + newMap := make(map[string]string) + for key, value := range s.Map { + newMap[key] = value + } + if s.Class != "" { + newMap["@class"] = s.Class + } + return json.Marshal(&newMap) +} + +func (s *MetaData) UnmarshalJSON(data []byte) error { + newMap := make(map[string]string) + err := json.Unmarshal(data, &newMap) + if err != nil { + return err + } + + s.Map = newMap + if val, ok := s.Map["@class"]; ok { + s.Class = val + delete(s.Map, "@class") + } + return nil +} + +type BoolString string + +func (b *BoolString) Set(value bool) { + str := strconv.FormatBool(value) + *b = BoolString(str) +} + +func (b BoolString) Bool() bool { + enabled, err := strconv.ParseBool(string(b)) + if err != nil { + return false + } + return enabled +} diff --git a/syncer/plugins/servicecenter/transform.go b/syncer/plugins/servicecenter/transform.go index e028d1c..071e547 100644 --- a/syncer/plugins/servicecenter/transform.go +++ b/syncer/plugins/servicecenter/transform.go @@ -206,12 +206,16 @@ func toInstance(syncInstance *pb.SyncInstance) (instance *scpb.MicroServiceInsta endpoint = strings.Replace(ep, "http://", "rest://", 1) case "https": endpoint = strings.Replace(ep, "https://", "rest://", 1) + "?sslEnabled=true" + case "rest": + endpoint = ep + } instance.Endpoints = append(instance.Endpoints, endpoint) } - if syncInstance.HealthCheck != nil { + if syncInstance.HealthCheck != nil && syncInstance.HealthCheck.Mode != pb.HealthCheck_UNKNOWN { instance.HealthCheck = &scpb.HealthCheck{ + Mode: pb.HealthCheck_Modes_name[int32(syncInstance.HealthCheck.Mode)], Port: syncInstance.HealthCheck.Port, Interval: syncInstance.HealthCheck.Interval, Times: syncInstance.HealthCheck.Times,