This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f119c49c4d NIFI-12843: Fix incorrect read of parquet data, when record.count is inherited
f119c49c4d is described below

commit f119c49c4d649e97015a442da26af1c0bcfe16b0
Author: Rajmund Takacs <tak...@gmail.com>
AuthorDate: Mon Feb 26 16:52:59 2024 +0100

    NIFI-12843: Fix incorrect read of parquet data, when record.count is inherited
    
    This closes #8452.
    
    Signed-off-by: Tamas Palfy <tpa...@apache.org>
---
 .../nifi/parquet/record/ParquetRecordReader.java   |  9 ++-
 .../org/apache/nifi/parquet/TestParquetReader.java | 90 ----------------------
 2 files changed, 6 insertions(+), 93 deletions(-)

diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
index 380081a3b0..0ed4680c4b 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
@@ -61,10 +61,13 @@ public class ParquetRecordReader implements RecordReader {
         final Long offset = Optional.ofNullable(variables.get(ParquetAttribute.RECORD_OFFSET))
                 .map(Long::parseLong)
                 .orElse(null);
+        final String recordCount = variables.get(ParquetAttribute.RECORD_COUNT);
 
-        recordsToRead = Optional.ofNullable(variables.get(ParquetAttribute.RECORD_COUNT))
-                .map(Long::parseLong)
-                .orElse(null);
+        if (offset != null && recordCount != null) {
+            recordsToRead = Long.parseLong(recordCount);
+        } else {
+            recordsToRead = null;
+        }
 
         final long fileStartOffset = Optional.ofNullable(variables.get(ParquetAttribute.FILE_RANGE_START_OFFSET))
                 .map(Long::parseLong)
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
index 1b50d131d0..2a22f75345 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.parquet;
 
 import static java.util.Collections.emptyMap;
-import static java.util.Collections.singletonMap;
 import static java.util.stream.Collectors.toMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -79,30 +78,6 @@ public class TestParquetReader {
                 .forEach(i -> assertEquals(ParquetTestUtils.createUser(i), 
convertRecordToUser(results.get(i))));
     }
 
-    @Test
-    public void testReadUsersPartiallyWithLimitedRecordCount() throws 
IOException, MalformedRecordException {
-        final int numUsers = 25;
-        final int expectedRecords = 3;
-        final File parquetFile = 
ParquetTestUtils.createUsersParquetFile(numUsers);
-        final List<Record> results = getRecords(parquetFile, 
singletonMap(ParquetAttribute.RECORD_COUNT, "3"));
-
-        assertEquals(expectedRecords, results.size());
-        IntStream.range(0, expectedRecords)
-                .forEach(i -> assertEquals(ParquetTestUtils.createUser(i), 
convertRecordToUser(results.get(i))));
-    }
-
-    @Test
-    public void testReadUsersPartiallyWithOffset() throws IOException, 
MalformedRecordException {
-        final int numUsers = 1000025; // intentionally so large, to test input 
with many record groups
-        final int expectedRecords = 5;
-        final File parquetFile = 
ParquetTestUtils.createUsersParquetFile(numUsers);
-        final List<Record> results = getRecords(parquetFile, 
singletonMap(ParquetAttribute.RECORD_OFFSET, "1000020"));
-
-        assertEquals(expectedRecords, results.size());
-        IntStream.range(0, expectedRecords)
-                .forEach(i -> assertEquals(ParquetTestUtils.createUser(i + 
1000020), convertRecordToUser(results.get(i))));
-    }
-
     @Test
     public void testReadUsersPartiallyWithOffsetAndLimitedRecordCount() throws 
IOException, MalformedRecordException {
         final int numUsers = 1000025; // intentionally so large, to test input 
with many record groups
@@ -120,28 +95,6 @@ public class TestParquetReader {
                 .forEach(i -> assertEquals(ParquetTestUtils.createUser(i + 
1000020), convertRecordToUser(results.get(i))));
     }
 
-    @Test
-    public void testReadUsersPartiallyWithLimitedRecordCountWithinFileRange()
-            throws IOException, MalformedRecordException {
-        final int numUsers = 1000;
-        final int expectedRecords = 3;
-        final File parquetFile = 
ParquetTestUtils.createUsersParquetFile(numUsers);
-        final List<Record> results = getRecords(
-                parquetFile,
-                new HashMap<String, String>() {
-                    {
-                        put(ParquetAttribute.RECORD_COUNT, "3");
-                        put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543");
-                        put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
-                    }
-                }
-        );
-
-        assertEquals(expectedRecords, results.size());
-        IntStream.range(0, expectedRecords)
-                .forEach(i -> assertEquals(ParquetTestUtils.createUser(i + 
663), convertRecordToUser(results.get(i))));
-    }
-
     @Test
     public void testReadUsersPartiallyWithOffsetWithinFileRange() throws 
IOException, MalformedRecordException {
         final int numUsers = 1000;
@@ -213,25 +166,6 @@ public class TestParquetReader {
                 "MapRecord[{name=Bob9, favorite_number=9, 
favorite_color=blue9}]");
     }
 
-    @Test
-    public void testPartialReaderWithLimitedRecordCount() throws 
InitializationException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(TestParquetProcessor.class);
-        final ParquetReader parquetReader = new ParquetReader();
-
-        runner.addControllerService("reader", parquetReader);
-        runner.enableControllerService(parquetReader);
-
-        runner.enqueue(Paths.get(PARQUET_PATH), 
singletonMap(ParquetAttribute.RECORD_COUNT, "2"));
-
-        runner.setProperty(TestParquetProcessor.READER, "reader");
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
-        
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
-                "MapRecord[{name=Bob0, favorite_number=0, 
favorite_color=blue0}]\n" +
-                "MapRecord[{name=Bob1, favorite_number=1, 
favorite_color=blue1}]");
-    }
-
     @Test
     public void testPartialReaderWithOffsetAndLimitedRecordCount() throws 
