This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 5d1a7fa  HIVE-24266: Committed rows in hflush'd ACID files may be missing from query result (Adam Szita, reviewed by Peter Vary)
5d1a7fa is described below

commit 5d1a7facf3dfc90a35611b7b5bcbd5c57e4e0dc9
Author: Adam Szita <40628386+sz...@users.noreply.github.com>
AuthorDate: Wed Oct 14 21:21:00 2020 +0200

    HIVE-24266: Committed rows in hflush'd ACID files may be missing from query result (Adam Szita, reviewed by Peter Vary)
---
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java      |  42 ++++++-
 .../hive/ql/io/orc/TestInputOutputFormat.java      | 128 ++++++++++++++++++++-
 2 files changed, 162 insertions(+), 8 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 0d028bc..b76f797 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.common.NoDynamicValuesException;
 import org.apache.hadoop.fs.PathFilter;
@@ -1156,13 +1157,40 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       } else {
         TreeMap<Long, BlockLocation> blockOffsets = SHIMS.getLocationsWithOffset(fs, fileStatus);
         for (Map.Entry<Long, BlockLocation> entry : blockOffsets.entrySet()) {
-          if (entry.getKey() + entry.getValue().getLength() > logicalLen) {
+          long blockOffset = entry.getKey();
+          long blockLength = entry.getValue().getLength();
+          if (blockOffset > logicalLen) {
             //don't create splits for anything past logical EOF
-            continue;
+            //map is ordered, thus any possible entry in the iteration after this is bound to be > logicalLen
+            break;
           }
-          OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, entry.getKey(),
-              entry.getValue().getLength(), entry.getValue().getHosts(), null, isOriginal, true,
-              deltas, -1, logicalLen, dir, offsetAndBucket);
+          long splitLength = blockLength;
+
+          long blockEndOvershoot = (blockOffset + blockLength) - logicalLen;
+          if (blockEndOvershoot > 0) {
+            // if logicalLen is placed within a block, we should make (this last) split out of the part of this block
+            // -> we should read less than block end
+            splitLength -= blockEndOvershoot;
+          } else if (blockOffsets.lastKey() == blockOffset && blockEndOvershoot < 0) {
+            // This is the last block but it ends before logicalLen
+            // This can happen with HDFS if hflush was called and blocks are not persisted to disk yet, but content
+            // is otherwise available for readers, as DNs have these buffers in memory at this time.
+            // -> we should read more than (persisted) block end, but only within the block
+            if (fileStatus instanceof HdfsLocatedFileStatus) {
+              HdfsLocatedFileStatus hdfsFileStatus = (HdfsLocatedFileStatus)fileStatus;
+              if (hdfsFileStatus.getLocatedBlocks().isUnderConstruction()) {
+                // sanity check
+                if (logicalLen > blockOffset + hdfsFileStatus.getBlockSize()) {
+                  throw new IOException("Side file indicates more data available after the last known block!");
+                }
+                // blockEndOvershoot is negative here...
+                splitLength -= blockEndOvershoot;
+              }
+            }
+          }
+          OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, blockOffset,
+              splitLength, entry.getValue().getHosts(), null, isOriginal, true,
+              deltas, -1, logicalLen, dir, offsetAndBucket);
           splits.add(orcSplit);
         }
       }
@@ -1431,6 +1459,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     //this is the root of the partition in which the 'file' is located
     private final Path rootDir;
     OrcSplit.OffsetAndBucketProperty offsetAndBucket = null;
+    boolean isAcidTableScan;
 
     public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi,
         boolean allowSyntheticFileIds, boolean isDefaultFs) throws IOException {
@@ -1452,6 +1481,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.deltaSplits = splitInfo.getSplits();
      this.allowSyntheticFileIds = allowSyntheticFileIds;
       this.ppdResult = splitInfo.ppdResult;
+      this.isAcidTableScan = AcidUtils.isFullAcidScan(context.conf);
     }
 
     public boolean isBlocking() {
@@ -1550,7 +1580,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
 
       // scale the raw data size to split level based on ratio of split wrt to file length
-      final long fileLen = file.getLen();
+      final long fileLen = isAcidTableScan ? AcidUtils.getLogicalLength(fs, file) : file.getLen();
       final double splitRatio = (double) length / (double) fileLen;
       final long scaledProjSize = projColsUncompressedSize > 0 ?
           (long) (splitRatio * projColsUncompressedSize) : fileLen;
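[Reader's note, not part of the patch: the split sizing above boils down to one adjustment around logical EOF. Below is a minimal standalone sketch of that arithmetic; the lastBlock and underConstruction parameters are stand-ins for the TreeMap.lastKey() and HdfsLocatedFileStatus.getLocatedBlocks().isUnderConstruction() checks, logicalLen is the committed length AcidUtils.getLogicalLength() derives from the side file, and the block-size sanity check is elided.]

    class SplitLengthSketch {
      // Mirrors the blockEndOvershoot logic in the hunk above (sketch, not Hive code).
      static long splitLength(long blockOffset, long blockLength, long logicalLen,
                              boolean lastBlock, boolean underConstruction) {
        long splitLength = blockLength;
        long blockEndOvershoot = (blockOffset + blockLength) - logicalLen;
        if (blockEndOvershoot > 0) {
          // logical EOF falls inside this block: shrink the split to end at logicalLen
          splitLength -= blockEndOvershoot;
        } else if (lastBlock && underConstruction && blockEndOvershoot < 0) {
          // hflush'd bytes are readable past the persisted block end: grow the split
          // up to logicalLen (blockEndOvershoot is negative here, so subtracting adds)
          splitLength -= blockEndOvershoot;
        }
        return splitLength;
      }

      public static void main(String[] args) {
        // Values taken from testAcidTableStreamingBISplitGeneration in the test diff below:
        System.out.println(splitLength(0, 15, 95, true, true));       // 95  (the 15/95 delta)
        System.out.println(splitLength(2000, 500, 2800, true, true)); // 800 (last block of 2500/2800)
        System.out.println(splitLength(0, 1000, 450, false, false));  // 450 (the 1000/450 delta)
      }
    }

Worked through the 15/95 case: the only block has offset 0 and persisted length 15 while the side file says 95 bytes are committed, so blockEndOvershoot = 15 - 95 = -80 and the split grows to 95 bytes; in the 2500/2800 case the first two 1000-byte blocks are untouched and only the last split grows from 500 to 800.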
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 4865df5..c51106c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -36,6 +38,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -47,6 +50,8 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -1070,14 +1075,23 @@ public class TestInputOutputFormat {
     int length;
     MockBlock[] blocks;
     byte[] content;
+    // If true - it will simulate an HDFS file that was hflushed (i.e. might have content above what can be found as
+    // first block on disk) and is still being written to (e.g. isUnderConstruction is true)
+    boolean isHdfsHflushed = false;
 
     public MockFile(String path, int blockSize, byte[] content,
+        MockBlock... blocks) {
+      this(path, blockSize, content, false, blocks);
+    }
+
+    public MockFile(String path, int blockSize, byte[] content, boolean isHdfsHflushed,
         MockBlock... blocks) {
       this.path = new Path(path);
       this.blockSize = blockSize;
       this.blocks = blocks;
       this.content = content;
       this.length = content.length;
+      this.isHdfsHflushed = isHdfsHflushed;
       int offset = 0;
       for(MockBlock block: blocks) {
         block.offset = offset;
@@ -1148,6 +1162,11 @@ public class TestInputOutputFormat {
       }
       return -1;
     }
+
+    @Override
+    public int available() throws IOException {
+      return file.length - offset;
+    }
   }
 
   public static class MockPath extends Path {
@@ -1506,8 +1525,23 @@
     private LocatedFileStatus createLocatedStatus(MockFile file) throws IOException {
       FileStatus fileStatus = createStatus(file);
-      return new LocatedFileStatus(fileStatus,
-          getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false));
+      if (file.isHdfsHflushed) {
+        // Should work the same way as the local status except for having isUnderConstruction flag set to true and
+        // having HdfsLocatedFileStatus type
+        LocatedBlocks lb = new LocatedBlocks(fileStatus.getLen(), true, null, null, false, null, null);
+        HdfsLocatedFileStatus mockStatus = mock(HdfsLocatedFileStatus.class);
+        when(mockStatus.getLocatedBlocks()).thenReturn(lb);
+        when(mockStatus.getPath()).thenReturn(fileStatus.getPath());
+        when(mockStatus.getLen()).thenReturn(fileStatus.getLen());
+        when(mockStatus.isDirectory()).thenReturn(false);
+        when(mockStatus.isFile()).thenReturn(true);
+        when(mockStatus.getBlockSize()).thenReturn(fileStatus.getBlockSize());
+        when(mockStatus.getBlockLocations()).thenReturn(getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(),
+            false));
+        return mockStatus;
+      } else {
+        return new LocatedFileStatus(fileStatus, getFileBlockLocationsImpl(fileStatus, 0, fileStatus.getLen(), false));
+      }
     }
 
     private LocatedFileStatus createLocatedDirectory(Path dir) throws IOException {
@@ -4221,4 +4255,94 @@ public class TestInputOutputFormat {
     reader.close();
   }
+
+
+  private static List<MockFile> mockDeltaWithSideFileForStreaming(String delta, int contentLength, int flush_length) {
+    final int blockSize = 1000;
+    boolean isDeltaHflushed = contentLength < flush_length;
+
+    List<MockFile> files = new LinkedList<>();
+
+    ByteBuffer bb = ByteBuffer.allocate(Long.BYTES);
+    bb.putLong(flush_length);
+    bb.array();
+
+    MockBlock[] blocks = new MockBlock[(contentLength / blockSize) + 1];
+    for (int i = 0; i < blocks.length; ++i) {
+      blocks[i] = new MockBlock("host1");
+    }
+
+    files.add(new MockFile("mock:/streaming/" + delta + "/bucket_00000", blockSize, new byte[contentLength], isDeltaHflushed,
+        blocks));
+    files.add(new MockFile("mock:/streaming/" + delta + "/bucket_00000_flush_length", blockSize, bb.array(), false,
+        new MockBlock("host1")));
+
+    return files;
+  }
+
+  private List<OrcSplit> splitsForStreamingAcidTable(List<MockFile> files) throws Exception {
+    try {
+      MockFileSystem fs = new MockFileSystem(conf);
+      files.forEach(f -> MockFileSystem.addGlobalFile(f));
+      conf.set("bucket_count", "1");
+      //set up props for read
+      conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, true);
+      AcidUtils.setAcidOperationalProperties(conf, true, null);
+      conf.set(ValidTxnList.VALID_TXNS_KEY,
+          new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS,
+          TestVectorizedOrcAcidRowBatchReader.DummyRow.getColumnNamesProperty());
+      conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES,
+          TestVectorizedOrcAcidRowBatchReader.DummyRow.getColumnTypesProperty());
+      conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
+
+      MockPath mockPath = new MockPath(fs, "mock:/streaming");
+      conf.set("mapred.input.dir", mockPath.toString());
+      conf.set("fs.defaultFS", "mock:///");
+      conf.set("fs.mock.impl", MockFileSystem.class.getName());
+      conf.set(ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+
+      OrcInputFormat.Context context = new OrcInputFormat.Context(conf, 0);
+      OrcInputFormat.FileGenerator gen =
+          new OrcInputFormat.FileGenerator(context, () -> fs, new MockPath(fs,
+              "mock:/streaming"),
+              false, null);
+      List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
+      assertEquals(1, splitStrategies.size());
+      assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
+      return ((OrcInputFormat.ACIDSplitStrategy)splitStrategies.get(0)).getSplits();
+
+    } finally {
+      MockFileSystem.clearGlobalFiles();
+    }
+  }
+
+  @Test
+  public void testAcidTableStreamingBISplitGeneration() throws Exception {
+    List<OrcSplit> result = null;
+    List<MockFile> files = new LinkedList<>();
+
+    // 1 complete delta file + 1 incomplete where more rows were committed than written to disk
+    // (1000) + (15/95)
+    files.addAll(mockDeltaWithSideFileForStreaming("delta_0000001_0000010_0000", 1000, 1000));
+    files.addAll(mockDeltaWithSideFileForStreaming("delta_0000011_0000020_0000", 15, 95));
+    result = splitsForStreamingAcidTable(files);
+    files.clear();
+    assertEquals(1000, result.get(0).getLength());
+    assertEquals(95, result.get(1).getLength());
+
+    // 1 incomplete delta with 2 complete and 1 incomplete blocks: (1000 + 1000 + 500/800)
+    files.addAll(mockDeltaWithSideFileForStreaming("delta_0000021_0000030_0000", 2500, 2800));
+    result = splitsForStreamingAcidTable(files);
+    files.clear();
+    assertEquals(1000, result.get(0).getLength());
+    assertEquals(1000, result.get(1).getLength());
+    assertEquals(800, result.get(2).getLength());
+
+    // 1 complete delta but shorter flush_length - though I think this is almost impossible
+    files.addAll(mockDeltaWithSideFileForStreaming("delta_0000021_0000030_0000", 1000, 450));
+    result = splitsForStreamingAcidTable(files);
+    files.clear();
+    assertEquals(450, result.get(0).getLength());
+
+  }
 }
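[Reader's note, not part of the patch: the bucket_00000_flush_length side file mocked above carries the committed length as raw 8-byte big-endian longs, which is what the test's ByteBuffer.putLong call produces. As I read the Hive sources, a streaming writer appends one such long per flush and the last value is authoritative, in the spirit of OrcAcidUtils.getLastFlushLength, which backs the AcidUtils.getLogicalLength call used by the patch. A sketch of such a reader follows; lastFlushLength is a hypothetical helper, not the exact Hive source.]

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class FlushLengthSketch {
      // Keep reading 8-byte longs until EOF; the last one read is the committed length.
      static long lastFlushLength(FileSystem fs, Path sideFile) throws IOException {
        long len = -1;
        try (FSDataInputStream in = fs.open(sideFile)) {
          while (in.available() > 0) { // the available() override added to MockFile's stream serves exactly this check
            len = in.readLong();       // one committed byte count per hflush; keep the last
          }
        }
        return len;
      }
    }

For the 15/95 delta in the test, a reader like this returns 95 even though only 15 bytes of the bucket file are persisted, and that 95 is the logicalLen that drives the split arithmetic in OrcInputFormat above.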