This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch minor-improvements-opcua-file-storage in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit f843d60ec6ebf22fc0a6319a75f63d7cb7c1d0fc Author: Dominik Riemer <[email protected]> AuthorDate: Tue Oct 7 10:44:53 2025 +0200 feat: Add environment variable for extensions file storage --- .../apache/streampipes/commons/constants/Envs.java | 1 + .../commons/environment/DefaultEnvironment.java | 5 +++++ .../commons/environment/Environment.java | 2 ++ .../connect/iiot/utils/FileProtocolUtils.java | 8 +++++++- .../security/CompositeCertificateValidator.java | 10 +++++++++- .../connectors/opcua/utils/OpcUaUtils.java | 21 +++++++++++++++++---- 6 files changed, 41 insertions(+), 6 deletions(-) diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java index 7158b50974..bd06f7317a 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java @@ -23,6 +23,7 @@ public enum Envs { SP_PORT("SP_PORT"), SP_CORE_ASSET_BASE_DIR("SP_CORE_ASSET_BASE_DIR"), + SP_EXT_ASSET_BASE_DIR("SP_EXT_ASSET_BASE_DIR"), SP_CORE_SCHEME("SP_CORE_SCHEME", "http", "http"), SP_CORE_HOST("SP_CORE_HOST", "backend", "localhost"), diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java index a9f35337d9..c8f5f0686c 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java @@ -233,6 +233,11 @@ public class DefaultEnvironment implements Environment { return new StringEnvironmentVariable(Envs.SP_CORE_ASSET_BASE_DIR); } + @Override + public StringEnvironmentVariable getExtAssetBaseDir() { + return new StringEnvironmentVariable(Envs.SP_EXT_ASSET_BASE_DIR); + } + @Override public StringEnvironmentVariable getFlinkJarFileLoc() { return new StringEnvironmentVariable(Envs.SP_FLINK_JAR_FILE_LOC); diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java index e168cc23c4..07ed38c9df 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java @@ -98,6 +98,8 @@ public interface Environment { StringEnvironmentVariable getCoreAssetBaseDir(); + StringEnvironmentVariable getExtAssetBaseDir(); + // Flink Wrapper StringEnvironmentVariable getFlinkJarFileLoc(); diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java index 6c7ece4c44..e0b5bbf872 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/utils/FileProtocolUtils.java @@ -19,6 +19,7 @@ package org.apache.streampipes.connect.iiot.utils; import org.apache.streampipes.client.StreamPipesClient; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.commons.exceptions.connect.ParseException; import org.apache.streampipes.commons.file.FileHasher; import org.apache.streampipes.extensions.management.client.StreamPipesClientResolver; @@ -86,7 +87,12 @@ public class FileProtocolUtils { } private static String makeServiceStorageDir() { - return System.getProperty("user.home") + var storageDir = Environments + .getEnvironment() + .getExtAssetBaseDir() + .getValueOrReturn(System.getProperty("user.home")); + + return storageDir + File.separator + ".streampipes" + File.separator diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/CompositeCertificateValidator.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/CompositeCertificateValidator.java index c88392c729..f5c391ffa7 100644 --- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/CompositeCertificateValidator.java +++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/security/CompositeCertificateValidator.java @@ -71,6 +71,7 @@ public class CompositeCertificateValidator implements ClientCertificateValidator public void validateCertificateChain(List<X509Certificate> certificateChain) throws UaException { PKIXCertPathBuilderResult certPathResult; + X509Certificate peer = getEndEntity(certificateChain); try { certPathResult = CertificateValidationUtil.buildTrustedCertPath( certificateChain, @@ -79,7 +80,7 @@ public class CompositeCertificateValidator implements ClientCertificateValidator ); } catch (UaException e) { if (isCertificateRejected(e.getStatusCode().getValue())) { - sendToCore(certificateChain.get(0)); + sendToCore(peer); } throw e; } @@ -97,6 +98,13 @@ public class CompositeCertificateValidator implements ClientCertificateValidator ); } + private X509Certificate getEndEntity(List<X509Certificate> chain) { + return chain.stream() + .filter(c -> c.getBasicConstraints() < 0) + .findFirst() + .orElse(chain.get(0)); + } + @Override public void validateCertificateChain( List<X509Certificate> certificateChain, diff --git a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtils.java b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtils.java index df09ed3050..5debd21414 100644 --- a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtils.java +++ b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/utils/OpcUaUtils.java @@ -34,6 +34,8 @@ import org.eclipse.milo.opcua.sdk.client.api.UaClient; import org.eclipse.milo.opcua.stack.core.AttributeId; import org.eclipse.milo.opcua.stack.core.UaException; import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.URISyntaxException; import java.util.List; @@ -45,6 +47,8 @@ import java.util.concurrent.ExecutionException; */ public class OpcUaUtils { + private static final Logger LOG = LoggerFactory.getLogger(OpcUaUtils.class); + private static final String OPC_TCP_PREFIX = "opc.tcp://"; /*** @@ -141,19 +145,28 @@ public class OpcUaUtils { Throwable cause = e.getCause(); if (cause instanceof UaException uaException) { - return CompositeCertificateValidator.REJECTED_STATUS_CODES - .contains(uaException.getStatusCode().getValue()); + return checkAndLogCertificateException(uaException); } Throwable nestedCause = cause != null ? cause.getCause() : null; if (nestedCause instanceof UaException uaException) { - return CompositeCertificateValidator.REJECTED_STATUS_CODES - .contains(uaException.getStatusCode().getValue()); + return checkAndLogCertificateException(uaException); } return false; } + private static boolean checkAndLogCertificateException(UaException e) { + var containsRejectedStatusCode = CompositeCertificateValidator.REJECTED_STATUS_CODES + .contains(e.getStatusCode().getValue()); + + if (containsRejectedStatusCode) { + var statusCode = CompositeCertificateValidator.REJECTED_STATUS_CODES.stream().filter(code -> code.equals(e.getStatusCode().getValue())).findFirst(); + statusCode.ifPresent(sc -> LOG.warn("Status Code: {}", sc)); + } + return containsRejectedStatusCode; + } + private static String makeExceptionMessage(ExecutionException e) { StringBuilder message = new StringBuilder( "The provided certificate could not be trusted. Administrators can accept this certificate in the settings. "
