This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
     new db2a1dce5 [#1746] feat(remote merge): Introduce the reader and writer for record. (#1920)
db2a1dce5 is described below

commit db2a1dce547a594e8496273ab0bd4c6500528c64
Author: zhengchenyu <zhengcheny...@163.com>
AuthorDate: Thu Jul 18 10:29:30 2024 +0800

    [#1746] feat(remote merge): Introduce the reader and writer for record. (#1920)

    ### What changes were proposed in this pull request?

    Provides reader and writer abstractions for processing records.

    ### Why are the changes needed?

    Fix: #1746

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Unit tests, plus tests in a real cluster.
---
 .../uniffle/common/records/RecordsReader.java      |  71 ++++
 .../uniffle/common/records/RecordsWriter.java      |  60 +++
 .../common/records/RecordsReaderWriterTest.java    | 445 +++++++++++++++++++++
 3 files changed, 576 insertions(+)
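For orientation, here is a minimal round-trip sketch of the new API, modeled on the tests in this commit. It assumes the constructors shown in the diff below and reuses the test-scope `PartialInputStreamImpl` helper; treat it as an illustration, not part of the patch.

```java
import java.io.ByteArrayOutputStream;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.records.RecordsReader;
import org.apache.uniffle.common.records.RecordsWriter;
import org.apache.uniffle.common.serializer.PartialInputStreamImpl;

public class RecordsRoundTripSketch {
  public static void main(String[] args) throws Exception {
    RssConf rssConf = new RssConf();
    ByteArrayOutputStream out = new ByteArrayOutputStream();

    // Write a few records with the common (raw = false) API.
    RecordsWriter writer = new RecordsWriter(rssConf, out, Text.class, IntWritable.class, false);
    for (int i = 0; i < 3; i++) {
      writer.append(new Text("key" + i), new IntWritable(i));
    }
    writer.close();

    // Read them back from the in-memory buffer.
    RecordsReader reader =
        new RecordsReader(
            rssConf,
            PartialInputStreamImpl.newInputStream(out.toByteArray(), 0, Long.MAX_VALUE),
            Text.class,
            IntWritable.class,
            false);
    while (reader.next()) {
      System.out.println(reader.getCurrentKey() + " -> " + reader.getCurrentValue());
    }
    reader.close();
  }
}
```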
diff --git a/common/src/main/java/org/apache/uniffle/common/records/RecordsReader.java b/common/src/main/java/org/apache/uniffle/common/records/RecordsReader.java
new file mode 100644
index 000000000..370239c40
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/records/RecordsReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.records;
+
+import java.io.IOException;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.DeserializationStream;
+import org.apache.uniffle.common.serializer.PartialInputStream;
+import org.apache.uniffle.common.serializer.Serializer;
+import org.apache.uniffle.common.serializer.SerializerFactory;
+import org.apache.uniffle.common.serializer.SerializerInstance;
+
+public class RecordsReader<K, V> {
+
+  private DeserializationStream<K, V> stream;
+  private Object currentKey;
+  private Object currentValue;
+
+  public RecordsReader(
+      RssConf rssConf,
+      PartialInputStream input,
+      Class<K> keyClass,
+      Class<V> valueClass,
+      boolean raw) {
+    SerializerFactory factory = new SerializerFactory(rssConf);
+    Serializer serializer = factory.getSerializer(keyClass);
+    assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass());
+    SerializerInstance instance = serializer.newInstance();
+    stream = instance.deserializeStream(input, keyClass, valueClass, raw);
+  }
+
+  public boolean next() throws IOException {
+    boolean hasNext = stream.nextRecord();
+    if (hasNext) {
+      currentKey = stream.getCurrentKey();
+      currentValue = stream.getCurrentValue();
+    }
+    return hasNext;
+  }
+
+  public Object getCurrentKey() {
+    return currentKey;
+  }
+
+  public Object getCurrentValue() {
+    return currentValue;
+  }
+
+  public void close() throws IOException {
+    if (stream != null) {
+      stream.close();
+      stream = null;
+    }
+  }
+}
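One property worth noting: because `RecordsWriter` exposes `getTotalBytesWritten()`, a reader can later be opened at any record boundary. A hedged sketch, assuming `data` and `offsets` were produced as in the tests at the end of this commit (`offsets[i]` is `getTotalBytesWritten()` after appending record `i`) and again borrowing the test-scope `PartialInputStreamImpl` helper:

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.records.RecordsReader;
import org.apache.uniffle.common.serializer.PartialInputStreamImpl;

public class ResumeFromOffsetSketch {
  // Reads records starting at a previously recorded byte offset; passing
  // offsets[i] yields records i + 1 .. last, and the end-of-data offset
  // yields an empty stream.
  static void readFrom(RssConf rssConf, byte[] data, long offset) throws IOException {
    RecordsReader reader =
        new RecordsReader(
            rssConf,
            PartialInputStreamImpl.newInputStream(data, offset, Long.MAX_VALUE),
            Text.class,
            IntWritable.class,
            false);
    while (reader.next()) {
      System.out.println(reader.getCurrentKey() + " -> " + reader.getCurrentValue());
    }
    reader.close();
  }
}
```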
diff --git a/common/src/main/java/org/apache/uniffle/common/records/RecordsWriter.java b/common/src/main/java/org/apache/uniffle/common/records/RecordsWriter.java
new file mode 100644
index 000000000..9ac9b2734
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/records/RecordsWriter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.records;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.SerializationStream;
+import org.apache.uniffle.common.serializer.Serializer;
+import org.apache.uniffle.common.serializer.SerializerFactory;
+import org.apache.uniffle.common.serializer.SerializerInstance;
+
+public class RecordsWriter<K, V> {
+
+  private SerializationStream stream;
+
+  public RecordsWriter(
+      RssConf rssConf, OutputStream out, Class keyClass, Class valueClass, boolean raw) {
+    SerializerFactory factory = new SerializerFactory(rssConf);
+    Serializer serializer = factory.getSerializer(keyClass);
+    assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass());
+    SerializerInstance instance = serializer.newInstance();
+    stream = instance.serializeStream(out, raw);
+  }
+
+  public void append(Object key, Object value) throws IOException {
+    stream.writeRecord(key, value);
+  }
+
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  public void close() throws IOException {
+    if (stream != null) {
+      stream.close();
+      stream = null;
+    }
+  }
+
+  public long getTotalBytesWritten() {
+    return stream.getTotalBytesWritten();
+  }
+}
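When `raw` is true, the caller hands over pre-serialized bytes instead of objects. A sketch of that path, following the pattern of tests 3 and 4 below (the `DataOutputBuffer` arguments and the `getTotalBytesWritten()` offset bookkeeping come straight from those tests):

```java
import java.io.ByteArrayOutputStream;

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.records.RecordsWriter;
import org.apache.uniffle.common.serializer.SerializerFactory;
import org.apache.uniffle.common.serializer.SerializerInstance;

public class RawWriteSketch {
  public static void main(String[] args) throws Exception {
    RssConf rssConf = new RssConf();
    SerializerFactory factory = new SerializerFactory(rssConf);
    SerializerInstance instance = factory.getSerializer(Text.class).newInstance();

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    RecordsWriter writer = new RecordsWriter(rssConf, out, Text.class, IntWritable.class, true);

    // Serialize the key and value ourselves, then append the raw buffers.
    DataOutputBuffer keyBuffer = new DataOutputBuffer();
    DataOutputBuffer valueBuffer = new DataOutputBuffer();
    instance.serialize(new Text("key0"), keyBuffer);
    instance.serialize(new IntWritable(0), valueBuffer);
    writer.append(keyBuffer, valueBuffer);

    // Record the end offset of this record for later partial reads.
    long offset0 = writer.getTotalBytesWritten();
    writer.close();
    System.out.println("record 0 ends at byte " + offset0);
  }
}
```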
diff --git a/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java b/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java
new file mode 100644
index 000000000..0fc6e16be
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.records;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.Random;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.PartialInputStreamImpl;
+import org.apache.uniffle.common.serializer.Serializer;
+import org.apache.uniffle.common.serializer.SerializerFactory;
+import org.apache.uniffle.common.serializer.SerializerInstance;
+import org.apache.uniffle.common.serializer.SerializerUtils;
+
+import static org.apache.uniffle.common.serializer.SerializerUtils.genData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class RecordsReaderWriterTest {
+
+  private static final int RECORDS = 1009;
+  private static final int LOOP = 5;
+
+  // Test 1: both write and read will use common api
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,mem",
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file",
+      })
+  public void testWriteAndReadRecordFile1(String classes, @TempDir File tmpDir) throws Exception {
+    RssConf rssConf = new RssConf();
+    // 1 Parse arguments
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+    boolean isFileMode = classArray[2].equals("file");
+    File tmpFile = new File(tmpDir, "tmp.data");
+
+    // 2 Write
+    long[] offsets = new long[RECORDS];
+    OutputStream outputStream =
+        isFileMode ? new FileOutputStream(tmpFile) : new ByteArrayOutputStream();
+    RecordsWriter writer = new RecordsWriter(rssConf, outputStream, keyClass, valueClass, false);
+    for (int i = 0; i < RECORDS; i++) {
+      writer.append(SerializerUtils.genData(keyClass, i), SerializerUtils.genData(valueClass, i));
+      offsets[i] = writer.getTotalBytesWritten();
+    }
+    writer.close();
+
+    // 3 Read
+    // 3.1 read from start
+    PartialInputStreamImpl inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, 0, tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(), 0, Long.MAX_VALUE);
+    RecordsReader reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, false);
+    int index = 0;
+    while (reader.next()) {
+      assertEquals(SerializerUtils.genData(keyClass, index), reader.getCurrentKey());
+      assertEquals(SerializerUtils.genData(valueClass, index), reader.getCurrentValue());
+      index++;
+    }
+    assertEquals(RECORDS, index);
+    reader.close();
+
+    // 3.2 read from end
+    inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, offsets[RECORDS - 1], tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(),
+                offsets[RECORDS - 1],
+                Long.MAX_VALUE);
+    reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, false);
+    assertFalse(reader.next());
+    reader.close();
+
+    // 3.3 read from random position to end
+    Random random = new Random();
+    long[][] indexAndOffsets = new long[LOOP + 3][2];
+    indexAndOffsets[0] = new long[] {0, 0};
+    indexAndOffsets[1] = new long[] {RECORDS - 1, offsets[RECORDS - 2]}; // Last record
+    indexAndOffsets[2] = new long[] {RECORDS, offsets[RECORDS - 1]}; // Records that don't exist
+    for (int i = 0; i < LOOP; i++) {
+      int off = random.nextInt(RECORDS - 2) + 1;
+      indexAndOffsets[i + 3] = new long[] {off + 1, offsets[off]};
+    }
+    for (long[] indexAndOffset : indexAndOffsets) {
+      index = (int) indexAndOffset[0];
+      long offset = indexAndOffset[1];
+      inputStream =
+          isFileMode
+              ? PartialInputStreamImpl.newInputStream(tmpFile, offset, tmpFile.length())
+              : PartialInputStreamImpl.newInputStream(
+                  ((ByteArrayOutputStream) outputStream).toByteArray(), offset, Long.MAX_VALUE);
+      reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, false);
+      while (reader.next()) {
+        assertEquals(SerializerUtils.genData(keyClass, index), reader.getCurrentKey());
+        assertEquals(SerializerUtils.genData(valueClass, index), reader.getCurrentValue());
+        index++;
+      }
+      assertEquals(RECORDS, index);
+    }
+    reader.close();
+  }
+
+  // Test 2: write with common api, read with raw api
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,mem",
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file",
+      })
+  public void testWriteAndReadRecordFile2(String classes, @TempDir File tmpDir) throws Exception {
+    RssConf rssConf = new RssConf();
+    // 1 Parse arguments
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+    boolean isFileMode = classArray[2].equals("file");
+    File tmpFile = new File(tmpDir, "tmp.data");
+    SerializerFactory factory = new SerializerFactory(rssConf);
+    Serializer serializer = factory.getSerializer(keyClass);
+    assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass());
+    SerializerInstance instance = serializer.newInstance();
+
+    // 2 Write
+    long[] offsets = new long[RECORDS];
+    OutputStream outputStream =
+        isFileMode ? new FileOutputStream(tmpFile) : new ByteArrayOutputStream();
+    RecordsWriter writer = new RecordsWriter(rssConf, outputStream, keyClass, valueClass, false);
+    for (int i = 0; i < RECORDS; i++) {
+      writer.append(SerializerUtils.genData(keyClass, i), SerializerUtils.genData(valueClass, i));
+      offsets[i] = writer.getTotalBytesWritten();
+    }
+    writer.close();
+
+    // 3 Read
+    // 3.1 read from start
+    PartialInputStreamImpl inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, 0, tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(), 0, Long.MAX_VALUE);
+    RecordsReader reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, true);
+    int index = 0;
+    while (reader.next()) {
+      DataOutputBuffer keyBuffer = (DataOutputBuffer) reader.getCurrentKey();
+      DataInputBuffer keyInputBuffer = new DataInputBuffer();
+      keyInputBuffer.reset(keyBuffer.getData(), 0, keyBuffer.getLength());
+      assertEquals(
+          SerializerUtils.genData(keyClass, index), instance.deserialize(keyInputBuffer, keyClass));
+      DataOutputBuffer valueBuffer = (DataOutputBuffer) reader.getCurrentValue();
+      DataInputBuffer valueInputBuffer = new DataInputBuffer();
+      valueInputBuffer.reset(valueBuffer.getData(), 0, valueBuffer.getLength());
+      assertEquals(
+          SerializerUtils.genData(valueClass, index),
+          instance.deserialize(valueInputBuffer, valueClass));
+      index++;
+    }
+    assertEquals(RECORDS, index);
+    reader.close();
+
+    // 3.2 read from end
+    inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, offsets[RECORDS - 1], tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(),
+                offsets[RECORDS - 1],
+                Long.MAX_VALUE);
+    reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, true);
+    assertFalse(reader.next());
+    reader.close();
+
+    // 3.3 read from random position to end
+    Random random = new Random();
+    long[][] indexAndOffsets = new long[LOOP + 3][2];
+    indexAndOffsets[0] = new long[] {0, 0};
+    indexAndOffsets[1] = new long[] {RECORDS - 1, offsets[RECORDS - 2]}; // Last record
+    indexAndOffsets[2] = new long[] {RECORDS, offsets[RECORDS - 1]}; // Records that don't exist
+    for (int i = 0; i < LOOP; i++) {
+      int off = random.nextInt(RECORDS - 2) + 1;
+      indexAndOffsets[i + 3] = new long[] {off + 1, offsets[off]};
+    }
+    for (long[] indexAndOffset : indexAndOffsets) {
+      index = (int) indexAndOffset[0];
+      long offset = indexAndOffset[1];
+      inputStream =
+          isFileMode
+              ? PartialInputStreamImpl.newInputStream(tmpFile, offset, tmpFile.length())
+              : PartialInputStreamImpl.newInputStream(
+                  ((ByteArrayOutputStream) outputStream).toByteArray(), offset, Long.MAX_VALUE);
+      reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, true);
+      while (reader.next()) {
+        DataOutputBuffer keyBuffer = (DataOutputBuffer) reader.getCurrentKey();
+        DataInputBuffer keyInputBuffer = new DataInputBuffer();
+        keyInputBuffer.reset(keyBuffer.getData(), 0, keyBuffer.getLength());
+        assertEquals(
+            SerializerUtils.genData(keyClass, index),
+            instance.deserialize(keyInputBuffer, keyClass));
+        DataOutputBuffer valueBuffer = (DataOutputBuffer) reader.getCurrentValue();
+        DataInputBuffer valueInputBuffer = new DataInputBuffer();
+        valueInputBuffer.reset(valueBuffer.getData(), 0, valueBuffer.getLength());
+        assertEquals(
+            SerializerUtils.genData(valueClass, index),
+            instance.deserialize(valueInputBuffer, valueClass));
+        index++;
+      }
+      assertEquals(RECORDS, index);
+    }
+    reader.close();
+  }
+
+  // Test 3: write with raw api, read with common api
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,mem",
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file",
+      })
+  public void testWriteAndReadRecordFile3(String classes, @TempDir File tmpDir) throws Exception {
+    RssConf rssConf = new RssConf();
+    // 1 Parse arguments
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+    boolean isFileMode = classArray[2].equals("file");
+    File tmpFile = new File(tmpDir, "tmp.data");
+    SerializerFactory factory = new SerializerFactory(rssConf);
+    Serializer serializer = factory.getSerializer(keyClass);
+    assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass());
+    SerializerInstance instance = serializer.newInstance();
+
+    // 2 Write
+    long[] offsets = new long[RECORDS];
+    OutputStream outputStream =
+        isFileMode ? new FileOutputStream(tmpFile) : new ByteArrayOutputStream();
+    RecordsWriter writer = new RecordsWriter(rssConf, outputStream, keyClass, valueClass, true);
+    for (int i = 0; i < RECORDS; i++) {
+      DataOutputBuffer keyBuffer = new DataOutputBuffer();
+      DataOutputBuffer valueBuffer = new DataOutputBuffer();
+      instance.serialize(genData(keyClass, i), keyBuffer);
+      instance.serialize(genData(valueClass, i), valueBuffer);
+      writer.append(keyBuffer, valueBuffer);
+      offsets[i] = writer.getTotalBytesWritten();
+    }
+    writer.close();
+
+    // 3 Read
+    // 3.1 read from start
+    PartialInputStreamImpl inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, 0, tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(), 0, Long.MAX_VALUE);
+    RecordsReader reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, false);
+    int index = 0;
+    while (reader.next()) {
+      assertEquals(SerializerUtils.genData(keyClass, index), reader.getCurrentKey());
+      assertEquals(SerializerUtils.genData(valueClass, index), reader.getCurrentValue());
+      index++;
+    }
+    assertEquals(RECORDS, index);
+    reader.close();
+
+    // 3.2 read from end
+    inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, offsets[RECORDS - 1], tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(),
+                offsets[RECORDS - 1],
+                Long.MAX_VALUE);
+    reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, false);
+    assertFalse(reader.next());
+    reader.close();
+
+    // 3.3 read from random position to end
+    Random random = new Random();
+    long[][] indexAndOffsets = new long[LOOP + 3][2];
+    indexAndOffsets[0] = new long[] {0, 0};
+    indexAndOffsets[1] = new long[] {RECORDS - 1, offsets[RECORDS - 2]}; // Last record
+    indexAndOffsets[2] = new long[] {RECORDS, offsets[RECORDS - 1]}; // Records that don't exist
+    for (int i = 0; i < LOOP; i++) {
+      int off = random.nextInt(RECORDS - 2) + 1;
+      indexAndOffsets[i + 3] = new long[] {off + 1, offsets[off]};
+    }
+    for (long[] indexAndOffset : indexAndOffsets) {
+      index = (int) indexAndOffset[0];
+      long offset = indexAndOffset[1];
+      inputStream =
+          isFileMode
+              ? PartialInputStreamImpl.newInputStream(tmpFile, offset, tmpFile.length())
+              : PartialInputStreamImpl.newInputStream(
+                  ((ByteArrayOutputStream) outputStream).toByteArray(), offset, Long.MAX_VALUE);
+      reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, false);
+      while (reader.next()) {
+        assertEquals(SerializerUtils.genData(keyClass, index), reader.getCurrentKey());
+        assertEquals(SerializerUtils.genData(valueClass, index), reader.getCurrentValue());
+        index++;
+      }
+      assertEquals(RECORDS, index);
+    }
+    reader.close();
+  }
+
+  // Test 4: both write and read use raw api
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,mem",
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file",
+      })
+  public void testWriteAndReadRecordFile4(String classes, @TempDir File tmpDir) throws Exception {
+    RssConf rssConf = new RssConf();
+    // 1 Parse arguments
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+    boolean isFileMode = classArray[2].equals("file");
+    File tmpFile = new File(tmpDir, "tmp.data");
+    SerializerFactory factory = new SerializerFactory(rssConf);
+    Serializer serializer = factory.getSerializer(keyClass);
+    assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass());
+    SerializerInstance instance = serializer.newInstance();
+
+    // 2 Write
+    long[] offsets = new long[RECORDS];
+    OutputStream outputStream =
+        isFileMode ? new FileOutputStream(tmpFile) : new ByteArrayOutputStream();
+    RecordsWriter writer = new RecordsWriter(rssConf, outputStream, keyClass, valueClass, true);
+    for (int i = 0; i < RECORDS; i++) {
+      DataOutputBuffer keyBuffer = new DataOutputBuffer();
+      DataOutputBuffer valueBuffer = new DataOutputBuffer();
+      instance.serialize(genData(keyClass, i), keyBuffer);
+      instance.serialize(genData(valueClass, i), valueBuffer);
+      writer.append(keyBuffer, valueBuffer);
+      offsets[i] = writer.getTotalBytesWritten();
+    }
+    writer.close();
+
+    // 3 Read
+    // 3.1 read from start
+    PartialInputStreamImpl inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, 0, tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(), 0, Long.MAX_VALUE);
+    RecordsReader reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, true);
+    int index = 0;
+    while (reader.next()) {
+      DataOutputBuffer keyBuffer = (DataOutputBuffer) reader.getCurrentKey();
+      DataInputBuffer keyInputBuffer = new DataInputBuffer();
+      keyInputBuffer.reset(keyBuffer.getData(), 0, keyBuffer.getLength());
+      assertEquals(
+          SerializerUtils.genData(keyClass, index), instance.deserialize(keyInputBuffer, keyClass));
+      DataOutputBuffer valueBuffer = (DataOutputBuffer) reader.getCurrentValue();
+      DataInputBuffer valueInputBuffer = new DataInputBuffer();
+      valueInputBuffer.reset(valueBuffer.getData(), 0, valueBuffer.getLength());
+      assertEquals(
+          SerializerUtils.genData(valueClass, index),
+          instance.deserialize(valueInputBuffer, valueClass));
+      index++;
+    }
+    assertEquals(RECORDS, index);
+    reader.close();
+
+    // 3.2 read from end
+    inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, offsets[RECORDS - 1], tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(),
+                offsets[RECORDS - 1],
+                Long.MAX_VALUE);
+    reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, true);
+    assertFalse(reader.next());
+    reader.close();
+
+    // 3.3 read from random position to end
+    Random random = new Random();
+    long[][] indexAndOffsets = new long[LOOP + 3][2];
+    indexAndOffsets[0] = new long[] {0, 0};
+    indexAndOffsets[1] = new long[] {RECORDS - 1, offsets[RECORDS - 2]}; // Last record
+    indexAndOffsets[2] = new long[] {RECORDS, offsets[RECORDS - 1]}; // Records that don't exist
+    for (int i = 0; i < LOOP; i++) {
+      int off = random.nextInt(RECORDS - 2) + 1;
+      indexAndOffsets[i + 3] = new long[] {off + 1, offsets[off]};
+    }
+    for (long[] indexAndOffset : indexAndOffsets) {
+      index = (int) indexAndOffset[0];
+      long offset = indexAndOffset[1];
+      inputStream =
+          isFileMode
+              ? PartialInputStreamImpl.newInputStream(tmpFile, offset, tmpFile.length())
+              : PartialInputStreamImpl.newInputStream(
+                  ((ByteArrayOutputStream) outputStream).toByteArray(), offset, Long.MAX_VALUE);
+      reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, true);
+      while (reader.next()) {
+        DataOutputBuffer keyBuffer = (DataOutputBuffer) reader.getCurrentKey();
+        DataInputBuffer keyInputBuffer = new DataInputBuffer();
+        keyInputBuffer.reset(keyBuffer.getData(), 0, keyBuffer.getLength());
+        assertEquals(
+            SerializerUtils.genData(keyClass, index),
+            instance.deserialize(keyInputBuffer, keyClass));
+        DataOutputBuffer valueBuffer = (DataOutputBuffer) reader.getCurrentValue();
+        DataInputBuffer valueInputBuffer = new DataInputBuffer();
+        valueInputBuffer.reset(valueBuffer.getData(), 0, valueBuffer.getLength());
+        assertEquals(
+            SerializerUtils.genData(valueClass, index),
+            instance.deserialize(valueInputBuffer, valueClass));
+        index++;
+      }
+      assertEquals(RECORDS, index);
+    }
+    reader.close();
+  }
+}
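Taken together, the four tests cover the full write/read matrix: common write with common read, common write with raw read, raw write with common read, and raw write with raw read. Each combination runs against both an in-memory buffer and a file, and each verifies a full scan, a read opened at the end-of-data offset, and reads resumed from random record offsets.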