This is an automated email from the ASF dual-hosted git repository. ferdei pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new d84ce83654 NIFI-11366 Proxy aware C2 communication d84ce83654 is described below commit d84ce836541266ce930bf12f32df2ecb035ac23f Author: Ferenc Kis <briansolo1...@gmail.com> AuthorDate: Mon Apr 3 13:13:20 2023 +0200 NIFI-11366 Proxy aware C2 communication Signed-off-by: Ferenc Erdei <fer...@cloudera.com> This closes #7125 --- .../org/apache/nifi/c2/client/api/C2Client.java | 23 ++- .../org/apache/nifi/c2/client/C2ClientConfig.java | 80 +++++++--- .../apache/nifi/c2/client/http/C2HttpClient.java | 172 +++++---------------- .../nifi/c2/client/http/OkHttpClientProvider.java | 142 +++++++++++++++++ .../nifi/c2/client/http/url/C2UrlProvider.java | 46 ++++++ .../c2/client/http/url/C2UrlProviderFactory.java | 45 ++++++ .../c2/client/http/url/LegacyC2UrlProvider.java | 55 +++++++ .../client/http/url/ProxyAwareC2UrlProvider.java | 81 ++++++++++ .../nifi/c2/client/http/C2HttpClientTest.java | 19 ++- .../client/http/url/C2UrlProviderFactoryTest.java | 96 ++++++++++++ .../client/http/url/LegacyC2UrlProviderTest.java | 61 ++++++++ .../client/http/url/ProxyAwareC2ProviderTest.java | 113 ++++++++++++++ .../operation/TransferDebugOperationHandler.java | 10 +- .../operation/UpdateAssetOperationHandler.java | 27 ++-- .../UpdateConfigurationOperationHandler.java | 119 ++++++++------ .../TransferDebugOperationHandlerTest.java | 5 + .../operation/UpdateAssetOperationHandlerTest.java | 9 ++ .../UpdateConfigurationOperationHandlerTest.java | 31 ++-- .../org/apache/nifi/minifi/MiNiFiProperties.java | 3 + .../apache/nifi/minifi/c2/C2NifiClientService.java | 13 +- .../src/main/resources/conf/bootstrap.conf | 10 ++ 21 files changed, 914 insertions(+), 246 deletions(-) diff --git a/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java index c42b24a3b1..3ef7e362c6 100644 --- a/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java +++ b/c2/c2-client-bundle/c2-client-api/src/main/java/org/apache/nifi/c2/client/api/C2Client.java @@ -35,19 +35,19 @@ public interface C2Client { Optional<C2HeartbeatResponse> publishHeartbeat(C2Heartbeat heartbeat); /** - * Retrieve the content of the new flow from the C2 Server + * After operation completed the acknowledgment to be sent to the C2 Server * - * @param flowUpdateUrl url where the content should be downloaded from - * @return the actual downloaded content. Will be empty if no content can be downloaded + * @param operationAck the acknowledgment details to be sent */ - Optional<byte[]> retrieveUpdateContent(String flowUpdateUrl); + void acknowledgeOperation(C2OperationAck operationAck); /** - * After operation completed the acknowledge to be sent to the C2 Server + * Retrieve the content of the new flow from the C2 Server * - * @param operationAck the acknowledge details to be sent + * @param callbackUrl url where the content should be downloaded from + * @return the actual downloaded content. Will be empty if no content can be downloaded */ - void acknowledgeOperation(C2OperationAck operationAck); + Optional<byte[]> retrieveUpdateContent(String callbackUrl); /** * Uploads a binary bundle to C2 server @@ -57,4 +57,13 @@ public interface C2Client { * @return optional error message if any issues occurred */ Optional<String> uploadBundle(String callbackUrl, byte[] bundle); + + /** + * Creates a callback URL according to proxy aware C2 settings + * + * @param absoluteUrl absolute url sent by C2 server + * @param relativeUrl relative url sent by C2 server + * @return an optional with content of finalised callback url + */ + Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl); } diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java index 360e5287a2..71a94a3698 100644 --- a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java +++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java @@ -23,6 +23,9 @@ public class C2ClientConfig { private final String c2Url; private final String c2AckUrl; + private final String c2RestPathBase; + private final String c2RestPathHeartbeat; + private final String c2RestPathAcknowledge; private final String agentClass; private final String agentIdentifier; private final boolean fullHeartbeat; @@ -48,6 +51,9 @@ public class C2ClientConfig { private C2ClientConfig(final Builder builder) { this.c2Url = builder.c2Url; this.c2AckUrl = builder.c2AckUrl; + this.c2RestPathBase = builder.c2RestPathBase; + this.c2RestPathHeartbeat = builder.c2RestPathHeartbeat; + this.c2RestPathAcknowledge = builder.c2RestPathAcknowledge; this.agentClass = builder.agentClass; this.agentIdentifier = builder.agentIdentifier; this.fullHeartbeat = builder.fullHeartbeat; @@ -87,6 +93,18 @@ public class C2ClientConfig { return agentIdentifier; } + public String getC2RestPathBase() { + return c2RestPathBase; + } + + public String getC2RestPathHeartbeat() { + return c2RestPathHeartbeat; + } + + public String getC2RestPathAcknowledge() { + return c2RestPathAcknowledge; + } + public boolean isFullHeartbeat() { return fullHeartbeat; } @@ -170,6 +188,9 @@ public class C2ClientConfig { private String c2Url; private String c2AckUrl; + private String c2RestPathBase; + private String c2RestPathHeartbeat; + private String c2RestPathAcknowledge; private String agentClass; private String agentIdentifier; private boolean fullHeartbeat; @@ -192,117 +213,130 @@ public class C2ClientConfig { private String c2RequestCompression; private String c2AssetDirectory; - public Builder c2Url(final String c2Url) { + public Builder c2Url(String c2Url) { this.c2Url = c2Url; return this; } - public Builder c2AckUrl(final String c2AckUrl) { + public Builder c2AckUrl(String c2AckUrl) { this.c2AckUrl = c2AckUrl; return this; } - public Builder agentClass(final String agentClass) { + public Builder c2RestPathBase(String c2RestPathBase) { + this.c2RestPathBase = c2RestPathBase; + return this; + } + public Builder c2RestPathHeartbeat(String c2RestPathHeartbeat) { + this.c2RestPathHeartbeat = c2RestPathHeartbeat; + return this; + } + public Builder c2RestPathAcknowledge(String c2RestPathAcknowledge) { + this.c2RestPathAcknowledge = c2RestPathAcknowledge; + return this; + } + + public Builder agentClass(String agentClass) { this.agentClass = agentClass; return this; } - public Builder agentIdentifier(final String agentIdentifier) { + public Builder agentIdentifier(String agentIdentifier) { this.agentIdentifier = agentIdentifier; return this; } - public Builder fullHeartbeat(final boolean fullHeartbeat) { + public Builder fullHeartbeat(boolean fullHeartbeat) { this.fullHeartbeat = fullHeartbeat; return this; } - public Builder confDirectory(final String confDirectory) { + public Builder confDirectory(String confDirectory) { this.confDirectory = confDirectory; return this; } - public Builder runtimeManifestIdentifier(final String runtimeManifestIdentifier) { + public Builder runtimeManifestIdentifier(String runtimeManifestIdentifier) { this.runtimeManifestIdentifier = runtimeManifestIdentifier; return this; } - public Builder runtimeType(final String runtimeType) { + public Builder runtimeType(String runtimeType) { this.runtimeType = runtimeType; return this; } - public Builder heartbeatPeriod(final long heartbeatPeriod) { + public Builder heartbeatPeriod(long heartbeatPeriod) { this.heartbeatPeriod = heartbeatPeriod; return this; } - public Builder callTimeout(final long callTimeout) { + public Builder callTimeout(long callTimeout) { this.callTimeout = callTimeout; return this; } - public Builder keystoreFilename(final String keystoreFilename) { + public Builder keystoreFilename(String keystoreFilename) { this.keystoreFilename = keystoreFilename; return this; } - public Builder keystorePassword(final String keystorePass) { + public Builder keystorePassword(String keystorePass) { this.keystorePass = keystorePass; return this; } - public Builder keyPassword(final String keyPass) { + public Builder keyPassword(String keyPass) { this.keyPass = keyPass; return this; } - public Builder keystoreType(final String keystoreType) { + public Builder keystoreType(String keystoreType) { this.keystoreType = keystoreType; return this; } - public Builder truststoreFilename(final String truststoreFilename) { + public Builder truststoreFilename(String truststoreFilename) { this.truststoreFilename = truststoreFilename; return this; } - public Builder truststorePassword(final String truststorePass) { + public Builder truststorePassword(String truststorePass) { this.truststorePass = truststorePass; return this; } - public Builder truststoreType(final String truststoreType) { + public Builder truststoreType(String truststoreType) { this.truststoreType = truststoreType; return this; } - public Builder readTimeout(final long readTimeout) { + public Builder readTimeout(long readTimeout) { this.readTimeout = readTimeout; return this; } - public Builder connectTimeout(final long connectTimeout) { + public Builder connectTimeout(long connectTimeout) { this.connectTimeout = connectTimeout; return this; } - public Builder maxIdleConnections(final int maxIdleConnections) { + public Builder maxIdleConnections(int maxIdleConnections) { this.maxIdleConnections = maxIdleConnections; return this; } - public Builder keepAliveDuration(final long keepAliveDuration) { + public Builder keepAliveDuration(long keepAliveDuration) { this.keepAliveDuration = keepAliveDuration; return this; } - public Builder c2RequestCompression(final String c2RequestCompression) { + public Builder c2RequestCompression(String c2RequestCompression) { this.c2RequestCompression = c2RequestCompression; return this; } - public Builder c2AssetDirectory(final String c2AssetDirectory) { + public Builder c2AssetDirectory(String c2AssetDirectory) { this.c2AssetDirectory = c2AssetDirectory; return this; } diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java index d881fa5dff..563a8a6f14 100644 --- a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java +++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java @@ -18,35 +18,25 @@ package org.apache.nifi.c2.client.http; import static okhttp3.MultipartBody.FORM; -import static okhttp3.RequestBody.create; -import java.io.FileInputStream; import java.io.IOException; -import java.security.KeyStore; import java.util.Optional; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocketFactory; -import javax.net.ssl.X509TrustManager; -import okhttp3.ConnectionPool; import okhttp3.MediaType; import okhttp3.MultipartBody; import okhttp3.OkHttpClient; import okhttp3.Request; +import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.ResponseBody; -import okhttp3.logging.HttpLoggingInterceptor; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.c2.client.C2ClientConfig; import org.apache.nifi.c2.client.api.C2Client; +import org.apache.nifi.c2.client.http.url.C2UrlProvider; +import org.apache.nifi.c2.client.http.url.C2UrlProviderFactory; import org.apache.nifi.c2.protocol.api.C2Heartbeat; import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse; import org.apache.nifi.c2.protocol.api.C2OperationAck; import org.apache.nifi.c2.serializer.C2Serializer; -import org.apache.nifi.security.ssl.StandardKeyStoreBuilder; -import org.apache.nifi.security.ssl.StandardSslContextBuilder; -import org.apache.nifi.security.ssl.StandardTrustManagerBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,40 +48,23 @@ public class C2HttpClient implements C2Client { private static final String BUNDLE_FILE_NAME = "debug.tar.gz"; private static final MediaType BUNDLE_MIME_TYPE = MediaType.parse("application/gzip"); - private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>(); private final C2ClientConfig clientConfig; private final C2Serializer serializer; + private final C2UrlProvider c2UrlProvider; + private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>(); + + public static C2HttpClient create(C2ClientConfig clientConfig, C2Serializer serializer) { + OkHttpClient okHttpClient = new OkHttpClientProvider(clientConfig).okHttpClient(); + C2UrlProvider c2UrlProvider = new C2UrlProviderFactory(clientConfig).create(); + return new C2HttpClient(clientConfig, serializer, c2UrlProvider, okHttpClient); + } - public C2HttpClient(C2ClientConfig clientConfig, C2Serializer serializer) { - super(); + C2HttpClient(C2ClientConfig clientConfig, C2Serializer serializer, + C2UrlProvider c2UrlProvider, OkHttpClient okHttpClient) { this.clientConfig = clientConfig; this.serializer = serializer; - final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder(); - - // Configure request and response logging - HttpLoggingInterceptor logging = new HttpLoggingInterceptor(logger::debug); - logging.setLevel(HttpLoggingInterceptor.Level.BASIC); - okHttpClientBuilder.addInterceptor(logging); - - // Set whether to follow redirects - okHttpClientBuilder.followRedirects(true); - okHttpClientBuilder.connectionPool(new ConnectionPool(clientConfig.getMaxIdleConnections(), clientConfig.getKeepAliveDuration(), TimeUnit.MILLISECONDS)); - - // Timeouts - okHttpClientBuilder.connectTimeout(clientConfig.getConnectTimeout(), TimeUnit.MILLISECONDS); - okHttpClientBuilder.readTimeout(clientConfig.getReadTimeout(), TimeUnit.MILLISECONDS); - okHttpClientBuilder.callTimeout(clientConfig.getCallTimeout(), TimeUnit.MILLISECONDS); - - // check if the ssl path is set and add the factory if so - if (StringUtils.isNotBlank(clientConfig.getKeystoreFilename())) { - try { - setSslSocketFactory(okHttpClientBuilder); - } catch (Exception e) { - throw new IllegalStateException("OkHttp TLS configuration failed", e); - } - } - - httpClientReference.set(okHttpClientBuilder.build()); + this.c2UrlProvider = c2UrlProvider; + this.httpClientReference.set(okHttpClient); } @Override @@ -100,12 +73,24 @@ public class C2HttpClient implements C2Client { } @Override - public Optional<byte[]> retrieveUpdateContent(String flowUpdateUrl) { + public void acknowledgeOperation(C2OperationAck operationAck) { + String c2AcknowledgeUrl = c2UrlProvider.getAcknowledgeUrl(); + logger.info("Acknowledging Operation {} to C2 server {}", operationAck.getOperationId(), c2AcknowledgeUrl); + serializer.serialize(operationAck) + .map(operationAckBody -> RequestBody.create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON)) + .map(requestBody -> new Request.Builder().post(requestBody).url(c2AcknowledgeUrl).build()) + .map(C2RequestCompression.forType(clientConfig.getC2RequestCompression())::compress) + .ifPresent(this::sendAck); + } + + @Override + public Optional<byte[]> retrieveUpdateContent(String callbackUrl) { Optional<byte[]> updateContent = Optional.empty(); - final Request.Builder requestBuilder = new Request.Builder() + + Request.Builder requestBuilder = new Request.Builder() .get() - .url(flowUpdateUrl); - final Request request = requestBuilder.build(); + .url(callbackUrl); + Request request = requestBuilder.build(); try (Response response = httpClientReference.get().newCall(request).execute()) { Optional<ResponseBody> body = Optional.ofNullable(response.body()); @@ -128,23 +113,13 @@ public class C2HttpClient implements C2Client { return updateContent; } - @Override - public void acknowledgeOperation(C2OperationAck operationAck) { - logger.info("Acknowledging Operation {} to C2 server {}", operationAck.getOperationId(), clientConfig.getC2AckUrl()); - serializer.serialize(operationAck) - .map(operationAckBody -> create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON)) - .map(requestBody -> new Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build()) - .map(C2RequestCompression.forType(clientConfig.getC2RequestCompression())::compress) - .ifPresent(this::sendAck); - } - @Override public Optional<String> uploadBundle(String callbackUrl, byte[] bundle) { Request request = new Request.Builder() .url(callbackUrl) .post(new MultipartBody.Builder() .setType(FORM) - .addFormDataPart(MULTIPART_FORM_FILE_FIELD_NAME, BUNDLE_FILE_NAME, create(bundle, BUNDLE_MIME_TYPE)) + .addFormDataPart(MULTIPART_FORM_FILE_FIELD_NAME, BUNDLE_FILE_NAME, RequestBody.create(bundle, BUNDLE_MIME_TYPE)) .build()) .build(); @@ -161,11 +136,16 @@ public class C2HttpClient implements C2Client { return Optional.empty(); } + @Override + public Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl) { + return c2UrlProvider.getCallbackUrl(absoluteUrl, relativeUrl); + } + private Optional<C2HeartbeatResponse> sendHeartbeat(String heartbeat) { Optional<C2HeartbeatResponse> c2HeartbeatResponse = Optional.empty(); Request request = new Request.Builder() - .post(create(heartbeat, MEDIA_TYPE_APPLICATION_JSON)) - .url(clientConfig.getC2Url()) + .post(RequestBody.create(heartbeat, MEDIA_TYPE_APPLICATION_JSON)) + .url(c2UrlProvider.getHeartbeatUrl()) .build(); Request decoratedRequest = C2RequestCompression.forType(clientConfig.getC2RequestCompression()).compress(request); @@ -173,7 +153,7 @@ public class C2HttpClient implements C2Client { try (Response heartbeatResponse = httpClientReference.get().newCall(decoratedRequest).execute()) { c2HeartbeatResponse = getResponseBody(heartbeatResponse).flatMap(response -> serializer.deserialize(response, C2HeartbeatResponse.class)); } catch (IOException ce) { - logger.error("Send Heartbeat failed to C2 server {}", clientConfig.getC2Url(), ce); + logger.error("Send Heartbeat failed to C2 server {}", c2UrlProvider.getHeartbeatUrl(), ce); } return c2HeartbeatResponse; @@ -192,81 +172,13 @@ public class C2HttpClient implements C2Client { return Optional.ofNullable(responseBody); } - private void setSslSocketFactory(OkHttpClient.Builder okHttpClientBuilder) throws Exception { - final String keystoreLocation = clientConfig.getKeystoreFilename(); - final String keystoreType = clientConfig.getKeystoreType(); - final String keystorePass = clientConfig.getKeystorePass(); - assertKeystorePropertiesSet(keystoreLocation, keystorePass, keystoreType); - - final KeyStore keyStore; - try (final FileInputStream keyStoreStream = new FileInputStream(keystoreLocation)) { - keyStore = new StandardKeyStoreBuilder() - .type(keystoreType) - .inputStream(keyStoreStream) - .password(keystorePass.toCharArray()) - .build(); - } - - final String truststoreLocation = clientConfig.getTruststoreFilename(); - final String truststorePass = clientConfig.getTruststorePass(); - final String truststoreType = clientConfig.getTruststoreType(); - assertTruststorePropertiesSet(truststoreLocation, truststorePass, truststoreType); - - final KeyStore truststore; - try (final FileInputStream trustStoreStream = new FileInputStream(truststoreLocation)) { - truststore = new StandardKeyStoreBuilder() - .type(truststoreType) - .inputStream(trustStoreStream) - .password(truststorePass.toCharArray()) - .build(); - } - - final X509TrustManager trustManager = new StandardTrustManagerBuilder().trustStore(truststore).build(); - final SSLContext sslContext = new StandardSslContextBuilder() - .keyStore(keyStore) - .keyPassword(keystorePass.toCharArray()) - .trustStore(truststore) - .build(); - final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory(); - - okHttpClientBuilder.sslSocketFactory(sslSocketFactory, trustManager); - } - - private void assertKeystorePropertiesSet(String location, String password, String type) { - if (location == null || location.isEmpty()) { - throw new IllegalArgumentException(clientConfig.getKeystoreFilename() + " is null or is empty"); - } - - if (password == null || password.isEmpty()) { - throw new IllegalArgumentException("The client's keystore filename is set but its password is not (or is empty). If the location is set, the password must also be."); - } - - if (type == null || type.isEmpty()) { - throw new IllegalArgumentException("The client's keystore filename is set but its type is not (or is empty). If the location is set, the type must also be."); - } - } - - private void assertTruststorePropertiesSet(String location, String password, String type) { - if (location == null || location.isEmpty()) { - throw new IllegalArgumentException("The client's truststore filename is not set or is empty"); - } - - if (password == null || password.isEmpty()) { - throw new IllegalArgumentException("The client's truststore filename is set but its password is not (or is empty). If the location is set, the password must also be."); - } - - if (type == null || type.isEmpty()) { - throw new IllegalArgumentException("The client's truststore filename is set but its type is not (or is empty). If the location is set, the type must also be."); - } - } - private void sendAck(Request request) { try (Response heartbeatResponse = httpClientReference.get().newCall(request).execute()) { if (!heartbeatResponse.isSuccessful()) { - logger.warn("Acknowledgement was not successful with C2 server {} with status code {}", clientConfig.getC2AckUrl(), heartbeatResponse.code()); + logger.warn("Acknowledgement was not successful with C2 server {} with status code {}", c2UrlProvider.getAcknowledgeUrl(), heartbeatResponse.code()); } } catch (IOException e) { - logger.error("Could not transmit ack to C2 server {}", clientConfig.getC2AckUrl(), e); + logger.error("Could not transmit ack to C2 server {}", c2UrlProvider.getAcknowledgeUrl(), e); } } } diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/OkHttpClientProvider.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/OkHttpClientProvider.java new file mode 100644 index 0000000000..000b347b4e --- /dev/null +++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/OkHttpClientProvider.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.c2.client.http; + +import java.io.FileInputStream; +import java.security.KeyStore; +import java.util.concurrent.TimeUnit; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.X509TrustManager; +import okhttp3.ConnectionPool; +import okhttp3.OkHttpClient; +import okhttp3.logging.HttpLoggingInterceptor; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.c2.client.C2ClientConfig; +import org.apache.nifi.security.ssl.StandardKeyStoreBuilder; +import org.apache.nifi.security.ssl.StandardSslContextBuilder; +import org.apache.nifi.security.ssl.StandardTrustManagerBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OkHttpClientProvider { + + private static final Logger logger = LoggerFactory.getLogger(OkHttpClientProvider.class); + + private final C2ClientConfig clientConfig; + + public OkHttpClientProvider(C2ClientConfig clientConfig) { + this.clientConfig = clientConfig; + } + + public OkHttpClient okHttpClient() { + OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder(); + + // Configure request and response logging + HttpLoggingInterceptor logging = new HttpLoggingInterceptor(logger::debug); + logging.setLevel(HttpLoggingInterceptor.Level.BASIC); + okHttpClientBuilder.addInterceptor(logging); + + // Set whether to follow redirects + okHttpClientBuilder.followRedirects(true); + okHttpClientBuilder.connectionPool(new ConnectionPool(clientConfig.getMaxIdleConnections(), clientConfig.getKeepAliveDuration(), TimeUnit.MILLISECONDS)); + + // Timeouts + okHttpClientBuilder.connectTimeout(clientConfig.getConnectTimeout(), TimeUnit.MILLISECONDS); + okHttpClientBuilder.readTimeout(clientConfig.getReadTimeout(), TimeUnit.MILLISECONDS); + okHttpClientBuilder.callTimeout(clientConfig.getCallTimeout(), TimeUnit.MILLISECONDS); + + // check if the ssl path is set and add the factory if so + if (StringUtils.isNotBlank(clientConfig.getKeystoreFilename())) { + try { + setSslSocketFactory(okHttpClientBuilder); + } catch (Exception e) { + throw new IllegalStateException("OkHttp TLS configuration failed", e); + } + } + + return okHttpClientBuilder.build(); + } + + private void setSslSocketFactory(OkHttpClient.Builder okHttpClientBuilder) throws Exception { + String keystoreLocation = clientConfig.getKeystoreFilename(); + String keystoreType = clientConfig.getKeystoreType(); + String keystorePass = clientConfig.getKeystorePass(); + assertKeystorePropertiesSet(keystoreLocation, keystorePass, keystoreType); + + KeyStore keyStore; + try (FileInputStream keyStoreStream = new FileInputStream(keystoreLocation)) { + keyStore = new StandardKeyStoreBuilder() + .type(keystoreType) + .inputStream(keyStoreStream) + .password(keystorePass.toCharArray()) + .build(); + } + + String truststoreLocation = clientConfig.getTruststoreFilename(); + String truststorePass = clientConfig.getTruststorePass(); + String truststoreType = clientConfig.getTruststoreType(); + assertTruststorePropertiesSet(truststoreLocation, truststorePass, truststoreType); + + KeyStore truststore; + try (FileInputStream trustStoreStream = new FileInputStream(truststoreLocation)) { + truststore = new StandardKeyStoreBuilder() + .type(truststoreType) + .inputStream(trustStoreStream) + .password(truststorePass.toCharArray()) + .build(); + } + + X509TrustManager trustManager = new StandardTrustManagerBuilder().trustStore(truststore).build(); + SSLContext sslContext = new StandardSslContextBuilder() + .keyStore(keyStore) + .keyPassword(keystorePass.toCharArray()) + .trustStore(truststore) + .build(); + SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory(); + + okHttpClientBuilder.sslSocketFactory(sslSocketFactory, trustManager); + } + + private void assertKeystorePropertiesSet(String location, String password, String type) { + if (location == null || location.isEmpty()) { + throw new IllegalArgumentException(clientConfig.getKeystoreFilename() + " is null or is empty"); + } + + if (password == null || password.isEmpty()) { + throw new IllegalArgumentException("The client's keystore filename is set but its password is not (or is empty). If the location is set, the password must also be."); + } + + if (type == null || type.isEmpty()) { + throw new IllegalArgumentException("The client's keystore filename is set but its type is not (or is empty). If the location is set, the type must also be."); + } + } + + private void assertTruststorePropertiesSet(String location, String password, String type) { + if (location == null || location.isEmpty()) { + throw new IllegalArgumentException("The client's truststore filename is not set or is empty"); + } + + if (password == null || password.isEmpty()) { + throw new IllegalArgumentException("The client's truststore filename is set but its password is not (or is empty). If the location is set, the password must also be."); + } + + if (type == null || type.isEmpty()) { + throw new IllegalArgumentException("The client's truststore filename is set but its type is not (or is empty). If the location is set, the type must also be."); + } + } +} diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/C2UrlProvider.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/C2UrlProvider.java new file mode 100644 index 0000000000..51ea7a30c8 --- /dev/null +++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/C2UrlProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.http.url; + +import java.util.Optional; + +public interface C2UrlProvider { + + /** + * Retrieves the url of the C2 server to send heartbeats to + * + * @return the url of the C2 server to send heartbeats to + */ + String getHeartbeatUrl(); + + /** + * Retrieves the url of the C2 server to send acknowledgements to + * + * @return the url of the C2 server to send acknowledgements to + */ + String getAcknowledgeUrl(); + + /** + * Retrieves the callback url of the C2 server according to the C2 configuration (proxy aware or not) + * + * @param absoluteUrl absolute url sent by the C2 server + * @param relativeUrl relative url sent by the C2 server + * @return the url of the C2 server to send requests to + */ + Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl); +} diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/C2UrlProviderFactory.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/C2UrlProviderFactory.java new file mode 100644 index 0000000000..ae2e12687e --- /dev/null +++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/C2UrlProviderFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.http.url; + +import static org.apache.commons.lang3.StringUtils.isNoneBlank; + +import org.apache.nifi.c2.client.C2ClientConfig; + +public class C2UrlProviderFactory { + + private static final String INCORRECT_SETTINGS_ERROR_MESSAGE = "Incorrect configuration. Please revisit C2 URL properties." + + "Either c2.rest.url and c2.rest.url.ack have to be set," + + "either c2.rest.path.base, c2.rest.path.heartbeat and c2.rest.path.acknowledge have to configured"; + + private final C2ClientConfig clientConfig; + + public C2UrlProviderFactory(C2ClientConfig clientConfig) { + this.clientConfig = clientConfig; + } + + public C2UrlProvider create() { + if (isNoneBlank(clientConfig.getC2RestPathBase(), clientConfig.getC2RestPathHeartbeat(), clientConfig.getC2RestPathAcknowledge())) { + return new ProxyAwareC2UrlProvider(clientConfig.getC2RestPathBase(), clientConfig.getC2RestPathHeartbeat(), clientConfig.getC2RestPathAcknowledge()); + } else if (isNoneBlank(clientConfig.getC2Url(), clientConfig.getC2AckUrl())) { + return new LegacyC2UrlProvider(clientConfig.getC2Url(), clientConfig.getC2AckUrl()); + } else { + throw new IllegalArgumentException(INCORRECT_SETTINGS_ERROR_MESSAGE); + } + } +} diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProvider.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProvider.java new file mode 100644 index 0000000000..fe9fad24ef --- /dev/null +++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProvider.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.http.url; + +import java.util.Optional; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LegacyC2UrlProvider implements C2UrlProvider { + + private static final Logger LOG = LoggerFactory.getLogger(LegacyC2UrlProvider.class); + + private final String c2Url; + private final String c2AckUrl; + + LegacyC2UrlProvider(String c2Url, String c2AckUrl) { + this.c2Url = c2Url; + this.c2AckUrl = c2AckUrl; + } + + @Override + public String getHeartbeatUrl() { + return c2Url; + } + + @Override + public String getAcknowledgeUrl() { + return c2AckUrl; + } + + @Override + public Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl) { + Optional<String> url = Optional.ofNullable(absoluteUrl).filter(StringUtils::isNotBlank); + if (!url.isPresent()) { + LOG.error("Provided absolute url was empty or null. Relative urls are not supported with this configuration"); + } + return url; + } +} diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2UrlProvider.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2UrlProvider.java new file mode 100644 index 0000000000..c5fb9059fa --- /dev/null +++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2UrlProvider.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.http.url; + +import static org.apache.commons.lang3.StringUtils.appendIfMissing; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.stripStart; + +import java.util.Optional; +import okhttp3.HttpUrl; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProxyAwareC2UrlProvider implements C2UrlProvider { + + private static final Logger LOG = LoggerFactory.getLogger(ProxyAwareC2UrlProvider.class); + private static final String SLASH = "/"; + + private final HttpUrl c2RestPathBase; + private final String c2RestPathHeartbeat; + private final String c2RestPathAcknowledge; + + ProxyAwareC2UrlProvider(String c2RestPathBase, String c2RestPathHeartbeat, String c2RestPathAcknowledge) { + this.c2RestPathBase = Optional.ofNullable(c2RestPathBase) + .filter(StringUtils::isNotBlank) + .map(apiBase -> appendIfMissing(apiBase, SLASH)) // trailing slash needs to be added for proper URL creation + .map(HttpUrl::parse) + .orElseThrow(() -> new IllegalArgumentException("Parameter c2RestPathBase should not be null or empty and should be a valid URL")); + this.c2RestPathHeartbeat = toAbsoluteUrl(c2RestPathHeartbeat) + .orElseThrow(() -> new IllegalArgumentException("Unable to convert c2RestPathHeartbeat to absolute url. Please check C2 configuration")); + this.c2RestPathAcknowledge = toAbsoluteUrl(c2RestPathAcknowledge) + .orElseThrow(() -> new IllegalArgumentException("Unable to convert c2RestPathAcknowledge to absolute url. Please check C2 configuration")); + } + + @Override + public String getHeartbeatUrl() { + return c2RestPathHeartbeat; + } + + @Override + public String getAcknowledgeUrl() { + return c2RestPathAcknowledge; + } + + @Override + public Optional<String> getCallbackUrl(String absoluteUrl, String relativeUrl) { + return Optional.ofNullable(relativeUrl) + .map(this::toAbsoluteUrl) + .filter(Optional::isPresent) + .orElseGet(() -> Optional.ofNullable(absoluteUrl).filter(StringUtils::isNotBlank)); + } + + private Optional<String> toAbsoluteUrl(String path) { + if (isBlank(path)) { + LOG.error("Unable to convert to absolute url, provided path was null or empty"); + return Optional.empty(); + } + try { + return Optional.of(c2RestPathBase.resolve(stripStart(path, SLASH)).toString()); // leading slash needs to be removed for proper URL creation + } catch (Exception e) { + LOG.error("Unable to convert restBase=" + c2RestPathBase + " and restPath=" + path + " to absolute url", e); + return Optional.empty(); + } + } +} diff --git a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java index 28ceef2d1f..fb0ee3c49d 100644 --- a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java +++ b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java @@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.when; import java.io.IOException; @@ -45,6 +46,7 @@ import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) public class C2HttpClientTest { + public static final String INCORRECT_PATH = "http://localhost/incorrectPath"; private static final String HEARTBEAT_PATH = "c2/heartbeat"; private static final String UPDATE_PATH = "c2/update"; private static final String ACK_PATH = "c2/acknowledge"; @@ -59,8 +61,6 @@ public class C2HttpClientTest { @Mock private C2Serializer serializer; - private C2HttpClient c2HttpClient; - private MockWebServer mockWebServer; private String baseUrl; @@ -71,7 +71,8 @@ public class C2HttpClientTest { baseUrl = mockWebServer.url("/").newBuilder().host("localhost").build().toString(); when(c2ClientConfig.getKeepAliveDuration()).thenReturn(KEEP_ALIVE_DURATION); when(c2ClientConfig.getMaxIdleConnections()).thenReturn(MAX_IDLE_CONNECTIONS); - c2HttpClient = new C2HttpClient(c2ClientConfig, serializer); + lenient().when(c2ClientConfig.getC2Url()).thenReturn(baseUrl + HEARTBEAT_PATH); + lenient().when(c2ClientConfig.getC2AckUrl()).thenReturn(baseUrl + ACK_PATH); } @AfterEach @@ -86,8 +87,8 @@ public class C2HttpClientTest { when(serializer.serialize(any(C2Heartbeat.class))).thenReturn(Optional.of("Heartbeat")); when(serializer.deserialize(any(), any())).thenReturn(Optional.of(hbResponse)); - when(c2ClientConfig.getC2Url()).thenReturn(baseUrl + HEARTBEAT_PATH); + C2HttpClient c2HttpClient = C2HttpClient.create(c2ClientConfig, serializer); Optional<C2HeartbeatResponse> response = c2HttpClient.publishHeartbeat(new C2Heartbeat()); assertTrue(response.isPresent()); @@ -100,8 +101,10 @@ public class C2HttpClientTest { @Test void testPublishHeartbeatReturnEmptyInCaseOfCommunicationIssue() { when(serializer.serialize(any(C2Heartbeat.class))).thenReturn(Optional.of("Heartbeat")); - when(c2ClientConfig.getC2Url()).thenReturn("http://localhost/incorrectPath"); + when(c2ClientConfig.getC2Url()).thenReturn(INCORRECT_PATH); + when(c2ClientConfig.getC2AckUrl()).thenReturn(INCORRECT_PATH); + C2HttpClient c2HttpClient = C2HttpClient.create(c2ClientConfig, serializer); Optional<C2HeartbeatResponse> response = c2HttpClient.publishHeartbeat(new C2Heartbeat()); assertFalse(response.isPresent()); @@ -111,7 +114,7 @@ public class C2HttpClientTest { void testConstructorThrowsExceptionForInvalidKeystoreFilenameAtInitialization() { when(c2ClientConfig.getKeystoreFilename()).thenReturn("incorrectKeystoreFilename"); - IllegalStateException exception = assertThrows(IllegalStateException.class, () -> new C2HttpClient(c2ClientConfig, serializer)); + IllegalStateException exception = assertThrows(IllegalStateException.class, () -> C2HttpClient.create(c2ClientConfig, serializer)); assertTrue(exception.getMessage().contains("TLS")); } @@ -120,6 +123,7 @@ public class C2HttpClientTest { void testRetrieveUpdateContentReturnsEmptyWhenServerErrorResponse() throws InterruptedException { mockWebServer.enqueue(new MockResponse().setBody("updateContent").setResponseCode(HTTP_STATUS_BAD_REQUEST)); + C2HttpClient c2HttpClient = C2HttpClient.create(c2ClientConfig, serializer); Optional<byte[]> response = c2HttpClient.retrieveUpdateContent(baseUrl + UPDATE_PATH); assertFalse(response.isPresent()); @@ -133,6 +137,7 @@ public class C2HttpClientTest { String content = "updateContent"; mockWebServer.enqueue(new MockResponse().setBody(content).setResponseCode(HTTP_STATUS_OK)); + C2HttpClient c2HttpClient = C2HttpClient.create(c2ClientConfig, serializer); Optional<byte[]> response = c2HttpClient.retrieveUpdateContent(baseUrl + UPDATE_PATH); assertTrue(response.isPresent()); @@ -145,10 +150,10 @@ public class C2HttpClientTest { @Test void testAcknowledgeOperation() throws InterruptedException { String ackContent = "ack"; - when(c2ClientConfig.getC2AckUrl()).thenReturn(baseUrl + ACK_PATH); when(serializer.serialize(any(C2OperationAck.class))).thenReturn(Optional.of(ackContent)); mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_STATUS_OK)); + C2HttpClient c2HttpClient = C2HttpClient.create(c2ClientConfig, serializer); c2HttpClient.acknowledgeOperation(new C2OperationAck()); RecordedRequest request = mockWebServer.takeRequest(); diff --git a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/C2UrlProviderFactoryTest.java b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/C2UrlProviderFactoryTest.java new file mode 100644 index 0000000000..40460371c5 --- /dev/null +++ b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/C2UrlProviderFactoryTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.http.url; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.when; + +import org.apache.nifi.c2.client.C2ClientConfig; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class C2UrlProviderFactoryTest { + + private static final String C2_REST_BASE = "https://host:8080/c2/api"; + private static final String HEARTBEAT_PATH = "/heartbeat"; + private static final String ACKNOWLEDGE_PATH = "/acknowledge"; + + @Mock + private C2ClientConfig clientConfig; + @InjectMocks + private C2UrlProviderFactory testC2UrlProviderFactory; + + @Test + public void testProxyAwareC2UrlProviderIsCreated() { + // given + when(clientConfig.getC2RestPathBase()).thenReturn(C2_REST_BASE); + when(clientConfig.getC2RestPathHeartbeat()).thenReturn(HEARTBEAT_PATH); + when(clientConfig.getC2RestPathAcknowledge()).thenReturn(ACKNOWLEDGE_PATH); + + // when + C2UrlProvider c2UrlProvider = testC2UrlProviderFactory.create(); + + // then + assertInstanceOf(ProxyAwareC2UrlProvider.class, c2UrlProvider); + } + + @Test + public void testLegacyC2UrlProviderIsCreated() { + // given + when(clientConfig.getC2Url()).thenReturn(C2_REST_BASE + HEARTBEAT_PATH); + when(clientConfig.getC2AckUrl()).thenReturn(C2_REST_BASE + ACKNOWLEDGE_PATH); + + // when + C2UrlProvider c2UrlProvider = testC2UrlProviderFactory.create(); + + // then + assertInstanceOf(LegacyC2UrlProvider.class, c2UrlProvider); + } + + @Test + public void testProxyAwareProviderTakesPrecedenceOverLegacy() { + // given + lenient().when(clientConfig.getC2RestPathBase()).thenReturn(C2_REST_BASE); + lenient().when(clientConfig.getC2RestPathHeartbeat()).thenReturn(HEARTBEAT_PATH); + lenient().when(clientConfig.getC2RestPathAcknowledge()).thenReturn(ACKNOWLEDGE_PATH); + lenient().when(clientConfig.getC2Url()).thenReturn(C2_REST_BASE + HEARTBEAT_PATH); + lenient().when(clientConfig.getC2AckUrl()).thenReturn(C2_REST_BASE + ACKNOWLEDGE_PATH); + + // when + C2UrlProvider c2UrlProvider = testC2UrlProviderFactory.create(); + + // then + assertInstanceOf(ProxyAwareC2UrlProvider.class, c2UrlProvider); + } + + @Test + public void testInsufficientConfigurationResultsInException() { + // given + when(clientConfig.getC2RestPathBase()).thenReturn(C2_REST_BASE); + when(clientConfig.getC2Url()).thenReturn(C2_REST_BASE + HEARTBEAT_PATH); + + // when + then + assertThrowsExactly(IllegalArgumentException.class, testC2UrlProviderFactory::create); + } +} diff --git a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProviderTest.java b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProviderTest.java new file mode 100644 index 0000000000..05f88b65e1 --- /dev/null +++ b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/LegacyC2UrlProviderTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.http.url; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Optional; +import java.util.stream.Stream; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class LegacyC2UrlProviderTest { + + private static final String C2_HEARTBEAT_URL = "https://host:8080/c2/api/heartbeat"; + private static final String C2_ACKNOWLEDGE_URL = "https://host:8080/c2/api/acknowledge"; + + @Test + public void testProviderIsCreatedAndReturnsProperHeartbeatAndAcknowledgeUrls() { + LegacyC2UrlProvider testProvider = new LegacyC2UrlProvider(C2_HEARTBEAT_URL, C2_ACKNOWLEDGE_URL); + + assertEquals(C2_HEARTBEAT_URL, testProvider.getHeartbeatUrl()); + assertEquals(C2_ACKNOWLEDGE_URL, testProvider.getAcknowledgeUrl()); + } + + @MethodSource("testCallbackUrlProvidedArguments") + @ParameterizedTest(name = "{index} => absoluteUrl={0}, relativeUrl={1}, expectedCallbackUrl={2}") + public void testCallbackUrlProvidedFor(String absoluteUrl, String relativeUrl, Optional<String> expectedCallbackUrl) { + LegacyC2UrlProvider testProvider = new LegacyC2UrlProvider(C2_HEARTBEAT_URL, C2_ACKNOWLEDGE_URL); + assertEquals(expectedCallbackUrl, testProvider.getCallbackUrl(absoluteUrl, relativeUrl)); + } + + private static Stream<Arguments> testCallbackUrlProvidedArguments() { + return Stream.of( + Arguments.of(null, null, Optional.empty()), + Arguments.of(null, "any_url", Optional.empty()), + Arguments.of("", "", Optional.empty()), + Arguments.of("", "any_url", Optional.empty()), + Arguments.of("http://c2/api/callback", "any_url", Optional.of("http://c2/api/callback")) + ); + } +} diff --git a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2ProviderTest.java b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2ProviderTest.java new file mode 100644 index 0000000000..99c1a82211 --- /dev/null +++ b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/url/ProxyAwareC2ProviderTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.c2.client.http.url; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; + +import java.util.Optional; +import java.util.stream.Stream; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class ProxyAwareC2ProviderTest { + + @MethodSource("testInsufficientProviderConstructorArguments") + @ParameterizedTest(name = "{index} => c2RestApi={0}, c2RestPathHeartbeat={1}, c2RestPathAcknowledge={2}") + public void testExceptionIsThrownWhenUrlsCanNotBeCreatedFromInputParameters(String c2RestApi, String c2RestPathHeartbeat, String c2RestPathAcknowledge) { + assertThrowsExactly(IllegalArgumentException.class, () -> new ProxyAwareC2UrlProvider(c2RestApi, c2RestPathHeartbeat, c2RestPathAcknowledge)); + } + + private static Stream<Arguments> testInsufficientProviderConstructorArguments() { + return Stream.of( + Arguments.of(null, null, null), + Arguments.of(null, null, ""), + Arguments.of(null, "", ""), + Arguments.of("", "", ""), + Arguments.of("", "", null), + Arguments.of("", null, null), + Arguments.of("http://c2/api", null, null), + Arguments.of("http://c2/api", "", null), + Arguments.of("http://c2/api", null, ""), + Arguments.of("http://c2/api", "", ""), + Arguments.of(null, "path1", null), + Arguments.of(null, null, "path2"), + Arguments.of(null, "path1", "path2"), + Arguments.of("invalid_url/api", "path1", "path2") + ); + } + + @MethodSource("testValidProviderConstructorArguments") + @ParameterizedTest(name = "{index} => c2RestApi={0}, c2RestPathHeartbeat={1}, c2RestPathAcknowledge={2}, expectedHeartbeatUrl={3}, expectedAcknowledgeUrl={4}") + public void testUrlProviderIsCreatedAndHeartbeatAndAcknowledgeUrlsAreReturnedCorrectly(String c2RestApi, String c2RestPathHeartbeat, String c2RestPathAcknowledge, + String expectedHeartbeatUrl, String expectedAcknowledgeUrl) { + ProxyAwareC2UrlProvider testProvider = new ProxyAwareC2UrlProvider(c2RestApi, c2RestPathHeartbeat, c2RestPathAcknowledge); + + assertEquals(expectedHeartbeatUrl, testProvider.getHeartbeatUrl()); + assertEquals(expectedAcknowledgeUrl, testProvider.getAcknowledgeUrl()); + } + + private static Stream<Arguments> testValidProviderConstructorArguments() { + String expectedHearbeatUrl = "http://c2/api/path1"; + String expectedAckUrl = "http://c2/api/path2"; + return Stream.of( + Arguments.of("http://c2/api", "path1", "path2", expectedHearbeatUrl, expectedAckUrl), + Arguments.of("http://c2/api", "/path1", "path2", expectedHearbeatUrl, expectedAckUrl), + Arguments.of("http://c2/api", "path1", "/path2", expectedHearbeatUrl, expectedAckUrl), + Arguments.of("http://c2/api", "/path1", "/path2", expectedHearbeatUrl, expectedAckUrl), + Arguments.of("http://c2/api/", "path1", "path2", expectedHearbeatUrl, expectedAckUrl), + Arguments.of("http://c2/api/", "/path1", "path2", expectedHearbeatUrl, expectedAckUrl), + Arguments.of("http://c2/api/", "path1", "/path2", expectedHearbeatUrl, expectedAckUrl), + Arguments.of("http://c2/api/", "/path1", "/path2", expectedHearbeatUrl, expectedAckUrl) + ); + } + + @MethodSource("testCallbackUrlProvidedArguments") + @ParameterizedTest(name = "{index} => c2RestBase={0}, absoluteUrl={1}, relativeUrl={2}, expectedCallbackUrl={3}") + public void testCallbackUrlProvidedFor(String c2RestBase, String absoluteUrl, String relativeUrl, Optional<String> expectedCallbackUrl) { + ProxyAwareC2UrlProvider testProvider = new ProxyAwareC2UrlProvider(c2RestBase, "any_path", "any_path"); + assertEquals(expectedCallbackUrl, testProvider.getCallbackUrl(absoluteUrl, relativeUrl)); + } + + private static Stream<Arguments> testCallbackUrlProvidedArguments() { + String c2RestBaseNoTrailingSlash = "http://c2/api"; + String c2RestBaseWithTrailingSlash = "http://c2/api/"; + String path = "path/endpoint"; + String absoluteUrl = "http://c2-other/api/path/endpoint"; + return Stream.of( + Arguments.of(c2RestBaseNoTrailingSlash, null, null, Optional.empty()), + Arguments.of(c2RestBaseNoTrailingSlash, "", null, Optional.empty()), + Arguments.of(c2RestBaseNoTrailingSlash, null, "", Optional.empty()), + Arguments.of(c2RestBaseNoTrailingSlash, "", "", Optional.empty()), + Arguments.of(c2RestBaseWithTrailingSlash, null, null, Optional.empty()), + Arguments.of(c2RestBaseWithTrailingSlash, "", null, Optional.empty()), + Arguments.of(c2RestBaseWithTrailingSlash, null, "", Optional.empty()), + Arguments.of(c2RestBaseWithTrailingSlash, "", "", Optional.empty()), + Arguments.of(c2RestBaseNoTrailingSlash, null, path, Optional.of(c2RestBaseWithTrailingSlash + path)), + Arguments.of(c2RestBaseNoTrailingSlash, "", "/" + path, Optional.of(c2RestBaseWithTrailingSlash + path)), + Arguments.of(c2RestBaseWithTrailingSlash, null, path, Optional.of(c2RestBaseWithTrailingSlash + path)), + Arguments.of(c2RestBaseWithTrailingSlash, "", "/" + path, Optional.of(c2RestBaseWithTrailingSlash + path)), + Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, null, Optional.of(absoluteUrl)), + Arguments.of(c2RestBaseWithTrailingSlash, absoluteUrl, "", Optional.of(absoluteUrl)) + ); + } +} diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java index f9437b16e3..398f9c241e 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandler.java @@ -67,6 +67,7 @@ public class TransferDebugOperationHandler implements C2OperationHandler { private static final String UNABLE_TO_CREATE_BUNDLE = "Unable to create debug bundle"; static final String TARGET_ARG = "target"; + static final String RELATIVE_TARGET_ARG = "relativeTarget"; static final String NEW_LINE = "\n"; private final C2Client c2Client; @@ -116,9 +117,10 @@ public class TransferDebugOperationHandler implements C2OperationHandler { @Override public C2OperationAck handle(C2Operation operation) { - String debugCallbackUrl = ofNullable(operation.getArgs()).orElse(emptyMap()).get(TARGET_ARG); - if (debugCallbackUrl == null) { - LOG.error("Callback URL was not found in C2 request."); + Map<String, String> arguments = ofNullable(operation.getArgs()).orElse(emptyMap()); + Optional<String> callbackUrl = c2Client.getCallbackUrl(arguments.get(TARGET_ARG), arguments.get(RELATIVE_TARGET_ARG)); + if (!callbackUrl.isPresent()) { + LOG.error("Callback URL could not be constructed from C2 request and current configuration"); return operationAck(operation, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND)); } @@ -127,7 +129,7 @@ public class TransferDebugOperationHandler implements C2OperationHandler { try { contentFilteredFilePaths = filterContent(operation.getIdentifier(), bundleFilePaths); operationState = createDebugBundle(contentFilteredFilePaths) - .map(bundle -> c2Client.uploadBundle(debugCallbackUrl, bundle) + .map(bundle -> c2Client.uploadBundle(callbackUrl.get(), bundle) .map(errorMessage -> operationState(NOT_APPLIED, errorMessage)) .orElseGet(() -> operationState(FULLY_APPLIED, SUCCESSFUL_UPLOAD))) .orElseGet(() -> operationState(NOT_APPLIED, UNABLE_TO_CREATE_BUNDLE)); diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java index b73e66cf32..5a964ea7fe 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandler.java @@ -28,6 +28,7 @@ import static org.apache.nifi.c2.protocol.api.OperandType.ASSET; import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE; import java.util.Map; +import java.util.Optional; import java.util.function.BiFunction; import java.util.function.BiPredicate; import org.apache.nifi.c2.client.api.C2Client; @@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory; public class UpdateAssetOperationHandler implements C2OperationHandler { static final String ASSET_URL_KEY = "url"; + static final String ASSET_RELATIVE_URL_KEY = "relativeUrl"; static final String ASSET_FILE_KEY = "file"; static final String ASSET_FORCE_DOWNLOAD_KEY = "forceDownload"; @@ -55,14 +57,14 @@ public class UpdateAssetOperationHandler implements C2OperationHandler { private static final Logger LOG = LoggerFactory.getLogger(UpdateAssetOperationHandler.class); - private final C2Client client; + private final C2Client c2Client; private final OperandPropertiesProvider operandPropertiesProvider; private final BiPredicate<String, Boolean> assetUpdatePrecondition; private final BiFunction<String, byte[], Boolean> assetPersistFunction; - public UpdateAssetOperationHandler(C2Client client, OperandPropertiesProvider operandPropertiesProvider, + public UpdateAssetOperationHandler(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider, BiPredicate<String, Boolean> assetUpdatePrecondition, BiFunction<String, byte[], Boolean> assetPersistFunction) { - this.client = client; + this.c2Client = c2Client; this.operandPropertiesProvider = operandPropertiesProvider; this.assetUpdatePrecondition = assetUpdatePrecondition; this.assetPersistFunction = assetPersistFunction; @@ -105,11 +107,12 @@ public class UpdateAssetOperationHandler implements C2OperationHandler { public C2OperationAck handle(C2Operation operation) { String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY); - String assetUrl = getOperationArg(operation, ASSET_URL_KEY); - if (assetUrl == null) { - LOG.error("Callback URL with key={} was not found in C2 request. C2 request arguments={}", ASSET_URL_KEY, operation.getArgs()); + Optional<String> callbackUrl = c2Client.getCallbackUrl(getOperationArg(operation, ASSET_URL_KEY), getOperationArg(operation, ASSET_RELATIVE_URL_KEY)); + if (!callbackUrl.isPresent()) { + LOG.error("Callback URL could not be constructed from C2 request and current configuration"); return operationAck(operationId, operationState(NOT_APPLIED, C2_CALLBACK_URL_NOT_FOUND)); } + String assetFileName = getOperationArg(operation, ASSET_FILE_KEY); if (assetFileName == null) { LOG.error("Asset file name with key={} was not found in C2 request. C2 request arguments={}", ASSET_FILE_KEY, operation.getArgs()); @@ -117,14 +120,14 @@ public class UpdateAssetOperationHandler implements C2OperationHandler { } boolean forceDownload = parseBoolean(getOperationArg(operation, ASSET_FORCE_DOWNLOAD_KEY)); - LOG.info("Initiating asset update from url {} with name {}, force update is {}", assetUrl, assetFileName, forceDownload); + LOG.info("Initiating asset update from url {} with name {}, force update is {}", callbackUrl, assetFileName, forceDownload); C2OperationState operationState = assetUpdatePrecondition.test(assetFileName, forceDownload) - ? client.retrieveUpdateContent(assetUrl) - .map(content -> assetPersistFunction.apply(assetFileName, content) - ? operationState(FULLY_APPLIED, SUCCESSFULLY_UPDATE_ASSET) - : operationState(NOT_APPLIED, FAILED_TO_PERSIST_ASSET_TO_DISK)) - .orElseGet(() -> operationState(NOT_APPLIED, UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT)) + ? c2Client.retrieveUpdateContent(callbackUrl.get()) + .map(content -> assetPersistFunction.apply(assetFileName, content) + ? operationState(FULLY_APPLIED, SUCCESSFULLY_UPDATE_ASSET) + : operationState(NOT_APPLIED, FAILED_TO_PERSIST_ASSET_TO_DISK)) + .orElseGet(() -> operationState(NOT_APPLIED, UPDATE_ASSET_RETRIEVAL_RESULTED_IN_EMPTY_CONTENT)) : operationState(NO_OPERATION, UPDATE_ASSET_PRECONDITIONS_WERE_NOT_MET); return operationAck(operationId, operationState); diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java index db3ef8a86d..b58bdc8796 100644 --- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java +++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java @@ -14,9 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.nifi.c2.client.service.operation; +import static java.util.Collections.emptyMap; +import static java.util.Optional.ofNullable; import static org.apache.commons.lang3.StringUtils.EMPTY; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED; +import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION; import static org.apache.nifi.c2.protocol.api.OperandType.CONFIGURATION; import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE; @@ -41,6 +47,8 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler { private static final Pattern FLOW_ID_PATTERN = Pattern.compile("/[^/]+?/[^/]+?/[^/]+?/([^/]+)?/?.*"); static final String FLOW_ID = "flowId"; static final String LOCATION = "location"; + public static final String FLOW_URL_KEY = "flowUrl"; + public static final String FLOW_RELATIVE_URL_KEY = "relativeFlowUrl"; private final C2Client client; private final Function<byte[], Boolean> updateFlow; @@ -48,7 +56,7 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler { private final OperandPropertiesProvider operandPropertiesProvider; public UpdateConfigurationOperationHandler(C2Client client, FlowIdHolder flowIdHolder, Function<byte[], Boolean> updateFlow, - OperandPropertiesProvider operandPropertiesProvider) { + OperandPropertiesProvider operandPropertiesProvider) { this.client = client; this.updateFlow = updateFlow; this.flowIdHolder = flowIdHolder; @@ -66,79 +74,92 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler { } @Override - public C2OperationAck handle(C2Operation operation) { - String opIdentifier = Optional.ofNullable(operation.getIdentifier()) - .orElse(EMPTY); - C2OperationAck operationAck = new C2OperationAck(); - C2OperationState state = new C2OperationState(); - operationAck.setOperationState(state); - operationAck.setOperationId(opIdentifier); + public Map<String, Object> getProperties() { + return operandPropertiesProvider.getProperties(); + } - String updateLocation = Optional.ofNullable(operation.getArgs()) - .map(map -> map.get(LOCATION)) - .orElse(EMPTY); + @Override + public boolean requiresRestart() { + return true; + } + + @Override + public C2OperationAck handle(C2Operation operation) { + String operationId = Optional.ofNullable(operation.getIdentifier()).orElse(EMPTY); + + Map<String, String> arguments = ofNullable(operation.getArgs()).orElse(emptyMap()); + String absoluteFlowUrl = ofNullable(arguments.get(FLOW_URL_KEY)).orElse(arguments.get(LOCATION)); + Optional<String> callbackUrl = client.getCallbackUrl(absoluteFlowUrl, arguments.get(FLOW_RELATIVE_URL_KEY)); + if (!callbackUrl.isPresent()) { + logger.error("Callback URL could not be constructed from C2 request and current configuration"); + return operationAck(operationId, operationState(NOT_APPLIED, "Could not get callback url from operation and current configuration")); + } - String flowId = getFlowId(operation.getArgs(), updateLocation); + String flowId = getFlowId(operation.getArgs(), callbackUrl.get()); if (flowId == null) { - state.setState(C2OperationState.OperationState.NOT_APPLIED); - state.setDetails("Could not get flowId from the operation."); - logger.info("FlowId is missing, no update will be performed."); - } else { - if (flowIdHolder.getFlowId() == null || !flowIdHolder.getFlowId().equals(flowId)) { - logger.info("Will perform flow update from {} for operation #{}. Previous flow id was {}, replacing with new id {}", updateLocation, opIdentifier, - flowIdHolder.getFlowId() == null ? "not set" : flowIdHolder.getFlowId(), flowId); - } else { - logger.info("Flow is current, no update is necessary..."); - } - flowIdHolder.setFlowId(flowId); - state.setState(updateFlow(opIdentifier, updateLocation)); + logger.error("FlowId is missing, no update will be performed"); + return operationAck(operationId, operationState(NOT_APPLIED, "Could not get flowId from the operation")); } - return operationAck; + + if (flowIdHolder.getFlowId() != null && flowIdHolder.getFlowId().equals(flowId)) { + logger.info("Flow is current, no update is necessary"); + return operationAck(operationId, operationState(NO_OPERATION, "Flow is current, no update is necessary")); + } + + logger.info("Will perform flow update from {} for operation #{}. Previous flow id was {}, replacing with new id {}", + callbackUrl, operationId, ofNullable(flowIdHolder.getFlowId()).orElse("not set"), flowId); + flowIdHolder.setFlowId(flowId); + return operationAck(operationId, updateFlow(operationId, callbackUrl.get())); } - private C2OperationState.OperationState updateFlow(String opIdentifier, String updateLocation) { - Optional<byte[]> updateContent = client.retrieveUpdateContent(updateLocation); - if (updateContent.isPresent()) { - if (updateFlow.apply(updateContent.get())) { - logger.debug("Update configuration applied for operation #{}.", opIdentifier); - return C2OperationState.OperationState.FULLY_APPLIED; - } else { - logger.error("Update resulted in error for operation #{}.", opIdentifier); - return C2OperationState.OperationState.NOT_APPLIED; - } - } else { + private C2OperationState updateFlow(String opIdentifier, String callbackUrl) { + Optional<byte[]> updateContent = client.retrieveUpdateContent(callbackUrl); + + if (!updateContent.isPresent()) { logger.error("Update content retrieval resulted in empty content so flow update was omitted for operation #{}.", opIdentifier); - return C2OperationState.OperationState.NOT_APPLIED; + return operationState(NOT_APPLIED, "Update content retrieval resulted in empty content"); } + + if (!updateFlow.apply(updateContent.get())) { + logger.error("Update resulted in error for operation #{}.", opIdentifier); + return operationState(NOT_APPLIED, "Update resulted in error"); + } + + logger.debug("Update configuration applied for operation #{}.", opIdentifier); + return operationState(FULLY_APPLIED, "Update configuration applied successfully"); } - private String getFlowId(Map<String, String> args, String updateLocation) { + private String getFlowId(Map<String, String> args, String callbackUrl) { return Optional.ofNullable(args) - .map(map -> map.get(FLOW_ID)) - .orElseGet(() -> parseFlowId(updateLocation)); + .map(map -> map.get(FLOW_ID)) + .orElseGet(() -> parseFlowId(callbackUrl)); } - private String parseFlowId(String flowUpdateUrl) { + private String parseFlowId(String callbackUrl) { try { - URI flowUri = new URI(flowUpdateUrl); + URI flowUri = new URI(callbackUrl); Matcher matcher = FLOW_ID_PATTERN.matcher(flowUri.getPath()); if (matcher.matches()) { return matcher.group(1); } } catch (Exception e) { - logger.error("Could not get flow id from the provided URL, flow update URL format unexpected [{}]", flowUpdateUrl); + logger.error("Could not get flow id from the provided URL, flow update URL format unexpected [{}]", callbackUrl); } return null; } - @Override - public Map<String, Object> getProperties() { - return operandPropertiesProvider.getProperties(); + private C2OperationState operationState(C2OperationState.OperationState operationState, String details) { + C2OperationState state = new C2OperationState(); + state.setState(operationState); + state.setDetails(details); + return state; } - @Override - public boolean requiresRestart() { - return true; + private C2OperationAck operationAck(String operationId, C2OperationState operationState) { + C2OperationAck operationAck = new C2OperationAck(); + operationAck.setOperationState(operationState); + operationAck.setOperationId(operationId); + return operationAck; } } diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandlerTest.java index 7973f35fda..798928bce0 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandlerTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/TransferDebugOperationHandlerTest.java @@ -35,8 +35,10 @@ import static org.apache.nifi.c2.protocol.api.OperationType.TRANSFER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.BufferedReader; import java.io.ByteArrayInputStream; @@ -49,6 +51,7 @@ import java.nio.file.Paths; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Stream; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; @@ -136,6 +139,7 @@ public class TransferDebugOperationHandlerTest { .collect(toList()); TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, createBundleFiles, DEFAULT_CONTENT_FILTER); C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT); + when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.of(C2_DEBUG_UPLOAD_ENDPOINT)); // when C2OperationAck result = testHandler.handle(c2Operation); @@ -196,6 +200,7 @@ public class TransferDebugOperationHandlerTest { Predicate<String> testContentFilter = content -> !content.contains(filterKeyword); TransferDebugOperationHandler testHandler = TransferDebugOperationHandler.create(c2Client, operandPropertiesProvider, singletonList(bundleFile), testContentFilter); C2Operation c2Operation = operation(C2_DEBUG_UPLOAD_ENDPOINT); + when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.of(C2_DEBUG_UPLOAD_ENDPOINT)); // when C2OperationAck result = testHandler.handle(c2Operation); diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java index b5519abf8d..444bd9ff27 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateAssetOperationHandlerTest.java @@ -37,6 +37,8 @@ import static org.apache.nifi.c2.protocol.api.OperandType.ASSET; import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -49,6 +51,7 @@ import java.util.stream.Stream; import org.apache.nifi.c2.client.api.C2Client; import org.apache.nifi.c2.protocol.api.C2Operation; import org.apache.nifi.c2.protocol.api.C2OperationAck; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -89,6 +92,11 @@ public class UpdateAssetOperationHandlerTest { Arguments.of(mock(C2Client.class), mock(OperandPropertiesProvider.class), mock(BiPredicate.class), null)); } + @BeforeEach + public void setup() { + lenient().when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.of(ASSET_URL)); + } + @ParameterizedTest(name = "c2Client={0} operandPropertiesProvider={1} bundleFileList={2} contentFilter={3}") @MethodSource("invalidConstructorArguments") public void testAttemptingCreateWithInvalidParametersWillThrowException(C2Client c2Client, OperandPropertiesProvider operandPropertiesProvider, @@ -107,6 +115,7 @@ public class UpdateAssetOperationHandlerTest { public void testAssetUrlCanNotBeNull() { // given C2Operation operation = operation(null, ASSET_FILE_NAME, FORCE_DOWNLOAD); + when(c2Client.getCallbackUrl(any(), any())).thenReturn(Optional.empty()); // when C2OperationAck result = testHandler.handle(operation); diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java index 11acc072dc..fb23a15169 100644 --- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java +++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.nifi.c2.client.service.operation; import static org.apache.commons.lang3.StringUtils.EMPTY; @@ -21,6 +22,8 @@ import static org.apache.nifi.c2.client.service.operation.UpdateConfigurationOpe import static org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler.LOCATION; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.Collections; @@ -43,15 +46,17 @@ import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) public class UpdateConfigurationOperationHandlerTest { private static final String OPERATION_ID = "operationId"; - private static final Map<String, String> CORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, "/path/for/the/" + FLOW_ID); - private static final Map<String, String> INCORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, "incorrect/location"); + private static final String CORRECT_LOCATION = "/path/for/the/" + FLOW_ID; + private static final String INCORRECT_LOCATION = "incorrect/location"; - @Mock - private C2Client client; + private static final Map<String, String> CORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, CORRECT_LOCATION); + private static final Map<String, String> INCORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, INCORRECT_LOCATION); @Mock private FlowIdHolder flowIdHolder; @Mock + private C2Client client; + @Mock private OperandPropertiesProvider operandPropertiesProvider; @Test @@ -64,10 +69,12 @@ public class UpdateConfigurationOperationHandlerTest { @Test void testHandleIncorrectArg() { - UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(null, null, null, operandPropertiesProvider); + UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, null, null, operandPropertiesProvider); C2Operation operation = new C2Operation(); operation.setArgs(INCORRECT_LOCATION_MAP); + when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(INCORRECT_LOCATION)); + C2OperationAck response = handler.handle(operation); assertEquals(C2OperationState.OperationState.NOT_APPLIED, response.getOperationState().getState()); @@ -78,6 +85,7 @@ public class UpdateConfigurationOperationHandlerTest { Function<byte[], Boolean> successUpdate = x -> true; when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID); when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes())); + when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(INCORRECT_LOCATION)); UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate, operandPropertiesProvider); C2Operation operation = new C2Operation(); operation.setIdentifier(OPERATION_ID); @@ -94,9 +102,9 @@ public class UpdateConfigurationOperationHandlerTest { } @Test - void testHandleReturnsNotAppliedWithNoContent() { + void testHandleReturnsNoOperationWithNoContent() { when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID); - when(client.retrieveUpdateContent(any())).thenReturn(Optional.empty()); + when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(CORRECT_LOCATION)); UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, null, operandPropertiesProvider); C2Operation operation = new C2Operation(); operation.setArgs(CORRECT_LOCATION_MAP); @@ -104,14 +112,15 @@ public class UpdateConfigurationOperationHandlerTest { C2OperationAck response = handler.handle(operation); assertEquals(EMPTY, response.getOperationId()); - assertEquals(C2OperationState.OperationState.NOT_APPLIED, response.getOperationState().getState()); + assertEquals(C2OperationState.OperationState.NO_OPERATION, response.getOperationState().getState()); } @Test void testHandleReturnsNotAppliedWithContentApplyIssues() { Function<byte[], Boolean> failedToUpdate = x -> false; - when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID); + when(flowIdHolder.getFlowId()).thenReturn("previous_flow_id"); when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes())); + when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(CORRECT_LOCATION)); UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, failedToUpdate, operandPropertiesProvider); C2Operation operation = new C2Operation(); operation.setIdentifier(OPERATION_ID); @@ -126,7 +135,8 @@ public class UpdateConfigurationOperationHandlerTest { @Test void testHandleReturnsFullyApplied() { Function<byte[], Boolean> successUpdate = x -> true; - when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID); + when(flowIdHolder.getFlowId()).thenReturn("previous_flow_id"); + when(client.getCallbackUrl(any(), any())).thenReturn(Optional.of(CORRECT_LOCATION)); when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes())); UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate, operandPropertiesProvider); C2Operation operation = new C2Operation(); @@ -135,6 +145,7 @@ public class UpdateConfigurationOperationHandlerTest { C2OperationAck response = handler.handle(operation); + verify(flowIdHolder, times(1)).setFlowId(FLOW_ID); assertEquals(OPERATION_ID, response.getOperationId()); assertEquals(C2OperationState.OperationState.FULLY_APPLIED, response.getOperationState().getState()); } diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/MiNiFiProperties.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/MiNiFiProperties.java index 755df830c2..12eb22a7f8 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/MiNiFiProperties.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/MiNiFiProperties.java @@ -85,6 +85,9 @@ public enum MiNiFiProperties { C2_ENABLE("c2.enable", "false", false, true, BOOLEAN_VALIDATOR), C2_REST_URL("c2.rest.url", "", false, true, VALID), C2_REST_URL_ACK("c2.rest.url.ack", "", false, true, VALID), + C2_REST_PATH_BASE("c2.rest.path.base", "", false, true, VALID), + C2_REST_PATH_HEARTBEAT("c2.rest.path.heartbeat", "", false, true, VALID), + C2_REST_PATH_ACKNOWLEDGE("c2.rest.path.acknowledge", "", false, true, VALID), C2_REST_CONNECTION_TIMEOUT("c2.rest.connectionTimeout", "5 sec", false, true, TIME_PERIOD_VALIDATOR), C2_REST_READ_TIMEOUT("c2.rest.readTimeout", "5 sec", false, true, TIME_PERIOD_VALIDATOR), C2_REST_CALL_TIMEOUT("c2.rest.callTimeout", "10 sec", false, true, TIME_PERIOD_VALIDATOR), diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java index ad8f5e5e26..02ad5f7054 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java @@ -27,8 +27,11 @@ import static org.apache.nifi.minifi.MiNiFiProperties.C2_FULL_HEARTBEAT; import static org.apache.nifi.minifi.MiNiFiProperties.C2_KEEP_ALIVE_DURATION; import static org.apache.nifi.minifi.MiNiFiProperties.C2_MAX_IDLE_CONNECTIONS; import static org.apache.nifi.minifi.MiNiFiProperties.C2_REQUEST_COMPRESSION; +import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_PATH_BASE; import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_CALL_TIMEOUT; import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_CONNECTION_TIMEOUT; +import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_PATH_ACKNOWLEDGE; +import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_PATH_HEARTBEAT; import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_READ_TIMEOUT; import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_URL; import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_URL_ACK; @@ -138,7 +141,6 @@ public class C2NifiClientService { private final PropertiesPersister propertiesPersister; private final ObjectMapper objectMapper; - private final long heartbeatPeriod; public C2NifiClientService(NiFiProperties niFiProperties, FlowController flowController, BootstrapCommunicator bootstrapCommunicator) { @@ -154,7 +156,7 @@ public class C2NifiClientService { this.heartbeatPeriod = clientConfig.getHeartbeatPeriod(); this.flowController = flowController; - C2HttpClient client = new C2HttpClient(clientConfig, new C2JacksonSerializer()); + C2HttpClient client = C2HttpClient.create(clientConfig, new C2JacksonSerializer()); C2HeartbeatFactory heartbeatFactory = new C2HeartbeatFactory(clientConfig, flowIdHolder, new ManifestHashProvider()); OperandPropertiesProvider emptyOperandPropertiesProvider = new EmptyOperandPropertiesProvider(); TransferDebugCommandHelper transferDebugCommandHelper = new TransferDebugCommandHelper(niFiProperties); @@ -196,19 +198,22 @@ public class C2NifiClientService { .maxIdleConnections(Integer.parseInt(properties.getProperty(C2_MAX_IDLE_CONNECTIONS.getKey(), C2_MAX_IDLE_CONNECTIONS.getDefaultValue()))) .keepAliveDuration((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_KEEP_ALIVE_DURATION.getKey(), C2_KEEP_ALIVE_DURATION.getDefaultValue()), TimeUnit.MILLISECONDS)) - .c2Url(properties.getProperty(C2_REST_URL.getKey(), C2_REST_URL.getDefaultValue())) .c2RequestCompression(properties.getProperty(C2_REQUEST_COMPRESSION.getKey(), C2_REQUEST_COMPRESSION.getDefaultValue())) .c2AssetDirectory(properties.getProperty(C2_ASSET_DIRECTORY.getKey(), C2_ASSET_DIRECTORY.getDefaultValue())) .confDirectory(properties.getProperty(C2_CONFIG_DIRECTORY.getKey(), C2_CONFIG_DIRECTORY.getDefaultValue())) .runtimeManifestIdentifier(properties.getProperty(C2_RUNTIME_MANIFEST_IDENTIFIER.getKey(), C2_RUNTIME_MANIFEST_IDENTIFIER.getDefaultValue())) .runtimeType(properties.getProperty(C2_RUNTIME_TYPE.getKey(), C2_RUNTIME_TYPE.getDefaultValue())) - .c2AckUrl(properties.getProperty(C2_REST_URL_ACK.getKey(), C2_REST_URL_ACK.getDefaultValue())) .truststoreFilename(properties.getProperty(C2_SECURITY_TRUSTSTORE_LOCATION.getKey(), C2_SECURITY_TRUSTSTORE_LOCATION.getDefaultValue())) .truststorePassword(properties.getProperty(C2_SECURITY_TRUSTSTORE_PASSWORD.getKey(), C2_SECURITY_TRUSTSTORE_PASSWORD.getDefaultValue())) .truststoreType(properties.getProperty(C2_SECURITY_TRUSTSTORE_TYPE.getKey(), C2_SECURITY_TRUSTSTORE_TYPE.getDefaultValue())) .keystoreFilename(properties.getProperty(C2_SECURITY_KEYSTORE_LOCATION.getKey(), C2_SECURITY_KEYSTORE_LOCATION.getDefaultValue())) .keystorePassword(properties.getProperty(C2_SECURITY_KEYSTORE_PASSWORD.getKey(), C2_SECURITY_KEYSTORE_PASSWORD.getDefaultValue())) .keystoreType(properties.getProperty(C2_SECURITY_KEYSTORE_TYPE.getKey(), C2_SECURITY_KEYSTORE_TYPE.getDefaultValue())) + .c2Url(properties.getProperty(C2_REST_URL.getKey(), C2_REST_URL.getDefaultValue())) + .c2AckUrl(properties.getProperty(C2_REST_URL_ACK.getKey(), C2_REST_URL_ACK.getDefaultValue())) + .c2RestPathBase(properties.getProperty(C2_REST_PATH_BASE.getKey(), C2_REST_PATH_BASE.getDefaultValue())) + .c2RestPathHeartbeat(properties.getProperty(C2_REST_PATH_HEARTBEAT.getKey(), C2_REST_PATH_HEARTBEAT.getDefaultValue())) + .c2RestPathAcknowledge(properties.getProperty(C2_REST_PATH_ACKNOWLEDGE.getKey(), C2_REST_PATH_ACKNOWLEDGE.getDefaultValue())) .build(); } diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf index 57653e7773..3d7aa5ceb1 100644 --- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf +++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf @@ -134,8 +134,18 @@ java.arg.14=-Djava.awt.headless=true # Enabling C2 Uncomment each of the following options #c2.enable=true ## define protocol parameters +# DEPRECATED: c2.rest.url and c2.rest.url.ack are deprecated in favor of c2.rest.path.* properties and are target to be removed in future release +# The absolute url of the C2 server's heartbeat endpoint, eg.: http://localhost/c2-server/api/heartbeat #c2.rest.url= +# The absolute url of the C2 server's acknowledge endpoint, eg.: http://localhost/c2-server/api/acknowledge #c2.rest.url.ack= +# C2 Rest Path Properties +# The base path of the C2 server's REST API, eg.: http://localhost/c2-server/api +#c2.rest.path.base= +# Relative url of the C2 server's heartbeat endpoint, eg.: /heartbeat +#c2.rest.path.heartbeat= +# Relative url of the C2 server's acknowledge endpoint, eg.: /acknowledge +#c2.rest.path.acknowledge= ## c2 timeouts #c2.rest.connectionTimeout=5 sec #c2.rest.readTimeout=5 sec