This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 78df6c187 [#1747] feat(remote merge): Introduce common merger to merge 
multiple data streams. (#1925)
78df6c187 is described below

commit 78df6c187108c00e6b68f9af2504bd0b57c774da
Author: zhengchenyu <zhengcheny...@163.com>
AuthorDate: Mon Jul 22 16:01:49 2024 +0800

    [#1747] feat(remote merge): Introduce common merger to merge multiple data 
streams. (#1925)
    
    ### What changes were proposed in this pull request?
    
    Introduce a merger to merge multiple data streams by key. A min-heap based 
k-way merge sort is used to combine the input data streams, each of which is 
already sorted individually.
    
    ### Why are the changes needed?
    
    Fix: #1747
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    unit test and test in cluster.
---
 .../uniffle/common/merger/KeyValueIterator.java    |  31 +++
 .../apache/uniffle/common/merger/MergeState.java   |  35 ++++
 .../org/apache/uniffle/common/merger/Merger.java   | 208 +++++++++++++++++++++
 .../apache/uniffle/common/merger/Recordable.java   |  30 +++
 .../org/apache/uniffle/common/merger/Segment.java  |  41 ++++
 .../uniffle/common/merger/StreamedSegment.java     | 122 ++++++++++++
 .../apache/uniffle/common/merger/MergerTest.java   | 113 +++++++++++
 .../uniffle/common/serializer/SerializerUtils.java | 103 ++++++++++
 8 files changed, 683 insertions(+)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/merger/KeyValueIterator.java 
b/common/src/main/java/org/apache/uniffle/common/merger/KeyValueIterator.java
new file mode 100644
index 000000000..22344f803
--- /dev/null
+++ 
b/common/src/main/java/org/apache/uniffle/common/merger/KeyValueIterator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
+import java.io.IOException;
+
/**
 * Iterator over a stream of key/value records. Implementations position the cursor with {@link
 * #next()} before the current key/value can be read; {@code getCurrentKey()}/{@code
 * getCurrentValue()} return the record the cursor currently points at.
 *
 * @param <K> key type
 * @param <V> value type
 */
public interface KeyValueIterator<K, V> {

  /** Returns the key of the current record; undefined before the first successful {@link #next()}. */
  K getCurrentKey();

  /** Returns the value of the current record; undefined before the first successful {@link #next()}. */
  V getCurrentValue();

  /**
   * Advances to the next record.
   *
   * @return true if a record is available, false when the stream is exhausted
   * @throws IOException if reading the underlying stream fails
   */
  boolean next() throws IOException;

  /** Releases any resources held by the iterator. */
  void close() throws IOException;
}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/merger/MergeState.java 
b/common/src/main/java/org/apache/uniffle/common/merger/MergeState.java
new file mode 100644
index 000000000..3439d0701
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/merger/MergeState.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
/**
 * States of a merge operation. Each state carries a stable integer code so the state can be
 * transmitted compactly (e.g. in RPC responses) without relying on enum ordinals.
 */
public enum MergeState {
  // Merge finished successfully.
  DONE(0),
  // Merge has been initialized but not started.
  INITED(1),
  // Merge is in progress.
  MERGING(2),
  // Merge aborted due to an internal error.
  INTERNAL_ERROR(3);

  private final int code;

  MergeState(int code) {
    this.code = code;
  }

  /** Returns the stable wire code of this state. */
  public int code() {
    return code;
  }
}
diff --git a/common/src/main/java/org/apache/uniffle/common/merger/Merger.java 
b/common/src/main/java/org/apache/uniffle/common/merger/Merger.java
new file mode 100644
index 000000000..7fcbe2d9b
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/merger/Merger.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.PriorityQueue;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.records.RecordsWriter;
+
+public class Merger {
+
+  public static class MergeQueue<K, V> extends PriorityQueue<Segment> 
implements KeyValueIterator {
+
+    private final RssConf rssConf;
+    private final List<Segment> segments;
+    private final Class<K> keyClass;
+    private final Class<V> valueClass;
+    private Comparator comparator;
+    private boolean raw;
+
+    private Object currentKey;
+    private Object currentValue;
+    private Segment minSegment;
+    private Function<Integer, Segment> popSegmentHook;
+
+    public MergeQueue(
+        RssConf rssConf,
+        List<Segment> segments,
+        Class<K> keyClass,
+        Class<V> valueClass,
+        Comparator<K> comparator,
+        boolean raw) {
+      this.rssConf = rssConf;
+      this.segments = segments;
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+      if (comparator == null) {
+        throw new RssException("comparator is null!");
+      }
+      this.raw = raw;
+      this.comparator = comparator;
+    }
+
+    public void setPopSegmentHook(Function<Integer, Segment> popSegmentHook) {
+      this.popSegmentHook = popSegmentHook;
+    }
+
+    @Override
+    protected boolean lessThan(Object o1, Object o2) {
+      if (raw) {
+        Segment s1 = (Segment) o1;
+        Segment s2 = (Segment) o2;
+        DataOutputBuffer key1 = (DataOutputBuffer) s1.getCurrentKey();
+        DataOutputBuffer key2 = (DataOutputBuffer) s2.getCurrentKey();
+        int c =
+            ((RawComparator) comparator)
+                .compare(key1.getData(), 0, key1.getLength(), key2.getData(), 
0, key2.getLength());
+        return c < 0 || ((c == 0) && s1.getId() < s2.getId());
+      } else {
+        Segment s1 = (Segment) o1;
+        Segment s2 = (Segment) o2;
+        Object key1 = s1.getCurrentKey();
+        Object key2 = s2.getCurrentKey();
+        int c = comparator.compare(key1, key2);
+        return c < 0 || ((c == 0) && s1.getId() < s2.getId());
+      }
+    }
+
+    public void init() throws IOException {
+      List<Segment> segmentsToMerge = new ArrayList();
+      for (Segment segment : segments) {
+        boolean hasNext = segment.next();
+        if (hasNext) {
+          segmentsToMerge.add(segment);
+        } else {
+          segment.close();
+        }
+      }
+      initialize(segmentsToMerge.size());
+      clear();
+      for (Segment segment : segmentsToMerge) {
+        put(segment);
+      }
+    }
+
+    @Override
+    public Object getCurrentKey() {
+      return currentKey;
+    }
+
+    @Override
+    public Object getCurrentValue() {
+      return currentValue;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+      if (size() == 0) {
+        resetKeyValue();
+        return false;
+      }
+
+      if (minSegment != null) {
+        adjustPriorityQueue(minSegment);
+        if (size() == 0) {
+          minSegment = null;
+          resetKeyValue();
+          return false;
+        }
+      }
+      minSegment = top();
+      currentKey = minSegment.getCurrentKey();
+      currentValue = minSegment.getCurrentValue();
+      return true;
+    }
+
+    private void resetKeyValue() {
+      currentKey = null;
+      currentValue = null;
+    }
+
+    private void adjustPriorityQueue(Segment segment) throws IOException {
+      if (segment.next()) {
+        adjustTop();
+      } else {
+        pop();
+        segment.close();
+        if (popSegmentHook != null) {
+          Segment newSegment = popSegmentHook.apply((int) segment.getId());
+          if (newSegment != null) {
+            if (newSegment.next()) {
+              put(newSegment);
+            } else {
+              newSegment.close();
+            }
+          }
+        }
+      }
+    }
+
+    void merge(OutputStream output) throws IOException {
+      RecordsWriter<K, V> writer =
+          new RecordsWriter<K, V>(rssConf, output, keyClass, valueClass, raw);
+      boolean recorded = true;
+      while (this.next()) {
+        writer.append(this.getCurrentKey(), this.getCurrentValue());
+        if (output instanceof Recordable) {
+          recorded =
+              ((Recordable) output)
+                  .record(writer.getTotalBytesWritten(), () -> writer.flush(), 
false);
+        }
+      }
+      writer.flush();
+      if (!recorded) {
+        ((Recordable) output).record(writer.getTotalBytesWritten(), null, 
true);
+      }
+      writer.close();
+    }
+
+    @Override
+    public void close() throws IOException {
+      Segment segment;
+      while ((segment = pop()) != null) {
+        segment.close();
+      }
+    }
+  }
+
+  public static void merge(
+      RssConf conf,
+      OutputStream output,
+      List<Segment> segments,
+      Class keyClass,
+      Class valueClass,
+      Comparator comparator,
+      boolean raw)
+      throws IOException {
+    MergeQueue mergeQueue = new MergeQueue(conf, segments, keyClass, 
valueClass, comparator, raw);
+    mergeQueue.init();
+    mergeQueue.merge(output);
+    mergeQueue.close();
+  }
+}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/merger/Recordable.java 
b/common/src/main/java/org/apache/uniffle/common/merger/Recordable.java
new file mode 100644
index 000000000..79604b7f6
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/merger/Recordable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
+import java.io.IOException;
+
/**
 * An output target that can record logical boundaries (e.g. block/record offsets) as bytes are
 * written to it. Writers call {@link #record} after appending data so the target can decide
 * whether to snapshot the current position, optionally flushing buffered bytes first.
 */
public interface Recordable {

  /** Callback used to flush buffered bytes before a position is recorded. */
  @FunctionalInterface
  interface Flushable {
    void flush() throws IOException;
  }

  /**
   * Offers the current write position for recording.
   *
   * @param written total bytes written so far
   * @param flush callback to flush pending bytes before recording; may be null when forcing
   * @param force when true the position must be recorded unconditionally
   * @return true if the position was recorded
   * @throws IOException if flushing or recording fails
   */
  boolean record(long written, Flushable flush, boolean force) throws IOException;
}
diff --git a/common/src/main/java/org/apache/uniffle/common/merger/Segment.java 
b/common/src/main/java/org/apache/uniffle/common/merger/Segment.java
new file mode 100644
index 000000000..f8a730122
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/merger/Segment.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
+import java.io.IOException;
+
/**
 * A single sorted input stream participating in a merge. A segment is a cursor: {@link #next()}
 * advances it, after which {@link #getCurrentKey()}/{@link #getCurrentValue()} expose the current
 * record. The id identifies the segment (e.g. a block id) and breaks ties between equal keys
 * during merging.
 */
public abstract class Segment {

  // Immutable identity of this segment; used as the tie-breaker for equal keys.
  private final long id;

  public Segment(long id) {
    this.id = id;
  }

  /**
   * Advances to the next record.
   *
   * @return true if a record is available, false when the segment is exhausted
   */
  public abstract boolean next() throws IOException;

  /** Returns the current key; undefined before the first successful {@link #next()}. */
  public abstract Object getCurrentKey();

  /** Returns the current value; undefined before the first successful {@link #next()}. */
  public abstract Object getCurrentValue();

  public long getId() {
    return this.id;
  }

  /** Releases resources backing this segment. */
  public abstract void close() throws IOException;
}
diff --git 
a/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java 
b/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java
new file mode 100644
index 000000000..1966a3e51
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/merger/StreamedSegment.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
+import java.io.File;
+import java.io.IOException;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.records.RecordsReader;
+import org.apache.uniffle.common.serializer.PartialInputStream;
+import org.apache.uniffle.common.serializer.PartialInputStreamImpl;
+
+public class StreamedSegment<K, V> extends Segment {
+
+  private RecordsReader<K, V> reader;
+  ByteBuf byteBuf = null;
+
+  public StreamedSegment(
+      RssConf rssConf,
+      PartialInputStream inputStream,
+      long blockId,
+      Class keyClass,
+      Class valueClass,
+      boolean raw) {
+    super(blockId);
+    this.reader = new RecordsReader<>(rssConf, inputStream, keyClass, 
valueClass, raw);
+  }
+
+  public StreamedSegment(
+      RssConf rssConf, ByteBuf byteBuf, long blockId, Class keyClass, Class 
valueClass, boolean raw)
+      throws IOException {
+    super(blockId);
+    this.byteBuf = byteBuf;
+    this.byteBuf.retain();
+    byte[] buffer = byteBuf.array();
+    this.reader =
+        new RecordsReader<>(
+            rssConf,
+            PartialInputStreamImpl.newInputStream(buffer, 0, buffer.length),
+            keyClass,
+            valueClass,
+            raw);
+  }
+
+  // The buffer must be sorted by key
+  public StreamedSegment(
+      RssConf rssConf, byte[] buffer, long blockId, Class keyClass, Class 
valueClass, boolean raw)
+      throws IOException {
+    super(blockId);
+    this.reader =
+        new RecordsReader<>(
+            rssConf,
+            PartialInputStreamImpl.newInputStream(buffer, 0, buffer.length),
+            keyClass,
+            valueClass,
+            raw);
+  }
+
+  public StreamedSegment(
+      RssConf rssConf,
+      File file,
+      long start,
+      long end,
+      long blockId,
+      Class keyClass,
+      Class valueClass,
+      boolean raw)
+      throws IOException {
+    super(blockId);
+    this.reader =
+        new RecordsReader<K, V>(
+            rssConf,
+            PartialInputStreamImpl.newInputStream(file, start, end),
+            keyClass,
+            valueClass,
+            raw);
+  }
+
+  @Override
+  public boolean next() throws IOException {
+    return this.reader.next();
+  }
+
+  @Override
+  public Object getCurrentKey() {
+    return this.reader.getCurrentKey();
+  }
+
+  @Override
+  public Object getCurrentValue() {
+    return this.reader.getCurrentValue();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (byteBuf != null) {
+      this.byteBuf.release();
+      this.byteBuf = null;
+    }
+    if (this.reader != null) {
+      this.reader.close();
+      this.reader = null;
+    }
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java 
b/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java
new file mode 100644
index 000000000..1757ad004
--- /dev/null
+++ b/common/src/test/java/org/apache/uniffle/common/merger/MergerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.merger;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.io.RawComparator;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.records.RecordsReader;
+import org.apache.uniffle.common.serializer.PartialInputStreamImpl;
+import org.apache.uniffle.common.serializer.SerializerUtils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MergerTest {
+
+  private static final int RECORDS = 1009;
+  private static final int SEGMENTS = 4;
+
+  @ParameterizedTest
+  @ValueSource(
+      strings = {
+        "org.apache.hadoop.io.Text,org.apache.hadoop.io.IntWritable",
+      })
+  public void testMergeSegmentToFile(String classes, @TempDir File tmpDir) 
throws Exception {
+    // 1 Parse arguments
+    String[] classArray = classes.split(",");
+    Class keyClass = SerializerUtils.getClassByName(classArray[0]);
+    Class valueClass = SerializerUtils.getClassByName(classArray[1]);
+
+    // 2 Construct segments, then merge
+    RssConf rssConf = new RssConf();
+    List<Segment> segments = new ArrayList<>();
+    Comparator comparator = SerializerUtils.getComparator(keyClass);
+    for (int i = 0; i < SEGMENTS; i++) {
+      if (i % 2 == 0) {
+        segments.add(
+            SerializerUtils.genMemorySegment(
+                rssConf,
+                keyClass,
+                valueClass,
+                i,
+                i,
+                SEGMENTS,
+                RECORDS,
+                comparator instanceof RawComparator));
+      } else {
+        segments.add(
+            SerializerUtils.genFileSegment(
+                rssConf,
+                keyClass,
+                valueClass,
+                i,
+                i,
+                SEGMENTS,
+                RECORDS,
+                tmpDir,
+                comparator instanceof RawComparator));
+      }
+    }
+    File mergedFile = new File(tmpDir, "data.merged");
+    FileOutputStream outputStream = new FileOutputStream(mergedFile);
+    Merger.merge(
+        rssConf,
+        outputStream,
+        segments,
+        keyClass,
+        valueClass,
+        comparator,
+        comparator instanceof RawComparator);
+    outputStream.close();
+
+    // 3 Check the merged file
+    RecordsReader reader =
+        new RecordsReader(
+            rssConf,
+            PartialInputStreamImpl.newInputStream(mergedFile, 0, 
mergedFile.length()),
+            keyClass,
+            valueClass,
+            false);
+    int index = 0;
+    while (reader.next()) {
+      assertEquals(SerializerUtils.genData(keyClass, index), 
reader.getCurrentKey());
+      assertEquals(SerializerUtils.genData(valueClass, index), 
reader.getCurrentValue());
+      index++;
+    }
+    assertEquals(RECORDS * SEGMENTS, index);
+    reader.close();
+  }
+}
diff --git 
a/common/src/test/java/org/apache/uniffle/common/serializer/SerializerUtils.java
 
b/common/src/test/java/org/apache/uniffle/common/serializer/SerializerUtils.java
index b57114fb1..d5675182b 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/serializer/SerializerUtils.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/serializer/SerializerUtils.java
@@ -17,12 +17,22 @@
 
 package org.apache.uniffle.common.serializer;
 
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Comparator;
 
 import com.google.common.base.Objects;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.merger.Segment;
+import org.apache.uniffle.common.merger.StreamedSegment;
+import org.apache.uniffle.common.records.RecordsWriter;
+
 public class SerializerUtils {
 
   public static class SomeClass {
@@ -125,4 +135,97 @@ public class SerializerUtils {
     }
     return null;
   }
+
+  public static byte[] genSortedRecordBytes(
+      RssConf rssConf,
+      Class keyClass,
+      Class valueClass,
+      int start,
+      int interval,
+      int length,
+      int replica)
+      throws IOException {
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    genSortedRecord(rssConf, keyClass, valueClass, start, interval, length, 
output, replica);
+    return output.toByteArray();
+  }
+
  /**
   * Convenience overload of {@link #genMemorySegment(RssConf, Class, Class, long, int, int, int,
   * boolean)} with {@code raw=false} (records handled in deserialized form).
   */
  public static Segment genMemorySegment(
      RssConf rssConf,
      Class keyClass,
      Class valueClass,
      long blockId,
      int start,
      int interval,
      int length)
      throws IOException {
    return genMemorySegment(rssConf, keyClass, valueClass, blockId, start, interval, length, false);
  }
+
+  public static Segment genMemorySegment(
+      RssConf rssConf,
+      Class keyClass,
+      Class valueClass,
+      long blockId,
+      int start,
+      int interval,
+      int length,
+      boolean raw)
+      throws IOException {
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+    genSortedRecord(rssConf, keyClass, valueClass, start, interval, length, 
output, 1);
+    return new StreamedSegment(rssConf, output.toByteArray(), blockId, 
keyClass, valueClass, raw);
+  }
+
  /**
   * Convenience overload of {@link #genFileSegment(RssConf, Class, Class, long, int, int, int,
   * File, boolean)} with {@code raw=false} (records handled in deserialized form).
   */
  public static Segment genFileSegment(
      RssConf rssConf,
      Class keyClass,
      Class valueClass,
      long blockId,
      int start,
      int interval,
      int length,
      File tmpDir)
      throws IOException {
    return genFileSegment(
        rssConf, keyClass, valueClass, blockId, start, interval, length, tmpDir, false);
  }
+
+  public static Segment genFileSegment(
+      RssConf rssConf,
+      Class keyClass,
+      Class valueClass,
+      long blockId,
+      int start,
+      int interval,
+      int length,
+      File tmpDir,
+      boolean raw)
+      throws IOException {
+    File file = new File(tmpDir, "data." + start);
+    genSortedRecord(
+        rssConf, keyClass, valueClass, start, interval, length, new 
FileOutputStream(file), 1);
+    return new StreamedSegment(rssConf, file, 0, file.length(), blockId, 
keyClass, valueClass, raw);
+  }
+
+  private static void genSortedRecord(
+      RssConf rssConf,
+      Class keyClass,
+      Class valueClass,
+      int start,
+      int interval,
+      int length,
+      OutputStream output,
+      int replica)
+      throws IOException {
+    RecordsWriter writer = new RecordsWriter(rssConf, output, keyClass, 
valueClass, false);
+    for (int i = 0; i < length; i++) {
+      for (int j = 0; j < replica; j++) {
+        writer.append(
+            SerializerUtils.genData(keyClass, start + i * interval),
+            SerializerUtils.genData(valueClass, start + i * interval));
+      }
+    }
+    writer.close();
+  }
 }

Reply via email to