This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
commit b2e3898e171b89e25b5e06f6ee849bb3edabbf9a Author: Peter Gyori <pgy...@apache.org> AuthorDate: Tue Oct 3 14:42:29 2023 +0200 NIFI-12160 Kafka Connect: Check for NAR unpacking before starting Check that required NAR files are unpacked completely before starting the Kafka Connector This closes #7832 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- .../kafka/connect/WorkingDirectoryUtilsTest.java | 222 +++++++++++++++++++++ .../kafka/connect/StatelessKafkaConnectorUtil.java | 10 +- .../nifi/kafka/connect/WorkingDirectoryUtils.java | 100 ++++++++++ 3 files changed, 329 insertions(+), 3 deletions(-) diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector-tests/src/test/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtilsTest.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector-tests/src/test/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtilsTest.java new file mode 100644 index 0000000000..e98fcaabb4 --- /dev/null +++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector-tests/src/test/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtilsTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.nifi.kafka.connect;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.io.CleanupMode.ALWAYS;

/**
 * Tests for {@link WorkingDirectoryUtils}: verifies recursive purging of files and
 * directories, and that only incompletely-unpacked NAR directories (those ending in
 * "nar-unpacked" but missing a "nar-digest" file) are removed during reconciliation.
 */
public class WorkingDirectoryUtilsTest {

    @Test
    public void testDeleteNonexistentFile(@TempDir(cleanup = ALWAYS) File tempDir) {
        // Purging a path that does not exist must be a silent no-op.
        File nonexistentFile = new File(tempDir, "testFile");

        WorkingDirectoryUtils.purgeDirectory(nonexistentFile);

        assertFalse(nonexistentFile.exists());
    }

    @Test
    public void testDeleteFlatFile(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
        // purgeDirectory also accepts a plain file and deletes it.
        File file = new File(tempDir, "testFile");
        file.createNewFile();

        WorkingDirectoryUtils.purgeDirectory(file);

        assertFalse(file.exists());
    }

    @Test
    public void testDeleteDirectoryWithContents(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
        // A directory tree (files + nested subdirectory) is removed recursively.
        File directory = new File(tempDir, "directory");
        File subDirectory = new File(directory, "subDirectory");
        File subDirectoryContent = new File(subDirectory, "subDirectoryContent");
        File directoryContent = new File(directory, "directoryContent");

        directory.mkdir();
        subDirectory.mkdir();
        subDirectoryContent.createNewFile();
        directoryContent.createNewFile();

        WorkingDirectoryUtils.purgeDirectory(directory);

        assertFalse(directory.exists());
    }

    @Test
    public void testPurgeUnpackedNarsEmptyRootDirectory(@TempDir(cleanup = ALWAYS) File tempDir) {
        // An empty root directory survives the purge untouched.
        File rootDirectory = new File(tempDir, "rootDirectory");

        rootDirectory.mkdir();

        WorkingDirectoryUtils.purgeIncompleteUnpackedNars(rootDirectory);

        assertTrue(rootDirectory.exists());
    }

    @Test
    public void testPurgeUnpackedNarsRootDirectoryWithFilesOnly(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
        // Plain files (no "nar-unpacked" directories) are never deleted.
        File rootDirectory = new File(tempDir, "rootDirectory");
        File directoryContent1 = new File(rootDirectory, "file1");
        File directoryContent2 = new File(rootDirectory, "file2");

        rootDirectory.mkdir();
        directoryContent1.createNewFile();
        directoryContent2.createNewFile();

        WorkingDirectoryUtils.purgeIncompleteUnpackedNars(rootDirectory);

        assertTrue(rootDirectory.exists() && directoryContent1.exists() && directoryContent2.exists());
    }

    @Test
    public void testPurgeUnpackedNars(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
        // Only the "nar-unpacked" directory lacking a "nar-digest" file is removed.
        File rootDirectory = new File(tempDir, "rootDirectory");
        rootDirectory.mkdir();
        TestDirectoryStructure testDirectoryStructure = new TestDirectoryStructure(rootDirectory);

        WorkingDirectoryUtils.purgeIncompleteUnpackedNars(testDirectoryStructure.getRootDirectory());

        assertTrue(testDirectoryStructure.isConsistent());
    }

