This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 48fb538e68 NIFI-12843: Fix incorrect read of parquet data, when record.count is inherited
48fb538e68 is described below

commit 48fb538e685ae4c81faf67a65c1e790d5e1bf4e5
Author: Rajmund Takacs <tak...@gmail.com>
AuthorDate: Mon Feb 26 16:52:59 2024 +0100

    NIFI-12843: Fix incorrect read of parquet data, when record.count is inherited

    This closes #8452.

    Signed-off-by: Tamas Palfy <tpa...@apache.org>
---
 .../nifi/parquet/record/ParquetRecordReader.java   |  9 ++-
 .../org/apache/nifi/parquet/TestParquetReader.java | 90 ----------------------
 2 files changed, 6 insertions(+), 93 deletions(-)

diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
index 380081a3b0..0ed4680c4b 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
@@ -61,10 +61,13 @@ public class ParquetRecordReader implements RecordReader {
         final Long offset = Optional.ofNullable(variables.get(ParquetAttribute.RECORD_OFFSET))
                 .map(Long::parseLong)
                 .orElse(null);
+        final String recordCount = variables.get(ParquetAttribute.RECORD_COUNT);

-        recordsToRead = Optional.ofNullable(variables.get(ParquetAttribute.RECORD_COUNT))
-                .map(Long::parseLong)
-                .orElse(null);
+        if (offset != null && recordCount != null) {
+            recordsToRead = Long.parseLong(recordCount);
+        } else {
+            recordsToRead = null;
+        }

         final long fileStartOffset = Optional.ofNullable(variables.get(ParquetAttribute.FILE_RANGE_START_OFFSET))
                 .map(Long::parseLong)
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
index f8d2929ee7..e3b4d13623 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.parquet;

 import static java.util.Collections.emptyMap;
-import static java.util.Collections.singletonMap;
 import static java.util.stream.Collectors.toMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;

@@ -79,30 +78,6 @@ public class TestParquetReader {
                 .forEach(i -> assertEquals(ParquetTestUtils.createUser(i), convertRecordToUser(results.get(i))));
     }

-    @Test
-    public void testReadUsersPartiallyWithLimitedRecordCount() throws IOException, MalformedRecordException {
-        final int numUsers = 25;
-        final int expectedRecords = 3;
-        final File parquetFile = ParquetTestUtils.createUsersParquetFile(numUsers);
-        final List<Record> results = getRecords(parquetFile, singletonMap(ParquetAttribute.RECORD_COUNT, "3"));
-
-        assertEquals(expectedRecords, results.size());
-        IntStream.range(0, expectedRecords)
-                .forEach(i -> assertEquals(ParquetTestUtils.createUser(i), convertRecordToUser(results.get(i))));
-    }
-
-    @Test
-    public void testReadUsersPartiallyWithOffset() throws IOException, MalformedRecordException {
-        final int numUsers = 1000025; // intentionally so large, to test input with many record groups
-        final int expectedRecords = 5;
-        final File parquetFile = ParquetTestUtils.createUsersParquetFile(numUsers);
-        final List<Record> results = getRecords(parquetFile, singletonMap(ParquetAttribute.RECORD_OFFSET, "1000020"));
-
-        assertEquals(expectedRecords, results.size());
-        IntStream.range(0, expectedRecords)
-                .forEach(i -> assertEquals(ParquetTestUtils.createUser(i + 1000020), convertRecordToUser(results.get(i))));
-    }
-
     @Test
     public void testReadUsersPartiallyWithOffsetAndLimitedRecordCount() throws IOException, MalformedRecordException {
         final int numUsers = 1000025; // intentionally so large, to test input with many record groups
@@ -120,28 +95,6 @@ public class TestParquetReader {
                 .forEach(i -> assertEquals(ParquetTestUtils.createUser(i + 1000020), convertRecordToUser(results.get(i))));
     }

-    @Test
-    public void testReadUsersPartiallyWithLimitedRecordCountWithinFileRange()
-            throws IOException, MalformedRecordException {
-        final int numUsers = 1000;
-        final int expectedRecords = 3;
-        final File parquetFile = ParquetTestUtils.createUsersParquetFile(numUsers);
-        final List<Record> results = getRecords(
-                parquetFile,
-                new HashMap<String, String>() {
-                    {
-                        put(ParquetAttribute.RECORD_COUNT, "3");
-                        put(ParquetAttribute.FILE_RANGE_START_OFFSET, "16543");
-                        put(ParquetAttribute.FILE_RANGE_END_OFFSET, "24784");
-                    }
-                }
-        );
-
-        assertEquals(expectedRecords, results.size());
-        IntStream.range(0, expectedRecords)
-                .forEach(i -> assertEquals(ParquetTestUtils.createUser(i + 663), convertRecordToUser(results.get(i))));
-    }
-
     @Test
     public void testReadUsersPartiallyWithOffsetWithinFileRange() throws IOException, MalformedRecordException {
         final int numUsers = 1000;
@@ -213,25 +166,6 @@ public class TestParquetReader {
                 "MapRecord[{name=Bob9, favorite_number=9, favorite_color=blue9}]");
     }

-    @Test
-    public void testPartialReaderWithLimitedRecordCount() throws InitializationException, IOException {
-        final TestRunner runner = TestRunners.newTestRunner(TestParquetProcessor.class);
-        final ParquetReader parquetReader = new ParquetReader();
-
-        runner.addControllerService("reader", parquetReader);
-        runner.enableControllerService(parquetReader);
-
-        runner.enqueue(Paths.get(PARQUET_PATH), singletonMap(ParquetAttribute.RECORD_COUNT, "2"));
-
-        runner.setProperty(TestParquetProcessor.READER, "reader");
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
-        runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
-                "MapRecord[{name=Bob0, favorite_number=0, favorite_color=blue0}]\n" +
-                "MapRecord[{name=Bob1, favorite_number=1, favorite_color=blue1}]");
-    }
-
     @Test
     public void testPartialReaderWithOffsetAndLimitedRecordCount() throws InitializationException, IOException {
         final TestRunner runner = TestRunners.newTestRunner(TestParquetProcessor.class);
@@ -256,30 +190,6 @@ public class TestParquetReader {
                 "MapRecord[{name=Bob7, favorite_number=7, favorite_color=blue7}]");
     }

-    @Test
-    public void testPartialReaderWithOffsetOnly() throws InitializationException, IOException {
-        final TestRunner runner = TestRunners.newTestRunner(TestParquetProcessor.class);
-        final ParquetReader parquetReader = new ParquetReader();
-
-        runner.addControllerService("reader", parquetReader);
-        runner.enableControllerService(parquetReader);
-
-        runner.enqueue(Paths.get(PARQUET_PATH), singletonMap(ParquetAttribute.RECORD_OFFSET, "3"));
-
-        runner.setProperty(TestParquetProcessor.READER, "reader");
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(TestParquetProcessor.SUCCESS, 1);
-        runner.getFlowFilesForRelationship(TestParquetProcessor.SUCCESS).get(0).assertContentEquals(
-                "MapRecord[{name=Bob3, favorite_number=3, favorite_color=blue3}]\n" +
-                "MapRecord[{name=Bob4, favorite_number=4, favorite_color=blue4}]\n" +
-                "MapRecord[{name=Bob5, favorite_number=5, favorite_color=blue5}]\n" +
-                "MapRecord[{name=Bob6, favorite_number=6, favorite_color=blue6}]\n" +
-                "MapRecord[{name=Bob7, favorite_number=7, favorite_color=blue7}]\n" +
-                "MapRecord[{name=Bob8, favorite_number=8, favorite_color=blue8}]\n" +
-                "MapRecord[{name=Bob9, favorite_number=9, favorite_color=blue9}]");
-    }
-
     private List<Record> getRecords(File parquetFile, Map<String, String> variables)
             throws IOException, MalformedRecordException {
         final List<Record> results = new ArrayList<>();