Bug fixes and additional pieces of the journaling provenance repository implemented
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b95e7569 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b95e7569 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b95e7569 Branch: refs/heads/journaling-prov-repo Commit: b95e7569f75c7beb2fd214d3304ae9630f4a8545 Parents: a68bef6 Author: Mark Payne <marka...@hotmail.com> Authored: Thu Feb 12 19:19:37 2015 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Thu Feb 12 19:19:37 2015 -0500 ---------------------------------------------------------------------- .../apache/nifi/provenance/EventIdLocation.java | 31 +++ .../provenance/IdEnrichedProvenanceEvent.java | 175 ++++++++++++++++ .../nifi/provenance/StandardLineageResult.java | 2 +- .../org/apache/nifi/util/file/FileUtils.java | 1 - .../MockProvenanceEventRepository.java | 41 +++- .../manager/impl/ClusteredEventAccess.java | 32 ++- .../repository/StandardProcessSession.java | 164 ++++++--------- .../nifi/controller/tasks/ExpireFlowFiles.java | 27 ++- .../repository/TestStandardProcessSession.java | 13 +- .../nifi/web/controller/ControllerFacade.java | 6 +- .../pom.xml | 5 + .../JournalingProvenanceRepository.java | 31 +-- .../config/JournalingRepositoryConfig.java | 2 +- .../journals/StandardJournalReader.java | 2 +- .../partition/JournalingPartition.java | 6 + .../partition/QueuingPartitionManager.java | 39 +++- .../journaling/query/QueryManager.java | 42 ++++ .../journaling/query/StandardQueryManager.java | 144 ++++++++++++++ ...he.nifi.provenance.ProvenanceEventRepository | 15 ++ .../TestJournalingProvenanceRepository.java | 144 ++++++++++++++ .../nifi/provenance/journaling/TestUtil.java | 8 + .../PersistentProvenanceRepository.java | 80 ++++++-- .../nifi/provenance/lucene/DocsReader.java | 21 +- .../nifi/provenance/lucene/IndexSearch.java | 8 +- .../nifi/provenance/lucene/LineageQuery.java | 11 +- 
.../TestPersistentProvenanceRepository.java | 44 ++--- .../nifi-provenance-repository-nar/pom.xml | 4 + .../VolatileProvenanceRepository.java | 198 +------------------ .../nifi-provenance-repository-bundle/pom.xml | 6 + 29 files changed, 909 insertions(+), 393 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/EventIdLocation.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/EventIdLocation.java b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/EventIdLocation.java new file mode 100644 index 0000000..9cc6c4d --- /dev/null +++ b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/EventIdLocation.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance; + +import org.apache.nifi.provenance.StorageLocation; + +public class EventIdLocation implements StorageLocation { + private final long id; + + public EventIdLocation(final long id) { + this.id = id; + } + + public long getId() { + return id; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/IdEnrichedProvenanceEvent.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/IdEnrichedProvenanceEvent.java b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/IdEnrichedProvenanceEvent.java new file mode 100644 index 0000000..4ef0e5d --- /dev/null +++ b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/IdEnrichedProvenanceEvent.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.StorageLocation; +import org.apache.nifi.provenance.StoredProvenanceEvent; + +public class IdEnrichedProvenanceEvent implements StoredProvenanceEvent { + + private final ProvenanceEventRecord event; + private final long id; + + public IdEnrichedProvenanceEvent(final ProvenanceEventRecord event) { + this(event, event.getEventId()); + } + + public IdEnrichedProvenanceEvent(final ProvenanceEventRecord event, final long id) { + this.event = event; + this.id = id; + } + + @Override + public StorageLocation getStorageLocation() { + return new EventIdLocation(id); + } + + public long getEventId() { + return id; + } + + public long getEventTime() { + return event.getEventTime(); + } + + public long getFlowFileEntryDate() { + return event.getFlowFileEntryDate(); + } + + public long getLineageStartDate() { + return event.getLineageStartDate(); + } + + public Set<String> getLineageIdentifiers() { + return event.getLineageIdentifiers(); + } + + public long getFileSize() { + return event.getFileSize(); + } + + public Long getPreviousFileSize() { + return event.getPreviousFileSize(); + } + + public long getEventDuration() { + return event.getEventDuration(); + } + + public ProvenanceEventType getEventType() { + return event.getEventType(); + } + + public Map<String, String> getAttributes() { + return event.getAttributes(); + } + + public Map<String, String> getPreviousAttributes() { + return event.getPreviousAttributes(); + } + + public Map<String, String> getUpdatedAttributes() { + return event.getUpdatedAttributes(); + } + + public String getComponentId() { + return event.getComponentId(); + } + + public String getComponentType() { + return event.getComponentType(); + } + + public String getTransitUri() { + return 
event.getTransitUri(); + } + + public String getSourceSystemFlowFileIdentifier() { + return event.getSourceSystemFlowFileIdentifier(); + } + + public String getFlowFileUuid() { + return event.getFlowFileUuid(); + } + + public List<String> getParentUuids() { + return event.getParentUuids(); + } + + public List<String> getChildUuids() { + return event.getChildUuids(); + } + + public String getAlternateIdentifierUri() { + return event.getAlternateIdentifierUri(); + } + + public String getDetails() { + return event.getDetails(); + } + + public String getRelationship() { + return event.getRelationship(); + } + + public String getSourceQueueIdentifier() { + return event.getSourceQueueIdentifier(); + } + + public String getContentClaimSection() { + return event.getContentClaimSection(); + } + + public String getPreviousContentClaimSection() { + return event.getPreviousContentClaimSection(); + } + + public String getContentClaimContainer() { + return event.getContentClaimContainer(); + } + + public String getPreviousContentClaimContainer() { + return event.getPreviousContentClaimContainer(); + } + + public String getContentClaimIdentifier() { + return event.getContentClaimIdentifier(); + } + + public String getPreviousContentClaimIdentifier() { + return event.getPreviousContentClaimIdentifier(); + } + + public Long getContentClaimOffset() { + return event.getContentClaimOffset(); + } + + public Long getPreviousContentClaimOffset() { + return event.getPreviousContentClaimOffset(); + } + + @Override + public String toString() { + return event.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java 
b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java index afb56e8..0f454bd 100644 --- a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java +++ b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java @@ -178,7 +178,7 @@ public class StandardLineageResult implements ComputeLineageResult { } } - public void update(final Collection<ProvenanceEventRecord> records) { + public void update(final Collection<StoredProvenanceEvent> records) { writeLock.lock(); try { relevantRecords.addAll(records); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java index 41a0557..71dbc79 100644 --- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java +++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.util.file; -import java.io.BufferedInputStream; import java.io.Closeable; import java.io.File; import java.io.FileInputStream; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java index 241041a..c4caa71 100644 --- 
a/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java +++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java @@ -18,6 +18,7 @@ package org.apache.nifi.provenance; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -30,11 +31,11 @@ import org.apache.nifi.provenance.search.SearchableField; public class MockProvenanceEventRepository implements ProvenanceEventRepository { - private final List<ProvenanceEventRecord> records = new ArrayList<>(); + private final List<StoredProvenanceEvent> records = new ArrayList<>(); private final AtomicLong idGenerator = new AtomicLong(0L); @Override - public void registerEvents(final Iterable<ProvenanceEventRecord> events) { + public void registerEvents(final Collection<ProvenanceEventRecord> events) { for (final ProvenanceEventRecord event : events) { registerEvent(event); } @@ -50,7 +51,7 @@ public class MockProvenanceEventRepository implements ProvenanceEventRepository } newRecord.setEventId(idGenerator.getAndIncrement()); - records.add(newRecord); + records.add(new IdEnrichedProvenanceEvent(newRecord)); } @Override @@ -58,7 +59,7 @@ public class MockProvenanceEventRepository implements ProvenanceEventRepository } @Override - public List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxRecords) throws IOException { + public List<StoredProvenanceEvent> getEvents(long firstRecordId, int maxRecords) throws IOException { if (firstRecordId > records.size()) { return Collections.emptyList(); } @@ -92,7 +93,7 @@ public class MockProvenanceEventRepository implements ProvenanceEventRepository } @Override - public ProvenanceEventRecord getEvent(long id) throws IOException { + public StoredProvenanceEvent getEvent(long id) throws IOException { if (id > records.size()) { return null; } @@ -128,4 +129,34 @@ public class 
MockProvenanceEventRepository implements ProvenanceEventRepository public ProvenanceEventBuilder eventBuilder() { return new StandardProvenanceEventRecord.Builder(); } + + @Override + public Long getEarliestEventTime() throws IOException { + final StoredProvenanceEvent event = getEvent(0); + if ( event == null ) { + return null; + } + + return event.getEventTime(); + } + + @Override + public StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException { + if ( location instanceof EventIdLocation ) { + return getEvent( ((EventIdLocation) location).getId() ); + } + throw new IllegalArgumentException("Invalid StorageLocation"); + } + + @Override + public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> storageLocations) throws IOException { + final List<StoredProvenanceEvent> events = new ArrayList<>(storageLocations.size()); + for ( final StorageLocation location : storageLocations ) { + final StoredProvenanceEvent event = getEvent(location); + if ( event != null ) { + events.add(event); + } + } + return events; + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java index 2015530..7780d04 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java +++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java @@ -18,6 +18,8 @@ package org.apache.nifi.cluster.manager.impl; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import org.apache.nifi.controller.status.ProcessGroupStatus; @@ -25,6 +27,8 @@ import org.apache.nifi.events.EventReporter; import org.apache.nifi.provenance.ProvenanceEventBuilder; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.StorageLocation; +import org.apache.nifi.provenance.StoredProvenanceEvent; import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.search.QuerySubmission; @@ -59,12 +63,12 @@ public class ClusteredEventAccess implements EventAccess { } @Override - public ProvenanceEventRecord getEvent(long eventId) throws IOException { + public StoredProvenanceEvent getEvent(long eventId) throws IOException { return null; } @Override - public List<ProvenanceEventRecord> getEvents(long startEventId, int maxEvents) throws IOException { + public List<StoredProvenanceEvent> getEvents(long startEventId, int maxEvents) throws IOException { return new ArrayList<>(); } @@ -88,10 +92,6 @@ public class ClusteredEventAccess implements EventAccess { } @Override - public void registerEvents(final Iterable<ProvenanceEventRecord> events) { - } - - @Override public ComputeLineageSubmission retrieveLineageSubmission(final String submissionId) { return null; } @@ -130,6 +130,26 @@ public class ClusteredEventAccess implements EventAccess { public void initialize(EventReporter eventReporter) throws IOException { } + + @Override + public Long getEarliestEventTime() throws IOException { + return null; + } + + @Override + public 
StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException { + return null; + } + + @Override + public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> storageLocations) throws IOException { + return Collections.emptyList(); + } + + @Override + public void registerEvents(final Collection<ProvenanceEventRecord> events) throws IOException { + + } }; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index dcb461c..899fccc 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -28,11 +28,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -53,9 +51,6 @@ import org.apache.nifi.controller.repository.io.LimitedInputStream; import org.apache.nifi.controller.repository.io.LongHolder; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import 
org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.stream.io.NonCloseableInputStream; -import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.ProcessSession; @@ -74,6 +69,9 @@ import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.NonCloseableInputStream; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -299,7 +297,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE resetReadClaim(); final long updateProvenanceStart = System.nanoTime(); - updateProvenanceRepo(checkpoint); + try { + updateProvenanceRepo(checkpoint); + } catch (final IOException ioe) { + rollback(); + throw new ProcessException("Provenance Repository failed to update", ioe); + } final long claimRemovalStart = System.nanoTime(); final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart; @@ -497,7 +500,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE eventTypes.add(eventType); } - private void updateProvenanceRepo(final Checkpoint checkpoint) { + private void updateProvenanceRepo(final Checkpoint checkpoint) throws IOException { // Update Provenance Repository final ProvenanceEventRepository provenanceRepo = context.getProvenanceRepository(); @@ -641,46 +644,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } final List<ProvenanceEventRecord> autoTermEvents = checkpoint.autoTerminatedEvents; - final Iterable<ProvenanceEventRecord> iterable = new 
Iterable<ProvenanceEventRecord>() { - final Iterator<ProvenanceEventRecord> recordsToSubmitIterator = recordsToSubmit.iterator(); - final Iterator<ProvenanceEventRecord> autoTermIterator = autoTermEvents == null ? null : autoTermEvents.iterator(); - - @Override - public Iterator<ProvenanceEventRecord> iterator() { - return new Iterator<ProvenanceEventRecord>() { - @Override - public boolean hasNext() { - return recordsToSubmitIterator.hasNext() || (autoTermIterator != null && autoTermIterator.hasNext()); - } - - @Override - public ProvenanceEventRecord next() { - if (recordsToSubmitIterator.hasNext()) { - final ProvenanceEventRecord rawEvent = recordsToSubmitIterator.next(); - - // Update the Provenance Event Record with all of the info that we know about the event. - // For SEND events, we do not want to update the FlowFile info on the Event, because the event should - // reflect the FlowFile as it was sent to the remote system. However, for other events, we want to use - // the representation of the FlowFile as it is committed, as this is the only way in which it really - // exists in our system -- all other representations are volatile representations that have not been - // exposed. 
- return enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND); - } else if (autoTermIterator != null && autoTermIterator.hasNext()) { - return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true); - } - - throw new NoSuchElementException(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - }; - - provenanceRepo.registerEvents(iterable); + + final List<ProvenanceEventRecord> enrichedEvents = new ArrayList<>(); + for ( final ProvenanceEventRecord record : recordsToSubmit ) { + enrichedEvents.add(enrich(record, flowFileRecordMap, checkpoint.records, record.getEventType() != ProvenanceEventType.SEND)); + } + for ( final ProvenanceEventRecord record : autoTermEvents ) { + enrichedEvents.add(enrich(record, flowFileRecordMap, checkpoint.records, true)); + } + + provenanceRepo.registerEvents(enrichedEvents); } private void updateEventContentClaims(final ProvenanceEventBuilder builder, final FlowFile flowFile, final StandardRepositoryRecord repoRecord) { @@ -1140,7 +1113,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final Connection conn = connections.get(context.getNextIncomingConnectionIndex() % connections.size()); final Set<FlowFileRecord> expired = new HashSet<>(); final FlowFileRecord flowFile = conn.getFlowFileQueue().poll(expired); - removeExpired(expired, conn); + + try { + removeExpired(expired, conn); + } catch (final IOException ioe) { + throw new ProcessException("Failed to update repositories to remove expired FlowFiles", ioe); + } if (flowFile != null) { registerDequeuedRecord(flowFile, conn); @@ -1201,7 +1179,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE for (final Connection conn : connections) { final Set<FlowFileRecord> expired = new HashSet<>(); final List<FlowFileRecord> newlySelected = poller.poll(conn.getFlowFileQueue(), expired); - 
removeExpired(expired, conn); + try { + removeExpired(expired, conn); + } catch (final IOException ioe) { + throw new ProcessException("Failed to update repositories to remove expired FlowFiles", ioe); + } if (newlySelected.isEmpty() && expired.isEmpty()) { continue; @@ -1571,7 +1553,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } - public void expireFlowFiles() { + public void expireFlowFiles() throws IOException { final Set<FlowFileRecord> expired = new HashSet<>(); final FlowFileFilter filter = new FlowFileFilter() { @Override @@ -1589,7 +1571,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } - private void removeExpired(final Set<FlowFileRecord> flowFiles, final Connection connection) { + private void removeExpired(final Set<FlowFileRecord> flowFiles, final Connection connection) throws IOException { if (flowFiles.isEmpty()) { return; } @@ -1612,7 +1594,31 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final Map<String, FlowFileRecord> recordIdMap = new HashMap<>(); for (final FlowFileRecord flowFile : flowFiles) { recordIdMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile); + } + + final Set<ProvenanceEventRecord> expiredEvents = expiredReporter.getEvents(); + final List<ProvenanceEventRecord> events = new ArrayList<>(expiredEvents.size()); + for ( final ProvenanceEventRecord event : expiredEvents ) { + final StandardProvenanceEventRecord.Builder enriched = new StandardProvenanceEventRecord.Builder().fromEvent(event); + final FlowFileRecord record = recordIdMap.get(event.getFlowFileUuid()); + if (record == null) { + continue; + } + + final ContentClaim claim = record.getContentClaim(); + if (claim != null) { + enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize()); + enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), 
claim.getId(), record.getContentClaimOffset(), record.getSize()); + } + enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap()); + events.add(enriched.build()); + } + + context.getProvenanceRepository().registerEvents(events); + context.getFlowFileRepository().updateRepository(expiredRecords); + + for ( final FlowFileRecord flowFile : flowFiles ) { final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile); record.markForDelete(); expiredRecords.add(record); @@ -1623,53 +1629,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final Object terminator = (connectable instanceof ProcessorNode) ? ((ProcessorNode) connectable).getProcessor() : connectable; LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife}); } - - try { - final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() { - @Override - public Iterator<ProvenanceEventRecord> iterator() { - final Iterator<ProvenanceEventRecord> expiredEventIterator = expiredReporter.getEvents().iterator(); - final Iterator<ProvenanceEventRecord> enrichingIterator = new Iterator<ProvenanceEventRecord>() { - @Override - public boolean hasNext() { - return expiredEventIterator.hasNext(); - } - - @Override - public ProvenanceEventRecord next() { - final ProvenanceEventRecord event = expiredEventIterator.next(); - final StandardProvenanceEventRecord.Builder enriched = new StandardProvenanceEventRecord.Builder().fromEvent(event); - final FlowFileRecord record = recordIdMap.get(event.getFlowFileUuid()); - if (record == null) { - return null; - } - - final ContentClaim claim = record.getContentClaim(); - if (claim != null) { - enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize()); - 
enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize()); - } - - enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap()); - return enriched.build(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - - return enrichingIterator; - } - }; - - context.getProvenanceRepository().registerEvents(iterable); - context.getFlowFileRepository().updateRepository(expiredRecords); - } catch (final IOException e) { - LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e); - } - } private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long offset) throws ContentNotFoundException { @@ -2438,7 +2397,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final ProvenanceEventRecord dropEvent = provenanceReporter.drop(suspectRecord.getCurrent(), nfe.getMessage() == null ? 
"Content Not Found" : nfe.getMessage()); if (dropEvent != null) { - context.getProvenanceRepository().registerEvent(dropEvent); + try { + context.getProvenanceRepository().registerEvent(dropEvent); + } catch (final IOException ioe) { + LOG.error("{} Failed to register DROP Provenance event for {} when handling ContentNotFound error due to {}", this, suspectRecord.getCurrent(), ioe.toString()); + if ( LOG.isDebugEnabled() ) { + LOG.error("", ioe); + } + } } if (missingClaim == registeredClaim) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java index a351a68..d0020b5 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java @@ -16,9 +16,12 @@ */ package org.apache.nifi.controller.tasks; +import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import javax.print.attribute.standard.Severity; + import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Funnel; @@ -29,9 +32,12 @@ import org.apache.nifi.controller.repository.ProcessContext; import org.apache.nifi.controller.repository.StandardProcessSession; import 
org.apache.nifi.controller.repository.StandardProcessSessionFactory; import org.apache.nifi.controller.scheduling.ProcessContextFactory; +import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This task runs through all Connectable Components and goes through its @@ -39,7 +45,8 @@ import org.apache.nifi.util.FormatUtils; * desired side effect of expiring old FlowFiles. */ public class ExpireFlowFiles implements Runnable { - + private static final Logger logger = LoggerFactory.getLogger(ExpireFlowFiles.class); + private final FlowController flowController; private final ProcessContextFactory contextFactory; @@ -51,7 +58,19 @@ public class ExpireFlowFiles implements Runnable { @Override public void run() { final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId()); - expireFlowFiles(rootGroup); + + try { + expireFlowFiles(rootGroup); + } catch (final Exception e) { + logger.error("Failed to expire FlowFiles due to {}", e.toString()); + if ( logger.isDebugEnabled() ) { + logger.error("", e); + } + + flowController.getBulletinRepository().addBulletin(BulletinFactory.createBulletin( + "FlowFile Expiration", Severity.ERROR.getName(), "Could not expire FlowFiles due to " + e)); + + } } private StandardProcessSession createSession(final Connectable connectable) { @@ -60,7 +79,7 @@ public class ExpireFlowFiles implements Runnable { return sessionFactory.createSession(); } - private void expireFlowFiles(final Connectable connectable) { + private void expireFlowFiles(final Connectable connectable) throws IOException { // determine if the incoming connections for this Connectable have Expiration configured. 
boolean expirationConfigured = false; for (final Connection incomingConn : connectable.getIncomingConnections()) { @@ -80,7 +99,7 @@ public class ExpireFlowFiles implements Runnable { session.commit(); } - private void expireFlowFiles(final ProcessGroup group) { + private void expireFlowFiles(final ProcessGroup group) throws IOException { for (final ProcessorNode procNode : group.getProcessors()) { expireFlowFiles(procNode); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 1ff63c5..7ae156c 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -65,6 +65,7 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.StoredProvenanceEvent; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -357,7 +358,7 @@ public class TestStandardProcessSession { session.commit(); - final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000); + final 
List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 1000); // We should have a JOIN and 2 ATTRIBUTE_MODIFIED's assertEquals(3, events.size()); @@ -412,7 +413,7 @@ public class TestStandardProcessSession { session.remove(orig); session.commit(); - final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000); + final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 1000); assertEquals(2, events.size()); final ProvenanceEventRecord firstRecord = events.get(0); @@ -838,7 +839,7 @@ public class TestStandardProcessSession { session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); + final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); @@ -857,7 +858,7 @@ public class TestStandardProcessSession { session.transfer(newFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); + final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); @@ -883,7 +884,7 @@ public class TestStandardProcessSession { session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); + final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 10000); assertFalse(events.isEmpty()); assertEquals(1, events.size()); @@ -904,7 +905,7 @@ public class TestStandardProcessSession { session.transfer(existingFlowFile, new Relationship.Builder().name("A").build()); session.commit(); - final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000); + final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 10000); 
assertFalse(events.isEmpty()); assertEquals(1, events.size()); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index b009581..56db464 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -697,9 +697,9 @@ public class ControllerFacade implements ControllerServiceProvider { resultsDto.setTimeOffset(TimeZone.getDefault().getOffset(now.getTime())); // get the oldest available event time - final List<ProvenanceEventRecord> firstEvent = provenanceRepository.getEvents(0, 1); - if (!firstEvent.isEmpty()) { - resultsDto.setOldestEvent(new Date(firstEvent.get(0).getEventTime())); + final Long oldestEventTime = provenanceRepository.getEarliestEventTime(); + if (oldestEventTime != null) { + resultsDto.setOldestEvent(new Date(oldestEventTime)); } provenanceDto.setResults(resultsDto); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml index 5997281..4e9e9fb 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml @@ -36,5 +36,10 @@ <groupId>org.apache.lucene</groupId> <artifactId>lucene-queryparser</artifactId> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java index 2130e73..7911d73 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java @@ -55,6 +55,8 @@ import org.apache.nifi.provenance.journaling.partition.PartitionAction; import org.apache.nifi.provenance.journaling.partition.PartitionManager; import org.apache.nifi.provenance.journaling.partition.QueuingPartitionManager; import 
org.apache.nifi.provenance.journaling.partition.VoidPartitionAction; +import org.apache.nifi.provenance.journaling.query.QueryManager; +import org.apache.nifi.provenance.journaling.query.StandardQueryManager; import org.apache.nifi.provenance.journaling.toc.StandardTocReader; import org.apache.nifi.provenance.journaling.toc.TocReader; import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; @@ -67,15 +69,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class JournalingProvenanceRepository implements ProvenanceEventRepository { + public static final String BLOCK_SIZE = "nifi.provenance.block.size"; + private static final Logger logger = LoggerFactory.getLogger(JournalingProvenanceRepository.class); private final JournalingRepositoryConfig config; - private final PartitionManager partitionManager; private final AtomicLong idGenerator = new AtomicLong(0L); - - private EventReporter eventReporter; // effectively final private final ExecutorService executor; + private EventReporter eventReporter; // effectively final + private PartitionManager partitionManager; // effectively final + private QueryManager queryManager; // effectively final public JournalingProvenanceRepository() throws IOException { this(createConfig()); @@ -84,7 +88,6 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository public JournalingProvenanceRepository(final JournalingRepositoryConfig config) throws IOException { this.config = config; this.executor = Executors.newFixedThreadPool(config.getThreadPoolSize()); - this.partitionManager = new QueuingPartitionManager(config, executor); } @@ -110,7 +113,8 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository final boolean compressOnRollover = Boolean.parseBoolean(properties.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER)); final String indexedFieldString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS); final String indexedAttrString = 
properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES); - + final int blockSize = properties.getIntegerProperty(BLOCK_SIZE, 1000); + final Boolean alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync", "false")); final List<SearchableField> searchableFields = SearchableFieldParser.extractSearchableFields(indexedFieldString, true); @@ -137,7 +141,8 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository config.setMaxStorageCapacity(maxStorageBytes); config.setThreadPoolSize(queryThreads); config.setPartitionCount(journalCount); - + config.setBlockSize(blockSize); + if (shardSize != null) { config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue()); } @@ -150,6 +155,9 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository @Override public synchronized void initialize(final EventReporter eventReporter) throws IOException { this.eventReporter = eventReporter; + + this.partitionManager = new QueuingPartitionManager(config, executor); + this.queryManager = new StandardQueryManager(partitionManager, config, 10); } @Override @@ -328,14 +336,12 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository @Override public QuerySubmission submitQuery(final Query query) { - // TODO Auto-generated method stub - return null; + return queryManager.submitQuery(query); } @Override public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) { - // TODO Auto-generated method stub - return null; + return queryManager.retrieveQuerySubmission(queryIdentifier); } @Override @@ -364,7 +370,10 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository @Override public void close() throws IOException { - partitionManager.shutdown(); + if ( partitionManager != null ) { + partitionManager.shutdown(); + } + executor.shutdown(); } 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java index 6dd7be9..8998932 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java @@ -34,7 +34,7 @@ public class JournalingRepositoryConfig { private long journalCapacity = 1024L * 1024L * 5L; // 5 MB private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB private int partitionCount = 16; - private int blockSize = 100; + private int blockSize = 5000; private List<SearchableField> searchableFields = new ArrayList<>(); private List<SearchableField> searchableAttributes = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java index 82ef39b..2ec5131 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java @@ -163,7 +163,7 @@ public class StandardJournalReader implements JournalReader { } } - throw new IOException("Could not find event with ID " + eventId); + throw new IOException("Could not find event with ID " + eventId + " in " + this); } @Override http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java index 51f84a2..651c41e 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java +++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java @@ -214,6 +214,7 @@ public class JournalingPartition implements Partition { final File journalFile = new File(journalsDir, firstEventId + JOURNAL_FILE_EXTENSION); journalWriter = new StandardJournalWriter(firstEventId, journalFile, false, new StandardEventSerializer()); tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false); + tocWriter.addBlockOffset(journalWriter.getSize()); numEventsAtEndOfLastBlock = 0; } @@ -421,4 +422,9 @@ public class JournalingPartition implements Partition { public Long getEarliestEventTime() throws IOException { return earliestEventTime; } + + @Override + public String toString() { + return "Partition[section=" + sectionName + "]"; + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java index 4ac0fc6..51d90e2 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java 
@@ -35,9 +35,13 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; import org.apache.nifi.util.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class QueuingPartitionManager implements PartitionManager { - + + private static final Logger logger = LoggerFactory.getLogger(QueuingPartitionManager.class); + private final JournalingRepositoryConfig config; private final BlockingQueue<Partition> partitionQueue; private final JournalingPartition[] partitionArray; @@ -180,6 +184,39 @@ public class QueuingPartitionManager implements PartitionManager { @Override public void withEachPartition(final VoidPartitionAction action, final boolean async) { + // TODO: Do not use blacklisted partitions. + final Map<Partition, Future<?>> futures = new HashMap<>(partitionArray.length); + for ( final Partition partition : partitionArray ) { + final Runnable runnable = new Runnable() { + @Override + public void run() { + try { + action.perform(partition); + } catch (final Throwable t) { + logger.error("Failed to perform action against " + partition + " due to " + t); + if ( logger.isDebugEnabled() ) { + logger.error("", t); + } + } + } + }; + + final Future<?> future = executor.submit(runnable); + futures.put(partition, future); + } + if ( !async ) { + for ( final Map.Entry<Partition, Future<?>> entry : futures.entrySet() ) { + try { + // throw any exception thrown by runnable + entry.getValue().get(); + } catch (final ExecutionException ee) { + final Throwable cause = ee.getCause(); + throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } } } 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java new file mode 100644 index 0000000..4edc6ad --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.provenance.journaling.query; + +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QuerySubmission; + +public interface QueryManager { + /** + * Submits an asynchronous request to process the given query, returning an + * identifier that can be used to fetch the results at a later time + * + * @param query + * @return + */ + QuerySubmission submitQuery(Query query); + + /** + * Returns the QueryResult associated with the given identifier, if the + * query has finished processing. If the query has not yet finished running, + * returns <code>null</code>. + * + * @param queryIdentifier + * + * @return + */ + QuerySubmission retrieveQuerySubmission(String queryIdentifier); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java new file mode 100644 index 0000000..4cce231 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling.query; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.nifi.provenance.AsyncQuerySubmission; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.StoredProvenanceEvent; +import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent; +import org.apache.nifi.provenance.journaling.JournaledStorageLocation; +import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.apache.nifi.provenance.journaling.index.EventIndexSearcher; +import org.apache.nifi.provenance.journaling.index.QueryUtils; +import org.apache.nifi.provenance.journaling.index.SearchResult; +import org.apache.nifi.provenance.journaling.journals.JournalReader; +import org.apache.nifi.provenance.journaling.journals.StandardJournalReader; +import org.apache.nifi.provenance.journaling.partition.Partition; +import org.apache.nifi.provenance.journaling.partition.PartitionManager; +import org.apache.nifi.provenance.journaling.partition.VoidPartitionAction; +import org.apache.nifi.provenance.journaling.toc.StandardTocReader; +import 
org.apache.nifi.provenance.journaling.toc.TocReader; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QuerySubmission; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StandardQueryManager implements QueryManager { + private static final Logger logger = LoggerFactory.getLogger(StandardQueryManager.class); + + private final int maxConcurrentQueries; + private final JournalingRepositoryConfig config; + private final PartitionManager partitionManager; + private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>(); + + public StandardQueryManager(final PartitionManager partitionManager, final JournalingRepositoryConfig config, final int maxConcurrentQueries) { + this.config = config; + this.maxConcurrentQueries = maxConcurrentQueries; + this.partitionManager = partitionManager; + } + + @Override + public QuerySubmission submitQuery(final Query query) { + final int numQueries = querySubmissionMap.size(); + if (numQueries > maxConcurrentQueries) { + throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not been deleted (likely due to poorly behaving clients not issuing DELETE requests). 
Please try again later."); + } + + if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) { + throw new IllegalArgumentException("Query End Time cannot be before Query Start Time"); + } + + if (query.getSearchTerms().isEmpty() && query.getStartDate() == null && query.getEndDate() == null) { + final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 1); + + querySubmissionMap.put(query.getIdentifier(), result); + return result; + } + + final AtomicInteger retrievalCount = new AtomicInteger(query.getMaxResults()); + final AsyncQuerySubmission submission = new AsyncQuerySubmission(query, config.getPartitionCount()) { + @Override + public void cancel() { + super.cancel(); + querySubmissionMap.remove(query.getIdentifier()); + } + }; + + querySubmissionMap.put(query.getIdentifier(), submission); + + partitionManager.withEachPartition(new VoidPartitionAction() { + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public void perform(final Partition partition) throws IOException { + logger.debug("Running {} against {}", query, partition); + + try (final EventIndexSearcher searcher = partition.newIndexSearcher()) { + final SearchResult searchResult = searcher.search(query); + logger.debug("{} has {} hits against {} over {} files", query, searchResult.getTotalCount(), partition, searchResult.getLocations().size()); + + final List<StoredProvenanceEvent> matchingRecords = new ArrayList<>(); + final Map<File, List<JournaledStorageLocation>> locationMap = QueryUtils.orderLocations((List) searchResult.getLocations(), config); + for ( final Map.Entry<File, List<JournaledStorageLocation>> entry : locationMap.entrySet() ) { + final File journalFile = entry.getKey(); + final List<JournaledStorageLocation> locations = entry.getValue(); + + if ( retrievalCount.get() <= 0 ) { + break; + } + + try (final JournalReader reader = new StandardJournalReader(journalFile); + final TocReader tocReader 
= new StandardTocReader(QueryUtils.getTocFile(journalFile))) { + + for ( final JournaledStorageLocation location : locations ) { + final long blockOffset = tocReader.getBlockOffset(location.getBlockIndex()); + final ProvenanceEventRecord event = reader.getEvent(blockOffset, location.getEventId()); + matchingRecords.add(new JournaledProvenanceEvent(event, location)); + + final int recordsLeft = retrievalCount.decrementAndGet(); + if ( recordsLeft <= 0 ) { + break; + } + } + } + } + + logger.debug("Finished executing {} against {}", query, partition); + submission.getResult().update(matchingRecords, searchResult.getTotalCount()); + } catch (final Exception e) { + submission.getResult().setError("Failed to query " + partition + " due to " + e.toString()); + throw e; + } + } + }, true); + + return submission; + } + + @Override + public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) { + return querySubmissionMap.get(queryIdentifier); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceEventRepository ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceEventRepository b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceEventRepository new file mode 100644 index 0000000..e224c51 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceEventRepository @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) 
under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.provenance.journaling.JournalingProvenanceRepository \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java new file mode 100644 index 0000000..a547a8a --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.journaling; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.nifi.provenance.SearchableFields; +import org.apache.nifi.provenance.StoredProvenanceEvent; +import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig; +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QueryResult; +import org.apache.nifi.provenance.search.QuerySubmission; +import org.apache.nifi.provenance.search.SearchTerms; +import org.apache.nifi.provenance.search.SearchableField; +import org.apache.nifi.util.file.FileUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestJournalingProvenanceRepository { + + + @BeforeClass + public static void setupLogging() { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance.journaling", "DEBUG"); + } + + @Test + public void testStoreAndRetrieve() throws IOException { + final JournalingRepositoryConfig config = new JournalingRepositoryConfig(); + 
final Map<String, File> containers = new HashMap<>(); + containers.put("container1", new File("target/" + UUID.randomUUID().toString())); + containers.put("container2", new File("target/" + UUID.randomUUID().toString())); + config.setContainers(containers); + config.setPartitionCount(3); + + try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) { + repo.initialize(null); + final Map<String, String> attributes = new HashMap<>(); + + for (int i=0; i < 10; i++) { + attributes.put("i", String.valueOf(i)); + repo.registerEvent(TestUtil.generateEvent(i, attributes)); + } + + // retrieve records one at a time. + for (int i=0; i < 10; i++) { + final StoredProvenanceEvent event = repo.getEvent(i); + assertNotNull(event); + assertEquals((long) i, event.getEventId()); + assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid()); + } + + final List<StoredProvenanceEvent> events = repo.getEvents(0, 1000); + assertNotNull(events); + assertEquals(10, events.size()); + for (int i=0; i < 10; i++) { + final StoredProvenanceEvent event = events.get(i); + assertNotNull(event); + assertEquals((long) i, event.getEventId()); + assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid()); + } + } finally { + for ( final File file : containers.values() ) { + if ( file.exists() ) { + FileUtils.deleteFile(file, true); + } + } + } + } + + + @Test(timeout=10000000) + public void testSearchByUUID() throws IOException, InterruptedException { + final JournalingRepositoryConfig config = new JournalingRepositoryConfig(); + final Map<String, File> containers = new HashMap<>(); + containers.put("container1", new File("target/" + UUID.randomUUID().toString())); + containers.put("container2", new File("target/" + UUID.randomUUID().toString())); + config.setContainers(containers); + + config.setPartitionCount(3); + config.setSearchableFields(Arrays.asList(new SearchableField[] { + SearchableFields.FlowFileUUID + })); + + 
try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) { + repo.initialize(null); + + final Map<String, String> attributes = new HashMap<>(); + + for (int i=0; i < 10; i++) { + attributes.put("i", String.valueOf(i)); + repo.registerEvent(TestUtil.generateEvent(i, attributes)); + } + + final Query query = new Query("query"); + query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000-000000000005")); + final QuerySubmission submission = repo.submitQuery(query); + assertNotNull(submission); + + final QueryResult result = submission.getResult(); + while ( !result.isFinished() ) { + Thread.sleep(50L); + } + + assertNull(result.getError()); + final List<StoredProvenanceEvent> matches = result.getMatchingEvents(); + assertNotNull(matches); + assertEquals(1, matches.size()); + + final StoredProvenanceEvent event = matches.get(0); + assertEquals(5, event.getEventId()); + assertEquals("00000000-0000-0000-0000-000000000005", event.getFlowFileUuid()); + } finally { + for ( final File file : containers.values() ) { + FileUtils.deleteFile(file, true); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java index 45b7338..6d05f7a 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java +++ 
b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.provenance.journaling; +import java.util.Collections; +import java.util.Map; import java.util.UUID; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -23,7 +25,12 @@ import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.StandardProvenanceEventRecord; public class TestUtil { + public static ProvenanceEventRecord generateEvent(final long id) { + return generateEvent(id, Collections.<String, String>emptyMap()); + } + + public static ProvenanceEventRecord generateEvent(final long id, final Map<String, String> attributes) { // Create prov event to add to the stream final ProvenanceEventRecord event = new StandardProvenanceEventRecord.Builder() .setEventType(ProvenanceEventType.CREATE) @@ -34,6 +41,7 @@ public class TestUtil { .setFlowFileEntryDate(System.currentTimeMillis() - 1000L) .setLineageStartDate(System.currentTimeMillis() - 2000L) .setCurrentContentClaim(null, null, null, null, 0L) + .setAttributes(null, attributes == null ? Collections.<String, String>emptyMap() : attributes) .build(); return event;