This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.15 in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.15 by this push: new 7cedb9b NIFI-9448 Improved S2S HTTP Extend Transaction Exception Handling 7cedb9b is described below commit 7cedb9b8a89f2d3e8e258be24c99f70a34198d71 Author: exceptionfactory <exceptionfact...@apache.org> AuthorDate: Mon Dec 6 13:24:40 2021 -0600 NIFI-9448 Improved S2S HTTP Extend Transaction Exception Handling - Refactor background transaction extension to ExtendTransactionCommand - Avoid closing S2S HTTP client for IllegalStateExceptions - Avoid creating additional S2S HTTP client instance for transaction extension commands - Add check for extend transaction requests received in client test class - Add null check for Peer Persistence implementation in PeerSelector Signed-off-by: Joe Gresock <jgres...@gmail.com> This closes #5577. --- .../apache/nifi/remote/client/PeerSelector.java | 4 +- .../nifi/remote/util/ExtendTransactionCommand.java | 68 +++++++++++++++ .../nifi/remote/util/SiteToSiteRestApiClient.java | 75 ++++++---------- .../nifi/remote/client/http/TestHttpClient.java | 52 ++++++++---- .../remote/client/socket/SiteToSiteClientIT.java | 99 ---------------------- .../remote/util/TestExtendTransactionCommand.java | 85 +++++++++++++++++++ 6 files changed, 216 insertions(+), 167 deletions(-) diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java index 03d191f..c423295 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java @@ -540,7 +540,9 @@ public class PeerSelector { this.peerStatusCache = peerStatusCache; // The #save mechanism persists the cache to stateful or file-based storage - peerPersistence.save(peerStatusCache); + if (peerPersistence != null) { + peerPersistence.save(peerStatusCache); + } } catch (final IOException e) { error(logger, eventReporter, "Failed to persist list of peers due to {}; if restarted" + " and the nodes specified at the remote instance are down," + diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/ExtendTransactionCommand.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/ExtendTransactionCommand.java new file mode 100644 index 0000000..8e427a6 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/ExtendTransactionCommand.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.util; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.web.api.entity.TransactionResultEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Site-to-Site Extend Transaction Command executes background requests for transfer transactions + */ +public class ExtendTransactionCommand implements Runnable { + private static final String CATEGORY = "Site-to-Site"; + + private static final Logger logger = LoggerFactory.getLogger(ExtendTransactionCommand.class); + + private final SiteToSiteRestApiClient client; + + private final String transactionUrl; + + private final EventReporter eventReporter; + + ExtendTransactionCommand(final SiteToSiteRestApiClient client, final String transactionUrl, final EventReporter eventReporter) { + this.client = client; + this.transactionUrl = transactionUrl; + this.eventReporter = eventReporter; + } + + /** + * Run Command and attempt to extend transaction + */ + @Override + public void run() { + try { + final TransactionResultEntity entity = client.extendTransaction(transactionUrl); + logger.debug("Extend Transaction Completed [{}] Code [{}] FlowFiles Sent [{}]", transactionUrl, entity.getResponseCode(), entity.getFlowFileSent()); + } catch (final Throwable e) { + if (e instanceof IllegalStateException) { + logger.debug("Extend Transaction Failed [{}] client connection pool shutdown", transactionUrl, e); + } else { + logger.warn("Extend Transaction Failed [{}]", transactionUrl, e); + final String message = String.format("Extend Transaction Failed [%s]: %s", transactionUrl, e.getMessage()); + eventReporter.reportEvent(Severity.WARNING, CATEGORY, message); + try { + client.close(); + } catch (final Exception closeException) { + logger.warn("Extend Transaction [{}] Close Client Failed", transactionUrl, closeException); + } + } + } + } +} diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java index 3852177..8726c49 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/SiteToSiteRestApiClient.java @@ -170,12 +170,11 @@ public class SiteToSiteRestApiClient implements Closeable { private int batchCount = 0; private long batchSize = 0; private long batchDurationMillis = 0; - private TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1); + private final TransportProtocolVersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(1); private String trustedPeerDn; private final ScheduledExecutorService ttlExtendTaskExecutor; private ScheduledFuture<?> ttlExtendingFuture; - private SiteToSiteRestApiClient extendingApiClient; private int connectTimeoutMillis; private int readTimeoutMillis; @@ -199,7 +198,7 @@ public class SiteToSiteRestApiClient implements Closeable { @Override public Thread newThread(final Runnable r) { final Thread thread = defaultFactory.newThread(r); - thread.setName(Thread.currentThread().getName() + " TTLExtend"); + thread.setName(Thread.currentThread().getName() + " Site-to-Site Extend Transactions"); thread.setDaemon(true); return thread; } @@ -208,7 +207,7 @@ public class SiteToSiteRestApiClient implements Closeable { @Override public void close() throws IOException { - stopExtendingTtl(); + stopExtendingTransaction(); closeSilently(httpClient); closeSilently(httpAsyncClient); } @@ -376,7 +375,7 @@ public class SiteToSiteRestApiClient implements Closeable { private ControllerDTO getController() throws IOException { // first check cache and prune any old values. // Periodically prune the map so that we are not keeping entries around forever, in case an RPG is removed - // from he canvas, etc. We want to ensure that we avoid memory leaks, even if they are likely to not cause a problem. + // from the canvas, etc. We want to ensure that we avoid memory leaks, even if they are likely to not cause a problem. if (System.currentTimeMillis() > lastPruneTimestamp + TimeUnit.MINUTES.toMillis(5)) { pruneCache(); } @@ -487,7 +486,7 @@ public class SiteToSiteRestApiClient implements Closeable { if (transportProtocolVersionHeader == null) { throw new ProtocolException("Server didn't return confirmed protocol version"); } - final Integer protocolVersionConfirmedByServer = Integer.valueOf(transportProtocolVersionHeader.getValue()); + final int protocolVersionConfirmedByServer = Integer.parseInt(transportProtocolVersionHeader.getValue()); logger.debug("Finished version negotiation, protocolVersionConfirmedByServer={}", protocolVersionConfirmedByServer); transportProtocolVersionNegotiator.setVersion(protocolVersionConfirmedByServer); @@ -590,7 +589,7 @@ public class SiteToSiteRestApiClient implements Closeable { } @Override - public HttpRequest generateRequest() throws IOException, HttpException { + public HttpRequest generateRequest() { final BasicHttpEntity entity = new BasicHttpEntity(); post.setEntity(entity); return post; @@ -623,12 +622,12 @@ public class SiteToSiteRestApiClient implements Closeable { } @Override - public void resetRequest() throws IOException { + public void resetRequest() { requestHasBeenReset = true; } @Override - public void close() throws IOException { + public void close() { } }; @@ -722,7 +721,7 @@ public class SiteToSiteRestApiClient implements Closeable { if (r < 0) { closed = true; logger.debug("Reached to end of input stream. Closing resources..."); - stopExtendingTtl(); + stopExtendingTransaction(); closeSilently(httpIn); closeSilently(response); } @@ -731,7 +730,7 @@ public class SiteToSiteRestApiClient implements Closeable { }; ((HttpInput) peer.getCommunicationsSession().getInput()).setInputStream(streamCapture); - startExtendingTtl(transactionUrl, httpIn, response); + startExtendingTransaction(transactionUrl); keepItOpen = true; return true; @@ -784,7 +783,7 @@ public class SiteToSiteRestApiClient implements Closeable { } @Override - public HttpRequest generateRequest() throws IOException, HttpException { + public HttpRequest generateRequest() { // Pass the output stream so that Site-to-Site client thread can send // data packet through this connection. @@ -888,17 +887,17 @@ public class SiteToSiteRestApiClient implements Closeable { } @Override - public void resetRequest() throws IOException { + public void resetRequest() { logger.debug("Sending data request to {} has been reset...", flowFilesPath); requestHasBeenReset = true; } @Override - public void close() throws IOException { + public void close() { logger.debug("Closing sending data request to {}", flowFilesPath); closeSilently(outputStream); closeSilently(dataPacketChannel); - stopExtendingTtl(); + stopExtendingTransaction(); } }; @@ -912,7 +911,7 @@ public class SiteToSiteRestApiClient implements Closeable { // Started. transferDataLatch = new CountDownLatch(1); - startExtendingTtl(transactionUrl, dataPacketChannel, null); + startExtendingTransaction(transactionUrl); } catch (final InterruptedException e) { throw new IOException("Awaiting initConnectionLatch has been interrupted.", e); @@ -927,7 +926,7 @@ public class SiteToSiteRestApiClient implements Closeable { } // No more data can be sent. - // Close PipedOutputStream so that dataPacketChannel doesn't blocked. + // Close PipedOutputStream so that dataPacketChannel doesn't get blocked. // If we don't close this output stream, then PipedInputStream loops infinitely at read(). commSession.getOutput().getOutputStream().close(); logger.debug("{} FinishTransferFlowFiles no more data can be sent", this); @@ -940,7 +939,7 @@ public class SiteToSiteRestApiClient implements Closeable { throw new IOException("Awaiting transferDataLatch has been interrupted.", e); } - stopExtendingTtl(); + stopExtendingTransaction(); final HttpResponse response; try { @@ -968,37 +967,15 @@ public class SiteToSiteRestApiClient implements Closeable { } } - private void startExtendingTtl(final String transactionUrl, final Closeable stream, final CloseableHttpResponse response) { + private void startExtendingTransaction(final String transactionUrl) { if (ttlExtendingFuture != null) { - // Already started. return; } - logger.debug("Starting extending TTL thread..."); - - extendingApiClient = new SiteToSiteRestApiClient(sslContext, proxy, EventReporter.NO_OP); - extendingApiClient.transportProtocolVersionNegotiator = this.transportProtocolVersionNegotiator; - extendingApiClient.connectTimeoutMillis = this.connectTimeoutMillis; - extendingApiClient.readTimeoutMillis = this.readTimeoutMillis; - extendingApiClient.localAddress = this.localAddress; - final int extendFrequency = serverTransactionTtl / 2; - - ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(() -> { - try { - extendingApiClient.extendTransaction(transactionUrl); - } catch (final Exception e) { - logger.warn("Failed to extend transaction ttl", e); - - try { - // Without disconnecting, Site-to-Site client keep reading data packet, - // while server has already rollback. - this.close(); - } catch (final IOException ec) { - logger.warn("Failed to close", e); - } - } - }, extendFrequency, extendFrequency, TimeUnit.SECONDS); + logger.debug("Extend Transaction Started [{}] Frequency [{} seconds]", transactionUrl, extendFrequency); + final Runnable command = new ExtendTransactionCommand(this, transactionUrl, eventReporter); + ttlExtendingFuture = ttlExtendTaskExecutor.scheduleWithFixedDelay(command, extendFrequency, extendFrequency, TimeUnit.SECONDS); } private void closeSilently(final Closeable closeable) { @@ -1040,17 +1017,15 @@ public class SiteToSiteRestApiClient implements Closeable { } - private void stopExtendingTtl() { + private void stopExtendingTransaction() { if (!ttlExtendTaskExecutor.isShutdown()) { ttlExtendTaskExecutor.shutdown(); } if (ttlExtendingFuture != null && !ttlExtendingFuture.isCancelled()) { - logger.debug("Cancelling extending ttl..."); - ttlExtendingFuture.cancel(true); + final boolean cancelled = ttlExtendingFuture.cancel(true); + logger.debug("Extend Transaction Cancelled [{}]", cancelled); } - - closeSilently(extendingApiClient); } private IOException handleErrResponse(final int responseCode, final InputStream in) throws IOException { @@ -1451,7 +1426,7 @@ public class SiteToSiteRestApiClient implements Closeable { logger.debug("Sending commitReceivingFlowFiles request to transactionUrl: {}, clientResponse={}, checksum={}", transactionUrl, clientResponse, checksum); - stopExtendingTtl(); + stopExtendingTransaction(); final StringBuilder urlBuilder = new StringBuilder(transactionUrl).append("?responseCode=").append(clientResponse.getCode()); if (ResponseCode.CONFIRM_TRANSACTION.equals(clientResponse)) { diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java index 437ae78..e26b4b6 100644 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/http/TestHttpClient.java @@ -18,6 +18,7 @@ package org.apache.nifi.remote.client.http; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; @@ -62,10 +63,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; +import org.junit.jupiter.api.extension.ExtendWith; import org.littleshoot.proxy.HttpProxyServer; import org.littleshoot.proxy.ProxyAuthenticator; import org.littleshoot.proxy.impl.DefaultHttpProxyServer; import org.littleshoot.proxy.impl.ThreadPoolConfiguration; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,6 +86,7 @@ import java.io.OutputStream; import java.net.ServerSocket; import java.net.SocketTimeoutException; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; @@ -89,6 +94,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.commons.lang3.StringUtils.isEmpty; import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_HEADER_NAME; @@ -96,13 +102,15 @@ import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTE import static org.apache.nifi.remote.protocol.http.HttpHeaders.LOCATION_URI_INTENT_VALUE; import static org.apache.nifi.remote.protocol.http.HttpHeaders.PROTOCOL_VERSION; import static org.apache.nifi.remote.protocol.http.HttpHeaders.SERVER_SIDE_TRANSACTION_TTL; -import static org.junit.Assert.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.fail; +@ExtendWith(MockitoExtension.class) public class TestHttpClient { private static final Logger logger = LoggerFactory.getLogger(TestHttpClient.class); @@ -123,6 +131,13 @@ public class TestHttpClient { private static TlsConfiguration tlsConfiguration; + private static final int INITIAL_TRANSACTIONS = 0; + + private static final AtomicInteger outputExtendTransactions = new AtomicInteger(INITIAL_TRANSACTIONS); + + @Mock + private EventReporter eventReporter; + public static class SiteInfoServlet extends HttpServlet { @Override @@ -161,7 +176,7 @@ public class TestHttpClient { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - // This response simulates when a Site-to-Site is given an URL which has wrong path. + // This response simulates when a Site-to-Site is given a URL which has wrong path. respondWithText(resp, "<p class=\"message-pane-content\">You may have mistyped...</p>", 200); } } @@ -252,6 +267,7 @@ public class TestHttpClient { @Override protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws IOException { + outputExtendTransactions.incrementAndGet(); final int reqProtocolVersion = getReqProtocolVersion(req); final TransactionResultEntity entity = new TransactionResultEntity(); @@ -295,7 +311,7 @@ public class TestHttpClient { } logger.info("finish receiving data packets."); - assertNotNull("Test case should set <serverChecksum> depending on the test scenario.", serverChecksum); + assertNotNull(serverChecksum, "Test case should set <serverChecksum> depending on the test scenario."); respondWithText(resp, serverChecksum, HttpServletResponse.SC_ACCEPTED); } @@ -372,6 +388,7 @@ public class TestHttpClient { fail("Test case timeout."); } } catch (InterruptedException e) { + fail("Test interrupted"); } } @@ -387,7 +404,7 @@ public class TestHttpClient { private static OutputStream getOutputStream(HttpServletRequest req, HttpServletResponse resp) throws IOException { OutputStream outputStream = resp.getOutputStream(); - if (Boolean.valueOf(req.getHeader(HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION))){ + if (Boolean.parseBoolean(req.getHeader(HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION))){ outputStream = new CompressionOutputStream(outputStream); } return outputStream; @@ -396,7 +413,7 @@ public class TestHttpClient { private static DataPacket readIncomingPacket(HttpServletRequest req) throws IOException { final StandardFlowFileCodec codec = new StandardFlowFileCodec(); InputStream inputStream = req.getInputStream(); - if (Boolean.valueOf(req.getHeader(HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION))){ + if (Boolean.parseBoolean(req.getHeader(HttpHeaders.HANDSHAKE_PROPERTY_USE_COMPRESSION))){ inputStream = new CompressionInputStream(inputStream); } @@ -605,6 +622,7 @@ public class TestHttpClient { @BeforeEach public void before() throws Exception { + outputExtendTransactions.set(INITIAL_TRANSACTIONS); testCaseFinished = new CountDownLatch(1); final PeerDTO peer = new PeerDTO(); @@ -708,7 +726,7 @@ public class TestHttpClient { private static void consumeDataPacket(DataPacket packet) throws IOException { final ByteArrayOutputStream bos = new ByteArrayOutputStream(); StreamUtils.copy(packet.getData(), bos); - String contents = new String(bos.toByteArray()); + String contents = new String(bos.toByteArray(), StandardCharsets.UTF_8); logger.info("received: {}, {}", contents, packet.getAttributes()); } @@ -924,7 +942,6 @@ public class TestHttpClient { transaction.complete(); } catch (final IOException e) { if (isProxyEnabled && e.getMessage().contains("504")) { - // Gateway Timeout happens sometimes at Travis CI. logger.warn("Request timeout. Most likely an environment dependent issue.", e); } else { throw e; @@ -1106,7 +1123,7 @@ public class TestHttpClient { } private void completeShouldFail(Transaction transaction) { - assertThrows(IllegalStateException.class, () -> transaction.complete()); + assertThrows(IllegalStateException.class, transaction::complete); } private void confirmShouldFail(Transaction transaction) throws IOException { @@ -1139,7 +1156,7 @@ public class TestHttpClient { serverChecksum = "1345413116"; transaction.send(packet); - IOException e = assertThrows(IOException.class, () -> transaction.confirm()); + IOException e = assertThrows(IOException.class, transaction::confirm); assertTrue(e.getMessage().contains("TimeoutException")); completeShouldFail(transaction); @@ -1313,7 +1330,7 @@ public class TestHttpClient { DataPacket packet; while ((packet = transaction.receive()) != null) { consumeDataPacket(packet); - Thread.sleep(500); + TimeUnit.MILLISECONDS.sleep(500); } transaction.confirm(); transaction.complete(); @@ -1336,12 +1353,11 @@ public class TestHttpClient { @Test public void testReceiveTimeoutAfterDataExchange() throws Exception { - - try ( - SiteToSiteClient client = getDefaultBuilder() - .timeout(5, TimeUnit.SECONDS) - .portName("output-timeout-data-ex") - .build() + try (final SiteToSiteClient client = getDefaultBuilder() + .timeout(3, TimeUnit.SECONDS) + .portName("output-timeout-data-ex") + .eventReporter(eventReporter) + .build() ) { final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); assertNotNull(transaction); @@ -1350,11 +1366,13 @@ public class TestHttpClient { assertNotNull(packet); consumeDataPacket(packet); - IOException e = assertThrows(IOException.class, () -> transaction.receive()); + IOException e = assertThrows(IOException.class, transaction::receive); assertTrue(e.getCause() instanceof SocketTimeoutException); confirmShouldFail(transaction); completeShouldFail(transaction); + + assertNotSame(INITIAL_TRANSACTIONS, outputExtendTransactions.get()); } } diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/SiteToSiteClientIT.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/SiteToSiteClientIT.java deleted file mode 100644 index 4aa8736..0000000 --- a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/SiteToSiteClientIT.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.client.socket; - -import org.apache.nifi.remote.Transaction; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.remote.protocol.DataPacket; -import org.apache.nifi.remote.util.StandardDataPacket; -import org.apache.nifi.stream.io.StreamUtils; -import org.junit.jupiter.api.Test; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertNotNull; - -public class SiteToSiteClientIT { - @Test - public void testReceive() throws IOException { - System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); - - final SiteToSiteClient client = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("cba") - .requestBatchCount(10) - .build(); - - try { - for (int i = 0; i < 1000; i++) { - final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); - assertNotNull(transaction); - - DataPacket packet; - while (true) { - packet = transaction.receive(); - if (packet == null) { - break; - } - - final InputStream in = packet.getData(); - final long size = packet.getSize(); - final byte[] buff = new byte[(int) size]; - - StreamUtils.fillBuffer(in, buff); - } - - transaction.confirm(); - transaction.complete(); - } - } finally { - client.close(); - } - } - - @Test - public void testSend() throws IOException { - System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); - - final SiteToSiteClient client = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("input") - .build(); - - try { - final Transaction transaction = client.createTransaction(TransferDirection.SEND); - assertNotNull(transaction); - - final Map<String, String> attrs = new HashMap<>(); - attrs.put("site-to-site", "yes, please!"); - final byte[] bytes = "Hello".getBytes(); - final ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length); - transaction.send(packet); - - transaction.confirm(); - transaction.complete(); - } finally { - client.close(); - } - } -} diff --git a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestExtendTransactionCommand.java b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestExtendTransactionCommand.java new file mode 100644 index 0000000..1918f82 --- /dev/null +++ b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/util/TestExtendTransactionCommand.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.util; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.web.api.entity.TransactionResultEntity; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.net.SocketTimeoutException; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TestExtendTransactionCommand { + private static final String TRANSACTION_URL = "https://localhost:8443/nifi-api/transaction-id"; + + private static final TransactionResultEntity RESULT_ENTITY = new TransactionResultEntity(); + + @Mock + private SiteToSiteRestApiClient client; + + @Mock + private EventReporter eventReporter; + + private ExtendTransactionCommand command; + + @BeforeEach + public void setCommand() { + command = new ExtendTransactionCommand(client, TRANSACTION_URL, eventReporter); + } + + @Test + public void testRun() throws IOException { + when(client.extendTransaction(eq(TRANSACTION_URL))).thenReturn(RESULT_ENTITY); + + command.run(); + + verifyNoInteractions(eventReporter); + } + + @Test + public void testRunIllegalStateExceptionClientNotClosed() throws IOException { + when(client.extendTransaction(eq(TRANSACTION_URL))).thenThrow(new IllegalStateException()); + + command.run(); + + verifyNoInteractions(eventReporter); + verify(client, never()).close(); + } + + @Test + public void testRunSocketTimeoutExceptionClientClosed() throws IOException { + when(client.extendTransaction(eq(TRANSACTION_URL))).thenThrow(new SocketTimeoutException()); + + command.run(); + + verify(eventReporter).reportEvent(eq(Severity.WARNING), anyString(), anyString()); + verify(client).close(); + } +}