This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5744268157 [csv] Fix projection read for Csv File Format (#6147)
5744268157 is described below
commit 57442681571027697518afa79867b7ce1b79ab36
Author: jerry <[email protected]>
AuthorDate: Wed Aug 27 11:04:52 2025 +0800
[csv] Fix projection read for Csv File Format (#6147)
---
.../apache/paimon/flink/BatchFileStoreITCase.java | 4 +-
.../apache/paimon/format/csv/CsvFileFormat.java | 18 ++++--
.../apache/paimon/format/csv/CsvFileReader.java | 70 +++++++++++++++-----
.../paimon/format/csv/CsvFileFormatTest.java | 75 +++++++++++++++++++---
4 files changed, 136 insertions(+), 31 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index f576010fc3..066eeb31ca 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -89,7 +89,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
"CREATE TABLE TEXT_NONE (a INT, b INT, c INT) WITH ('file.format'='%s', 'file.compression'='none')",
format);
sql("INSERT INTO TEXT_NONE VALUES (1, 2, 3)");
- assertThat(sql("SELECT * FROM TEXT_NONE")).containsExactly(Row.of(1, 2, 3));
+ assertThat(sql("SELECT a FROM TEXT_NONE")).containsExactly(Row.of(1));
List<String> files =
sql("select file_path from `TEXT_NONE$files`").stream()
.map(r -> r.getField(0).toString())
@@ -100,7 +100,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
"CREATE TABLE TEXT_GZIP (a INT, b INT, c INT) WITH ('file.format'='%s', 'file.compression'='gzip')",
format);
sql("INSERT INTO TEXT_GZIP VALUES (1, 2, 3)");
- assertThat(sql("SELECT * FROM TEXT_GZIP")).containsExactly(Row.of(1, 2, 3));
+ assertThat(sql("SELECT b FROM TEXT_GZIP")).containsExactly(Row.of(2));
files =
sql("select file_path from `TEXT_GZIP$files`").stream()
.map(r -> r.getField(0).toString())
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
index 393d4dc050..5ee4483b70 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileFormat.java
@@ -54,7 +54,7 @@ public class CsvFileFormat extends FileFormat {
RowType dataSchemaRowType,
RowType projectedRowType,
@Nullable List<Predicate> filters) {
- return new CsvReaderFactory(projectedRowType, options);
+ return new CsvReaderFactory(dataSchemaRowType, projectedRowType, options);
}
@Override
@@ -101,17 +101,25 @@ public class CsvFileFormat extends FileFormat {
/** CSV {@link FormatReaderFactory} implementation. */
private static class CsvReaderFactory implements FormatReaderFactory {
- private final RowType rowType;
+ private final RowType dataSchemaRowType;
+ private final RowType projectedRowType;
private final CsvOptions options;
- public CsvReaderFactory(RowType rowType, CsvOptions options) {
- this.rowType = rowType;
+ public CsvReaderFactory(
+ RowType dataSchemaRowType, RowType projectedRowType, CsvOptions options) {
+ this.dataSchemaRowType = dataSchemaRowType;
+ this.projectedRowType = projectedRowType;
this.options = options;
}
@Override
public FileRecordReader<InternalRow> createReader(Context context)
throws IOException {
- return new CsvFileReader(context.fileIO(), context.filePath(), rowType, options);
+ return new CsvFileReader(
+ context.fileIO(),
+ context.filePath(),
+ dataSchemaRowType,
+ projectedRowType,
+ options);
}
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
index 36c1043095..0e215aa3fb 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/csv/CsvFileReader.java
@@ -51,12 +51,23 @@ public class CsvFileReader extends BaseTextFileReader {
private final CsvOptions formatOptions;
private final CsvSchema schema;
+ private final RowType dataSchemaRowType;
+ private final RowType projectedRowType;
+ private final int[] projectionMapping;
private boolean headerSkipped = false;
- public CsvFileReader(FileIO fileIO, Path filePath, RowType rowType, CsvOptions options)
+ public CsvFileReader(
+ FileIO fileIO,
+ Path filePath,
+ RowType rowReadType,
+ RowType projectedRowType,
+ CsvOptions options)
throws IOException {
- super(fileIO, filePath, rowType);
+ super(fileIO, filePath, projectedRowType);
+ this.dataSchemaRowType = rowReadType;
+ this.projectedRowType = projectedRowType;
this.formatOptions = options;
+ this.projectionMapping = createProjectionMapping(rowReadType, projectedRowType);
this.schema =
CsvSchema.emptySchema()
.withQuoteChar(formatOptions.quoteCharacter().charAt(0))
@@ -99,25 +110,52 @@ public class CsvFileReader extends BaseTextFileReader {
return CSV_MAPPER.readerFor(String[].class).with(schema).readValue(line);
}
+ /**
+ * Creates a mapping array from read schema to projected schema. Returns indices of projected
+ * columns in the read schema.
+ */
+ private static int[] createProjectionMapping(RowType rowReadType, RowType projectedRowType) {
+ int[] mapping = new int[projectedRowType.getFieldCount()];
+ for (int i = 0; i < projectedRowType.getFieldCount(); i++) {
+ String projectedFieldName = projectedRowType.getFieldNames().get(i);
+ int readIndex = rowReadType.getFieldNames().indexOf(projectedFieldName);
+ if (readIndex == -1) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Projected field '%s' not found in read schema",
+ projectedFieldName));
+ }
+ mapping[i] = readIndex;
+ }
+ return mapping;
+ }
+
private InternalRow parseCsvLine(String line, CsvSchema schema) throws IOException {
String[] fields = parseCsvLineToArray(line, schema);
- int fieldCount = Math.min(fields.length, rowType.getFieldCount());
- Object[] values = new Object[fieldCount]; // Pre-allocated array
-
- for (int i = 0; i < fieldCount; i++) {
- String field = fields[i];
-
- // Fast path for null values
- if (field == null || field.equals(formatOptions.nullLiteral()) || field.isEmpty()) {
- values[i] = null;
- continue;
+ int fieldCount = fields.length;
+
+ // Directly parse only projected fields to avoid unnecessary parsing
+ Object[] projectedValues = new Object[projectedRowType.getFieldCount()];
+ for (int i = 0; i < projectedRowType.getFieldCount(); i++) {
+ int readIndex = projectionMapping[i];
+ // Check if the field exists in the CSV line
+ if (readIndex < fieldCount) {
+ String field = fields[readIndex];
+ // Fast path for null values - check if field is null or empty first
+ if (field == null || field.isEmpty() || field.equals(formatOptions.nullLiteral())) {
+ projectedValues[i] = null;
+ continue;
+ }
+
+ // Optimized field parsing with cached cast executors
+ projectedValues[i] =
+ parseFieldOptimized(field.trim(), dataSchemaRowType.getTypeAt(readIndex));
+ } else {
+ projectedValues[i] = null; // Field not present in the CSV line
}
-
- // Optimized field parsing with cached cast executors
- values[i] = parseFieldOptimized(field.trim(), rowType.getTypeAt(i));
}
- return GenericRow.of(values);
+ return GenericRow.of(projectedValues);
}
/** Optimized field parsing with caching and fast paths for common types.
*/
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
index c66094e4b7..a1ae1a2772 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/csv/CsvFileFormatTest.java
@@ -181,7 +181,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
List<InternalRow> result =
writeThenRead(
- options, rowType, testData, "test_field_delim_" + delimiter.hashCode());
+ options,
+ rowType,
+ rowType,
+ testData,
+ "test_field_delim_" + delimiter.hashCode());
// Verify results
assertThat(result).hasSize(3);
@@ -216,7 +220,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
List<InternalRow> result =
writeThenRead(
- options, rowType, testData, "test_line_delim_" + delimiter.hashCode());
+ options,
+ rowType,
+ rowType,
+ testData,
+ "test_line_delim_" + delimiter.hashCode());
// Verify results
assertThat(result).hasSize(3);
@@ -248,7 +256,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
List<InternalRow> result =
writeThenRead(
- options, rowType, testData, "test_quote_char_" + quoteChar.hashCode());
+ options,
+ rowType,
+ rowType,
+ testData,
+ "test_quote_char_" + quoteChar.hashCode());
// Verify results
assertThat(result).hasSize(3);
@@ -282,6 +294,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
writeThenRead(
options,
rowType,
+ rowType,
testData,
"test_escape_char_" + escapeChar.hashCode());
@@ -314,7 +327,11 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
List<InternalRow> result =
writeThenRead(
- options, rowType, testData, "test_include_header_" + includeHeader);
+ options,
+ rowType,
+ rowType,
+ testData,
+ "test_include_header_" + includeHeader);
// Verify results
assertThat(result).hasSize(3);
@@ -352,6 +369,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
writeThenRead(
options,
rowType,
+ rowType,
testData,
"test_null_literal_" + nullLiteral.hashCode());
@@ -396,7 +414,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
GenericRow.of(4, BinaryString.fromString("Normal"),
400.0, null));
List<InternalRow> result =
- writeThenRead(options, rowType, testData, "test_csv_combination");
+ writeThenRead(options, rowType, rowType, testData, "test_csv_combination");
// Verify results
assertThat(result).hasSize(4);
@@ -494,6 +512,43 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
return true;
}
+ @Test
+ public void testProjectionPushdown() throws IOException {
+ RowType fullRowType =
+ RowType.builder()
+ .field("id", DataTypes.INT().notNull())
+ .field("name", DataTypes.STRING())
+ .field("score", DataTypes.DOUBLE())
+ .field("active", DataTypes.BOOLEAN())
+ .build();
+
+ RowType projectedRowType =
+ RowType.builder()
+ .field("score", DataTypes.DOUBLE())
+ .field("name", DataTypes.STRING())
+ .build();
+
+ List<InternalRow> testData =
+ Arrays.asList(
+ GenericRow.of(1, BinaryString.fromString("Alice"), null, true),
+ GenericRow.of(2, null, 87.2, false),
+ GenericRow.of(3, BinaryString.fromString("Charlie"), 92.8, null));
+
+ List<InternalRow> result =
+ writeThenRead(
+ new Options(), fullRowType, projectedRowType, testData, "test_projection");
+
+ assertThat(result).hasSize(3);
+ assertThat(result.get(0).isNullAt(0)).isTrue(); // score is null
+ assertThat(result.get(0).getString(1).toString()).isEqualTo("Alice");
+
+ assertThat(result.get(1).getDouble(0)).isEqualTo(87.2);
+ assertThat(result.get(1).isNullAt(1)).isTrue(); // name is null
+
+ assertThat(result.get(2).getDouble(0)).isEqualTo(92.8);
+ assertThat(result.get(2).getString(1).toString()).isEqualTo("Charlie");
+ }
+
private String[] parse(String csvLine) throws IOException {
CsvSchema schema =
CsvSchema.emptySchema()
@@ -509,13 +564,17 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
* that was read back for further verification.
*/
private List<InternalRow> writeThenRead(
- Options options, RowType rowType, List<InternalRow> testData, String testPrefix)
+ Options options,
+ RowType fullRowType,
+ RowType rowType,
+ List<InternalRow> testData,
+ String testPrefix)
throws IOException {
FileFormat format =
new CsvFileFormatFactory().create(new FormatContext(options, 1024, 1024));
Path testFile = new Path(parent, testPrefix + "_" + UUID.randomUUID() + ".csv");
- FormatWriterFactory writerFactory = format.createWriterFactory(rowType);
+ FormatWriterFactory writerFactory = format.createWriterFactory(fullRowType);
try (PositionOutputStream out = fileIO.newOutputStream(testFile, false);
FormatWriter writer = writerFactory.create(out, "none")) {
for (InternalRow row : testData) {
@@ -523,7 +582,7 @@ public class CsvFileFormatTest extends FormatReadWriteTest {
}
}
try (RecordReader<InternalRow> reader =
- format.createReaderFactory(rowType, rowType, new ArrayList<>())
+ format.createReaderFactory(fullRowType, rowType, new ArrayList<>())
.createReader(
new FormatReaderContext(
fileIO, testFile, fileIO.getFileSize(testFile)))) {