Repository: hive Updated Branches: refs/heads/master 1629ec058 -> d717d3853
HIVE-18907: Create utility to fix acid key index issue from HIVE-18817 (Jason Dere, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d717d385 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d717d385 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d717d385 Branch: refs/heads/master Commit: d717d3853469afc95dbb46a05f52e1b4c7304a89 Parents: 1629ec0 Author: Jason Dere <jd...@hortonworks.com> Authored: Mon Mar 12 15:28:50 2018 -0700 Committer: Jason Dere <jd...@hortonworks.com> Committed: Mon Mar 12 15:28:50 2018 -0700 ---------------------------------------------------------------------- bin/ext/fixacidkeyindex.sh | 32 ++ .../hadoop/hive/ql/io/orc/FixAcidKeyIndex.java | 374 +++++++++++++++++++ .../hive/ql/io/orc/TestFixAcidKeyIndex.java | 301 +++++++++++++++ 3 files changed, 707 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d717d385/bin/ext/fixacidkeyindex.sh ---------------------------------------------------------------------- diff --git a/bin/ext/fixacidkeyindex.sh b/bin/ext/fixacidkeyindex.sh new file mode 100644 index 0000000..28af2ea --- /dev/null +++ b/bin/ext/fixacidkeyindex.sh @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Registers the "fixacidkeyindex" service with the hive CLI launcher so that
# "hive fixacidkeyindex ..." dispatches to the FixAcidKeyIndex utility class.
THISSERVICE=fixacidkeyindex
export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "

# Entry point invoked by the hive launcher: run the Java utility, forwarding
# all command-line arguments. HIVE_OPTS is cleared so no session-level options
# leak into the standalone tool.
fixacidkeyindex () {
  CLASS=org.apache.hadoop.hive.ql.io.orc.FixAcidKeyIndex
  HIVE_OPTS=''
  execHiveCmd $CLASS "$@"
}

# Help text printed by the launcher for this service.
fixacidkeyindex_help () {
  echo "usage ./hive fixacidkeyindex [-h] --check-only|--recover [--backup-path <new-path>] <path_to_orc_file_or_directory>"
  echo ""
  echo "  --check-only                Check acid orc file for valid acid key index and exit without fixing"
  echo "  --recover                   Fix the acid key index for acid orc file if it requires fixing"
  echo "  --backup-path <new-path>    Specify a backup path to store the corrupted files (default: /tmp)"
  echo "  --help (-h)                 Print help message"
}

http://git-wip-us.apache.org/repos/asf/hive/blob/d717d385/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FixAcidKeyIndex.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FixAcidKeyIndex.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FixAcidKeyIndex.java
new file mode 100644
index 0000000..6920938
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FixAcidKeyIndex.java
@@ -0,0 +1,374 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; +import org.apache.orc.CompressionKind; +import org.apache.orc.StripeInformation; +import org.apache.orc.OrcProto.StripeStatistics; +import org.apache.orc.impl.OrcAcidUtils; +import org.apache.orc.tools.FileDump; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility to check and fix the ACID key index of an ORC file if it 
has been written incorrectly + * due to HIVE-18817. + * The condition that will be checked in the ORC file will be if the number of stripes in the + * acid key index matches the number of stripes in the ORC StripeInformation. + */ +public class FixAcidKeyIndex { + public final static Logger LOG = LoggerFactory.getLogger(FixAcidKeyIndex.class); + + public static final String DEFAULT_BACKUP_PATH = System.getProperty("java.io.tmpdir"); + private static final Charset UTF8 = Charset.forName("UTF-8"); + private static final CharsetDecoder utf8Decoder = UTF8.newDecoder(); + + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + + Options opts = createOptions(); + CommandLine cli = new GnuParser().parse(opts, args); + + if (cli.hasOption('h')) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("fixacidkeyindex", opts); + return; + } + + String backupPath = DEFAULT_BACKUP_PATH; + if (cli.hasOption("backup-path")) { + backupPath = cli.getOptionValue("backup-path"); + } + + boolean checkOnly = cli.hasOption("check-only"); + boolean recover = cli.hasOption("recover"); + + String[] files = cli.getArgs(); + if (files.length == 0) { + System.err.println("Error : ORC files are not specified"); + return; + } + + // if the specified path is directory, iterate through all files + List<String> filesInPath = new ArrayList<>(); + for (String filename : files) { + Path path = new Path(filename); + filesInPath.addAll(getAllFilesInPath(path, conf)); + } + + if (checkOnly) { + checkFiles(conf, filesInPath); + } else if (recover) { + recoverFiles(conf, filesInPath, backupPath); + } else { + System.err.println("check-only or recover option must be specified"); + } + } + + static boolean isAcidKeyIndexValid(Reader reader) { + // The number of stripes should match the key index count + List<StripeInformation> stripes = reader.getStripes(); + RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader); + if 
(keyIndex == null) { + return false; + } + + for (int idx = 0; idx < keyIndex.length; ++idx) { + if (keyIndex[idx] == null) { + LOG.info("*** keyIndex[" + idx + "] is null"); + return false; + } + } + + return stripes.size() == keyIndex.length; + } + + static void recoverFiles(Configuration conf, List<String> fileList, String backup) { + for (String fileName : fileList) { + try { + Path filePath = new Path(fileName); + recoverFile(conf, filePath, backup); + } catch (Exception err) { + System.err.println("ERROR recovering " + fileName); + err.printStackTrace(System.err); + } + } + } + + static void checkFiles(Configuration conf, List<String> fileList) { + for (String fileName : fileList) { + try { + Path filePath = new Path(fileName); + checkFile(conf, filePath); + } catch (Exception err) { + System.err.println("ERROR checking " + fileName); + err.printStackTrace(System.err); + } + } + } + + static void checkFile(Configuration conf, Path inputPath) throws IOException { + FileSystem fs = inputPath.getFileSystem(conf); + Reader reader = OrcFile.createReader(fs, inputPath); + + if (OrcInputFormat.isOriginal(reader)) { + System.out.println(inputPath + " is not an acid file"); + return; + } + + boolean validIndex = isAcidKeyIndexValid(reader); + System.out.println("Checking " + inputPath + " - acid key index is " + + (validIndex ? "valid" : "invalid")); + } + + static void recoverFile(Configuration conf, Path inputPath, String backup) throws IOException { + FileSystem fs = inputPath.getFileSystem(conf); + Reader reader = OrcFile.createReader(fs, inputPath); + + if (OrcInputFormat.isOriginal(reader)) { + System.out.println(inputPath + " is not an acid file. No need to recover."); + return; + } + + boolean validIndex = isAcidKeyIndexValid(reader); + if (validIndex) { + System.out.println(inputPath + " has a valid acid key index. 
No need to recover."); + return; + } + + System.out.println("Recovering " + inputPath); + + Path recoveredPath = getRecoveryFile(inputPath); + // make sure that file does not exist + if (fs.exists(recoveredPath)) { + fs.delete(recoveredPath, false); + } + + // Writer should match the orc configuration from the original file + OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf) + .compress(reader.getCompression()) + .version(reader.getFileVersion()) + .rowIndexStride(reader.getRowIndexStride()) + .inspector(reader.getObjectInspector()); + // compression buffer size should only be set if compression is enabled + if (reader.getCompression() != org.apache.hadoop.hive.ql.io.orc.CompressionKind.NONE) { + writerOptions.bufferSize(reader.getCompressionSize()).enforceBufferSize(); + } + + try (Writer writer = OrcFile.createWriter(recoveredPath, writerOptions)) { + + // For HIVE-18817, the only thing missing is the last stripe index information. + // Get the information from the last stripe and append it to the existing index. + // The actual stripe data can be written as-is, similar to OrcFileMergeOperator. + + String keyIndexString = getKeyIndexAsString(reader); + if (keyIndexString == null || keyIndexString.equals("null")) { + // Key index can be null/"null" if there is only a single stripe. Just start fresh. + keyIndexString = ""; + } + + List<StripeInformation> stripes = reader.getStripes(); + List<StripeStatistics> stripeStats = reader.getOrcProtoStripeStatistics(); + + try (FSDataInputStream inputStream = fs.open(inputPath)) { + for (int idx = 0; idx < stripes.size(); ++idx) { + // initialize buffer to read the entire stripe. 
+ StripeInformation stripe = stripes.get(idx); + int stripeLength = (int) stripe.getLength(); + byte[] buffer = new byte[stripeLength]; + inputStream.readFully(stripe.getOffset(), buffer, 0, stripeLength); + + // append the stripe buffer to the new ORC file + writer.appendStripe(buffer, 0, buffer.length, stripe, stripeStats.get(idx)); + } + } + + // For last stripe we need to get the last trasactionId/bucket/rowId from the last row. + long lastRow = reader.getNumberOfRows() - 1; + //RecordReader rr = reader.rows(); + try (RecordReader rr = reader.rows()) { + rr.seekToRow(lastRow); + OrcStruct row = (OrcStruct) rr.next(null); + StructObjectInspector soi = (StructObjectInspector) reader.getObjectInspector(); + // struct<operation:int,originalTransaction:bigint,bucket:int,rowId:bigint,currentTransaction:bigint + List<? extends StructField> structFields = soi.getAllStructFieldRefs(); + + StructField transactionField = structFields.get(1); + StructField bucketField = structFields.get(2); + StructField rowIdField = structFields.get(3); + + long lastTransaction = ((LongObjectInspector) transactionField.getFieldObjectInspector()).get( + soi.getStructFieldData(row, transactionField)); + int lastBucket = ((IntObjectInspector) bucketField.getFieldObjectInspector()).get( + soi.getStructFieldData(row, bucketField)); + long lastRowId = ((LongObjectInspector) rowIdField.getFieldObjectInspector()).get( + soi.getStructFieldData(row, rowIdField)); + keyIndexString += lastTransaction + "," + lastBucket + "," + lastRowId + ";"; + } + + // Add the rest of the metadata keys. + for (String metadataKey : reader.getMetadataKeys()) { + if (!metadataKey.equals(OrcRecordUpdater.ACID_KEY_INDEX_NAME)) { + writer.addUserMetadata(metadataKey, reader.getMetadataValue(metadataKey)); + } + } + + // Finally add the fixed acid key index. + writer.addUserMetadata(OrcRecordUpdater.ACID_KEY_INDEX_NAME, UTF8.encode(keyIndexString)); + } + + // Confirm the file is really fixed, and replace the old file. 
+ Reader newReader = OrcFile.createReader(fs, recoveredPath); + boolean fileFixed = isAcidKeyIndexValid(newReader); + if (fileFixed) { + Path backupDataPath; + String scheme = inputPath.toUri().getScheme(); + String authority = inputPath.toUri().getAuthority(); + String filePath = inputPath.toUri().getPath(); + + // use the same filesystem as input file if backup-path is not explicitly specified + if (backup.equals(DEFAULT_BACKUP_PATH)) { + backupDataPath = new Path(scheme, authority, DEFAULT_BACKUP_PATH + filePath); + } else { + backupDataPath = Path.mergePaths(new Path(backup), inputPath); + } + + // Move data file to backup path + moveFiles(fs, inputPath, backupDataPath); + // finally move recovered file to actual file + moveFiles(fs, recoveredPath, inputPath); + + System.out.println("Fixed acid key index for " + inputPath); + } else { + System.out.println("Unable to fix acid key index for " + inputPath); + } + } + + private static void moveFiles(final FileSystem fs, final Path src, final Path dest) + throws IOException { + try { + // create the dest directory if not exist + if (!fs.exists(dest.getParent())) { + fs.mkdirs(dest.getParent()); + } + + // if the destination file exists for some reason delete it + fs.delete(dest, false); + + if (fs.rename(src, dest)) { + System.err.println("Moved " + src + " to " + dest); + } else { + throw new IOException("Unable to move " + src + " to " + dest); + } + + } catch (Exception e) { + throw new IOException("Unable to move " + src + " to " + dest, e); + } + } + + static String getKeyIndexAsString(Reader reader) { + try { + ByteBuffer val = + reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME) + .duplicate(); + return utf8Decoder.decode(val).toString(); + } catch (CharacterCodingException e) { + throw new IllegalArgumentException("Bad string encoding for " + + OrcRecordUpdater.ACID_KEY_INDEX_NAME, e); + } + } + + static Path getRecoveryFile(final Path corruptPath) { + return new Path(corruptPath.getParent(), 
corruptPath.getName() + ".fixacidindex"); + } + + static Options createOptions() { + Options result = new Options(); + + result.addOption(OptionBuilder + .withLongOpt("check-only") + .withDescription("Check acid orc file for valid acid key index and exit without fixing") + .create('c')); + + result.addOption(OptionBuilder + .withLongOpt("recover") + .withDescription("Fix the acid key index for acid orc file if it requires fixing") + .create('r')); + + result.addOption(OptionBuilder + .withLongOpt("backup-path") + .withDescription("specify a backup path to store the corrupted files (default: /tmp)") + .hasArg() + .create()); + + result.addOption(OptionBuilder + .withLongOpt("help") + .withDescription("print help message") + .create('h')); + + return result; + } + + public static Collection<String> getAllFilesInPath(final Path path, + final Configuration conf) throws IOException { + List<String> filesInPath = new ArrayList<>(); + FileSystem fs = path.getFileSystem(conf); + FileStatus fileStatus = fs.getFileStatus(path); + if (fileStatus.isDir()) { + FileStatus[] fileStatuses = fs.listStatus(path, FileDump.HIDDEN_AND_SIDE_FILE_FILTER); + for (FileStatus fileInPath : fileStatuses) { + if (fileInPath.isDir()) { + filesInPath.addAll(getAllFilesInPath(fileInPath.getPath(), conf)); + } else { + filesInPath.add(fileInPath.getPath().toString()); + } + } + } else { + filesInPath.add(path.toString()); + } + + return filesInPath; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/d717d385/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFixAcidKeyIndex.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFixAcidKeyIndex.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFixAcidKeyIndex.java new file mode 100644 index 0000000..bf478cc --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestFixAcidKeyIndex.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.orc.OrcFile.WriterContext; +import org.apache.orc.impl.AcidStats; +import org.apache.orc.impl.OrcAcidUtils; +import org.apache.orc.impl.WriterImpl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +public class TestFixAcidKeyIndex { + public final static Logger LOG = 
LoggerFactory.getLogger(TestFixAcidKeyIndex.class); + + @Rule + public TestName testCaseName = new TestName(); + Path workDir = new Path(System.getProperty("test.tmp.dir","target/tmp")); + Configuration conf; + Path testFilePath; + FileSystem fs; + + @Before + public void openFileSystem () throws Exception { + conf = new Configuration(); + fs = FileSystem.getLocal(conf); + testFilePath = new Path(workDir, "TestFixAcidKeyIndex." + + testCaseName.getMethodName() + ".orc"); + fs.delete(testFilePath, false); + } + + static abstract class TestKeyIndexBuilder + extends OrcRecordUpdater.KeyIndexBuilder + implements OrcFile.WriterCallback { + public TestKeyIndexBuilder(String name) { + super(name); + } + + // Will be called before closing the ORC file to stop writing any additional information + // to the acid key index. + abstract void stopWritingKeyIndex(); + } + + void createTestAcidFile(Path path, int numRows, TestKeyIndexBuilder indexBuilder) throws Exception { + FileSystem fs = path.getFileSystem(conf); + fs.delete(path, true); + String typeStr = "struct<operation:int," + + "originalTransaction:bigint,bucket:int,rowId:bigint," + + "currentTransaction:bigint," + + "row:struct<a:int,b:struct<c:int>,d:string>>"; + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeStr); + Writer writer = OrcFile.createWriter(path, + OrcFile.writerOptions(conf) + .fileSystem(fs) + .inspector(OrcStruct.createObjectInspector(typeInfo)) + .compress(CompressionKind.NONE) + .callback(indexBuilder) + .stripeSize(128)); + // Create ORC file with small stripe size so we can write multiple stripes. 
+ OrcStruct row = new OrcStruct(6); + row.setFieldValue(0, new IntWritable(0)); + row.setFieldValue(1, new LongWritable(1)); + row.setFieldValue(2, new IntWritable(0)); + LongWritable rowId = new LongWritable(); + row.setFieldValue(3, rowId); + row.setFieldValue(4, new LongWritable(1)); + OrcStruct rowField = new OrcStruct(3); + row.setFieldValue(5, rowField); + IntWritable a = new IntWritable(); + rowField.setFieldValue(0, a); + OrcStruct b = new OrcStruct(1); + rowField.setFieldValue(1, b); + IntWritable c = new IntWritable(); + b.setFieldValue(0, c); + Text d = new Text(); + rowField.setFieldValue(2, d); + + // Minimum 5000 rows per stripe. + for(int r=0; r < numRows; r++) { + // row id + rowId.set(r); + // a + a.set(r * 42); + // b.c + c.set(r * 10001); + // d + d.set(Integer.toHexString(r)); + indexBuilder.addKey(OrcRecordUpdater.INSERT_OPERATION, 1, 0, rowId.get()); + writer.addRow(row); + } + + indexBuilder.stopWritingKeyIndex(); + + writer.close(); + } + + void runIndexCheck(Path orcFile, File outFile) throws Exception { + // Run with --check-index and save the output to file so it can be checked. + PrintStream origOut = System.out; + FileOutputStream myOut = new FileOutputStream(outFile); + + System.setOut(new PrintStream(myOut)); + String[] checkArgs = new String[] { + "--check-only", + orcFile.toString() + }; + FixAcidKeyIndex.main(checkArgs); + System.out.flush(); + System.setOut(origOut); + } + + void checkValidKeyIndex(Path orcFile) throws Exception { + String outputFilename = "fixAcidKeyIndex.out"; + File outFile = new File(workDir.toString(), outputFilename); + runIndexCheck(orcFile, outFile); + + // Check the output of FixAcidKeyIndex - it should indicate the index was valid. 
+ String outputAsString = FileUtils.readFileToString(outFile); + System.out.println(outputAsString); + assertTrue(outputAsString.contains("acid key index is valid")); + } + + void checkInvalidKeyIndex(Path orcFile) throws Exception { + String outputFilename = "fixAcidKeyIndex.out"; + File outFile = new File(workDir.toString(), outputFilename); + runIndexCheck(orcFile, outFile); + + // Check the output of FixAcidKeyIndex - it should indicate the index was invalid. + String outputAsString = FileUtils.readFileToString(outFile); + System.out.println(outputAsString); + assertTrue(outputAsString.contains("acid key index is invalid")); + } + + void runFixIndex(Path orcFile, File outFile) throws Exception { + // Run with --recover and save the output to a file so it can be checked. + PrintStream origOut = System.out; + FileOutputStream myOut = new FileOutputStream(outFile); + + System.setOut(new PrintStream(myOut)); + String[] checkArgs = new String[] { + "--recover", + orcFile.toString() + }; + FixAcidKeyIndex.main(checkArgs); + System.out.flush(); + System.setOut(origOut); + } + + void fixInvalidIndex(Path orcFile) throws Exception { + String outputFilename = "fixAcidKeyIndex.out"; + File outFile = new File(workDir.toString(), outputFilename); + runFixIndex(orcFile, outFile); + + // Check the output of FixAcidKeyIndex - it should indicate the index was fixed. + String outputAsString = FileUtils.readFileToString(outFile); + System.out.println(outputAsString); + assertTrue(outputAsString.contains("Fixed acid key index")); + } + + void fixValidIndex(Path orcFile) throws Exception { + String outputFilename = "fixAcidKeyIndex.out"; + File outFile = new File(workDir.toString(), outputFilename); + runFixIndex(orcFile, outFile); + + // Check the output of FixAcidKeyIndex - it should indicate nothing required fixing. 
+ String outputAsString = FileUtils.readFileToString(outFile); + System.out.println(outputAsString); + assertTrue(outputAsString.contains("No need to recover")); + } + + @Test + public void testValidKeyIndex() throws Exception { + // Try single stripe + createTestAcidFile(testFilePath, 100, new GoodKeyIndexBuilder()); + checkValidKeyIndex(testFilePath); + // Attempting to fix a valid - should not result in a new file. + fixValidIndex(testFilePath); + + // Multiple stripes + createTestAcidFile(testFilePath, 12000, new GoodKeyIndexBuilder()); + checkValidKeyIndex(testFilePath); + // Attempting to fix a valid - should not result in a new file. + fixValidIndex(testFilePath); + } + + @Test + public void testInvalidKeyIndex() throws Exception { + // Try single stripe + createTestAcidFile(testFilePath, 100, new BadKeyIndexBuilder()); + checkInvalidKeyIndex(testFilePath); + // Try fixing, this should result in new fixed file. + fixInvalidIndex(testFilePath); + + // Multiple stripes + createTestAcidFile(testFilePath, 12000, new BadKeyIndexBuilder()); + checkInvalidKeyIndex(testFilePath); + // Try fixing, this should result in new fixed file. 
+ fixInvalidIndex(testFilePath); + } + + @Test + public void testNonAcidOrcFile() throws Exception { + // Copy data/files/alltypesorc to workDir + Path baseSrcDir = new Path(System.getProperty("basedir")).getParent(); + Path dataFilesPath = new Path(new Path(baseSrcDir, "data"), "files"); + File origOrcFile = new File(dataFilesPath.toString(), "alltypesorc"); + File testOrcFile = new File(workDir.toString(), "alltypesorc"); + FileUtils.copyFile(origOrcFile, testOrcFile); + + String outputFilename = "fixAcidKeyIndex.out"; + File outFile = new File(workDir.toString(), outputFilename); + runIndexCheck(new Path(testOrcFile.getPath()), outFile); + String outputAsString = FileUtils.readFileToString(outFile); + System.out.println(outputAsString); + assertTrue(outputAsString.contains("is not an acid file")); + } + + /** + * Version of KeyIndexBuilder that generates a valid key index + */ + static class GoodKeyIndexBuilder extends TestKeyIndexBuilder { + + GoodKeyIndexBuilder() { + super("GoodKeyIndexBuilder"); + } + + @Override + public void stopWritingKeyIndex() { + // Do nothing - this should generate proper index. + } + } + + /** + * Bad version of KeyIndexBuilder which builds an invalid acid key index + * by not including the key index info once stopWritingKeyIndex() is called. + */ + static class BadKeyIndexBuilder extends TestKeyIndexBuilder { + + boolean writeAcidIndexInfo = true; + + BadKeyIndexBuilder() { + super("BadKeyIndexBuilder"); + } + + public void stopWritingKeyIndex() { + LOG.info("*** Stop writing index!"); + writeAcidIndexInfo = false; + } + + @Override + public void preStripeWrite(OrcFile.WriterContext context) throws IOException { + LOG.info("*** writeAcidIndexInfo: " + writeAcidIndexInfo); + if (!writeAcidIndexInfo) { + return; + } + + super.preStripeWrite(context); + } + } +}