NIFI-731: Updated admin guide to explain the flowfile repo properties more; added new property for how often to 'sync' the flowfile repo; allowed number of partitions in flowfile repo to be changed after repo created. Minor tweaks to content repo and its communication with flowfile repo via ContentClaimManager
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/bb5128be Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/bb5128be Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/bb5128be Branch: refs/heads/NIFI-731 Commit: bb5128be2246dc48d5200e98572c47ff7cdfb905 Parents: fb0a1a7 Author: Mark Payne <marka...@hotmail.com> Authored: Sun Jun 28 13:07:04 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Sun Jun 28 13:36:43 2015 -0400 ---------------------------------------------------------------------- nifi/nifi-assembly/pom.xml | 3 +- .../org/apache/nifi/util/NiFiProperties.java | 63 +++++++------ .../org/wali/MinimalLockingWriteAheadLog.java | 99 +++++++++++++++----- .../wali/TestMinimalLockingWriteAheadLog.java | 97 ++++++++++++++++++- .../src/main/asciidoc/administration-guide.adoc | 25 ++++- .../apache/nifi/controller/FlowController.java | 25 ++--- .../repository/FileSystemRepository.java | 2 +- .../WriteAheadFlowFileRepository.java | 18 +++- .../claim/StandardContentClaimManager.java | 21 ++++- .../repository/TestFileSystemRepository.java | 12 ++- .../repository/TestStandardProcessSession.java | 43 ++++----- .../TestVolatileContentRepository.java | 5 +- .../TestWriteAheadFlowFileRepository.java | 7 +- .../src/main/resources/conf/nifi.properties | 1 + 14 files changed, 311 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml index 9b17617..ac39ca6 100644 --- a/nifi/nifi-assembly/pom.xml +++ b/nifi/nifi-assembly/pom.xml @@ -229,9 +229,10 @@ language governing permissions and limitations under the License. 
--> <nifi.flowfile.repository.implementation>org.apache.nifi.controller.repository.WriteAheadFlowFileRepository</nifi.flowfile.repository.implementation> <nifi.flowfile.repository.directory>./flowfile_repository</nifi.flowfile.repository.directory> - <nifi.flowfile.repository.partitions>256</nifi.flowfile.repository.partitions> + <nifi.flowfile.repository.partitions>16</nifi.flowfile.repository.partitions> <nifi.flowfile.repository.checkpoint.interval>2 mins</nifi.flowfile.repository.checkpoint.interval> <nifi.flowfile.repository.always.sync>false</nifi.flowfile.repository.always.sync> + <nifi.flowfile.repository.updates.between.sync>128</nifi.flowfile.repository.updates.between.sync> <nifi.swap.manager.implementation>org.apache.nifi.controller.FileSystemSwapManager</nifi.swap.manager.implementation> <nifi.queue.swap.threshold>20000</nifi.queue.swap.threshold> <nifi.swap.in.period>5 sec</nifi.swap.in.period> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index e25f5d6..6dadf12 100644 --- a/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -80,7 +80,8 @@ public class NiFiProperties extends Properties { // flowfile repository properties public static final String FLOWFILE_REPOSITORY_IMPLEMENTATION = "nifi.flowfile.repository.implementation"; - public static final String FLOWFILE_REPOSITORY_ALWAYS_SYNC = "nifi.flowfile.repository.always.sync"; + public static final String FLOWFILE_REPOSITORY_ALWAYS_SYNC = "nifi.flowfile.repository.always.sync"; // deprecated + public 
static final String FLOWFILE_REPOSITORY_UPDATES_BETWEEN_SYNCS = "nifi.flowfile.repository.updates.between.sync"; public static final String FLOWFILE_REPOSITORY_DIRECTORY = "nifi.flowfile.repository.directory"; public static final String FLOWFILE_REPOSITORY_PARTITIONS = "nifi.flowfile.repository.partitions"; public static final String FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL = "nifi.flowfile.repository.checkpoint.interval"; @@ -282,7 +283,7 @@ public class NiFiProperties extends Properties { public File getFlowConfigurationFile() { try { return new File(getProperty(FLOW_CONFIGURATION_FILE)); - } catch (Exception ex) { + } catch (final Exception ex) { return null; } } @@ -290,7 +291,7 @@ public class NiFiProperties extends Properties { public File getFlowConfigurationFileDir() { try { return getFlowConfigurationFile().getParentFile(); - } catch (Exception ex) { + } catch (final Exception ex) { return null; } } @@ -396,7 +397,7 @@ public class NiFiProperties extends Properties { */ public Path getTemplateDirectory() { final String strVal = getProperty(TEMPLATE_DIRECTORY); - return (strVal == null) ? DEFAULT_TEMPLATE_DIRECTORY : Paths.get(strVal); + return strVal == null ? 
DEFAULT_TEMPLATE_DIRECTORY : Paths.get(strVal); } /** @@ -474,7 +475,7 @@ public class NiFiProperties extends Properties { */ public boolean getNeedClientAuth() { boolean needClientAuth = true; - String rawNeedClientAuth = getProperty(SECURITY_NEED_CLIENT_AUTH); + final String rawNeedClientAuth = getProperty(SECURITY_NEED_CLIENT_AUTH); if ("false".equalsIgnoreCase(rawNeedClientAuth)) { needClientAuth = false; } @@ -488,7 +489,7 @@ public class NiFiProperties extends Properties { public boolean getSupportNewAccountRequests() { boolean shouldSupport = true; - String rawShouldSupport = getProperty(SECURITY_SUPPORT_NEW_ACCOUNT_REQUESTS); + final String rawShouldSupport = getProperty(SECURITY_SUPPORT_NEW_ACCOUNT_REQUESTS); if ("false".equalsIgnoreCase(rawShouldSupport)) { shouldSupport = false; } @@ -500,7 +501,7 @@ public class NiFiProperties extends Properties { Integer port = null; try { port = Integer.parseInt(getProperty(WEB_HTTP_PORT)); - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { } return port; } @@ -509,7 +510,7 @@ public class NiFiProperties extends Properties { Integer sslPort = null; try { sslPort = Integer.parseInt(getProperty(WEB_HTTPS_PORT)); - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { } return sslPort; } @@ -540,15 +541,15 @@ public class NiFiProperties extends Properties { public List<Path> getNarLibraryDirectories() { - List<Path> narLibraryPaths = new ArrayList<>(); + final List<Path> narLibraryPaths = new ArrayList<>(); // go through each property - for (String propertyName : stringPropertyNames()) { + for (final String propertyName : stringPropertyNames()) { // determine if the property is a nar library path if (StringUtils.startsWith(propertyName, NAR_LIBRARY_DIRECTORY_PREFIX) || NAR_LIBRARY_DIRECTORY.equals(propertyName)) { // attempt to resolve the path specified - String narLib = getProperty(propertyName); + final String narLib = getProperty(propertyName); if 
(!StringUtils.isBlank(narLib)) { narLibraryPaths.add(Paths.get(narLib)); } @@ -615,10 +616,10 @@ public class NiFiProperties extends Properties { public InetSocketAddress getClusterProtocolMulticastAddress() { try { - String multicastAddress = getProperty(CLUSTER_PROTOCOL_MULTICAST_ADDRESS); - int multicastPort = Integer.parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_PORT)); + final String multicastAddress = getProperty(CLUSTER_PROTOCOL_MULTICAST_ADDRESS); + final int multicastPort = Integer.parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_PORT)); return new InetSocketAddress(multicastAddress, multicastPort); - } catch (Exception ex) { + } catch (final Exception ex) { throw new RuntimeException("Invalid multicast address/port due to: " + ex, ex); } } @@ -641,7 +642,7 @@ public class NiFiProperties extends Properties { try { return Integer .parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS)); - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { return DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS; } } @@ -662,9 +663,9 @@ public class NiFiProperties extends Properties { if (StringUtils.isBlank(socketAddress)) { socketAddress = "localhost"; } - int socketPort = getClusterNodeProtocolPort(); + final int socketPort = getClusterNodeProtocolPort(); return InetSocketAddress.createUnresolved(socketAddress, socketPort); - } catch (Exception ex) { + } catch (final Exception ex) { throw new RuntimeException("Invalid node protocol address/port due to: " + ex, ex); } } @@ -672,7 +673,7 @@ public class NiFiProperties extends Properties { public Integer getClusterNodeProtocolPort() { try { return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_PORT)); - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { return null; } } @@ -680,7 +681,7 @@ public class NiFiProperties extends Properties { public int getClusterNodeProtocolThreads() { try { return 
Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_THREADS)); - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { return DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS; } } @@ -691,10 +692,10 @@ public class NiFiProperties extends Properties { if (StringUtils.isBlank(socketAddress)) { socketAddress = "localhost"; } - int socketPort = Integer + final int socketPort = Integer .parseInt(getProperty(CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT)); return InetSocketAddress.createUnresolved(socketAddress, socketPort); - } catch (Exception ex) { + } catch (final Exception ex) { throw new RuntimeException("Invalid unicast manager address/port due to: " + ex, ex); } } @@ -710,9 +711,9 @@ public class NiFiProperties extends Properties { if (StringUtils.isBlank(socketAddress)) { socketAddress = "localhost"; } - int socketPort = getClusterManagerProtocolPort(); + final int socketPort = getClusterManagerProtocolPort(); return InetSocketAddress.createUnresolved(socketAddress, socketPort); - } catch (Exception ex) { + } catch (final Exception ex) { throw new RuntimeException("Invalid manager protocol address/port due to: " + ex, ex); } } @@ -720,7 +721,7 @@ public class NiFiProperties extends Properties { public Integer getClusterManagerProtocolPort() { try { return Integer.parseInt(getProperty(CLUSTER_MANAGER_PROTOCOL_PORT)); - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { return null; } } @@ -737,7 +738,7 @@ public class NiFiProperties extends Properties { public int getClusterManagerNodeEventHistorySize() { try { return Integer.parseInt(getProperty(CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE)); - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { return DEFAULT_CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE; } } @@ -755,7 +756,7 @@ public class NiFiProperties extends Properties { public int getClusterManagerNodeApiRequestThreads() { try { return 
Integer.parseInt(getProperty(CLUSTER_MANAGER_NODE_API_REQUEST_THREADS)); - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { return DEFAULT_CLUSTER_MANAGER_NODE_API_NUM_REQUEST_THREADS; } } @@ -768,7 +769,7 @@ public class NiFiProperties extends Properties { public int getClusterManagerProtocolThreads() { try { return Integer.parseInt(getProperty(CLUSTER_MANAGER_PROTOCOL_THREADS)); - } catch (NumberFormatException nfe) { + } catch (final NumberFormatException nfe) { return DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS; } } @@ -790,7 +791,7 @@ public class NiFiProperties extends Properties { public InetSocketAddress getNodeApiAddress() { final String rawScheme = getClusterProtocolManagerToNodeApiScheme(); - final String scheme = (rawScheme == null) ? "http" : rawScheme; + final String scheme = rawScheme == null ? "http" : rawScheme; final String host; final Integer port; @@ -861,7 +862,7 @@ public class NiFiProperties extends Properties { final Map<String, Path> contentRepositoryPaths = new HashMap<>(); // go through each property - for (String propertyName : stringPropertyNames()) { + for (final String propertyName : stringPropertyNames()) { // determine if the property is a file repository path if (StringUtils.startsWith(propertyName, REPOSITORY_CONTENT_PREFIX)) { // get the repository key @@ -887,7 +888,7 @@ public class NiFiProperties extends Properties { final Map<String, Path> provenanceRepositoryPaths = new HashMap<>(); // go through each property - for (String propertyName : stringPropertyNames()) { + for (final String propertyName : stringPropertyNames()) { // determine if the property is a file repository path if (StringUtils.startsWith(propertyName, PROVENANCE_REPO_DIRECTORY_PREFIX)) { // get the repository key @@ -904,7 +905,7 @@ public class NiFiProperties extends Properties { public int getMaxFlowFilesPerClaim() { try { return Integer.parseInt(getProperty(MAX_FLOWFILES_PER_CLAIM)); - } catch (NumberFormatException nfe) { + 
} catch (final NumberFormatException nfe) { return DEFAULT_MAX_FLOWFILES_PER_CLAIM; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java index 5c8b4c8..4331504 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java @@ -60,7 +60,6 @@ import java.util.regex.Pattern; import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.BufferedOutputStream; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +79,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor private final Path partialPath; private final Path snapshotPath; + private final SortedSet<Path> basePaths; private final SerDe<T> serde; private final SyncListener syncListener; private final FileChannel lockChannel; @@ -120,9 +120,9 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor * @param syncListener the listener * @throws IOException if unable to initialize due to IO issue */ - @SuppressWarnings("unchecked") public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws IOException { this.syncListener = syncListener; + this.basePaths = paths; requireNonNull(paths); requireNonNull(serde); @@ -131,7 +131,6 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor throw new IllegalArgumentException("Paths must be non-empty"); } - int existingPartitions = 0; for 
(final Path path : paths) { if (!Files.exists(path)) { Files.createDirectories(path); @@ -150,21 +149,6 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor if (!file.canExecute()) { throw new IOException("Path given [" + path + "] is not executable"); } - - final File[] children = file.listFiles(); - if (children != null) { - for (final File child : children) { - if (child.isDirectory() && child.getName().startsWith("partition-")) { - existingPartitions++; - } - } - - if (existingPartitions != 0 && existingPartitions != partitionCount) { - logger.warn("Constructing MinimalLockingWriteAheadLog with partitionCount={}, but the repository currently has " - + "{} partitions; ignoring argument and proceeding with {} partitions", - new Object[]{partitionCount, existingPartitions, existingPartitions}); - } - } } this.basePath = paths.iterator().next(); @@ -176,19 +160,26 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor lockChannel = new FileOutputStream(lockPath.toFile()).getChannel(); lockChannel.lock(); - partitions = new Partition[partitionCount]; + partitions = createPartitions(partitionCount); + } - Iterator<Path> pathIterator = paths.iterator(); - for (int i = 0; i < partitionCount; i++) { + private Partition<T>[] createPartitions(final int count) throws IOException { + @SuppressWarnings("unchecked") + final Partition<T>[] partitions = new Partition[count]; + + Iterator<Path> pathIterator = basePaths.iterator(); + for (int i = 0; i < count; i++) { // If we're out of paths, create a new iterator to start over. 
if (!pathIterator.hasNext()) { - pathIterator = paths.iterator(); + pathIterator = basePaths.iterator(); } final Path partitionBasePath = pathIterator.next(); partitions[i] = new Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, getVersion()); } + + return partitions; } @Override @@ -284,11 +275,30 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor throw new IllegalStateException("Cannot recover records after updating the repository; must call recoverRecords first"); } + int existingPartitionCount = 0; final long recoverStart = System.nanoTime(); writeLock.lock(); try { + Long maxTransactionId = recoverFromSnapshot(recordMap); - recoverFromEdits(recordMap, maxTransactionId); + + for (final Path basePath : basePaths) { + final File file = basePath.toFile(); + final File[] children = file.listFiles(); + if (children != null) { + for (final File child : children) { + if (child.isDirectory() && child.getName().startsWith("partition-")) { + existingPartitionCount++; + } + } + } + } + + final Partition<T>[] recoverablePartitions = createPartitions(existingPartitionCount); + recoverFromEdits(recordMap, recoverablePartitions, maxTransactionId); + for (final Partition<T> partition : recoverablePartitions) { + partition.close(); + } for (final Partition<T> partition : partitions) { final long transId = partition.getMaxRecoveredTransactionId(); @@ -308,10 +318,48 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor logger.info("Successfully recovered {} records in {} milliseconds", recordMap.size(), recoveryMillis); checkpoint(); + // If there are any partitions that have an index higher than the current desired number + // of partitions, delete the partition. 
+ for (final Path basePath : basePaths) { + final File file = basePath.toFile(); + final File[] children = file.listFiles(); + if (children != null) { + for (final File child : children) { + if (child.isDirectory() && child.getName().startsWith("partition-")) { + final String indexName = child.getName().substring(10); + final int index; + try { + index = Integer.parseInt(indexName); + } catch (final NumberFormatException nfe) { + // not a valid partition; ignore it and more on + continue; + } + + if (index >= partitions.length) { + deleteRecursively(child); + } + } + } + } + } + recovered = true; return recordMap.values(); } + private void deleteRecursively(final File file) { + if (file.isDirectory()) { + final File[] children = file.listFiles(); + if (children != null) { + for (final File child : children) { + deleteRecursively(child); + } + } + } + + file.delete(); + } + @Override public Set<String> getRecoveredSwapLocations() throws IOException { return recoveredExternalLocations; @@ -398,10 +446,11 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor * before modification. 
* * @param modifiableRecordMap map + * @param partitions the partitions to use to recover records from * @param maxTransactionIdRestored index of max restored transaction * @throws IOException if unable to recover from edits */ - private void recoverFromEdits(final Map<Object, T> modifiableRecordMap, final Long maxTransactionIdRestored) throws IOException { + private void recoverFromEdits(final Map<Object, T> modifiableRecordMap, final Partition<T>[] partitions, final Long maxTransactionIdRestored) throws IOException { final Map<Object, T> updateMap = new HashMap<>(); final Map<Object, T> unmodifiableRecordMap = Collections.unmodifiableMap(modifiableRecordMap); final Map<Object, T> ignorableMap = new HashMap<>(); @@ -918,7 +967,7 @@ public final class MinimalLockingWriteAheadLog<T> implements WriteAheadRepositor public Long getNextRecoverableTransactionId() throws IOException { while (true) { - DataInputStream recoveryStream = getRecoveryStream(); + final DataInputStream recoveryStream = getRecoveryStream(); if (recoveryStream == null) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java index 57f3495..f5971c6 100644 --- a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java +++ b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java @@ -28,6 +28,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.SortedSet; @@ 
-149,6 +150,98 @@ public class TestMinimalLockingWriteAheadLog { } @Test + public void testDecreasingPartitionCount() throws IOException { + final Path path = Paths.get("target/test-decrease-partition-count"); + deleteRecursively(path.toFile()); + assertTrue(path.toFile().mkdirs()); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, 64, serde, null); + final Collection<DummyRecord> initialRecs = repo.recoverRecords(); + assertTrue(initialRecs.isEmpty()); + + for (int i = 0; i < 64; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + repo.update(Collections.singleton(record), false); + } + + repo.shutdown(); + + for (int i = 0; i < 64; i++) { + final String partName = "partition-" + i; + final Path partitionPath = path.resolve(partName); + assertTrue(Files.exists(partitionPath)); + } + + // create a new repo with only 8 partitions, verify that the other partitions are deleted, but that all + // records still exist. We do this twice to ensure that after we checkpoint with only 8 partitions, we + // still recover all 64 records again the next time. 
+ for (int recoverIndex = 0; recoverIndex < 2; recoverIndex++) { + final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, 8, serde, null); + final Collection<DummyRecord> recovered = recoverRepo.recoverRecords(); + assertEquals(64, recovered.size()); + + assertTrue(Files.exists(path)); + + for (int i = 0; i < 8; i++) { + final String partName = "partition-" + i; + final Path partitionPath = path.resolve(partName); + assertTrue(Files.exists(partitionPath)); + } + for (int i = 8; i < 64; i++) { + final String partName = "partition-" + i; + final Path partitionPath = path.resolve(partName); + assertFalse(Files.exists(partitionPath)); + } + + recoverRepo.shutdown(); + } + } + + @Test + public void testIncreasingPartitionCount() throws IOException { + final Path path = Paths.get("target/test-decrease-partition-count"); + deleteRecursively(path.toFile()); + assertTrue(path.toFile().mkdirs()); + + final DummyRecordSerde serde = new DummyRecordSerde(); + final WriteAheadRepository<DummyRecord> repo = new MinimalLockingWriteAheadLog<>(path, 8, serde, null); + final Collection<DummyRecord> initialRecs = repo.recoverRecords(); + assertTrue(initialRecs.isEmpty()); + + for (int i = 0; i < 64; i++) { + final DummyRecord record = new DummyRecord(String.valueOf(i), UpdateType.CREATE); + repo.update(Collections.singleton(record), false); + } + + repo.shutdown(); + + for (int i = 0; i < 8; i++) { + final String partName = "partition-" + i; + final Path partitionPath = path.resolve(partName); + assertTrue(Files.exists(partitionPath)); + } + + // create a new repo with 64 partitions. We do this twice to ensure that after we checkpoint with 64 partitions, we + // still recover all 64 records again the next time. 
+ for (int recoverIndex = 0; recoverIndex < 2; recoverIndex++) { + final WriteAheadRepository<DummyRecord> recoverRepo = new MinimalLockingWriteAheadLog<>(path, 64, serde, null); + final Collection<DummyRecord> recovered = recoverRepo.recoverRecords(); + assertEquals(64, recovered.size()); + + assertTrue(Files.exists(path)); + + for (int i = 0; i < 64; i++) { + final String partName = "partition-" + i; + final Path partitionPath = path.resolve(partName); + assertTrue(Files.exists(partitionPath)); + } + + recoverRepo.shutdown(); + } + } + + @Test public void testCannotModifyLogAfterAllAreBlackListed() throws IOException { final int numPartitions = 5; final Path path = Paths.get("target/minimal-locking-repo-test-cannot-modify-after-all-blacklisted"); @@ -275,10 +368,10 @@ public class TestMinimalLockingWriteAheadLog { try { int counter = 0; for (final List<DummyRecord> list : records) { - final boolean forceSync = (++counter == records.size()); + final boolean forceSync = ++counter == records.size(); repo.update(list, forceSync); } - } catch (IOException e) { + } catch (final IOException e) { Assert.fail("Failed to update: " + e.toString()); e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc index 5535c35..8c14f20 100644 --- a/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -433,9 +433,30 @@ to configure it on a separate drive if available. |*Property*|*Description* |nifi.flowfile.repository.implementation|The FlowFile Repository implementation. The default value is org.apache.nifi.controller.repository.WriteAheadFlowFileRepository and should not be changed. 
|nifi.flowfile.repository.directory*|The location of the FlowFile Repository. The default value is ./flowfile_repository. -|nifi.flowfile.repository.partitions|The number of partitions. The default value is 256. +|nifi.flowfile.repository.partitions|The number of partitions. This number indicates the maximum number of threads that are allowed to + update the FlowFile Repository simultaneously. However, each of these partitions must periodically be synchronized to the + underlying disk, which is very expensive. As a result, when this happens, the more partitions that exist, the more expensive + this synchronization is. Until version 0.2.0-incubating, the default value was 256. Starting at version 0.2.0-incubating, the default + value was changed to 32. |nifi.flowfile.repository.checkpoint.interval| The FlowFile Repository checkpoint interval. The default value is 2 mins. -|nifi.flowfile.repository.always.sync|If set to _true_, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very expensive and can significantly reduce NiFi performance. However, if it is _false_, there could be the potential for data loss if either there is a sudden power loss or the operating system crashes. The default value is _false_. +|nifi.flowfile.repository.always.sync|_*Deprecated*: Use of this property is deprecated as of NiFi 0.2.0-incubating. This property will be + removed in a later version of NiFi. In order to gain the equivalent functionality, set the value of the `nifi.flowfile.repository.updates.between.sync` + property to `1`._ + If set to _true_, any change to the repository will be synchronized to the disk, + meaning that NiFi will ask the operating system not to cache the information. This is very expensive and can significantly + reduce NiFi performance. 
However, if it is _false_, there could be the potential for data loss if either there is a sudden + power loss or the operating system crashes. The default value is _false_. +|nifi.flowfile.repository.updates.between.sync|Specifies the number of times that the repository can be updated before a partition is + synchronized with disk. A value of `0` indicates that it should never be synchronized on update. A value of `1` indicates that it should + be synchronized with each update. Very small values for this property may result in reduced overall + performance due to excessive disk utilization, as it causes the system to bypass the Operating System's caching mechanisms. + However, setting the value too large can result in sporadic performance. This is due to the fact that in order to prevent data loss, + the Content Repository cannot destroy content until the FlowFile Repository has performed a 'sync'. As a result, if this value is + very large (or set to 0), this will happen only when the repository is checkpointed. Consequently, each time that the repository is + checkpointed, the Content Repository is forced to destroy a potentially very large number of files, which drastically reduces overall + disk performance. Additionally, if the number of files is very large, the system will be forced to block while these deletions take + place in order to avoid depleting all of the system's resources. This property was introduced in version + 0.2.0-incubating. The default value for this property is 128. 
|==== *Swap Management* + http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 2ffdd4e..8144373 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -280,7 +280,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final NodeProtocolSender protocolSender; private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks"); - private final ContentClaimManager contentClaimManager = new StandardContentClaimManager(); + private final ContentClaimManager contentClaimManager; // guarded by rwLock /** @@ -389,14 +389,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process")); eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process")); + bulletinRepository = new VolatileBulletinRepository(); + nodeBulletinSubscriber = new AtomicReference<>(); + + contentClaimManager = new StandardContentClaimManager(createEventReporter(bulletinRepository)); final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, contentClaimManager); 
flowFileRepository = flowFileRepo; flowFileEventRepository = flowFileEventRepo; counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository()); - bulletinRepository = new VolatileBulletinRepository(); - nodeBulletinSubscriber = new AtomicReference<>(); - try { this.provenanceEventRepository = createProvenanceRepository(properties); this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository)); @@ -593,7 +594,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R writeLock.lock(); try { if (startDelayedComponents) { - LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size())); + LOG.info("Starting {} processors/ports/funnels", startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size()); for (final Connectable connectable : startConnectablesAfterInitialization) { if (connectable.getScheduledState() == ScheduledState.DISABLED) { continue; @@ -1012,7 +1013,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public boolean isTerminated() { this.readLock.lock(); try { - return (null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated()); + return null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated(); } finally { this.readLock.unlock(); } @@ -1828,9 +1829,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } else if (id1.equals(id2)) { return true; } else { - final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1); - final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2); - return (comparable1.equals(comparable2)); + final String comparable1 = id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1; + final String comparable2 = id2.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id2; + return comparable1.equals(comparable2); } } @@ -1964,7 +1965,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id; - return (root == null) ? null : root.findProcessGroup(searchId); + return root == null ? null : root.findProcessGroup(searchId); } @Override @@ -2079,8 +2080,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut()); connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut()); - flowFilesTransferred += (connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut()); - bytesTransferred += (connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut()); + flowFilesTransferred += connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut(); + bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut(); } if (StringUtils.isNotBlank(conn.getName())) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 4e4609a..864058c 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -539,7 +539,7 @@ public class FileSystemRepository implements ContentRepository { Path path = null; try { - path = getPath(claim, false); + path = getPath(claim, false, checkArchive); } catch (final ContentNotFoundException cnfe) { } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index cb14fea..668adb8 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -82,6 +82,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis private final Path flowFileRepositoryPath; private final int numPartitions; private final ScheduledExecutorService checkpointExecutor; + private final AtomicLong updateCount = new AtomicLong(0L); + private final int updatesBetweenSyncs; // effectively final private WriteAheadRepository<RepositoryRecord> wal; @@ -120,7 +122,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis flowFileRepositoryPath = properties.getFlowFileRepositoryPath(); 
numPartitions = properties.getFlowFileRepositoryPartitions(); checkpointDelayMillis = FormatUtils.getTimeDuration(properties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS); - + updatesBetweenSyncs = properties.getIntegerProperty(NiFiProperties.FLOWFILE_REPOSITORY_UPDATES_BETWEEN_SYNCS, 128); checkpointExecutor = Executors.newSingleThreadScheduledExecutor(); } @@ -163,11 +165,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis return Files.getFileStore(flowFileRepositoryPath).getUsableSpace(); } - private final AtomicLong updateCount = new AtomicLong(0L); @Override public void updateRepository(final Collection<RepositoryRecord> records) throws IOException { final long updates = updateCount.incrementAndGet(); - updateRepository(records, alwaysSync || updates % 100 == 0); + updateRepository(records, alwaysSync || updatesBetweenSyncs > 0 && updates % updatesBetweenSyncs == 0); } private void updateRepository(final Collection<RepositoryRecord> records, final boolean sync) throws IOException { @@ -241,18 +242,27 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis for (final ContentClaim claim : claimsToDestroy) { markDestructable(claim); } + + logger.info("Marked {} Content Claims as destructable for FlowFile Repository partition {}", claimsToDestroy.size(), partitionIndex); } @Override public void onGlobalSync() { - for (final BlockingQueue<ContentClaim> claimQueue : claimsAwaitingDestruction.values()) { + int destroyed = 0; + for (final Map.Entry<Integer, BlockingQueue<ContentClaim>> entry : claimsAwaitingDestruction.entrySet()) { + final Integer partitionIndex = entry.getKey(); + final BlockingQueue<ContentClaim> claimQueue = entry.getValue(); final Set<ContentClaim> claimsToDestroy = new HashSet<>(); claimQueue.drainTo(claimsToDestroy); + destroyed += claimsToDestroy.size(); for (final ContentClaim claim : claimsToDestroy) { markDestructable(claim); } + logger.debug("As part of 
checkpointing FlowFile Repository, marked {} claims as destructable for partition {}", claimsToDestroy.size(), partitionIndex); } + + logger.info("Marked {} claims as destructable as a result of checkpointing FlowFile repository", destroyed); } /** http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java index 95f4c7f..f5494f9 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java @@ -24,6 +24,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.reporting.Severity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +35,11 @@ public class StandardContentClaimManager implements ContentClaimManager { private static final Logger logger = LoggerFactory.getLogger(StandardContentClaimManager.class); private static final ConcurrentMap<String, BlockingQueue<ContentClaim>> destructableClaims = new ConcurrentHashMap<>(); + private final EventReporter eventReporter; + + public StandardContentClaimManager(final 
EventReporter eventReporter) { + this.eventReporter = eventReporter; + } @Override public ContentClaim newContentClaim(final String container, final String section, final String id, final boolean lossTolerant) { @@ -114,7 +121,19 @@ public class StandardContentClaimManager implements ContentClaimManager { logger.debug("Marking claim {} as destructable", claim); try { final BlockingQueue<ContentClaim> destructableQueue = getDestructableClaimQueue(claim.getContainer()); - while (!destructableQueue.offer(claim, 30, TimeUnit.MINUTES)) { + final boolean accepted = destructableQueue.offer(claim); + if (!accepted) { + final long start = System.nanoTime(); + + while (!destructableQueue.offer(claim, 30, TimeUnit.MINUTES)) { + } + + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + if (millis > 10L) { + logger.warn("Total wait duration to add claim to Destructable Claim Queue was {} millis", millis); + eventReporter.reportEvent(Severity.WARNING, "Content Repository", "The Content Repository is unable to destroy content as fast " + + "as it is being created. 
The flow will be slowed in order to adjust for this."); + } } } catch (final InterruptedException ie) { } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index ada0775..b5cfcb6 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository; import org.apache.nifi.controller.repository.FileSystemRepository; import org.apache.nifi.controller.repository.ContentNotFoundException; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -41,10 +42,11 @@ import java.util.List; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; import org.apache.nifi.controller.repository.util.DiskUtils; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.util.NiFiProperties; - import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; public class TestFileSystemRepository { @@ -63,7 +65,7 @@ public class TestFileSystemRepository { } repository = new 
FileSystemRepository(); - repository.initialize(new StandardContentClaimManager()); + repository.initialize(new StandardContentClaimManager(Mockito.mock(EventReporter.class))); repository.purge(); } @@ -314,9 +316,9 @@ public class TestFileSystemRepository { } final ContentClaim destination = repository.create(true); - final byte[] headerBytes = (header == null) ? null : header.getBytes(); - final byte[] footerBytes = (footer == null) ? null : footer.getBytes(); - final byte[] demarcatorBytes = (demarcator == null) ? null : demarcator.getBytes(); + final byte[] headerBytes = header == null ? null : header.getBytes(); + final byte[] footerBytes = footer == null ? null : footer.getBytes(); + final byte[] demarcatorBytes = demarcator == null ? null : demarcator.getBytes(); repository.merge(claims, destination, headerBytes, footerBytes, demarcatorBytes); final StringBuilder sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 3486875..9441237 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -53,6 +53,7 @@ import org.apache.nifi.controller.StandardFlowFileQueue; 
import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaimManager; import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.Relationship; @@ -179,7 +180,7 @@ public class TestStandardProcessSession { when(connectable.getConnections()).thenReturn(new HashSet<>(connList)); contentRepo = new MockContentRepository(); - contentRepo.initialize(new StandardContentClaimManager()); + contentRepo.initialize(new StandardContentClaimManager(Mockito.mock(EventReporter.class))); flowFileRepo = new MockFlowFileRepository(); final ProcessContext context = new ProcessContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo); @@ -321,7 +322,7 @@ public class TestStandardProcessSession { .entryDate(System.currentTimeMillis()) .build(); flowFileQueue.put(flowFileRecord); - FlowFile flowFile = session.get(); + final FlowFile flowFile = session.get(); assertNotNull(flowFile); final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null); session.read(flowFile, new InputStreamCallback() { @@ -402,7 +403,7 @@ public class TestStandardProcessSession { session.write(flowFile2, nop); - FlowFile flowFile3 = session.create(); + final FlowFile flowFile3 = session.create(); session.write(flowFile3, nop); session.rollback(); @@ -418,8 +419,8 @@ public class TestStandardProcessSession { flowFileQueue.put(flowFileRecord); - FlowFile orig = session.get(); - FlowFile newFlowFile = session.create(orig); + final FlowFile orig = session.get(); + final FlowFile newFlowFile = session.create(orig); session.remove(newFlowFile); session.commit(); @@ -435,8 +436,8 @@ public class TestStandardProcessSession { flowFileQueue.put(flowFileRecord); - FlowFile orig = session.get(); - 
FlowFile newFlowFile = session.create(orig); + final FlowFile orig = session.get(); + final FlowFile newFlowFile = session.create(orig); session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.commit(); @@ -452,9 +453,9 @@ public class TestStandardProcessSession { flowFileQueue.put(flowFileRecord); - FlowFile orig = session.get(); - FlowFile newFlowFile = session.create(orig); - FlowFile secondNewFlowFile = session.create(orig); + final FlowFile orig = session.get(); + final FlowFile newFlowFile = session.create(orig); + final FlowFile secondNewFlowFile = session.create(orig); session.remove(newFlowFile); session.transfer(secondNewFlowFile, new Relationship.Builder().name("A").build()); session.commit(); @@ -547,8 +548,8 @@ public class TestStandardProcessSession { // we have to increment the ID generator because we are creating a FlowFile without the FlowFile Repository's knowledge flowFileRepo.idGenerator.getAndIncrement(); - FlowFile orig = session.get(); - FlowFile newFlowFile = session.create(orig); + final FlowFile orig = session.get(); + final FlowFile newFlowFile = session.create(orig); session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.getProvenanceReporter().fork(newFlowFile, Collections.singleton(orig)); session.remove(orig); @@ -566,7 +567,7 @@ public class TestStandardProcessSession { @Test public void testProcessExceptionThrownIfCallbackThrowsInOutputStreamCallback() { - FlowFile ff1 = session.create(); + final FlowFile ff1 = session.create(); final RuntimeException runtime = new RuntimeException(); try { @@ -610,7 +611,7 @@ public class TestStandardProcessSession { @Test public void testProcessExceptionThrownIfCallbackThrowsInStreamCallback() { - FlowFile ff1 = session.create(); + final FlowFile ff1 = session.create(); final RuntimeException runtime = new RuntimeException(); try { @@ -689,7 +690,7 @@ public class TestStandardProcessSession { // attempt to read the data. 
try { - FlowFile ff1 = session.get(); + final FlowFile ff1 = session.get(); session.read(ff1, new InputStreamCallback() { @Override @@ -738,7 +739,7 @@ public class TestStandardProcessSession { // attempt to read the data. try { - FlowFile ff1 = session.get(); + final FlowFile ff1 = session.get(); session.write(ff1, new StreamCallback() { @Override @@ -830,7 +831,7 @@ public class TestStandardProcessSession { // attempt to read the data. try { session.get(); - FlowFile ff2 = session.get(); + final FlowFile ff2 = session.get(); session.write(ff2, new StreamCallback() { @Override public void process(InputStream in, OutputStream out) throws IOException { @@ -918,7 +919,7 @@ public class TestStandardProcessSession { // attempt to read the data. try { session.get(); - FlowFile ff2 = session.get(); + final FlowFile ff2 = session.get(); session.read(ff2, new InputStreamCallback() { @Override public void process(InputStream in) throws IOException { @@ -931,7 +932,7 @@ public class TestStandardProcessSession { @Test public void testProcessExceptionThrownIfCallbackThrowsInInputStreamCallback() { - FlowFile ff1 = session.create(); + final FlowFile ff1 = session.create(); final RuntimeException runtime = new RuntimeException(); try { @@ -975,7 +976,7 @@ public class TestStandardProcessSession { @Test public void testCreateEmitted() throws IOException { - FlowFile newFlowFile = session.create(); + final FlowFile newFlowFile = session.create(); session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.commit(); @@ -1114,7 +1115,7 @@ public class TestStandardProcessSession { private final AtomicLong claimsRemoved = new AtomicLong(0L); private ContentClaimManager claimManager; - private ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>(); + private final ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>(); @Override public void shutdown() { 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java index a32f321..513ec10 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.repository; import org.apache.nifi.controller.repository.VolatileContentRepository; + import static org.junit.Assert.assertEquals; import java.io.ByteArrayOutputStream; @@ -31,8 +32,8 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaimManager; import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.util.NiFiProperties; - import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -45,7 +46,7 @@ public class TestVolatileContentRepository { @Before public void setup() { - claimManager = new StandardContentClaimManager(); + claimManager = new StandardContentClaimManager(Mockito.mock(EventReporter.class)); } @Test 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 054ef5e..c969bd4 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository; import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository; import org.apache.nifi.controller.repository.StandardRepositoryRecord; import org.apache.nifi.controller.repository.StandardFlowFileRecord; + import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; @@ -35,8 +36,8 @@ import java.util.List; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.FlowFileQueue; import org.apache.nifi.controller.repository.claim.StandardContentClaimManager; +import org.apache.nifi.events.EventReporter; import org.apache.nifi.util.file.FileUtils; - import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -53,7 +54,7 @@ public class TestWriteAheadFlowFileRepository { } final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(); - repo.initialize(new StandardContentClaimManager()); + repo.initialize(new StandardContentClaimManager(Mockito.mock(EventReporter.class))); final List<Connection> connectionList = new ArrayList<>(); final QueueProvider queueProvider = new QueueProvider() { @@ -119,7 +120,7 @@ public class TestWriteAheadFlowFileRepository { // restore final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(); - repo2.initialize(new StandardContentClaimManager()); + repo2.initialize(new StandardContentClaimManager(Mockito.mock(EventReporter.class))); repo2.loadFlowFiles(queueProvider, 0L); assertEquals(1, flowFileCollection.size()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 4043076..f560c31 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -42,6 +42,7 @@ nifi.flowfile.repository.directory=${nifi.flowfile.repository.directory} nifi.flowfile.repository.partitions=${nifi.flowfile.repository.partitions} nifi.flowfile.repository.checkpoint.interval=${nifi.flowfile.repository.checkpoint.interval} nifi.flowfile.repository.always.sync=${nifi.flowfile.repository.always.sync} +nifi.flowfile.repository.updates.between.sync=${nifi.flowfile.repository.updates.between.sync} nifi.swap.manager.implementation=${nifi.swap.manager.implementation} nifi.queue.swap.threshold=${nifi.queue.swap.threshold}