    @Test
    public void testWorkingDirectoryIntegrityRestored(@TempDir(cleanup = ALWAYS) File tempDir) throws IOException {
        /*
        workingDirectory
          - nar
            - extensions
                - *TestDirectoryStructure*
            - narDirectory
            - narFile
          - extensions
            - *TestDirectoryStructure*
          - additionalDirectory
          - workingDirectoryFile
         */
        File workingDirectory = new File(tempDir, "workingDirectory");
        File nar = new File(workingDirectory, "nar");
        File narExtensions = new File(nar, "extensions");
        File narDirectory = new File(nar, "narDirectory");
        File narFile = new File(nar, "narFile");
        File extensions = new File(workingDirectory, "extensions");
        File additionalDirectory = new File(workingDirectory, "additionalDirectory");
        File workingDirectoryFile = new File(workingDirectory, "workingDirectoryFile");

        workingDirectory.mkdir();
        nar.mkdir();
        narExtensions.mkdir();
        narDirectory.mkdir();
        narFile.createNewFile();
        extensions.mkdir();
        additionalDirectory.mkdir();
        workingDirectoryFile.createNewFile();

        TestDirectoryStructure narExtensionsStructure = new TestDirectoryStructure(narExtensions);
        TestDirectoryStructure extensionsStructure = new TestDirectoryStructure(extensions);

        WorkingDirectoryUtils.reconcileWorkingDirectory(workingDirectory);

        // Reconciliation only touches nar/extensions and extensions; everything else survives.
        assertTrue(workingDirectory.exists()
                && nar.exists()
                && narExtensionsStructure.isConsistent()
                && narDirectory.exists()
                && narFile.exists()
                && extensionsStructure.isConsistent()
                && additionalDirectory.exists()
                && workingDirectoryFile.exists()
        );
    }

    /**
     * Builds a fixture tree below a given root and knows how the tree must look after a purge.
     * Declared static: it keeps no reference to the enclosing test instance.
     */
    private static class TestDirectoryStructure {
        /*
        rootDirectory
          - subDirectory1-nar-unpacked
            - subDirectory1File1
            - nar-digest
          - subDirectory2
            - subDirectory2File1
          - subDirectory3-nar-unpacked
            - subDirectory3Dir1
                - subDirectory3Dir1File1
            - subDirectory3File1
          - fileInRoot
         */
        private final File rootDirectory;
        private final File subDirectory1;
        private final File subDirectory2;
        private final File subDirectory3;
        private final File fileInRoot;
        private final File subDirectory1File1;
        private final File subDirectory1File2;
        private final File subDirectory2File1;
        private final File subDirectory3Dir1;
        private final File subDirectory3File1;
        private final File subDirectory3Dir1File1;

        public TestDirectoryStructure(final File rootDirectory) throws IOException {
            this.rootDirectory = rootDirectory;
            subDirectory1 = new File(rootDirectory, "subDirectory1-" + WorkingDirectoryUtils.NAR_UNPACKED_SUFFIX);
            subDirectory2 = new File(rootDirectory, "subDirectory2");
            subDirectory3 = new File(rootDirectory, "subDirectory3-" + WorkingDirectoryUtils.NAR_UNPACKED_SUFFIX);
            fileInRoot = new File(rootDirectory, "fileInRoot");
            subDirectory1File1 = new File(subDirectory1, "subDirectory1File1");
            // Presence of the digest file marks subDirectory1 as completely unpacked.
            subDirectory1File2 = new File(subDirectory1, WorkingDirectoryUtils.HASH_FILENAME);
            subDirectory2File1 = new File(subDirectory2, "subDirectory2File1");
            subDirectory3Dir1 = new File(subDirectory3, "subDirectory3Dir1");
            subDirectory3File1 = new File(subDirectory3, "subDirectory3File1");
            subDirectory3Dir1File1 = new File(subDirectory3Dir1, "subDirectory3Dir1File1");

            subDirectory1.mkdir();
            subDirectory2.mkdir();
            subDirectory3.mkdir();
            fileInRoot.createNewFile();
            subDirectory1File1.createNewFile();
            subDirectory1File2.createNewFile();
            subDirectory2File1.createNewFile();
            subDirectory3File1.createNewFile();
            subDirectory3Dir1.mkdir();
            subDirectory3Dir1File1.createNewFile();
        }

        public File getRootDirectory() {
            return rootDirectory;
        }

