NIFI-4060: Initial implementation of MergeRecord

NIFI-4060: Addressed threading issue with RecordBin being updated after it is completed; fixed issue that caused mime.type attribute not to be written properly if all incoming flowfiles already have a different value for that attribute
NIFI-4060: Bug fixes; improved documentation; added a lot of debug information; updated StandardProcessSession to produce more accurate logs in case of a session being committed/rolled back with open input/output streams

Signed-off-by: Matt Burgess <mattyb...@apache.org>

This closes #1958

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b603cb95
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b603cb95
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b603cb95

Branch: refs/heads/master
Commit: b603cb955dcd1d3d9b5e374e5760f2f9b047bda9
Parents: eefad29
Author: Mark Payne <marka...@hotmail.com>
Authored: Mon Jun 26 13:15:03 2017 -0400
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Wed Jul 12 16:36:48 2017 -0400

----------------------------------------------------------------------
 .../nifi/processor/util/bin/BinFiles.java | 14 +-
 .../record/CommaSeparatedRecordReader.java | 102 +++++
 .../repository/StandardProcessSession.java | 19 +-
 .../nifi/processors/standard/MergeContent.java | 104 +----
 .../nifi/processors/standard/MergeRecord.java | 358 ++++++++++++++++
 .../standard/merge/AttributeStrategy.java | 27 ++
 .../standard/merge/AttributeStrategyUtil.java | 56 +++
 .../merge/KeepCommonAttributeStrategy.java | 64 +++
 .../merge/KeepUniqueAttributeStrategy.java | 58 +++
 .../processors/standard/merge/RecordBin.java | 424 +++++++++++++++++++
 .../standard/merge/RecordBinManager.java | 295 +++++++++++++
 .../standard/merge/RecordBinThresholds.java | 69 +++
 .../org.apache.nifi.processor.Processor | 1 +
 .../additionalDetails.html | 229 ++++++++++
 .../processors/standard/TestMergeContent.java | 9 +-
 .../processors/standard/TestMergeRecord.java | 360 ++++++++++++++++
 16 files changed, 2076 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index b15d23b..7f79b70 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -131,9 +131,10 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
     *
     * @param context context
     * @param flowFile flowFile
+    * @param session the session for accessing the FlowFile
     * @return The appropriate group ID
     */
-   protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile);
+   protected abstract String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session);

    /**
     * Performs any additional setup of the bin manager. Called during the OnScheduled phase.
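
Passing the ProcessSession into getGroupId lets a BinFiles subclass derive its grouping key from a FlowFile's content rather than from attributes alone, which is also why the next hunk wraps the call in a try/catch and routes FlowFiles whose group cannot be determined to failure. A rough sketch of what an override against the new signature might look like; the computeSchemaFingerprint helper is hypothetical, purely for illustration:

    @Override
    protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session) {
        // The session permits content access: ProcessSession.read(FlowFile) returns an
        // InputStream over the FlowFile's content.
        try (final InputStream in = session.read(flowFile)) {
            return computeSchemaFingerprint(in); // hypothetical content-derived grouping key
        } catch (final IOException e) {
            // Propagating an exception lets the caller route this FlowFile to failure
            throw new ProcessException("Could not determine group ID for " + flowFile, e);
        }
    }
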
@@ -271,8 +272,15 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>(); for (FlowFile flowFile : flowFiles) { flowFile = this.preprocessFlowFile(context, session, flowFile); - final String groupingIdentifier = getGroupId(context, flowFile); - flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile); + + try { + final String groupingIdentifier = getGroupId(context, flowFile, session); + flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile); + } catch (final Exception e) { + getLogger().error("Could not determine which Bin to add {} to; will route to failure", new Object[] {flowFile}, e); + session.transfer(flowFile, REL_FAILURE); + continue; + } } for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) { http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java new file mode 100644 index 0000000..8973055 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.serialization.record; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; + +public class CommaSeparatedRecordReader extends AbstractControllerService implements RecordReaderFactory { + private int failAfterN; + private int recordCount = 0; + + public CommaSeparatedRecordReader() { + this(-1); + } + + public CommaSeparatedRecordReader(final int failAfterN) { + this.failAfterN = failAfterN; + } + + public void failAfter(final int failAfterN) { + this.failAfterN = failAfterN; + } + + @Override + public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { + final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + + final List<RecordField> fields = new ArrayList<>(); + + final String headerLine = reader.readLine(); + for (final String colName : headerLine.split(",")) { + fields.add(new RecordField(colName.trim(), RecordFieldType.STRING.getDataType())); + } + + return new RecordReader() { + + @Override + public void close() throws IOException { + reader.close(); + } + + @Override + public Record nextRecord() throws IOException, MalformedRecordException { + if (failAfterN > -1 && recordCount >= failAfterN) { + throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read"); + } + + final String nextLine = reader.readLine(); + if (nextLine == null) { + return null; + } + + recordCount++; + + final String[] values = nextLine.split(","); + final Map<String, Object> valueMap = new HashMap<>(); + int i = 0; + for (final RecordField field : fields) { + final String fieldName = field.getFieldName(); + valueMap.put(fieldName, values[i++].trim()); + } + + return new MapRecord(new SimpleRecordSchema(fields), valueMap); + } + + @Override + public RecordSchema getSchema() { + return new SimpleRecordSchema(fields); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index d34d8cf..d2a6af6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -191,13 +191,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE processingStartTime = System.nanoTime(); } - private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap) { + private void closeStreams(final Map<FlowFile, ? extends Closeable> streamMap, final String action, final String streamType) { final Map<FlowFile, ? extends Closeable> openStreamCopy = new HashMap<>(streamMap); // avoid ConcurrentModificationException by creating a copy of the Map for (final Map.Entry<FlowFile, ? extends Closeable> entry : openStreamCopy.entrySet()) { final FlowFile flowFile = entry.getKey(); final Closeable openStream = entry.getValue(); - LOG.warn("{} closing {} for {} because the session was committed without the stream being closed.", this, openStream, flowFile); + LOG.warn("{} closing {} for {} because the session was {} without the {} stream being closed.", this, openStream, flowFile, action, streamType); try { openStream.close(); @@ -212,8 +212,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE resetWriteClaims(false); - closeStreams(openInputStreams); - closeStreams(openOutputStreams); + closeStreams(openInputStreams, "committed", "input"); + closeStreams(openOutputStreams, "committed", "output"); if (!readRecursionSet.isEmpty()) { throw new IllegalStateException(); @@ -914,8 +914,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE deleteOnCommit.clear(); - closeStreams(openInputStreams); - closeStreams(openOutputStreams); + closeStreams(openInputStreams, "rolled back", "input"); + closeStreams(openOutputStreams, "rolled back", "output"); try { claimCache.reset(); @@ -2171,7 +2171,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e); } - final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), false); + final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), true); final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize()); final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn); final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim()); @@ -2470,7 +2470,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final long bytesWritten = countingOut.getBytesWritten(); StandardProcessSession.this.bytesWritten += bytesWritten; - openOutputStreams.remove(sourceFlowFile); + final OutputStream removed = openOutputStreams.remove(sourceFlowFile); + if (removed == null) { + LOG.error("Closed Session's OutputStream but there was no entry for it in the map; sourceFlowFile={}; map={}", sourceFlowFile, openOutputStreams); + } flush(); removeTemporaryClaim(record); http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index 3401d66..edbc033 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -82,6 +82,8 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.bin.Bin; import org.apache.nifi.processor.util.bin.BinFiles; import org.apache.nifi.processor.util.bin.BinManager; +import org.apache.nifi.processors.standard.merge.AttributeStrategy; +import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil; import org.apache.nifi.stream.io.NonCloseableOutputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.FlowFilePackager; @@ -126,7 +128,7 @@ import org.apache.nifi.util.FlowFilePackagerV3; @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"), @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively " + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output") }) -@SeeAlso(SegmentContent.class) +@SeeAlso({SegmentContent.class, MergeRecord.class}) public class MergeContent extends BinFiles { // preferred attributes @@ -201,8 +203,6 @@ public class MergeContent extends BinFiles { MERGE_FORMAT_AVRO_VALUE, "The Avro contents of all FlowFiles will be concatenated together into a single FlowFile"); - public static final String ATTRIBUTE_STRATEGY_ALL_COMMON = "Keep Only Common Attributes"; - public static final String ATTRIBUTE_STRATEGY_ALL_UNIQUE = "Keep All Unique Attributes"; public static final String TAR_PERMISSIONS_ATTRIBUTE = "tar.permissions"; public static final String MERGE_COUNT_ATTRIBUTE = "merge.count"; @@ -224,16 +224,6 @@ public class MergeContent extends BinFiles { .allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT, MERGE_FORMAT_AVRO) .defaultValue(MERGE_FORMAT_CONCAT.getValue()) .build(); - public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new PropertyDescriptor.Builder() - .required(true) - .name("Attribute Strategy") - .description("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any " - + "attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. 
" - + "If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same " - + "value, will be preserved.") - .allowableValues(ATTRIBUTE_STRATEGY_ALL_COMMON, ATTRIBUTE_STRATEGY_ALL_UNIQUE) - .defaultValue(ATTRIBUTE_STRATEGY_ALL_COMMON) - .build(); public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() .name("Correlation Attribute Name") @@ -315,7 +305,7 @@ public class MergeContent extends BinFiles { final List<PropertyDescriptor> descriptors = new ArrayList<>(); descriptors.add(MERGE_STRATEGY); descriptors.add(MERGE_FORMAT); - descriptors.add(ATTRIBUTE_STRATEGY); + descriptors.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY); descriptors.add(CORRELATION_ATTRIBUTE_NAME); descriptors.add(MIN_ENTRIES); descriptors.add(MAX_ENTRIES); @@ -378,7 +368,7 @@ public class MergeContent extends BinFiles { } @Override - protected String getGroupId(final ProcessContext context, final FlowFile flowFile) { + protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final ProcessSession session) { final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME) .evaluateAttributeExpressions(flowFile).getValue(); String groupId = correlationAttributeName == null ? null : flowFile.getAttribute(correlationAttributeName); @@ -429,16 +419,7 @@ public class MergeContent extends BinFiles { throw new AssertionError(); } - final AttributeStrategy attributeStrategy; - switch (context.getProperty(ATTRIBUTE_STRATEGY).getValue()) { - case ATTRIBUTE_STRATEGY_ALL_UNIQUE: - attributeStrategy = new KeepUniqueAttributeStrategy(); - break; - case ATTRIBUTE_STRATEGY_ALL_COMMON: - default: - attributeStrategy = new KeepCommonAttributeStrategy(); - break; - } + final AttributeStrategy attributeStrategy = AttributeStrategyUtil.strategyFor(context); final List<FlowFile> contents = bin.getContents(); final ProcessSession binSession = bin.getSession(); @@ -989,76 +970,7 @@ public class MergeContent extends BinFiles { } } - private static class KeepUniqueAttributeStrategy implements AttributeStrategy { - - @Override - public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) { - final Map<String, String> newAttributes = new HashMap<>(); - final Set<String> conflicting = new HashSet<>(); - - for (final FlowFile flowFile : flowFiles) { - for (final Map.Entry<String, String> attributeEntry : flowFile.getAttributes().entrySet()) { - final String name = attributeEntry.getKey(); - final String value = attributeEntry.getValue(); - - final String existingValue = newAttributes.get(name); - if (existingValue != null && !existingValue.equals(value)) { - conflicting.add(name); - } else { - newAttributes.put(name, value); - } - } - } - - for (final String attributeToRemove : conflicting) { - newAttributes.remove(attributeToRemove); - } - - // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent. 
- newAttributes.remove(CoreAttributes.UUID.key()); - return newAttributes; - } - } - - private static class KeepCommonAttributeStrategy implements AttributeStrategy { - - @Override - public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) { - final Map<String, String> result = new HashMap<>(); - - //trivial cases - if (flowFiles == null || flowFiles.isEmpty()) { - return result; - } else if (flowFiles.size() == 1) { - result.putAll(flowFiles.iterator().next().getAttributes()); - } - - /* - * Start with the first attribute map and only put an entry to the - * resultant map if it is common to every map. - */ - final Map<String, String> firstMap = flowFiles.iterator().next().getAttributes(); - - outer: - for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) { - final String key = mapEntry.getKey(); - final String value = mapEntry.getValue(); - - for (final FlowFile flowFile : flowFiles) { - final Map<String, String> currMap = flowFile.getAttributes(); - final String curVal = currMap.get(key); - if (curVal == null || !curVal.equals(value)) { - continue outer; - } - } - result.put(key, value); - } - // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent. - result.remove(CoreAttributes.UUID.key()); - return result; - } - } private static class FragmentComparator implements Comparator<FlowFile> { @@ -1079,8 +991,4 @@ public class MergeContent extends BinFiles { List<FlowFile> getUnmergedFlowFiles(); } - private interface AttributeStrategy { - - Map<String, String> getMergedAttributes(List<FlowFile> flowFiles); - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java new file mode 100644 index 0000000..b0e3f48 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.FlowFileFilters; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil; +import org.apache.nifi.processors.standard.merge.RecordBinManager; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; + + +@SideEffectFree +@TriggerWhenEmpty +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"merge", "record", "content", "correlation", "stream", "event"}) +@CapabilityDescription("This Processor merges together multiple record-oriented FlowFiles into a single FlowFile that contains all of the Records of the input FlowFiles. " + + "This Processor works by creating 'bins' and then adding FlowFiles to these bins until they are full. Once a bin is full, all of the FlowFiles will be combined into " + + "a single output FlowFile, and that FlowFile will be routed to the 'merged' Relationship. A bin will consist of potentially many 'like FlowFiles'. In order for two " + + "FlowFiles to be considered 'like FlowFiles', they must have the same Schema (as identified by the Record Reader) and, if the <Correlation Attribute Name> property " + + "is set, the same value for the specified attribute. See Processor Usage and Additional Details for more information.") +@ReadsAttributes({ + @ReadsAttribute(attribute = "fragment.identifier", description = "Applicable only if the <Merge Strategy> property is set to Defragment. 
" + + "All FlowFiles with the same value for this attribute will be bundled together."), + @ReadsAttribute(attribute = "fragment.count", description = "Applicable only if the <Merge Strategy> property is set to Defragment. This " + + "attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same " + + "bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected " + + "in the given bundle."), +}) +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "The merged FlowFile will have a 'record.count' attribute indicating the number of records " + + "that were written to the FlowFile."), + @WritesAttribute(attribute = "mime.type", description = "The MIME Type indicated by the Record Writer"), + @WritesAttribute(attribute = "merge.count", description = "The number of FlowFiles that were merged into this bundle"), + @WritesAttribute(attribute = "merge.bin.age", description = "The age of the bin, in milliseconds, when it was merged and output. Effectively " + + "this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output"), + @WritesAttribute(attribute = "<Attributes from Record Writer>", description = "Any Attribute that the configured Record Writer returns will be added to the FlowFile.") +}) +@SeeAlso({MergeContent.class, SplitRecord.class, PartitionRecord.class}) +public class MergeRecord extends AbstractSessionFactoryProcessor { + // attributes for defragmentation + public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key(); + + public static final String MERGE_COUNT_ATTRIBUTE = "merge.count"; + public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age"; + + public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue( + "Bin-Packing Algorithm", + "Bin-Packing Algorithm", + "Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally " + + "their attributes (if the <Correlation Attribute> property is set)"); + public static final AllowableValue MERGE_STRATEGY_DEFRAGMENT = new AllowableValue( + "Defragment", + "Defragment", + "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must " + + "have the attributes <fragment.identifier> and <fragment.count>. All FlowFiles with the same value for \"fragment.identifier\" " + + "will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. 
The ordering of " + "the Records that are output is not guaranteed."); + + + public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for reading incoming data") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + public static final PropertyDescriptor MERGE_STRATEGY = new PropertyDescriptor.Builder() + .name("merge-strategy") + .displayName("Merge Strategy") + .description("Specifies the algorithm used to merge records. The 'Defragment' algorithm combines fragments that are associated by " + + "attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily " + + "chosen FlowFiles") + .required(true) + .allowableValues(MERGE_STRATEGY_BIN_PACK, MERGE_STRATEGY_DEFRAGMENT) + .defaultValue(MERGE_STRATEGY_BIN_PACK.getValue()) + .build(); + public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder() + .name("correlation-attribute-name") + .displayName("Correlation Attribute Name") + .description("If specified, two FlowFiles will be binned together only if they have the same value for " + "this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.") + .required(false) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR) + .defaultValue(null) + .build(); + public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder() + .name("min-bin-size") + .displayName("Minimum Bin Size") + .description("The minimum size for the bin") + .required(true) + .defaultValue("0 B") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder() + .name("max-bin-size") + .displayName("Maximum Bin Size") + .description("The maximum size for the bundle. If not specified, there is no maximum. This is a 'soft limit' in that if a FlowFile is added to a bin, " + "all records in that FlowFile will be added, so this limit may be exceeded by up to the number of bytes in the last input FlowFile.") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor MIN_RECORDS = new PropertyDescriptor.Builder() + .name("min-records") + .displayName("Minimum Number of Records") + .description("The minimum number of records to include in a bin") + .required(true) + .defaultValue("1") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_RECORDS = new PropertyDescriptor.Builder() + .name("max-records") + .displayName("Maximum Number of Records") + .description("The maximum number of Records to include in a bin.
This is a 'soft limit' in that if a FlowFile is added to a bin, all records in that FlowFile will be added, " + "so this limit may be exceeded by up to the number of records in the last input FlowFile.") + .required(false) + .defaultValue("1000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder() + .name("max.bin.count") + .displayName("Maximum Number of Bins") + .description("Specifies the maximum number of bins that can be held in memory at any one time. " + + "This number should not be smaller than the maximum number of concurrent threads for this Processor, " + + "or the bins that are created will often consist only of a single incoming FlowFile.") + .defaultValue("10") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder() + .name("max-bin-age") + .displayName("Max Bin Age") + .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> " + + "where <duration> is a positive integer and time unit is one of seconds, minutes, hours") + .required(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + + + public static final Relationship REL_MERGED = new Relationship.Builder() + .name("merged") + .description("The FlowFile containing the merged records") + .build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The FlowFiles that were used to create the bundle") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure") + .build(); + + private final AtomicReference<RecordBinManager> binManager = new AtomicReference<>(); + + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(RECORD_READER); + properties.add(RECORD_WRITER); + properties.add(MERGE_STRATEGY); + properties.add(CORRELATION_ATTRIBUTE_NAME); + properties.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY); + properties.add(MIN_RECORDS); + properties.add(MAX_RECORDS); + properties.add(MIN_SIZE); + properties.add(MAX_SIZE); + properties.add(MAX_BIN_AGE); + properties.add(MAX_BIN_COUNT); + return properties; + } + + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); + relationships.add(REL_MERGED); + return relationships; + } + + + @OnStopped + public final void resetState() { + final RecordBinManager manager = binManager.get(); + if (manager != null) { + manager.purge(); + } + binManager.set(null); + } + + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + RecordBinManager manager = binManager.get(); + while (manager == null) { + manager = new RecordBinManager(context, sessionFactory, getLogger()); + manager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); + final boolean updated = binManager.compareAndSet(null, manager); + if (!updated) { + manager = binManager.get(); + }
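+ // Lazy initialization of the shared RecordBinManager: several threads may race to
+ // construct one, but compareAndSet lets exactly one instance win; a losing thread
+ // discards its manager and loops around to adopt the winner's.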
+ } + + final ProcessSession session = sessionFactory.createSession(); + final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250)); + if (getLogger().isDebugEnabled()) { + final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList()); + getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids}); + } + + final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue(); + final boolean block; + if (MERGE_STRATEGY_DEFRAGMENT.equals(mergeStrategy)) { + block = true; + } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) { + block = true; + } else { + block = false; + } + + try { + for (final FlowFile flowFile : flowFiles) { + try { + binFlowFile(context, flowFile, session, manager, block); + } catch (final Exception e) { + getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + } + } + } finally { + session.commit(); + } + + try { + manager.completeExpiredBins(); + } catch (final Exception e) { + getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e); + } + + if (flowFiles.isEmpty()) { + getLogger().debug("No FlowFiles to bin; will yield"); + context.yield(); + } + } + + + private void binFlowFile(final ProcessContext context, final FlowFile flowFile, final ProcessSession session, final RecordBinManager binManager, final boolean block) { + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + try (final InputStream in = session.read(flowFile); + final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) { + + final RecordSchema schema = reader.getSchema(); + + final String groupId = getGroupId(context, flowFile, schema, session); + getLogger().debug("Got Group ID {} for {}", new Object[] {groupId, flowFile}); + + binManager.add(groupId, flowFile, reader, session, block); + } catch (MalformedRecordException | IOException | SchemaNotFoundException e) { + throw new ProcessException(e); + } + } + + + protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final RecordSchema schema, final ProcessSession session) { + final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue(); + if (MERGE_STRATEGY_DEFRAGMENT.equals(mergeStrategy)) { + return flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE); + } + + final Optional<String> optionalText = schema.getSchemaText(); + final String schemaText = optionalText.isPresent() ? optionalText.get() : AvroTypeUtil.extractAvroSchema(schema).toString(); + + final String groupId; + final String correlationshipAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue(); + if (correlationshipAttributeName != null) { + final String correlationAttr = flowFile.getAttribute(correlationshipAttributeName); + groupId = correlationAttr == null ? 
schemaText : schemaText + correlationAttr; + } else { + groupId = schemaText; + } + + return groupId; + } + + int getBinCount() { + return binManager.get().getBinCount(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java new file mode 100644 index 0000000..e3c9da9 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategy.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard.merge; + +import java.util.List; +import java.util.Map; + +import org.apache.nifi.flowfile.FlowFile; + +public interface AttributeStrategy { + Map<String, String> getMergedAttributes(List<FlowFile> flowFiles); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java new file mode 100644 index 0000000..221eb04 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/AttributeStrategyUtil.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard.merge; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; + +public class AttributeStrategyUtil { + + public static final AllowableValue ATTRIBUTE_STRATEGY_ALL_COMMON = new AllowableValue("Keep Only Common Attributes", "Keep Only Common Attributes", + "Any attribute that is not the same on all FlowFiles in a bin will be dropped. Those that are the same across all FlowFiles will be retained."); + public static final AllowableValue ATTRIBUTE_STRATEGY_ALL_UNIQUE = new AllowableValue("Keep All Unique Attributes", "Keep All Unique Attributes", + "Any attribute that has the same value for all FlowFiles in a bin, or has no value for a FlowFile, will be kept. For example, if a bin consists of 3 FlowFiles " + + "and 2 of them have a value of 'hello' for the 'greeting' attribute and the third FlowFile has no 'greeting' attribute then the outbound FlowFile will get " + + "a 'greeting' attribute with the value 'hello'."); + + public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new PropertyDescriptor.Builder() + .required(true) + .name("Attribute Strategy") + .description("Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any " + + "attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. " + + "If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same " + + "value, will be preserved.") + .allowableValues(ATTRIBUTE_STRATEGY_ALL_COMMON, ATTRIBUTE_STRATEGY_ALL_UNIQUE) + .defaultValue(ATTRIBUTE_STRATEGY_ALL_COMMON.getValue()) + .build(); + + + public static AttributeStrategy strategyFor(ProcessContext context) { + final String strategyName = context.getProperty(ATTRIBUTE_STRATEGY).getValue(); + if (ATTRIBUTE_STRATEGY_ALL_UNIQUE.getValue().equals(strategyName)) { + return new KeepUniqueAttributeStrategy(); + } + if (ATTRIBUTE_STRATEGY_ALL_COMMON.getValue().equals(strategyName)) { + return new KeepCommonAttributeStrategy(); + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java new file mode 100644 index 0000000..5e7920a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepCommonAttributeStrategy.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard.merge; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; + +public class KeepCommonAttributeStrategy implements AttributeStrategy { + + @Override + public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) { + final Map<String, String> result = new HashMap<>(); + + //trivial cases + if (flowFiles == null || flowFiles.isEmpty()) { + return result; + } else if (flowFiles.size() == 1) { + result.putAll(flowFiles.iterator().next().getAttributes()); + } + + /* + * Start with the first attribute map and only put an entry to the + * resultant map if it is common to every map. + */ + final Map<String, String> firstMap = flowFiles.iterator().next().getAttributes(); + + outer: for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) { + final String key = mapEntry.getKey(); + final String value = mapEntry.getValue(); + + for (final FlowFile flowFile : flowFiles) { + final Map<String, String> currMap = flowFile.getAttributes(); + final String curVal = currMap.get(key); + if (curVal == null || !curVal.equals(value)) { + continue outer; + } + } + result.put(key, value); + } + + // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent. + result.remove(CoreAttributes.UUID.key()); + return result; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java new file mode 100644 index 0000000..86fb198 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/KeepUniqueAttributeStrategy.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard.merge; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; + +public class KeepUniqueAttributeStrategy implements AttributeStrategy { + + @Override + public Map<String, String> getMergedAttributes(final List<FlowFile> flowFiles) { + final Map<String, String> newAttributes = new HashMap<>(); + final Set<String> conflicting = new HashSet<>(); + + for (final FlowFile flowFile : flowFiles) { + for (final Map.Entry<String, String> attributeEntry : flowFile.getAttributes().entrySet()) { + final String name = attributeEntry.getKey(); + final String value = attributeEntry.getValue(); + + final String existingValue = newAttributes.get(name); + if (existingValue != null && !existingValue.equals(value)) { + conflicting.add(name); + } else { + newAttributes.put(name, value); + } + } + } + + for (final String attributeToRemove : conflicting) { + newAttributes.remove(attributeToRemove); + } + + // Never copy the UUID from the parents - which could happen if we don't remove it and there is only 1 parent. + newAttributes.remove(CoreAttributes.UUID.key()); + return newAttributes; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java new file mode 100644 index 0000000..0b4cf31 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.standard.merge; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processors.standard.MergeRecord; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.stream.io.ByteCountingOutputStream; + +public class RecordBin { + public static final String MERGE_COUNT_ATTRIBUTE = "merge.count"; + public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age"; + + private final ComponentLog logger; + private final ProcessSession session; + private final RecordSetWriterFactory writerFactory; + private final RecordBinThresholds thresholds; + private final ProcessContext context; + + private final List<FlowFile> flowFiles = new ArrayList<>(); + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + private final Lock writeLock = rwLock.writeLock(); + private final long creationNanos = System.nanoTime(); + + private FlowFile merged; + private RecordSetWriter recordWriter; + private ByteCountingOutputStream out; + private int recordCount = 0; + private volatile boolean complete = false; + + private static final AtomicLong idGenerator = new AtomicLong(0L); + private final long id = idGenerator.getAndIncrement(); + + + public RecordBin(final ProcessContext context, final ProcessSession session, final ComponentLog logger, final RecordBinThresholds thresholds) { + this.session = session; + this.writerFactory = context.getProperty(MergeRecord.RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + this.logger = logger; + this.context = context; + + this.merged = session.create(); + this.thresholds = thresholds; + } + + public boolean isOlderThan(final RecordBin other) { + return creationNanos < other.creationNanos; + } + + public boolean isOlderThan(final long period, final TimeUnit unit) { + final long nanos = unit.toNanos(period); + return creationNanos < System.nanoTime() - nanos; + } + + public boolean isComplete() { + return complete; + } + + public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block) + throws IOException, MalformedRecordException, SchemaNotFoundException { + + if (isComplete()) { + logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this}); + return false; + } + + final boolean locked; + if (block) { + writeLock.lock(); + locked = true; + } else { + locked = 
writeLock.tryLock(); + } + + if (!locked) { + logger.debug("RecordBin.offer for id={} returning false because failed to get lock for {}", new Object[] {flowFile.getId(), this}); + return false; + } + + boolean flowFileMigrated = false; + + try { + if (isComplete()) { + logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this}); + return false; + } + + logger.debug("Migrating id={} to {}", new Object[] {flowFile.getId(), this}); + + Record record; + while ((record = recordReader.nextRecord()) != null) { + if (recordWriter == null) { + final OutputStream rawOut = session.write(merged); + logger.debug("Created OutputStream using session {} for {}", new Object[] {session, this}); + + this.out = new ByteCountingOutputStream(rawOut); + + recordWriter = writerFactory.createWriter(logger, record.getSchema(), flowFile, out); + recordWriter.beginRecordSet(); + } + + recordWriter.write(record); + recordCount++; + } + + // This will be closed by the MergeRecord class anyway but we have to close it + // here because it needs to be closed before we are able to migrate the FlowFile + // to a new Session. + recordReader.close(); + flowFileSession.migrate(this.session, Collections.singleton(flowFile)); + flowFileMigrated = true; + this.flowFiles.add(flowFile); + + if (isFull()) { + logger.debug(this + " is now full. Completing bin."); + complete("Bin is full"); + } else if (isOlderThan(thresholds.getMaxBinMillis(), TimeUnit.MILLISECONDS)) { + logger.debug(this + " is now expired. Completing bin."); + complete("Bin is older than " + thresholds.getMaxBinAge()); + } + + return true; + } catch (final Exception e) { + logger.error("Failed to create merged FlowFile from " + (flowFiles.size() + 1) + " input FlowFiles; routing originals to failure", e); + + try { + // This will be closed by the MergeRecord class anyway but we have to close it + // here because it needs to be closed before we are able to migrate the FlowFile + // to a new Session. 
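+ // Failure path: close the writer and output stream too, pull the FlowFile into this
+ // bin's session if it was not yet migrated, then discard the partial merged FlowFile
+ // and route every FlowFile in the bin to the 'failure' relationship.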
+ recordReader.close(); + + if (recordWriter != null) { + recordWriter.close(); + } + if (this.out != null) { + this.out.close(); + } + + if (!flowFileMigrated) { + flowFileSession.migrate(this.session, Collections.singleton(flowFile)); + this.flowFiles.add(flowFile); + } + } finally { + complete = true; + session.remove(merged); + session.transfer(flowFiles, MergeRecord.REL_FAILURE); + session.commit(); + } + + return true; + } finally { + writeLock.unlock(); + } + } + + public boolean isFull() { + readLock.lock(); + try { + if (!isFullEnough()) { + return false; + } + + int maxRecords; + final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute(); + if (recordCountAttribute.isPresent()) { + final Optional<String> recordCountValue = flowFiles.stream() + .filter(ff -> ff.getAttribute(recordCountAttribute.get()) != null) + .map(ff -> ff.getAttribute(recordCountAttribute.get())) + .findFirst(); + + if (!recordCountValue.isPresent()) { + return false; + } + + try { + maxRecords = Integer.parseInt(recordCountValue.get()); + } catch (final NumberFormatException e) { + maxRecords = 1; + } + } else { + maxRecords = thresholds.getMaxRecords(); + } + + if (recordCount >= maxRecords) { + return true; + } + + if (out.getBytesWritten() >= thresholds.getMaxBytes()) { + return true; + } + + return false; + } finally { + readLock.unlock(); + } + } + + public boolean isFullEnough() { + readLock.lock(); + try { + if (flowFiles.isEmpty()) { + return false; + } + + int requiredRecordCount; + final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute(); + if (recordCountAttribute.isPresent()) { + final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get()); + try { + requiredRecordCount = Integer.parseInt(recordCountValue); + } catch (final NumberFormatException e) { + requiredRecordCount = 1; + } + } else { + requiredRecordCount = thresholds.getMinRecords(); + } + + return (recordCount >= requiredRecordCount && out.getBytesWritten() >= thresholds.getMinBytes()); + } finally { + readLock.unlock(); + } + } + + + public void rollback() { + complete = true; + logger.debug("Marked {} as complete because rollback() was called", new Object[] {this}); + + writeLock.lock(); + try { + if (recordWriter != null) { + try { + recordWriter.close(); + } catch (IOException e) { + logger.warn("Failed to close Record Writer", e); + } + } + + session.rollback(); + + if (logger.isDebugEnabled()) { + final List<String> ids = flowFiles.stream().map(ff -> " id=" + ff.getId() + ",").collect(Collectors.toList()); + logger.debug("Rolled back bin {} containing input FlowFiles {}", new Object[] {this, ids}); + } + } finally { + writeLock.unlock(); + } + } + + private long getBinAge() { + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - creationNanos); + } + + private void fail() { + complete = true; + logger.debug("Marked {} as complete because fail() was called", new Object[] {this}); + + writeLock.lock(); + try { + if (recordWriter != null) { + try { + recordWriter.close(); + } catch (IOException e) { + logger.warn("Failed to close Record Writer", e); + } + } + + session.remove(merged); + session.transfer(flowFiles, MergeRecord.REL_FAILURE); + session.commit(); + } finally { + writeLock.unlock(); + } + } + + public void complete(final String completionReason) throws IOException { + writeLock.lock(); + try { + if (isComplete()) { + logger.debug("Cannot complete {} because it is already completed", new Object[] {this}); + return; + } + + complete = true; + 
logger.debug("Marked {} as complete because complete() was called", new Object[] {this}); + + final WriteResult writeResult = recordWriter.finishRecordSet(); + recordWriter.close(); + logger.debug("Closed Record Writer using session {} for {}", new Object[] {session, this}); + + if (flowFiles.isEmpty()) { + session.remove(merged); + return; + } + + // If using defragment mode, and we don't have enough FlowFiles, then we need to fail this bin. + final Optional<String> countAttr = thresholds.getRecordCountAttribute(); + if (countAttr.isPresent()) { + // Ensure that at least one FlowFile has a fragment.count attribute and that they all have the same value, if they have a value. + Integer expectedBinCount = null; + for (final FlowFile flowFile : flowFiles) { + final String countVal = flowFile.getAttribute(countAttr.get()); + if (countVal == null) { + continue; + } + + final int count; + try { + count = Integer.parseInt(countVal); + } catch (final NumberFormatException nfe) { + logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but expected a number", + new Object[] {flowFiles.size(), countAttr.get(), countVal, flowFile}); + fail(); + return; + } + + if (expectedBinCount != null && count != expectedBinCount) { + logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' for {} but another FlowFile in the bin had a value of {}", + new Object[] {flowFiles.size(), countAttr.get(), countVal, flowFile, expectedBinCount}); + fail(); + return; + } + + expectedBinCount = count; + } + + if (expectedBinCount == null) { + logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute was not present on any of the FlowFiles", + new Object[] {flowFiles.size(), countAttr.get()}); + fail(); + return; + } + + if (expectedBinCount != flowFiles.size()) { + logger.error("Could not merge bin with {} FlowFiles because the '{}' attribute had a value of '{}' but only {} of {} FlowFiles were encountered before this bin was evicted " + + "(due to to Max Bin Age being reached or due to the Maximum Number of Bins being exceeded).", + new Object[] {flowFiles.size(), countAttr.get(), expectedBinCount, flowFiles.size(), expectedBinCount}); + fail(); + return; + } + } + + final Map<String, String> attributes = new HashMap<>(); + + final AttributeStrategy attributeStrategy = AttributeStrategyUtil.strategyFor(context); + final Map<String, String> mergedAttributes = attributeStrategy.getMergedAttributes(flowFiles); + attributes.putAll(mergedAttributes); + + attributes.putAll(writeResult.getAttributes()); + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType()); + attributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(flowFiles.size())); + attributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(getBinAge())); + + merged = session.putAllAttributes(merged, attributes); + + session.getProvenanceReporter().join(flowFiles, merged, "Records Merged due to: " + completionReason); + session.transfer(merged, MergeRecord.REL_MERGED); + session.transfer(flowFiles, MergeRecord.REL_ORIGINAL); + session.adjustCounter("Records Merged", writeResult.getRecordCount(), false); + session.commit(); + + if (logger.isDebugEnabled()) { + final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList()); + logger.debug("Completed bin {} with {} records with Merged FlowFile {} using input FlowFiles {}", new 
Object[] {this, writeResult.getRecordCount(), merged, ids}); + } + } catch (final Exception e) { + session.rollback(true); + throw e; + } finally { + writeLock.unlock(); + } + } + + @Override + public String toString() { + readLock.lock(); + try { + return "RecordBin[size=" + flowFiles.size() + ", full=" + isFull() + ", isComplete=" + isComplete() + ", id=" + id + "]"; + } finally { + readLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b603cb95/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java new file mode 100644 index 0000000..8496a4d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.nifi.processors.standard.merge;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processors.standard.MergeContent;
+import org.apache.nifi.processors.standard.MergeRecord;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+
+public class RecordBinManager {
+
+ private final ProcessContext context;
+ private final ProcessSessionFactory sessionFactory;
+ private final ComponentLog logger;
+ private final int maxBinCount;
+
+ private final AtomicLong maxBinAgeNanos = new AtomicLong(Long.MAX_VALUE);
+ private final Map<String, List<RecordBin>> groupBinMap = new HashMap<>(); // guarded by lock
+ private final Lock lock = new ReentrantLock();
+
+ private final AtomicInteger binCount = new AtomicInteger(0);
+
+ public RecordBinManager(final ProcessContext context, final ProcessSessionFactory sessionFactory, final ComponentLog logger) {
+ this.context = context;
+ this.sessionFactory = sessionFactory;
+ this.logger = logger;
+
+ final Integer maxBins = context.getProperty(MergeRecord.MAX_BIN_COUNT).asInteger();
+ this.maxBinCount = maxBins == null ? Integer.MAX_VALUE : maxBins.intValue();
+ }
+
+ /**
+ * Must be called only when there are no active threads modifying the bins.
+ */
+ public void purge() {
+ lock.lock();
+ try {
+ for (final List<RecordBin> binList : groupBinMap.values()) {
+ for (final RecordBin bin : binList) {
+ bin.rollback();
+ }
+ }
+ groupBinMap.clear();
+ binCount.set(0);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ public void setMaxBinAge(final Long timePeriod, final TimeUnit timeUnit) {
+ if (timePeriod == null) {
+ maxBinAgeNanos.set(Long.MAX_VALUE);
+ } else {
+ maxBinAgeNanos.set(timeUnit.toNanos(timePeriod));
+ }
+ }
+
+
+ public int getBinCount() {
+ return binCount.get();
+ }
+
+ /**
+ * Adds the given FlowFile to the first available bin in which it fits for the given group, or creates a new bin in the specified group if necessary.
+ * <p/>
+ *
+ * @param groupIdentifier the group to which the FlowFile belongs; can be null
+ * @param flowFile the FlowFile to bin
+ * @param reader the RecordReader to use for reading the FlowFile
+ * @param session the ProcessSession to which the FlowFiles belong
+ * @param block if another thread is already writing to the desired bin, passing <code>true</code> for this parameter will block until the other thread(s) have finished so
+ * that the records can still be added to the desired bin. Passing <code>false</code> will result in moving on to another bin.
+ *
+ * @throws SchemaNotFoundException if unable to find the schema for the record writer
+ * @throws MalformedRecordException if unable to read a record
+ * @throws IOException if there is an IO problem reading from the stream or writing to the stream
+ */
+ public void add(final String groupIdentifier, final FlowFile flowFile, final RecordReader reader, final ProcessSession session, final boolean block)
+ throws IOException, MalformedRecordException, SchemaNotFoundException {
+
+ final List<RecordBin> currentBins;
+ lock.lock();
+ try {
+ // Create a new List<RecordBin> if none exists for this Group ID. We use a CopyOnWriteArrayList here because
+ // we need to traverse the list in a couple of places and just below here, we call bin.offer() (which is very expensive)
+ // while traversing the List, so we don't want to do this within a synchronized block. If we end up seeing poor performance
+ // from this, we could instead use a synchronized List and, rather than calling bin.offer() while iterating, use some sort of
+ // bin.tryLock() that locks only if the FlowFile should be added. Then, if it returned true, we could stop iterating,
+ // perform the expensive part, and ensure that we always unlock.
+ currentBins = groupBinMap.computeIfAbsent(groupIdentifier, grpId -> new CopyOnWriteArrayList<>());
+ } finally {
+ lock.unlock();
+ }
+
+ RecordBin acceptedBin = null;
+ for (final RecordBin bin : currentBins) {
+ final boolean accepted = bin.offer(flowFile, reader, session, block);
+
+ if (accepted) {
+ acceptedBin = bin;
+ logger.debug("Transferred id={} to {}", new Object[] {flowFile.getId(), bin});
+ break;
+ }
+ }
+
+ // We have to do this outside of our for-loop above in order to avoid a ConcurrentModificationException.
+ if (acceptedBin != null) {
+ if (acceptedBin.isComplete()) {
+ removeBins(groupIdentifier, Collections.singletonList(acceptedBin));
+ }
+
+ return;
+ }
+
+ // if we've reached this point then we couldn't fit it into any existing bins - gotta make a new one
+ final RecordBin bin = new RecordBin(context, sessionFactory.createSession(), logger, createThresholds());
+ final boolean binAccepted = bin.offer(flowFile, reader, session, true);
+ if (!binAccepted) {
+ session.rollback();
+ throw new RuntimeException("Attempted to add " + flowFile + " to a new bin but failed. This is unexpected. Will roll back session and try again.");
+ }
+
+ logger.debug("Transferred id={} to {}", new Object[] {flowFile.getId(), bin});
+
+ if (!bin.isComplete()) {
+ final int updatedBinCount = binCount.incrementAndGet();
+
+ lock.lock();
+ try {
+ // We have already obtained the list of RecordBins from this Map above. However, we released
+ // the lock in order to avoid blocking while writing to a Bin. Because of this, it is possible
+ // that another thread may have already come in and removed this List from the Map, if all
+ // Bins in the List have been completed. As a result, we must now obtain the lock again
+ // and obtain the List (or a new one), and then update that. This ensures that we never lose
+ // track of a Bin. If we did not do this, we could completely lose a Bin.
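+ // Note that binCount was already incremented above; removeBins() performs the matching
+ // decrement when Bins are later removed from this List, so registering the Bin here keeps
+ // the counter consistent with the contents of the Map.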
+ final List<RecordBin> bins = groupBinMap.computeIfAbsent(groupIdentifier, grpId -> new CopyOnWriteArrayList<>());
+ bins.add(bin);
+ } finally {
+ lock.unlock();
+ }
+
+ if (updatedBinCount > maxBinCount) {
+ completeOldestBin();
+ }
+ }
+ }
+
+
+ private RecordBinThresholds createThresholds() {
+ final int minRecords = context.getProperty(MergeRecord.MIN_RECORDS).asInteger();
+ final int maxRecords = context.getProperty(MergeRecord.MAX_RECORDS).asInteger();
+ final long minBytes = context.getProperty(MergeRecord.MIN_SIZE).asDataSize(DataUnit.B).longValue();
+
+ final PropertyValue maxSizeValue = context.getProperty(MergeRecord.MAX_SIZE);
+ final long maxBytes = maxSizeValue.isSet() ? maxSizeValue.asDataSize(DataUnit.B).longValue() : Long.MAX_VALUE;
+
+ final PropertyValue maxMillisValue = context.getProperty(MergeRecord.MAX_BIN_AGE);
+ final String maxBinAge = maxMillisValue.getValue();
+ final long maxBinMillis = maxMillisValue.isSet() ? maxMillisValue.asTimePeriod(TimeUnit.MILLISECONDS).longValue() : Long.MAX_VALUE;
+
+ final String recordCountAttribute;
+ final String mergeStrategy = context.getProperty(MergeRecord.MERGE_STRATEGY).getValue();
+ if (MergeRecord.MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+ recordCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE;
+ } else {
+ recordCountAttribute = null;
+ }
+
+ return new RecordBinThresholds(minRecords, maxRecords, minBytes, maxBytes, maxBinMillis, maxBinAge, recordCountAttribute);
+ }
+
+
+ public void completeOldestBin() throws IOException {
+ RecordBin oldestBin = null;
+
+ lock.lock();
+ try {
+ String oldestBinGroup = null;
+
+ for (final Map.Entry<String, List<RecordBin>> group : groupBinMap.entrySet()) {
+ for (final RecordBin bin : group.getValue()) {
+ if (oldestBin == null || bin.isOlderThan(oldestBin)) {
+ oldestBin = bin;
+ oldestBinGroup = group.getKey();
+ }
+ }
+ }
+
+ if (oldestBin == null) {
+ return;
+ }
+
+ removeBins(oldestBinGroup, Collections.singletonList(oldestBin));
+ } finally {
+ lock.unlock();
+ }
+
+ logger.debug("Completing Bin " + oldestBin + " because the maximum number of bins has been exceeded");
+ oldestBin.complete("Maximum number of bins has been exceeded");
+ }
+
+
+ public void completeExpiredBins() throws IOException {
+ final long maxNanos = maxBinAgeNanos.get();
+ final Map<String, List<RecordBin>> expiredBinMap = new HashMap<>();
+
+ lock.lock();
+ try {
+ for (final Map.Entry<String, List<RecordBin>> entry : groupBinMap.entrySet()) {
+ final String key = entry.getKey();
+ final List<RecordBin> bins = entry.getValue();
+
+ for (final RecordBin bin : bins) {
+ if (bin.isOlderThan(maxNanos, TimeUnit.NANOSECONDS)) {
+ final List<RecordBin> expiredBinsForKey = expiredBinMap.computeIfAbsent(key, ignore -> new ArrayList<>());
+ expiredBinsForKey.add(bin);
+ }
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+
+ for (final Map.Entry<String, List<RecordBin>> entry : expiredBinMap.entrySet()) {
+ final String key = entry.getKey();
+ final List<RecordBin> expiredBins = entry.getValue();
+
+ for (final RecordBin bin : expiredBins) {
+ logger.debug("Completing Bin {} because it has expired", new Object[] {bin});
+ bin.complete("Bin has reached Max Bin Age");
+ }
+
+ removeBins(key, expiredBins);
+ }
+ }
+
+ private void removeBins(final String key, final List<RecordBin> bins) {
+ lock.lock();
+ try {
+ final List<RecordBin> list = groupBinMap.get(key);
+ if (list != null) {
+ final int initialSize = list.size();
+ list.removeAll(bins);
+
+ // Determine how many items were removed from the list and
+ // update our binCount to keep track of this.
+ final int removedCount = initialSize - list.size();
+ binCount.addAndGet(-removedCount);
+
+ if (list.isEmpty()) {
+ groupBinMap.remove(key);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+}
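
For orientation, the sketch below shows how a caller such as MergeRecord might drive RecordBinManager across onTrigger invocations. It is illustrative only and is not part of this commit: the onTrigger shape, the createReader(...) helper, and the fixed group key are assumptions, while the RecordBinManager constructor, setMaxBinAge(), add(), and completeExpiredBins() calls mirror the class above.

    import java.util.concurrent.TimeUnit;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.ProcessSessionFactory;
    import org.apache.nifi.processors.standard.MergeRecord;
    import org.apache.nifi.processors.standard.merge.RecordBinManager;
    import org.apache.nifi.serialization.RecordReader;

    public class RecordBinManagerUsageSketch {

        private final ComponentLog logger;
        private volatile RecordBinManager binManager; // created lazily on first trigger

        public RecordBinManagerUsageSketch(final ComponentLog logger) {
            this.logger = logger;
        }

        public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws Exception {
            if (binManager == null) {
                // Bind the manager to the configured context and session factory, and propagate Max Bin Age.
                binManager = new RecordBinManager(context, sessionFactory, logger);
                binManager.setMaxBinAge(context.getProperty(MergeRecord.MAX_BIN_AGE).asTimePeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
            }

            final ProcessSession session = sessionFactory.createSession();
            final FlowFile flowFile = session.get();
            if (flowFile != null) {
                final RecordReader reader = createReader(session, flowFile); // hypothetical helper
                // block=false: if another thread holds a candidate bin's write lock, offer() moves on
                // to another bin (or a new one) instead of waiting.
                binManager.add("default-group", flowFile, reader, session, false);
            }

            // Merge and transfer any bins that have outlived Max Bin Age.
            binManager.completeExpiredBins();
        }

        private RecordReader createReader(final ProcessSession session, final FlowFile flowFile) {
            // Reader creation is schema/format specific and omitted from this sketch.
            throw new UnsupportedOperationException("not part of this sketch");
        }
    }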