This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aa26b53  Add environment_service.go and structures for beam sdk, 
network envs, cache and application envs
     new 835d19a  Merge pull request #15654 from [BEAM-12998] [Playground] 
Environment service
aa26b53 is described below

commit aa26b53f633847947c535b48b574cd652a1a0c3f
Author: Ilya Kozyrev <ilya.kozy...@akvelon.com>
AuthorDate: Mon Oct 25 18:33:34 2021 +0300

    Add environment_service.go and structures for beam sdk, network envs, cache 
and application envs
    
    Co-authored-by: AydarZaynutdinov <aydar.zaynutdi...@akvelon.com>
    Co-authored-by: daria.malkova <daria.malk...@akvelon.com>
---
 playground/backend/cmd/server/http.go              |   2 +-
 playground/backend/cmd/server/server.go            |  23 +-
 playground/backend/configs/SDK_JAVA.json           |  13 +
 .../backend/internal/environment/application.go    |  98 +++++++
 .../internal/environment/application_test.go       | 261 +++++++++++++++++++
 playground/backend/internal/environment/beam.go    |  43 ++++
 .../backend/internal/environment/environment.go    |  28 --
 .../internal/environment/environment_service.go    | 193 ++++++++++++++
 .../environment/environment_service_test.go        | 283 +++++++++++++++++++++
 9 files changed, 913 insertions(+), 31 deletions(-)

diff --git a/playground/backend/cmd/server/http.go 
b/playground/backend/cmd/server/http.go
index 8dce878..d49cbfe 100644
--- a/playground/backend/cmd/server/http.go
+++ b/playground/backend/cmd/server/http.go
@@ -22,7 +22,7 @@ import (
 )
 
 // listenHttp binds the http.Handler on the TCP network address