        /**
         * Checks if all directories ending in 'nar-unpacked' that have a file named 'nar-digest' within still exist,
         * and the directory ending in 'nar-unpacked' without 'nar-digest' has been removed with all of its contents.
         * @return true if the above is met.
         */
        public boolean isConsistent() {
            return (rootDirectory.exists()
                    && subDirectory1.exists() && subDirectory1File1.exists() && subDirectory1File2.exists()
                    && subDirectory2.exists() && subDirectory2File1.exists()
                    && !(subDirectory3.exists() || subDirectory3Dir1.exists() || subDirectory3File1.exists() || subDirectory3Dir1File1.exists())
                    && fileInRoot.exists());
        }
    }

}
// We do this because the bootstrap() method will expand all NAR files into the working directory. @@ -91,13 +92,16 @@ public class StatelessKafkaConnectorUtil { // unpacking NARs at the same time, as it could potentially result in the working directory becoming corrupted. unpackNarLock.lock(); try { + WorkingDirectoryUtils.reconcileWorkingDirectory(engineConfiguration.getWorkingDirectory()); + bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, StatelessNiFiSourceTask.class.getClassLoader()); + + dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties, parameterOverrides); + dataflow = bootstrap.createDataflow(dataflowDefinition); } finally { unpackNarLock.unlock(); } - - dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties, parameterOverrides); - return bootstrap.createDataflow(dataflowDefinition); + return dataflow; } catch (final Exception e) { throw new RuntimeException("Failed to bootstrap Stateless NiFi Engine", e); } diff --git a/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtils.java b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtils.java new file mode 100644 index 0000000000..1453267a30 --- /dev/null +++ b/nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/WorkingDirectoryUtils.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
package org.apache.nifi.kafka.connect;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Arrays;

/**
 * Utility methods for reconciling the Stateless NiFi working directory before the engine
 * is bootstrapped: directories whose NAR unpacking was interrupted (name ends in
 * "nar-unpacked" but no "nar-digest" marker file inside) are deleted so they can be
 * unpacked again from scratch.
 */
public class WorkingDirectoryUtils {

    // Suffix that unpacked-NAR directories carry.
    protected static final String NAR_UNPACKED_SUFFIX = "nar-unpacked";
    // Marker file written only after a NAR has been unpacked completely.
    protected static final String HASH_FILENAME = "nar-digest";
    private static final Logger logger = LoggerFactory.getLogger(WorkingDirectoryUtils.class);

    private WorkingDirectoryUtils() {
        // Utility class with static methods only; not meant to be instantiated.
    }

    /**
     * Goes through the nar/extensions and extensions directories within the working directory
     * and deletes every directory whose name ends in "nar-unpacked" and does not have a
     * "nar-digest" file in it.
     * @param workingDirectory File object pointing to the working directory.
     */
    public static void reconcileWorkingDirectory(final File workingDirectory) {
        purgeIncompleteUnpackedNars(new File(new File(workingDirectory, "nar"), "extensions"));
        purgeIncompleteUnpackedNars(new File(workingDirectory, "extensions"));
    }

    /**
     * Receives a directory as parameter and goes through every directory within it that ends in
     * "nar-unpacked". If a directory ending in "nar-unpacked" does not have a file named
     * "nar-digest" within it, it gets deleted with all of its contents.
     * @param directory A File object pointing to the directory that is supposed to contain
     *                  further directories whose name ends in "nar-unpacked".
     */
    public static void purgeIncompleteUnpackedNars(final File directory) {
        // listFiles returns null when the directory does not exist or is not readable.
        final File[] unpackedDirs = directory.listFiles(file -> file.isDirectory() && file.getName().endsWith(NAR_UNPACKED_SUFFIX));
        if (unpackedDirs == null || unpackedDirs.length == 0) {
            logger.debug("Found no unpacked NARs in {}", directory);
            // Guarded: deepToString of the listing is only computed when debug is enabled.
            if (logger.isDebugEnabled()) {
                logger.debug("Directory contains: {}", Arrays.deepToString(directory.listFiles()));
            }
            return;
        }

        for (final File unpackedDir : unpackedDirs) {
            final File narHashFile = new File(unpackedDir, HASH_FILENAME);
            if (narHashFile.exists()) {
                // Digest marker present: unpacking completed, keep the directory.
                logger.debug("Already successfully unpacked {}", unpackedDir);
            } else {
                purgeDirectory(unpackedDir);
            }
        }
    }

    /**
     * Delete a directory with all of its contents. Accepts a plain file as well;
     * a nonexistent path is a no-op.
     * @param directory The directory to be deleted.
     */
    public static void purgeDirectory(final File directory) {
        if (directory.exists()) {
            deleteRecursively(directory);
            logger.debug("Cleaned up {}", directory);
        }
    }

    // Depth-first removal: children are deleted before their parent directory.
    private static void deleteRecursively(final File fileOrDirectory) {
        if (fileOrDirectory.isDirectory()) {
            final File[] files = fileOrDirectory.listFiles();
            if (files != null) {
                for (final File file : files) {
                    deleteRecursively(file);
                }
            }
        }
        deleteQuietly(fileOrDirectory);
    }

    // Best-effort delete: a failure is only logged, never thrown.
    private static void deleteQuietly(final File file) {
        final boolean deleted = file.delete();
        if (!deleted) {
            logger.debug("Failed to cleanup temporary file {}", file);
        }
    }

}