boyuanzz commented on a change in pull request #12223:
URL: https://github.com/apache/beam/pull/12223#discussion_r480654132
##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
 @@ -229,15 +279,259 @@ public void populateDisplayData(DisplayData.Builder builder) {
     public ReadFiles withAvroDataModel(GenericData model) {
       return toBuilder().setAvroDataModel(model).build();
     }
+    /** Enable the Splittable reading. */
+    public ReadFiles withSplit() {
+      return toBuilder().setSplittable(true).build();
+    }
     @Override
     public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input) {
       checkNotNull(getSchema(), "Schema can not be null");
+      if (isSplittable()) {
+        return input
+            .apply(ParDo.of(new SplitReadFn(getAvroDataModel())))
+            .setCoder(AvroCoder.of(getSchema()));
+      }
       return input
           .apply(ParDo.of(new ReadFn(getAvroDataModel())))
           .setCoder(AvroCoder.of(getSchema()));
     }
+    @DoFn.BoundedPerElement
+    static class SplitReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
+      private Class<? extends GenericData> modelClass;
+      private static final Logger LOG = LoggerFactory.getLogger(SplitReadFn.class);
+      // Default initial splitting the file into blocks of 64MB. Unit of SPLIT_LIMIT is byte.
+      private static final long SPLIT_LIMIT = 64000000;
+
+      SplitReadFn(GenericData model) {
+
+        this.modelClass = model != null ? model.getClass() : null;
+      }
+
+      ParquetFileReader getParquetFileReader(FileIO.ReadableFile file) throws Exception {
+        ParquetReadOptions options = HadoopReadOptions.builder(getConfWithModelClass()).build();
+        return ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
+      }
+
+      @ProcessElement
+      public void processElement(
+          @Element FileIO.ReadableFile file,
+          RestrictionTracker<OffsetRange, Long> tracker,
+          OutputReceiver<GenericRecord> outputReceiver)
+          throws Exception {
+        LOG.debug(
+            "start "
+                + tracker.currentRestriction().getFrom()
+                + " to "
+                + tracker.currentRestriction().getTo());
+        ParquetReadOptions options = HadoopReadOptions.builder(getConfWithModelClass()).build();
+        ParquetFileReader reader =
+            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
+        GenericData model = null;
+        if (modelClass != null) {
+          model = (GenericData) modelClass.getMethod("get").invoke(null);
+        }
+        ReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
+
+        Filter filter = checkNotNull(options.getRecordFilter(), "filter");
+        Configuration hadoopConf = ((HadoopReadOptions) options).getConf();
+        FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
+        MessageType fileSchema = parquetFileMetadata.getSchema();
+        Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
+
+        ReadSupport.ReadContext readContext =
+            readSupport.init(
+                new InitContext(
+                    hadoopConf, Maps.transformValues(fileMetadata, ImmutableSet::of), fileSchema));
+        ColumnIOFactory columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
+        MessageType requestedSchema = readContext.getRequestedSchema();
+        RecordMaterializer<GenericRecord> recordConverter =
+            readSupport.prepareForRead(hadoopConf, fileMetadata, fileSchema, readContext);
+        reader.setRequestedSchema(requestedSchema);
+        MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, true);
+        long currentBlock = tracker.currentRestriction().getFrom();
+        for (int i = 0; i < currentBlock; i++) {
+          reader.skipNextRowGroup();
+        }
+
+        while ((tracker).tryClaim(currentBlock)) {
Review comment:
```suggestion
while (tracker.tryClaim(currentBlock)) {
```
##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
 @@ -229,15 +279,259 @@ public void populateDisplayData(DisplayData.Builder builder) {
 [... identical to the hunk quoted above, up through the row-group skip loop; repeated context omitted ...]
+        while ((tracker).tryClaim(currentBlock)) {
+          PageReadStore pages = reader.readNextRowGroup();
+          LOG.debug("block {} read in memory. row count = {}", currentBlock, pages.getRowCount());
+          currentBlock += 1;
+          RecordReader<GenericRecord> recordReader =
+              columnIO.getRecordReader(
+                  pages, recordConverter, options.useRecordFilter() ? filter : FilterCompat.NOOP);
+          long currentRow = 0;
+          long totalRows = pages.getRowCount();
+          while (currentRow < totalRows) {
+            try {
+              GenericRecord record;
+              currentRow += 1;
+              try {
+                record = recordReader.read();
+              } catch (RecordMaterializer.RecordMaterializationException e) {
+                LOG.debug("skipping a corrupt record");
+                continue;
+              }
+              if (record == null) {
+                // only happens with FilteredRecordReader at end of block
+                LOG.debug("filtered record reader reached end of block");
+                break;
+              }
+              if (recordReader.shouldSkipCurrentRecord()) {
+                // this record is being filtered via the filter2 package
+                LOG.debug("skipping record");
+                continue;
+              }
+              outputReceiver.output(record);
+            } catch (RuntimeException e) {
+
+              throw new ParquetDecodingException(
+                  format(
+                      "Can not read value at %d in block %d in file %s",
+                      currentRow, currentBlock, file.toString()),
+                  e);
+            }
+          }
+          LOG.debug("Finish processing " + currentRow + " rows from block " + (currentBlock - 1));
+        }
+      }
+
+      public Configuration getConfWithModelClass() throws Exception {
+        Configuration conf = new Configuration();
+        GenericData model = null;
+        if (modelClass != null) {
+          model = (GenericData) modelClass.getMethod("get").invoke(null);
+        }
+        if (model != null
+            && (model.getClass() == GenericData.class || model.getClass() == SpecificData.class)) {
+          conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
+        } else {
+          conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+        }
+        return conf;
+      }
+
+      @GetInitialRestriction
+      public OffsetRange getInitialRestriction(@Element FileIO.ReadableFile file) throws Exception {
+        ParquetFileReader reader = getParquetFileReader(file);
+        return new OffsetRange(0, reader.getRowGroups().size());
+      }
+
+      @SplitRestriction
+      public void split(
+          @Restriction OffsetRange restriction,
+          OutputReceiver<OffsetRange> out,
+          @Element FileIO.ReadableFile file)
+          throws Exception {
+        ParquetFileReader reader = getParquetFileReader(file);
+        List<BlockMetaData> rowGroups = reader.getRowGroups();
+        for (OffsetRange offsetRange :
+            splitBlockWithLimit(
+                restriction.getFrom(), restriction.getTo(), rowGroups, SPLIT_LIMIT)) {
+          out.output(offsetRange);
+        }
+      }
+
+      public ArrayList<OffsetRange> splitBlockWithLimit(
+          long start, long end, List<BlockMetaData> blockList, long limit) {
+        ArrayList<OffsetRange> offsetList = new ArrayList<OffsetRange>();
+        long totalSize = 0;
+        long rangeStart = start;
+        long rangeEnd = start;
+        for (long i = start; i < end; i++) {
+          totalSize += blockList.get((int) i).getTotalByteSize();
+          rangeEnd += 1;
+          if (totalSize >= limit) {
+            offsetList.add(new OffsetRange(rangeStart, rangeEnd));
+            rangeStart = rangeEnd;
+            totalSize = 0;
+          }
+        }
+        if (totalSize != 0) {
+          offsetList.add(new OffsetRange(rangeStart, rangeEnd));
+        }
+        return offsetList;
+      }
+
+      @NewTracker
+      public RestrictionTracker<OffsetRange, Long> newTracker(
+          @Restriction OffsetRange restriction, @Element FileIO.ReadableFile file)
+          throws Exception {
+        CountAndSize recordCountAndSize = getRecordCountAndSize(file, restriction);
+        return new BlockTracker(
+            restriction,
+            Math.round(recordCountAndSize.getSize()),
+            Math.round(recordCountAndSize.getCount()));
+      }
+
+      @GetRestrictionCoder
+      public OffsetRange.Coder getRestrictionCoder() {
+        return new OffsetRange.Coder();
+      }
+
+      @GetSize
+      public double getSize(@Element FileIO.ReadableFile file, @Restriction OffsetRange restriction)
+          throws Exception {
+        return getRecordCountAndSize(file, restriction).getSize();
+      }
+
+      public CountAndSize getRecordCountAndSize(
+          @Element FileIO.ReadableFile file, @Restriction OffsetRange restriction)
+          throws Exception {
+        ParquetFileReader reader = getParquetFileReader(file);
+        double size = 0;
+        double recordCount = 0;
+        for (long i = restriction.getFrom(); i < restriction.getTo(); i++) {
+          BlockMetaData block = reader.getRowGroups().get((int) i);
+          recordCount += block.getRowCount();
+          size += block.getTotalByteSize();
+        }
+        CountAndSize countAndSize = CountAndSize.create(recordCount, size);
+        return countAndSize;
+      }
+
+      @AutoValue
+      abstract static class CountAndSize {
+        static CountAndSize create(double count, double size) {
+          return new AutoValue_ParquetIO_ReadFiles_SplitReadFn_CountAndSize(count, size);
+        }
+
+        abstract double getCount();
+
+        abstract double getSize();
+      }
+    }
+
+    public static class BlockTracker extends OffsetRangeTracker {
+      private long totalWork;
+      private long progress;
+      private long approximateRecordSize;
+
+      public BlockTracker(OffsetRange range, long totalByteSize, long recordCount) {
+        super(range);
+        if (recordCount != 0) {
+          this.approximateRecordSize = totalByteSize / recordCount;
+          this.totalWork = approximateRecordSize * recordCount;
+          this.progress = 0;
+        }
+      }
+
+      public void makeProgress() throws Exception {
+        progress += approximateRecordSize;
+        if (progress > totalWork) {
+          throw new IOException("Making progress out of range");
+        }
+      }
+
+      @Override
+      // TODO:[BEAM-10842] A more precise progress update
Review comment:
```suggestion
      // TODO(BEAM-10842): Refine the BlockTracker to provide better progress.
```
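The method under this TODO is not shown in the hunk, so purely as an illustration: if `BlockTracker` reports progress through Beam's `RestrictionTracker.HasProgress`, a byte-weighted override based on the `progress`/`totalWork` fields above might look roughly like this (a sketch, not the PR's implementation):
```java
// Sketch only. Assumes BlockTracker keeps the byte-based `progress` and `totalWork`
// fields shown in the hunk, and that the overridden method is
// RestrictionTracker.HasProgress#getProgress().
@Override
public Progress getProgress() {
  double workRemaining = Math.max(totalWork - progress, 0);
  return Progress.from(progress, workRemaining);
}
```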
##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
 @@ -229,15 +279,259 @@ public void populateDisplayData(DisplayData.Builder builder) {
 [... same hunk as quoted in full above; repeated diff context omitted ...]
+      @GetSize
+      public double getSize(@Element FileIO.ReadableFile file, @Restriction OffsetRange restriction)
+          throws Exception {
+        return getRecordCountAndSize(file, restriction).getSize();
+      }
+
+      public CountAndSize getRecordCountAndSize(
Review comment:
```suggestion
private CountAndSize getRecordCountAndSize(
```
`getRecordCountAndSize` is only for this DoFn, right?
##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
 @@ -229,15 +279,259 @@ public void populateDisplayData(DisplayData.Builder builder) {
 [... same hunk as quoted in full above; repeated diff context omitted ...]
+        while ((tracker).tryClaim(currentBlock)) {
+          PageReadStore pages = reader.readNextRowGroup();
+          LOG.debug("block {} read in memory. row count = {}", currentBlock, pages.getRowCount());
+          currentBlock += 1;
+          RecordReader<GenericRecord> recordReader =
+              columnIO.getRecordReader(
+                  pages, recordConverter, options.useRecordFilter() ? filter : FilterCompat.NOOP);
+          long currentRow = 0;
+          long totalRows = pages.getRowCount();
+          while (currentRow < totalRows) {
Review comment:
IIUC, there is no need to track `currentRow`; we can do something like
```java
record = recordReader.read();
while (record != null) {
  // do something
  record = recordReader.read();
}
```
If that's true, we can clean this up in a follow-up PR.
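A slightly fuller version of that sketch, keeping the filter check (illustrative only; it assumes `recordReader.read()` returns null once the row group is exhausted, which is exactly the open question above, and a separate counter would still be needed if the row index should stay in the `ParquetDecodingException` message):
```java
// Sketch for a possible follow-up, not the PR's implementation.
GenericRecord record = recordReader.read();
while (record != null) {
  if (!recordReader.shouldSkipCurrentRecord()) {
    outputReceiver.output(record);
  }
  record = recordReader.read();
}
```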
##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -133,15 +161,18 @@
   * pattern).
   */
  public static Read read(Schema schema) {
-    return new AutoValue_ParquetIO_Read.Builder().setSchema(schema).build();
+    return new AutoValue_ParquetIO_Read.Builder().setSchema(schema).setSplittable(false).build();
Review comment:
It seems like this comment is not addressed yet. Can you add a related explanation (what this option is for and what happens when it is set to true) and a code snippet (how to enable this option) into L89-L155? You can also link your design doc there if that helps end users use this option.
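For example, the `ReadFiles` part of the class-level javadoc could show something along these lines (a sketch; `SCHEMA` is a placeholder Avro schema and the exact wording/placement is up to you):
```java
// Hypothetical documentation snippet: enabling splittable reading on ReadFiles.
PCollection<GenericRecord> records =
    pipeline
        .apply(FileIO.match().filepattern("/path/to/*.parquet"))
        .apply(FileIO.readMatches())
        // Splittable mode: each file is read per row group, with row groups
        // grouped into initial splits of roughly 64MB.
        .apply(ParquetIO.readFiles(SCHEMA).withSplit());
```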
##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
 @@ -229,15 +279,259 @@ public void populateDisplayData(DisplayData.Builder builder) {
 [... same hunk as quoted in full above; repeated diff context omitted ...]
+      public CountAndSize getRecordCountAndSize(
+          @Element FileIO.ReadableFile file, @Restriction OffsetRange restriction)
+          throws Exception {
+        ParquetFileReader reader = getParquetFileReader(file);
+        double size = 0;
+        double recordCount = 0;
+        for (long i = restriction.getFrom(); i < restriction.getTo(); i++) {
+          BlockMetaData block = reader.getRowGroups().get((int) i);
+          recordCount += block.getRowCount();
+          size += block.getTotalByteSize();
+        }
+        CountAndSize countAndSize = CountAndSize.create(recordCount, size);
+        return countAndSize;
+      }
+
+      @AutoValue
+      abstract static class CountAndSize {
Review comment:
Any reason to use `double` for `count` and `size`? If `double` isn't actually needed, we can clean this up in a follow-up PR.
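If they can be integral, a possible follow-up shape (sketch only; `@GetSize` itself still returns `double`):
```java
// Sketch for a possible follow-up: row counts and byte sizes are integral, so long
// is a more natural fit; widen to double only where Beam's @GetSize requires it.
@AutoValue
abstract static class CountAndSize {
  static CountAndSize create(long count, long size) {
    return new AutoValue_ParquetIO_ReadFiles_SplitReadFn_CountAndSize(count, size);
  }

  abstract long getCount();

  abstract long getSize();
}
```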
##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
 @@ -229,15 +279,259 @@ public void populateDisplayData(DisplayData.Builder builder) {
 [... same hunk as quoted in full above; repeated diff context omitted ...]
+      public CountAndSize getRecordCountAndSize(
+          @Element FileIO.ReadableFile file, @Restriction OffsetRange restriction)
Review comment:
```suggestion
FileIO.ReadableFile file, OffsetRange restriction)
```
Annotations like `@Element` and `@Restriction` are only meaningful on the methods Beam itself invokes on a `DoFn` (such as `@ProcessElement` or `@GetSize`); on a plain helper method they have no effect.
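Concretely, the annotated `@GetSize` method keeps its annotations while the helper takes plain parameters, e.g. (sketch, combining this with the `private` suggestion above):
```java
// Sketch: annotations only on the DoFn method Beam invokes; plain parameters on the helper.
@GetSize
public double getSize(@Element FileIO.ReadableFile file, @Restriction OffsetRange restriction)
    throws Exception {
  return getRecordCountAndSize(file, restriction).getSize();
}

private CountAndSize getRecordCountAndSize(FileIO.ReadableFile file, OffsetRange restriction)
    throws Exception {
  // ... body unchanged from the hunk above ...
}
```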
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]