This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new aa26b53 Add environment_service.go and structures for beam sdk, network envs, cache and application envs new 835d19a Merge pull request #15654 from [BEAM-12998] [Playground] Environment service aa26b53 is described below commit aa26b53f633847947c535b48b574cd652a1a0c3f Author: Ilya Kozyrev <ilya.kozy...@akvelon.com> AuthorDate: Mon Oct 25 18:33:34 2021 +0300 Add environment_service.go and structures for beam sdk, network envs, cache and application envs Co-authored-by: AydarZaynutdinov <aydar.zaynutdi...@akvelon.com> Co-authored-by: daria.malkova <daria.malk...@akvelon.com> --- playground/backend/cmd/server/http.go | 2 +- playground/backend/cmd/server/server.go | 23 +- playground/backend/configs/SDK_JAVA.json | 13 + .../backend/internal/environment/application.go | 98 +++++++ .../internal/environment/application_test.go | 261 +++++++++++++++++++ playground/backend/internal/environment/beam.go | 43 ++++ .../backend/internal/environment/environment.go | 28 -- .../internal/environment/environment_service.go | 193 ++++++++++++++ .../environment/environment_service_test.go | 283 +++++++++++++++++++++ 9 files changed, 913 insertions(+), 31 deletions(-) diff --git a/playground/backend/cmd/server/http.go b/playground/backend/cmd/server/http.go index 8dce878..d49cbfe 100644 --- a/playground/backend/cmd/server/http.go +++ b/playground/backend/cmd/server/http.go @@ -22,7 +22,7 @@ import ( ) // listenHttp binds the http.Handler on the TCP network address -func listenHttp(ctx context.Context, errChan chan error, envs environment.ServerEnvs, handler http.Handler) { +func listenHttp(ctx context.Context, errChan chan error, envs environment.NetworkEnvs, handler http.Handler) { grpclog.Infof("listening HTTP at %s\n", envs.Address()) if err := http.ListenAndServe(envs.Address(), handler); err != nil { errChan <- err diff --git a/playground/backend/cmd/server/server.go b/playground/backend/cmd/server/server.go index 3102188..ea9edfc 100644 --- a/playground/backend/cmd/server/server.go +++ b/playground/backend/cmd/server/server.go @@ -32,7 +32,10 @@ func runServer() error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - envService := environment.NewEnvironment() + envService, err := setupEnvironment() + if err != nil { + return err + } grpcServer := grpc.NewServer() pb.RegisterPlaygroundServiceServer(grpcServer, &playgroundController{}) @@ -40,7 +43,7 @@ func runServer() error { handler := Wrap(grpcServer, getGrpcWebOptions()) errChan := make(chan error) - go listenHttp(ctx, errChan, envService, handler) + go listenHttp(ctx, errChan, envService.NetworkEnvs, handler) for { select { @@ -53,6 +56,22 @@ func runServer() error { } } +func setupEnvironment() (*environment.Environment, error) { + networkEnvs, err := environment.GetNetworkEnvsFromOsEnvs() + if err != nil { + return nil, err + } + beamEnvs, err := environment.GetSdkEnvsFromOsEnvs() + if err != nil { + return nil, err + } + appEnvs, err := environment.GetApplicationEnvsFromOsEnvs() + if err != nil { + return nil, err + } + return environment.NewEnvironment(*networkEnvs, *beamEnvs, *appEnvs), nil +} + // getGrpcWebOptions returns grpcweb options needed to configure wrapper func getGrpcWebOptions() []grpcweb.Option { return []grpcweb.Option{ diff --git a/playground/backend/configs/SDK_JAVA.json b/playground/backend/configs/SDK_JAVA.json new file mode 100644 index 0000000..f371b12 --- /dev/null +++ b/playground/backend/configs/SDK_JAVA.json @@ -0,0 +1,13 @@ +{ + "compile_cmd": "javac", + "run_cmd": "java", + "compile_args": [ + "-d", + "bin", + "-classpath" + ], + "run_args": [ + "-cp", + "bin:" + ] +} \ No newline at end of file diff --git a/playground/backend/internal/environment/application.go b/playground/backend/internal/environment/application.go new file mode 100644 index 0000000..39c7753 --- /dev/null +++ b/playground/backend/internal/environment/application.go @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package environment + +import ( + "fmt" + "time" +) + +// NetworkEnvs contains all environment variables that need to run server. +type NetworkEnvs struct { + ip string + port int +} + +// NewNetworkEnvs constructor for NetworkEnvs +func NewNetworkEnvs(ip string, port int) *NetworkEnvs { + return &NetworkEnvs{ip: ip, port: port} +} + +// Address returns concatenated ip and port through ':' +func (serverEnvs NetworkEnvs) Address() string { + return fmt.Sprintf("%s:%d", serverEnvs.ip, serverEnvs.port) +} + +//CacheEnvs contains all environment variables that needed to use cache +type CacheEnvs struct { + cacheType string + address string + keyExpirationTime time.Duration +} + +// CacheType returns cache type +func (ce *CacheEnvs) CacheType() string { + return ce.cacheType +} + +// Address returns address to connect to remote cache service +func (ce *CacheEnvs) Address() string { + return ce.address +} + +// KeyExpirationTime returns cacheExpirationTime +func (ce *CacheEnvs) KeyExpirationTime() time.Duration { + return ce.keyExpirationTime +} + +// NewCacheEnvs constructor for CacheEnvs +func NewCacheEnvs(cacheType, cacheAddress string, cacheExpirationTime time.Duration) *CacheEnvs { + return &CacheEnvs{ + cacheType: cacheType, + address: cacheAddress, + keyExpirationTime: cacheExpirationTime, + } +} + +//ApplicationEnvs contains all environment variables that needed to run backend processes +type ApplicationEnvs struct { + workingDir string + cacheEnvs *CacheEnvs + pipelineExecuteTimeout time.Duration +} + +// NewApplicationEnvs constructor for ApplicationEnvs +func NewApplicationEnvs(workingDir string, cacheEnvs *CacheEnvs, pipelineExecuteTimeout time.Duration) *ApplicationEnvs { + return &ApplicationEnvs{ + workingDir: workingDir, + cacheEnvs: cacheEnvs, + pipelineExecuteTimeout: pipelineExecuteTimeout, + } +} + +// WorkingDir returns workingDir +func (ae *ApplicationEnvs) WorkingDir() string { + return ae.workingDir +} + +func (ae *ApplicationEnvs) CacheEnvs() CacheEnvs { + return *ae.cacheEnvs +} + +// PipelineExecuteTimeout returns pipelineExecuteTimeout +func (ae *ApplicationEnvs) PipelineExecuteTimeout() time.Duration { + return ae.pipelineExecuteTimeout +} diff --git a/playground/backend/internal/environment/application_test.go b/playground/backend/internal/environment/application_test.go new file mode 100644 index 0000000..78bca3f --- /dev/null +++ b/playground/backend/internal/environment/application_test.go @@ -0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package environment + +import ( + "fmt" + "testing" + "time" +) + +func TestNetworkEnvs_Address(t *testing.T) { + type fields struct { + ip string + port int + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "ip and port concatenated through ':'", + fields: fields{ip: defaultIp, port: defaultPort}, + want: fmt.Sprintf("%s:%d", defaultIp, defaultPort), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + serverEnvs := NetworkEnvs{ + ip: tt.fields.ip, + port: tt.fields.port, + } + if got := serverEnvs.Address(); got != tt.want { + t.Errorf("Address() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestCacheEnvs_CacheType(t *testing.T) { + type fields struct { + cacheType string + address string + keyExpirationTime time.Duration + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "all success", + fields: fields{ + cacheType: "MOCK_CACHE_TYPE", + address: "MOCK_ADDRESS", + keyExpirationTime: 0, + }, + want: "MOCK_CACHE_TYPE", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ce := &CacheEnvs{ + cacheType: tt.fields.cacheType, + address: tt.fields.address, + keyExpirationTime: tt.fields.keyExpirationTime, + } + if got := ce.CacheType(); got != tt.want { + t.Errorf("CacheType() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestCacheEnvs_Address(t *testing.T) { + type fields struct { + cacheType string + address string + keyExpirationTime time.Duration + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "all success", + fields: fields{ + cacheType: "MOCK_CACHE_TYPE", + address: "MOCK_ADDRESS", + keyExpirationTime: 0, + }, + want: "MOCK_ADDRESS", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ce := &CacheEnvs{ + cacheType: tt.fields.cacheType, + address: tt.fields.address, + keyExpirationTime: tt.fields.keyExpirationTime, + } + if got := ce.Address(); got != tt.want { + t.Errorf("Address() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestCacheEnvs_KeyExpirationTime(t *testing.T) { + type fields struct { + cacheType string + address string + keyExpirationTime time.Duration + } + tests := []struct { + name string + fields fields + want time.Duration + }{ + { + name: "all success", + fields: fields{ + cacheType: "MOCK_CACHE_TYPE", + address: "MOCK_ADDRESS", + keyExpirationTime: 0, + }, + want: 0, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ce := &CacheEnvs{ + cacheType: tt.fields.cacheType, + address: tt.fields.address, + keyExpirationTime: tt.fields.keyExpirationTime, + } + if got := ce.KeyExpirationTime(); got != tt.want { + t.Errorf("KeyExpirationTime() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestApplicationEnvs_WorkingDir(t *testing.T) { + type fields struct { + workingDir string + cacheEnvs *CacheEnvs + pipelineExecuteTimeout time.Duration + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "all success", + fields: fields{ + workingDir: "MOCK_WORKING_DIR", + cacheEnvs: &CacheEnvs{}, + pipelineExecuteTimeout: 0, + }, + want: "MOCK_WORKING_DIR", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ae := &ApplicationEnvs{ + workingDir: tt.fields.workingDir, + cacheEnvs: tt.fields.cacheEnvs, + pipelineExecuteTimeout: tt.fields.pipelineExecuteTimeout, + } + if got := ae.WorkingDir(); got != tt.want { + t.Errorf("WorkingDir() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestApplicationEnvs_CacheEnvs(t *testing.T) { + type fields struct { + workingDir string + cacheEnvs *CacheEnvs + pipelineExecuteTimeout time.Duration + } + tests := []struct { + name string + fields fields + want CacheEnvs + }{ + { + name: "all success", + fields: fields{ + workingDir: "MOCK_WORKING_DIR", + cacheEnvs: &CacheEnvs{}, + pipelineExecuteTimeout: 0, + }, + want: CacheEnvs{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ae := &ApplicationEnvs{ + workingDir: tt.fields.workingDir, + cacheEnvs: tt.fields.cacheEnvs, + pipelineExecuteTimeout: tt.fields.pipelineExecuteTimeout, + } + if got := ae.CacheEnvs(); got != tt.want { + t.Errorf("CacheEnvs() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestApplicationEnvs_PipelineExecuteTimeout(t *testing.T) { + type fields struct { + workingDir string + cacheEnvs *CacheEnvs + pipelineExecuteTimeout time.Duration + } + tests := []struct { + name string + fields fields + want time.Duration + }{ + { + name: "all success", + fields: fields{ + workingDir: "MOCK_WORKING_DIR", + cacheEnvs: &CacheEnvs{}, + pipelineExecuteTimeout: 0, + }, + want: 0, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ae := &ApplicationEnvs{ + workingDir: tt.fields.workingDir, + cacheEnvs: tt.fields.cacheEnvs, + pipelineExecuteTimeout: tt.fields.pipelineExecuteTimeout, + } + if got := ae.PipelineExecuteTimeout(); got != tt.want { + t.Errorf("PipelineExecuteTimeout() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/playground/backend/internal/environment/beam.go b/playground/backend/internal/environment/beam.go new file mode 100644 index 0000000..d266d0c --- /dev/null +++ b/playground/backend/internal/environment/beam.go @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package environment + +import ( + pb "beam.apache.org/playground/backend/internal/api/v1" +) + +// ExecutorConfig contains needed for compiling and execution of the code commands +type ExecutorConfig struct { + CompileCmd string `json:"compile_cmd"` + RunCmd string `json:"run_cmd"` + CompileArgs []string `json:"compile_args"` + RunArgs []string `json:"run_args"` +} + +func NewExecutorConfig(compileCmd string, runCmd string, compileArgs []string, runArgs []string) *ExecutorConfig { + return &ExecutorConfig{CompileCmd: compileCmd, RunCmd: runCmd, CompileArgs: compileArgs, RunArgs: runArgs} +} + +// BeamEnvs contains all environments related of ApacheBeam. These will use to run pipelines +type BeamEnvs struct { + ApacheBeamSdk pb.Sdk + ExecutorConfig *ExecutorConfig +} + +// NewBeamEnvs is a BeamEnvs constructor +func NewBeamEnvs(apacheBeamSdk pb.Sdk, executorConfig *ExecutorConfig) *BeamEnvs { + return &BeamEnvs{ApacheBeamSdk: apacheBeamSdk, ExecutorConfig: executorConfig} +} diff --git a/playground/backend/internal/environment/environment.go b/playground/backend/internal/environment/environment.go deleted file mode 100644 index 910dec9..0000000 --- a/playground/backend/internal/environment/environment.go +++ /dev/null @@ -1,28 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// TODO: remove this code when merging https://github.com/apache/beam/pull/15654 - -package environment - -type ServerEnvs struct {} - -func (envs ServerEnvs) Address() string { - return "" -} - -func NewEnvironment() ServerEnvs { - return ServerEnvs{} -} diff --git a/playground/backend/internal/environment/environment_service.go b/playground/backend/internal/environment/environment_service.go new file mode 100644 index 0000000..240a972 --- /dev/null +++ b/playground/backend/internal/environment/environment_service.go @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package environment + +import ( + pb "beam.apache.org/playground/backend/internal/api/v1" + "encoding/json" + "errors" + "io/ioutil" + "log" + "os" + "path/filepath" + "strconv" + "strings" + "time" +) + +const ( + serverIpKey = "SERVER_IP" + serverPortKey = "SERVER_PORT" + beamSdkKey = "BEAM_SDK" + workingDirKey = "APP_WORK_DIR" + cacheTypeKey = "CACHE_TYPE" + cacheAddressKey = "CACHE_ADDRESS" + beamPathKey = "BEAM_PATH" + beamRunnerKey = "BEAM_RUNNER" + SLF4jKey = "SLF4J" + cacheKeyExpirationTimeKey = "KEY_EXPIRATION_TIME" + pipelineExecuteTimeoutKey = "PIPELINE_EXPIRATION_TIMEOUT" + defaultIp = "localhost" + defaultPort = 8080 + defaultSdk = pb.Sdk_SDK_JAVA + defaultBeamSdkPath = "/opt/apache/beam/jars/beam-sdks-java-harness.jar" + defaultCacheType = "local" + defaultCacheAddress = "localhost:6379" + defaultCacheKeyExpirationTime = time.Minute * 15 + defaultPipelineExecuteTimeout = time.Minute * 10 + defaultBeamRunner = "/opt/apache/beam/jars/beam-runners-direct.jar" + defaultSLF4j = "/opt/apache/beam/jars/slf4j-jdk14.jar" + jsonExt = ".json" + configFolderName = "configs" +) + +// Environment operates with environment structures: NetworkEnvs, BeamEnvs, ApplicationEnvs +type Environment struct { + NetworkEnvs NetworkEnvs + BeamSdkEnvs BeamEnvs + ApplicationEnvs ApplicationEnvs +} + +// NewEnvironment is a constructor for Environment. +// Default values: +// LogWriters: by default using os.Stdout +// NetworkEnvs: by default using defaultIp and defaultPort from constants +// BeamEnvs: by default using pb.Sdk_SDK_JAVA +// ApplicationEnvs: required field not providing by default value +func NewEnvironment(networkEnvs NetworkEnvs, beamEnvs BeamEnvs, appEnvs ApplicationEnvs) *Environment { + svc := Environment{} + svc.NetworkEnvs = networkEnvs + svc.BeamSdkEnvs = beamEnvs + svc.ApplicationEnvs = appEnvs + + return &svc +} + +//GetApplicationEnvsFromOsEnvs lookups in os environment variables and takes value for app working dir. If not exists - return error +func GetApplicationEnvsFromOsEnvs() (*ApplicationEnvs, error) { + pipelineExecuteTimeout := defaultPipelineExecuteTimeout + cacheExpirationTime := defaultCacheKeyExpirationTime + cacheType := getEnv(cacheTypeKey, defaultCacheType) + cacheAddress := getEnv(cacheAddressKey, defaultCacheAddress) + + if value, present := os.LookupEnv(cacheKeyExpirationTimeKey); present { + if converted, err := time.ParseDuration(value); err == nil { + cacheExpirationTime = converted + } else { + log.Printf("couldn't convert provided cache expiration time. Using default %s\n", defaultCacheKeyExpirationTime) + } + } + if value, present := os.LookupEnv(pipelineExecuteTimeoutKey); present { + if converted, err := time.ParseDuration(value); err == nil { + pipelineExecuteTimeout = converted + } else { + log.Printf("couldn't convert provided pipeline execute timeout. Using default %s\n", defaultPipelineExecuteTimeout) + } + } + + if value, present := os.LookupEnv(workingDirKey); present { + return NewApplicationEnvs(value, NewCacheEnvs(cacheType, cacheAddress, cacheExpirationTime), pipelineExecuteTimeout), nil + } + return nil, errors.New("APP_WORK_DIR env should be provided with os.env") +} + +// GetNetworkEnvsFromOsEnvs lookups in os environment variables and takes value for ip and port. If not exists - using default +func GetNetworkEnvsFromOsEnvs() (*NetworkEnvs, error) { + ip := getEnv(serverIpKey, defaultIp) + port := defaultPort + var err error + if value, present := os.LookupEnv(serverPortKey); present { + port, err = strconv.Atoi(value) + if err != nil { + return nil, err + } + } + return NewNetworkEnvs(ip, port), nil +} + +// GetSdkEnvsFromOsEnvs lookups in os environment variables and takes value for Apache Beam SDK. If not exists - using default +func GetSdkEnvsFromOsEnvs() (*BeamEnvs, error) { + sdk := pb.Sdk_SDK_UNSPECIFIED + if value, present := os.LookupEnv(beamSdkKey); present { + + switch value { + case pb.Sdk_SDK_JAVA.String(): + sdk = pb.Sdk_SDK_JAVA + case pb.Sdk_SDK_GO.String(): + sdk = pb.Sdk_SDK_GO + case pb.Sdk_SDK_PYTHON.String(): + sdk = pb.Sdk_SDK_PYTHON + case pb.Sdk_SDK_SCIO.String(): + sdk = pb.Sdk_SDK_SCIO + } + } + if sdk == pb.Sdk_SDK_UNSPECIFIED { + return nil, errors.New("env BEAM_SDK must be specified in the environment variables") + } + configPath := filepath.Join(os.Getenv(workingDirKey), configFolderName, sdk.String()+jsonExt) + executorConfig, err := createExecutorConfig(sdk, configPath) + if err != nil { + return nil, err + } + return NewBeamEnvs(sdk, executorConfig), nil +} + +//createExecutorConfig creates ExecutorConfig object that corresponds to specific apache beam sdk +func createExecutorConfig(apacheBeamSdk pb.Sdk, configPath string) (*ExecutorConfig, error) { + executorConfig, err := getConfigFromJson(configPath) + if err != nil { + return nil, err + } + switch apacheBeamSdk { + case pb.Sdk_SDK_JAVA: + executorConfig.CompileArgs = append(executorConfig.CompileArgs, getEnv(beamPathKey, defaultBeamSdkPath)) + jars := strings.Join([]string{ + getEnv(beamPathKey, defaultBeamSdkPath), + getEnv(beamRunnerKey, defaultBeamRunner), + getEnv(SLF4jKey, defaultSLF4j), + }, ":") + executorConfig.RunArgs[1] += jars + case pb.Sdk_SDK_GO: + return nil, errors.New("not yet supported") + case pb.Sdk_SDK_PYTHON: + return nil, errors.New("not yet supported") + case pb.Sdk_SDK_SCIO: + return nil, errors.New("not yet supported") + } + return executorConfig, nil +} + +//getConfigFromJson reads a json file to ExecutorConfig struct +func getConfigFromJson(configPath string) (*ExecutorConfig, error) { + file, err := ioutil.ReadFile(configPath) + if err != nil { + return nil, err + } + executorConfig := ExecutorConfig{} + err = json.Unmarshal(file, &executorConfig) + if err != nil { + return nil, err + } + return &executorConfig, err +} + +//getEnv returns a environment variable or default value +func getEnv(key, defaultValue string) string { + if value, ok := os.LookupEnv(key); ok { + return value + } + return defaultValue +} diff --git a/playground/backend/internal/environment/environment_service_test.go b/playground/backend/internal/environment/environment_service_test.go new file mode 100644 index 0000000..51644c9 --- /dev/null +++ b/playground/backend/internal/environment/environment_service_test.go @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package environment + +import ( + playground "beam.apache.org/playground/backend/internal/api/v1" + "fmt" + "io/fs" + "os" + "path/filepath" + "reflect" + "strings" + "testing" +) + +const ( + javaConfig = "{\n \"compile_cmd\": \"javac\",\n \"run_cmd\": \"java\",\n \"compile_args\": [\"-d\", \"bin\", \"-classpath\"],\n \"run_args\": [\"-cp\", \"bin:\"]\n}" +) + +func TestMain(m *testing.M) { + err := setup() + if err != nil { + fmt.Errorf("error during test setup: %s", err.Error()) + } + defer teardown() + m.Run() +} + +func setup() error { + err := os.MkdirAll(configFolderName, fs.ModePerm) + if err != nil { + return err + } + filePath := filepath.Join(configFolderName, defaultSdk.String()+jsonExt) + err = os.WriteFile(filePath, []byte(javaConfig), 0600) + if err != nil { + return err + } + return nil +} + +func teardown() { + err := os.RemoveAll(configFolderName) + if err != nil { + fmt.Errorf("error during test setup: %s", err.Error()) + } +} + +func setOsEnvs(envsToSet map[string]string) error { + for key, value := range envsToSet { + if err := os.Setenv(key, value); err != nil { + return err + } + + } + return nil +} + +func TestNewEnvironment(t *testing.T) { + executorConfig := NewExecutorConfig("javac", "java", []string{""}, []string{""}) + tests := []struct { + name string + want *Environment + }{ + {name: "create env service with default envs", want: &Environment{ + NetworkEnvs: *NewNetworkEnvs(defaultIp, defaultPort), + BeamSdkEnvs: *NewBeamEnvs(defaultSdk, executorConfig), + ApplicationEnvs: *NewApplicationEnvs("/app", &CacheEnvs{defaultCacheType, defaultCacheAddress, defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout), + }}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := NewEnvironment( + *NewNetworkEnvs(defaultIp, defaultPort), + *NewBeamEnvs(defaultSdk, executorConfig), + *NewApplicationEnvs("/app", &CacheEnvs{defaultCacheType, defaultCacheAddress, defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout)); !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewEnvironment() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_getSdkEnvsFromOsEnvs(t *testing.T) { + jars := strings.Join([]string{defaultBeamSdkPath, defaultBeamRunner, defaultSLF4j}, ":") + tests := []struct { + name string + want *BeamEnvs + envsToSet map[string]string + wantErr bool + }{ + { + name: "not specified beam sdk key in os envs", + want: nil, + envsToSet: map[string]string{workingDirKey: "./"}, + wantErr: true, + }, + { + name: "default beam envs", + want: NewBeamEnvs(defaultSdk, NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath", defaultBeamSdkPath}, []string{"-cp", "bin:" + jars})), + envsToSet: map[string]string{beamSdkKey: "SDK_JAVA", workingDirKey: "./"}, + wantErr: false, + }, + { + name: "specific sdk key in os envs", + want: NewBeamEnvs(defaultSdk, NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath", defaultBeamSdkPath}, []string{"-cp", "bin:" + jars})), + envsToSet: map[string]string{beamSdkKey: "SDK_JAVA", workingDirKey: "./"}, + wantErr: false, + }, + { + name: "wrong sdk key in os envs", + want: nil, + envsToSet: map[string]string{beamSdkKey: "SDK_J", workingDirKey: "./"}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := setOsEnvs(tt.envsToSet); err != nil { + t.Fatalf("couldn't setup os env") + } + got, err := GetSdkEnvsFromOsEnvs() + if (err != nil) != tt.wantErr { + t.Errorf("getSdkEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("getSdkEnvsFromOsEnvs() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_getNetworkEnvsFromOsEnvs(t *testing.T) { + tests := []struct { + name string + want *NetworkEnvs + envsToSet map[string]string + wantErr bool + }{ + { + name: "default values", + want: NewNetworkEnvs(defaultIp, defaultPort), + }, + { + name: "values from os envs", + want: NewNetworkEnvs("12.12.12.21", 1234), + envsToSet: map[string]string{serverIpKey: "12.12.12.21", serverPortKey: "1234"}, + }, + { + name: "not int port in os env, should be default", + want: nil, + envsToSet: map[string]string{serverIpKey: "12.12.12.21", serverPortKey: "1a34"}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := setOsEnvs(tt.envsToSet); err != nil { + t.Fatalf("couldn't setup os env") + } + got, err := GetNetworkEnvsFromOsEnvs() + if (err != nil) != tt.wantErr { + t.Errorf("getNetworkEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("getNetworkEnvsFromOsEnvs() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_getApplicationEnvsFromOsEnvs(t *testing.T) { + tests := []struct { + name string + want *ApplicationEnvs + wantErr bool + envsToSet map[string]string + }{ + {name: "working dir is provided", want: NewApplicationEnvs("/app", &CacheEnvs{defaultCacheType, defaultCacheAddress, defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout), wantErr: false, envsToSet: map[string]string{workingDirKey: "/app"}}, + {name: "working dir isn't provided", want: nil, wantErr: true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := setOsEnvs(tt.envsToSet); err != nil { + t.Fatalf("couldn't setup os env") + } + got, err := GetApplicationEnvsFromOsEnvs() + if (err != nil) != tt.wantErr { + t.Errorf("getApplicationEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("getApplicationEnvsFromOsEnvs() got = %v, want %v", got, tt.want) + } + os.Clearenv() + }) + } +} + +func Test_createExecutorConfig(t *testing.T) { + jars := strings.Join([]string{defaultBeamSdkPath, defaultBeamRunner, defaultSLF4j}, ":") + type args struct { + apacheBeamSdk playground.Sdk + configPath string + } + tests := []struct { + name string + args args + want *ExecutorConfig + wantErr bool + }{ + { + name: "create executor configuration from json file", + args: args{apacheBeamSdk: defaultSdk, configPath: filepath.Join(configFolderName, defaultSdk.String()+jsonExt)}, + want: NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath", defaultBeamSdkPath}, []string{"-cp", "bin:" + jars}), + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := createExecutorConfig(tt.args.apacheBeamSdk, tt.args.configPath) + if (err != nil) != tt.wantErr { + t.Errorf("createExecutorConfig() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("createExecutorConfig() got = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_getConfigFromJson(t *testing.T) { + type args struct { + configPath string + } + tests := []struct { + name string + args args + want *ExecutorConfig + wantErr bool + }{ + { + name: "get object from json", + args: args{filepath.Join(configFolderName, defaultSdk.String()+jsonExt)}, + want: NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath"}, []string{"-cp", "bin:"}), + wantErr: false, + }, + { + name: "error if wrong json path", + args: args{filepath.Join("wrong_folder", defaultSdk.String()+jsonExt)}, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getConfigFromJson(tt.args.configPath) + if (err != nil) != tt.wantErr { + t.Errorf("getConfigFromJson() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("getConfigFromJson() got = %v, want %v", got, tt.want) + } + }) + } +}