This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch rel/0.91.0 in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 713443a265038bb098e4290ba392e0908a0f58b8 Author: Dominik Riemer <[email protected]> AuthorDate: Thu Feb 16 21:20:37 2023 +0100 Provide environment variables via Environment provider (#1223) (#1224) * Provide environment variables via Environment provider (#1223) * Clean up consul configuration and env variable provider (#1223) * Fix bug in environment variable provider (#1223) * Receive database name from environment (#1223) --- .../streampipes/commons/constants/CustomEnvs.java | 20 +++- .../commons/constants/DefaultEnvValues.java | 9 +- .../apache/streampipes/commons/constants/Envs.java | 87 +++++++---------- .../commons/environment/DefaultEnvironment.java | 80 ++++++++++++++++ .../commons/environment/Environment.java | 40 ++++++++ .../commons/environment/Environments.java | 16 +++- .../environment/variable/EnvironmentVariable.java | 20 ++-- .../streampipes/commons/networking/Networking.java | 21 +++-- .../streampipes/config/backend/BackendConfig.java | 104 ++------------------- .../config/backend/BackendConfigKeys.java | 11 --- .../dataexplorer/DataLakeManagementV4.java | 10 +- .../dataexplorer/query/DataExplorerQuery.java | 10 +- .../dataexplorer/sdk/DataLakeQueryBuilder.java | 8 +- .../dataexplorer/v4/query/DataExplorerQueryV4.java | 18 ++-- ...StreamPipesClientRuntimeConnectionResolver.java | 21 ++--- .../management/connect/adapter/Adapter.java | 16 ---- .../elements/SendToBrokerAdapterSink.java | 15 ++- .../elements/SendToJmsAdapterSink.java | 4 +- .../elements/SendToKafkaAdapterSink.java | 6 +- .../elements/SendToMqttAdapterSink.java | 4 +- .../elements/SendToNatsAdapterSink.java | 4 +- .../simulator/simulator/VehicleDataSimulator.java | 13 ++- .../simulator/utils/WatertankDataSimulator.java | 13 ++- .../messaging/kafka/SpKafkaProducer.java | 6 +- .../runtime/PipelineElementRuntimeInfoFetcher.java | 13 ++- .../manager/setup/AutoInstallation.java | 56 +++++------ .../resource/management/UserResourceManager.java | 11 ++- 
.../pe/InvocablePipelineElementResource.java | 4 +- .../streampipes/security/jwt/KeyGenerator.java | 5 +- .../service/core/StreamPipesEnvChecker.java | 40 +++++--- .../svcdiscovery/consul/ConsulProvider.java | 2 +- .../extensions/security/WebSecurityConfig.java | 12 ++- .../storage/couchdb/utils/CouchDbConfig.java | 55 ----------- .../streampipes/storage/couchdb/utils/Utils.java | 27 +----- .../encryption/SecretEncryptionManager.java | 7 +- .../user/management/jwt/JwtTokenProvider.java | 7 +- .../standalone/function/StreamPipesFunction.java | 10 +- 37 files changed, 405 insertions(+), 400 deletions(-) diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java index 514c2e5b5..c4cdb950b 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/CustomEnvs.java @@ -17,14 +17,16 @@ */ package org.apache.streampipes.commons.constants; +import java.util.Map; + public class CustomEnvs { public static boolean exists(String envVariable) { - return System.getenv().containsKey(envVariable); + return AllEnvs.INSTANCE.getEnvs().containsKey(envVariable); } public static String getEnv(String envVariable) { - return System.getenv(envVariable); + return AllEnvs.INSTANCE.getEnvs().get(envVariable); } public static Integer getEnvAsInt(String envVariable) { @@ -34,4 +36,18 @@ public class CustomEnvs { public static Boolean getEnvAsBoolean(String envVariable) { return Boolean.parseBoolean(getEnv(envVariable)); } + + private enum AllEnvs { + INSTANCE; + + private final Map<String, String> envs; + + AllEnvs() { + this.envs = System.getenv(); + } + + public Map<String, String> getEnvs() { + return envs; + } + } } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/DefaultEnvValues.java 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/DefaultEnvValues.java index 590abb902..04eb5b564 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/DefaultEnvValues.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/DefaultEnvValues.java @@ -25,11 +25,14 @@ public class DefaultEnvValues { public static final String INITIAL_CLIENT_SECRET_DEFAULT = "my-apache-streampipes-secret-key-change-me"; public static final String CONSUL_HOST_DEFAULT = "consul"; - public static final String CONSUL_HOST_LOCAL = "localhost"; - public static final int CONSUL_PORT_DEFAULT = 8500; + public static final String CONSUL_PORT_DEFAULT = "8500"; public static final int MAX_WAIT_TIME_AT_SHUTDOWN_DEFAULT = 10000; - public static final boolean INSTALL_PIPELINE_ELEMENTS = true; + public static final String INSTALL_PIPELINE_ELEMENTS = "true"; public static final String DEFAULT_ENCRYPTION_PASSCODE = "eGgemyGBoILAu3xckoIp"; + + public static final String SP_KAFKA_RETENTION_MS_DEFAULT = "600000"; + + public static final String LOCALHOST = "localhost"; } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java index 067df3dea..8be54973f 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java @@ -19,34 +19,38 @@ package org.apache.streampipes.commons.constants; public enum Envs { - SP_HOST("SP_HOST", null), - SP_PORT("SP_PORT", null), + SP_HOST("SP_HOST"), + SP_PORT("SP_PORT"), @Deprecated(since = "0.90.0", forRemoval = true) - SP_CONSUL_LOCATION("CONSUL_LOCATION", "consul", "localhost"), - - SP_CONSUL_HOST("SP_CONSUL_HOST", "consul", "localhost"), - SP_CONSUL_PORT("SP_CONSUL_PORT", "8500"), - SP_KAFKA_RETENTION_MS("SP_KAFKA_RETENTION_MS", 
null), - SP_JWT_SECRET("JWT_SECRET", null), - SP_JWT_SIGNING_MODE("SP_JWT_SIGNING_MODE", null), - SP_JWT_PRIVATE_KEY_LOC("SP_JWT_PRIVATE_KEY_LOC", null), - SP_JWT_PUBLIC_KEY_LOC("SP_JWT_PUBLIC_KEY_LOC", null), - SP_INITIAL_ADMIN_EMAIL("SP_INITIAL_ADMIN_EMAIL", null), - SP_INITIAL_ADMIN_PASSWORD("SP_INITIAL_ADMIN_PASSWORD", null), - SP_INITIAL_SERVICE_USER("SP_INITIAL_SERVICE_USER", null), - SP_INITIAL_SERVICE_USER_SECRET("SP_INITIAL_SERVICE_USER_SECRET", null), - SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS", null), - SP_EXT_AUTH_MODE("SP_EXT_AUTH_MODE", null), - SP_CLIENT_USER("SP_CLIENT_USER", null), - SP_CLIENT_SECRET("SP_CLIENT_SECRET", null), - SP_ENCRYPTION_PASSCODE("SP_ENCRYPTION_PASSCODE", null), + SP_CONSUL_LOCATION("CONSUL_LOCATION", + DefaultEnvValues.CONSUL_HOST_DEFAULT, + DefaultEnvValues.LOCALHOST), + + SP_CONSUL_HOST("SP_CONSUL_HOST", + DefaultEnvValues.CONSUL_HOST_DEFAULT, + DefaultEnvValues.LOCALHOST), + SP_CONSUL_PORT("SP_CONSUL_PORT", DefaultEnvValues.CONSUL_PORT_DEFAULT), + SP_KAFKA_RETENTION_MS("SP_KAFKA_RETENTION_MS", DefaultEnvValues.SP_KAFKA_RETENTION_MS_DEFAULT), + SP_JWT_SECRET("JWT_SECRET"), + SP_JWT_SIGNING_MODE("SP_JWT_SIGNING_MODE"), + SP_JWT_PRIVATE_KEY_LOC("SP_JWT_PRIVATE_KEY_LOC"), + SP_JWT_PUBLIC_KEY_LOC("SP_JWT_PUBLIC_KEY_LOC"), + SP_INITIAL_ADMIN_EMAIL("SP_INITIAL_ADMIN_EMAIL", DefaultEnvValues.INITIAL_ADMIN_EMAIL_DEFAULT), + SP_INITIAL_ADMIN_PASSWORD("SP_INITIAL_ADMIN_PASSWORD", DefaultEnvValues.INITIAL_ADMIN_PW_DEFAULT), + SP_INITIAL_SERVICE_USER("SP_INITIAL_SERVICE_USER", DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT), + SP_INITIAL_SERVICE_USER_SECRET("SP_INITIAL_SERVICE_USER_SECRET", DefaultEnvValues.INITIAL_CLIENT_SECRET_DEFAULT), + SP_SETUP_INSTALL_PIPELINE_ELEMENTS("SP_SETUP_INSTALL_PIPELINE_ELEMENTS", DefaultEnvValues.INSTALL_PIPELINE_ELEMENTS), + SP_EXT_AUTH_MODE("SP_EXT_AUTH_MODE"), + SP_CLIENT_USER("SP_CLIENT_USER", DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT), + 
SP_CLIENT_SECRET("SP_CLIENT_SECRET", DefaultEnvValues.INITIAL_CLIENT_SECRET_DEFAULT), + SP_ENCRYPTION_PASSCODE("SP_ENCRYPTION_PASSCODE", DefaultEnvValues.DEFAULT_ENCRYPTION_PASSCODE), SP_DEBUG("SP_DEBUG", "false"), - SP_MAX_WAIT_TIME_AT_SHUTDOWN("SP_MAX_WAIT_TIME_AT_SHUTDOWN", null), + SP_MAX_WAIT_TIME_AT_SHUTDOWN("SP_MAX_WAIT_TIME_AT_SHUTDOWN"), // CouchDB Storage SP_COUCHDB_PROTOCOL("SP_COUCHDB_PROTOCOL", "http"), - SP_COUCHDB_HOST("SP_COUCHDB_HOST", "couchdb", "localhost"), + SP_COUCHDB_HOST("SP_COUCHDB_HOST", "couchdb", DefaultEnvValues.LOCALHOST), SP_COUCHDB_PORT("SP_COUCHDB_PORT", "5984"), SP_COUCHDB_USER("SP_COUCHDB_USER", "admin"), SP_COUCHDB_PASSWORD("SP_COUCHDB_PASSWORD", "admin"), @@ -54,7 +58,7 @@ public enum Envs { // Time Series Storage SP_TS_STORAGE_PROTOCOL("SP_TS_STORAGE_PROTOCOL", "http"), - SP_TS_STORAGE_HOST("SP_TS_STORAGE_HOST", "influxdb", "localhost"), + SP_TS_STORAGE_HOST("SP_TS_STORAGE_HOST", "influxdb", DefaultEnvValues.LOCALHOST), SP_TS_STORAGE_PORT("SP_TS_STORAGE_PORT", "8086"), SP_TS_STORAGE_TOKEN("SP_TS_STORAGE_TOKEN", "sp-admin"), @@ -64,54 +68,29 @@ public enum Envs { SP_TS_STORAGE_BUCKET("SP_TS_STORAGE_BUCKET", "sp"); private final String envVariableName; - private final String defaultValue; + private String defaultValue; - private final String devDefaultValue; + private String devDefaultValue; Envs(String envVariableName, String defaultValue, String devDefaultValue) { - this.envVariableName = envVariableName; - this.defaultValue = defaultValue; + this(envVariableName, defaultValue); this.devDefaultValue = devDefaultValue; } Envs(String envVariableName, String defaultValue) { - this.envVariableName = envVariableName; + this(envVariableName); this.defaultValue = defaultValue; this.devDefaultValue = defaultValue; } - public boolean exists() { - return CustomEnvs.exists(this.envVariableName); - } - - public String getValue() { - return CustomEnvs.getEnv(this.envVariableName); - } - - public Integer getValueAsInt() { - return 
CustomEnvs.getEnvAsInt(this.envVariableName); - } - - public Integer getValueAsIntOrDefault(int defaultValue) { - return exists() ? getValueAsInt() : defaultValue; - } - - public Boolean getValueAsBoolean() { - return CustomEnvs.getEnvAsBoolean(this.envVariableName); - } - - public boolean getValueAsBooleanOrDefault(boolean defaultValue) { - return this.exists() ? this.getValueAsBoolean() : defaultValue; + Envs(String envVariableName) { + this.envVariableName = envVariableName; } public String getEnvVariableName() { return envVariableName; } - public String getValueOrDefault(String defaultValue) { - return this.exists() ? this.getValue() : defaultValue; - } - public String getDefaultValue() { return defaultValue; } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java index 8ccf01948..d58260a90 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java @@ -40,6 +40,16 @@ public class DefaultEnvironment implements Environment { return new BooleanEnvironmentVariable(Envs.SP_DEBUG); } + @Override + public StringEnvironmentVariable getServiceHost() { + return new StringEnvironmentVariable(Envs.SP_HOST); + } + + @Override + public IntEnvironmentVariable getServicePort() { + return new IntEnvironmentVariable(Envs.SP_PORT); + } + @Override public StringEnvironmentVariable getTsStorageProtocol() { return new StringEnvironmentVariable(Envs.SP_TS_STORAGE_PROTOCOL); @@ -94,6 +104,76 @@ public class DefaultEnvironment implements Environment { return new StringEnvironmentVariable(Envs.SP_COUCHDB_PASSWORD); } + @Override + public StringEnvironmentVariable getClientUser() { + return new StringEnvironmentVariable(Envs.SP_CLIENT_USER); + } + + @Override + public 
StringEnvironmentVariable getClientSecret() { + return new StringEnvironmentVariable(Envs.SP_CLIENT_SECRET); + } + + @Override + public StringEnvironmentVariable getJwtSecret() { + return new StringEnvironmentVariable(Envs.SP_JWT_SECRET); + } + + @Override + public StringEnvironmentVariable getJwtPublicKeyLoc() { + return new StringEnvironmentVariable(Envs.SP_JWT_PUBLIC_KEY_LOC); + } + + @Override + public StringEnvironmentVariable getJwtPrivateKeyLoc() { + return new StringEnvironmentVariable(Envs.SP_JWT_PRIVATE_KEY_LOC); + } + + @Override + public StringEnvironmentVariable getJwtSigningMode() { + return new StringEnvironmentVariable(Envs.SP_JWT_SIGNING_MODE); + } + + @Override + public StringEnvironmentVariable getExtensionsAuthMode() { + return new StringEnvironmentVariable(Envs.SP_EXT_AUTH_MODE); + } + + @Override + public StringEnvironmentVariable getEncryptionPasscode() { + return new StringEnvironmentVariable(Envs.SP_ENCRYPTION_PASSCODE); + } + + @Override + public StringEnvironmentVariable getKafkaRetentionTimeMs() { + return new StringEnvironmentVariable(Envs.SP_KAFKA_RETENTION_MS); + } + + @Override + public BooleanEnvironmentVariable getSetupInstallPipelineElements() { + return new BooleanEnvironmentVariable(Envs.SP_SETUP_INSTALL_PIPELINE_ELEMENTS); + } + + @Override + public StringEnvironmentVariable getInitialServiceUserSecret() { + return new StringEnvironmentVariable(Envs.SP_INITIAL_SERVICE_USER_SECRET); + } + + @Override + public StringEnvironmentVariable getInitialServiceUser() { + return new StringEnvironmentVariable(Envs.SP_INITIAL_SERVICE_USER); + } + + @Override + public StringEnvironmentVariable getInitialAdminEmail() { + return new StringEnvironmentVariable(Envs.SP_INITIAL_ADMIN_EMAIL); + } + + @Override + public StringEnvironmentVariable getInitialAdminPassword() { + return new StringEnvironmentVariable(Envs.SP_INITIAL_ADMIN_PASSWORD); + } + @Override public StringEnvironmentVariable getConsulLocation() { return new 
StringEnvironmentVariable(Envs.SP_CONSUL_LOCATION); diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java index 63be042c3..2bf591303 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java @@ -34,6 +34,13 @@ public interface Environment { BooleanEnvironmentVariable getSpDebug(); + // Service base configuration + + StringEnvironmentVariable getServiceHost(); + + IntEnvironmentVariable getServicePort(); + + // Time series storage env variables StringEnvironmentVariable getTsStorageProtocol(); @@ -60,4 +67,37 @@ public interface Environment { StringEnvironmentVariable getCouchDbPassword(); + + // JWT & Authentication + + StringEnvironmentVariable getClientUser(); + + StringEnvironmentVariable getClientSecret(); + + StringEnvironmentVariable getJwtSecret(); + + StringEnvironmentVariable getJwtPublicKeyLoc(); + + StringEnvironmentVariable getJwtPrivateKeyLoc(); + + StringEnvironmentVariable getJwtSigningMode(); + + StringEnvironmentVariable getExtensionsAuthMode(); + + StringEnvironmentVariable getEncryptionPasscode(); + + // Messaging + StringEnvironmentVariable getKafkaRetentionTimeMs(); + + + // Setup + BooleanEnvironmentVariable getSetupInstallPipelineElements(); + + StringEnvironmentVariable getInitialServiceUserSecret(); + + StringEnvironmentVariable getInitialServiceUser(); + + StringEnvironmentVariable getInitialAdminEmail(); + + StringEnvironmentVariable getInitialAdminPassword(); } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java index 631e72dcc..c25d98bbb 100644 --- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environments.java @@ -21,6 +21,20 @@ package org.apache.streampipes.commons.environment; public class Environments { public static Environment getEnvironment() { - return new DefaultEnvironment(); + return Env.DEFAULT.getEnvironment(); + } + + private enum Env { + DEFAULT(new DefaultEnvironment()); + + private final Environment environment; + + Env(Environment env) { + this.environment = env; + } + + public Environment getEnvironment() { + return environment; + } } } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java index e02031df4..4219c51d3 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/variable/EnvironmentVariable.java @@ -23,20 +23,14 @@ import org.apache.streampipes.commons.constants.Envs; public abstract class EnvironmentVariable<T> { - private final T defaultValue; + private final String unparsedDefaultValue; private final String envVariableName; private boolean devModeActive; - public EnvironmentVariable(String envVariableName, - T defaultValue) { - this.envVariableName = envVariableName; - this.defaultValue = defaultValue; - } - public EnvironmentVariable(Envs envVariable) { this.envVariableName = envVariable.getEnvVariableName(); this.devModeActive = isDevModeActive(); - this.defaultValue = devModeActive ? parse(envVariable.getDevDefaultValue()) : parse(envVariable.getDefaultValue()); + this.unparsedDefaultValue = devModeActive ? 
envVariable.getDevDefaultValue() : envVariable.getDefaultValue(); } public T getValue() { @@ -48,7 +42,7 @@ public abstract class EnvironmentVariable<T> { } public T getValueOrDefault() { - return exists() ? getValue() : defaultValue; + return exists() ? getValue() : parse(unparsedDefaultValue); } public T getValueOrReturn(T defaultValue) { @@ -59,6 +53,14 @@ public abstract class EnvironmentVariable<T> { return resolver.resolve(); } + public T getDefault() { + return parse(unparsedDefaultValue); + } + + public String getEnvVariableName() { + return this.envVariableName; + } + private boolean isDevModeActive() { return CustomEnvs.getEnvAsBoolean(Envs.SP_DEBUG.getEnvVariableName()); } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/networking/Networking.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/networking/Networking.java index 4034c3a9c..0f38cf9cd 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/networking/Networking.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/networking/Networking.java @@ -17,7 +17,8 @@ */ package org.apache.streampipes.commons.networking; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,10 +36,11 @@ public class Networking { private static final String DEFAULT_LOCALHOST_IP = "127.0.0.1"; public static String getHostname() throws UnknownHostException { + var svcHostname = getEnvironment().getServiceHost(); String selectedAddress; - if (Envs.SP_HOST.exists()) { - selectedAddress = Envs.SP_HOST.getValue(); - LOG.info("Using IP from provided environment variable {}: {}", Envs.SP_HOST, selectedAddress); + if (svcHostname.exists()) { + selectedAddress = svcHostname.getValue(); + LOG.info("Using IP from provided environment variable {}: 
{}", svcHostname.getEnvVariableName(), selectedAddress); } else { selectedAddress = InetAddress.getLocalHost().getHostAddress(); @@ -79,10 +81,11 @@ public class Networking { } public static Integer getPort(Integer defaultPort) { + var servicePort = getEnvironment().getServicePort(); Integer selectedPort; - if (Envs.SP_PORT.exists()) { - selectedPort = Envs.SP_PORT.getValueAsInt(); - LOG.info("Using port from provided environment variable {}: {}", Envs.SP_PORT, selectedPort); + if (servicePort.exists()) { + selectedPort = servicePort.getValue(); + LOG.info("Using port from provided environment variable {}: {}", servicePort.getEnvVariableName(), selectedPort); } else { selectedPort = defaultPort; LOG.info("Using default port: {}", defaultPort); @@ -90,4 +93,8 @@ public class Networking { return selectedPort; } + + private static Environment getEnvironment() { + return Environments.getEnvironment(); + } } diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java index 43e602362..caa2c74d8 100644 --- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java +++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java @@ -19,7 +19,8 @@ package org.apache.streampipes.config.backend; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.random.TokenGenerator; import org.apache.streampipes.config.backend.model.EmailConfig; import org.apache.streampipes.config.backend.model.GeneralConfig; @@ -28,10 +29,7 @@ import org.apache.streampipes.model.config.MessagingSettings; import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; import org.apache.streampipes.svcdiscovery.api.SpConfig; -import 
org.apache.commons.lang3.RandomStringUtils; - import java.io.File; -import java.security.SecureRandom; public enum BackendConfig { INSTANCE; @@ -59,9 +57,6 @@ public enum BackendConfig { config.register(BackendConfigKeys.KAFKA_PORT, 9092, "Port for backend service for kafka"); config.register(BackendConfigKeys.ZOOKEEPER_HOST, "zookeeper", "Hostname for backend service for zookeeper"); config.register(BackendConfigKeys.ZOOKEEPER_PORT, 2181, "Port for backend service for zookeeper"); - config.register(BackendConfigKeys.ELASTICSEARCH_HOST, "elasticsearch", "Hostname for elasticsearch service"); - config.register(BackendConfigKeys.ELASTICSEARCH_PORT, 9200, "Port for elasticsearch service"); - config.register(BackendConfigKeys.ELASTICSEARCH_PROTOCOL, "http", "Protocol the elasticsearch service"); config.register(BackendConfigKeys.IS_CONFIGURED, false, "Boolean that indicates whether streampipes is " + "already configured or not"); config.register(BackendConfigKeys.IS_SETUP_RUNNING, false, @@ -70,13 +65,6 @@ public enum BackendConfig { "The directory where " + "pipeline element assets are stored."); config.register(BackendConfigKeys.FILES_DIR, makeFileLocation(), "The directory where " + "pipeline element files are stored."); - config.register(BackendConfigKeys.DATA_LAKE_HOST, "elasticsearch", - "The host of the data base used for the data lake"); - config.register(BackendConfigKeys.DATA_LAKE_PORT, 9200, "The port of the data base used for the data lake"); - - config.register(BackendConfigKeys.INFLUX_HOST, "influxdb", "The host of the influx data base"); - config.register(BackendConfigKeys.INFLUX_PORT, 8086, "The hist of the influx data base"); - config.register(BackendConfigKeys.INFLUX_DATA_BASE, "sp", "The influx data base name"); config.registerObject(BackendConfigKeys.MESSAGING_SETTINGS, DefaultMessagingSettings.make(), "Default Messaging Settings"); @@ -103,27 +91,6 @@ public enum BackendConfig { + File.separator; } - private String randomKey() { - return 
RandomStringUtils.random(10, 0, possibleCharacters.length - 1, - false, false, possibleCharacters, new SecureRandom()); - } - - public String getBackendHost() { - return config.getString(BackendConfigKeys.BACKEND_HOST); - } - - public int getBackendPort() { - return config.getInteger(BackendConfigKeys.BACKEND_PORT); - } - - public String getBackendUrl() { - return "http://" + getBackendHost() + ":" + getBackendPort(); - } - - public String getBackendApiUrl() { - return getBackendUrl() + "/streampipes-backend/"; - } - public String getJmsHost() { return config.getString(BackendConfigKeys.JMS_HOST); } @@ -156,10 +123,6 @@ public enum BackendConfig { return config.getInteger(BackendConfigKeys.KAFKA_PORT); } - public String getKafkaUrl() { - return getKafkaHost() + ":" + getKafkaPort(); - } - public String getZookeeperHost() { return config.getString(BackendConfigKeys.ZOOKEEPER_HOST); } @@ -189,30 +152,6 @@ public enum BackendConfig { config.setBoolean(BackendConfigKeys.IS_CONFIGURED, b); } - public String getElasticsearchHost() { - return config.getString(BackendConfigKeys.ELASTICSEARCH_HOST); - } - - public int getElasticsearchPort() { - return config.getInteger(BackendConfigKeys.ELASTICSEARCH_PORT); - } - - public String getElasticsearchProtocol() { - return config.getString(BackendConfigKeys.ELASTICSEARCH_PROTOCOL); - } - - public String getKafkaRestHost() { - return config.getString(BackendConfigKeys.KAFKA_REST_HOST); - } - - public Integer getKafkaRestPort() { - return config.getInteger(BackendConfigKeys.KAFKA_REST_PORT); - } - - public String getKafkaRestUrl() { - return "http://" + getKafkaRestHost() + ":" + getKafkaRestPort(); - } - public String getAssetDir() { return config.getString(BackendConfigKeys.ASSETS_DIR); } @@ -221,30 +160,6 @@ public enum BackendConfig { return config.getString(BackendConfigKeys.FILES_DIR); } - public String getDatalakeHost() { - return config.getString(BackendConfigKeys.DATA_LAKE_HOST); - } - - public int getDatalakePort() { - 
return config.getInteger(BackendConfigKeys.DATA_LAKE_PORT); - } - - public String getDataLakeUrl() { - return getDatalakeHost() + ":" + getDatalakePort(); - } - - public String getInfluxHost() { - return config.getString(BackendConfigKeys.INFLUX_HOST); - } - - public int getInfluxPort() { - return config.getInteger(BackendConfigKeys.INFLUX_PORT); - } - - public String getInfluxDatabaseName() { - return config.getString(BackendConfigKeys.INFLUX_DATA_BASE); - } - public LocalAuthConfig getLocalAuthConfig() { return config.getObject(BackendConfigKeys.LOCAL_AUTH_CONFIG, LocalAuthConfig.class, LocalAuthConfig.fromDefaults(getJwtSecret())); @@ -270,24 +185,21 @@ public enum BackendConfig { config.setObject(BackendConfigKeys.LOCAL_AUTH_CONFIG, authConfig); } - public boolean isSetupRunning() { - return config.getBoolean(BackendConfigKeys.IS_SETUP_RUNNING); - } - public void updateSetupStatus(boolean status) { config.setBoolean(BackendConfigKeys.IS_SETUP_RUNNING, status); } private String getJwtSecret() { - if (Envs.SP_JWT_SECRET.exists()) { - return Envs.SP_JWT_SECRET.getValue(); - } else { - return makeDefaultJwtSecret(); - } + var env = getEnvironment(); + return env.getJwtSecret().getValueOrResolve(this::makeDefaultJwtSecret); } private String makeDefaultJwtSecret() { return TokenGenerator.generateNewToken(); } + private Environment getEnvironment() { + return Environments.getEnvironment(); + } + } diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java index 3d1950b75..9970f241f 100644 --- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java +++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java @@ -31,21 +31,10 @@ public class BackendConfigKeys { public static final String KAFKA_PORT = "SP_KAFKA_PORT"; public static final String 
ZOOKEEPER_HOST = "SP_ZOOKEEPER_HOST"; public static final String ZOOKEEPER_PORT = "SP_ZOOKEEPER_PORT"; - public static final String ELASTICSEARCH_HOST = "SP_ELASTICSEARCH_HOST"; - public static final String ELASTICSEARCH_PORT = "SP_ELASTICSEARCH_PORT"; - public static final String ELASTICSEARCH_PROTOCOL = "SP_ELASTICSEARCH_PROTOCOL"; public static final String IS_CONFIGURED = "SP_IS_CONFIGURED"; public static final String IS_SETUP_RUNNING = "SP_IS_SETUP_RUNNING"; - public static final String KAFKA_REST_HOST = "SP_KAFKA_REST_HOST"; - public static final String KAFKA_REST_PORT = "SP_KAFKA_REST_PORT"; public static final String ASSETS_DIR = "SP_ASSETS_DIR"; public static final String FILES_DIR = "SP_FILES_DIR"; - public static final String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST"; - public static final String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT"; - - public static final String INFLUX_PORT = "SP_INFLUX_PORT"; - public static final String INFLUX_HOST = "SP_INFLUX_HOST"; - public static final String INFLUX_DATA_BASE = "SP_INFLUX_DATA_BASE"; public static final String MESSAGING_SETTINGS = "SP_MESSAGING_SETTINGS"; public static final String LOCAL_AUTH_CONFIG = "SP_LOCAL_AUTH_CONFIG"; diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java index cb73aaddf..6cc0d5620 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java @@ -18,7 +18,8 @@ package org.apache.streampipes.dataexplorer; -import org.apache.streampipes.config.backend.BackendConfig; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider; import 
org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams; import org.apache.streampipes.dataexplorer.query.DeleteDataQuery; @@ -184,12 +185,13 @@ public class DataLakeManagementV4 { public Map<String, Object> getTagValues(String measurementId, String fields) { InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient(); + String databaseName = getEnvironment().getTsStorageBucket().getValueOrDefault(); Map<String, Object> tags = new HashMap<>(); if (fields != null && !("".equals(fields))) { List<String> fieldList = Arrays.asList(fields.split(",")); fieldList.forEach(f -> { String q = - "SHOW TAG VALUES ON \"" + BackendConfig.INSTANCE.getInfluxDatabaseName() + "\" FROM \"" + measurementId + "SHOW TAG VALUES ON \"" + databaseName + "\" FROM \"" + measurementId + "\" WITH KEY = \"" + f + "\""; Query query = new Query(q); QueryResult queryResult = influxDB.query(query); @@ -287,4 +289,8 @@ public class DataLakeManagementV4 { private IDataLakeStorage getDataLakeStorage() { return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage(); } + + private Environment getEnvironment() { + return Environments.getEnvironment(); + } } diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQuery.java index 3bd5140e0..33cd9b997 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQuery.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQuery.java @@ -17,7 +17,8 @@ */ package org.apache.streampipes.dataexplorer.query; -import org.apache.streampipes.config.backend.BackendConfig; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider; import 
org.apache.streampipes.model.datalake.DataSeries; import org.apache.streampipes.model.datalake.SpQueryResult; @@ -33,8 +34,9 @@ public abstract class DataExplorerQuery<T> { public T executeQuery() throws RuntimeException { InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient(); + var databaseName = getEnvironment().getTsStorageBucket().getValueOrDefault(); DataExplorerQueryBuilder queryBuilder = - DataExplorerQueryBuilder.create(BackendConfig.INSTANCE.getInfluxDatabaseName()); + DataExplorerQueryBuilder.create(databaseName); getQuery(queryBuilder); Query query = queryBuilder.toQuery(); org.influxdb.dto.QueryResult result; @@ -82,6 +84,10 @@ public abstract class DataExplorerQuery<T> { } + private Environment getEnvironment() { + return Environments.getEnvironment(); + } + protected abstract void getQuery(DataExplorerQueryBuilder queryBuilder); protected abstract T postQuery(org.influxdb.dto.QueryResult result) throws RuntimeException; diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java index 21008a315..71c091348 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java @@ -18,7 +18,8 @@ package org.apache.streampipes.dataexplorer.sdk; -import org.apache.streampipes.config.backend.BackendConfig; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.dataexplorer.v4.params.ColumnFunction; import org.influxdb.dto.Query; @@ -48,11 +49,14 @@ public class DataLakeQueryBuilder { private int limit = Integer.MIN_VALUE; private int offset = Integer.MIN_VALUE; + private Environment env; + private DataLakeQueryBuilder(String 
measurementId) { this.measurementId = measurementId; this.selectionQuery = select(); this.whereClauses = new ArrayList<>(); this.groupByClauses = new ArrayList<>(); + this.env = Environments.getEnvironment(); } public static DataLakeQueryBuilder create(String measurementId) { @@ -195,7 +199,7 @@ public class DataLakeQueryBuilder { public Query build() { var selectQuery = - this.selectionQuery.from(BackendConfig.INSTANCE.getInfluxDatabaseName(), "\"" + measurementId + "\""); + this.selectionQuery.from(env.getTsStorageBucket().getValueOrDefault(), "\"" + measurementId + "\""); this.whereClauses.forEach(selectQuery::where); if (this.groupByClauses.size() > 0) { diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java index 5bb56b375..8cf455d8e 100644 --- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java +++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java @@ -18,7 +18,8 @@ package org.apache.streampipes.dataexplorer.v4.query; -import org.apache.streampipes.config.backend.BackendConfig; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider; import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams; import org.apache.streampipes.dataexplorer.v4.params.FillParams; @@ -66,9 +67,7 @@ public class DataExplorerQueryV4 { private boolean appendId = false; private String forId; - public DataExplorerQueryV4() { - - } + private Environment env; public DataExplorerQueryV4(Map<String, QueryParamsV4> params, String forId) { @@ -79,11 +78,12 @@ public class DataExplorerQueryV4 { public 
DataExplorerQueryV4(Map<String, QueryParamsV4> params) { this.params = params; + this.env = Environments.getEnvironment(); this.maximumAmountOfEvents = -1; } public DataExplorerQueryV4(Map<String, QueryParamsV4> params, int maximumAmountOfEvents) { - this.params = params; + this(params); this.maximumAmountOfEvents = maximumAmountOfEvents; } @@ -92,7 +92,7 @@ public class DataExplorerQueryV4 { List<QueryElement<?>> queryElements = getQueryElements(); if (this.maximumAmountOfEvents != -1) { - QueryBuilder countQueryBuilder = QueryBuilder.create(BackendConfig.INSTANCE.getInfluxDatabaseName()); + QueryBuilder countQueryBuilder = QueryBuilder.create(getDatabaseName()); Query countQuery = countQueryBuilder.build(queryElements, true); QueryResult countQueryResult = influxDB.query(countQuery); Double amountOfQueryResults = getAmountOfResults(countQueryResult); @@ -105,7 +105,7 @@ public class DataExplorerQueryV4 { } } - QueryBuilder queryBuilder = QueryBuilder.create(BackendConfig.INSTANCE.getInfluxDatabaseName()); + QueryBuilder queryBuilder = QueryBuilder.create(getDatabaseName()); Query query = queryBuilder.build(queryElements, false); LOG.debug("Data Lake Query (database:" + query.getDatabase() + "): " + query.getCommand()); @@ -237,4 +237,8 @@ public class DataExplorerQueryV4 { return queryElements; } + + private String getDatabaseName() { + return env.getTsStorageBucket().getValueOrDefault(); + } } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/client/StreamPipesClientRuntimeConnectionResolver.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/client/StreamPipesClientRuntimeConnectionResolver.java index 8bb4d28ae..583bb5ab3 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/client/StreamPipesClientRuntimeConnectionResolver.java +++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/client/StreamPipesClientRuntimeConnectionResolver.java @@ -20,8 +20,8 @@ package org.apache.streampipes.extensions.management.client; import org.apache.streampipes.client.credentials.CredentialsProvider; import org.apache.streampipes.client.credentials.StreamPipesTokenCredentials; import org.apache.streampipes.client.model.ClientConnectionUrlResolver; -import org.apache.streampipes.commons.constants.DefaultEnvValues; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.commons.networking.Networking; import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; @@ -38,9 +38,10 @@ import java.util.List; public class StreamPipesClientRuntimeConnectionResolver implements ClientConnectionUrlResolver { private static final Logger LOG = LoggerFactory.getLogger(StreamPipesClientRuntimeConnectionResolver.class); + private Environment env; public StreamPipesClientRuntimeConnectionResolver() { - + this.env = Environments.getEnvironment(); } @Override @@ -52,7 +53,7 @@ public class StreamPipesClientRuntimeConnectionResolver implements ClientConnect public String getBaseUrl() throws SpRuntimeException { List<String> baseUrls = findClientServices(); if (baseUrls.size() > 0) { - if (Envs.SP_DEBUG.exists()) { + if (env.getSpDebug().getValueOrDefault()) { try { return "http://" + Networking.getHostname() + ":" + 8030; } catch (UnknownHostException e) { @@ -67,19 +68,11 @@ public class StreamPipesClientRuntimeConnectionResolver implements ClientConnect } private String getClientApiUser() { - if (Envs.SP_CLIENT_USER.exists()) { - return Envs.SP_CLIENT_USER.getValue(); - } else { - return DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT; - } + return 
env.getClientUser().getValueOrDefault(); } private String getClientApiSecret() { - if (Envs.SP_CLIENT_SECRET.exists()) { - return Envs.SP_CLIENT_SECRET.getValue(); - } else { - return DefaultEnvValues.INITIAL_CLIENT_SECRET_DEFAULT; - } + return env.getClientSecret().getValueOrDefault(); } private List<String> findClientServices() { diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/Adapter.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/Adapter.java index 4d87918d6..1f797ab93 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/Adapter.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/Adapter.java @@ -48,33 +48,17 @@ public abstract class Adapter<T extends AdapterDescription> implements IAdapter< @Override public void changeEventGrounding(TransportProtocol transportProtocol) { - if (transportProtocol instanceof JmsTransportProtocol) { SendToJmsAdapterSink sink = (SendToJmsAdapterSink) this.adapterPipeline.getPipelineSink(); - if ("true".equals(System.getenv("SP_DEBUG"))) { - transportProtocol.setBrokerHostname("localhost"); - //((JmsTransportProtocol) transportProtocol).setPort(61616); - } sink.changeTransportProtocol((JmsTransportProtocol) transportProtocol); } else if (transportProtocol instanceof KafkaTransportProtocol) { SendToKafkaAdapterSink sink = (SendToKafkaAdapterSink) this.adapterPipeline.getPipelineSink(); - if ("true".equals(System.getenv("SP_DEBUG"))) { - transportProtocol.setBrokerHostname("localhost"); - ((KafkaTransportProtocol) transportProtocol).setKafkaPort(9094); - } sink.changeTransportProtocol((KafkaTransportProtocol) transportProtocol); } else if (transportProtocol instanceof MqttTransportProtocol) { SendToMqttAdapterSink sink = (SendToMqttAdapterSink) 
this.adapterPipeline.getPipelineSink(); - if ("true".equals(System.getenv("SP_DEBUG"))) { - transportProtocol.setBrokerHostname("localhost"); - //((MqttTransportProtocol) transportProtocol).setPort(1883); - } sink.changeTransportProtocol((MqttTransportProtocol) transportProtocol); } else if (transportProtocol instanceof NatsTransportProtocol) { SendToNatsAdapterSink sink = (SendToNatsAdapterSink) this.adapterPipeline.getPipelineSink(); - if ("true".equals(System.getenv("SP_DEBUG"))) { - transportProtocol.setBrokerHostname("localhost"); - } sink.changeTransportProtocol((NatsTransportProtocol) transportProtocol); } } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java index 3d2109a4e..7b7c653b8 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToBrokerAdapterSink.java @@ -17,6 +17,8 @@ */ package org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.dataformat.SpDataFormatDefinition; import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement; @@ -48,8 +50,8 @@ public abstract class SendToBrokerAdapterSink<T extends TransportProtocol> imple .getEventGrounding() .getTransportProtocol()); - if ("true".equals(System.getenv("SP_DEBUG"))) { - 
modifyProtocolForDebugging(); + if (getEnvironment().getSpDebug().getValueOrDefault()) { + modifyProtocolForDebugging(this.protocol); } TransportFormat transportFormat = adapterDescription @@ -88,12 +90,11 @@ public abstract class SendToBrokerAdapterSink<T extends TransportProtocol> imple producer.publish(event); } - protected void modifyProtocolForDebugging() { - - } + public abstract void modifyProtocolForDebugging(T transportProtocol); public void changeTransportProtocol(T transportProtocol) { try { + modifyProtocolForDebugging(transportProtocol); producer.disconnect(); producer.connect(transportProtocol); } catch (SpRuntimeException e) { @@ -101,6 +102,10 @@ public abstract class SendToBrokerAdapterSink<T extends TransportProtocol> imple } } + private Environment getEnvironment() { + return Environments.getEnvironment(); + } + } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java index a392f6c7f..17f954769 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToJmsAdapterSink.java @@ -30,7 +30,7 @@ public class SendToJmsAdapterSink extends SendToBrokerAdapterSink<JmsTransportPr } @Override - public void modifyProtocolForDebugging() { - this.protocol.setBrokerHostname("localhost"); + public void modifyProtocolForDebugging(JmsTransportProtocol protocol) { + protocol.setBrokerHostname("localhost"); } } diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java index b92dc711a..a619140f5 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToKafkaAdapterSink.java @@ -30,8 +30,8 @@ public class SendToKafkaAdapterSink extends SendToBrokerAdapterSink<KafkaTranspo } @Override - public void modifyProtocolForDebugging() { - this.protocol.setBrokerHostname("localhost"); - this.protocol.setKafkaPort(9094); + public void modifyProtocolForDebugging(KafkaTransportProtocol protocol) { + protocol.setBrokerHostname("localhost"); + protocol.setKafkaPort(9094); } } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java index 62351bf89..04a481434 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToMqttAdapterSink.java @@ -30,7 +30,7 @@ public class SendToMqttAdapterSink extends SendToBrokerAdapterSink<MqttTransport } @Override - public void modifyProtocolForDebugging() { - this.protocol.setBrokerHostname("localhost"); + public void 
modifyProtocolForDebugging(MqttTransportProtocol transportProtocol) { + transportProtocol.setBrokerHostname("localhost"); } } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToNatsAdapterSink.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToNatsAdapterSink.java index 89b7471a2..932183c18 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToNatsAdapterSink.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/SendToNatsAdapterSink.java @@ -31,7 +31,7 @@ public class SendToNatsAdapterSink extends SendToBrokerAdapterSink<NatsTransport } @Override - public void modifyProtocolForDebugging() { - this.protocol.setBrokerHostname("localhost"); + public void modifyProtocolForDebugging(NatsTransportProtocol protocol) { + protocol.setBrokerHostname("localhost"); } } diff --git a/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java b/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java index 87622c56d..63a036b6a 100644 --- a/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java +++ b/streampipes-extensions/streampipes-sources-vehicle-simulator/src/main/java/org/apache/streampipes/sources/vehicle/simulator/simulator/VehicleDataSimulator.java @@ -18,7 +18,8 @@ package org.apache.streampipes.sources.vehicle.simulator.simulator; -import org.apache.streampipes.commons.constants.Envs; +import
org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.extensions.management.config.ConfigExtractor; import org.apache.streampipes.extensions.management.init.DeclarersSingleton; import org.apache.streampipes.pe.simulator.StreamPipesSimulationRunner; @@ -37,6 +38,12 @@ public class VehicleDataSimulator implements Runnable { private static final String EXAMPLES_CONFIG_FILE = "streampipesDemoConfig.json"; + private Environment env; + + public VehicleDataSimulator() { + this.env = Environments.getEnvironment(); + } + private void initSimulation() { try { ConfigExtractor configExtractor = @@ -70,12 +77,12 @@ public class VehicleDataSimulator implements Runnable { } private String getKafkaHost(ConfigExtractor configExtractor) { - return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean() + return env.getSpDebug().getValueOrDefault() ? "localhost" : configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST); } private Integer getKafkaPort(ConfigExtractor configExtractor) { - return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean() + return env.getSpDebug().getValueOrDefault() ? 
9094 : configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT); } diff --git a/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java b/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java index 810054d0f..35dd6f2e0 100644 --- a/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java +++ b/streampipes-extensions/streampipes-sources-watertank-simulator/src/main/java/org/apache/streampipes/sources/watertank/simulator/utils/WatertankDataSimulator.java @@ -18,7 +18,8 @@ package org.apache.streampipes.sources.watertank.simulator.utils; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.extensions.management.config.ConfigExtractor; import org.apache.streampipes.extensions.management.init.DeclarersSingleton; import org.apache.streampipes.pe.simulator.StreamPipesSimulationRunner; @@ -37,6 +38,12 @@ public class WatertankDataSimulator implements Runnable { private static final String EXAMPLES_CONFIG_FILE = "streampipesDemoConfig.json"; + private Environment env; + + public WatertankDataSimulator() { + this.env = Environments.getEnvironment(); + } + private void initSimulation() { try { ConfigExtractor configExtractor = @@ -61,12 +68,12 @@ public class WatertankDataSimulator implements Runnable { } private String getKafkaHost(ConfigExtractor configExtractor) { - return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean() + return env.getSpDebug().getValueOrDefault() ? 
"localhost" : configExtractor.getConfig().getString(ConfigKeys.KAFKA_HOST); } private Integer getKafkaPort(ConfigExtractor configExtractor) { - return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean() + return env.getSpDebug().getValueOrDefault() ? 9094 : configExtractor.getConfig().getInteger(ConfigKeys.KAFKA_PORT); } diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java index afd3d44a7..407f2d028 100644 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java +++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java @@ -18,7 +18,7 @@ package org.apache.streampipes.messaging.kafka; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.messaging.EventProducer; import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender; import org.apache.streampipes.messaging.kafka.config.ProducerConfigFactory; @@ -49,7 +49,6 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S private static final String COLON = ":"; - private static final String SP_KAFKA_RETENTION_MS_DEFAULT = "600000"; private String brokerUrl; private String topic; @@ -125,8 +124,7 @@ public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, S if (!topicExists(topics)) { Map<String, String> topicConfig = new HashMap<>(); - String retentionTime = Envs.SP_KAFKA_RETENTION_MS.exists() - ? 
Envs.SP_KAFKA_RETENTION_MS.getValue() : SP_KAFKA_RETENTION_MS_DEFAULT; + String retentionTime = Environments.getEnvironment().getKafkaRetentionTimeMs().getValueOrDefault(); topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, retentionTime); final NewTopic newTopic = new NewTopic(topic, 1, (short) 1); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java index 08866eb31..2762bad07 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.java @@ -17,7 +17,8 @@ */ package org.apache.streampipes.manager.runtime; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.messaging.jms.ActiveMQConsumer; import org.apache.streampipes.messaging.kafka.SpKafkaConsumer; @@ -43,15 +44,17 @@ public enum PipelineElementRuntimeInfoFetcher { private static final int FETCH_INTERVAL_MS = 300; private final Map<String, SpDataFormatConverter> converterMap; + private Environment env; PipelineElementRuntimeInfoFetcher() { this.converterMap = new HashMap<>(); + this.env = Environments.getEnvironment(); } public String getCurrentData(SpDataStream spDataStream) throws SpRuntimeException { var topic = getOutputTopic(spDataStream); var protocol = spDataStream.getEventGrounding().getTransportProtocol(); - if (Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()) { + if (env.getSpDebug().getValueOrDefault()) { protocol.setBrokerHostname("localhost"); } if 
(!converterMap.containsKey(topic)) { @@ -143,7 +146,7 @@ public enum PipelineElementRuntimeInfoFetcher { String topic) throws SpRuntimeException { final String[] result = {null}; // Change kafka config when running in development mode - if ("true".equals(System.getenv("SP_DEBUG"))) { + if (getEnvironment().getSpDebug().getValueOrDefault()) { protocol.setKafkaPort(9094); } @@ -161,4 +164,8 @@ public enum PipelineElementRuntimeInfoFetcher { return result[0]; } + private Environment getEnvironment() { + return Environments.getEnvironment(); + } + } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java index 54d5808ed..0cad980a5 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/AutoInstallation.java @@ -17,9 +17,9 @@ */ package org.apache.streampipes.manager.setup; -import org.apache.streampipes.commons.constants.CustomEnvs; -import org.apache.streampipes.commons.constants.DefaultEnvValues; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; +import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable; import org.apache.streampipes.config.backend.BackendConfig; import org.apache.streampipes.model.client.setup.InitialSettings; @@ -33,6 +33,12 @@ public class AutoInstallation { private static final Logger LOG = LoggerFactory.getLogger(AutoInstallation.class); + private Environment env; + + public AutoInstallation() { + this.env = Environments.getEnvironment(); + } + public void startAutoInstallation() { InitialSettings settings = collectInitialSettings(); @@ -64,57 +70,37 @@ public class 
AutoInstallation { } private boolean autoInstallPipelineElements() { - if (Envs.SP_SETUP_INSTALL_PIPELINE_ELEMENTS.exists()) { - return Envs.SP_SETUP_INSTALL_PIPELINE_ELEMENTS.getValueAsBoolean(); - } else { - return DefaultEnvValues.INSTALL_PIPELINE_ELEMENTS; - } + return env.getSetupInstallPipelineElements().getValueOrDefault(); } private String findServiceAccountSecret() { - return getStringOrDefault( - Envs.SP_INITIAL_SERVICE_USER_SECRET.getEnvVariableName(), - DefaultEnvValues.INITIAL_CLIENT_SECRET_DEFAULT - ); + return env.getInitialServiceUserSecret().getValueOrDefault(); } private String findServiceAccountName() { - return getStringOrDefault( - Envs.SP_INITIAL_SERVICE_USER.getEnvVariableName(), - DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT - ); + return env.getInitialServiceUser().getValueOrDefault(); } private String findAdminUser() { return getStringOrDefault( - Envs.SP_INITIAL_ADMIN_EMAIL.getEnvVariableName(), - DefaultEnvValues.INITIAL_ADMIN_EMAIL_DEFAULT + env.getInitialAdminEmail() ); } private String findAdminPassword() { return getStringOrDefault( - Envs.SP_INITIAL_ADMIN_PASSWORD.getEnvVariableName(), - DefaultEnvValues.INITIAL_ADMIN_PW_DEFAULT + env.getInitialAdminPassword() ); } - private String getStringOrDefault(String envVariable, String defaultValue) { - boolean exists = exists(envVariable); - if (exists) { - LOG.info("Using provided environment variable {}", envVariable); - return getString(envVariable); + private String getStringOrDefault(StringEnvironmentVariable variable) { + String name = variable.getEnvVariableName(); + if (variable.exists()) { + LOG.info("Using provided environment variable {}", name); + return variable.getValue(); } else { - LOG.info("Environment variable {} not found, using default value {}", envVariable, defaultValue); - return defaultValue; + LOG.info("Environment variable {} not found, using default value {}", name, variable.getDefault()); + return variable.getDefault(); } } - - private boolean exists(String 
envVariable) { - return CustomEnvs.exists(envVariable); - } - - private String getString(String envVariable) { - return CustomEnvs.getEnv(envVariable); - } } diff --git a/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java b/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java index 506ac0e78..b42cbc0cd 100644 --- a/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java +++ b/streampipes-resource-management/src/main/java/org/apache/streampipes/resource/management/UserResourceManager.java @@ -18,8 +18,8 @@ package org.apache.streampipes.resource.management; -import org.apache.streampipes.commons.constants.DefaultEnvValues; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.exceptions.UserNotFoundException; import org.apache.streampipes.commons.exceptions.UsernameAlreadyTakenException; import org.apache.streampipes.mail.MailSender; @@ -65,8 +65,9 @@ public class UserResourceManager extends AbstractResourceManager<IUserStorage> { } public Principal getServiceAdmin() { + var env = getEnvironment(); return db.getServiceAccount( - Envs.SP_INITIAL_SERVICE_USER.getValueOrDefault(DefaultEnvValues.INITIAL_CLIENT_USER_DEFAULT) + env.getInitialServiceUser().getValueOrDefault() ); } @@ -160,5 +161,9 @@ public class UserResourceManager extends AbstractResourceManager<IUserStorage> { return StorageDispatcher.INSTANCE.getNoSqlStore().getUserActivationTokenStorage(); } + private Environment getEnvironment() { + return Environments.getEnvironment(); + } + } diff --git a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/pe/InvocablePipelineElementResource.java 
b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/pe/InvocablePipelineElementResource.java index fae16db4b..37633c631 100644 --- a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/pe/InvocablePipelineElementResource.java +++ b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/pe/InvocablePipelineElementResource.java @@ -18,7 +18,7 @@ package org.apache.streampipes.rest.extensions.pe; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.exceptions.SpConfigurationException; import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.extensions.api.declarer.Declarer; @@ -197,7 +197,7 @@ public abstract class InvocablePipelineElementResource<K extends InvocableStream protected abstract K createGroundingDebugInformation(K graph); private Boolean isDebug() { - return Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean(); + return Environments.getEnvironment().getSpDebug().getValueOrDefault(); } private String getServiceGroup() { diff --git a/streampipes-security-jwt/src/main/java/org/apache/streampipes/security/jwt/KeyGenerator.java b/streampipes-security-jwt/src/main/java/org/apache/streampipes/security/jwt/KeyGenerator.java index ceda5a079..31592b984 100644 --- a/streampipes-security-jwt/src/main/java/org/apache/streampipes/security/jwt/KeyGenerator.java +++ b/streampipes-security-jwt/src/main/java/org/apache/streampipes/security/jwt/KeyGenerator.java @@ -18,7 +18,7 @@ package org.apache.streampipes.security.jwt; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environments; import io.jsonwebtoken.security.Keys; import org.slf4j.Logger; @@ -66,7 +66,8 @@ public class KeyGenerator { } public String readKey() throws IOException { - return 
Files.readString(Paths.get(Envs.SP_JWT_PUBLIC_KEY_LOC.getValue()), Charset.defaultCharset()); + var publicKeyLoc = Environments.getEnvironment().getJwtPublicKeyLoc().getValue(); + return Files.readString(Paths.get(publicKeyLoc), Charset.defaultCharset()); } public Key makeKeyForRsa(String key) throws IOException, InvalidKeySpecException, NoSuchAlgorithmException { diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesEnvChecker.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesEnvChecker.java index a2fac136d..64e187c78 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesEnvChecker.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesEnvChecker.java @@ -18,7 +18,8 @@ package org.apache.streampipes.service.core; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.config.backend.BackendConfig; import org.apache.streampipes.config.backend.model.JwtSigningMode; import org.apache.streampipes.config.backend.model.LocalAuthConfig; @@ -36,6 +37,12 @@ public class StreamPipesEnvChecker { BackendConfig coreConfig; + private Environment env; + + public StreamPipesEnvChecker() { + this.env = Environments.getEnvironment(); + } + public void updateEnvironmentVariables() { this.coreConfig = BackendConfig.INSTANCE; @@ -46,37 +53,42 @@ public class StreamPipesEnvChecker { private void updateJwtSettings() { LocalAuthConfig localAuthConfig = coreConfig.getLocalAuthConfig(); boolean incompleteConfig = false; - if (Envs.SP_JWT_SIGNING_MODE.exists()) { - localAuthConfig.setJwtSigningMode(JwtSigningMode.valueOf(Envs.SP_JWT_SIGNING_MODE.getValue())); + var signingMode = env.getJwtSigningMode(); + var jwtSecret = env.getJwtSecret(); + var publicKeyLoc = 
env.getJwtPublicKeyLoc(); + var privateKeyLoc = env.getJwtPrivateKeyLoc(); + + if (signingMode.exists()) { + localAuthConfig.setJwtSigningMode(JwtSigningMode.valueOf(signingMode.getValue())); } - if (Envs.SP_JWT_SECRET.exists()) { - localAuthConfig.setTokenSecret(Envs.SP_JWT_SECRET.getValue()); + if (jwtSecret.exists()) { + localAuthConfig.setTokenSecret(jwtSecret.getValue()); } - if (Envs.SP_JWT_PUBLIC_KEY_LOC.exists()) { + if (publicKeyLoc.exists()) { try { - localAuthConfig.setPublicKey(readPublicKey(Envs.SP_JWT_PUBLIC_KEY_LOC.getValue())); + localAuthConfig.setPublicKey(readPublicKey(publicKeyLoc.getValue())); } catch (IOException e) { incompleteConfig = true; - LOG.warn("Could not read public key at location " + Envs.SP_JWT_PUBLIC_KEY_LOC); + LOG.warn("Could not read public key at location " + publicKeyLoc.getValue()); } } - if (!Envs.SP_JWT_SIGNING_MODE.exists()) { + if (!signingMode.exists()) { LOG.info( "No JWT signing mode provided (using default settings), " + "consult the docs to learn how to provide JWT settings"); - } else if (localAuthConfig.getJwtSigningMode() == JwtSigningMode.HMAC && !Envs.SP_JWT_SECRET.exists()) { + } else if (localAuthConfig.getJwtSigningMode() == JwtSigningMode.HMAC && !jwtSecret.exists()) { LOG.warn( "JWT signing mode set to HMAC but no secret provided (falling back to auto-generated secret), " + "provide a {} variable", - Envs.SP_JWT_SECRET.getEnvVariableName()); + jwtSecret.getEnvVariableName()); } else if (localAuthConfig.getJwtSigningMode() == JwtSigningMode.RSA - && ((!Envs.SP_JWT_PUBLIC_KEY_LOC.exists() || !Envs.SP_JWT_PRIVATE_KEY_LOC.exists()) || incompleteConfig)) { + && ((!publicKeyLoc.exists() || !privateKeyLoc.exists()) || incompleteConfig)) { LOG.warn( "JWT signing mode set to RSA but no public or private key location provided, " + "do you provide {} and {} variables?", - Envs.SP_JWT_PRIVATE_KEY_LOC.getEnvVariableName(), - Envs.SP_JWT_PUBLIC_KEY_LOC.getEnvVariableName()); + privateKeyLoc.getEnvVariableName(), + 
publicKeyLoc.getEnvVariableName()); } if (!incompleteConfig) { LOG.info("Updating local auth config with signing mode {}", localAuthConfig.getJwtSigningMode().name()); diff --git a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java index cd7282d32..9148a69ac 100644 --- a/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java +++ b/streampipes-service-discovery-consul/src/main/java/org/apache/streampipes/svcdiscovery/consul/ConsulProvider.java @@ -97,7 +97,7 @@ public enum ConsulProvider { return environment.getConsulLocation().getValue(); } else { if (environment.getSpDebug().getValueOrReturn(false)) { - return DefaultEnvValues.CONSUL_HOST_LOCAL; + return DefaultEnvValues.LOCALHOST; } else { return environment.getConsulHost().getValueOrDefault(); } diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/WebSecurityConfig.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/WebSecurityConfig.java index 56f0b2050..9e88dcf19 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/WebSecurityConfig.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/security/WebSecurityConfig.java @@ -18,7 +18,8 @@ package org.apache.streampipes.service.extensions.security; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.service.base.security.UnauthorizedRequestEntryPoint; import org.slf4j.Logger; @@ -45,9 +46,11 @@ public class WebSecurityConfig { private static final Logger LOG = 
LoggerFactory.getLogger(WebSecurityConfig.class); private final UserDetailsService userDetailsService; + private Environment env; public WebSecurityConfig() { this.userDetailsService = username -> null; + this.env = Environments.getEnvironment(); } @Autowired @@ -91,14 +94,15 @@ public class WebSecurityConfig { } private boolean isAnonymousAccess() { - if (Envs.SP_EXT_AUTH_MODE.exists() && Envs.SP_EXT_AUTH_MODE.getValue().equals("AUTH")) { - if (Envs.SP_JWT_PUBLIC_KEY_LOC.exists()) { + var extAuthMode = env.getExtensionsAuthMode(); + if (extAuthMode.exists() && extAuthMode.getValue().equals("AUTH")) { + if (env.getJwtPublicKeyLoc().exists()) { LOG.info("Configured service for authenticated access mode"); return false; } else { LOG.warn( "No env variable {} provided, which is required for authenticated access. Defaulting to anonymous access.", - Envs.SP_JWT_PUBLIC_KEY_LOC.getEnvVariableName()); + env.getJwtPublicKeyLoc().getEnvVariableName()); return true; } } else { diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/CouchDbConfig.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/CouchDbConfig.java deleted file mode 100644 index 8cbdda055..000000000 --- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/CouchDbConfig.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.storage.couchdb.utils; - -import org.apache.streampipes.svcdiscovery.SpServiceDiscovery; -import org.apache.streampipes.svcdiscovery.api.SpConfig; - -public enum CouchDbConfig { - - INSTANCE; - - private static final String COUCHDB_HOST = "SP_COUCHDB_HOST"; - private static final String COUCHDB_PORT = "SP_COUCHDB_PORT"; - private static final String PROTOCOL = "PROTOCOL"; - private SpConfig config; - - CouchDbConfig() { - config = SpServiceDiscovery.getSpConfig("storage/couchdb"); - config.register(COUCHDB_HOST, "couchdb", "Hostname for the couch db service"); - config.register(COUCHDB_PORT, 5984, "Port for the couch db service"); - config.register(PROTOCOL, "http", "Protocol the couch db service"); - } - - public String getHost() { - return config.getString(COUCHDB_HOST); - } - - public void setHost(String host) { - config.setString(COUCHDB_HOST, host); - } - - public int getPort() { - return config.getInteger(COUCHDB_PORT); - } - - public String getProtocol() { - return config.getString(PROTOCOL); - } -} diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java index c9f36caf8..7b937a77c 100644 --- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java +++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/utils/Utils.java @@ -63,18 +63,10 @@ public class Utils { return 
getCouchDbGsonClient("label"); } - public static CouchDbClient getCouchDbConnectWorkerContainerClient() { - return getCouchDbGsonClient("connectworkercontainer"); - } - public static CouchDbClient getCouchDbFileMetadataClient() { return getCouchDbGsonClient("filemetadata"); } - public static CouchDbClient getCouchDbAdapterTemplateClient() { - return getCouchDbAdapterClient("adaptertemplate"); - } - public static CouchDbClient getCouchDbAssetDashboardClient() { return getCouchDbGsonClient("assetdashboard"); } @@ -91,14 +83,6 @@ public class Utils { return getCouchDbGsonClient("pipeline"); } - public static CouchDbClient getCouchDbUserGroupStorage() { - return getCouchDbGsonClient("usergroup"); - } - - public static CouchDbClient getCouchDbSepaInvocationClient() { - return getCouchDbGsonClient("invocation"); - } - public static CouchDbClient getCouchDbConnectionClient() { return getCouchDbStandardSerializerClient("connection"); } @@ -157,10 +141,6 @@ public class Utils { return getCouchDbStandardSerializerClient("pipelinecategories"); } - public static CouchDbClient getCouchDbElasticsearchFilesEndppointClient() { - return getCouchDbStandardSerializerClient("file-export-endpoints-elasticsearch"); - } - public static CouchDbClient getCouchDbDataLakeClient() { return getCouchDbGsonClient("data-lake"); } @@ -208,9 +188,10 @@ public class Utils { } private static String toUrl() { - return CouchDbConfig.INSTANCE.getProtocol() - + "://" + CouchDbConfig.INSTANCE.getHost() - + ":" + CouchDbConfig.INSTANCE.getPort(); + var env = getEnvironment(); + return env.getCouchDbProtocol().getValueOrDefault() + + "://" + env.getCouchDbHost().getValueOrDefault() + + ":" + env.getCouchDbPort().getValueOrDefault(); } public static Request getRequest(String route) { diff --git a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/encryption/SecretEncryptionManager.java 
b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/encryption/SecretEncryptionManager.java index 81215579a..76b4494c8 100644 --- a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/encryption/SecretEncryptionManager.java +++ b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/encryption/SecretEncryptionManager.java @@ -17,8 +17,7 @@ */ package org.apache.streampipes.user.management.encryption; -import org.apache.streampipes.commons.constants.DefaultEnvValues; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environments; import org.jasypt.encryption.StringEncryptor; import org.jasypt.encryption.pbe.StandardPBEStringEncryptor; @@ -36,11 +35,11 @@ public class SecretEncryptionManager { } private static StringEncryptor getEncryptor() { + var env = Environments.getEnvironment(); StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor(); - encryptor.setPassword(Envs.SP_ENCRYPTION_PASSCODE.getValueOrDefault(DefaultEnvValues.DEFAULT_ENCRYPTION_PASSCODE)); + encryptor.setPassword(env.getEncryptionPasscode().getValueOrDefault()); encryptor.setIvGenerator(new RandomIvGenerator()); return encryptor; - } } diff --git a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java index a17fed07b..4abb09a3c 100644 --- a/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java +++ b/streampipes-user-management/src/main/java/org/apache/streampipes/user/management/jwt/JwtTokenProvider.java @@ -18,7 +18,8 @@ package org.apache.streampipes.user.management.jwt; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environment; +import 
org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.config.backend.BackendConfig; import org.apache.streampipes.config.backend.model.JwtSigningMode; import org.apache.streampipes.config.backend.model.LocalAuthConfig; @@ -51,9 +52,11 @@ public class JwtTokenProvider { public static final String CLAIM_USER = "user"; private static final Logger LOG = LoggerFactory.getLogger(JwtTokenProvider.class); private BackendConfig config; + private Environment env; public JwtTokenProvider() { this.config = BackendConfig.INSTANCE; + this.env = Environments.getEnvironment(); } public String createToken(Authentication authentication) { @@ -117,7 +120,7 @@ public class JwtTokenProvider { } private Path getKeyFilePath() { - return Paths.get(Envs.SP_JWT_PRIVATE_KEY_LOC.getValue()); + return Paths.get(env.getJwtPrivateKeyLoc().getValue()); } private LocalAuthConfig authConfig() { diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java index eb0a49c27..ccc2b9ff5 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java @@ -18,7 +18,8 @@ package org.apache.streampipes.wrapper.standalone.function; -import org.apache.streampipes.commons.constants.Envs; +import org.apache.streampipes.commons.environment.Environment; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.exceptions.SpRuntimeException; import org.apache.streampipes.extensions.api.declarer.IFunctionConfig; import org.apache.streampipes.extensions.api.declarer.IStreamPipesFunctionDeclarer; @@ -158,8 +159,9 @@ public abstract class StreamPipesFunction 
implements IStreamPipesFunctionDeclare private Map<String, SpInputCollector> getInputCollectors(Collection<SpDataStream> streams) throws SpRuntimeException { Map<String, SpInputCollector> inputCollectors = new HashMap<>(); + var env = getEnvironment(); for (SpDataStream is : streams) { - if (Envs.SP_DEBUG.exists() && Envs.SP_DEBUG.getValueAsBoolean()) { + if (env.getSpDebug().getValueOrDefault()) { GroundingDebugUtils.modifyGrounding(is.getEventGrounding()); } inputCollectors.put(is.getElementId(), ProtocolManager.findInputCollector(is.getEventGrounding() @@ -193,6 +195,10 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare return new SchemaInfo(eventSchema, new ArrayList<>()); } + private Environment getEnvironment() { + return Environments.getEnvironment(); + } + public abstract IFunctionConfig getFunctionConfig(); public abstract void onServiceStarted(FunctionContext context);
