aokolnychyi commented on a change in pull request #3287:
URL: https://github.com/apache/iceberg/pull/3287#discussion_r743032246
##########
File path:
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
##########
@@ -61,6 +61,10 @@ public VectorizedReaderBuilder(
this.readerFactory = readerFactory;
}
+ public Function<List<VectorizedReader<?>>, VectorizedReader<?>>
readerFactory() {
Review comment:
I think we may not this method. The second method should be enough if
`ReadBuilder` is changed.
```
@Override
protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>>
reorderedFields) {
VectorizedReader<?> reader = super.vectorizedReader(reorderedFields);
if (deleteFilter != null) {
((ColumnarBatchReader) reader).setDeleteFilter(deleteFilter);
}
return reader;
}
```
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
##########
@@ -49,4 +54,45 @@ public static ColumnarBatchReader buildReader(
expectedSchema, fileSchema, setArrowValidityVector,
idToConstant, ColumnarBatchReader::new));
}
+
+ public static ColumnarBatchReader buildReader(
+ Schema expectedSchema,
+ MessageType fileSchema,
+ boolean setArrowValidityVector,
+ Map<Integer, ?> idToConstant,
+ DeleteFilter<InternalRow> deleteFilter) {
+ return (ColumnarBatchReader)
+ TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema,
+ new ReaderBuilder(
+ expectedSchema, fileSchema, setArrowValidityVector,
+ idToConstant, ColumnarBatchReader::new, deleteFilter));
+ }
+
+ private static class ReaderBuilder extends VectorizedReaderBuilder {
+ private final DeleteFilter<InternalRow> deleteFilter;
+
+ ReaderBuilder(
+ Schema expectedSchema,
+ MessageType parquetSchema,
+ boolean setArrowValidityVector,
+ Map<Integer, ?> idToConstant,
+ Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory,
+ DeleteFilter<InternalRow> deleteFilter) {
+ super(expectedSchema, parquetSchema, setArrowValidityVector,
idToConstant, readerFactory);
+ this.deleteFilter = deleteFilter;
+ }
+
+ @Override
+ protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>>
reorderedFields) {
Review comment:
We can probably simplify a little bit like
[here](https://github.com/apache/iceberg/pull/3287/files#r743032246).
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
##########
@@ -49,4 +54,45 @@ public static ColumnarBatchReader buildReader(
expectedSchema, fileSchema, setArrowValidityVector,
idToConstant, ColumnarBatchReader::new));
}
+
+ public static ColumnarBatchReader buildReader(
+ Schema expectedSchema,
Review comment:
The formatting in this class and many other Arrow-related classes is
off. However, we should at least match it. Iceberg uses 2 spaces for args. I
think we are using 4 here.
##########
File path: spark/v3.2/build.gradle
##########
@@ -64,8 +64,6 @@ project(':iceberg-spark:iceberg-spark-3.2') {
compileOnly("org.apache.spark:spark-hive_2.12:${sparkVersion}") {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.apache.arrow'
- // to make sure io.netty.buffer only comes from project(':iceberg-arrow')
Review comment:
Do we remove this on purpose? I think it was added recently.
##########
File path: spark/v3.0/build.gradle
##########
@@ -80,6 +80,8 @@ project(':iceberg-spark:iceberg-spark3') {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
+ implementation "org.roaringbitmap:RoaringBitmap"
Review comment:
Are we adding this to all Spark versions? If so, we should cover 3.1
too. It was added recently.
##########
File path:
spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
##########
@@ -187,4 +202,45 @@ protected void withTableProperties(Map<String, String>
props, Action action) {
restoreProperties.commit();
}
}
+
+ protected void writePosDeletes(CharSequence path, long numRows, double
percentage) throws IOException {
+ OutputFileFactory fileFactory = newFileFactory();
+ SparkFileWriterFactory writerFactory =
SparkFileWriterFactory.builderFor(table())
+ .dataFileFormat(fileFormat())
Review comment:
Iceberg uses 4 spaces for continued indentation, not 8.
##########
File path:
spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
##########
@@ -187,4 +202,45 @@ protected void withTableProperties(Map<String, String>
props, Action action) {
restoreProperties.commit();
}
}
+
+ protected void writePosDeletes(CharSequence path, long numRows, double
percentage) throws IOException {
+ OutputFileFactory fileFactory = newFileFactory();
+ SparkFileWriterFactory writerFactory =
SparkFileWriterFactory.builderFor(table())
+ .dataFileFormat(fileFormat())
+ .build();
+
+ ClusteredPositionDeleteWriter<InternalRow> writer = new
ClusteredPositionDeleteWriter<>(
+ writerFactory, fileFactory, table().io(),
Review comment:
Here too.
##########
File path:
spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java
##########
@@ -187,4 +202,45 @@ protected void withTableProperties(Map<String, String>
props, Action action) {
restoreProperties.commit();
}
}
+
+ protected void writePosDeletes(CharSequence path, long numRows, double
percentage) throws IOException {
+ OutputFileFactory fileFactory = newFileFactory();
+ SparkFileWriterFactory writerFactory =
SparkFileWriterFactory.builderFor(table())
+ .dataFileFormat(fileFormat())
+ .build();
+
+ ClusteredPositionDeleteWriter<InternalRow> writer = new
ClusteredPositionDeleteWriter<>(
+ writerFactory, fileFactory, table().io(),
+ fileFormat(), TARGET_FILE_SIZE_IN_BYTES);
+
+ PartitionSpec unpartitionedSpec = table().specs().get(0);
+ Set<Long> deletedPos = Sets.newHashSet();
+ while (deletedPos.size() < numRows * percentage) {
+ deletedPos.add(ThreadLocalRandom.current().nextLong(numRows));
+ }
+
+ LOG.info("pos delete row count: {}", deletedPos.size());
+
+ PositionDelete<InternalRow> positionDelete = PositionDelete.create();
+ try (ClusteredPositionDeleteWriter<InternalRow> closeableWriter = writer) {
+ for (Long pos : deletedPos) {
+ positionDelete.set(path, pos, null);
+ closeableWriter.write(positionDelete, unpartitionedSpec, null);
+ }
+ }
+
+ RowDelta rowDelta = table().newRowDelta();
+ writer.result().deleteFiles().forEach(rowDelta::addDeletes);
+ rowDelta.validateDeletedFiles().commit();
+ }
+
+ private OutputFileFactory newFileFactory() {
+ return OutputFileFactory.builderFor(table(), 1, 1)
+ .format(fileFormat())
Review comment:
Here too.
##########
File path:
spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+public abstract class IcebergSourceDeleteBenchmark extends
IcebergSourceBenchmark {
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergSourceDeleteBenchmark.class);
+ private static final long TARGET_FILE_SIZE_IN_BYTES = 512L * 1024 * 1024;
+
+ protected static final int NUM_FILES = 1;
+ protected static final int NUM_ROWS = 10 * 1000 * 1000;
+
+ @Setup
+ public void setupBenchmark() throws IOException {
+ setupSpark();
+ appendData();
+ }
+
+ @TearDown
+ public void tearDownBenchmark() throws IOException {
+ tearDownSpark();
+ cleanupFiles();
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readIceberg() {
+ Map<String, String> tableProperties = Maps.newHashMap();
+ tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 *
1024));
+ withTableProperties(tableProperties, () -> {
+ String tableLocation = table().location();
+ Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
+ materialize(df);
+ });
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readIcebergVectorized() {
+ Map<String, String> tableProperties = Maps.newHashMap();
+ tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 *
1024));
+ tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true");
+ withTableProperties(tableProperties, () -> {
+ String tableLocation = table().location();
+ Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
+ materialize(df);
+ });
+ }
+
+ protected abstract void appendData() throws IOException;
+
+ protected void writeData(int fileNum) {
+ Dataset<Row> df = spark().range(NUM_ROWS)
+ .withColumnRenamed("id", "longCol")
+ .withColumn("intCol", expr("CAST(MOD(longCol, 2147483647) AS INT)"))
+ .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+ .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+ .withColumn("dateCol", date_add(current_date(), fileNum))
+ .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+ .withColumn("stringCol", expr("CAST(dateCol AS STRING)"));
+ appendAsFile(df);
+ }
+
+ @Override
+ protected Table initTable() {
+ Schema schema = new Schema(
+ required(1, "longCol", Types.LongType.get()),
+ required(2, "intCol", Types.IntegerType.get()),
+ required(3, "floatCol", Types.FloatType.get()),
+ optional(4, "doubleCol", Types.DoubleType.get()),
+ optional(6, "dateCol", Types.DateType.get()),
+ optional(7, "timestampCol", Types.TimestampType.withZone()),
+ optional(8, "stringCol", Types.StringType.get()));
+ PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+ HadoopTables tables = new HadoopTables(hadoopConf());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+ properties.put(TableProperties.FORMAT_VERSION, "2");
+ return tables.create(schema, partitionSpec, properties,
newTableLocation());
+ }
+
+ @Override
+ protected Configuration initHadoopConf() {
+ return new Configuration();
+ }
+
+ protected void writePosDeletes(CharSequence path, long numRows, double
percentage) throws IOException {
+ writePosDeletes(path, numRows, percentage, 1);
+ }
+
+ protected void writePosDeletes(CharSequence path, long numRows, double
percentage,
+ int numDeleteFile) throws IOException {
+ writePosDeletesWithNoise(path, numRows, percentage, 0, numDeleteFile);
+ }
+
+ protected void writePosDeletesWithNoise(CharSequence path, long numRows,
double percentage, int numNoise,
+ int numDeleteFile) throws
IOException {
+ Set<Long> deletedPos = Sets.newHashSet();
+ while (deletedPos.size() < numRows * percentage) {
+ deletedPos.add(ThreadLocalRandom.current().nextLong(numRows));
+ }
+ LOG.info("pos delete row count: {}, num of delete files: {}",
deletedPos.size(), numDeleteFile);
+
+ int partitionSize = (int) (numRows * percentage) / numDeleteFile;
+ Iterable<List<Long>> sets = Iterables.partition(deletedPos, partitionSize);
+ for (List item : sets) {
Review comment:
nit: raw usage of List
##########
File path:
spark/v3.2/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceDeleteBenchmark.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+public abstract class IcebergSourceDeleteBenchmark extends
IcebergSourceBenchmark {
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergSourceDeleteBenchmark.class);
+ private static final long TARGET_FILE_SIZE_IN_BYTES = 512L * 1024 * 1024;
+
+ protected static final int NUM_FILES = 1;
+ protected static final int NUM_ROWS = 10 * 1000 * 1000;
+
+ @Setup
+ public void setupBenchmark() throws IOException {
+ setupSpark();
+ appendData();
+ }
+
+ @TearDown
+ public void tearDownBenchmark() throws IOException {
+ tearDownSpark();
+ cleanupFiles();
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readIceberg() {
+ Map<String, String> tableProperties = Maps.newHashMap();
+ tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 *
1024));
+ withTableProperties(tableProperties, () -> {
+ String tableLocation = table().location();
+ Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
+ materialize(df);
+ });
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void readIcebergVectorized() {
+ Map<String, String> tableProperties = Maps.newHashMap();
+ tableProperties.put(SPLIT_OPEN_FILE_COST, Integer.toString(128 * 1024 *
1024));
+ tableProperties.put(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true");
+ withTableProperties(tableProperties, () -> {
+ String tableLocation = table().location();
+ Dataset<Row> df = spark().read().format("iceberg").load(tableLocation);
+ materialize(df);
+ });
+ }
+
+ protected abstract void appendData() throws IOException;
+
+ protected void writeData(int fileNum) {
+ Dataset<Row> df = spark().range(NUM_ROWS)
+ .withColumnRenamed("id", "longCol")
+ .withColumn("intCol", expr("CAST(MOD(longCol, 2147483647) AS INT)"))
+ .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
+ .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
+ .withColumn("dateCol", date_add(current_date(), fileNum))
+ .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
+ .withColumn("stringCol", expr("CAST(dateCol AS STRING)"));
+ appendAsFile(df);
+ }
+
+ @Override
+ protected Table initTable() {
+ Schema schema = new Schema(
+ required(1, "longCol", Types.LongType.get()),
+ required(2, "intCol", Types.IntegerType.get()),
+ required(3, "floatCol", Types.FloatType.get()),
+ optional(4, "doubleCol", Types.DoubleType.get()),
+ optional(6, "dateCol", Types.DateType.get()),
+ optional(7, "timestampCol", Types.TimestampType.withZone()),
+ optional(8, "stringCol", Types.StringType.get()));
+ PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
+ HadoopTables tables = new HadoopTables(hadoopConf());
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put(TableProperties.METADATA_COMPRESSION, "gzip");
+ properties.put(TableProperties.FORMAT_VERSION, "2");
+ return tables.create(schema, partitionSpec, properties,
newTableLocation());
+ }
+
+ @Override
+ protected Configuration initHadoopConf() {
+ return new Configuration();
+ }
+
+ protected void writePosDeletes(CharSequence path, long numRows, double
percentage) throws IOException {
+ writePosDeletes(path, numRows, percentage, 1);
+ }
+
+ protected void writePosDeletes(CharSequence path, long numRows, double
percentage,
+ int numDeleteFile) throws IOException {
+ writePosDeletesWithNoise(path, numRows, percentage, 0, numDeleteFile);
+ }
+
+ protected void writePosDeletesWithNoise(CharSequence path, long numRows,
double percentage, int numNoise,
+ int numDeleteFile) throws
IOException {
+ Set<Long> deletedPos = Sets.newHashSet();
+ while (deletedPos.size() < numRows * percentage) {
+ deletedPos.add(ThreadLocalRandom.current().nextLong(numRows));
+ }
+ LOG.info("pos delete row count: {}, num of delete files: {}",
deletedPos.size(), numDeleteFile);
+
+ int partitionSize = (int) (numRows * percentage) / numDeleteFile;
+ Iterable<List<Long>> sets = Iterables.partition(deletedPos, partitionSize);
+ for (List item : sets) {
+ writePosDeletes(path, item, numNoise);
+ }
+ }
+
+ protected void writePosDeletes(CharSequence path, List<Long> deletedPos, int
numNoise) throws IOException {
Review comment:
nit: formatting in this method is off
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
##########
@@ -47,18 +68,71 @@ public final ColumnarBatch read(ColumnarBatch reuse, int
numRowsToRead) {
closeVectors();
}
+ Pair<int[], Integer> rowIdMapping = rowIdMapping(numRowsToRead);
+
for (int i = 0; i < readers.length; i += 1) {
vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
int numRowsInVector = vectorHolders[i].numValues();
Preconditions.checkState(
numRowsInVector == numRowsToRead,
"Number of rows in the vector %s didn't match expected %s ",
numRowsInVector,
numRowsToRead);
- arrowColumnVectors[i] =
- IcebergArrowColumnVector.forHolder(vectorHolders[i],
numRowsInVector);
+
+ if (rowIdMapping == null) {
+ arrowColumnVectors[i] =
IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+ } else {
+ int[] rowIdMap = rowIdMapping.first();
+ Integer numRows = rowIdMapping.second();
+ arrowColumnVectors[i] =
ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMap, numRows);
+ }
}
+
+ rowStartPosInBatch += numRowsToRead;
ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors);
- batch.setNumRows(numRowsToRead);
+
+ if (rowIdMapping == null) {
+ batch.setNumRows(numRowsToRead);
+ } else {
+ Integer numRows = rowIdMapping.second();
+ batch.setNumRows(numRows);
+ }
return batch;
}
+
+ private Pair<int[], Integer> rowIdMapping(int numRows) {
+ if (deletes != null && deletes.hasPosDeletes()) {
+ return buildRowIdMapping(deletes.deletedRowPositions(), numRows);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Build a row id mapping inside a batch, which skips delete rows. For
example, if the 1st and 3rd rows are deleted in
+ * a batch with 5 rows, the mapping would be {0->1, 1->3, 2->4}, and the new
num of rows is 3.
+ * @param deletedRowPositions a set of deleted row positions
+ * @param numRows the num of rows
+ * @return the mapping array and the new num of rows in a batch, null if no
row is deleted
+ */
+ private Pair<int[], Integer> buildRowIdMapping(Roaring64Bitmap
deletedRowPositions, int numRows) {
+ if (deletedRowPositions == null) {
+ return null;
+ }
+ int[] rowIdMapping = new int[numRows];
Review comment:
nit: an empty line before this line would decouple this logical block
from the top `if` statement.
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
##########
@@ -47,18 +68,71 @@ public final ColumnarBatch read(ColumnarBatch reuse, int
numRowsToRead) {
closeVectors();
}
+ Pair<int[], Integer> rowIdMapping = rowIdMapping(numRowsToRead);
+
for (int i = 0; i < readers.length; i += 1) {
vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
int numRowsInVector = vectorHolders[i].numValues();
Preconditions.checkState(
numRowsInVector == numRowsToRead,
"Number of rows in the vector %s didn't match expected %s ",
numRowsInVector,
numRowsToRead);
- arrowColumnVectors[i] =
- IcebergArrowColumnVector.forHolder(vectorHolders[i],
numRowsInVector);
+
+ if (rowIdMapping == null) {
+ arrowColumnVectors[i] =
IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+ } else {
+ int[] rowIdMap = rowIdMapping.first();
+ Integer numRows = rowIdMapping.second();
+ arrowColumnVectors[i] =
ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMap, numRows);
+ }
}
+
+ rowStartPosInBatch += numRowsToRead;
ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors);
- batch.setNumRows(numRowsToRead);
+
+ if (rowIdMapping == null) {
+ batch.setNumRows(numRowsToRead);
+ } else {
+ Integer numRows = rowIdMapping.second();
+ batch.setNumRows(numRows);
+ }
return batch;
}
+
+ private Pair<int[], Integer> rowIdMapping(int numRows) {
+ if (deletes != null && deletes.hasPosDeletes()) {
+ return buildRowIdMapping(deletes.deletedRowPositions(), numRows);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Build a row id mapping inside a batch, which skips delete rows. For
example, if the 1st and 3rd rows are deleted in
+ * a batch with 5 rows, the mapping would be {0->1, 1->3, 2->4}, and the new
num of rows is 3.
+ * @param deletedRowPositions a set of deleted row positions
+ * @param numRows the num of rows
+ * @return the mapping array and the new num of rows in a batch, null if no
row is deleted
+ */
+ private Pair<int[], Integer> buildRowIdMapping(Roaring64Bitmap
deletedRowPositions, int numRows) {
Review comment:
Russell's concern is a valid one but I'd probably start with the current
implementation as it is fairly simple. Also, we will share the array between
all column vectors. For example, if we are scanning 10 columns, only one array
is created. Array memory overhead is small compared to maps/sets as there is no
boxing, extra wrappers. Lookups should be super quick too.
Overall, I feel like the performance of `CharSeqComparator` has a bigger
impact. That being said, it would be nice to try the idea later. It definitely
gives some benefits on paper.
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
##########
@@ -47,18 +68,71 @@ public final ColumnarBatch read(ColumnarBatch reuse, int
numRowsToRead) {
closeVectors();
}
+ Pair<int[], Integer> rowIdMapping = rowIdMapping(numRowsToRead);
+
for (int i = 0; i < readers.length; i += 1) {
vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
int numRowsInVector = vectorHolders[i].numValues();
Preconditions.checkState(
numRowsInVector == numRowsToRead,
"Number of rows in the vector %s didn't match expected %s ",
numRowsInVector,
numRowsToRead);
- arrowColumnVectors[i] =
- IcebergArrowColumnVector.forHolder(vectorHolders[i],
numRowsInVector);
+
+ if (rowIdMapping == null) {
+ arrowColumnVectors[i] =
IcebergArrowColumnVector.forHolder(vectorHolders[i], numRowsInVector);
+ } else {
+ int[] rowIdMap = rowIdMapping.first();
+ Integer numRows = rowIdMapping.second();
+ arrowColumnVectors[i] =
ColumnVectorWithFilter.forHolder(vectorHolders[i], rowIdMap, numRows);
+ }
}
+
+ rowStartPosInBatch += numRowsToRead;
ColumnarBatch batch = new ColumnarBatch(arrowColumnVectors);
- batch.setNumRows(numRowsToRead);
+
+ if (rowIdMapping == null) {
+ batch.setNumRows(numRowsToRead);
+ } else {
+ Integer numRows = rowIdMapping.second();
+ batch.setNumRows(numRows);
+ }
return batch;
}
+
+ private Pair<int[], Integer> rowIdMapping(int numRows) {
+ if (deletes != null && deletes.hasPosDeletes()) {
+ return buildRowIdMapping(deletes.deletedRowPositions(), numRows);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Build a row id mapping inside a batch, which skips delete rows. For
example, if the 1st and 3rd rows are deleted in
+ * a batch with 5 rows, the mapping would be {0->1, 1->3, 2->4}, and the new
num of rows is 3.
+ * @param deletedRowPositions a set of deleted row positions
+ * @param numRows the num of rows
+ * @return the mapping array and the new num of rows in a batch, null if no
row is deleted
+ */
+ private Pair<int[], Integer> buildRowIdMapping(Roaring64Bitmap
deletedRowPositions, int numRows) {
+ if (deletedRowPositions == null) {
+ return null;
+ }
+ int[] rowIdMapping = new int[numRows];
+ int originalRowId = 0;
+ int currentRowId = 0;
+ while (originalRowId < numRows) {
+ if (!deletedRowPositions.contains(originalRowId + rowStartPosInBatch)) {
+ rowIdMapping[currentRowId] = originalRowId;
+ currentRowId++;
+ }
+ originalRowId++;
+ }
+
+ if (currentRowId == numRows) {
Review comment:
Am I right that `currentRowId == numRows` means we have no deletes in a
batch? Maybe, add a comment?
##########
File path: versions.props
##########
@@ -1,7 +1,7 @@
org.slf4j:* = 1.7.25
org.apache.avro:avro = 1.10.1
org.apache.calcite:* = 1.10.0
-org.apache.flink:* = 1.12.5
+org.apache.flink:* = 1.13.2
Review comment:
Why change Flink version?
##########
File path:
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
##########
@@ -94,6 +94,10 @@ public VectorizedReaderBuilder(
reorderedFields.add(VectorizedArrowReader.nulls());
}
}
+ return vectorizedReader(reorderedFields);
+ }
+
+ protected VectorizedReader vectorizedReader(List<VectorizedReader<?>>
reorderedFields) {
Review comment:
nit: `VectorizedReader<?>`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]