InitializationException, IOException {
         final TestRunner runner = 
TestRunners.newTestRunner(TestParquetProcessor.class);
@@ -256,30 +190,6 @@ public class TestParquetReader {
                 "MapRecord[{name=Bob7, favorite_number=7, 
favorite_color=blue7}]");
     }
 
-    @Test
-    public void testPartialReaderWithOffsetOnly() throws 
InitializationException, IOException {
-        final TestRunner runner = 
TestRunners.newTestRunner(TestParquetProcessor.class);
-        final ParquetReader parquetReader = new ParquetReader();
-
-        runner.addControllerService("reader", parquetReader);
-        runner.enableControllerService(parquetReader);
-
-        runner.enqueue(Paths.get(PARQUET_PATH), 
singletonMap(ParquetAttribute.RECORD_OFFSET, "3"));
-
-        runner.setProperty(TestParquetProcessor.READER, "reader");
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
-        
runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
-                "MapRecord[{name=Bob3, favorite_number=3, 
favorite_color=blue3}]\n" +
-                "MapRecord[{name=Bob4, favorite_number=4, 
favorite_color=blue4}]\n" +
-                "MapRecord[{name=Bob5, favorite_number=5, 
favorite_color=blue5}]\n" +
-                "MapRecord[{name=Bob6, favorite_number=6, 
favorite_color=blue6}]\n" +
-                "MapRecord[{name=Bob7, favorite_number=7, 
favorite_color=blue7}]\n" +
-                "MapRecord[{name=Bob8, favorite_number=8, 
favorite_color=blue8}]\n" +
-                "MapRecord[{name=Bob9, favorite_number=9, 
favorite_color=blue9}]");
-    }
-
     private List<Record> getRecords(File parquetFile, Map<String, String> 
variables)
             throws IOException, MalformedRecordException {
         final List<Record> results = new ArrayList<>();

Reply via email to