This is an automated email from the ASF dual-hosted git repository. ibzib pushed a commit to branch release-2.34.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.34.0 by this push:
     new 2d655ea  [BEAM-13104] ParquetIO: SplitReadFn must read the whole block
     new 621a4f9  Merge pull request #15806 from ibzib/parquet-cp
2d655ea is described below

commit 2d655eac45d036dc23962d09e45642b77c6f5cea
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Mon Oct 25 17:21:47 2021 +0200

    [BEAM-13104] ParquetIO: SplitReadFn must read the whole block
---
 .../src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java     | 6 +++---
 .../src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java | 3 ++-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
index 81f5978..c733f67 100644
--- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
+++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
@@ -888,12 +888,12 @@ public class ParquetIO {
               continue;
             }
             if (record == null) {
-              // only happens with FilteredRecordReader at end of block
+              // it happens when a record is filtered out in this block
               LOG.debug(
-                  "filtered record reader reached end of block in block {} in file {}",
+                  "record is filtered out by reader in block {} in file {}",
                   currentBlock,
                   file.toString());
-              break;
+              continue;
             }
             if (recordReader.shouldSkipCurrentRecord()) {
               // this record is being filtered via the filter2 package
diff --git a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
index d2609b4..261abd9 100644
--- a/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
+++ b/sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
@@ -574,7 +574,8 @@ public class ParquetIOTest implements Serializable {
         readPipeline.apply(
             ParquetIO.read(SCHEMA)
                 .from(temporaryFolder.getRoot().getAbsolutePath() + "/*")
-                .withConfiguration(configuration));
+                .withConfiguration(configuration)
+                .withSplit());
     PAssert.that(readBack).containsInAnyOrder(expectedRecords);
     readPipeline.run().waitUntilFinish();
   }