http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java new file mode 100644 index 0000000..916fd76 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Map; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.schema.ContentClaimFieldMap; +import org.apache.nifi.controller.repository.schema.ContentClaimSchema; +import org.apache.nifi.controller.repository.schema.FlowFileSchema; +import org.apache.nifi.controller.repository.schema.RepositoryRecordFieldMap; +import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema; +import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate; +import org.apache.nifi.repository.schema.FieldType; +import org.apache.nifi.repository.schema.Record; +import org.apache.nifi.repository.schema.RecordSchema; +import org.apache.nifi.repository.schema.Repetition; +import org.apache.nifi.repository.schema.SchemaRecordReader; +import org.apache.nifi.repository.schema.SchemaRecordWriter; +import org.apache.nifi.repository.schema.SimpleRecordField; +import org.wali.SerDe; +import org.wali.UpdateType; + +public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> { + private static final int MAX_ENCODING_VERSION = 1; + + private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1; + private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1; + + private final ResourceClaimManager resourceClaimManager; + private volatile RecordSchema recoverySchema; + + public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager) { + this.resourceClaimManager = resourceClaimManager; + } + + @Override + public void writeHeader(final DataOutputStream out) throws IOException { + writeSchema.writeTo(out); + } + + @Override + public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord newRecordState, final DataOutputStream out) throws IOException { + serializeRecord(newRecordState, out); + } + + @Override + public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException { + final RecordSchema schema; + switch (record.getType()) { + case CREATE: + case UPDATE: + schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1; + break; + case CONTENTMISSING: + case DELETE: + schema = RepositoryRecordSchema.DELETE_SCHEMA_V1; + break; + case SWAP_IN: + schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V1; + break; + case SWAP_OUT: + schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V1; + break; + default: + throw new IllegalArgumentException("Received Repository Record with unknown Update Type: " + record.getType()); // won't happen. + } + + final RepositoryRecordFieldMap fieldMap = new RepositoryRecordFieldMap(record, schema, contentClaimSchema); + final RepositoryRecordUpdate update = new RepositoryRecordUpdate(fieldMap, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + new SchemaRecordWriter().writeRecord(update, out); + } + + @Override + public void readHeader(final DataInputStream in) throws IOException { + recoverySchema = RecordSchema.readFrom(in); + } + + @Override + public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException { + return deserializeRecord(in, version); + } + + @Override + public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { + final SchemaRecordReader reader = SchemaRecordReader.fromSchema(recoverySchema); + final Record updateRecord = reader.readRecord(in); + + // Top level is always going to be a "Repository Record Update" record because we need a 'Union' type record at the + // top level that indicates which type of record we have. + final Record record = (Record) updateRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V1); + + final String actionType = (String) record.getFieldValue(RepositoryRecordSchema.ACTION_TYPE_FIELD); + final UpdateType updateType = UpdateType.valueOf(actionType); + switch (updateType) { + case CREATE: + return createRecord(record); + case DELETE: + return deleteRecord(record); + case SWAP_IN: + return swapInRecord(record); + case SWAP_OUT: + return swapOutRecord(record); + case UPDATE: + return updateRecord(record); + default: + throw new IOException("Found unrecognized Update Type '" + actionType + "'"); + } + } + + + @SuppressWarnings("unchecked") + private StandardRepositoryRecord createRecord(final Record record) { + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); + ffBuilder.id((Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID)); + ffBuilder.entryDate((Long) record.getFieldValue(FlowFileSchema.ENTRY_DATE)); + + final Long lastQueueDate = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE); + final Long queueDateIndex = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE_INDEX); + ffBuilder.lastQueued(lastQueueDate, queueDateIndex); + + final Long lineageStartDate = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_DATE); + final Long lineageStartIndex = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_INDEX); + ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); + + populateContentClaim(ffBuilder, record); + ffBuilder.size((Long) record.getFieldValue(FlowFileSchema.FLOWFILE_SIZE)); + + ffBuilder.addAttributes((Map<String, String>) record.getFieldValue(FlowFileSchema.ATTRIBUTES)); + + final FlowFileRecord flowFileRecord = ffBuilder.build(); + + final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER); + final FlowFileQueue queue = getFlowFileQueue(queueId); + + return new StandardRepositoryRecord(queue, flowFileRecord); + } + + private void populateContentClaim(final StandardFlowFileRecord.Builder ffBuilder, final Record record) { + final Object claimMap = record.getFieldValue(FlowFileSchema.CONTENT_CLAIM); + if (claimMap == null) { + return; + } + + final Record claimRecord = (Record) claimMap; + final ContentClaim contentClaim = ContentClaimFieldMap.getContentClaim(claimRecord, resourceClaimManager); + final Long offset = ContentClaimFieldMap.getContentClaimOffset(claimRecord); + + ffBuilder.contentClaim(contentClaim); + ffBuilder.contentClaimOffset(offset); + } + + private RepositoryRecord updateRecord(final Record record) { + return createRecord(record); + } + + private RepositoryRecord deleteRecord(final Record record) { + final Long recordId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD); + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId); + final FlowFileRecord flowFileRecord = ffBuilder.build(); + + final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord); + repoRecord.markForDelete(); + return repoRecord; + } + + private RepositoryRecord swapInRecord(final Record record) { + final StandardRepositoryRecord repoRecord = createRecord(record); + final String swapLocation = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); + repoRecord.setSwapLocation(swapLocation); + return repoRecord; + } + + private RepositoryRecord swapOutRecord(final Record record) { + final Long recordId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD); + final String queueId = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); + final String swapLocation = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); + final FlowFileQueue queue = getFlowFileQueue(queueId); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(recordId) + .build(); + + return new StandardRepositoryRecord(queue, flowFileRecord, swapLocation); + } + + @Override + public int getVersion() { + return MAX_ENCODING_VERSION; + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index a696e79..10f0d8c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -49,8 +49,6 @@ import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; -import org.apache.nifi.controller.repository.io.ByteCountingInputStream; -import org.apache.nifi.controller.repository.io.ByteCountingOutputStream; import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream; import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream; import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream; @@ -75,6 +73,8 @@ import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.provenance.StandardProvenanceEventRecord; import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.stream.io.ByteCountingOutputStream; import org.apache.nifi.stream.io.StreamUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,8 +121,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE private int removedCount = 0; // number of flowfiles removed in this session private long removedBytes = 0L; // size of all flowfiles removed in this session - private final AtomicLong bytesRead = new AtomicLong(0L); - private final AtomicLong bytesWritten = new AtomicLong(0L); + private long bytesRead = 0L; + private long bytesWritten = 0L; private int flowFilesIn = 0, flowFilesOut = 0; private long contentSizeIn = 0L, contentSizeOut = 0L; @@ -975,8 +975,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final Connectable connectable = context.getConnectable(); final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier()); - flowFileEvent.setBytesRead(bytesRead.get()); - flowFileEvent.setBytesWritten(bytesWritten.get()); + flowFileEvent.setBytesRead(bytesRead); + flowFileEvent.setBytesWritten(bytesWritten); // update event repository try { @@ -1064,8 +1064,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE flowFilesOut = 0; removedCount = 0; removedBytes = 0L; - bytesRead.set(0L); - bytesWritten.set(0L); + bytesRead = 0L; + bytesWritten = 0L; connectionCounts.clear(); createdFlowFiles.clear(); removedFlowFiles.clear(); @@ -2006,8 +2006,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE // callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1. if (allowCachingOfStream && recursionSet.isEmpty()) { if (currentReadClaim == claim) { - if (currentReadClaimStream != null && currentReadClaimStream.getStreamLocation() <= offset) { - final long bytesToSkip = offset - currentReadClaimStream.getStreamLocation(); + if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= offset) { + final long bytesToSkip = offset - currentReadClaimStream.getBytesConsumed(); if (bytesToSkip > 0) { StreamUtils.skip(currentReadClaimStream, bytesToSkip); } @@ -2023,7 +2023,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } currentReadClaim = claim; - currentReadClaimStream = new ByteCountingInputStream(rawInStream, new AtomicLong(0L)); + currentReadClaimStream = new ByteCountingInputStream(rawInStream); StreamUtils.skip(currentReadClaimStream, offset); // Use a non-closeable stream because we want to keep it open after the callback has finished so that we can @@ -2270,8 +2270,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE writtenCount += footer.length; } } finally { - bytesWritten.getAndAdd(writtenCount); - bytesRead.getAndAdd(readCount); + bytesWritten += writtenCount; + bytesRead += readCount; } } catch (final ContentNotFoundException nfe) { destroyContent(newClaim); @@ -2311,8 +2311,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE validateRecordState(source); final StandardRepositoryRecord record = records.get(source); + long writtenToFlowFile = 0L; ContentClaim newClaim = null; - final AtomicLong writtenHolder = new AtomicLong(0L); try { newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source); @@ -2320,9 +2320,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE ensureNotAppending(newClaim); try (final OutputStream stream = context.getContentRepository().write(newClaim); final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream); - final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) { - recursionSet.add(source); - writer.process(new FlowFileAccessOutputStream(countingOut, source)); + final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose)) { + try { + recursionSet.add(source); + writer.process(new FlowFileAccessOutputStream(countingOut, source)); + } finally { + writtenToFlowFile = countingOut.getBytesWritten(); + bytesWritten += countingOut.getBytesWritten(); + } } finally { recursionSet.remove(source); } @@ -2342,8 +2347,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE resetWriteClaims(); // need to reset write claim before we can remove the claim destroyContent(newClaim); throw t; - } finally { - bytesWritten.getAndAdd(writtenHolder.get()); } removeTemporaryClaim(record); @@ -2351,7 +2354,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE .fromFlowFile(record.getCurrent()) .contentClaim(newClaim) .contentClaimOffset(0) - .size(writtenHolder.get()) + .size(writtenToFlowFile) .build(); record.setWorking(newFile); @@ -2379,7 +2382,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final OutputStream rawOutStream = context.getContentRepository().write(newClaim); final OutputStream bufferedOutStream = new BufferedOutputStream(rawOutStream); - outStream = new ByteCountingOutputStream(bufferedOutStream, new AtomicLong(0L)); + outStream = new ByteCountingOutputStream(bufferedOutStream); originalByteWrittenCount = 0; appendableStreams.put(newClaim, outStream); @@ -2448,7 +2451,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } finally { if (outStream != null) { final long bytesWrittenThisIteration = outStream.getBytesWritten() - originalByteWrittenCount; - bytesWritten.getAndAdd(bytesWrittenThisIteration); + bytesWritten += bytesWrittenThisIteration; } } @@ -2542,8 +2545,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final StandardRepositoryRecord record = records.get(source); final ContentClaim currClaim = record.getCurrentClaim(); + long writtenToFlowFile = 0L; ContentClaim newClaim = null; - final AtomicLong writtenHolder = new AtomicLong(0L); try { newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source); @@ -2556,7 +2559,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead); final OutputStream os = context.getContentRepository().write(newClaim); final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os); - final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) { + final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) { recursionSet.add(source); @@ -2574,6 +2577,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE cnfeThrown = true; throw cnfe; } finally { + writtenToFlowFile = countingOut.getBytesWritten(); recursionSet.remove(source); // if cnfeThrown is true, we don't need to re-thrown the Exception; it will propagate. @@ -2595,7 +2599,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE destroyContent(newClaim); throw t; } finally { - bytesWritten.getAndAdd(writtenHolder.get()); + bytesWritten += writtenToFlowFile; } removeTemporaryClaim(record); @@ -2603,7 +2607,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE .fromFlowFile(record.getCurrent()) .contentClaim(newClaim) .contentClaimOffset(0L) - .size(writtenHolder.get()) + .size(writtenToFlowFile) .build(); record.setWorking(newFile); @@ -2635,8 +2639,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE long newSize = 0L; try { newSize = context.getContentRepository().importFrom(source, newClaim); - bytesWritten.getAndAdd(newSize); - bytesRead.getAndAdd(newSize); + bytesWritten += newSize; + bytesRead += newSize; } catch (final Throwable t) { destroyContent(newClaim); throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t); @@ -2671,7 +2675,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination); newSize = context.getContentRepository().importFrom(source, newClaim); - bytesWritten.getAndAdd(newSize); + bytesWritten += newSize; } catch (final IOException e) { throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e); } @@ -2697,8 +2701,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE ensureNotAppending(record.getCurrentClaim()); final long copyCount = context.getContentRepository().exportTo(record.getCurrentClaim(), destination, append, record.getCurrentClaimOffset(), source.getSize()); - bytesRead.getAndAdd(copyCount); - bytesWritten.getAndAdd(copyCount); + bytesRead += copyCount; + bytesWritten += copyCount; } catch (final ContentNotFoundException nfe) { handleContentNotFound(nfe, record); } catch (final Throwable t) { @@ -3016,8 +3020,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE this.removedCount += session.removedCount; this.removedBytes += session.removedBytes; - this.bytesRead += session.bytesRead.get(); - this.bytesWritten += session.bytesWritten.get(); + this.bytesRead += session.bytesRead; + this.bytesWritten += session.bytesWritten; this.flowFilesIn += session.flowFilesIn; this.flowFilesOut += session.flowFilesOut; this.contentSizeIn += session.contentSizeIn; http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 9c2a7d8..2a323de 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -16,12 +16,7 @@ */ package org.apache.nifi.controller.repository; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -45,16 +40,12 @@ import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.controller.repository.claim.StandardContentClaim; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.wali.MinimalLockingWriteAheadLog; -import org.wali.SerDe; import org.wali.SyncListener; -import org.wali.UpdateType; import org.wali.WriteAheadRepository; /** @@ -95,7 +86,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // effectively final private WriteAheadRepository<RepositoryRecord> wal; - private WriteAheadRecordSerde serde; + private RepositoryRecordSerdeFactory serdeFactory; private ResourceClaimManager claimManager; // WALI Provides the ability to register callbacks for when a Partition or the entire Repository is sync'ed with the underlying disk. @@ -153,8 +144,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // TODO: Allow for backup path that can be used if disk out of space?? Would allow a snapshot to be stored on // backup and then the data deleted from the normal location; then can move backup to normal location and // delete backup. On restore, if no files exist in partition's directory, would have to check backup directory - serde = new WriteAheadRecordSerde(claimManager); - wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, numPartitions, serde, this); + serdeFactory = new RepositoryRecordSerdeFactory(claimManager); + wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, numPartitions, serdeFactory, this); } @Override @@ -319,6 +310,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis repoRecords.add(repoRecord); } + // TODO: We should probably update this to support bulk 'SWAP OUT' records. As-is, we have to write out a + // 'SWAP OUT' record for each FlowFile, which includes the Update Type, FlowFile ID, swap file location, and Queue ID. + // We could instead have a single record with Update Type of 'SWAP OUT' and just include swap file location, Queue ID, + // and all FlowFile ID's. // update WALI to indicate that the records were swapped out. wal.update(repoRecords, true); @@ -347,9 +342,9 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis for (final FlowFileQueue queue : queueProvider.getAllQueues()) { queueMap.put(queue.getIdentifier(), queue); } - serde.setQueueMap(queueMap); + serdeFactory.setQueueMap(queueMap); final Collection<RepositoryRecord> recordList = wal.recoverRecords(); - serde.setQueueMap(null); + serdeFactory.setQueueMap(null); for (final RepositoryRecord record : recordList) { final ContentClaim claim = record.getCurrentClaim(); @@ -361,7 +356,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis // Determine the next sequence number for FlowFiles long maxId = minimumSequenceNumber; for (final RepositoryRecord record : recordList) { - final long recordId = serde.getRecordIdentifier(record); + final long recordId = serdeFactory.getRecordIdentifier(record); if (recordId > maxId) { maxId = recordId; } @@ -414,526 +409,4 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis public int checkpoint() throws IOException { return wal.checkpoint(); } - - private static class WriteAheadRecordSerde implements SerDe<RepositoryRecord> { - - private static final int CURRENT_ENCODING_VERSION = 9; - - public static final byte ACTION_CREATE = 0; - public static final byte ACTION_UPDATE = 1; - public static final byte ACTION_DELETE = 2; - public static final byte ACTION_SWAPPED_OUT = 3; - public static final byte ACTION_SWAPPED_IN = 4; - - private Map<String, FlowFileQueue> flowFileQueueMap = null; - private long recordsRestored = 0L; - private final ResourceClaimManager claimManager; - - public WriteAheadRecordSerde(final ResourceClaimManager claimManager) { - this.claimManager = claimManager; - } - - private void setQueueMap(final Map<String, FlowFileQueue> queueMap) { - this.flowFileQueueMap = queueMap; - } - - @Override - public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out) throws IOException { - serializeEdit(previousRecordState, record, out, false); - } - - public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out, final boolean forceAttributesWritten) throws IOException { - if (record.isMarkedForAbort()) { - logger.warn("Repository Record {} is marked to be aborted; it will be persisted in the FlowFileRepository as a DELETE record", record); - out.write(ACTION_DELETE); - out.writeLong(getRecordIdentifier(record)); - serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out); - return; - } - - final UpdateType updateType = getUpdateType(record); - - if (updateType.equals(UpdateType.DELETE)) { - out.write(ACTION_DELETE); - out.writeLong(getRecordIdentifier(record)); - serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out); - return; - } - - // If there's a Destination Connection, that's the one that we want to associated with this record. - // However, on restart, we will restore the FlowFile and set this connection to its "originalConnection". - // If we then serialize the FlowFile again before it's transferred, it's important to allow this to happen, - // so we use the originalConnection instead - FlowFileQueue associatedQueue = record.getDestination(); - if (associatedQueue == null) { - associatedQueue = record.getOriginalQueue(); - } - - if (updateType.equals(UpdateType.SWAP_OUT)) { - out.write(ACTION_SWAPPED_OUT); - out.writeLong(getRecordIdentifier(record)); - out.writeUTF(associatedQueue.getIdentifier()); - out.writeUTF(getLocation(record)); - return; - } - - final FlowFile flowFile = record.getCurrent(); - final ContentClaim claim = record.getCurrentClaim(); - - switch (updateType) { - case UPDATE: - out.write(ACTION_UPDATE); - break; - case CREATE: - out.write(ACTION_CREATE); - break; - case SWAP_IN: - out.write(ACTION_SWAPPED_IN); - break; - default: - throw new AssertionError(); - } - - out.writeLong(getRecordIdentifier(record)); - out.writeLong(flowFile.getEntryDate()); - out.writeLong(flowFile.getLineageStartDate()); - out.writeLong(flowFile.getLineageStartIndex()); - - final Long queueDate = flowFile.getLastQueueDate(); - out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate); - out.writeLong(flowFile.getQueueDateIndex()); - out.writeLong(flowFile.getSize()); - - if (associatedQueue == null) { - logger.warn("{} Repository Record {} has no Connection associated with it; it will be destroyed on restart", - new Object[]{this, record}); - writeString("", out); - } else { - writeString(associatedQueue.getIdentifier(), out); - } - - serializeContentClaim(claim, record.getCurrentClaimOffset(), out); - - if (forceAttributesWritten || record.isAttributesChanged() || updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) { - out.write(1); // indicate attributes changed - final Map<String, String> attributes = flowFile.getAttributes(); - out.writeInt(attributes.size()); - for (final Map.Entry<String, String> entry : attributes.entrySet()) { - writeString(entry.getKey(), out); - writeString(entry.getValue(), out); - } - } else { - out.write(0); // indicate attributes did not change - } - - if (updateType == UpdateType.SWAP_IN) { - out.writeUTF(record.getSwapLocation()); - } - } - - @Override - public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException { - final int action = in.read(); - final long recordId = in.readLong(); - if (action == ACTION_DELETE) { - final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId); - - if (version > 4) { - deserializeClaim(in, version, ffBuilder); - } - - final FlowFileRecord flowFileRecord = ffBuilder.build(); - final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord); - record.markForDelete(); - - return record; - } - - if (action == ACTION_SWAPPED_OUT) { - final String queueId = in.readUTF(); - final String location = in.readUTF(); - final FlowFileQueue queue = flowFileQueueMap.get(queueId); - - final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .id(recordId) - .build(); - - return new StandardRepositoryRecord(queue, flowFileRecord, location); - } - - final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); - final RepositoryRecord record = currentRecordStates.get(recordId); - ffBuilder.id(recordId); - if (record != null) { - ffBuilder.fromFlowFile(record.getCurrent()); - } - ffBuilder.entryDate(in.readLong()); - - if (version > 1) { - // read the lineage identifiers and lineage start date, which were added in version 2. - if (version < 9) { - final int numLineageIds = in.readInt(); - for (int i = 0; i < numLineageIds; i++) { - in.readUTF(); //skip identifiers - } - } - final long lineageStartDate = in.readLong(); - final long lineageStartIndex; - if (version > 7) { - lineageStartIndex = in.readLong(); - } else { - lineageStartIndex = 0L; - } - ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); - - if (version > 5) { - final long lastQueueDate = in.readLong(); - final long queueDateIndex; - if (version > 7) { - queueDateIndex = in.readLong(); - } else { - queueDateIndex = 0L; - } - - ffBuilder.lastQueued(lastQueueDate, queueDateIndex); - } - } - - ffBuilder.size(in.readLong()); - final String connectionId = readString(in); - - logger.debug("{} -> {}", new Object[]{recordId, connectionId}); - - deserializeClaim(in, version, ffBuilder); - - // recover new attributes, if they changed - final int attributesChanged = in.read(); - if (attributesChanged == -1) { - throw new EOFException(); - } else if (attributesChanged == 1) { - final int numAttributes = in.readInt(); - final Map<String, String> attributes = new HashMap<>(); - for (int i = 0; i < numAttributes; i++) { - final String key = readString(in); - final String value = readString(in); - attributes.put(key, value); - } - - ffBuilder.addAttributes(attributes); - } else if (attributesChanged != 0) { - throw new IOException("Attribute Change Qualifier not found in stream; found value: " - + attributesChanged + " after successfully restoring " + recordsRestored + " records. The FlowFile Repository appears to be corrupt!"); - } - - final FlowFileRecord flowFile = ffBuilder.build(); - String swapLocation = null; - if (action == ACTION_SWAPPED_IN) { - swapLocation = in.readUTF(); - } - - final StandardRepositoryRecord standardRepoRecord; - - if (flowFileQueueMap == null) { - standardRepoRecord = new StandardRepositoryRecord(null, flowFile); - } else { - final FlowFileQueue queue = flowFileQueueMap.get(connectionId); - standardRepoRecord = new StandardRepositoryRecord(queue, flowFile); - if (swapLocation != null) { - standardRepoRecord.setSwapLocation(swapLocation); - } - - if (connectionId.isEmpty()) { - logger.warn("{} does not have a Queue associated with it; this record will be discarded", flowFile); - standardRepoRecord.markForAbort(); - } else if (queue == null) { - logger.warn("{} maps to unknown Queue {}; this record will be discarded", flowFile, connectionId); - standardRepoRecord.markForAbort(); - } - } - - recordsRestored++; - return standardRepoRecord; - } - - @Override - public StandardRepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { - final int action = in.read(); - if (action == -1) { - return null; - } - - final long recordId = in.readLong(); - if (action == ACTION_DELETE) { - final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId); - - if (version > 4) { - deserializeClaim(in, version, ffBuilder); - } - - final FlowFileRecord flowFileRecord = ffBuilder.build(); - final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord); - record.markForDelete(); - return record; - } - - // if action was not delete, it must be create/swap in - final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); - final long entryDate = in.readLong(); - - if (version > 1) { - // read the lineage identifiers and lineage start date, which were added in version 2. - if (version < 9) { - final int numLineageIds = in.readInt(); - for (int i = 0; i < numLineageIds; i++) { - in.readUTF(); //skip identifiers - } - } - - final long lineageStartDate = in.readLong(); - final long lineageStartIndex; - if (version > 7) { - lineageStartIndex = in.readLong(); - } else { - lineageStartIndex = 0L; - } - ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); - - if (version > 5) { - final long lastQueueDate = in.readLong(); - final long queueDateIndex; - if (version > 7) { - queueDateIndex = in.readLong(); - } else { - queueDateIndex = 0L; - } - - ffBuilder.lastQueued(lastQueueDate, queueDateIndex); - } - } - - final long size = in.readLong(); - final String connectionId = readString(in); - - logger.debug("{} -> {}", new Object[]{recordId, connectionId}); - - ffBuilder.id(recordId); - ffBuilder.entryDate(entryDate); - ffBuilder.size(size); - - deserializeClaim(in, version, ffBuilder); - - final int attributesChanged = in.read(); - if (attributesChanged == 1) { - final int numAttributes = in.readInt(); - final Map<String, String> attributes = new HashMap<>(); - for (int i = 0; i < numAttributes; i++) { - final String key = readString(in); - final String value = readString(in); - attributes.put(key, value); - } - - ffBuilder.addAttributes(attributes); - } else if (attributesChanged == -1) { - throw new EOFException(); - } else if (attributesChanged != 0) { - throw new IOException("Attribute Change Qualifier not found in stream; found value: " - + attributesChanged + " after successfully restoring " + recordsRestored + " records"); - } - - final FlowFileRecord flowFile = ffBuilder.build(); - String swapLocation = null; - if (action == ACTION_SWAPPED_IN) { - swapLocation = in.readUTF(); - } - - final StandardRepositoryRecord record; - - if (flowFileQueueMap == null) { - record = new StandardRepositoryRecord(null, flowFile); - } else { - final FlowFileQueue queue = flowFileQueueMap.get(connectionId); - record = new StandardRepositoryRecord(queue, flowFile); - if (swapLocation != null) { - record.setSwapLocation(swapLocation); - } - - if (connectionId.isEmpty()) { - logger.warn("{} does not have a FlowFile Queue associated with it; this record will be discarded", flowFile); - record.markForAbort(); - } else if (queue == null) { - logger.warn("{} maps to unknown FlowFile Queue {}; this record will be discarded", flowFile, connectionId); - record.markForAbort(); - } - } - - recordsRestored++; - return record; - } - - @Override - public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException { - serializeEdit(null, record, out, true); - } - - private void serializeContentClaim(final ContentClaim claim, final long offset, final DataOutputStream out) throws IOException { - if (claim == null) { - out.write(0); - } else { - out.write(1); - - final ResourceClaim resourceClaim = claim.getResourceClaim(); - writeString(resourceClaim.getId(), out); - writeString(resourceClaim.getContainer(), out); - writeString(resourceClaim.getSection(), out); - out.writeLong(claim.getOffset()); - out.writeLong(claim.getLength()); - - out.writeLong(offset); - out.writeBoolean(resourceClaim.isLossTolerant()); - } - } - - private void deserializeClaim(final DataInputStream in, final int serializationVersion, final StandardFlowFileRecord.Builder ffBuilder) throws IOException { - // determine current Content Claim. - final int claimExists = in.read(); - if (claimExists == 1) { - final String claimId; - if (serializationVersion < 4) { - claimId = String.valueOf(in.readLong()); - } else { - claimId = readString(in); - } - - final String container = readString(in); - final String section = readString(in); - - final long resourceOffset; - final long resourceLength; - if (serializationVersion < 7) { - resourceOffset = 0L; - resourceLength = -1L; - } else { - resourceOffset = in.readLong(); - resourceLength = in.readLong(); - } - - final long claimOffset = in.readLong(); - - final boolean lossTolerant; - if (serializationVersion >= 3) { - lossTolerant = in.readBoolean(); - } else { - lossTolerant = false; - } - - final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false); - final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset); - contentClaim.setLength(resourceLength); - - ffBuilder.contentClaim(contentClaim); - ffBuilder.contentClaimOffset(claimOffset); - } else if (claimExists == -1) { - throw new EOFException(); - } else if (claimExists != 0) { - throw new IOException("Claim Existence Qualifier not found in stream; found value: " - + claimExists + " after successfully restoring " + recordsRestored + " records"); - } - } - - private void writeString(final String toWrite, final OutputStream out) throws IOException { - final byte[] bytes = toWrite.getBytes("UTF-8"); - final int utflen = bytes.length; - - if (utflen < 65535) { - out.write(utflen >>> 8); - out.write(utflen); - out.write(bytes); - } else { - out.write(255); - out.write(255); - out.write(utflen >>> 24); - out.write(utflen >>> 16); - out.write(utflen >>> 8); - out.write(utflen); - out.write(bytes); - } - } - - private String readString(final InputStream in) throws IOException { - final Integer numBytes = readFieldLength(in); - if (numBytes == null) { - throw new EOFException(); - } - final byte[] bytes = new byte[numBytes]; - fillBuffer(in, bytes, numBytes); - return new String(bytes, "UTF-8"); - } - - private Integer readFieldLength(final InputStream in) throws IOException { - final int firstValue = in.read(); - final int secondValue = in.read(); - if (firstValue < 0) { - return null; - } - if (secondValue < 0) { - throw new EOFException(); - } - if (firstValue == 0xff && secondValue == 0xff) { - final int ch1 = in.read(); - final int ch2 = in.read(); - final int ch3 = in.read(); - final int ch4 = in.read(); - if ((ch1 | ch2 | ch3 | ch4) < 0) { - throw new EOFException(); - } - return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4; - } else { - return (firstValue << 8) + secondValue; - } - } - - private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException { - int bytesRead; - int totalBytesRead = 0; - while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) { - totalBytesRead += bytesRead; - } - if (totalBytesRead != length) { - throw new EOFException(); - } - } - - @Override - public Long getRecordIdentifier(final RepositoryRecord record) { - return record.getCurrent().getId(); - } - - @Override - public UpdateType getUpdateType(final RepositoryRecord record) { - switch (record.getType()) { - case CONTENTMISSING: - case DELETE: - return UpdateType.DELETE; - case CREATE: - return UpdateType.CREATE; - case UPDATE: - return UpdateType.UPDATE; - case SWAP_OUT: - return UpdateType.SWAP_OUT; - case SWAP_IN: - return UpdateType.SWAP_IN; - } - return null; - } - - @Override - public int getVersion() { - return CURRENT_ENCODING_VERSION; - } - - @Override - public String getLocation(final RepositoryRecord record) { - return record.getSwapLocation(); - } - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java new file mode 100644 index 0000000..e8ce44e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; +import org.apache.nifi.flowfile.FlowFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.SerDe; +import org.wali.UpdateType; + +public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> { + private static final Logger logger = LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class); + + private static final int CURRENT_ENCODING_VERSION = 9; + + public static final byte ACTION_CREATE = 0; + public static final byte ACTION_UPDATE = 1; + public static final byte ACTION_DELETE = 2; + public static final byte ACTION_SWAPPED_OUT = 3; + public static final byte ACTION_SWAPPED_IN = 4; + + private long recordsRestored = 0L; + private final ResourceClaimManager claimManager; + + public WriteAheadRepositoryRecordSerde(final ResourceClaimManager claimManager) { + this.claimManager = claimManager; + } + + @Override + public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out) throws IOException { + serializeEdit(previousRecordState, record, out, false); + } + + public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out, final boolean forceAttributesWritten) throws IOException { + if (record.isMarkedForAbort()) { + logger.warn("Repository Record {} is marked to be aborted; it will be persisted in the FlowFileRepository as a DELETE record", record); + out.write(ACTION_DELETE); + out.writeLong(getRecordIdentifier(record)); + serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out); + return; + } + + final UpdateType updateType = getUpdateType(record); + + if (updateType.equals(UpdateType.DELETE)) { + out.write(ACTION_DELETE); + out.writeLong(getRecordIdentifier(record)); + serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out); + return; + } + + // If there's a Destination Connection, that's the one that we want to associated with this record. + // However, on restart, we will restore the FlowFile and set this connection to its "originalConnection". + // If we then serialize the FlowFile again before it's transferred, it's important to allow this to happen, + // so we use the originalConnection instead + FlowFileQueue associatedQueue = record.getDestination(); + if (associatedQueue == null) { + associatedQueue = record.getOriginalQueue(); + } + + if (updateType.equals(UpdateType.SWAP_OUT)) { + out.write(ACTION_SWAPPED_OUT); + out.writeLong(getRecordIdentifier(record)); + out.writeUTF(associatedQueue.getIdentifier()); + out.writeUTF(getLocation(record)); + return; + } + + final FlowFile flowFile = record.getCurrent(); + final ContentClaim claim = record.getCurrentClaim(); + + switch (updateType) { + case UPDATE: + out.write(ACTION_UPDATE); + break; + case CREATE: + out.write(ACTION_CREATE); + break; + case SWAP_IN: + out.write(ACTION_SWAPPED_IN); + break; + default: + throw new AssertionError(); + } + + out.writeLong(getRecordIdentifier(record)); + out.writeLong(flowFile.getEntryDate()); + out.writeLong(flowFile.getLineageStartDate()); + out.writeLong(flowFile.getLineageStartIndex()); + + final Long queueDate = flowFile.getLastQueueDate(); + out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate); + out.writeLong(flowFile.getQueueDateIndex()); + out.writeLong(flowFile.getSize()); + + if (associatedQueue == null) { + logger.warn("{} Repository Record {} has no Connection associated with it; it will be destroyed on restart", + new Object[] {this, record}); + writeString("", out); + } else { + writeString(associatedQueue.getIdentifier(), out); + } + + serializeContentClaim(claim, record.getCurrentClaimOffset(), out); + + if (forceAttributesWritten || record.isAttributesChanged() || updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) { + out.write(1); // indicate attributes changed + final Map<String, String> attributes = flowFile.getAttributes(); + out.writeInt(attributes.size()); + for (final Map.Entry<String, String> entry : attributes.entrySet()) { + writeString(entry.getKey(), out); + writeString(entry.getValue(), out); + } + } else { + out.write(0); // indicate attributes did not change + } + + if (updateType == UpdateType.SWAP_IN) { + out.writeUTF(record.getSwapLocation()); + } + } + + @Override + public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException { + final int action = in.read(); + final long recordId = in.readLong(); + if (action == ACTION_DELETE) { + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId); + + if (version > 4) { + deserializeClaim(in, version, ffBuilder); + } + + final FlowFileRecord flowFileRecord = ffBuilder.build(); + final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord); + record.markForDelete(); + + return record; + } + + if (action == ACTION_SWAPPED_OUT) { + final String queueId = in.readUTF(); + final String location = in.readUTF(); + final FlowFileQueue queue = getFlowFileQueue(queueId); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(recordId) + .build(); + + return new StandardRepositoryRecord(queue, flowFileRecord, location); + } + + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); + final RepositoryRecord record = currentRecordStates.get(recordId); + ffBuilder.id(recordId); + if (record != null) { + ffBuilder.fromFlowFile(record.getCurrent()); + } + ffBuilder.entryDate(in.readLong()); + + if (version > 1) { + // read the lineage identifiers and lineage start date, which were added in version 2. + if (version < 9) { + final int numLineageIds = in.readInt(); + for (int i = 0; i < numLineageIds; i++) { + in.readUTF(); //skip identifiers + } + } + final long lineageStartDate = in.readLong(); + final long lineageStartIndex; + if (version > 7) { + lineageStartIndex = in.readLong(); + } else { + lineageStartIndex = 0L; + } + ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); + + if (version > 5) { + final long lastQueueDate = in.readLong(); + final long queueDateIndex; + if (version > 7) { + queueDateIndex = in.readLong(); + } else { + queueDateIndex = 0L; + } + + ffBuilder.lastQueued(lastQueueDate, queueDateIndex); + } + } + + ffBuilder.size(in.readLong()); + final String connectionId = readString(in); + + logger.debug("{} -> {}", new Object[] {recordId, connectionId}); + + deserializeClaim(in, version, ffBuilder); + + // recover new attributes, if they changed + final int attributesChanged = in.read(); + if (attributesChanged == -1) { + throw new EOFException(); + } else if (attributesChanged == 1) { + final int numAttributes = in.readInt(); + final Map<String, String> attributes = new HashMap<>(); + for (int i = 0; i < numAttributes; i++) { + final String key = readString(in); + final String value = readString(in); + attributes.put(key, value); + } + + ffBuilder.addAttributes(attributes); + } else if (attributesChanged != 0) { + throw new IOException("Attribute Change Qualifier not found in stream; found value: " + + attributesChanged + " after successfully restoring " + recordsRestored + " records. The FlowFile Repository appears to be corrupt!"); + } + + final FlowFileRecord flowFile = ffBuilder.build(); + String swapLocation = null; + if (action == ACTION_SWAPPED_IN) { + swapLocation = in.readUTF(); + } + + final FlowFileQueue queue = getFlowFileQueue(connectionId); + final StandardRepositoryRecord standardRepoRecord = new StandardRepositoryRecord(queue, flowFile); + if (swapLocation != null) { + standardRepoRecord.setSwapLocation(swapLocation); + } + + if (connectionId.isEmpty()) { + logger.warn("{} does not have a Queue associated with it; this record will be discarded", flowFile); + standardRepoRecord.markForAbort(); + } else if (queue == null) { + logger.warn("{} maps to unknown Queue {}; this record will be discarded", flowFile, connectionId); + standardRepoRecord.markForAbort(); + } + + recordsRestored++; + return standardRepoRecord; + } + + @Override + public StandardRepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { + final int action = in.read(); + if (action == -1) { + return null; + } + + final long recordId = in.readLong(); + if (action == ACTION_DELETE) { + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId); + + if (version > 4) { + deserializeClaim(in, version, ffBuilder); + } + + final FlowFileRecord flowFileRecord = ffBuilder.build(); + final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord); + record.markForDelete(); + return record; + } + + // if action was not delete, it must be create/swap in + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); + final long entryDate = in.readLong(); + + if (version > 1) { + // read the lineage identifiers and lineage start date, which were added in version 2. + if (version < 9) { + final int numLineageIds = in.readInt(); + for (int i = 0; i < numLineageIds; i++) { + in.readUTF(); //skip identifiers + } + } + + final long lineageStartDate = in.readLong(); + final long lineageStartIndex; + if (version > 7) { + lineageStartIndex = in.readLong(); + } else { + lineageStartIndex = 0L; + } + ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); + + if (version > 5) { + final long lastQueueDate = in.readLong(); + final long queueDateIndex; + if (version > 7) { + queueDateIndex = in.readLong(); + } else { + queueDateIndex = 0L; + } + + ffBuilder.lastQueued(lastQueueDate, queueDateIndex); + } + } + + final long size = in.readLong(); + final String connectionId = readString(in); + + logger.debug("{} -> {}", new Object[] {recordId, connectionId}); + + ffBuilder.id(recordId); + ffBuilder.entryDate(entryDate); + ffBuilder.size(size); + + deserializeClaim(in, version, ffBuilder); + + final int attributesChanged = in.read(); + if (attributesChanged == 1) { + final int numAttributes = in.readInt(); + final Map<String, String> attributes = new HashMap<>(); + for (int i = 0; i < numAttributes; i++) { + final String key = readString(in); + final String value = readString(in); + attributes.put(key, value); + } + + ffBuilder.addAttributes(attributes); + } else if (attributesChanged == -1) { + throw new EOFException(); + } else if (attributesChanged != 0) { + throw new IOException("Attribute Change Qualifier not found in stream; found value: " + + attributesChanged + " after successfully restoring " + recordsRestored + " records"); + } + + final FlowFileRecord flowFile = ffBuilder.build(); + String swapLocation = null; + if (action == ACTION_SWAPPED_IN) { + swapLocation = in.readUTF(); + } + + final StandardRepositoryRecord record; + final FlowFileQueue queue = getFlowFileQueue(connectionId); + record = new StandardRepositoryRecord(queue, flowFile); + if (swapLocation != null) { + record.setSwapLocation(swapLocation); + } + + if (connectionId.isEmpty()) { + logger.warn("{} does not have a FlowFile Queue associated with it; this record will be discarded", flowFile); + record.markForAbort(); + } else if (queue == null) { + logger.warn("{} maps to unknown FlowFile Queue {}; this record will be discarded", flowFile, connectionId); + record.markForAbort(); + } + + recordsRestored++; + return record; + } + + @Override + public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException { + serializeEdit(null, record, out, true); + } + + private void serializeContentClaim(final ContentClaim claim, final long offset, final DataOutputStream out) throws IOException { + if (claim == null) { + out.write(0); + } else { + out.write(1); + + final ResourceClaim resourceClaim = claim.getResourceClaim(); + writeString(resourceClaim.getId(), out); + writeString(resourceClaim.getContainer(), out); + writeString(resourceClaim.getSection(), out); + out.writeLong(claim.getOffset()); + out.writeLong(claim.getLength()); + + out.writeLong(offset); + out.writeBoolean(resourceClaim.isLossTolerant()); + } + } + + private void deserializeClaim(final DataInputStream in, final int serializationVersion, final StandardFlowFileRecord.Builder ffBuilder) throws IOException { + // determine current Content Claim. + final int claimExists = in.read(); + if (claimExists == 1) { + final String claimId; + if (serializationVersion < 4) { + claimId = String.valueOf(in.readLong()); + } else { + claimId = readString(in); + } + + final String container = readString(in); + final String section = readString(in); + + final long resourceOffset; + final long resourceLength; + if (serializationVersion < 7) { + resourceOffset = 0L; + resourceLength = -1L; + } else { + resourceOffset = in.readLong(); + resourceLength = in.readLong(); + } + + final long claimOffset = in.readLong(); + + final boolean lossTolerant; + if (serializationVersion >= 3) { + lossTolerant = in.readBoolean(); + } else { + lossTolerant = false; + } + + final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false); + final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset); + contentClaim.setLength(resourceLength); + + ffBuilder.contentClaim(contentClaim); + ffBuilder.contentClaimOffset(claimOffset); + } else if (claimExists == -1) { + throw new EOFException(); + } else if (claimExists != 0) { + throw new IOException("Claim Existence Qualifier not found in stream; found value: " + + claimExists + " after successfully restoring " + recordsRestored + " records"); + } + } + + private void writeString(final String toWrite, final OutputStream out) throws IOException { + final byte[] bytes = toWrite.getBytes("UTF-8"); + final int utflen = bytes.length; + + if (utflen < 65535) { + out.write(utflen >>> 8); + out.write(utflen); + out.write(bytes); + } else { + out.write(255); + out.write(255); + out.write(utflen >>> 24); + out.write(utflen >>> 16); + out.write(utflen >>> 8); + out.write(utflen); + out.write(bytes); + } + } + + private String readString(final InputStream in) throws IOException { + final Integer numBytes = readFieldLength(in); + if (numBytes == null) { + throw new EOFException(); + } + final byte[] bytes = new byte[numBytes]; + fillBuffer(in, bytes, numBytes); + return new String(bytes, "UTF-8"); + } + + private Integer readFieldLength(final InputStream in) throws IOException { + final int firstValue = in.read(); + final int secondValue = in.read(); + if (firstValue < 0) { + return null; + } + if (secondValue < 0) { + throw new EOFException(); + } + if (firstValue == 0xff && secondValue == 0xff) { + final int ch1 = in.read(); + final int ch2 = in.read(); + final int ch3 = in.read(); + final int ch4 = in.read(); + if ((ch1 | ch2 | ch3 | ch4) < 0) { + throw new EOFException(); + } + return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4; + } else { + return (firstValue << 8) + secondValue; + } + } + + private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException { + int bytesRead; + int totalBytesRead = 0; + while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) { + totalBytesRead += bytesRead; + } + if (totalBytesRead != length) { + throw new EOFException(); + } + } + + @Override + public int getVersion() { + return CURRENT_ENCODING_VERSION; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java index 25dbaee..7e87199 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java @@ -64,29 +64,6 @@ public class StandardResourceClaim implements ResourceClaim, Comparable<Resource return section; } - /** - * Provides the natural ordering for ResourceClaim objects. By default they are sorted by their id, then container, then section - * - * @param other other claim - * @return x such that x <=1 if this is less than other; - * x=0 if this.equals(other); - * x >= 1 if this is greater than other - */ - @Override - public int compareTo(final ResourceClaim other) { - final int idComparison = id.compareTo(other.getId()); - if (idComparison != 0) { - return idComparison; - } - - final int containerComparison = container.compareTo(other.getContainer()); - if (containerComparison != 0) { - return containerComparison; - } - - return section.compareTo(other.getSection()); - } - @Override public boolean equals(final Object other) { if (this == other) { http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java index 7d554b1..e4f060e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java @@ -29,10 +29,9 @@ import org.slf4j.LoggerFactory; public class StandardResourceClaimManager implements ResourceClaimManager { - private static final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>(); private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class); - - private static final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000); + private final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>(); + private final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000); @Override public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant, final boolean writable) { @@ -50,7 +49,7 @@ public class StandardResourceClaimManager implements ResourceClaimManager { return (count == null) ? null : count.getClaim(); } - private static AtomicInteger getCounter(final ResourceClaim claim) { + private AtomicInteger getCounter(final ResourceClaim claim) { if (claim == null) { return null; } http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingInputStream.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingInputStream.java deleted file mode 100644 index 7de25ac..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingInputStream.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.repository.io; - -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.atomic.AtomicLong; - -public class ByteCountingInputStream extends InputStream { - - private final AtomicLong bytesReadHolder; - private final InputStream in; - private long bytesSkipped = 0L; - - public ByteCountingInputStream(final InputStream in, final AtomicLong longHolder) { - this.in = in; - this.bytesReadHolder = longHolder; - } - - @Override - public int read() throws IOException { - final int fromSuper = in.read(); - if (fromSuper >= 0) { - bytesReadHolder.getAndIncrement(); - } - return fromSuper; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - final int fromSuper = in.read(b, off, len); - if (fromSuper >= 0) { - bytesReadHolder.getAndAdd(fromSuper); - } - - return fromSuper; - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - - @Override - public long skip(final long n) throws IOException { - final long skipped = in.skip(n); - bytesSkipped += skipped; - return skipped; - } - - @Override - public int available() throws IOException { - return in.available(); - } - - @Override - public void mark(int readlimit) { - in.mark(readlimit); - } - - @Override - public boolean markSupported() { - return in.markSupported(); - } - - @Override - public void reset() throws IOException { - in.reset(); - } - - @Override - public void close() throws IOException { - in.close(); - } - - public long getBytesRead() { - return bytesReadHolder.get(); - } - - public long getBytesSkipped() { - return bytesSkipped; - } - - public long getStreamLocation() { - return getBytesRead() + getBytesSkipped(); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java deleted file mode 100644 index 7c778a2..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.repository.io; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.concurrent.atomic.AtomicLong; - -public class ByteCountingOutputStream extends OutputStream { - - private final AtomicLong bytesWrittenHolder; - private final OutputStream out; - - public ByteCountingOutputStream(final OutputStream out, final AtomicLong longHolder) { - this.out = out; - this.bytesWrittenHolder = longHolder; - } - - @Override - public void write(int b) throws IOException { - out.write(b); - bytesWrittenHolder.getAndIncrement(); - } - - @Override - public void write(byte[] b) throws IOException { - write(b, 0, b.length); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - out.write(b, off, len); - bytesWrittenHolder.getAndAdd(len); - } - - public long getBytesWritten() { - return bytesWrittenHolder.get(); - } - - @Override - public void flush() throws IOException { - out.flush(); - } - - @Override - public void close() throws IOException { - out.close(); - } -}