[ https://issues.apache.org/jira/browse/NIFI-2854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15664011#comment-15664011 ]

ASF GitHub Bot commented on NIFI-2854:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1202#discussion_r87806305
  
    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java ---
    @@ -0,0 +1,517 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.controller.repository;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.apache.nifi.controller.queue.FlowFileQueue;
    +import org.apache.nifi.controller.repository.claim.ContentClaim;
    +import org.apache.nifi.controller.repository.claim.ResourceClaim;
    +import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
    +import org.apache.nifi.controller.repository.claim.StandardContentClaim;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.wali.SerDe;
    +import org.wali.UpdateType;
    +
    +public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
    +    private static final Logger logger = LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class);
    +
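    +    // Version of the record encoding produced by this serde; deserializeEdit
    +    // below branches on older version numbers for upgrade/rollback compatibility.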
    +    private static final int CURRENT_ENCODING_VERSION = 9;
    +
    +    public static final byte ACTION_CREATE = 0;
    +    public static final byte ACTION_UPDATE = 1;
    +    public static final byte ACTION_DELETE = 2;
    +    public static final byte ACTION_SWAPPED_OUT = 3;
    +    public static final byte ACTION_SWAPPED_IN = 4;
    +
    +    private long recordsRestored = 0L;
    +    private final ResourceClaimManager claimManager;
    +
    +    public WriteAheadRepositoryRecordSerde(final ResourceClaimManager claimManager) {
    +        this.claimManager = claimManager;
    +    }
    +
    +    @Override
    +    public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out) throws IOException {
    +        serializeEdit(previousRecordState, record, out, false);
    +    }
    +
    +    public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out, final boolean forceAttributesWritten) throws IOException {
    +        if (record.isMarkedForAbort()) {
    +            logger.warn("Repository Record {} is marked to be aborted; it 
will be persisted in the FlowFileRepository as a DELETE record", record);
    +            out.write(ACTION_DELETE);
    +            out.writeLong(getRecordIdentifier(record));
    +            serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
    +            return;
    +        }
    +
    +        final UpdateType updateType = getUpdateType(record);
    +
    +        if (updateType.equals(UpdateType.DELETE)) {
    +            out.write(ACTION_DELETE);
    +            out.writeLong(getRecordIdentifier(record));
    +            serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
    +            return;
    +        }
    +
    +        // If there's a Destination Connection, that's the one that we want to associate with this record.
    +        // However, on restart, we will restore the FlowFile and set this connection to its "originalConnection".
    +        // If we then serialize the FlowFile again before it's transferred, it's important to allow this to happen,
    +        // so we use the originalConnection instead
    +        FlowFileQueue associatedQueue = record.getDestination();
    +        if (associatedQueue == null) {
    +            associatedQueue = record.getOriginalQueue();
    +        }
    +
    +        if (updateType.equals(UpdateType.SWAP_OUT)) {
    +            out.write(ACTION_SWAPPED_OUT);
    +            out.writeLong(getRecordIdentifier(record));
    +            out.writeUTF(associatedQueue.getIdentifier());
    +            out.writeUTF(getLocation(record));
    +            return;
    +        }
    +
    +        final FlowFile flowFile = record.getCurrent();
    +        final ContentClaim claim = record.getCurrentClaim();
    +
    +        switch (updateType) {
    +            case UPDATE:
    +                out.write(ACTION_UPDATE);
    +                break;
    +            case CREATE:
    +                out.write(ACTION_CREATE);
    +                break;
    +            case SWAP_IN:
    +                out.write(ACTION_SWAPPED_IN);
    +                break;
    +            default:
    +                throw new AssertionError();
    +        }
    +
    +        out.writeLong(getRecordIdentifier(record));
    +        out.writeLong(flowFile.getEntryDate());
    +        out.writeLong(flowFile.getLineageStartDate());
    +        out.writeLong(flowFile.getLineageStartIndex());
    +
    +        final Long queueDate = flowFile.getLastQueueDate();
    +        out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate);
    +        out.writeLong(flowFile.getQueueDateIndex());
    +        out.writeLong(flowFile.getSize());
    +
    +        if (associatedQueue == null) {
    +            logger.warn("{} Repository Record {} has no Connection 
associated with it; it will be destroyed on restart",
    +                new Object[] {this, record});
    +            writeString("", out);
    +        } else {
    +            writeString(associatedQueue.getIdentifier(), out);
    +        }
    +
    +        serializeContentClaim(claim, record.getCurrentClaimOffset(), out);
    +
    +        if (forceAttributesWritten || record.isAttributesChanged() || updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) {
    +            out.write(1);   // indicate attributes changed
    +            final Map<String, String> attributes = flowFile.getAttributes();
    +            out.writeInt(attributes.size());
    +            for (final Map.Entry<String, String> entry : attributes.entrySet()) {
    +                writeString(entry.getKey(), out);
    +                writeString(entry.getValue(), out);
    +            }
    +        } else {
    +            out.write(0);   // indicate attributes did not change
    +        }
    +
    +        if (updateType == UpdateType.SWAP_IN) {
    +            out.writeUTF(record.getSwapLocation());
    +        }
    +    }
    +
    +    @Override
    +    public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
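    +        // every edit begins with a one-byte action code followed by the record id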
    +        final int action = in.read();
    +        final long recordId = in.readLong();
    +        if (action == ACTION_DELETE) {
    +            final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
    +
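    +            // the content claim is present on DELETE records only for encoding version 5 or later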
    +            if (version > 4) {
    +                deserializeClaim(in, version, ffBuilder);
    +            }
    +
    +            final FlowFileRecord flowFileRecord = ffBuilder.build();
    +            final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
    +            record.markForDelete();
    +
    +            return record;
    +        }
    +
    +        if (action == ACTION_SWAPPED_OUT) {
    +            final String queueId = in.readUTF();
    +            final String location = in.readUTF();
    +            final FlowFileQueue queue = getFlowFileQueue(queueId);
    +
    +            final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
    +                .id(recordId)
    +                .build();
    +
    +            return new StandardRepositoryRecord(queue, flowFileRecord, location);
    +        }
    +
    +        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
    +        final RepositoryRecord record = currentRecordStates.get(recordId);
    +        ffBuilder.id(recordId);
    +        if (record != null) {
    +            ffBuilder.fromFlowFile(record.getCurrent());
    +        }
    +        ffBuilder.entryDate(in.readLong());
    +
    +        if (version > 1) {
    +            // read the lineage identifiers and lineage start date, which were added in version 2.
    +            if (version < 9) {
    +                final int numLineageIds = in.readInt();
    +                for (int i = 0; i < numLineageIds; i++) {
    +                    in.readUTF(); //skip identifiers
    +                }
    +            }
    +            final long lineageStartDate = in.readLong();
    +            final long lineageStartIndex;
    +            if (version > 7) {
    +                lineageStartIndex = in.readLong();
    +            } else {
    +                lineageStartIndex = 0L;
    +            }
    +            ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
    +
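    +            // the last-queue date was added in encoding version 6; its index in version 8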
    +            if (version > 5) {
    +                final long lastQueueDate = in.readLong();
    +                final long queueDateIndex;
    +                if (version > 7) {
    +                    queueDateIndex = in.readLong();
    +                } else {
    +                    queueDateIndex = 0L;
    +                }
    +
    +                ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
    +            }
    +        }
    +
    +        ffBuilder.size(in.readLong());
    +        final String connectionId = readString(in);
    +
    +        logger.debug("{} -> {}", new Object[] {recordId, connectionId});
    +
    +        deserializeClaim(in, version, ffBuilder);
    +
    +        // recover new attributes, if they changed
    +        final int attributesChanged = in.read();
    +        if (attributesChanged == -1) {
    +            throw new EOFException();
    +        } else if (attributesChanged == 1) {
    +            final int numAttributes = in.readInt();
    +            final Map<String, String> attributes = new HashMap<>();
    +            for (int i = 0; i < numAttributes; i++) {
    +                final String key = readString(in);
    +                final String value = readString(in);
    +                attributes.put(key, value);
    +            }
    +
    +            ffBuilder.addAttributes(attributes);
    +        } else if (attributesChanged != 0) {
    +            throw new IOException("Attribute Change Qualifier not found in 
stream; found value: "
    +                + attributesChanged + " after successfully restoring " + 
recordsRestored + " records. The FlowFile Repository appears to be corrupt!");
    +        }
    +
    +        final FlowFileRecord flowFile = ffBuilder.build();
    +        String swapLocation = null;
    +        if (action == ACTION_SWAPPED_IN) {
    +            swapLocation = in.readUTF();
    +        }
    +
    +        final FlowFileQueue queue = getFlowFileQueue(connectionId);
    +        final StandardRepositoryRecord standardRepoRecord = new StandardRepositoryRecord(queue, flowFile);
    +        if (swapLocation != null) {
    --- End diff --
    
    Before, it was checking whether the flowFileQueueMap was null, I believe, not the FlowFile queue itself. So this is okay - at this point, it's guaranteed to be non-null.
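    
    For readers following the version checks in this diff: each field added in a
    newer encoding version is read only when the stream's version is new enough,
    with a default substituted otherwise. A minimal standalone sketch of that
    pattern (hypothetical demo class, not NiFi's actual types):
    
        import java.io.*;
        
        public class VersionedFieldDemo {
            // Mirrors the "if (version > 7)" reads above: queueDateIndex was
            // added in encoding version 8, so older streams fall back to 0L.
            static long readQueueDateIndex(DataInputStream in, int version) throws IOException {
                return version > 7 ? in.readLong() : 0L;
            }
        
            public static void main(String[] args) throws IOException {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                new DataOutputStream(bytes).writeLong(7L);
                DataInputStream newStream = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
                System.out.println(readQueueDateIndex(newStream, 9)); // 7: field present in new data
                DataInputStream oldStream = new DataInputStream(new ByteArrayInputStream(new byte[0]));
                System.out.println(readQueueDateIndex(oldStream, 7)); // 0: default for old data
            }
        }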


> Enable repositories to support upgrades and rollback in well defined scenarios
> ------------------------------------------------------------------------------
>
>                 Key: NIFI-2854
>                 URL: https://issues.apache.org/jira/browse/NIFI-2854
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>             Fix For: 1.1.0
>
>
> The flowfile, swapfile, provenance, and content repositories play a very 
> important role in NiFi's ability to be safely upgraded and rolled back.  We 
> need to have well documented behaviors, designs, and version adherence so 
> that users can safely rely on these mechanisms.
> Once this is formalized and in place we should update our versioning guidance 
> to reflect this as well.
> The following would be true from NiFi 1.2.0 onward
> * No changes that break forward/backward compatibility can be made to how 
> the repositories are persisted to disk; specifically, this means that things 
> like the way each is serialized to disk cannot change.
> * If changes are made which impact forward or backward compatibility, they 
> should be reserved for major releases only and should include a utility to 
> help users with pre-existing data convert from the older format to the newer 
> one.  It may not be feasible to support rollback across major releases.
> * The content repository should not be changed within a major release cycle 
> in any way that will harm forward or backward compatibility.
> * The flow file repository can change in that new fields can be added to 
> existing write ahead log record types but no fields can be removed nor can 
> any new types be added.  Once a field is considered required it must remain 
> required.  Changes may only be made across minor version releases - not 
> incremental ones.
> * Swap File storage should follow rules very similar to the flow file 
> repository.  Adding a schema to the swap file header may allow some variation 
> there, but the variation should only be hints to optimize how swap files are 
> processed, not changes to their behavior otherwise.  Changes are only 
> permitted during minor version releases.
> * Provenance repository changes are only permitted during minor version 
> releases.  These changes may include adding or removing fields from existing 
> event types.  If a field is considered required it must always be considered 
> required.  If a field is removed then it must not be a required field and 
> there must be a sensible default an older version could use if that value is 
> not found in new data once rolled back.  New event types may be added.  
> Fields or event types not known to an older version, if seen after a 
> rollback, will simply be ignored (a sketch of this skip-unknown-data pattern 
> follows below).
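
To make that last rollback rule concrete: one common way to let an older reader
safely ignore fields written by a newer release is to length-prefix each record
so unknown trailing bytes can be skipped rather than misparsed. A hedged,
standalone sketch with made-up field names (not the actual repository format):

    import java.io.*;

    public class SkippableRecordDemo {
        // Newer writer: body is [id, size, newFlag], preceded by its byte length.
        static void writeNewFormat(DataOutputStream out) throws IOException {
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            DataOutputStream bodyOut = new DataOutputStream(body);
            bodyOut.writeLong(42L);      // known to old readers
            bodyOut.writeLong(1024L);    // known to old readers
            bodyOut.writeBoolean(true);  // field added in a newer release
            out.writeInt(body.size());
            body.writeTo(out);
        }

        // Older reader: reads the fields it knows, then skips the rest of the record.
        static void readOldFormat(DataInputStream in) throws IOException {
            int recordLength = in.readInt();
            long id = in.readLong();
            long size = in.readLong();
            in.skipBytes(recordLength - 16); // ignore fields from the future
            System.out.println("id=" + id + ", size=" + size);
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeNewFormat(new DataOutputStream(bytes));
            readOldFormat(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        }
    }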



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
