http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 54987b9..16a6534 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -16,6 +16,32 @@ */ package org.apache.nifi.controller.repository; +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + import org.apache.commons.io.IOUtils; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; @@ -23,6 +49,7 @@ import 
org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream; import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream; @@ -53,32 +80,6 @@ import org.apache.nifi.stream.io.StreamUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedOutputStream; -import java.io.ByteArrayInputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - /** * <p> * Provides a ProcessSession that ensures all accesses, changes and transfers @@ -143,6 +144,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private final Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<>(); private Checkpoint checkpoint = new Checkpoint(); + private final ContentClaimWriteCache claimCache; public StandardProcessSession(final ProcessContext context) { this.context = context; @@ -180,7 +182,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE 
context.getProvenanceRepository(), this); this.sessionId = idGenerator.getAndIncrement(); this.connectableDescription = description; - + this.claimCache = new ContentClaimWriteCache(context.getContentRepository()); LOG.trace("Session {} created for {}", this, connectableDescription); processingStartTime = System.nanoTime(); } @@ -312,6 +314,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final long commitStartNanos = System.nanoTime(); resetReadClaim(); + try { + claimCache.flush(); + } finally { + claimCache.reset(); + } final long updateProvenanceStart = System.nanoTime(); updateProvenanceRepo(checkpoint); @@ -375,7 +382,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE updateEventRepository(checkpoint); final long updateEventRepositoryFinishNanos = System.nanoTime(); - final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos; + final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - flowFileRepoUpdateFinishNanos; // transfer the flowfiles to the connections' queues. 
final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>(); @@ -454,7 +461,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } catch (final Exception e1) { e.addSuppressed(e1); } - throw e; + + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else { + throw new ProcessException(e); + } } } @@ -904,6 +916,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } + try { + claimCache.reset(); + } catch (IOException e1) { + LOG.warn("{} Attempted to close Output Stream for {} due to session rollback but close failed", this, this.connectableDescription, e1); + } + final Set<StandardRepositoryRecord> recordsToHandle = new HashSet<>(); recordsToHandle.addAll(records.values()); if (rollbackCheckpoint) { @@ -2033,6 +2051,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } + claimCache.flush(claim); final InputStream rawInStream = context.getContentRepository().read(claim); if (currentReadClaimStream != null) { @@ -2047,6 +2066,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // reuse the same InputStream for the next FlowFile return new DisableOnCloseInputStream(currentReadClaimStream); } else { + claimCache.flush(claim); final InputStream rawInStream = context.getContentRepository().read(claim); try { StreamUtils.skip(rawInStream, offset); @@ -2077,6 +2097,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try { ensureNotAppending(record.getCurrentClaim()); + claimCache.flush(record.getCurrentClaim()); } catch (final IOException e) { throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e); } @@ -2241,6 +2262,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try { ensureNotAppending(record.getCurrentClaim()); + claimCache.flush(record.getCurrentClaim()); } catch (final 
IOException e) { throw new FlowFileAccessException("Unable to read from source " + source + " due to " + e.toString(), e); } @@ -2334,11 +2356,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE long writtenToFlowFile = 0L; ContentClaim newClaim = null; try { - newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); + newClaim = claimCache.getContentClaim(); claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source); ensureNotAppending(newClaim); - try (final OutputStream stream = context.getContentRepository().write(newClaim); + try (final OutputStream stream = claimCache.write(newClaim); final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream); final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose)) { try { @@ -2373,7 +2395,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final FlowFileRecord newFile = new StandardFlowFileRecord.Builder() .fromFlowFile(record.getCurrent()) .contentClaim(newClaim) - .contentClaimOffset(0) + .contentClaimOffset(Math.max(0, newClaim.getLength() - writtenToFlowFile)) .size(writtenToFlowFile) .build(); @@ -2396,6 +2418,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE ContentClaim newClaim = null; try { if (outStream == null) { + claimCache.flush(oldClaim); + try (final InputStream oldClaimIn = context.getContentRepository().read(oldClaim)) { newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); claimLog.debug("Creating ContentClaim {} for 'append' for {}", newClaim, source); @@ -2568,16 +2592,20 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE long writtenToFlowFile = 0L; ContentClaim newClaim = null; try { - newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); + newClaim = claimCache.getContentClaim(); 
claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source); ensureNotAppending(newClaim); + if (currClaim != null) { + claimCache.flush(currClaim.getResourceClaim()); + } + try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset(), true); final InputStream limitedIn = new LimitedInputStream(is, source.getSize()); final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn); final ByteCountingInputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); - final OutputStream os = context.getContentRepository().write(newClaim); + final OutputStream os = claimCache.write(newClaim); final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os); final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) { @@ -2626,7 +2654,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final FlowFileRecord newFile = new StandardFlowFileRecord.Builder() .fromFlowFile(record.getCurrent()) .contentClaim(newClaim) - .contentClaimOffset(0L) + .contentClaimOffset(Math.max(0L, newClaim.getLength() - writtenToFlowFile)) .size(writtenToFlowFile) .build(); @@ -2668,8 +2696,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE removeTemporaryClaim(record); - final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()) - .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize) + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder() + .fromFlowFile(record.getCurrent()) + .contentClaim(newClaim) + .contentClaimOffset(claimOffset) + .size(newSize) .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName()) .build(); record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName()); @@ -2708,7 +2739,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } 
removeTemporaryClaim(record); - final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build(); + final FlowFileRecord newFile = new StandardFlowFileRecord.Builder() + .fromFlowFile(record.getCurrent()) + .contentClaim(newClaim) + .contentClaimOffset(claimOffset) + .size(newSize) + .build(); record.setWorking(newFile); return newFile; } @@ -2720,6 +2756,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try { ensureNotAppending(record.getCurrentClaim()); + claimCache.flush(record.getCurrentClaim()); final long copyCount = context.getContentRepository().exportTo(record.getCurrentClaim(), destination, append, record.getCurrentClaimOffset(), source.getSize()); bytesRead += copyCount; bytesWritten += copyCount; @@ -2741,6 +2778,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE try { ensureNotAppending(record.getCurrentClaim()); + claimCache.flush(record.getCurrentClaim()); } catch (final IOException e) { throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e); }
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index b5807ca..9f8ff98 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -27,6 +28,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -74,6 +77,7 @@ import org.wali.WriteAheadRepository; * </p> */ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncListener { + private static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = "nifi.flowfile.repository.directory"; private final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L); private final boolean alwaysSync; @@ -82,7 +86,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis private volatile ScheduledFuture<?> checkpointFuture; private final long checkpointDelayMillis; - private final Path flowFileRepositoryPath; + private final SortedSet<Path> flowFileRepositoryPaths = new TreeSet<>(); private final int numPartitions; private final ScheduledExecutorService checkpointExecutor; @@ -120,7 +124,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis public WriteAheadFlowFileRepository() { alwaysSync = false; checkpointDelayMillis = 0l; - flowFileRepositoryPath = null; numPartitions = 0; checkpointExecutor = null; } @@ -129,7 +132,13 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC, "false")); // determine the database file path and ensure it exists - flowFileRepositoryPath = nifiProperties.getFlowFileRepositoryPath(); + for (final String propertyName : nifiProperties.getPropertyKeys()) { + if (propertyName.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) { + final String directoryName = nifiProperties.getProperty(propertyName); + flowFileRepositoryPaths.add(Paths.get(directoryName)); + } + } + numPartitions = nifiProperties.getFlowFileRepositoryPartitions(); checkpointDelayMillis = FormatUtils.getTimeDuration(nifiProperties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS); @@ -140,14 +149,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis public void initialize(final ResourceClaimManager claimManager) throws IOException { this.claimManager = claimManager; - Files.createDirectories(flowFileRepositoryPath); + for (final Path path : flowFileRepositoryPaths) { + Files.createDirectories(path); + } // TODO: Should ensure that only 1 instance running and pointing at a particular path // TODO: Allow for backup path that can be used if disk out of space?? 
Would allow a snapshot to be stored on // backup and then the data deleted from the normal location; then can move backup to normal location and // delete backup. On restore, if no files exist in partition's directory, would have to check backup directory serdeFactory = new RepositoryRecordSerdeFactory(claimManager); - wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, numPartitions, serdeFactory, this); + wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPaths, numPartitions, serdeFactory, this); + logger.info("Initialized FlowFile Repository using {} partitions", numPartitions); } @Override @@ -167,12 +179,22 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis @Override public long getStorageCapacity() throws IOException { - return Files.getFileStore(flowFileRepositoryPath).getTotalSpace(); + long capacity = 0L; + for (final Path path : flowFileRepositoryPaths) { + capacity += Files.getFileStore(path).getTotalSpace(); + } + + return capacity; } @Override public long getUsableStorageSpace() throws IOException { - return Files.getFileStore(flowFileRepositoryPath).getUsableSpace(); + long usableSpace = 0L; + for (final Path path : flowFileRepositoryPaths) { + usableSpace += Files.getFileStore(path).getUsableSpace(); + } + + return usableSpace; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java new file mode 100644 index 0000000..6b608ac --- 
/dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.claim; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; + +import org.apache.nifi.controller.repository.ContentRepository; + +public class ContentClaimWriteCache { + private final ContentRepository contentRepo; + private final Map<ResourceClaim, OutputStream> streamMap = new HashMap<>(); + private final Queue<ContentClaim> queue = new LinkedList<>(); + private final int bufferSize; + + public ContentClaimWriteCache(final ContentRepository contentRepo) { + this(contentRepo, 8192); + } + + public ContentClaimWriteCache(final ContentRepository contentRepo, final int bufferSize) { + this.contentRepo = contentRepo; + this.bufferSize = bufferSize; + } + + public void reset() throws IOException { + try { + forEachStream(OutputStream::close); + } finally { + streamMap.clear(); + 
queue.clear(); + } + } + + public ContentClaim getContentClaim() throws IOException { + final ContentClaim contentClaim = queue.poll(); + if (contentClaim != null) { + contentRepo.incrementClaimaintCount(contentClaim); + return contentClaim; + } + + final ContentClaim claim = contentRepo.create(false); + registerStream(claim); + return claim; + } + + private OutputStream registerStream(final ContentClaim contentClaim) throws IOException { + final OutputStream out = contentRepo.write(contentClaim); + final OutputStream buffered = new BufferedOutputStream(out, bufferSize); + streamMap.put(contentClaim.getResourceClaim(), buffered); + return buffered; + } + + public OutputStream write(final ContentClaim claim) throws IOException { + OutputStream out = streamMap.get(claim.getResourceClaim()); + if (out == null) { + out = registerStream(claim); + } + + if (!(claim instanceof StandardContentClaim)) { + // we know that we will only create Content Claims that are of type StandardContentClaim, so if we get anything + // else, just throw an Exception because it is not valid for this Repository + throw new IllegalArgumentException("Cannot write to " + claim + " because that Content Claim does belong to this Claim Cache"); + } + + final StandardContentClaim scc = (StandardContentClaim) claim; + final long initialLength = Math.max(0L, scc.getLength()); + + final OutputStream bcos = out; + return new OutputStream() { + private long bytesWritten = 0L; + + @Override + public void write(final int b) throws IOException { + bcos.write(b); + bytesWritten++; + scc.setLength(initialLength + bytesWritten); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + bcos.write(b, off, len); + bytesWritten += len; + scc.setLength(initialLength + bytesWritten); + } + + @Override + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void flush() throws IOException { + // do nothing - do not flush underlying 
stream. + } + + @Override + public void close() throws IOException { + queue.offer(claim); + } + }; + } + + public void flush(final ContentClaim contentClaim) throws IOException { + if (contentClaim == null) { + return; + } + + flush(contentClaim.getResourceClaim()); + } + + public void flush(final ResourceClaim claim) throws IOException { + final OutputStream out = streamMap.get(claim); + if (out != null) { + out.flush(); + } + } + + public void flush() throws IOException { + forEachStream(OutputStream::flush); + } + + private void forEachStream(final StreamProcessor proc) throws IOException { + IOException exception = null; + + for (final OutputStream out : streamMap.values()) { + try { + proc.process(out); + } catch (final IOException ioe) { + if (exception == null) { + exception = ioe; + } else { + ioe.addSuppressed(exception); + exception = ioe; + } + } + } + + if (exception != null) { + throw exception; + } + } + + private interface StreamProcessor { + void process(final OutputStream out) throws IOException; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java index 76c208d..b218ee6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java @@ -65,6 +65,39 @@ public class ContentClaimFieldMap implements Record { } @Override + public int hashCode() { + return (int) (31 + contentClaimOffset + 21 * resourceClaimFieldMap.hashCode()); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + + ContentClaimFieldMap other = (ContentClaimFieldMap) obj; + if (contentClaimOffset != other.contentClaimOffset) { + return false; + } + + if (resourceClaimFieldMap == null) { + if (other.resourceClaimFieldMap != null) { + return false; + } + } else if (!resourceClaimFieldMap.equals(other.resourceClaimFieldMap)) { + return false; + } + + return true; + } + + @Override public String toString() { return "ContentClaimFieldMap[" + contentClaim + "]"; } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java index c11353b..93fa4e4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.repository.schema; +import org.apache.nifi.controller.repository.RepositoryRecordType; import org.apache.nifi.repository.schema.NamedValue; import org.apache.nifi.repository.schema.Record; import org.apache.nifi.repository.schema.RecordSchema; @@ -39,7 +40,10 @@ public class RepositoryRecordUpdate implements Record { @Override public Object getFieldValue(final String fieldName) { if (RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2.equals(fieldName)) { - final String actionType = (String) fieldMap.getFieldValue(RepositoryRecordSchema.ACTION_TYPE); + String actionType = (String) fieldMap.getFieldValue(RepositoryRecordSchema.ACTION_TYPE); + if (RepositoryRecordType.CONTENTMISSING.name().equals(actionType)) { + actionType = RepositoryRecordType.DELETE.name(); + } final UpdateType updateType = UpdateType.valueOf(actionType); final String actionName; http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 0c4972b..22684a6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -83,7 +83,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { for (int i = 0; i < maxThreadCount; i++) { final Runnable eventDrivenTask = new EventDrivenTask(workerQueue); - flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 30000, TimeUnit.NANOSECONDS); + flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS); } } @@ -132,7 +132,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { final int tasksToAdd = maxThreadCount - oldMax; for (int i = 0; i < tasksToAdd; i++) { final Runnable eventDrivenTask = new EventDrivenTask(workerQueue); - flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 30000, TimeUnit.NANOSECONDS); + flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java index 05c32e1..59a4b1b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java @@ -87,6 +87,13 @@ public enum 
ProcessorStatusDescriptor { Formatter.DURATION, s -> TimeUnit.MILLISECONDS.convert(s.getProcessingNanos(), TimeUnit.NANOSECONDS))), + TASK_NANOS(new StandardMetricDescriptor<ProcessorStatus>( + "taskNanos", + "Total Task Time (nanos)", + "The total number of thread-nanoseconds that the Processor has used to complete its tasks in the past 5 minutes", + Formatter.COUNT, + ProcessorStatus::getProcessingNanos), false), + FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>( "flowFilesRemoved", "FlowFiles Removed (5 mins)", @@ -122,35 +129,50 @@ public enum ProcessorStatusDescriptor { } )), - AVERAGE_TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>( - "averageTaskMillis", - "Average Task Duration", - "The average duration it took this Processor to complete a task, as averaged over the past 5 minutes", - Formatter.DURATION, - s -> s.getInvocations() == 0 ? 0 : TimeUnit.MILLISECONDS.convert(s.getProcessingNanos(), TimeUnit.NANOSECONDS) / s.getInvocations(), + AVERAGE_TASK_NANOS(new StandardMetricDescriptor<ProcessorStatus>( + "averageTaskNanos", + "Average Task Duration (nanoseconds)", + "The average number of nanoseconds it took this Processor to complete a task, over the past 5 minutes", + Formatter.COUNT, + s -> s.getInvocations() == 0 ? 
0 : s.getProcessingNanos() / s.getInvocations(), new ValueReducer<StatusSnapshot, Long>() { @Override public Long reduce(final List<StatusSnapshot> values) { - long procMillis = 0L; + long procNanos = 0L; int invocations = 0; for (final StatusSnapshot snapshot : values) { - procMillis += snapshot.getStatusMetrics().get(TASK_MILLIS.getDescriptor()).longValue(); - invocations += snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor()).intValue(); + final Long taskNanos = snapshot.getStatusMetrics().get(TASK_NANOS.getDescriptor()); + if (taskNanos != null) { + procNanos += taskNanos.longValue(); + } + + final Long taskInvocations = snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor()); + if (taskInvocations != null) { + invocations += taskInvocations.intValue(); + } } if (invocations == 0) { return 0L; } - return procMillis / invocations; + return procNanos / invocations; } })); - private MetricDescriptor<ProcessorStatus> descriptor; + + + private final MetricDescriptor<ProcessorStatus> descriptor; + private final boolean visible; private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor) { + this(descriptor, true); + } + + private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor, final boolean visible) { this.descriptor = descriptor; + this.visible = visible; } public String getField() { @@ -160,4 +182,8 @@ public enum ProcessorStatusDescriptor { public MetricDescriptor<ProcessorStatus> getDescriptor() { return descriptor; } + + public boolean isVisible() { + return visible; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java index 4c620d1..3203972 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java @@ -93,7 +93,9 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit snapshot.setTimestamp(capture.getCaptureDate()); for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) { - snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status)); + if (descriptor.isVisible()) { + snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status)); + } } history.addStatusSnapshot(snapshot); http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index 3bc2356..01f3c8c 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -82,6 +82,13 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> { return procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || !Connectables.hasNonLoopConnection(procNode) || Connectables.flowFilesQueued(procNode); } + private boolean isBackPressureEngaged() { + return procNode.getIncomingConnections().stream() + .filter(con -> con.getSource() == procNode) + .map(con -> con.getFlowFileQueue()) + .anyMatch(queue -> queue.isFull()); + } + @Override public Boolean call() { // make sure processor is not yielded @@ -127,6 +134,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> { scheduleState.incrementActiveThreadCount(); final long startNanos = System.nanoTime(); + final long finishIfBackpressureEngaged = startNanos + (batchNanos / 25L); final long finishNanos = startNanos + batchNanos; int invocationCount = 0; try { @@ -140,10 +148,16 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> { return false; } - if (System.nanoTime() > finishNanos) { + final long nanoTime = System.nanoTime(); + if (nanoTime > finishNanos) { + return false; + } + + if (nanoTime > finishIfBackpressureEngaged && isBackPressureEngaged()) { return false; } + if (!isWorkToDo(procNode)) { break; } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index 7edb0e7..845ca56 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -34,10 +34,15 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.nio.file.StandardOpenOption; +import java.text.NumberFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.nifi.controller.repository.claim.ContentClaim; @@ -56,8 +61,6 @@ import ch.qos.logback.classic.Level; import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; -import java.util.HashMap; -import java.util.Map; public class TestFileSystemRepository { @@ -89,6 +92,38 @@ public class TestFileSystemRepository { } @Test + public void testWritePerformance() throws IOException { + final long bytesToWrite = 1_000_000_000L; + final int contentSize = 100; + + final int iterations = (int) (bytesToWrite / contentSize); + final byte[] content = new byte[contentSize]; + final Random random = new Random(); + random.nextBytes(content); + + // final ContentClaimWriteCache cache = new ContentClaimWriteCache(repository); + + final long start = System.nanoTime(); + for (int i = 0; i < iterations; i++) { + 
final ContentClaim claim = repository.create(false); + try (final OutputStream out = repository.write(claim)) { + out.write(content); + } + // final ContentClaim claim = cache.getContentClaim(); + // try (final OutputStream out = cache.write(claim)) { + // out.write(content); + // } + } + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + + final long mb = bytesToWrite / (1024 * 1024); + final long seconds = millis / 1000L; + final double mbps = (double) mb / (double) seconds; + System.out.println("Took " + millis + " millis to write " + contentSize + " bytes " + iterations + " times (total of " + + NumberFormat.getNumberInstance(Locale.US).format(bytesToWrite) + " bytes) for a write rate of " + mbps + " MB/s"); + } + + @Test public void testMinimalArchiveCleanupIntervalHonoredAndLogged() throws Exception { // We are going to construct our own repository using different properties, so // we need to shutdown the existing one. http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 9070d0c..e0c9ffe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -86,6 +86,7 @@ import 
org.apache.nifi.util.NiFiProperties; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -987,6 +988,7 @@ public class TestStandardProcessSession { } @Test + @Ignore public void testManyFilesOpened() throws IOException { StandardProcessSession[] standardProcessSessions = new StandardProcessSession[100000]; @@ -1672,9 +1674,9 @@ public class TestStandardProcessSession { @Override public int incrementClaimaintCount(ContentClaim claim) { - final AtomicInteger count = claimantCounts.get(claim); + AtomicInteger count = claimantCounts.get(claim); if (count == null) { - throw new IllegalArgumentException("Unknown Claim: " + claim); + count = new AtomicInteger(0); } return count.incrementAndGet(); } @@ -1835,6 +1837,11 @@ public class TestStandardProcessSession { fos.write(b); ((StandardContentClaim) claim).setLength(claim.getLength() + b.length); } + + @Override + public void close() throws IOException { + super.close(); + } }; } http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index 6525822..329b268 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; @@ -33,12 +34,17 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Queue; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.StandardFlowFileQueue; +import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.ListFlowFileStatus; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; @@ -47,15 +53,22 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.swap.StandardSwapContents; import org.apache.nifi.controller.swap.StandardSwapSummary; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.file.FileUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import 
org.mockito.stubbing.Answer; +import org.wali.MinimalLockingWriteAheadLog; +import org.wali.WriteAheadRepository; public class TestWriteAheadFlowFileRepository { @@ -74,6 +87,254 @@ public class TestWriteAheadFlowFileRepository { } } + + @Test + @Ignore("Intended only for local performance testing before/after making changes") + public void testUpdatePerformance() throws IOException, InterruptedException { + final FlowFileQueue queue = new FlowFileQueue() { + + @Override + public String getIdentifier() { + return "4444"; + } + + @Override + public List<FlowFilePrioritizer> getPriorities() { + return null; + } + + @Override + public SwapSummary recoverSwappedFlowFiles() { + return null; + } + + @Override + public void purgeSwapFiles() { + } + + @Override + public void setPriorities(List<FlowFilePrioritizer> newPriorities) { + } + + @Override + public void setBackPressureObjectThreshold(long maxQueueSize) { + } + + @Override + public long getBackPressureObjectThreshold() { + return 0; + } + + @Override + public void setBackPressureDataSizeThreshold(String maxDataSize) { + } + + @Override + public String getBackPressureDataSizeThreshold() { + return null; + } + + @Override + public QueueSize size() { + return null; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public boolean isActiveQueueEmpty() { + return false; + } + + @Override + public QueueSize getUnacknowledgedQueueSize() { + return null; + } + + @Override + public void acknowledge(FlowFileRecord flowFile) { + } + + @Override + public void acknowledge(Collection<FlowFileRecord> flowFiles) { + } + + @Override + public boolean isFull() { + return false; + } + + @Override + public void put(FlowFileRecord file) { + } + + @Override + public void putAll(Collection<FlowFileRecord> files) { + } + + @Override + public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) { + return null; + } + + @Override + public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> 
expiredRecords) { + return null; + } + + @Override + public long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords) { + return 0; + } + + @Override + public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) { + return null; + } + + @Override + public String getFlowFileExpiration() { + return null; + } + + @Override + public int getFlowFileExpiration(TimeUnit timeUnit) { + return 0; + } + + @Override + public void setFlowFileExpiration(String flowExpirationPeriod) { + } + + @Override + public DropFlowFileStatus dropFlowFiles(String requestIdentifier, String requestor) { + return null; + } + + @Override + public DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier) { + return null; + } + + @Override + public DropFlowFileStatus cancelDropFlowFileRequest(String requestIdentifier) { + return null; + } + + @Override + public ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults) { + return null; + } + + @Override + public ListFlowFileStatus getListFlowFileStatus(String requestIdentifier) { + return null; + } + + @Override + public ListFlowFileStatus cancelListFlowFileRequest(String requestIdentifier) { + return null; + } + + @Override + public FlowFileRecord getFlowFile(String flowFileUuid) throws IOException { + return null; + } + + @Override + public void verifyCanList() throws IllegalStateException { + } + }; + + + final int numPartitions = 16; + final int numThreads = 8; + final int totalUpdates = 160_000_000; + final int batchSize = 10; + + final Path path = Paths.get("target/minimal-locking-repo"); + deleteRecursively(path.toFile()); + assertTrue(path.toFile().mkdirs()); + + final ResourceClaimManager claimManager = new StandardResourceClaimManager(); + final RepositoryRecordSerdeFactory serdeFactory = new RepositoryRecordSerdeFactory(claimManager); + final WriteAheadRepository<RepositoryRecord> repo = new 
MinimalLockingWriteAheadLog<>(path, numPartitions, serdeFactory, null); + final Collection<RepositoryRecord> initialRecs = repo.recoverRecords(); + assertTrue(initialRecs.isEmpty()); + + final int updateCountPerThread = totalUpdates / numThreads; + + final Thread[] threads = new Thread[numThreads]; + for (int j = 0; j < 2; j++) { + for (int i = 0; i < numThreads; i++) { + final Thread t = new Thread(new Runnable() { + @Override + public void run() { + final List<RepositoryRecord> records = new ArrayList<>(); + final int numBatches = updateCountPerThread / batchSize; + final MockFlowFile baseFlowFile = new MockFlowFile(0L); + + for (int i = 0; i < numBatches; i++) { + records.clear(); + for (int k = 0; k < batchSize; k++) { + final FlowFileRecord flowFile = new MockFlowFile(i % 100_000, baseFlowFile); + final String uuid = flowFile.getAttribute("uuid"); + + final StandardRepositoryRecord record = new StandardRepositoryRecord(null, flowFile); + record.setDestination(queue); + final Map<String, String> updatedAttrs = Collections.singletonMap("uuid", uuid); + record.setWorking(flowFile, updatedAttrs); + + records.add(record); + } + + try { + repo.update(records, false); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + } + } + }); + + t.setDaemon(true); + threads[i] = t; + } + + final long start = System.nanoTime(); + for (final Thread t : threads) { + t.start(); + } + for (final Thread t : threads) { + t.join(); + } + + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + if (j == 0) { + System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numPartitions + " partitions and " + numThreads + " threads, *as a warmup!*"); + } else { + System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numPartitions + " partitions and " + numThreads + " threads"); + } + } + } + + private void deleteRecursively(final File file) { 
+ final File[] children = file.listFiles(); + if (children != null) { + for (final File child : children) { + deleteRecursively(child); + } + } + + file.delete(); + } + + + @Test public void testResourceClaimsIncremented() throws IOException { final ResourceClaimManager claimManager = new StandardResourceClaimManager(); http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java new file mode 100644 index 0000000..fc08f55 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.claim; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.nifi.controller.repository.FileSystemRepository; +import org.apache.nifi.controller.repository.TestFileSystemRepository; +import org.apache.nifi.controller.repository.util.DiskUtils; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.NiFiProperties; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestContentClaimWriteCache { + + private FileSystemRepository repository = null; + private StandardResourceClaimManager claimManager = null; + private final File rootFile = new File("target/testContentClaimWriteCache"); + private NiFiProperties nifiProperties; + + @Before + public void setup() throws IOException { + System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile()); + nifiProperties = NiFiProperties.createBasicNiFiProperties(null, null); + if (rootFile.exists()) { + DiskUtils.deleteRecursively(rootFile); + } + repository = new FileSystemRepository(nifiProperties); + claimManager = new StandardResourceClaimManager(); + repository.initialize(claimManager); + repository.purge(); + } + + @After + public void shutdown() throws IOException { + repository.shutdown(); + } + + @Test + public void testFlushWriteCorrectData() throws IOException { + final ContentClaimWriteCache cache = new ContentClaimWriteCache(repository, 4); + + final ContentClaim claim1 = cache.getContentClaim(); + assertNotNull(claim1); + + final OutputStream out = cache.write(claim1); + assertNotNull(out); + out.write("hello".getBytes()); + 
out.write("good-bye".getBytes()); + + cache.flush(); + + assertEquals(13L, claim1.getLength()); + final InputStream in = repository.read(claim1); + final byte[] buff = new byte[(int) claim1.getLength()]; + StreamUtils.fillBuffer(in, buff); + Assert.assertArrayEquals("hellogood-bye".getBytes(), buff); + + final ContentClaim claim2 = cache.getContentClaim(); + final OutputStream out2 = cache.write(claim2); + assertNotNull(out2); + out2.write("good-day".getBytes()); + out2.write("hello".getBytes()); + + cache.flush(); + + assertEquals(13L, claim2.getLength()); + final InputStream in2 = repository.read(claim2); + final byte[] buff2 = new byte[(int) claim2.getLength()]; + StreamUtils.fillBuffer(in2, buff2); + Assert.assertArrayEquals("good-dayhello".getBytes(), buff2); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index 662f87e..cecfabf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -98,7 +98,7 @@ <nifi.provenance.repository.rollover.time>30 secs</nifi.provenance.repository.rollover.time> <nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size> <nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads> - <nifi.provenance.repository.index.threads>1</nifi.provenance.repository.index.threads> + <nifi.provenance.repository.index.threads>2</nifi.provenance.repository.index.threads> <nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover> <nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, 
Filename, ProcessorID, Relationship</nifi.provenance.repository.indexed.fields> <nifi.provenance.repository.indexed.attributes /> @@ -106,6 +106,8 @@ <nifi.provenance.repository.always.sync>false</nifi.provenance.repository.always.sync> <nifi.provenance.repository.journal.count>16</nifi.provenance.repository.journal.count> <nifi.provenance.repository.max.attribute.length>65536</nifi.provenance.repository.max.attribute.length> + <nifi.provenance.repository.concurrent.merge.threads>2</nifi.provenance.repository.concurrent.merge.threads> + <nifi.provenance.repository.warm.cache.frequency>1 hour</nifi.provenance.repository.warm.cache.frequency> <!-- volatile provenance repository properties --> <nifi.provenance.repository.buffer.size>100000</nifi.provenance.repository.buffer.size> http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java index 2c84861..fcc481b 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java @@ -32,18 +32,26 @@ public abstract class AbstractRecordWriter implements RecordWriter { private static final Logger logger = LoggerFactory.getLogger(AbstractRecordWriter.class); private final File file; + private final String storageLocation; 
private final TocWriter tocWriter; private final Lock lock = new ReentrantLock(); private volatile boolean dirty = false; private volatile boolean closed = false; - private int recordsWritten = 0; - public AbstractRecordWriter(final File file, final TocWriter writer) throws IOException { logger.trace("Creating Record Writer for {}", file); this.file = file; + this.storageLocation = file.getName(); + this.tocWriter = writer; + } + + public AbstractRecordWriter(final String storageLocation, final TocWriter writer) throws IOException { + logger.trace("Creating Record Writer for {}", storageLocation); + + this.file = null; + this.storageLocation = storageLocation; this.tocWriter = writer; } @@ -51,7 +59,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { public synchronized void close() throws IOException { closed = true; - logger.trace("Closing Record Writer for {}", file == null ? null : file.getName()); + logger.trace("Closing Record Writer for {}", getStorageLocation()); lock(); try { @@ -94,9 +102,8 @@ public abstract class AbstractRecordWriter implements RecordWriter { } } - @Override - public int getRecordsWritten() { - return recordsWritten; + protected String getStorageLocation() { + return storageLocation; } @Override @@ -133,6 +140,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { this.dirty = true; } + @Override public boolean isDirty() { return dirty; } @@ -142,7 +150,7 @@ public abstract class AbstractRecordWriter implements RecordWriter { } @Override - public void sync() throws IOException { + public synchronized void sync() throws IOException { try { if (tocWriter != null) { tocWriter.sync(); http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java index cae2f40..2a42501 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java @@ -18,34 +18,37 @@ package org.apache.nifi.provenance; import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.nifi.provenance.schema.EventFieldNames; import org.apache.nifi.provenance.schema.EventRecord; -import org.apache.nifi.provenance.schema.EventRecordFields; import org.apache.nifi.provenance.schema.ProvenanceEventSchema; import org.apache.nifi.provenance.serialization.CompressableRecordWriter; import org.apache.nifi.provenance.toc.TocWriter; import org.apache.nifi.repository.schema.Record; import org.apache.nifi.repository.schema.RecordSchema; import org.apache.nifi.repository.schema.SchemaRecordWriter; -import org.apache.nifi.stream.io.DataOutputStream; public class ByteArraySchemaRecordWriter extends CompressableRecordWriter { private static final RecordSchema eventSchema = ProvenanceEventSchema.PROVENANCE_EVENT_SCHEMA_V1; - private static final RecordSchema contentClaimSchema = new RecordSchema(eventSchema.getField(EventRecordFields.Names.CONTENT_CLAIM).getSubFields()); + private static final RecordSchema contentClaimSchema = new RecordSchema(eventSchema.getField(EventFieldNames.CONTENT_CLAIM).getSubFields()); 
public static final int SERIALIZATION_VERSION = 1; public static final String SERIALIZATION_NAME = "ByteArraySchemaRecordWriter"; private final SchemaRecordWriter recordWriter = new SchemaRecordWriter(); - public ByteArraySchemaRecordWriter(final File file, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException { - super(file, tocWriter, compressed, uncompressedBlockSize); + public ByteArraySchemaRecordWriter(final File file, final AtomicLong idGenerator, final TocWriter tocWriter, final boolean compressed, + final int uncompressedBlockSize) throws IOException { + super(file, idGenerator, tocWriter, compressed, uncompressedBlockSize); } - public ByteArraySchemaRecordWriter(final OutputStream out, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException { - super(out, tocWriter, compressed, uncompressedBlockSize); + public ByteArraySchemaRecordWriter(final OutputStream out, final String storageLocation, final AtomicLong idGenerator, final TocWriter tocWriter, final boolean compressed, + final int uncompressedBlockSize) throws IOException { + super(out, storageLocation, idGenerator, tocWriter, compressed, uncompressedBlockSize); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java new file mode 100644 index 0000000..612b6c8 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema;
+import org.apache.nifi.provenance.schema.LookupTableEventRecord;
+import org.apache.nifi.provenance.serialization.CompressableRecordReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.SchemaRecordReader;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+
+/**
+ * Reads provenance events from a stream in which every record is laid out as an
+ * int event-ID offset, an int record length, and then the serialized record.
+ * <p>
+ * The stream header supplies the event schema, the ID of the first event, a
+ * timestamp offset, and lists of component IDs, component types, queue IDs and
+ * event types. All of these are handed to {@link LookupTableEventRecord} whenever
+ * an event is materialized.
+ */
+public class EventIdFirstSchemaRecordReader extends CompressableRecordReader {
+    // Everything below is populated by readHeader() rather than by the constructor,
+    // so none of it can be declared final.
+    private RecordSchema schema; // effectively final
+    private SchemaRecordReader recordReader; // effectively final
+
+    private List<String> componentIds;
+    private List<String> componentTypes;
+    private List<String> queueIds;
+    private List<String> eventTypes;
+    private long firstEventId;
+    private long systemTimeOffset;
+
+    /**
+     * Creates a reader; every argument is forwarded unchanged to the superclass.
+     *
+     * @param in the stream to read from
+     * @param filename the name of the source being read
+     * @param tocReader the table-of-contents reader
+     * @param maxAttributeChars the maximum attribute length
+     * @throws IOException if the superclass constructor fails
+     */
+    public EventIdFirstSchemaRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException {
+        super(in, filename, tocReader, maxAttributeChars);
+    }
+
+    /**
+     * Rejects serialization versions outside the supported range of 1 through
+     * {@link EventIdFirstSchemaRecordWriter#SERIALIZATION_VERSION}.
+     *
+     * @param serializationVersion the version read from the stream
+     * @throws IllegalArgumentException if the version is not supported
+     */
+    private void verifySerializationVersion(final int serializationVersion) {
+        // The message promises "1-N", so values below 1 must be rejected as well as values above N.
+        if (serializationVersion < 1 || serializationVersion > EventIdFirstSchemaRecordWriter.SERIALIZATION_VERSION) {
+            throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion
+                + " and supported versions are 1-" + EventIdFirstSchemaRecordWriter.SERIALIZATION_VERSION);
+        }
+    }
+
+    /**
+     * Reads the event schema, the header schema and the header record, and caches
+     * the header's values for use when events are deserialized.
+     *
+     * @param in the stream to read the header from
+     * @param serializationVersion the version read from the stream
+     * @throws IOException if the header cannot be read or is malformed
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    protected synchronized void readHeader(final DataInputStream in, final int serializationVersion) throws IOException {
+        verifySerializationVersion(serializationVersion);
+
+        schema = readSchema(in, "event");
+        recordReader = SchemaRecordReader.fromSchema(schema);
+
+        final RecordSchema headerSchema = readSchema(in, "header");
+        final SchemaRecordReader headerReader = SchemaRecordReader.fromSchema(headerSchema);
+        final Record headerRecord = headerReader.readRecord(in);
+        if (headerRecord == null) {
+            // readRecord() can yield null (readRecord(...) in this class checks for it); fail clearly instead of with an NPE.
+            throw new IOException("Unable to read header because no header record was present in the stream");
+        }
+
+        componentIds = (List<String>) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.COMPONENT_IDS);
+        componentTypes = (List<String>) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.COMPONENT_TYPES);
+        queueIds = (List<String>) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.QUEUE_IDS);
+        eventTypes = (List<String>) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.EVENT_TYPES);
+        firstEventId = (Long) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.FIRST_EVENT_ID);
+        systemTimeOffset = (Long) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.TIMESTAMP_OFFSET);
+    }
+
+    /**
+     * Reads one length-prefixed schema: an int byte count followed by that many bytes.
+     *
+     * @param in the stream to read from
+     * @param schemaName a short label for the schema, used only in error messages
+     * @return the schema parsed from the bytes that were read
+     * @throws IOException if the length is negative or the bytes cannot be read or parsed
+     */
+    private static RecordSchema readSchema(final DataInputStream in, final String schemaName) throws IOException {
+        final int schemaLength = in.readInt();
+        if (schemaLength < 0) {
+            // A negative length is never valid; report it as an I/O problem rather than
+            // letting the array allocation below throw NegativeArraySizeException.
+            throw new IOException("Invalid length " + schemaLength + " for " + schemaName + " schema");
+        }
+
+        final byte[] buffer = new byte[schemaLength];
+        StreamUtils.fillBuffer(in, buffer);
+
+        try (final ByteArrayInputStream bais = new ByteArrayInputStream(buffer)) {
+            return RecordSchema.readFrom(bais);
+        }
+    }
+
+    /**
+     * Reads the next record: an int ID offset (added to the first event ID), an int length, then the record.
+     *
+     * @param in the stream to read from
+     * @param serializationVersion the version read from the stream
+     * @return the event, or {@code null} if the record reader produced no record
+     * @throws IOException if the record cannot be read
+     */
+    @Override
+    protected StandardProvenanceEventRecord nextRecord(final DataInputStream in, final int serializationVersion) throws IOException {
+        verifySerializationVersion(serializationVersion);
+
+        final long byteOffset = getBytesConsumed();
+        final long eventId = in.readInt() + firstEventId;
+        final int recordLength = in.readInt();
+
+        return readRecord(in, eventId, byteOffset, recordLength);
+    }
+
+    /**
+     * Deserializes one record through a {@link LimitingInputStream} created with {@code recordLength} as its limit.
+     *
+     * @param in the stream positioned just after the record's ID and length
+     * @param eventId the ID to assign to the deserialized event
+     * @param startOffset the byte offset recorded before the record's ID was read
+     * @param recordLength the record length read from the stream
+     * @return the event, or {@code null} if the record reader produced no record
+     * @throws IOException if the record cannot be read
+     */
+    private StandardProvenanceEventRecord readRecord(final DataInputStream in, final long eventId, final long startOffset, final int recordLength) throws IOException {
+        final InputStream limitedIn = new LimitingInputStream(in, recordLength);
+
+        final Record eventRecord = recordReader.readRecord(limitedIn);
+        if (eventRecord == null) {
+            return null;
+        }
+
+        final StandardProvenanceEventRecord deserializedEvent = LookupTableEventRecord.getEvent(eventRecord, getFilename(), startOffset, getMaxAttributeLength(),
+            firstEventId, systemTimeOffset, componentIds, componentTypes, queueIds, eventTypes);
+        deserializedEvent.setEventId(eventId);
+        return deserializedEvent;
+    }
+
+    /**
+     * Peeks at the next byte without consuming it.
+     * Relies on {@code in} supporting mark/reset.
+     *
+     * @param in the stream to probe
+     * @return {@code true} if a byte could be read, {@code false} at end of stream
+     * @throws IOException if the read or the reset fails
+     */
+    private boolean isData(final InputStream in) throws IOException {
+        in.mark(1);
+        final int nextByte = in.read();
+        in.reset();
+
+        return nextByte > -1;
+    }
+
+    /**
+     * Advances to the first record whose ID is at least {@code eventId}, skipping earlier records.
+     *
+     * @param eventId the smallest event ID the caller is interested in
+     * @param dis the stream to read from
+     * @param serializationVersion the version read from the stream
+     * @return the matching event, or an empty Optional if none could be read
+     * @throws IOException if the stream cannot be read
+     */
+    @Override
+    protected Optional<StandardProvenanceEventRecord> readToEvent(final long eventId, final DataInputStream dis, final int serializationVersion) throws IOException {
+        verifySerializationVersion(serializationVersion);
+
+        while (isData(dis)) {
+            final long startOffset = getBytesConsumed();
+            final long id = dis.readInt() + firstEventId;
+            final int recordLength = dis.readInt();
+
+            if (id >= eventId) {
+                final StandardProvenanceEventRecord event = readRecord(dis, id, startOffset, recordLength);
+                return Optional.ofNullable(event);
+            } else {
+                // This is not the record we want. Skip over it instead of deserializing it.
+                StreamUtils.skip(dis, recordLength);
+            }
+        }
+
+        return Optional.empty();
+    }
+}