-func listenHttp(ctx context.Context, errChan chan error, envs 
environment.ServerEnvs, handler http.Handler) {
+func listenHttp(ctx context.Context, errChan chan error, envs 
environment.NetworkEnvs, handler http.Handler) {
        grpclog.Infof("listening HTTP at %s\n", envs.Address())
        if err := http.ListenAndServe(envs.Address(), handler); err != nil {
                errChan <- err
diff --git a/playground/backend/cmd/server/server.go 
b/playground/backend/cmd/server/server.go
index 3102188..ea9edfc 100644
--- a/playground/backend/cmd/server/server.go
+++ b/playground/backend/cmd/server/server.go
@@ -32,7 +32,10 @@ func runServer() error {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
 
-       envService := environment.NewEnvironment()
+       envService, err := setupEnvironment()
+       if err != nil {
+               return err
+       }
        grpcServer := grpc.NewServer()
        pb.RegisterPlaygroundServiceServer(grpcServer, &playgroundController{})
 
@@ -40,7 +43,7 @@ func runServer() error {
        handler := Wrap(grpcServer, getGrpcWebOptions())
        errChan := make(chan error)
 
-       go listenHttp(ctx, errChan, envService, handler)
+       go listenHttp(ctx, errChan, envService.NetworkEnvs, handler)
 
        for {
                select {
@@ -53,6 +56,22 @@ func runServer() error {
        }
 }
 
+// setupEnvironment loads the network, Beam SDK and application settings from the OS environment
+// variables and bundles them into one Environment. The error of the first loader that fails is
+// returned unchanged.
+func setupEnvironment() (*environment.Environment, error) {
+       network, networkErr := environment.GetNetworkEnvsFromOsEnvs()
+       if networkErr != nil {
+               return nil, networkErr
+       }
+
+       sdk, sdkErr := environment.GetSdkEnvsFromOsEnvs()
+       if sdkErr != nil {
+               return nil, sdkErr
+       }
+
+       application, applicationErr := environment.GetApplicationEnvsFromOsEnvs()
+       if applicationErr != nil {
+               return nil, applicationErr
+       }
+
+       return environment.NewEnvironment(*network, *sdk, *application), nil
+}
+
 // getGrpcWebOptions returns grpcweb options needed to configure wrapper
 func getGrpcWebOptions() []grpcweb.Option {
        return []grpcweb.Option{
diff --git a/playground/backend/configs/SDK_JAVA.json 
b/playground/backend/configs/SDK_JAVA.json
new file mode 100644
index 0000000..f371b12
--- /dev/null
+++ b/playground/backend/configs/SDK_JAVA.json
@@ -0,0 +1,13 @@
+{
+  "compile_cmd": "javac",
+  "run_cmd": "java",
+  "compile_args": [
+    "-d",
+    "bin",
+    "-classpath"
+  ],
+  "run_args": [
+    "-cp",
+    "bin:"
+  ]
+}
\ No newline at end of file
diff --git a/playground/backend/internal/environment/application.go 
b/playground/backend/internal/environment/application.go
new file mode 100644
index 0000000..39c7753
--- /dev/null
+++ b/playground/backend/internal/environment/application.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+       "fmt"
+       "time"
+)
+
+// NetworkEnvs holds the ip and the port that are needed to run the server.
+type NetworkEnvs struct {
+       ip   string
+       port int
+}
+
+// NewNetworkEnvs is a constructor for NetworkEnvs.
+func NewNetworkEnvs(ip string, port int) *NetworkEnvs {
+       envs := new(NetworkEnvs)
+       envs.ip = ip
+       envs.port = port
+       return envs
+}
+
+// Address returns the ip and the port joined with ':'.
+func (ne NetworkEnvs) Address() string {
+       address := fmt.Sprintf("%s:%d", ne.ip, ne.port)
+       return address
+}
+
+// CacheEnvs holds the settings that are needed to use the cache.
+type CacheEnvs struct {
+       cacheType         string
+       address           string
+       keyExpirationTime time.Duration
+}
+
+// CacheType returns the cache type.
+func (c *CacheEnvs) CacheType() string {
+       cacheType := c.cacheType
+       return cacheType
+}
+
+// Address returns the address used to connect to the remote cache service.
+func (c *CacheEnvs) Address() string {
+       address := c.address
+       return address
+}
+
+// KeyExpirationTime returns the expiration time of a cache key.
+func (c *CacheEnvs) KeyExpirationTime() time.Duration {
+       expiration := c.keyExpirationTime
+       return expiration
+}
+
+// NewCacheEnvs is a constructor for CacheEnvs.
+func NewCacheEnvs(cacheType, cacheAddress string, cacheExpirationTime time.Duration) *CacheEnvs {
+       envs := new(CacheEnvs)
+       envs.cacheType = cacheType
+       envs.address = cacheAddress
+       envs.keyExpirationTime = cacheExpirationTime
+       return envs
+}
+
+// ApplicationEnvs contains all environment variables that are needed to run backend processes.
+type ApplicationEnvs struct {
+       workingDir             string
+       cacheEnvs              *CacheEnvs
+       pipelineExecuteTimeout time.Duration
+}
+
+// NewApplicationEnvs is a constructor for ApplicationEnvs.
+// cacheEnvs is stored as given; when it is nil, CacheEnvs returns the zero CacheEnvs.
+func NewApplicationEnvs(workingDir string, cacheEnvs *CacheEnvs, pipelineExecuteTimeout time.Duration) *ApplicationEnvs {
+       return &ApplicationEnvs{
+               workingDir:             workingDir,
+               cacheEnvs:              cacheEnvs,
+               pipelineExecuteTimeout: pipelineExecuteTimeout,
+       }
+}
+
+// WorkingDir returns workingDir
+func (ae *ApplicationEnvs) WorkingDir() string {
+       return ae.workingDir
+}
+
+// CacheEnvs returns a copy of the cache settings.
+// When no cache settings were provided the zero CacheEnvs is returned, so a nil cacheEnvs
+// pointer is never dereferenced.
+func (ae *ApplicationEnvs) CacheEnvs() CacheEnvs {
+       if ae.cacheEnvs == nil {
+               return CacheEnvs{}
+       }
+       return *ae.cacheEnvs
+}
+
+// PipelineExecuteTimeout returns pipelineExecuteTimeout
+func (ae *ApplicationEnvs) PipelineExecuteTimeout() time.Duration {
+       return ae.pipelineExecuteTimeout
+}
diff --git a/playground/backend/internal/environment/application_test.go 
b/playground/backend/internal/environment/application_test.go
new file mode 100644
index 0000000..78bca3f
--- /dev/null
+++ b/playground/backend/internal/environment/application_test.go
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+       "fmt"
+       "testing"
+       "time"
+)
+
+// TestNetworkEnvs_Address checks that Address joins the ip and the port with ':'.
+func TestNetworkEnvs_Address(t *testing.T) {
+       cases := []struct {
+               name string
+               envs NetworkEnvs
+               want string
+       }{
+               {
+                       name: "ip and port concatenated through ':'",
+                       envs: NetworkEnvs{ip: defaultIp, port: defaultPort},
+                       want: fmt.Sprintf("%s:%d", defaultIp, defaultPort),
+               },
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       got := tc.envs.Address()
+                       if got != tc.want {
+                               t.Errorf("Address() = %v, want %v", got, tc.want)
+                       }
+               })
+       }
+}
+
+// TestCacheEnvs_CacheType checks that CacheType returns the stored cache type.
+func TestCacheEnvs_CacheType(t *testing.T) {
+       cases := []struct {
+               name string
+               envs CacheEnvs
+               want string
+       }{
+               {
+                       name: "all success",
+                       envs: CacheEnvs{cacheType: "MOCK_CACHE_TYPE", address: "MOCK_ADDRESS", keyExpirationTime: 0},
+                       want: "MOCK_CACHE_TYPE",
+               },
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       got := tc.envs.CacheType()
+                       if got != tc.want {
+                               t.Errorf("CacheType() = %v, want %v", got, tc.want)
+                       }
+               })
+       }
+}
+
+// TestCacheEnvs_Address checks that Address returns the stored cache address.
+func TestCacheEnvs_Address(t *testing.T) {
+       cases := []struct {
+               name string
+               envs CacheEnvs
+               want string
+       }{
+               {
+                       name: "all success",
+                       envs: CacheEnvs{cacheType: "MOCK_CACHE_TYPE", address: "MOCK_ADDRESS", keyExpirationTime: 0},
+                       want: "MOCK_ADDRESS",
+               },
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       got := tc.envs.Address()
+                       if got != tc.want {
+                               t.Errorf("Address() = %v, want %v", got, tc.want)
+                       }
+               })
+       }
+}
+
+// TestCacheEnvs_KeyExpirationTime checks that KeyExpirationTime returns the stored duration.
+func TestCacheEnvs_KeyExpirationTime(t *testing.T) {
+       cases := []struct {
+               name string
+               envs CacheEnvs
+               want time.Duration
+       }{
+               {
+                       name: "all success",
+                       envs: CacheEnvs{cacheType: "MOCK_CACHE_TYPE", address: "MOCK_ADDRESS", keyExpirationTime: 0},
+                       want: 0,
+               },
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       got := tc.envs.KeyExpirationTime()
+                       if got != tc.want {
+                               t.Errorf("KeyExpirationTime() = %v, want %v", got, tc.want)
+                       }
+               })
+       }
+}
+
+// TestApplicationEnvs_WorkingDir checks that WorkingDir returns the stored working directory.
+func TestApplicationEnvs_WorkingDir(t *testing.T) {
+       cases := []struct {
+               name string
+               envs ApplicationEnvs
+               want string
+       }{
+               {
+                       name: "all success",
+                       envs: ApplicationEnvs{workingDir: "MOCK_WORKING_DIR", cacheEnvs: &CacheEnvs{}, pipelineExecuteTimeout: 0},
+                       want: "MOCK_WORKING_DIR",
+               },
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       got := tc.envs.WorkingDir()
+                       if got != tc.want {
+                               t.Errorf("WorkingDir() = %v, want %v", got, tc.want)
+                       }
+               })
+       }
+}
+
+// TestApplicationEnvs_CacheEnvs checks that CacheEnvs returns the value behind the stored pointer.
+func TestApplicationEnvs_CacheEnvs(t *testing.T) {
+       cases := []struct {
+               name string
+               envs ApplicationEnvs
+               want CacheEnvs
+       }{
+               {
+                       name: "all success",
+                       envs: ApplicationEnvs{workingDir: "MOCK_WORKING_DIR", cacheEnvs: &CacheEnvs{}, pipelineExecuteTimeout: 0},
+                       want: CacheEnvs{},
+               },
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       got := tc.envs.CacheEnvs()
+                       if got != tc.want {
+                               t.Errorf("CacheEnvs() = %v, want %v", got, tc.want)
+                       }
+               })
+       }
+}
+
+// TestApplicationEnvs_PipelineExecuteTimeout checks that PipelineExecuteTimeout returns the stored duration.
+func TestApplicationEnvs_PipelineExecuteTimeout(t *testing.T) {
+       cases := []struct {
+               name string
+               envs ApplicationEnvs
+               want time.Duration
+       }{
+               {
+                       name: "all success",
+                       envs: ApplicationEnvs{workingDir: "MOCK_WORKING_DIR", cacheEnvs: &CacheEnvs{}, pipelineExecuteTimeout: 0},
+                       want: 0,
+               },
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       got := tc.envs.PipelineExecuteTimeout()
+                       if got != tc.want {
+                               t.Errorf("PipelineExecuteTimeout() = %v, want %v", got, tc.want)
+                       }
+               })
+       }
+}
diff --git a/playground/backend/internal/environment/beam.go 
b/playground/backend/internal/environment/beam.go
new file mode 100644
index 0000000..d266d0c
--- /dev/null
+++ b/playground/backend/internal/environment/beam.go
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+       pb "beam.apache.org/playground/backend/internal/api/v1"
+)
+
+// ExecutorConfig contains the commands and the arguments that are needed to compile and run the code.
+type ExecutorConfig struct {
+       CompileCmd  string   `json:"compile_cmd"`
+       RunCmd      string   `json:"run_cmd"`
+       CompileArgs []string `json:"compile_args"`
+       RunArgs     []string `json:"run_args"`
+}
+
+// NewExecutorConfig is a constructor for ExecutorConfig. The argument slices are stored as given, without copying.
+func NewExecutorConfig(compileCmd string, runCmd string, compileArgs []string, runArgs []string) *ExecutorConfig {
+       config := new(ExecutorConfig)
+       config.CompileCmd = compileCmd
+       config.RunCmd = runCmd
+       config.CompileArgs = compileArgs
+       config.RunArgs = runArgs
+       return config
+}
+
+// BeamEnvs contains the Apache Beam related settings: the SDK and the executor config for it.
+type BeamEnvs struct {
+       ApacheBeamSdk  pb.Sdk
+       ExecutorConfig *ExecutorConfig
+}
+
+// NewBeamEnvs is a constructor for BeamEnvs.
+func NewBeamEnvs(apacheBeamSdk pb.Sdk, executorConfig *ExecutorConfig) *BeamEnvs {
+       envs := new(BeamEnvs)
+       envs.ApacheBeamSdk = apacheBeamSdk
+       envs.ExecutorConfig = executorConfig
+       return envs
+}
diff --git a/playground/backend/internal/environment/environment.go 
b/playground/backend/internal/environment/environment.go
deleted file mode 100644
index 910dec9..0000000
--- a/playground/backend/internal/environment/environment.go
+++ /dev/null
@@ -1,28 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// TODO: remove this code when merging 
https://github.com/apache/beam/pull/15654
-
-package environment
-
-type ServerEnvs struct {}
-
-func (envs ServerEnvs) Address() string {
-       return ""
-}
-
-func NewEnvironment() ServerEnvs {
-       return ServerEnvs{}
-}
diff --git a/playground/backend/internal/environment/environment_service.go 
b/playground/backend/internal/environment/environment_service.go
new file mode 100644
index 0000000..240a972
--- /dev/null
+++ b/playground/backend/internal/environment/environment_service.go
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+       pb "beam.apache.org/playground/backend/internal/api/v1"
+       "encoding/json"
+       "errors"
+       "io/ioutil"
+       "log"
+       "os"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "time"
+)
+
+const (
+       // Names of the OS environment variables looked up by this package.
+       serverIpKey                   = "SERVER_IP"
+       serverPortKey                 = "SERVER_PORT"
+       beamSdkKey                    = "BEAM_SDK"
+       workingDirKey                 = "APP_WORK_DIR"
+       cacheTypeKey                  = "CACHE_TYPE"
+       cacheAddressKey               = "CACHE_ADDRESS"
+       beamPathKey                   = "BEAM_PATH"
+       beamRunnerKey                 = "BEAM_RUNNER"
+       SLF4jKey                      = "SLF4J"
+       cacheKeyExpirationTimeKey     = "KEY_EXPIRATION_TIME"
+       pipelineExecuteTimeoutKey     = "PIPELINE_EXPIRATION_TIMEOUT"
+       // Default values, followed by the name parts of the SDK config file path.
+       defaultIp                     = "localhost"
+       defaultPort                   = 8080
+       defaultSdk                    = pb.Sdk_SDK_JAVA
+       defaultBeamSdkPath            = 
"/opt/apache/beam/jars/beam-sdks-java-harness.jar"
+       defaultCacheType              = "local"
+       defaultCacheAddress           = "localhost:6379"
+       defaultCacheKeyExpirationTime = time.Minute * 15
+       defaultPipelineExecuteTimeout = time.Minute * 10
+       defaultBeamRunner             = 
"/opt/apache/beam/jars/beam-runners-direct.jar"
+       defaultSLF4j                  = "/opt/apache/beam/jars/slf4j-jdk14.jar"
+       jsonExt                       = ".json"
+       configFolderName              = "configs"
+)
+
+// Environment groups the environment structures: NetworkEnvs, BeamEnvs and ApplicationEnvs.
+type Environment struct {
+       NetworkEnvs     NetworkEnvs
+       BeamSdkEnvs     BeamEnvs
+       ApplicationEnvs ApplicationEnvs
+}
+
+// NewEnvironment is a constructor for Environment.
+// The given values are stored as they are; no default values are applied here.
+func NewEnvironment(networkEnvs NetworkEnvs, beamEnvs BeamEnvs, appEnvs ApplicationEnvs) *Environment {
+       return &Environment{
+               NetworkEnvs:     networkEnvs,
+               BeamSdkEnvs:     beamEnvs,
+               ApplicationEnvs: appEnvs,
+       }
+}
+
+// GetApplicationEnvsFromOsEnvs builds ApplicationEnvs from the OS environment variables.
+// The cache type, the cache address, the cache key expiration time and the pipeline execute timeout
+// fall back to their defaults when the variable is absent. A duration that cannot be parsed is
+// logged and replaced by its default. The app working dir has no default: if it is absent, an
+// error is returned.
+func GetApplicationEnvsFromOsEnvs() (*ApplicationEnvs, error) {
+       cacheType := getEnv(cacheTypeKey, defaultCacheType)
+       cacheAddress := getEnv(cacheAddressKey, defaultCacheAddress)
+
+       keyExpiration := defaultCacheKeyExpirationTime
+       if raw, ok := os.LookupEnv(cacheKeyExpirationTimeKey); ok {
+               parsed, err := time.ParseDuration(raw)
+               if err != nil {
+                       log.Printf("couldn't convert provided cache expiration time. Using default %s\n", defaultCacheKeyExpirationTime)
+               } else {
+                       keyExpiration = parsed
+               }
+       }
+
+       executeTimeout := defaultPipelineExecuteTimeout
+       if raw, ok := os.LookupEnv(pipelineExecuteTimeoutKey); ok {
+               parsed, err := time.ParseDuration(raw)
+               if err != nil {
+                       log.Printf("couldn't convert provided pipeline execute timeout. Using default %s\n", defaultPipelineExecuteTimeout)
+               } else {
+                       executeTimeout = parsed
+               }
+       }
+
+       workingDir, ok := os.LookupEnv(workingDirKey)
+       if !ok {
+               return nil, errors.New("APP_WORK_DIR env should be provided with os.env")
+       }
+       cacheEnvs := NewCacheEnvs(cacheType, cacheAddress, keyExpiration)
+       return NewApplicationEnvs(workingDir, cacheEnvs, executeTimeout), nil
+}
+
+// GetNetworkEnvsFromOsEnvs builds NetworkEnvs from the OS environment variables.
+// The ip and the port fall back to their defaults when the variable is absent. A port value that
+// is present but is not an integer results in the conversion error being returned.
+func GetNetworkEnvsFromOsEnvs() (*NetworkEnvs, error) {
+       ip := getEnv(serverIpKey, defaultIp)
+
+       rawPort, ok := os.LookupEnv(serverPortKey)
+       if !ok {
+               return NewNetworkEnvs(ip, defaultPort), nil
+       }
+
+       port, err := strconv.Atoi(rawPort)
+       if err != nil {
+               return nil, err
+       }
+       return NewNetworkEnvs(ip, port), nil
+}
+
+// GetSdkEnvsFromOsEnvs builds BeamEnvs from the OS environment variables.
+// The Apache Beam SDK variable must hold the name of the Java, Go, Python or SCIO SDK; if it is
+// absent or holds any other value, an error is returned. The executor config is read from
+// <app working dir>/configs/<sdk name>.json.
+func GetSdkEnvsFromOsEnvs() (*BeamEnvs, error) {
+       sdk := pb.Sdk_SDK_UNSPECIFIED
+       if value, present := os.LookupEnv(beamSdkKey); present {
+               known := []pb.Sdk{pb.Sdk_SDK_JAVA, pb.Sdk_SDK_GO, pb.Sdk_SDK_PYTHON, pb.Sdk_SDK_SCIO}
+               for _, candidate := range known {
+                       if candidate.String() == value {
+                               sdk = candidate
+                               break
+                       }
+               }
+       }
+       if sdk == pb.Sdk_SDK_UNSPECIFIED {
+               return nil, errors.New("env BEAM_SDK must be specified in the environment variables")
+       }
+
+       configFile := sdk.String() + jsonExt
+       configPath := filepath.Join(os.Getenv(workingDirKey), configFolderName, configFile)
+       executorConfig, err := createExecutorConfig(sdk, configPath)
+       if err != nil {
+               return nil, err
+       }
+       return NewBeamEnvs(sdk, executorConfig), nil
+}
+
+// createExecutorConfig reads the ExecutorConfig from the json file at configPath and completes it
+// for the given Apache Beam SDK.
+// For the Java SDK the Beam SDK jar is appended to the compile arguments, and the Beam SDK, runner
+// and SLF4J jars, joined with ':', are appended to the second run argument (the value that follows
+// "-cp" in configs/SDK_JAVA.json).
+// An error is returned when the config cannot be read, when a Java config has fewer than two run
+// arguments, or when the SDK is Go, Python or SCIO, which are not supported yet.
+func createExecutorConfig(apacheBeamSdk pb.Sdk, configPath string) (*ExecutorConfig, error) {
+       executorConfig, err := getConfigFromJson(configPath)
+       if err != nil {
+               return nil, err
+       }
+       switch apacheBeamSdk {
+       case pb.Sdk_SDK_JAVA:
+               // RunArgs[1] is extended below; a shorter list would panic with index out of range.
+               if len(executorConfig.RunArgs) < 2 {
+                       return nil, errors.New("run_args of the java executor config must contain at least two elements")
+               }
+               beamSdkPath := getEnv(beamPathKey, defaultBeamSdkPath)
+               executorConfig.CompileArgs = append(executorConfig.CompileArgs, beamSdkPath)
+               jars := strings.Join([]string{
+                       beamSdkPath,
+                       getEnv(beamRunnerKey, defaultBeamRunner),
+                       getEnv(SLF4jKey, defaultSLF4j),
+               }, ":")
+               executorConfig.RunArgs[1] += jars
+       case pb.Sdk_SDK_GO, pb.Sdk_SDK_PYTHON, pb.Sdk_SDK_SCIO:
+               return nil, errors.New("not yet supported")
+       }
+       return executorConfig, nil
+}
+
+// getConfigFromJson reads the json file at configPath and decodes it into an ExecutorConfig.
+// The read error or the decode error is returned unchanged.
+func getConfigFromJson(configPath string) (*ExecutorConfig, error) {
+       content, readErr := ioutil.ReadFile(configPath)
+       if readErr != nil {
+               return nil, readErr
+       }
+
+       var config ExecutorConfig
+       if decodeErr := json.Unmarshal(content, &config); decodeErr != nil {
+               return nil, decodeErr
+       }
+       return &config, nil
+}
+
+// getEnv returns the value of the environment variable key, or defaultValue when the variable is
+// not set. A variable that is set to an empty string yields the empty string.
+func getEnv(key, defaultValue string) string {
+       value, found := os.LookupEnv(key)
+       if !found {
+               return defaultValue
+       }
+       return value
+}
diff --git 
a/playground/backend/internal/environment/environment_service_test.go 
b/playground/backend/internal/environment/environment_service_test.go
new file mode 100644
index 0000000..51644c9
--- /dev/null
+++ b/playground/backend/internal/environment/environment_service_test.go
@@ -0,0 +1,283 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package environment
+
+import (
+       playground "beam.apache.org/playground/backend/internal/api/v1"
+       "fmt"
+       "io/fs"
+       "os"
+       "path/filepath"
+       "reflect"
+       "strings"
+       "testing"
+)
+
+// javaConfig is the json executor config that setup writes for the default sdk.
+const javaConfig = "{\n" +
+       "  \"compile_cmd\": \"javac\",\n" +
+       "  \"run_cmd\": \"java\",\n" +
+       "  \"compile_args\": [\"-d\", \"bin\", \"-classpath\"],\n" +
+       "  \"run_args\": [\"-cp\", \"bin:\"]\n" +
+       "}"
+
+// TestMain prepares the config fixture, runs the package tests and removes the fixture again.
+// If the fixture can't be created the failure is reported on stderr and the process exits
+// with status 1 instead of running tests against a missing config file.
+func TestMain(m *testing.M) {
+       if err := setup(); err != nil {
+               fmt.Fprintf(os.Stderr, "error during test setup: %s\n", err)
+               teardown()
+               os.Exit(1)
+       }
+       code := m.Run()
+       // os.Exit does not run deferred calls, so the cleanup is invoked explicitly.
+       teardown()
+       os.Exit(code)
+}
+
+// setup creates the config folder and writes javaConfig into the json file named after
+// the default sdk. The first error encountered is returned.
+func setup() error {
+       if err := os.MkdirAll(configFolderName, fs.ModePerm); err != nil {
+               return err
+       }
+       configFile := filepath.Join(configFolderName, defaultSdk.String()+jsonExt)
+       return os.WriteFile(configFile, []byte(javaConfig), 0600)
+}
+
+// teardown removes the config folder and everything inside it. A failure is reported on
+// stderr; it is not treated as fatal.
+func teardown() {
+       if err := os.RemoveAll(configFolderName); err != nil {
+               fmt.Fprintf(os.Stderr, "error during test teardown: %s\n", err)
+       }
+}
+
+// setOsEnvs exports every key/value pair of envsToSet as an os environment variable.
+// It stops at, and returns, the first error reported by os.Setenv; a nil or empty map
+// sets nothing and returns nil.
+func setOsEnvs(envsToSet map[string]string) error {
+       for name := range envsToSet {
+               err := os.Setenv(name, envsToSet[name])
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+// TestNewEnvironment checks that NewEnvironment assembles an Environment from the given
+// network, beam sdk and application envs.
+func TestNewEnvironment(t *testing.T) {
+       executorConfig := NewExecutorConfig("javac", "java", []string{""}, []string{""})
+       // Builds a fresh ApplicationEnvs on every call, so the expected and the actual
+       // value are constructed independently of each other.
+       newAppEnvs := func() *ApplicationEnvs {
+               cacheEnvs := &CacheEnvs{defaultCacheType, defaultCacheAddress, defaultCacheKeyExpirationTime}
+               return NewApplicationEnvs("/app", cacheEnvs, defaultPipelineExecuteTimeout)
+       }
+       cases := []struct {
+               name string
+               want *Environment
+       }{
+               {
+                       name: "create env service with default envs",
+                       want: &Environment{
+                               NetworkEnvs:     *NewNetworkEnvs(defaultIp, defaultPort),
+                               BeamSdkEnvs:     *NewBeamEnvs(defaultSdk, executorConfig),
+                               ApplicationEnvs: *newAppEnvs(),
+                       },
+               },
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       got := NewEnvironment(
+                               *NewNetworkEnvs(defaultIp, defaultPort),
+                               *NewBeamEnvs(defaultSdk, executorConfig),
+                               *newAppEnvs(),
+                       )
+                       if reflect.DeepEqual(got, tc.want) {
+                               return
+                       }
+                       t.Errorf("NewEnvironment() = %v, want %v", got, tc.want)
+               })
+       }
+}
+
+// Test_getSdkEnvsFromOsEnvs checks GetSdkEnvsFromOsEnvs for a missing, a valid and an
+// invalid beam sdk key. Every case unsets the variables it exported once it is done, so the
+// outcome of a case does not depend on what an earlier case left in the process environment.
+func Test_getSdkEnvsFromOsEnvs(t *testing.T) {
+       jars := strings.Join([]string{defaultBeamSdkPath, defaultBeamRunner, defaultSLF4j}, ":")
+       tests := []struct {
+               name      string
+               want      *BeamEnvs
+               envsToSet map[string]string
+               wantErr   bool
+       }{
+               {
+                       name:      "not specified beam sdk key in os envs",
+                       want:      nil,
+                       envsToSet: map[string]string{workingDirKey: "./"},
+                       wantErr:   true,
+               },
+               {
+                       name:      "default beam envs",
+                       want:      NewBeamEnvs(defaultSdk, NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath", defaultBeamSdkPath}, []string{"-cp", "bin:" + jars})),
+                       envsToSet: map[string]string{beamSdkKey: "SDK_JAVA", workingDirKey: "./"},
+                       wantErr:   false,
+               },
+               {
+                       name:      "specific sdk key in os envs",
+                       want:      NewBeamEnvs(defaultSdk, NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath", defaultBeamSdkPath}, []string{"-cp", "bin:" + jars})),
+                       envsToSet: map[string]string{beamSdkKey: "SDK_JAVA", workingDirKey: "./"},
+                       wantErr:   false,
+               },
+               {
+                       name:      "wrong sdk key in os envs",
+                       want:      nil,
+                       envsToSet: map[string]string{beamSdkKey: "SDK_J", workingDirKey: "./"},
+                       wantErr:   true,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Deferred so that it also runs after t.Fatalf and the early return below.
+                       defer func() {
+                               for key := range tt.envsToSet {
+                                       _ = os.Unsetenv(key) // best-effort cleanup
+                               }
+                       }()
+                       if err := setOsEnvs(tt.envsToSet); err != nil {
+                               t.Fatalf("couldn't setup os env: %v", err)
+                       }
+                       got, err := GetSdkEnvsFromOsEnvs()
+                       if (err != nil) != tt.wantErr {
+                               t.Errorf("GetSdkEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr)
+                               return
+                       }
+                       if !reflect.DeepEqual(got, tt.want) {
+                               t.Errorf("GetSdkEnvsFromOsEnvs() got = %v, want %v", got, tt.want)
+                       }
+               })
+       }
+}
+
+// Test_getNetworkEnvsFromOsEnvs checks GetNetworkEnvsFromOsEnvs with no variables set, with a
+// valid ip/port pair and with a port that is not an integer. Every case unsets the variables
+// it exported once it is done, so e.g. the invalid port does not stay set for later tests.
+func Test_getNetworkEnvsFromOsEnvs(t *testing.T) {
+       tests := []struct {
+               name      string
+               want      *NetworkEnvs
+               envsToSet map[string]string
+               wantErr   bool
+       }{
+               {
+                       name: "default values",
+                       want: NewNetworkEnvs(defaultIp, defaultPort),
+               },
+               {
+                       name:      "values from os envs",
+                       want:      NewNetworkEnvs("12.12.12.21", 1234),
+                       envsToSet: map[string]string{serverIpKey: "12.12.12.21", serverPortKey: "1234"},
+               },
+               {
+                       // The case expects an error and a nil result, not a fallback to the default port.
+                       name:      "not int port in os env, should return error",
+                       want:      nil,
+                       envsToSet: map[string]string{serverIpKey: "12.12.12.21", serverPortKey: "1a34"},
+                       wantErr:   true,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Deferred so that it also runs after t.Fatalf and the early return below.
+                       defer func() {
+                               for key := range tt.envsToSet {
+                                       _ = os.Unsetenv(key) // best-effort cleanup
+                               }
+                       }()
+                       if err := setOsEnvs(tt.envsToSet); err != nil {
+                               t.Fatalf("couldn't setup os env: %v", err)
+                       }
+                       got, err := GetNetworkEnvsFromOsEnvs()
+                       if (err != nil) != tt.wantErr {
+                               t.Errorf("GetNetworkEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr)
+                               return
+                       }
+                       if !reflect.DeepEqual(got, tt.want) {
+                               t.Errorf("GetNetworkEnvsFromOsEnvs() got = %v, want %v", got, tt.want)
+                       }
+               })
+       }
+}
+
+// Test_getApplicationEnvsFromOsEnvs checks GetApplicationEnvsFromOsEnvs with and without the
+// working dir variable. The process environment is cleared after every case; the second case
+// relies on the working dir variable being absent.
+func Test_getApplicationEnvsFromOsEnvs(t *testing.T) {
+       tests := []struct {
+               name      string
+               want      *ApplicationEnvs
+               wantErr   bool
+               envsToSet map[string]string
+       }{
+               {name: "working dir is provided", want: NewApplicationEnvs("/app", &CacheEnvs{defaultCacheType, defaultCacheAddress, defaultCacheKeyExpirationTime}, defaultPipelineExecuteTimeout), wantErr: false, envsToSet: map[string]string{workingDirKey: "/app"}},
+               {name: "working dir isn't provided", want: nil, wantErr: true},
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       // Deferred so the environment is cleared on every exit path, including
+                       // t.Fatalf and the early return after an unexpected error result.
+                       defer os.Clearenv()
+                       if err := setOsEnvs(tt.envsToSet); err != nil {
+                               t.Fatalf("couldn't setup os env: %v", err)
+                       }
+                       got, err := GetApplicationEnvsFromOsEnvs()
+                       if (err != nil) != tt.wantErr {
+                               t.Errorf("GetApplicationEnvsFromOsEnvs() error = %v, wantErr %v", err, tt.wantErr)
+                               return
+                       }
+
+                       if !reflect.DeepEqual(got, tt.want) {
+                               t.Errorf("GetApplicationEnvsFromOsEnvs() got = %v, want %v", got, tt.want)
+                       }
+               })
+       }
+}
+
+// Test_createExecutorConfig checks that createExecutorConfig, given the default sdk and the
+// json config written by setup, returns the base config extended with the default beam sdk
+// path (compile args) and the default jars (second run arg).
+func Test_createExecutorConfig(t *testing.T) {
+       classpath := strings.Join([]string{defaultBeamSdkPath, defaultBeamRunner, defaultSLF4j}, ":")
+       cases := []struct {
+               name       string
+               sdk        playground.Sdk
+               configPath string
+               want       *ExecutorConfig
+               wantErr    bool
+       }{
+               {
+                       name:       "create executor configuration from json file",
+                       sdk:        defaultSdk,
+                       configPath: filepath.Join(configFolderName, defaultSdk.String()+jsonExt),
+                       want:       NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath", defaultBeamSdkPath}, []string{"-cp", "bin:" + classpath}),
+                       wantErr:    false,
+               },
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       got, err := createExecutorConfig(tc.sdk, tc.configPath)
+                       if gotErr := err != nil; gotErr != tc.wantErr {
+                               t.Errorf("createExecutorConfig() error = %v, wantErr %v", err, tc.wantErr)
+                               return
+                       }
+                       if reflect.DeepEqual(got, tc.want) {
+                               return
+                       }
+                       t.Errorf("createExecutorConfig() got = %v, want %v", got, tc.want)
+               })
+       }
+}
+
+// Test_getConfigFromJson checks that getConfigFromJson decodes the json config written by
+// setup and that it returns an error and a nil config for a path that does not exist.
+func Test_getConfigFromJson(t *testing.T) {
+       configFileName := defaultSdk.String() + jsonExt
+       cases := []struct {
+               name       string
+               configPath string
+               want       *ExecutorConfig
+               wantErr    bool
+       }{
+               {
+                       name:       "get object from json",
+                       configPath: filepath.Join(configFolderName, configFileName),
+                       want:       NewExecutorConfig("javac", "java", []string{"-d", "bin", "-classpath"}, []string{"-cp", "bin:"}),
+                       wantErr:    false,
+               },
+               {
+                       name:       "error if wrong json path",
+                       configPath: filepath.Join("wrong_folder", configFileName),
+                       want:       nil,
+                       wantErr:    true,
+               },
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       got, err := getConfigFromJson(tc.configPath)
+                       if gotErr := err != nil; gotErr != tc.wantErr {
+                               t.Errorf("getConfigFromJson() error = %v, wantErr %v", err, tc.wantErr)
+                               return
+                       }
+                       if reflect.DeepEqual(got, tc.want) {
+                               return
+                       }
+                       t.Errorf("getConfigFromJson() got = %v, want %v", got, tc.want)
+               })
+       }
+}

Reply via email to