This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new f3cd500 ORC-756: Include Vector Length Error with null readerSchema (for ACID formats) (#650)
f3cd500 is described below
commit f3cd500bbe8c2840720e95871efcf636adc9dc99
Author: Panagiotis Garefalakis <[email protected]>
AuthorDate: Tue Mar 23 19:53:56 2021 +0000
ORC-756: Include Vector Length Error with null readerSchema (for ACID formats) (#650)
### What changes were proposed in this pull request?
SchemaEvolution should use given file schema when reader schema not
provided.
This also simplifies the build conversion logic.
### Why are the changes needed?
Currently, creating a SchemaEvolution from a RecordReader without OrcConf.MAPRED_INPUT_SCHEMA causes an "Include vector the wrong length" error.
### How was this patch tested?
TestMapreduceOrcOutputFormat.testAcidSelectionNoSchema()
---
.../java/org/apache/orc/impl/SchemaEvolution.java | 99 ++++++++-------------
.../mapreduce/TestMapreduceOrcOutputFormat.java | 29 ++++++
java/mapreduce/src/test/resources/acid5k.orc | Bin 0 -> 47024 bytes
3 files changed, 64 insertions(+), 64 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
index fa13e26..ffab732 100644
--- a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
+++ b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java
@@ -85,58 +85,47 @@ public class SchemaEvolution {
this.hasConversion = false;
this.isOnlyImplicitConversion = true;
this.fileSchema = fileSchema;
+ // Use file schema when reader schema not provided
+ readerSchema = readerSchema == null ? this.fileSchema : readerSchema;
this.isAcid = checkAcidSchema(fileSchema);
- boolean readerSchemaIsAcid = readerSchema == null ? false : checkAcidSchema(readerSchema);
+ boolean readerSchemaIsAcid = checkAcidSchema(readerSchema);
this.includeAcidColumns = options.getIncludeAcidColumns();
this.readerColumnOffset = isAcid && !readerSchemaIsAcid ?
acidEventFieldNames.size() : 0;
- if (readerSchema != null) {
- if (isAcid && !readerSchemaIsAcid) {
- this.readerSchema = createEventSchema(readerSchema);
- } else {
- this.readerSchema = readerSchema;
- }
- if (readerIncluded != null &&
- readerIncluded.length + readerColumnOffset !=
- this.readerSchema.getMaximumId() + 1) {
- throw new IllegalArgumentException("Include vector the wrong length: "
- + this.readerSchema.toJson() + " with include length "
- + readerIncluded.length);
- }
- this.readerFileTypes =
- new TypeDescription[this.readerSchema.getMaximumId() + 1];
- int positionalLevels = 0;
- if (options.getForcePositionalEvolution()) {
- positionalLevels = isAcid ? 2 : options.getPositionalEvolutionLevel();
- } else if (!hasColumnNames(isAcid? getBaseRow(fileSchema) : fileSchema)) {
- if (!this.fileSchema.equals(this.readerSchema)) {
- if (!allowMissingMetadata) {
- throw new RuntimeException("Found that schema metadata is missing"
- + " from file. This is likely caused by"
- + " a writer earlier than HIVE-4243. Will"
- + " not try to reconcile schemas");
- } else {
- LOG.warn("Column names are missing from this file. This is"
- + " caused by a writer earlier than HIVE-4243. The reader will"
- + " reconcile schemas based on index. File type: " +
- this.fileSchema + ", reader type: " + this.readerSchema);
- positionalLevels = isAcid ? 2 : options.getPositionalEvolutionLevel();
- }
- }
- }
- buildConversion(fileSchema, this.readerSchema, positionalLevels);
+ // Create type conversion using reader schema
+ if (isAcid && !readerSchemaIsAcid) {
+ this.readerSchema = createEventSchema(readerSchema);
} else {
- this.readerSchema = fileSchema;
- this.readerFileTypes =
- new TypeDescription[this.readerSchema.getMaximumId() + 1];
- if (readerIncluded != null &&
- readerIncluded.length + readerColumnOffset !=
- this.readerSchema.getMaximumId() + 1) {
- throw new IllegalArgumentException("Include vector the wrong length: "
- + this.readerSchema.toJson() + " with include length "
- + readerIncluded.length);
+ this.readerSchema = readerSchema;
+ }
+ if (readerIncluded != null &&
+ readerIncluded.length + readerColumnOffset !=
+ this.readerSchema.getMaximumId() + 1) {
+ throw new IllegalArgumentException("Include vector the wrong length: "
+ + this.readerSchema.toJson() + " with include length "
+ + readerIncluded.length);
+ }
+ this.readerFileTypes =
+ new TypeDescription[this.readerSchema.getMaximumId() + 1];
+ int positionalLevels = 0;
+ if (options.getForcePositionalEvolution()) {
+ positionalLevels = isAcid ? 2 : options.getPositionalEvolutionLevel();
+ } else if (!hasColumnNames(isAcid? getBaseRow(fileSchema) : fileSchema)) {
+ if (!this.fileSchema.equals(this.readerSchema)) {
+ if (!allowMissingMetadata) {
+ throw new RuntimeException("Found that schema metadata is missing"
+ + " from file. This is likely caused by"
+ + " a writer earlier than HIVE-4243. Will"
+ + " not try to reconcile schemas");
+ } else {
+ LOG.warn("Column names are missing from this file. This is"
+ + " caused by a writer earlier than HIVE-4243. The reader will"
+ + " reconcile schemas based on index. File type: " +
+ this.fileSchema + ", reader type: " + this.readerSchema);
+ positionalLevels = isAcid ? 2 : options.getPositionalEvolutionLevel();
+ }
}
- buildIdentityConversion(this.readerSchema);
}
+ buildConversion(fileSchema, this.readerSchema, positionalLevels);
this.positionalColumns = options.getForcePositionalEvolution();
this.ppdSafeConversion = populatePpdSafeConversion();
}
@@ -575,24 +564,6 @@ public class SchemaEvolution {
}
}
- void buildIdentityConversion(TypeDescription readerType) {
- int id = readerType.getId();
- if (!includeReaderColumn(id)) {
- return;
- }
- if (readerFileTypes[id] != null) {
- throw new RuntimeException("reader to file type entry already assigned");
- }
- readerFileTypes[id] = readerType;
- fileIncluded[id] = true;
- List<TypeDescription> children = readerType.getChildren();
- if (children != null) {
- for (TypeDescription child : children) {
- buildIdentityConversion(child);
- }
- }
- }
-
public static boolean checkAcidSchema(TypeDescription type) {
if (type.getCategory().equals(TypeDescription.Category.STRUCT)) {
List<String> rootFields = type.getFieldNames();
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
index 6a139ec..21c96d8 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
@@ -48,6 +48,7 @@ import java.io.File;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class TestMapreduceOrcOutputFormat {
@@ -154,6 +155,34 @@ public class TestMapreduceOrcOutputFormat {
}
@Test
+ public void testAcidSelectionNoSchema() throws IOException, InterruptedException {
+ TaskAttemptID id = new TaskAttemptID("jt", 0, TaskType.MAP, 0, 1);
+ TaskAttemptContext attemptContext = new TaskAttemptContextImpl(conf, id);
+ // struct<operation:int,originalTransaction:bigint,bucket:int,rowId:bigint,currentTransaction:bigint,
+ // row:struct<i:int,j:int,k:int>>
+ conf.set(OrcConf.INCLUDE_COLUMNS.getAttribute(), "5");
+ // Do not set OrcConf.MAPRED_INPUT_SCHEMA (reader should use file schema instead)
+ FileSplit split = new FileSplit(new Path(getClass().getClassLoader().getSystemResource("acid5k.orc").getPath()), 0, 1000000, new String[0]);
+ RecordReader<NullWritable, OrcStruct> reader =
+ new OrcInputFormat<OrcStruct>().createRecordReader(split, attemptContext);
+ // Make sure we can read all rows
+ OrcStruct row;
+ for (int r=0; r < 5000; ++r) {
+ assertEquals(true, reader.nextKeyValue());
+ row = reader.getCurrentValue();
+ assertEquals(6, row.getNumFields());
+ OrcStruct innerRow = (OrcStruct) row.getFieldValue(5);
+ assertEquals(3,innerRow.getNumFields());
+ assertTrue(((IntWritable)innerRow.getFieldValue(0)).get() >= 0);
+ assertTrue(((IntWritable)innerRow.getFieldValue(1)).get() >= 0);
+ assertTrue(((IntWritable)innerRow.getFieldValue(2)).get() >= 0);
+ }
+ }
+
+ @Test
public void testColumnSelectionBlank() throws Exception {
String typeStr = "struct<i:int,j:int,k:int>";
OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, typeStr);
diff --git a/java/mapreduce/src/test/resources/acid5k.orc b/java/mapreduce/src/test/resources/acid5k.orc
new file mode 100644
index 0000000..25b4fab
Binary files /dev/null and b/java/mapreduce/src/test/resources/acid5k.orc differ