This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new db2a1dce5 [#1746] feat(remote merge): Introduce the reader and writer for record. (#1920)
db2a1dce5 is described below

commit db2a1dce547a594e8496273ab0bd4c6500528c64
Author: zhengchenyu <zhengcheny...@163.com>
AuthorDate: Thu Jul 18 10:29:30 2024 +0800

    [#1746] feat(remote merge): Introduce the reader and writer for record. (#1920)
    
    ### What changes were proposed in this pull request?
    
    Introduces RecordsReader and RecordsWriter, which provide read and write abstractions for processing records.
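
    For illustration only, a minimal usage sketch of the two new classes (not part of this commit; the `RecordsExample` wrapper class is hypothetical, while the constructors and methods mirror the API added below):

    ```java
    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    import org.apache.uniffle.common.config.RssConf;
    import org.apache.uniffle.common.records.RecordsReader;
    import org.apache.uniffle.common.records.RecordsWriter;
    import org.apache.uniffle.common.serializer.PartialInputStreamImpl;

    public class RecordsExample {
      public static void main(String[] args) throws Exception {
        RssConf conf = new RssConf();
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // raw = false: keys and values are passed as objects and the writer
        // serializes them via the configured Serializer.
        RecordsWriter<Text, IntWritable> writer =
            new RecordsWriter<>(conf, out, Text.class, IntWritable.class, false);
        writer.append(new Text("key1"), new IntWritable(1));
        writer.close();

        // Read the records back from the serialized bytes.
        RecordsReader<Text, IntWritable> reader =
            new RecordsReader<>(
                conf,
                PartialInputStreamImpl.newInputStream(out.toByteArray(), 0, Long.MAX_VALUE),
                Text.class,
                IntWritable.class,
                false);
        while (reader.next()) {
          System.out.println(reader.getCurrentKey() + " -> " + reader.getCurrentValue());
        }
        reader.close();
      }
    }
    ```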
    
    ### Why are the changes needed?
    
    Fix: #1746
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests, plus testing in a real cluster.
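
    Because tests 2 to 4 below exercise the raw mode, here is a hedged sketch of that path (the `RawModeExample` wrapper class is hypothetical; the calls mirror the test code): with raw = true, the writer accepts pre-serialized DataOutputBuffers and the reader returns buffers that the caller deserializes itself.

    ```java
    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    import org.apache.uniffle.common.config.RssConf;
    import org.apache.uniffle.common.records.RecordsReader;
    import org.apache.uniffle.common.records.RecordsWriter;
    import org.apache.uniffle.common.serializer.PartialInputStreamImpl;
    import org.apache.uniffle.common.serializer.SerializerFactory;
    import org.apache.uniffle.common.serializer.SerializerInstance;

    public class RawModeExample {
      public static void main(String[] args) throws Exception {
        RssConf conf = new RssConf();
        SerializerInstance instance =
            new SerializerFactory(conf).getSerializer(Text.class).newInstance();

        // Write with raw = true: append pre-serialized key/value buffers.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        RecordsWriter<Text, IntWritable> writer =
            new RecordsWriter<>(conf, out, Text.class, IntWritable.class, true);
        DataOutputBuffer keyBuffer = new DataOutputBuffer();
        DataOutputBuffer valueBuffer = new DataOutputBuffer();
        instance.serialize(new Text("key1"), keyBuffer);
        instance.serialize(new IntWritable(1), valueBuffer);
        writer.append(keyBuffer, valueBuffer);
        writer.close();

        // Read with raw = true: getCurrentKey()/getCurrentValue() return
        // DataOutputBuffers; deserialize them explicitly.
        RecordsReader<Text, IntWritable> reader =
            new RecordsReader<>(
                conf,
                PartialInputStreamImpl.newInputStream(out.toByteArray(), 0, Long.MAX_VALUE),
                Text.class,
                IntWritable.class,
                true);
        while (reader.next()) {
          DataOutputBuffer rawKey = (DataOutputBuffer) reader.getCurrentKey();
          DataInputBuffer keyIn = new DataInputBuffer();
          keyIn.reset(rawKey.getData(), 0, rawKey.getLength());
          Object key = instance.deserialize(keyIn, Text.class);
          System.out.println(key);
        }
        reader.close();
      }
    }
    ```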
---
 .../uniffle/common/records/RecordsReader.java      |  71 ++++
 .../uniffle/common/records/RecordsWriter.java      |  60 +++
 .../common/records/RecordsReaderWriterTest.java    | 445 +++++++++++++++++++++
 3 files changed, 576 insertions(+)

diff --git a/common/src/main/java/org/apache/uniffle/common/records/RecordsReader.java b/common/src/main/java/org/apache/uniffle/common/records/RecordsReader.java
new file mode 100644
index 000000000..370239c40
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/records/RecordsReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.records;
+
+import java.io.IOException;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.DeserializationStream;
+import org.apache.uniffle.common.serializer.PartialInputStream;
+import org.apache.uniffle.common.serializer.Serializer;
+import org.apache.uniffle.common.serializer.SerializerFactory;
+import org.apache.uniffle.common.serializer.SerializerInstance;
+
+public class RecordsReader<K, V> {
+
+  private DeserializationStream<K, V> stream;
+  private Object currentKey;
+  private Object currentValue;
+
+  public RecordsReader(
+      RssConf rssConf,
+      PartialInputStream input,
+      Class<K> keyClass,
+      Class<V> valueClass,
+      boolean raw) {
+    SerializerFactory factory = new SerializerFactory(rssConf);
+    Serializer serializer = factory.getSerializer(keyClass);
+    assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass());
+    SerializerInstance instance = serializer.newInstance();
+    stream = instance.deserializeStream(input, keyClass, valueClass, raw);
+  }
+
+  public boolean next() throws IOException {
+    boolean hasNext = stream.nextRecord();
+    if (hasNext) {
+      currentKey = stream.getCurrentKey();
+      currentValue = stream.getCurrentValue();
+    }
+    return hasNext;
+  }
+
+  public Object getCurrentKey() {
+    return currentKey;
+  }
+
+  public Object getCurrentValue() {
+    return currentValue;
+  }
+
+  public void close() throws IOException {
+    if (stream != null) {
+      stream.close();
+      stream = null;
+    }
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/records/RecordsWriter.java b/common/src/main/java/org/apache/uniffle/common/records/RecordsWriter.java
new file mode 100644
index 000000000..9ac9b2734
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/records/RecordsWriter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.records;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.SerializationStream;
+import org.apache.uniffle.common.serializer.Serializer;
+import org.apache.uniffle.common.serializer.SerializerFactory;
+import org.apache.uniffle.common.serializer.SerializerInstance;
+
+public class RecordsWriter<K, V> {
+
+  private SerializationStream stream;
+
+  public RecordsWriter(
+      RssConf rssConf, OutputStream out, Class keyClass, Class valueClass, boolean raw) {
+    SerializerFactory factory = new SerializerFactory(rssConf);
+    Serializer serializer = factory.getSerializer(keyClass);
+    assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass());
+    SerializerInstance instance = serializer.newInstance();
+    stream = instance.serializeStream(out, raw);
+  }
+
+  public void append(Object key, Object value) throws IOException {
+    stream.writeRecord(key, value);
+  }
+
+  public void flush() throws IOException {
+    stream.flush();
+  }
+
+  public void close() throws IOException {
+    if (stream != null) {
+      stream.close();
+      stream = null;
+    }
+  }
+
+  public long getTotalBytesWritten() {
+    return stream.getTotalBytesWritten();
+  }
+}
diff --git a/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java b/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java
new file mode 100644
index 000000000..0fc6e16be
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/records/RecordsReaderWriterTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.records;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.Random;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.serializer.PartialInputStreamImpl;
+import org.apache.uniffle.common.serializer.Serializer;
+import org.apache.uniffle.common.serializer.SerializerFactory;
+import org.apache.uniffle.common.serializer.SerializerInstance;
+import org.apache.uniffle.common.serializer.SerializerUtils;
+
+import static org.apache.uniffle.common.serializer.SerializerUtils.genData;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class RecordsReaderWriterTest {
+
+  private static final int RECORDS = 1009;
+  private static final int LOOP = 5;
+
+  // Test 1: both write and read will use common api
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,mem",
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file",
+      })
+  public void testWriteAndReadRecordFile1(String classes, @TempDir File tmpDir) throws Exception {
+    RssConf rssConf = new RssConf();
+    // 1 Parse arguments
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+    boolean isFileMode = classArray[2].equals("file");
+    File tmpFile = new File(tmpDir, "tmp.data");
+
+    // 2 Write
+    long[] offsets = new long[RECORDS];
+    OutputStream outputStream =
+        isFileMode ? new FileOutputStream(tmpFile) : new ByteArrayOutputStream();
+    RecordsWriter writer = new RecordsWriter(rssConf, outputStream, keyClass, valueClass, false);
+    for (int i = 0; i < RECORDS; i++) {
+      writer.append(SerializerUtils.genData(keyClass, i), SerializerUtils.genData(valueClass, i));
+      offsets[i] = writer.getTotalBytesWritten();
+    }
+    writer.close();
+
+    // 3 Read
+    // 3.1 read from start
+    PartialInputStreamImpl inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, 0, tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(), 0, Long.MAX_VALUE);
+    RecordsReader reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, false);
+    int index = 0;
+    while (reader.next()) {
+      assertEquals(SerializerUtils.genData(keyClass, index), reader.getCurrentKey());
+      assertEquals(SerializerUtils.genData(valueClass, index), reader.getCurrentValue());
+      index++;
+    }
+    assertEquals(RECORDS, index);
+    reader.close();
+
+    // 3.2 read from end
+    inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, offsets[RECORDS - 1], tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(),
+                offsets[RECORDS - 1],
+                Long.MAX_VALUE);
+    reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, false);
+    assertFalse(reader.next());
+    reader.close();
+
+    // 3.3 read from random position to end
+    Random random = new Random();
+    long[][] indexAndOffsets = new long[LOOP + 3][2];
+    indexAndOffsets[0] = new long[] {0, 0};
+    indexAndOffsets[1] = new long[] {RECORDS - 1, offsets[RECORDS - 2]}; // Last record
+    indexAndOffsets[2] = new long[] {RECORDS, offsets[RECORDS - 1]}; // Records that don't exist
+    for (int i = 0; i < LOOP; i++) {
+      int off = random.nextInt(RECORDS - 2) + 1;
+      indexAndOffsets[i + 3] = new long[] {off + 1, offsets[off]};
+    }
+    for (long[] indexAndOffset : indexAndOffsets) {
+      index = (int) indexAndOffset[0];
+      long offset = indexAndOffset[1];
+      inputStream =
+          isFileMode
+              ? PartialInputStreamImpl.newInputStream(tmpFile, offset, tmpFile.length())
+              : PartialInputStreamImpl.newInputStream(
+                  ((ByteArrayOutputStream) outputStream).toByteArray(), offset, Long.MAX_VALUE);
+      reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, false);
+      while (reader.next()) {
+        assertEquals(SerializerUtils.genData(keyClass, index), reader.getCurrentKey());
+        assertEquals(SerializerUtils.genData(valueClass, index), reader.getCurrentValue());
+        index++;
+      }
+      assertEquals(RECORDS, index);
+    }
+    reader.close();
+  }
+
+  // Test 2: write with common api, read with raw api
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,mem",
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file",
+      })
+  public void testWriteAndReadRecordFile2(String classes, @TempDir File tmpDir) throws Exception {
+    RssConf rssConf = new RssConf();
+    // 1 Parse arguments
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+    boolean isFileMode = classArray[2].equals("file");
+    File tmpFile = new File(tmpDir, "tmp.data");
+    SerializerFactory factory = new SerializerFactory(rssConf);
+    Serializer serializer = factory.getSerializer(keyClass);
+    assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass());
+    SerializerInstance instance = serializer.newInstance();
+
+    // 2 Write
+    long[] offsets = new long[RECORDS];
+    OutputStream outputStream =
+        isFileMode ? new FileOutputStream(tmpFile) : new ByteArrayOutputStream();
+    RecordsWriter writer = new RecordsWriter(rssConf, outputStream, keyClass, valueClass, false);
+    for (int i = 0; i < RECORDS; i++) {
+      writer.append(SerializerUtils.genData(keyClass, i), SerializerUtils.genData(valueClass, i));
+      offsets[i] = writer.getTotalBytesWritten();
+    }
+    writer.close();
+
+    // 3 Read
+    // 3.1 read from start
+    PartialInputStreamImpl inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, 0, tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(), 0, Long.MAX_VALUE);
+    RecordsReader reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, true);
+    int index = 0;
+    while (reader.next()) {
+      DataOutputBuffer keyBuffer = (DataOutputBuffer) reader.getCurrentKey();
+      DataInputBuffer keyInputBuffer = new DataInputBuffer();
+      keyInputBuffer.reset(keyBuffer.getData(), 0, keyBuffer.getLength());
+      assertEquals(
+          SerializerUtils.genData(keyClass, index), instance.deserialize(keyInputBuffer, keyClass));
+      DataOutputBuffer valueBuffer = (DataOutputBuffer) reader.getCurrentValue();
+      DataInputBuffer valueInputBuffer = new DataInputBuffer();
+      valueInputBuffer.reset(valueBuffer.getData(), 0, valueBuffer.getLength());
+      assertEquals(
+          SerializerUtils.genData(valueClass, index),
+          instance.deserialize(valueInputBuffer, valueClass));
+      index++;
+    }
+    assertEquals(RECORDS, index);
+    reader.close();
+
+    // 3.2 read from end
+    inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, offsets[RECORDS - 1], tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(),
+                offsets[RECORDS - 1],
+                Long.MAX_VALUE);
+    reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, true);
+    assertFalse(reader.next());
+    reader.close();
+
+    // 3.3 read from random position to end
+    Random random = new Random();
+    long[][] indexAndOffsets = new long[LOOP + 3][2];
+    indexAndOffsets[0] = new long[] {0, 0};
+    indexAndOffsets[1] = new long[] {RECORDS - 1, offsets[RECORDS - 2]}; // Last record
+    indexAndOffsets[2] = new long[] {RECORDS, offsets[RECORDS - 1]}; // Records that don't exist
+    for (int i = 0; i < LOOP; i++) {
+      int off = random.nextInt(RECORDS - 2) + 1;
+      indexAndOffsets[i + 3] = new long[] {off + 1, offsets[off]};
+    }
+    for (long[] indexAndOffset : indexAndOffsets) {
+      index = (int) indexAndOffset[0];
+      long offset = indexAndOffset[1];
+      inputStream =
+          isFileMode
+              ? PartialInputStreamImpl.newInputStream(tmpFile, offset, tmpFile.length())
+              : PartialInputStreamImpl.newInputStream(
+                  ((ByteArrayOutputStream) outputStream).toByteArray(), offset, Long.MAX_VALUE);
+      reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, true);
+      while (reader.next()) {
+        DataOutputBuffer keyBuffer = (DataOutputBuffer) reader.getCurrentKey();
+        DataInputBuffer keyInputBuffer = new DataInputBuffer();
+        keyInputBuffer.reset(keyBuffer.getData(), 0, keyBuffer.getLength());
+        assertEquals(
+            SerializerUtils.genData(keyClass, index),
+            instance.deserialize(keyInputBuffer, keyClass));
+        DataOutputBuffer valueBuffer = (DataOutputBuffer) reader.getCurrentValue();
+        DataInputBuffer valueInputBuffer = new DataInputBuffer();
+        valueInputBuffer.reset(valueBuffer.getData(), 0, valueBuffer.getLength());
+        assertEquals(
+            SerializerUtils.genData(valueClass, index),
+            instance.deserialize(valueInputBuffer, valueClass));
+        index++;
+      }
+      assertEquals(RECORDS, index);
+    }
+    reader.close();
+  }
+
+  // Test 3: write with raw api, read with common api
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,mem",
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file",
+      })
+  public void testWriteAndReadRecordFile3(String classes, @TempDir File tmpDir) throws Exception {
+    RssConf rssConf = new RssConf();
+    // 1 Parse arguments
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+    boolean isFileMode = classArray[2].equals("file");
+    File tmpFile = new File(tmpDir, "tmp.data");
+    SerializerFactory factory = new SerializerFactory(rssConf);
+    Serializer serializer = factory.getSerializer(keyClass);
+    assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass());
+    SerializerInstance instance = serializer.newInstance();
+
+    // 2 Write
+    long[] offsets = new long[RECORDS];
+    OutputStream outputStream =
+        isFileMode ? new FileOutputStream(tmpFile) : new ByteArrayOutputStream();
+    RecordsWriter writer = new RecordsWriter(rssConf, outputStream, keyClass, valueClass, true);
+    for (int i = 0; i < RECORDS; i++) {
+      DataOutputBuffer keyBuffer = new DataOutputBuffer();
+      DataOutputBuffer valueBuffer = new DataOutputBuffer();
+      instance.serialize(genData(keyClass, i), keyBuffer);
+      instance.serialize(genData(valueClass, i), valueBuffer);
+      writer.append(keyBuffer, valueBuffer);
+      offsets[i] = writer.getTotalBytesWritten();
+    }
+    writer.close();
+
+    // 3 Read
+    // 3.1 read from start
+    PartialInputStreamImpl inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, 0, tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(), 0, Long.MAX_VALUE);
+    RecordsReader reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, false);
+    int index = 0;
+    while (reader.next()) {
+      assertEquals(SerializerUtils.genData(keyClass, index), reader.getCurrentKey());
+      assertEquals(SerializerUtils.genData(valueClass, index), reader.getCurrentValue());
+      index++;
+    }
+    assertEquals(RECORDS, index);
+    reader.close();
+
+    // 3.2 read from end
+    inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, offsets[RECORDS - 1], tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(),
+                offsets[RECORDS - 1],
+                Long.MAX_VALUE);
+    reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, false);
+    assertFalse(reader.next());
+    reader.close();
+
+    // 3.3 read from random position to end
+    Random random = new Random();
+    long[][] indexAndOffsets = new long[LOOP + 3][2];
+    indexAndOffsets[0] = new long[] {0, 0};
+    indexAndOffsets[1] = new long[] {RECORDS - 1, offsets[RECORDS - 2]}; // Last record
+    indexAndOffsets[2] = new long[] {RECORDS, offsets[RECORDS - 1]}; // Records that don't exist
+    for (int i = 0; i < LOOP; i++) {
+      int off = random.nextInt(RECORDS - 2) + 1;
+      indexAndOffsets[i + 3] = new long[] {off + 1, offsets[off]};
+    }
+    for (long[] indexAndOffset : indexAndOffsets) {
+      index = (int) indexAndOffset[0];
+      long offset = indexAndOffset[1];
+      inputStream =
+          isFileMode
+              ? PartialInputStreamImpl.newInputStream(tmpFile, offset, tmpFile.length())
+              : PartialInputStreamImpl.newInputStream(
+                  ((ByteArrayOutputStream) outputStream).toByteArray(), offset, Long.MAX_VALUE);
+      reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, false);
+      while (reader.next()) {
+        assertEquals(SerializerUtils.genData(keyClass, index), reader.getCurrentKey());
+        assertEquals(SerializerUtils.genData(valueClass, index), reader.getCurrentValue());
+        index++;
+      }
+      assertEquals(RECORDS, index);
+    }
+    reader.close();
+  }
+
+  // Test 4: both write and read use raw api
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,mem",
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable,file",
+      })
+  public void testWriteAndReadRecordFile4(String classes, @TempDir File tmpDir) throws Exception {
+    RssConf rssConf = new RssConf();
+    // 1 Parse arguments
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+    boolean isFileMode = classArray[2].equals("file");
+    File tmpFile = new File(tmpDir, "tmp.data");
+    SerializerFactory factory = new SerializerFactory(rssConf);
+    Serializer serializer = factory.getSerializer(keyClass);
+    assert factory.getSerializer(valueClass).getClass().equals(serializer.getClass());
+    SerializerInstance instance = serializer.newInstance();
+
+    // 2 Write
+    long[] offsets = new long[RECORDS];
+    OutputStream outputStream =
+        isFileMode ? new FileOutputStream(tmpFile) : new ByteArrayOutputStream();
+    RecordsWriter writer = new RecordsWriter(rssConf, outputStream, keyClass, valueClass, true);
+    for (int i = 0; i < RECORDS; i++) {
+      DataOutputBuffer keyBuffer = new DataOutputBuffer();
+      DataOutputBuffer valueBuffer = new DataOutputBuffer();
+      instance.serialize(genData(keyClass, i), keyBuffer);
+      instance.serialize(genData(valueClass, i), valueBuffer);
+      writer.append(keyBuffer, valueBuffer);
+      offsets[i] = writer.getTotalBytesWritten();
+    }
+    writer.close();
+
+    // 3 Read
+    // 3.1 read from start
+    PartialInputStreamImpl inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, 0, tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(), 0, Long.MAX_VALUE);
+    RecordsReader reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, true);
+    int index = 0;
+    while (reader.next()) {
+      DataOutputBuffer keyBuffer = (DataOutputBuffer) reader.getCurrentKey();
+      DataInputBuffer keyInputBuffer = new DataInputBuffer();
+      keyInputBuffer.reset(keyBuffer.getData(), 0, keyBuffer.getLength());
+      assertEquals(
+          SerializerUtils.genData(keyClass, index), instance.deserialize(keyInputBuffer, keyClass));
+      DataOutputBuffer valueBuffer = (DataOutputBuffer) reader.getCurrentValue();
+      DataInputBuffer valueInputBuffer = new DataInputBuffer();
+      valueInputBuffer.reset(valueBuffer.getData(), 0, valueBuffer.getLength());
+      assertEquals(
+          SerializerUtils.genData(valueClass, index),
+          instance.deserialize(valueInputBuffer, valueClass));
+      index++;
+    }
+    assertEquals(RECORDS, index);
+    reader.close();
+
+    // 3.2 read from end
+    inputStream =
+        isFileMode
+            ? PartialInputStreamImpl.newInputStream(tmpFile, offsets[RECORDS - 1], tmpFile.length())
+            : PartialInputStreamImpl.newInputStream(
+                ((ByteArrayOutputStream) outputStream).toByteArray(),
+                offsets[RECORDS - 1],
+                Long.MAX_VALUE);
+    reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, true);
+    assertFalse(reader.next());
+    reader.close();
+
+    // 3.3 read from random position to end
+    Random random = new Random();
+    long[][] indexAndOffsets = new long[LOOP + 3][2];
+    indexAndOffsets[0] = new long[] {0, 0};
+    indexAndOffsets[1] = new long[] {RECORDS - 1, offsets[RECORDS - 2]}; // Last record
+    indexAndOffsets[2] = new long[] {RECORDS, offsets[RECORDS - 1]}; // Records that don't exist
+    for (int i = 0; i < LOOP; i++) {
+      int off = random.nextInt(RECORDS - 2) + 1;
+      indexAndOffsets[i + 3] = new long[] {off + 1, offsets[off]};
+    }
+    for (long[] indexAndOffset : indexAndOffsets) {
+      index = (int) indexAndOffset[0];
+      long offset = indexAndOffset[1];
+      inputStream =
+          isFileMode
+              ? PartialInputStreamImpl.newInputStream(tmpFile, offset, tmpFile.length())
+              : PartialInputStreamImpl.newInputStream(
+                  ((ByteArrayOutputStream) outputStream).toByteArray(), offset, Long.MAX_VALUE);
+      reader = new RecordsReader(rssConf, inputStream, keyClass, valueClass, true);
+      while (reader.next()) {
+        DataOutputBuffer keyBuffer = (DataOutputBuffer) reader.getCurrentKey();
+        DataInputBuffer keyInputBuffer = new DataInputBuffer();
+        keyInputBuffer.reset(keyBuffer.getData(), 0, keyBuffer.getLength());
+        assertEquals(
+            SerializerUtils.genData(keyClass, index),
+            instance.deserialize(keyInputBuffer, keyClass));
+        DataOutputBuffer valueBuffer = (DataOutputBuffer) reader.getCurrentValue();
+        DataInputBuffer valueInputBuffer = new DataInputBuffer();
+        valueInputBuffer.reset(valueBuffer.getData(), 0, valueBuffer.getLength());
+        assertEquals(
+            SerializerUtils.genData(valueClass, index),
+            instance.deserialize(valueInputBuffer, valueClass));
+        index++;
+      }
+      assertEquals(RECORDS, index);
+    }
+    reader.close();
+  }
+}
