[ https://issues.apache.org/jira/browse/NIFI-2854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15664011#comment-15664011 ]
ASF GitHub Bot commented on NIFI-2854:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1202#discussion_r87806305

    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java ---
    @@ -0,0 +1,517 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.controller.repository;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.apache.nifi.controller.queue.FlowFileQueue;
    +import org.apache.nifi.controller.repository.claim.ContentClaim;
    +import org.apache.nifi.controller.repository.claim.ResourceClaim;
    +import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
    +import org.apache.nifi.controller.repository.claim.StandardContentClaim;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.wali.SerDe;
    +import org.wali.UpdateType;
    +
    +public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
    +    private static final Logger logger = LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class);
    +
    +    private static final int CURRENT_ENCODING_VERSION = 9;
    +
    +    public static final byte ACTION_CREATE = 0;
    +    public static final byte ACTION_UPDATE = 1;
    +    public static final byte ACTION_DELETE = 2;
    +    public static final byte ACTION_SWAPPED_OUT = 3;
    +    public static final byte ACTION_SWAPPED_IN = 4;
    +
    +    private long recordsRestored = 0L;
    +    private final ResourceClaimManager claimManager;
    +
    +    public WriteAheadRepositoryRecordSerde(final ResourceClaimManager claimManager) {
    +        this.claimManager = claimManager;
    +    }
    +
    +    @Override
    +    public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out) throws IOException {
    +        serializeEdit(previousRecordState, record, out, false);
    +    }
    +
    +    public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out, final boolean forceAttributesWritten) throws IOException {
    +        if (record.isMarkedForAbort()) {
    +            logger.warn("Repository Record {} is marked to be aborted; it will be persisted in the FlowFileRepository as a DELETE record", record);
    +            out.write(ACTION_DELETE);
    +            out.writeLong(getRecordIdentifier(record));
    +            serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
    +            return;
    +        }
    +
    +        final UpdateType updateType = getUpdateType(record);
    +
    +        if (updateType.equals(UpdateType.DELETE)) {
    +            out.write(ACTION_DELETE);
    +            out.writeLong(getRecordIdentifier(record));
    +            serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
    +            return;
    +        }
    +
    +        // If there's a Destination Connection, that's the one that we want to associate with this record.
    +        // However, on restart, we will restore the FlowFile and set this connection to its "originalConnection".
    +        // If we then serialize the FlowFile again before it's transferred, it's important to allow this to happen,
    +        // so we use the originalConnection instead.
    +        FlowFileQueue associatedQueue = record.getDestination();
    +        if (associatedQueue == null) {
    +            associatedQueue = record.getOriginalQueue();
    +        }
    +
    +        if (updateType.equals(UpdateType.SWAP_OUT)) {
    +            out.write(ACTION_SWAPPED_OUT);
    +            out.writeLong(getRecordIdentifier(record));
    +            out.writeUTF(associatedQueue.getIdentifier());
    +            out.writeUTF(getLocation(record));
    +            return;
    +        }
    +
    +        final FlowFile flowFile = record.getCurrent();
    +        final ContentClaim claim = record.getCurrentClaim();
    +
    +        switch (updateType) {
    +            case UPDATE:
    +                out.write(ACTION_UPDATE);
    +                break;
    +            case CREATE:
    +                out.write(ACTION_CREATE);
    +                break;
    +            case SWAP_IN:
    +                out.write(ACTION_SWAPPED_IN);
    +                break;
    +            default:
    +                throw new AssertionError();
    +        }
    +
    +        out.writeLong(getRecordIdentifier(record));
    +        out.writeLong(flowFile.getEntryDate());
    +        out.writeLong(flowFile.getLineageStartDate());
    +        out.writeLong(flowFile.getLineageStartIndex());
    +
    +        final Long queueDate = flowFile.getLastQueueDate();
    +        out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate);
    +        out.writeLong(flowFile.getQueueDateIndex());
    +        out.writeLong(flowFile.getSize());
    +
    +        if (associatedQueue == null) {
    +            logger.warn("{} Repository Record {} has no Connection associated with it; it will be destroyed on restart",
    +                new Object[] {this, record});
    +            writeString("", out);
    +        } else {
    +            writeString(associatedQueue.getIdentifier(), out);
    +        }
    +
    +        serializeContentClaim(claim, record.getCurrentClaimOffset(), out);
    +
    +        if (forceAttributesWritten || record.isAttributesChanged() || updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) {
    +            out.write(1); // indicate attributes changed
    +            final Map<String, String> attributes = flowFile.getAttributes();
    +            out.writeInt(attributes.size());
    +            for (final Map.Entry<String, String> entry : attributes.entrySet()) {
    +                writeString(entry.getKey(), out);
    +                writeString(entry.getValue(), out);
    +            }
    +        } else {
    +            out.write(0); // indicate attributes did not change
    +        }
    +
    +        if (updateType == UpdateType.SWAP_IN) {
    +            out.writeUTF(record.getSwapLocation());
    +        }
    +    }
    +
    +    @Override
    +    public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
    +        final int action = in.read();
    +        final long recordId = in.readLong();
    +        if (action == ACTION_DELETE) {
    +            final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
    +
    +            if (version > 4) {
    +                deserializeClaim(in, version, ffBuilder);
    +            }
    +
    +            final FlowFileRecord flowFileRecord = ffBuilder.build();
    +            final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
    +            record.markForDelete();
    +
    +            return record;
    +        }
    +
    +        if (action == ACTION_SWAPPED_OUT) {
    +            final String queueId = in.readUTF();
    +            final String location = in.readUTF();
    +            final FlowFileQueue queue = getFlowFileQueue(queueId);
    +
    +            final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
    +                .id(recordId)
    +                .build();
    +
    +            return new StandardRepositoryRecord(queue, flowFileRecord, location);
    +        }
    +
    +        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
    +        final RepositoryRecord record = currentRecordStates.get(recordId);
    +        ffBuilder.id(recordId);
    +        if (record != null) {
    +            ffBuilder.fromFlowFile(record.getCurrent());
    +        }
    +        ffBuilder.entryDate(in.readLong());
    +
    +        if (version > 1) {
    +            // read the lineage identifiers and lineage start date, which were added in version 2.
    +            if (version < 9) {
    +                final int numLineageIds = in.readInt();
    +                for (int i = 0; i < numLineageIds; i++) {
    +                    in.readUTF(); // skip identifiers
    +                }
    +            }
    +            final long lineageStartDate = in.readLong();
    +            final long lineageStartIndex;
    +            if (version > 7) {
    +                lineageStartIndex = in.readLong();
    +            } else {
    +                lineageStartIndex = 0L;
    +            }
    +            ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
    +
    +            if (version > 5) {
    +                final long lastQueueDate = in.readLong();
    +                final long queueDateIndex;
    +                if (version > 7) {
    +                    queueDateIndex = in.readLong();
    +                } else {
    +                    queueDateIndex = 0L;
    +                }
    +
    +                ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
    +            }
    +        }
    +
    +        ffBuilder.size(in.readLong());
    +        final String connectionId = readString(in);
    +
    +        logger.debug("{} -> {}", new Object[] {recordId, connectionId});
    +
    +        deserializeClaim(in, version, ffBuilder);
    +
    +        // recover new attributes, if they changed
    +        final int attributesChanged = in.read();
    +        if (attributesChanged == -1) {
    +            throw new EOFException();
    +        } else if (attributesChanged == 1) {
    +            final int numAttributes = in.readInt();
    +            final Map<String, String> attributes = new HashMap<>();
    +            for (int i = 0; i < numAttributes; i++) {
    +                final String key = readString(in);
    +                final String value = readString(in);
    +                attributes.put(key, value);
    +            }
    +
    +            ffBuilder.addAttributes(attributes);
    +        } else if (attributesChanged != 0) {
    +            throw new IOException("Attribute Change Qualifier not found in stream; found value: " +
    +                attributesChanged + " after successfully restoring " + recordsRestored + " records. The FlowFile Repository appears to be corrupt!");
    +        }
    +
    +        final FlowFileRecord flowFile = ffBuilder.build();
    +        String swapLocation = null;
    +        if (action == ACTION_SWAPPED_IN) {
    +            swapLocation = in.readUTF();
    +        }
    +
    +        final FlowFileQueue queue = getFlowFileQueue(connectionId);
    +        final StandardRepositoryRecord standardRepoRecord = new StandardRepositoryRecord(queue, flowFile);
    +        if (swapLocation != null) {
    --- End diff --

    Before, it was checking if the flowFileQueueMap was null, I believe, not the FlowFile queue itself. So this is okay - at this point, it's guaranteed non-null.
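For readers following the diff: every edit record is framed as a one-byte action code, then the record identifier, then type-specific fields. A minimal, self-contained round-trip sketch of that framing with plain java.io streams follows; the class and variable names are illustrative, not NiFi's API.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class EditFramingSketch {
        static final byte ACTION_DELETE = 2; // same value as the constant in the diff

        public static void main(String[] args) throws IOException {
            // Write a DELETE edit the way serializeEdit frames it:
            // a one-byte action code, then the record identifier.
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.write(ACTION_DELETE);
                out.writeLong(42L);
            }

            // Read it back the way deserializeEdit does: in.read() returns the
            // unsigned action byte (or -1 at end of stream), then the record id.
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                final int action = in.read();
                final long recordId = in.readLong();
                System.out.println("action=" + action + ", recordId=" + recordId); // prints: action=2, recordId=42
            }
        }
    }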
> Enable repositories to support upgrades and rollback in well defined scenarios
> ------------------------------------------------------------------------------
>
>                 Key: NIFI-2854
>                 URL: https://issues.apache.org/jira/browse/NIFI-2854
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>             Fix For: 1.1.0
>
>
> The flowfile, swapfile, provenance, and content repositories play a very important role in NiFi's ability to be safely upgraded and rolled back. We need to have well documented behaviors, designs, and version adherence so that users can safely rely on these mechanisms.
> Once this is formalized and in place, we should update our versioning guidance to reflect this as well.
> The following would be true from NiFi 1.2.0 onward:
> * No changes to how the repositories are persisted to disk can be made which will break forward/backward compatibility; specifically, this means that things like the way each is serialized to disk cannot change.
> * If changes are made which impact forward or backward compatibility, they should be reserved for major releases only and should include a utility to help users with pre-existing data convert from the older format to the newer format. It may not be feasible to have rollback on major releases.
> * The content repository should not be changed within a major release cycle in any way that will harm forward or backward compatibility.
> * The flow file repository can change in that new fields can be added to existing write-ahead log record types, but no fields can be removed, nor can any new record types be added. Once a field is considered required, it must remain required. Changes may only be made across minor version releases - not incremental ones.
> * Swap file storage should follow very similar rules to the flow file repository. Adding a schema to the swap file header may allow some variation there, but the variation should only be hints to optimize how swap files are processed, not change their behavior otherwise. Changes are only permitted during minor version releases.
> * Provenance repository changes are only permitted during minor version releases. These changes may include adding or removing fields from existing event types. If a field is considered required, it must always be considered required. If a field is removed, it must not be a required field, and there must be a sensible default an older version could use if that value is not found in new data once rolled back. New event types may be added. Fields or event types not known to an older version, if seen after a rollback, will simply be ignored.
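The flow-file-repository rule above (fields may be added but never removed, and required fields stay required) is the pattern the version checks in the serde implement. A hedged sketch of that version-gated read, using a hypothetical field and version number rather than NiFi's actual schema:

    import java.io.DataInputStream;
    import java.io.IOException;

    // Version-gated deserialization in the style of the "version > 7" checks in
    // the diff: a field added in a newer encoding version is read only when the
    // stream was written by that version or later; older streams fall back to a
    // sensible default. The field name and version number here are hypothetical.
    final class VersionGatedReadSketch {
        private static final int VERSION_THAT_ADDED_QUEUE_DATE_INDEX = 8; // illustrative

        static long readQueueDateIndex(final DataInputStream in, final int version) throws IOException {
            if (version >= VERSION_THAT_ADDED_QUEUE_DATE_INDEX) {
                return in.readLong(); // field is present in newer encodings
            }
            return 0L; // default for records written by older versions
        }
    }

Because older readers never attempt to read the new field and newer readers default it when absent, streams remain readable in both directions across an upgrade and a rollback.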