Repository: nifi
Updated Branches:
  refs/heads/0.x 2276c6ac1 -> 4f72e3491


NIFI-3055 StandardRecordWriter Can Throw UTFDataFormatException (0.x)
* Updated StandardRecordWriter to account for the encoding behavior of
  java.io.DataOutputStream.writeUTF() and truncate string values so that
  their UTF representation does not exceed DataOutputStream's 64 KB UTF
  format limit (a minimal sketch of the failure follows below).
* Added a test to confirm handling of large UTF strings.

Signed-off-by: Mike Moser <mose...@apache.org>
This closes #1470.
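
A minimal, self-contained sketch (not NiFi code; class and variable names are
illustrative) of the failure this commit guards against: DataOutputStream.writeUTF()
throws UTFDataFormatException whenever a string's modified-UTF-8 encoding exceeds
65,535 bytes.

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.UTFDataFormatException;
    import java.util.Arrays;

    public class WriteUtfOverflowDemo {
        public static void main(String[] args) throws Exception {
            final char[] chars = new char[70_000];
            Arrays.fill(chars, 'X');                     // 70,000 one-byte chars > 65,535-byte limit
            final String seventyK = new String(chars);

            try (final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                 final DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(seventyK);                  // fails: encoded string too long
            } catch (final UTFDataFormatException e) {
                System.out.println("writeUTF failed: " + e.getMessage());
            }
        }
    }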


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4f72e349
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4f72e349
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4f72e349

Branch: refs/heads/0.x
Commit: 4f72e3491f2372c8c45afb96a765c1f5cdd2f07d
Parents: 2276c6a
Author: Joe Skora <jsk...@apache.org>
Authored: Thu Feb 2 19:11:05 2017 +0000
Committer: Mike Moser <mose...@apache.org>
Committed: Fri Feb 3 17:10:30 2017 -0500

----------------------------------------------------------------------
 .../nifi/provenance/StandardRecordWriter.java   | 62 ++++++++++++++++----
 .../TestStandardRecordReaderWriter.java         | 44 ++++++++++++++
 2 files changed, 96 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4f72e349/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index a5c121a..f015cc8 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.UTFDataFormatException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +38,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StandardRecordWriter implements RecordWriter {
+
+    public static final int MAX_ALLOWED_UTF_LENGTH = 65_535;
+
     private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
 
     private final File file;
@@ -83,7 +87,7 @@ public class StandardRecordWriter implements RecordWriter {
             lastBlockOffset = rawOutStream.getBytesWritten();
             resetWriteStream(firstEventId);
 
-            out.writeUTF(PersistentProvenanceRepository.class.getName());
+            writeUTFLimited(out, PersistentProvenanceRepository.class.getName());
             out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION);
             out.flush();
         } catch (final IOException ioe) {
@@ -161,7 +165,7 @@ public class StandardRecordWriter implements RecordWriter {
             }
 
             out.writeLong(recordIdentifier);
-            out.writeUTF(record.getEventType().name());
+            writeUTFLimited(out, record.getEventType().name());
             out.writeLong(record.getEventTime());
             out.writeLong(record.getFlowFileEntryDate());
             out.writeLong(record.getEventDuration());
@@ -192,9 +196,9 @@ public class StandardRecordWriter implements RecordWriter {
             // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
             if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) {
                 out.writeBoolean(true);
-                out.writeUTF(record.getContentClaimContainer());
-                out.writeUTF(record.getContentClaimSection());
-                out.writeUTF(record.getContentClaimIdentifier());
+                writeUTFLimited(out, record.getContentClaimContainer());
+                writeUTFLimited(out, record.getContentClaimSection());
+                writeUTFLimited(out, record.getContentClaimIdentifier());
                 if (record.getContentClaimOffset() == null) {
                     out.writeLong(0L);
                 } else {
@@ -208,9 +212,9 @@ public class StandardRecordWriter implements RecordWriter {
             // If Previous Content Claim Info is present, write out a 'TRUE' 
followed by claim info. Else, write out 'false'.
             if (record.getPreviousContentClaimSection() != null && 
record.getPreviousContentClaimContainer() != null && 
record.getPreviousContentClaimIdentifier() != null) {
                 out.writeBoolean(true);
-                out.writeUTF(record.getPreviousContentClaimContainer());
-                out.writeUTF(record.getPreviousContentClaimSection());
-                out.writeUTF(record.getPreviousContentClaimIdentifier());
+                writeUTFLimited(out, record.getPreviousContentClaimContainer());
+                writeUTFLimited(out, record.getPreviousContentClaimSection());
+                writeUTFLimited(out, record.getPreviousContentClaimIdentifier());
                 if (record.getPreviousContentClaimOffset() == null) {
                     out.writeLong(0L);
                 } else {
@@ -256,7 +260,7 @@ public class StandardRecordWriter implements RecordWriter {
     }
 
     protected void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
-        out.writeUTF(uuid);
+        writeUTFLimited(out, uuid);
     }
 
     protected void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
@@ -275,7 +279,7 @@ public class StandardRecordWriter implements RecordWriter {
             out.writeBoolean(false);
         } else {
             out.writeBoolean(true);
-            out.writeUTF(toWrite);
+            writeUTFLimited(out, toWrite);
         }
     }
 
@@ -400,4 +404,42 @@ public class StandardRecordWriter implements RecordWriter {
     public boolean isDirty() {
         return dirtyFlag.get();
     }
+
+    private void writeUTFLimited(final DataOutputStream out, final String utfString) throws IOException {
+        try {
+            out.writeUTF(utfString);
+        } catch (UTFDataFormatException e) {
+            final String truncated = utfString.substring(0, getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH));
+            logger.warn("Truncating provenance record value!  Attempted to write {} chars that encode to a UTF byte length greater than "
+                            + "supported maximum ({}), truncating to {} chars.",
+                    utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length());
+            if (logger.isDebugEnabled()) {
+                logger.warn("String value was:\n{}", truncated);
+            }
+            out.writeUTF(truncated);
+        }
+    }
+
+    static int getCharsInUTFLength(final String str, final int utfLimit) {
+        // see java.io.DataOutputStream.writeUTF()
+        int strlen = str.length();
+        int utflen = 0;
+        int c;
+
+        /* use charAt instead of copying String to Char array */
+        for (int i = 0; i < strlen; i++) {
+            c = str.charAt(i);
+            if ((c >= 0x0001) & (c <= 0x007F)) {
+                utflen++;
+            } else if (c > 0x07FF) {
+                utflen += 3;
+            } else {
+                utflen += 2;
+            }
+            if (utflen > utfLimit) {
+                return i;
+            }
+        }
+        return strlen;
+    }
 }
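
For reference, a simplified standalone sketch of the truncation approach used by
writeUTFLimited() above: count how many leading characters fit within
DataOutputStream's 65,535-byte modified-UTF-8 limit (1 byte for U+0001..U+007F,
3 bytes above U+07FF, otherwise 2 bytes) and cut the string there before writing.
The class and method names below are illustrative, not part of the NiFi API.

    public final class Utf8TruncationSketch {
        static final int MAX_ALLOWED_UTF_LENGTH = 65_535;

        // Mirrors the per-character byte counting performed by DataOutputStream.writeUTF().
        static int charsThatFit(final String str, final int utfLimit) {
            int utflen = 0;
            for (int i = 0; i < str.length(); i++) {
                final char c = str.charAt(i);
                if (c >= 0x0001 && c <= 0x007F) {
                    utflen += 1;
                } else if (c > 0x07FF) {
                    utflen += 3;
                } else {
                    utflen += 2;
                }
                if (utflen > utfLimit) {
                    return i;          // char i would push the encoding past the limit
                }
            }
            return str.length();       // the whole string fits
        }

        // Returns a prefix of 'value' whose modified-UTF-8 encoding stays within the limit.
        static String truncateForWriteUTF(final String value) {
            return value.substring(0, charsThatFit(value, MAX_ALLOWED_UTF_LENGTH));
        }
    }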

http://git-wip-us.apache.org/repos/asf/nifi/blob/4f72e349/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
index e11502a..60a2518 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java
@@ -20,7 +20,10 @@ import static org.apache.nifi.provenance.TestUtil.createFlowFile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -28,6 +31,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.provenance.toc.StandardTocReader;
 import org.apache.nifi.provenance.toc.StandardTocWriter;
 import org.apache.nifi.provenance.toc.TocReader;
@@ -186,4 +190,44 @@ public class TestStandardRecordReaderWriter {
 
         FileUtils.deleteFile(journalFile.getParentFile(), true);
     }
+
+    @Test
+    public void testWriteUtfLargerThan64k() throws IOException, InterruptedException {
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", "1.txt");
+        attributes.put("uuid", UUID.randomUUID().toString());
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+        final String seventyK = StringUtils.repeat("X", 70000);
+        assertTrue(seventyK.length() > 65535);
+        assertTrue(seventyK.getBytes("UTF-8").length > 65535);
+        builder.setDetails(seventyK);
+        final ProvenanceEventRecord record = builder.build();
+
+        try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
+             final DataOutputStream out = new DataOutputStream(headerOut)) {
+            out.writeUTF(PersistentProvenanceRepository.class.getName());
+            out.writeInt(9);
+        }
+
+        final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
+        final File tocFile = TocUtil.getTocFile(journalFile);
+        final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
+        try (final ByteArrayOutputStream recordOut = new ByteArrayOutputStream();
+             final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 0)) {
+
+             writer.writeHeader(1L);
+             recordOut.reset();
+
+             writer.writeRecord(record, 1L);
+        }
+    }
+
 }
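
The test above uses only ASCII 'X' characters, which encode to one byte each in
modified UTF-8; for multi-byte characters the cut-off lands well below 65,535
characters, which is why the truncation helper counts encoded bytes rather than
characters. An illustrative snippet, reusing the hypothetical charsThatFit()
helper sketched earlier (not part of the commit):

    final StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 22_000; i++) {
        sb.append('\u20AC');                 // the euro sign encodes to 3 bytes in modified UTF-8
    }
    final String euros = sb.toString();      // 22,000 chars, roughly 66,000 encoded bytes

    // Although euros.length() is far below 65,535, writeUTF() would still overflow;
    // the truncation point is 65,535 / 3 = 21,845 characters.
    assert Utf8TruncationSketch.charsThatFit(euros, 65_535) == 21_845;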
