Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1202#discussion_r87806305
  
    --- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
 ---
    @@ -0,0 +1,517 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.controller.repository;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import org.apache.nifi.controller.queue.FlowFileQueue;
    +import org.apache.nifi.controller.repository.claim.ContentClaim;
    +import org.apache.nifi.controller.repository.claim.ResourceClaim;
    +import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
    +import org.apache.nifi.controller.repository.claim.StandardContentClaim;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.wali.SerDe;
    +import org.wali.UpdateType;
    +
    +public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde 
implements SerDe<RepositoryRecord> {
    +    private static final Logger logger = 
LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class);
    +
    +    private static final int CURRENT_ENCODING_VERSION = 9;
    +
    +    public static final byte ACTION_CREATE = 0;
    +    public static final byte ACTION_UPDATE = 1;
    +    public static final byte ACTION_DELETE = 2;
    +    public static final byte ACTION_SWAPPED_OUT = 3;
    +    public static final byte ACTION_SWAPPED_IN = 4;
    +
    +    private long recordsRestored = 0L;
    +    private final ResourceClaimManager claimManager;
    +
    +    public WriteAheadRepositoryRecordSerde(final ResourceClaimManager 
claimManager) {
    +        this.claimManager = claimManager;
    +    }
    +
    +    @Override
    +    public void serializeEdit(final RepositoryRecord previousRecordState, 
final RepositoryRecord record, final DataOutputStream out) throws IOException {
    +        serializeEdit(previousRecordState, record, out, false);
    +    }
    +
    +    public void serializeEdit(final RepositoryRecord previousRecordState, 
final RepositoryRecord record, final DataOutputStream out, final boolean 
forceAttributesWritten) throws IOException {
    +        if (record.isMarkedForAbort()) {
    +            logger.warn("Repository Record {} is marked to be aborted; it 
will be persisted in the FlowFileRepository as a DELETE record", record);
    +            out.write(ACTION_DELETE);
    +            out.writeLong(getRecordIdentifier(record));
    +            serializeContentClaim(record.getCurrentClaim(), 
record.getCurrentClaimOffset(), out);
    +            return;
    +        }
    +
    +        final UpdateType updateType = getUpdateType(record);
    +
    +        if (updateType.equals(UpdateType.DELETE)) {
    +            out.write(ACTION_DELETE);
    +            out.writeLong(getRecordIdentifier(record));
    +            serializeContentClaim(record.getCurrentClaim(), 
record.getCurrentClaimOffset(), out);
    +            return;
    +        }
    +
    +        // If there's a Destination Connection, that's the one that we 
want to associate with this record.
    +        // However, on restart, we will restore the FlowFile and set this 
connection to its "originalConnection".
    +        // If we then serialize the FlowFile again before it's 
transferred, it's important to allow this to happen,
    +        // so we use the originalConnection instead
    +        FlowFileQueue associatedQueue = record.getDestination();
    +        if (associatedQueue == null) {
    +            associatedQueue = record.getOriginalQueue();
    +        }
    +
    +        if (updateType.equals(UpdateType.SWAP_OUT)) {
    +            out.write(ACTION_SWAPPED_OUT);
    +            out.writeLong(getRecordIdentifier(record));
    +            out.writeUTF(associatedQueue.getIdentifier());
    +            out.writeUTF(getLocation(record));
    +            return;
    +        }
    +
    +        final FlowFile flowFile = record.getCurrent();
    +        final ContentClaim claim = record.getCurrentClaim();
    +
    +        switch (updateType) {
    +            case UPDATE:
    +                out.write(ACTION_UPDATE);
    +                break;
    +            case CREATE:
    +                out.write(ACTION_CREATE);
    +                break;
    +            case SWAP_IN:
    +                out.write(ACTION_SWAPPED_IN);
    +                break;
    +            default:
    +                throw new AssertionError();
    +        }
    +
    +        out.writeLong(getRecordIdentifier(record));
    +        out.writeLong(flowFile.getEntryDate());
    +        out.writeLong(flowFile.getLineageStartDate());
    +        out.writeLong(flowFile.getLineageStartIndex());
    +
    +        final Long queueDate = flowFile.getLastQueueDate();
    +        out.writeLong(queueDate == null ? System.currentTimeMillis() : 
queueDate);
    +        out.writeLong(flowFile.getQueueDateIndex());
    +        out.writeLong(flowFile.getSize());
    +
    +        if (associatedQueue == null) {
    +            logger.warn("{} Repository Record {} has no Connection 
associated with it; it will be destroyed on restart",
    +                new Object[] {this, record});
    +            writeString("", out);
    +        } else {
    +            writeString(associatedQueue.getIdentifier(), out);
    +        }
    +
    +        serializeContentClaim(claim, record.getCurrentClaimOffset(), out);
    +
    +        if (forceAttributesWritten || record.isAttributesChanged() || 
updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) {
    +            out.write(1);   // indicate attributes changed
    +            final Map<String, String> attributes = 
flowFile.getAttributes();
    +            out.writeInt(attributes.size());
    +            for (final Map.Entry<String, String> entry : 
attributes.entrySet()) {
    +                writeString(entry.getKey(), out);
    +                writeString(entry.getValue(), out);
    +            }
    +        } else {
    +            out.write(0);   // indicate attributes did not change
    +        }
    +
    +        if (updateType == UpdateType.SWAP_IN) {
    +            out.writeUTF(record.getSwapLocation());
    +        }
    +    }
    +
    +    @Override
    +    public RepositoryRecord deserializeEdit(final DataInputStream in, 
final Map<Object, RepositoryRecord> currentRecordStates, final int version) 
throws IOException {
    +        final int action = in.read();
    +        final long recordId = in.readLong();
    +        if (action == ACTION_DELETE) {
    +            final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder().id(recordId);
    +
    +            if (version > 4) {
    +                deserializeClaim(in, version, ffBuilder);
    +            }
    +
    +            final FlowFileRecord flowFileRecord = ffBuilder.build();
    +            final StandardRepositoryRecord record = new 
StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
    +            record.markForDelete();
    +
    +            return record;
    +        }
    +
    +        if (action == ACTION_SWAPPED_OUT) {
    +            final String queueId = in.readUTF();
    +            final String location = in.readUTF();
    +            final FlowFileQueue queue = getFlowFileQueue(queueId);
    +
    +            final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
    +                .id(recordId)
    +                .build();
    +
    +            return new StandardRepositoryRecord(queue, flowFileRecord, 
location);
    +        }
    +
    +        final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder();
    +        final RepositoryRecord record = currentRecordStates.get(recordId);
    +        ffBuilder.id(recordId);
    +        if (record != null) {
    +            ffBuilder.fromFlowFile(record.getCurrent());
    +        }
    +        ffBuilder.entryDate(in.readLong());
    +
    +        if (version > 1) {
    +            // read the lineage identifiers and lineage start date, which 
were added in version 2.
    +            if (version < 9) {
    +                final int numLineageIds = in.readInt();
    +                for (int i = 0; i < numLineageIds; i++) {
    +                    in.readUTF(); //skip identifiers
    +                }
    +            }
    +            final long lineageStartDate = in.readLong();
    +            final long lineageStartIndex;
    +            if (version > 7) {
    +                lineageStartIndex = in.readLong();
    +            } else {
    +                lineageStartIndex = 0L;
    +            }
    +            ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
    +
    +            if (version > 5) {
    +                final long lastQueueDate = in.readLong();
    +                final long queueDateIndex;
    +                if (version > 7) {
    +                    queueDateIndex = in.readLong();
    +                } else {
    +                    queueDateIndex = 0L;
    +                }
    +
    +                ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
    +            }
    +        }
    +
    +        ffBuilder.size(in.readLong());
    +        final String connectionId = readString(in);
    +
    +        logger.debug("{} -> {}", new Object[] {recordId, connectionId});
    +
    +        deserializeClaim(in, version, ffBuilder);
    +
    +        // recover new attributes, if they changed
    +        final int attributesChanged = in.read();
    +        if (attributesChanged == -1) {
    +            throw new EOFException();
    +        } else if (attributesChanged == 1) {
    +            final int numAttributes = in.readInt();
    +            final Map<String, String> attributes = new HashMap<>();
    +            for (int i = 0; i < numAttributes; i++) {
    +                final String key = readString(in);
    +                final String value = readString(in);
    +                attributes.put(key, value);
    +            }
    +
    +            ffBuilder.addAttributes(attributes);
    +        } else if (attributesChanged != 0) {
    +            throw new IOException("Attribute Change Qualifier not found in 
stream; found value: "
    +                + attributesChanged + " after successfully restoring " + 
recordsRestored + " records. The FlowFile Repository appears to be corrupt!");
    +        }
    +
    +        final FlowFileRecord flowFile = ffBuilder.build();
    +        String swapLocation = null;
    +        if (action == ACTION_SWAPPED_IN) {
    +            swapLocation = in.readUTF();
    +        }
    +
    +        final FlowFileQueue queue = getFlowFileQueue(connectionId);
    +        final StandardRepositoryRecord standardRepoRecord = new 
StandardRepositoryRecord(queue, flowFile);
    +        if (swapLocation != null) {
    --- End diff --
    
    Before, it was checking if the flowFileQueueMap was null, I believe, not 
the flowfile queue itself. So this is okay - at this point, it's guaranteed 
non-null.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to