[6/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.

2016-11-18 Thread ozhurakousky
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
--
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
index 2afaa70..324f59f 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java
@@ -16,19 +16,445 @@
  */
 package org.apache.nifi.stream.io;
 
+import java.io.IOException;
 import java.io.InputStream;
 
 /**
  * This class is a slight modification of the BufferedInputStream in the java.io package. The modification is that this implementation does not provide synchronization on method calls, which means
  * that this class is not suitable for use by multiple threads. However, the absence of these synchronized blocks results in potentially much better performance.
  */
-public class BufferedInputStream extends java.io.BufferedInputStream {
+public class BufferedInputStream extends InputStream {
 
-public BufferedInputStream(final InputStream in) {
-super(in);
+private final InputStream in;
+
+private static int DEFAULT_BUFFER_SIZE = 8192;
+
+/**
+ * The maximum size of array to allocate.
+ * Some VMs reserve some header words in an array.
+ * Attempts to allocate larger arrays may result in
+ * OutOfMemoryError: Requested array size exceeds VM limit
+ */
+private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
+
+/**
+ * The internal buffer array where the data is stored. When necessary,
+ * it may be replaced by another array of
+ * a different size.
+ */
+protected byte buf[];
+
+/**
+ * The index one greater than the index of the last valid byte in
+ * the buffer.
+ * This value is always
+ * in the range 0 through buf.length;
+ * elements buf[0] through buf[count-1]
+ * contain buffered input data obtained
+ * from the underlying input stream.
+ */
+private int count;
+
+/**
+ * The current position in the buffer. This is the index of the next
+ * character to be read from the buf array.
+ * 
+ * This value is always in the range 0
+ * through count. If it is less
+ * than count, then buf[pos]
+ * is the next byte to be supplied as input;
+ * if it is equal to count, then
+ * the next read or skip
+ * operation will require more bytes to be
+ * read from the contained input stream.
+ *
+ * @see java.io.BufferedInputStream#buf
+ */
+private int pos;
+
+/**
+ * The value of the pos field at the time the last
+ * mark method was called.
+ * 
+ * This value is always
+ * in the range -1 through pos.
+ * If there is no marked position in the input
+ * stream, this field is -1. If
+ * there is a marked position in the input
+ * stream, then buf[markpos]
+ * is the first byte to be supplied as input
+ * after a reset operation. If
+ * markpos is not -1,
+ * then all bytes from positions buf[markpos]
+ * through buf[pos-1] must remain
+ * in the buffer array (though they may be
+ * moved to another place in the buffer array,
+ * with suitable adjustments to the values
+ * of count, pos,
+ * and markpos); they may not
+ * be discarded unless and until the difference
+ * between pos and markpos
+ * exceeds marklimit.
+ *
+ * @see java.io.BufferedInputStream#mark(int)
+ * @see java.io.BufferedInputStream#pos
+ */
+protected int markpos = -1;
+
+/**
+ * The maximum read ahead allowed after a call to the
+ * mark method before subsequent calls to the
+ * reset method fail.
+ * Whenever the difference between pos
+ * and markpos exceeds marklimit,
+ * then the mark may be dropped by setting
+ * markpos to -1.
+ *
+ * @see java.io.BufferedInputStream#mark(int)
+ * @see java.io.BufferedInputStream#reset()
+ */
+protected int marklimit;
+
+/**
+ * Check to make sure that underlying input stream has not been
+ * nulled out due to close; if not return it;
+ */
+private InputStream getInIfOpen() throws IOException {
+InputStream input = in;
+if (input == null) {
+throw new IOException("Stream closed");
+}
+return input;
+}
+
+/**
+ * Check to make sure that buffer has not been nulled out due to
+ * close; if not return it;
+ */
+private byte[] getBufIfOpen() throws IOException {
+if (buf == null) {
+throw new IOException("Stream closed");
+}
+return buf;
+}
+
+/**
+ * Creates a BufferedInputStream
+
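The class javadoc above captures the rationale for this rewrite: the stock java.io.BufferedInputStream synchronizes every method call, and dropping that locking helps a single-threaded reader. Below is a minimal sketch of the core refill/read loop only, with illustrative names (it is not the NiFi implementation and it omits the mark/reset and bulk-read paths the real class carries over from the JDK):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Sketch only: an unsynchronized buffered wrapper. A single reader thread pays no
    // monitor cost on read(), which is the performance argument made in the javadoc above.
    class UnsyncBufferedInput extends InputStream {
        private final InputStream in;
        private final byte[] buf;
        private int pos;   // index of the next byte to hand out
        private int count; // one past the last valid byte in buf

        UnsyncBufferedInput(final InputStream in, final int bufferSize) {
            this.in = in;
            this.buf = new byte[bufferSize];
        }

        @Override
        public int read() throws IOException {
            if (pos >= count) {
                // Buffer exhausted: refill from the underlying stream.
                count = in.read(buf, 0, buf.length);
                pos = 0;
                if (count <= 0) {
                    return -1; // end of stream
                }
            }
            return buf[pos++] & 0xFF;
        }

        public static void main(final String[] args) throws IOException {
            try (InputStream demo = new UnsyncBufferedInput(new ByteArrayInputStream("abc".getBytes()), 2)) {
                int b;
                while ((b = demo.read()) != -1) {
                    System.out.print((char) b); // prints: abc
                }
            }
        }
    }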

[1/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.

2016-11-18 Thread ozhurakousky
Repository: nifi
Updated Branches:
  refs/heads/master 5a25884f5 -> 1be087147


http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
--
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
new file mode 100644
index 0000000..bae2364
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/AbstractTestRecordReaderWriter.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.provenance.toc.StandardTocReader;
+import org.apache.nifi.provenance.toc.StandardTocWriter;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.provenance.toc.TocUtil;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public abstract class AbstractTestRecordReaderWriter {
+@BeforeClass
+public static void setLogLevel() {
+System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "INFO");
+}
+
+protected ProvenanceEventRecord createEvent() {
+final Map<String, String> attributes = new HashMap<>();
+attributes.put("filename", "1.txt");
+attributes.put("uuid", UUID.randomUUID().toString());
+
+final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+builder.setEventTime(System.currentTimeMillis());
+builder.setEventType(ProvenanceEventType.RECEIVE);
+builder.setTransitUri("nifi://unit-test");
+builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+builder.setComponentId("1234");
+builder.setComponentType("dummy processor");
+final ProvenanceEventRecord record = builder.build();
+
+return record;
+}
+
+@Test
+public void testSimpleWriteWithToc() throws IOException {
+final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
+final File tocFile = TocUtil.getTocFile(journalFile);
+final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+final RecordWriter writer = createWriter(journalFile, tocWriter, false, 1024 * 1024);
+
+writer.writeHeader(1L);
+writer.writeRecord(createEvent(), 1L);
+writer.close();
+
+final TocReader tocReader = new StandardTocReader(tocFile);
+
+try (final FileInputStream fis = new FileInputStream(journalFile);
+final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
+assertEquals(0, reader.getBlockIndex());
+reader.skipToBlock(0);
+final StandardProvenanceEventRecord recovered = reader.nextRecord();
+assertNotNull(recovered);
+
+assertEquals("nifi://unit-test", recovered.getTransitUri());
+assertNull(reader.nextRecord());
+}
+
+FileUtils.deleteFile(journalFile.getParentFile(), true);
+}
+
+
+@Test
+public void 

[7/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.

2016-11-18 Thread ozhurakousky
NIFI-2854: Refactor repositories and swap files to use schema-based 
serialization so that nifi can be rolled back to a previous version after an 
upgrade.

NIFI-2854: Incorporated PR review feedback

NIFI-2854: Implemented feedback from PR Review

NIFI-2854: Ensure that all resources are closed on 
CompressableRecordReader.close() even if an IOException is thrown when closing 
one of them

This closes #1202


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1be08714
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1be08714
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1be08714

Branch: refs/heads/master
Commit: 1be08714731f01347ac1f98e18047fe7d9ab8afd
Parents: 5a25884
Author: Mark Payne 
Authored: Tue Oct 4 09:38:14 2016 -0400
Committer: Oleg Zhurakousky 
Committed: Fri Nov 18 14:53:13 2016 -0500

--
 .../nifi/provenance/NamedSearchableField.java   |   9 +-
 .../StandardProvenanceEventRecord.java  |  38 ++
 nifi-commons/nifi-schema-utils/pom.xml  |  26 +
 .../repository/schema/ComplexRecordField.java   |  95 
 .../nifi/repository/schema/FieldMapRecord.java  |  81 +++
 .../nifi/repository/schema/FieldType.java   |  74 +++
 .../nifi/repository/schema/MapRecordField.java  |  75 +++
 .../nifi/repository/schema/NamedValue.java  |  36 ++
 .../apache/nifi/repository/schema/Record.java   |  30 +
 .../nifi/repository/schema/RecordField.java |  30 +
 .../nifi/repository/schema/RecordSchema.java| 188 +++
 .../nifi/repository/schema/Repetition.java  |  22 +
 .../repository/schema/SchemaRecordReader.java   | 191 +++
 .../repository/schema/SchemaRecordWriter.java   | 139 +
 .../repository/schema/SimpleRecordField.java|  84 +++
 .../repository/schema/UnionRecordField.java |  64 +++
 .../schema/TestSchemaRecordReader.java  | 281 ++
 .../schema/TestSchemaRecordReaderWriter.java| 178 ++
 .../nifi/stream/io/BufferedInputStream.java | 436 ++-
 .../java/org/apache/nifi/util/FormatUtils.java  |  38 ++
 .../util/timebuffer/CountSizeEntityAccess.java  |  43 ++
 .../nifi/util/timebuffer/TimedCountSize.java|  41 ++
 .../org/wali/MinimalLockingWriteAheadLog.java   | 118 ++--
 .../src/main/java/org/wali/SerDe.java   |  30 +-
 .../src/main/java/org/wali/SerDeFactory.java|  60 ++
 .../java/org/wali/SingletonSerDeFactory.java|  46 ++
 nifi-commons/pom.xml|   1 +
 .../repository/claim/ResourceClaim.java |  24 +
 .../nifi-framework/nifi-framework-core/pom.xml  |   4 +
 .../nifi/controller/FileSystemSwapManager.java  | 416 ++
 .../nifi/controller/StandardFlowFileQueue.java  |   6 +
 .../repository/RepositoryRecordSerde.java   |  68 +++
 .../RepositoryRecordSerdeFactory.java   |  95 
 .../repository/SchemaRepositoryRecordSerde.java | 213 
 .../repository/StandardProcessSession.java  |  70 +--
 .../WriteAheadFlowFileRepository.java   | 547 +--
 .../WriteAheadRepositoryRecordSerde.java| 517 ++
 .../repository/claim/StandardResourceClaim.java |  23 -
 .../claim/StandardResourceClaimManager.java |   7 +-
 .../repository/io/ByteCountingInputStream.java  | 101 
 .../repository/io/ByteCountingOutputStream.java |  63 ---
 .../repository/schema/ContentClaimFieldMap.java |  92 
 .../repository/schema/ContentClaimSchema.java   |  63 +++
 .../schema/FlowFileRecordFieldMap.java  |  99 
 .../repository/schema/FlowFileSchema.java   |  67 +++
 .../schema/RepositoryRecordFieldMap.java|  83 +++
 .../schema/RepositoryRecordSchema.java  |  93 
 .../schema/RepositoryRecordUpdate.java  |  69 +++
 .../schema/ResourceClaimFieldMap.java   |  85 +++
 .../controller/swap/SchemaSwapDeserializer.java |  77 +++
 .../controller/swap/SchemaSwapSerializer.java   | 101 
 .../controller/swap/SimpleSwapDeserializer.java | 303 ++
 .../controller/swap/SimpleSwapSerializer.java   | 133 +
 .../nifi/controller/swap/SwapDeserializer.java  |  33 ++
 .../apache/nifi/controller/swap/SwapSchema.java |  79 +++
 .../nifi/controller/swap/SwapSerializer.java|  33 ++
 .../controller/swap/SwapSummaryFieldMap.java| 106 
 .../controller/TestFileSystemSwapManager.java   | 160 +-
 .../TestWriteAheadFlowFileRepository.java   |   2 +
 .../nifi/controller/swap/MockFlowFile.java  | 136 +
 .../TestSchemaSwapSerializerDeserializer.java   | 195 +++
 .../TestSimpleSwapSerializerDeserializer.java   | 139 +
 .../pom.xml |   4 +
 .../nifi/provenance/AbstractRecordWriter.java   | 173 ++
 .../provenance/ByteArraySchemaRecordReader.java |  80 +++
 .../provenance/ByteArraySchemaRecordWriter.java |  85 +++
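The commit message above frames the goal: repositories and swap files now serialize with an explicit schema so that data written by a newer NiFi can still be read after rolling back to an older version. Below is a minimal sketch of that general idea with hypothetical names (it is not the nifi-schema-utils API): the writer embeds its field list as a header, and the reader decodes records against the embedded schema rather than against a layout compiled into the reader.

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch only: schema-framed records. The header carries the writer's field list,
    // so a reader built against an older field set can still walk every record.
    public class SchemaFramingSketch {

        static void write(final DataOutputStream out, final List<String> fields,
                          final List<Map<String, String>> records) throws IOException {
            out.writeInt(fields.size());            // header: the schema the writer used
            for (final String field : fields) {
                out.writeUTF(field);
            }
            out.writeInt(records.size());
            for (final Map<String, String> record : records) {
                for (final String field : fields) { // body: values in schema order
                    out.writeUTF(record.getOrDefault(field, ""));
                }
            }
        }

        static List<Map<String, String>> read(final DataInputStream in) throws IOException {
            final int fieldCount = in.readInt();
            final List<String> fields = new ArrayList<>(fieldCount);
            for (int i = 0; i < fieldCount; i++) {
                fields.add(in.readUTF());           // recover the writer's schema first
            }
            final int recordCount = in.readInt();
            final List<Map<String, String>> records = new ArrayList<>(recordCount);
            for (int i = 0; i < recordCount; i++) {
                final Map<String, String> record = new LinkedHashMap<>();
                for (final String field : fields) {
                    // Unknown fields are carried along instead of breaking the read.
                    record.put(field, in.readUTF());
                }
                records.add(record);
            }
            return records;
        }
    }

The nifi-schema-utils classes listed in the diffstat (RecordSchema, SchemaRecordWriter, SchemaRecordReader, FieldType, Repetition) play this role in the actual change, with typed fields, repetition rules, and nested records.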
 

[4/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.

2016-11-18 Thread ozhurakousky
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
--
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
new file mode 100644
index 0000000..76c208d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.schema;
+
+import java.util.List;
+
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class ContentClaimFieldMap implements Record {
+private final ContentClaim contentClaim;
+private final long contentClaimOffset;
+private final ResourceClaimFieldMap resourceClaimFieldMap;
+private final RecordSchema schema;
+
+public ContentClaimFieldMap(final ContentClaim contentClaim, final long contentClaimOffset, final RecordSchema schema) {
+this.contentClaim = contentClaim;
+this.contentClaimOffset = contentClaimOffset;
+this.schema = schema;
+
+final List<RecordField> resourceClaimFields = schema.getField(ContentClaimSchema.RESOURCE_CLAIM).getSubFields();
+final RecordSchema resourceClaimSchema = new RecordSchema(resourceClaimFields);
+this.resourceClaimFieldMap = new ResourceClaimFieldMap(contentClaim.getResourceClaim(), resourceClaimSchema);
+}
+
+@Override
+public Object getFieldValue(final String fieldName) {
+switch (fieldName) {
+case ContentClaimSchema.RESOURCE_CLAIM:
+return resourceClaimFieldMap;
+case ContentClaimSchema.CONTENT_CLAIM_LENGTH:
+return contentClaim.getLength();
+case ContentClaimSchema.CONTENT_CLAIM_OFFSET:
+return contentClaimOffset;
+case ContentClaimSchema.RESOURCE_CLAIM_OFFSET:
+return contentClaim.getOffset();
+default:
+return null;
+}
+}
+
+@Override
+public RecordSchema getSchema() {
+return schema;
+}
+
+@Override
+public String toString() {
+return "ContentClaimFieldMap[" + contentClaim + "]";
+}
+
+public static ContentClaim getContentClaim(final Record claimRecord, final ResourceClaimManager resourceClaimManager) {
+final Record resourceClaimRecord = (Record) claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM);
+final String container = (String) resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_CONTAINER);
+final String section = (String) resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_SECTION);
+final String identifier = (String) resourceClaimRecord.getFieldValue(ContentClaimSchema.CLAIM_IDENTIFIER);
+final Boolean lossTolerant = (Boolean) resourceClaimRecord.getFieldValue(ContentClaimSchema.LOSS_TOLERANT);
+
+final Long length = (Long) claimRecord.getFieldValue(ContentClaimSchema.CONTENT_CLAIM_LENGTH);
+final Long resourceOffset = (Long) claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM_OFFSET);
+
+final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, lossTolerant, false);
+final StandardContentClaim contentClaim = new 

[3/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.

2016-11-18 Thread ozhurakousky
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
--
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 97226b2..46bea31 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -22,26 +22,20 @@ import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Files;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.SwapContents;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.events.EventReporter;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -56,7 +50,9 @@ public class TestFileSystemSwapManager {
 final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
 Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");

-final SwapContents swapContents = FileSystemSwapManager.deserializeFlowFiles(in, "/src/test/resources/old-swap-file.swap", flowFileQueue, new NopResourceClaimManager());
+final FileSystemSwapManager swapManager = createSwapManager();
+final SwapContents swapContents = swapManager.peek("src/test/resources/old-swap-file.swap", flowFileQueue);
+
 final List<FlowFileRecord> records = swapContents.getFlowFiles();
 assertEquals(1, records.size());
 
@@ -67,53 +63,32 @@ public class TestFileSystemSwapManager {
 }
 }
 
-@Test
-public void testRoundTripSerializeDeserialize() throws IOException {
-final List<FlowFileRecord> toSwap = new ArrayList<>(1);
-final Map<String, String> attrs = new HashMap<>();
-for (int i = 0; i < 1; i++) {
-attrs.put("i", String.valueOf(i));
-final FlowFileRecord ff = new TestFlowFile(attrs, i);
-toSwap.add(ff);
-}
-
-final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
-Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
 
-final String swapLocation = "target/testRoundTrip.swap";
-final File swapFile = new File(swapLocation);
-Files.deleteIfExists(swapFile.toPath());
+private FileSystemSwapManager createSwapManager() {
+final FileSystemSwapManager swapManager = new FileSystemSwapManager();
+final ResourceClaimManager resourceClaimManager = new NopResourceClaimManager();
+final FlowFileRepository flowfileRepo = Mockito.mock(FlowFileRepository.class);
+swapManager.initialize(new SwapManagerInitializationContext() {
+@Override
+public ResourceClaimManager getResourceClaimManager() {
+return resourceClaimManager;
+}
 
-try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
-FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
-}
+@Override
+public FlowFileRepository getFlowFileRepository() {
+return flowfileRepo;
+}
 
-final SwapContents swappedIn;
-try (final FileInputStream fis = new FileInputStream(swapFile);
-final DataInputStream dis = new DataInputStream(fis)) {
-swappedIn = FileSystemSwapManager.deserializeFlowFiles(dis, swapLocation, flowFileQueue, Mockito.mock(ResourceClaimManager.class));
-}
+@Override
+public EventReporter 

[2/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.

2016-11-18 Thread ozhurakousky
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
--
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index 35832c4..a95bd4f 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -17,239 +17,142 @@
 package org.apache.nifi.provenance;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
 import org.apache.nifi.provenance.serialization.RecordWriter;
 import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.DataOutputStream;
-import org.apache.nifi.stream.io.GZIPOutputStream;
-import org.apache.nifi.stream.io.NonCloseableOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StandardRecordWriter implements RecordWriter {
+/**
+ * @deprecated Deprecated in favor of SchemaRecordWriter
+ */
+@Deprecated
+public class StandardRecordWriter extends CompressableRecordWriter implements RecordWriter {
 private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
+public static final int SERIALIZATION_VERISON = 9;
+public static final String SERIALIZATION_NAME = "org.apache.nifi.provenance.PersistentProvenanceRepository";
 
 private final File file;
-private final FileOutputStream fos;
-private final ByteCountingOutputStream rawOutStream;
-private final TocWriter tocWriter;
-private final boolean compressed;
-private final int uncompressedBlockSize;
-private final AtomicBoolean dirtyFlag = new AtomicBoolean(false);
-
-private DataOutputStream out;
-private ByteCountingOutputStream byteCountingOut;
-private long lastBlockOffset = 0L;
-private int recordCount = 0;
-private volatile boolean closed = false;
-
-private final Lock lock = new ReentrantLock();
 
 
 public StandardRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+super(file, writer, compressed, uncompressedBlockSize);
 logger.trace("Creating Record Writer for {}", file.getName());
 
 this.file = file;
-this.compressed = compressed;
-this.fos = new FileOutputStream(file);
-rawOutStream = new ByteCountingOutputStream(fos);
-this.uncompressedBlockSize = uncompressedBlockSize;
+}
 
-this.tocWriter = writer;
+public StandardRecordWriter(final OutputStream out, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+super(out, tocWriter, compressed, uncompressedBlockSize);
+this.file = null;
 }
 
 @Override
-public synchronized File getFile() {
-return file;
+protected String getSerializationName() {
+return SERIALIZATION_NAME;
 }
 
 @Override
-public synchronized void writeHeader(final long firstEventId) throws IOException {
-if (isDirty()) {
-throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
-}
-}
-
-try {
-lastBlockOffset = rawOutStream.getBytesWritten();
-resetWriteStream(firstEventId);
-
-out.writeUTF(PersistentProvenanceRepository.class.getName());
-out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION);
-out.flush();
-} catch (final IOException ioe) {
-markDirty();
-throw ioe;
-}
+protected int getSerializationVersion() {
+return SERIALIZATION_VERISON;
 }
 
-
-/**
- * Resets the streams to prepare for a new block
- * @param eventId the first id that will be written to the new block
- * @throws IOException if unable to flush/close the current streams 
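The diff above shows the shape of the refactoring: stream handling, compression, and block bookkeeping move into CompressableRecordWriter, and StandardRecordWriter keeps only the serialization name and version it stamps into the header. Below is a minimal sketch of that template-method split with illustrative class names; only the two constant values are taken from the diff itself.

    import java.io.DataOutputStream;
    import java.io.IOException;

    // Sketch only: the base class owns the shared header/stream work, while subclasses
    // contribute just the serialization name and version they write into the file.
    abstract class HeaderWritingWriter {

        protected abstract String getSerializationName();

        protected abstract int getSerializationVersion();

        public final void writeHeader(final DataOutputStream out) throws IOException {
            // Written once per file; every format produced through this base class
            // identifies itself the same way.
            out.writeUTF(getSerializationName());
            out.writeInt(getSerializationVersion());
            out.flush();
        }
    }

    // The legacy format keeps its old identity, matching the constants in the diff above.
    class LegacyFormatWriter extends HeaderWritingWriter {

        @Override
        protected String getSerializationName() {
            return "org.apache.nifi.provenance.PersistentProvenanceRepository";
        }

        @Override
        protected int getSerializationVersion() {
            return 9;
        }
    }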

[5/7] nifi git commit: NIFI-2854: Refactor repositories and swap files to use schema-based serialization so that nifi can be rolled back to a previous version after an upgrade.

2016-11-18 Thread ozhurakousky
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
--
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
new file mode 100644
index 0000000..916fd76
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.schema.ContentClaimFieldMap;
+import org.apache.nifi.controller.repository.schema.ContentClaimSchema;
+import org.apache.nifi.controller.repository.schema.FlowFileSchema;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordFieldMap;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SchemaRecordReader;
+import org.apache.nifi.repository.schema.SchemaRecordWriter;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
+private static final int MAX_ENCODING_VERSION = 1;
+
+private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1;
+private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1;
+
+private final ResourceClaimManager resourceClaimManager;
+private volatile RecordSchema recoverySchema;
+
+public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager) {
+this.resourceClaimManager = resourceClaimManager;
+}
+
+@Override
+public void writeHeader(final DataOutputStream out) throws IOException {
+writeSchema.writeTo(out);
+}
+
+@Override
+public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord newRecordState, final DataOutputStream out) throws IOException {
+serializeRecord(newRecordState, out);
+}
+
+@Override
+public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException {
+final RecordSchema schema;
+switch (record.getType()) {
+case CREATE:
+case UPDATE:
+schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1;
+break;
+case CONTENTMISSING:
+case DELETE:
+schema = RepositoryRecordSchema.DELETE_SCHEMA_V1;
+break;
+case SWAP_IN:
+schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V1;
+break;
+case SWAP_OUT:
+schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V1;
+break;
+default:
+throw new IllegalArgumentException("Received Repository Record with unknown Update Type: " + record.getType()); // won't happen.
+}
+
+final RepositoryRecordFieldMap fieldMap = new RepositoryRecordFieldMap(record, schema, contentClaimSchema);
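In the serde above, serializeRecord() picks a schema per update type and writeHeader() embeds the write schema, while the recoverySchema field and MAX_ENCODING_VERSION hint at the read side. Below is a minimal, hypothetical sketch of that recovery path (illustrative names, not the org.wali SerDe API): capture the schema from the header before decoding records, and refuse encodings newer than this build understands.

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Sketch only: the recovery side of a schema-headed serde.
    final class RecoverySketch {
        private static final int MAX_ENCODING_VERSION = 1;

        // Captured from the header; every subsequent record is decoded against it.
        private volatile List<String> recoverySchema;

        void readHeader(final DataInputStream in, final int encodingVersion) throws IOException {
            if (encodingVersion > MAX_ENCODING_VERSION) {
                // Data written by a build newer than this one understands.
                throw new IOException("Unsupported encoding version: " + encodingVersion);
            }
            final int fieldCount = in.readInt();
            final List<String> fields = new ArrayList<>(fieldCount);
            for (int i = 0; i < fieldCount; i++) {
                fields.add(in.readUTF());
            }
            recoverySchema = fields;
        }

        List<String> getRecoverySchema() {
            return recoverySchema;
        }
    }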

nifi git commit: NIFI-3052 Update screenshots in Admin Guide and Getting Started Guide for additional colors added to UI This closes #1239

2016-11-18 Thread scottyaslan
Repository: nifi
Updated Branches:
  refs/heads/master 8568d40cd -> 5a25884f5


NIFI-3052 Update screenshots in Admin Guide and Getting Started Guide for 
additional colors added to UI
This closes #1239


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5a25884f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5a25884f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5a25884f

Branch: refs/heads/master
Commit: 5a25884f52e92b3c73f425a1dd6c0993866b1b8f
Parents: 8568d40
Author: Andrew Lim 
Authored: Thu Nov 17 12:20:21 2016 -0500
Committer: Scott Aslan 
Committed: Fri Nov 18 14:05:17 2016 -0500

--
 .../src/main/asciidoc/images/global-menu.png| Bin 33163 -> 34564 bytes
 .../src/main/asciidoc/images/iconAlert.png  | Bin 624 -> 597 bytes
 nifi-docs/src/main/asciidoc/images/iconRun.png  | Bin 429 -> 456 bytes
 nifi-docs/src/main/asciidoc/images/iconStop.png | Bin 248 -> 245 bytes
 .../asciidoc/images/lineage-graph-annotated.png | Bin 69156 -> 65991 bytes
 nifi-docs/src/main/asciidoc/images/ncm.png  | Bin 244418 -> 134532 bytes
 6 files changed, 0 insertions(+), 0 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/nifi/blob/5a25884f/nifi-docs/src/main/asciidoc/images/global-menu.png
--
diff --git a/nifi-docs/src/main/asciidoc/images/global-menu.png 
b/nifi-docs/src/main/asciidoc/images/global-menu.png
index 25ad52a..c32f1a7 100644
Binary files a/nifi-docs/src/main/asciidoc/images/global-menu.png and 
b/nifi-docs/src/main/asciidoc/images/global-menu.png differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a25884f/nifi-docs/src/main/asciidoc/images/iconAlert.png
--
diff --git a/nifi-docs/src/main/asciidoc/images/iconAlert.png 
b/nifi-docs/src/main/asciidoc/images/iconAlert.png
index 18917d4..7006e5a 100644
Binary files a/nifi-docs/src/main/asciidoc/images/iconAlert.png and 
b/nifi-docs/src/main/asciidoc/images/iconAlert.png differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a25884f/nifi-docs/src/main/asciidoc/images/iconRun.png
--
diff --git a/nifi-docs/src/main/asciidoc/images/iconRun.png 
b/nifi-docs/src/main/asciidoc/images/iconRun.png
old mode 100755
new mode 100644
index b6fd45b..f912a6c
Binary files a/nifi-docs/src/main/asciidoc/images/iconRun.png and 
b/nifi-docs/src/main/asciidoc/images/iconRun.png differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a25884f/nifi-docs/src/main/asciidoc/images/iconStop.png
--
diff --git a/nifi-docs/src/main/asciidoc/images/iconStop.png 
b/nifi-docs/src/main/asciidoc/images/iconStop.png
index 3192a10..2b89422 100644
Binary files a/nifi-docs/src/main/asciidoc/images/iconStop.png and 
b/nifi-docs/src/main/asciidoc/images/iconStop.png differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a25884f/nifi-docs/src/main/asciidoc/images/lineage-graph-annotated.png
--
diff --git a/nifi-docs/src/main/asciidoc/images/lineage-graph-annotated.png 
b/nifi-docs/src/main/asciidoc/images/lineage-graph-annotated.png
index d500c70..f89462d 100644
Binary files a/nifi-docs/src/main/asciidoc/images/lineage-graph-annotated.png 
and b/nifi-docs/src/main/asciidoc/images/lineage-graph-annotated.png differ

http://git-wip-us.apache.org/repos/asf/nifi/blob/5a25884f/nifi-docs/src/main/asciidoc/images/ncm.png
--
diff --git a/nifi-docs/src/main/asciidoc/images/ncm.png 
b/nifi-docs/src/main/asciidoc/images/ncm.png
index e02fbb0..c16a4fa 100644
Binary files a/nifi-docs/src/main/asciidoc/images/ncm.png and 
b/nifi-docs/src/main/asciidoc/images/ncm.png differ



[jira] [Commented] (MINIFI-61) Refactor dependencies to make use of separated nifi-api

2016-11-18 Thread Aldrin Piri (JIRA)

[ https://issues.apache.org/jira/browse/MINIFI-61?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15677021#comment-15677021 ]

Aldrin Piri commented on MINIFI-61:
---

This would also benefit from a way to get more granular access to certain NiFi 
components, but that would require some restructuring of those items.

> Refactor dependencies to make use of separated nifi-api
> ---
>
> Key: MINIFI-61
> URL: https://issues.apache.org/jira/browse/MINIFI-61
> Project: Apache NiFi MiNiFi
>  Issue Type: Improvement
>  Components: Core Framework
>Reporter: Aldrin Piri
>Assignee: Aldrin Piri
>
> NIFI-1896 provided a separation of the core extension API (nifi-api) and items 
> that were more framework-specific to NiFi (nifi-framework-api).  Accordingly, 
> adopting the separated nifi-api should let us provide a common interface so that 
> extensions can be used interchangeably with NiFi, while keeping internals and 
> framework specifics particular to MiNiFi.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (MINIFI-29) Create FlowController implementation

2016-11-18 Thread Aldrin Piri (JIRA)

[ https://issues.apache.org/jira/browse/MINIFI-29?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aldrin Piri updated MINIFI-29:
--
Assignee: (was: Aldrin Piri)

> Create FlowController implementation 
> -
>
> Key: MINIFI-29
> URL: https://issues.apache.org/jira/browse/MINIFI-29
> Project: Apache NiFi MiNiFi
>  Issue Type: Task
>  Components: Core Framework
>Reporter: Aldrin Piri
>
> While much can be reused and many of the fundamentals are the same, a 
> component with functionality similar to NiFi's FlowController is also 
> needed to handle the specifics of how processing works within MiNiFi.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (MINIFI-118) MiNiFi does not compile under Centos 6 standard tooling

2016-11-18 Thread Aldrin Piri (JIRA)

[ https://issues.apache.org/jira/browse/MINIFI-118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676967#comment-15676967 ]

Aldrin Piri commented on MINIFI-118:


I understand this is an inconvenience, but I am also of the opinion that it is not 
an unfair requirement given how long it has been since C++11 was released.

To make a more informed decision about where the issues arise, do you have a 
listing of the problem areas?

Off the top of my head, while it is supported now, I believe the yaml-cpp release 
we are using (the latest and current release, 0.5.3) is the last one before C++11 
becomes required.

Overall, if it's a small gap, maybe we can adjust, but I also don't want to 
preclude us from using the latest libraries, nor end up in a position where we 
have to manage this compatibility for the things we depend on.  I could also see 
us supporting varying levels of compatibility, where some of our core 
libraries/framework items stick to pre-C++11 code while our higher-level, more 
outward-facing components use more current versions.

Let me know your thoughts.

> MiNiFi does not compile under Centos 6 standard tooling
> ---
>
> Key: MINIFI-118
> URL: https://issues.apache.org/jira/browse/MINIFI-118
> Project: Apache NiFi MiNiFi
>  Issue Type: Bug
>Affects Versions: 0.0.1
>Reporter: Andre
>
> Reliance on C++11 is a big issue, as the default (i.e. no SCL) tooling on RHEL6 
> does not support C++11 fully. 
> Using Software Collections to deploy a later version of the toolchain produces 
> a valid binary.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)