This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 45a44f4  ORC: Refactor readers to remove duplicate null handling code 
(#899)
45a44f4 is described below

commit 45a44f4cf49300db9a89e205d172146591b0bad1
Author: Shardul Mahadik <[email protected]>
AuthorDate: Thu Apr 9 14:31:37 2020 -0700

    ORC: Refactor readers to remove duplicate null handling code (#899)
---
 .../apache/iceberg/data/orc/GenericOrcReader.java  | 247 ++++----------
 .../apache/iceberg/spark/data/SparkOrcReader.java  | 376 ++++-----------------
 2 files changed, 138 insertions(+), 485 deletions(-)

diff --git 
a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java 
b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
index 88c407d..03f03be 100644
--- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcReader.java
@@ -96,219 +96,140 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
   }
 
   interface Converter<T> {
-    T convert(ColumnVector vector, int row);
-  }
-
-  private static class BooleanConverter implements Converter<Boolean> {
-    @Override
-    public Boolean convert(ColumnVector vector, int row) {
+    default T convert(ColumnVector vector, int row) {
       int rowIndex = vector.isRepeating ? 0 : row;
       if (!vector.noNulls && vector.isNull[rowIndex]) {
         return null;
       } else {
-        return ((LongColumnVector) vector).vector[rowIndex] != 0;
+        return convertNonNullValue(vector, row);
       }
     }
+
+    T convertNonNullValue(ColumnVector vector, int row);
+  }
+
+  private static class BooleanConverter implements Converter<Boolean> {
+    @Override
+    public Boolean convertNonNullValue(ColumnVector vector, int row) {
+      return ((LongColumnVector) vector).vector[row] != 0;
+    }
   }
 
   private static class ByteConverter implements Converter<Byte> {
     @Override
-    public Byte convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        return (byte) ((LongColumnVector) vector).vector[rowIndex];
-      }
+    public Byte convertNonNullValue(ColumnVector vector, int row) {
+      return (byte) ((LongColumnVector) vector).vector[row];
     }
   }
 
   private static class ShortConverter implements Converter<Short> {
     @Override
-    public Short convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        return (short) ((LongColumnVector) vector).vector[rowIndex];
-      }
+    public Short convertNonNullValue(ColumnVector vector, int row) {
+      return (short) ((LongColumnVector) vector).vector[row];
     }
   }
 
   private static class IntConverter implements Converter<Integer> {
     @Override
-    public Integer convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        return (int) ((LongColumnVector) vector).vector[rowIndex];
-      }
+    public Integer convertNonNullValue(ColumnVector vector, int row) {
+      return (int) ((LongColumnVector) vector).vector[row];
     }
   }
 
   private static class TimeConverter implements Converter<LocalTime> {
     @Override
-    public LocalTime convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        return LocalTime.ofNanoOfDay(((LongColumnVector) 
vector).vector[rowIndex] * 1_000);
-      }
+    public LocalTime convertNonNullValue(ColumnVector vector, int row) {
+      return LocalTime.ofNanoOfDay(((LongColumnVector) vector).vector[row] * 
1_000);
     }
   }
 
   private static class DateConverter implements Converter<LocalDate> {
     @Override
-    public LocalDate convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        return EPOCH_DAY.plusDays((int) ((LongColumnVector) 
vector).vector[rowIndex]);
-      }
+    public LocalDate convertNonNullValue(ColumnVector vector, int row) {
+      return EPOCH_DAY.plusDays((int) ((LongColumnVector) vector).vector[row]);
     }
   }
 
   private static class LongConverter implements Converter<Long> {
     @Override
-    public Long convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        return ((LongColumnVector) vector).vector[rowIndex];
-      }
+    public Long convertNonNullValue(ColumnVector vector, int row) {
+      return ((LongColumnVector) vector).vector[row];
     }
   }
 
   private static class FloatConverter implements Converter<Float> {
     @Override
-    public Float convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        return (float) ((DoubleColumnVector) vector).vector[rowIndex];
-      }
+    public Float convertNonNullValue(ColumnVector vector, int row) {
+      return (float) ((DoubleColumnVector) vector).vector[row];
     }
   }
 
   private static class DoubleConverter implements Converter<Double> {
     @Override
-    public Double convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        return ((DoubleColumnVector) vector).vector[rowIndex];
-      }
+    public Double convertNonNullValue(ColumnVector vector, int row) {
+      return ((DoubleColumnVector) vector).vector[row];
     }
   }
 
   private static class TimestampTzConverter implements 
Converter<OffsetDateTime> {
-    private OffsetDateTime convert(TimestampColumnVector vector, int row) {
-      return Instant.ofEpochSecond(Math.floorDiv(vector.time[row], 1_000), 
vector.nanos[row]).atOffset(ZoneOffset.UTC);
-    }
-
     @Override
-    public OffsetDateTime convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        return convert((TimestampColumnVector) vector, rowIndex);
-      }
+    public OffsetDateTime convertNonNullValue(ColumnVector vector, int row) {
+      TimestampColumnVector tcv = (TimestampColumnVector) vector;
+      return Instant.ofEpochSecond(Math.floorDiv(tcv.time[row], 1_000), 
tcv.nanos[row]).atOffset(ZoneOffset.UTC);
     }
   }
 
   private static class TimestampConverter implements Converter<LocalDateTime> {
-
-    private LocalDateTime convert(TimestampColumnVector vector, int row) {
-      return Instant.ofEpochSecond(Math.floorDiv(vector.time[row], 1_000), 
vector.nanos[row]).atOffset(ZoneOffset.UTC)
-          .toLocalDateTime();
-    }
-
     @Override
-    public LocalDateTime convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        return convert((TimestampColumnVector) vector, rowIndex);
-      }
+    public LocalDateTime convertNonNullValue(ColumnVector vector, int row) {
+      TimestampColumnVector tcv = (TimestampColumnVector) vector;
+      return Instant.ofEpochSecond(Math.floorDiv(tcv.time[row], 1_000), 
tcv.nanos[row]).atOffset(ZoneOffset.UTC)
+          .toLocalDateTime();
     }
   }
 
   private static class FixedConverter implements Converter<byte[]> {
     @Override
-    public byte[] convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        BytesColumnVector bytesVector = (BytesColumnVector) vector;
-        return Arrays.copyOfRange(bytesVector.vector[rowIndex], 
bytesVector.start[rowIndex],
-            bytesVector.start[rowIndex] + bytesVector.length[rowIndex]);
-      }
+    public byte[] convertNonNullValue(ColumnVector vector, int row) {
+      BytesColumnVector bytesVector = (BytesColumnVector) vector;
+      return Arrays.copyOfRange(bytesVector.vector[row], 
bytesVector.start[row],
+          bytesVector.start[row] + bytesVector.length[row]);
     }
   }
 
   private static class BinaryConverter implements Converter<ByteBuffer> {
     @Override
-    public ByteBuffer convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        BytesColumnVector bytesVector = (BytesColumnVector) vector;
-        return ByteBuffer.wrap(bytesVector.vector[rowIndex], 
bytesVector.start[rowIndex], bytesVector.length[rowIndex]);
-      }
+    public ByteBuffer convertNonNullValue(ColumnVector vector, int row) {
+      BytesColumnVector bytesVector = (BytesColumnVector) vector;
+      return ByteBuffer.wrap(bytesVector.vector[row], bytesVector.start[row], 
bytesVector.length[row]);
     }
   }
 
   private static class UUIDConverter implements Converter<UUID> {
     @Override
-    public UUID convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        BytesColumnVector bytesVector = (BytesColumnVector) vector;
-        ByteBuffer buf = ByteBuffer.wrap(bytesVector.vector[rowIndex], 
bytesVector.start[rowIndex],
-            bytesVector.length[rowIndex]);
-        long mostSigBits = buf.getLong();
-        long leastSigBits = buf.getLong();
-        return new UUID(mostSigBits, leastSigBits);
-      }
+    public UUID convertNonNullValue(ColumnVector vector, int row) {
+      BytesColumnVector bytesVector = (BytesColumnVector) vector;
+      ByteBuffer buf = ByteBuffer.wrap(bytesVector.vector[row], 
bytesVector.start[row], bytesVector.length[row]);
+      long mostSigBits = buf.getLong();
+      long leastSigBits = buf.getLong();
+      return new UUID(mostSigBits, leastSigBits);
     }
   }
 
   private static class StringConverter implements Converter<String> {
     @Override
-    public String convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        BytesColumnVector bytesVector = (BytesColumnVector) vector;
-        return new String(bytesVector.vector[rowIndex], 
bytesVector.start[rowIndex], bytesVector.length[rowIndex],
-            StandardCharsets.UTF_8);
-      }
+    public String convertNonNullValue(ColumnVector vector, int row) {
+      BytesColumnVector bytesVector = (BytesColumnVector) vector;
+      return new String(bytesVector.vector[row], bytesVector.start[row], 
bytesVector.length[row],
+          StandardCharsets.UTF_8);
     }
   }
 
   private static class DecimalConverter implements Converter<BigDecimal> {
     @Override
-    public BigDecimal convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        DecimalColumnVector cv = (DecimalColumnVector) vector;
-        return 
cv.vector[rowIndex].getHiveDecimal().bigDecimalValue().setScale(cv.scale);
-      }
+    public BigDecimal convertNonNullValue(ColumnVector vector, int row) {
+      DecimalColumnVector cv = (DecimalColumnVector) vector;
+      return 
cv.vector[row].getHiveDecimal().bigDecimalValue().setScale(cv.scale);
     }
   }
 
@@ -326,29 +247,21 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
           .get(0), child);
     }
 
-    List<?> readList(ListColumnVector vector, int row) {
-      int offset = (int) vector.offsets[row];
-      int length = (int) vector.lengths[row];
+    @Override
+    public List<?> convertNonNullValue(ColumnVector vector, int row) {
+      ListColumnVector listVector = (ListColumnVector) vector;
+      int offset = (int) listVector.offsets[row];
+      int length = (int) listVector.lengths[row];
 
       List<Object> list = Lists.newArrayListWithExpectedSize(length);
       for (int c = 0; c < length; ++c) {
-        list.add(childConverter.convert(vector.child, offset + c));
+        list.add(childConverter.convert(listVector.child, offset + c));
       }
       return list;
     }
-
-    @Override
-    public List<?> convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        return readList((ListColumnVector) vector, rowIndex);
-      }
-    }
   }
 
-  private static class MapConverter implements Converter {
+  private static class MapConverter implements Converter<Map<?, ?>> {
     private final Converter keyConvert;
     private final Converter valueConvert;
 
@@ -362,29 +275,21 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
       valueConvert = buildConverter(mapFields.get(1), valueType);
     }
 
-    Map<?, ?> readMap(MapColumnVector vector, int row) {
-      final int offset = (int) vector.offsets[row];
-      final int length = (int) vector.lengths[row];
+    @Override
+    public Map<?, ?> convertNonNullValue(ColumnVector vector, int row) {
+      MapColumnVector mapVector = (MapColumnVector) vector;
+      final int offset = (int) mapVector.offsets[row];
+      final int length = (int) mapVector.lengths[row];
 
       Map<Object, Object> map = Maps.newHashMapWithExpectedSize(length);
       for (int c = 0; c < length; ++c) {
-        Object key = keyConvert.convert(vector.keys, offset + c);
-        Object value = valueConvert.convert(vector.values, offset + c);
+        Object key = keyConvert.convert(mapVector.keys, offset + c);
+        Object value = valueConvert.convert(mapVector.values, offset + c);
         map.put(key, value);
       }
 
       return map;
     }
-
-    @Override
-    public Map convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        return readMap((MapColumnVector) vector, rowIndex);
-      }
-    }
   }
 
   private static class StructConverter implements Converter<Record> {
@@ -404,23 +309,15 @@ public class GenericOrcReader implements 
OrcValueReader<Record> {
       }
     }
 
-    Record writeStruct(StructColumnVector vector, int row) {
+    @Override
+    public Record convertNonNullValue(ColumnVector vector, int row) {
+      StructColumnVector structVector = (StructColumnVector) vector;
       Record data = GenericRecord.create(icebergStructSchema);
       for (int c = 0; c < children.length; ++c) {
-        data.set(c, children[c].convert(vector.fields[c], row));
+        data.set(c, children[c].convert(structVector.fields[c], row));
       }
       return data;
     }
-
-    @Override
-    public Record convert(ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        return null;
-      } else {
-        return writeStruct((StructColumnVector) vector, rowIndex);
-      }
-    }
   }
 
   private static Converter buildConverter(final Types.NestedField 
icebergField, final TypeDescription schema) {
diff --git 
a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java 
b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
index b965c6d..829ca45 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java
@@ -212,230 +212,90 @@ public class SparkOrcReader implements 
OrcValueReader<InternalRow> {
    * methods.
    */
   interface Converter {
-    void convert(UnsafeRowWriter writer, int column, ColumnVector vector, int 
row);
-    void convert(UnsafeArrayWriter writer, int element, ColumnVector vector, 
int row);
-  }
-
-  private static class BooleanConverter implements Converter {
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector, int row) {
+    default void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector, int row) {
       int rowIndex = vector.isRepeating ? 0 : row;
       if (!vector.noNulls && vector.isNull[rowIndex]) {
         writer.setNullAt(column);
       } else {
-        writer.write(column, ((LongColumnVector) vector).vector[rowIndex] != 
0);
+        convertNonNullValue(writer, column, vector, rowIndex);
       }
     }
 
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
+    default void convert(UnsafeArrayWriter writer, int element, ColumnVector 
vector, int row) {
       int rowIndex = vector.isRepeating ? 0 : row;
       if (!vector.noNulls && vector.isNull[rowIndex]) {
         writer.setNull(element);
       } else {
-        writer.write(element, ((LongColumnVector) vector).vector[rowIndex] != 
0);
+        convertNonNullValue(writer, element, vector, rowIndex);
       }
     }
+
+    void convertNonNullValue(UnsafeWriter writer, int ordinal, ColumnVector 
vector, int row);
   }
 
-  private static class ByteConverter implements Converter {
+  private static class BooleanConverter implements Converter {
     @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector,
-                        int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, (byte) ((LongColumnVector) 
vector).vector[rowIndex]);
-      }
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      writer.write(ordinal, ((LongColumnVector) vector).vector[row] != 0);
     }
+  }
 
+  private static class ByteConverter implements Converter {
     @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, (byte) ((LongColumnVector) 
vector).vector[rowIndex]);
-      }
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      writer.write(ordinal, (byte) ((LongColumnVector) vector).vector[row]);
     }
   }
 
   private static class ShortConverter implements Converter {
     @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector,
-                        int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, (short) ((LongColumnVector) 
vector).vector[rowIndex]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, (short) ((LongColumnVector) 
vector).vector[rowIndex]);
-      }
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      writer.write(ordinal, (short) ((LongColumnVector) vector).vector[row]);
     }
   }
 
   private static class IntConverter implements Converter {
     @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector,
-                        int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, (int) ((LongColumnVector) 
vector).vector[rowIndex]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, (int) ((LongColumnVector) 
vector).vector[rowIndex]);
-      }
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      writer.write(ordinal, (int) ((LongColumnVector) vector).vector[row]);
     }
   }
 
   private static class LongConverter implements Converter {
     @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector,
-                        int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, ((LongColumnVector) vector).vector[rowIndex]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, ((LongColumnVector) vector).vector[rowIndex]);
-      }
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      writer.write(ordinal, ((LongColumnVector) vector).vector[row]);
     }
   }
 
   private static class FloatConverter implements Converter {
     @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector,
-                        int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, (float) ((DoubleColumnVector) 
vector).vector[rowIndex]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, (float) ((DoubleColumnVector) 
vector).vector[rowIndex]);
-      }
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      writer.write(ordinal, (float) ((DoubleColumnVector) vector).vector[row]);
     }
   }
 
   private static class DoubleConverter implements Converter {
     @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector,
-                        int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, ((DoubleColumnVector) vector).vector[rowIndex]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, ((DoubleColumnVector) vector).vector[rowIndex]);
-      }
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      writer.write(ordinal, ((DoubleColumnVector) vector).vector[row]);
     }
   }
 
   private static class TimestampTzConverter implements Converter {
-
-    private long convert(TimestampColumnVector vector, int row) {
-      // compute microseconds past 1970.
-      return (vector.time[row] / 1000) * 1_000_000 + vector.nanos[row] / 1000;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector,
-                        int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNullAt(column);
-      } else {
-        writer.write(column, convert((TimestampColumnVector) vector, 
rowIndex));
-      }
-    }
-
     @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNull(element);
-      } else {
-        writer.write(element, convert((TimestampColumnVector) vector, 
rowIndex));
-      }
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      TimestampColumnVector timestampVector = (TimestampColumnVector) vector;
+      // compute microseconds past 1970.
+      writer.write(ordinal, (timestampVector.time[row] / 1000) * 1_000_000 + 
timestampVector.nanos[row] / 1000);
     }
   }
 
   private static class BinaryConverter implements Converter {
-
     @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNullAt(column);
-      } else {
-        BytesColumnVector bytesVector = (BytesColumnVector) vector;
-        writer.write(column, bytesVector.vector[rowIndex],
-            bytesVector.start[rowIndex], bytesVector.length[rowIndex]);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element, ColumnVector 
vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNull(element);
-      } else {
-        final BytesColumnVector v = (BytesColumnVector) vector;
-        writer.write(element, v.vector[rowIndex], v.start[rowIndex], 
v.length[rowIndex]);
-      }
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      BytesColumnVector bytesVector = (BytesColumnVector) vector;
+      writer.write(ordinal, bytesVector.vector[row], bytesVector.start[row], 
bytesVector.length[row]);
     }
   }
 
@@ -449,31 +309,11 @@ public class SparkOrcReader implements 
OrcValueReader<InternalRow> {
     }
 
     @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector,
-                        int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNullAt(column);
-      } else {
-        HiveDecimalWritable value = ((DecimalColumnVector) 
vector).vector[rowIndex];
-        writer.write(column,
-            new Decimal().set(value.serialize64(value.scale()), 
value.precision(), value.scale()),
-            precision, scale);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNull(element);
-      } else {
-        HiveDecimalWritable value = ((DecimalColumnVector) 
vector).vector[rowIndex];
-        writer.write(element,
-            new Decimal().set(value.serialize64(value.scale()), 
value.precision(), value.scale()),
-            precision, scale);
-      }
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row];
+      writer.write(ordinal,
+          new Decimal().set(value.serialize64(value.scale()), 
value.precision(), value.scale()),
+          precision, scale);
     }
   }
 
@@ -487,33 +327,12 @@ public class SparkOrcReader implements 
OrcValueReader<InternalRow> {
     }
 
     @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector,
-                        int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNullAt(column);
-      } else {
-        BigDecimal value = ((DecimalColumnVector) vector).vector[rowIndex]
-            .getHiveDecimal().bigDecimalValue();
-        writer.write(column,
-            new Decimal().set(new scala.math.BigDecimal(value), precision, 
scale),
-            precision, scale);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNull(element);
-      } else {
-        BigDecimal value = ((DecimalColumnVector) vector).vector[rowIndex]
-            .getHiveDecimal().bigDecimalValue();
-        writer.write(element,
-            new Decimal().set(new scala.math.BigDecimal(value), precision, 
scale),
-            precision, scale);
-      }
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      BigDecimal value = ((DecimalColumnVector) vector).vector[row]
+          .getHiveDecimal().bigDecimalValue();
+      writer.write(ordinal,
+          new Decimal().set(new scala.math.BigDecimal(value), precision, 
scale),
+          precision, scale);
     }
   }
 
@@ -527,37 +346,16 @@ public class SparkOrcReader implements 
OrcValueReader<InternalRow> {
       }
     }
 
-    int writeStruct(UnsafeWriter parentWriter, StructColumnVector vector, int 
row) {
-      UnsafeRowWriter childWriter = new UnsafeRowWriter(parentWriter, 
children.length);
+    @Override
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      StructColumnVector structVector = (StructColumnVector) vector;
+      UnsafeRowWriter childWriter = new UnsafeRowWriter(writer, 
children.length);
       int start = childWriter.cursor();
       childWriter.resetRowWriter();
       for (int c = 0; c < children.length; ++c) {
-        children[c].convert(childWriter, c, vector.fields[c], row);
-      }
-      return start;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNullAt(column);
-      } else {
-        int start = writeStruct(writer, (StructColumnVector) vector, rowIndex);
-        writer.setOffsetAndSizeFromPreviousCursor(column, start);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNull(element);
-      } else {
-        int start = writeStruct(writer, (StructColumnVector) vector, rowIndex);
-        writer.setOffsetAndSizeFromPreviousCursor(element, start);
+        children[c].convert(childWriter, c, structVector.fields[c], row);
       }
+      writer.setOffsetAndSizeFromPreviousCursor(ordinal, start);
     }
   }
 
@@ -570,41 +368,19 @@ public class SparkOrcReader implements 
OrcValueReader<InternalRow> {
       childConverter = buildConverter(child);
     }
 
-    int writeList(UnsafeWriter parentWriter, ListColumnVector vector, int row) 
{
-      int offset = (int) vector.offsets[row];
-      int length = (int) vector.lengths[row];
+    @Override
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      ListColumnVector listVector = (ListColumnVector) vector;
+      int offset = (int) listVector.offsets[row];
+      int length = (int) listVector.lengths[row];
 
-      UnsafeArrayWriter childWriter = new UnsafeArrayWriter(parentWriter, 
getArrayElementSize(child));
+      UnsafeArrayWriter childWriter = new UnsafeArrayWriter(writer, 
getArrayElementSize(child));
       int start = childWriter.cursor();
       childWriter.initialize(length);
       for (int c = 0; c < length; ++c) {
-        childConverter.convert(childWriter, c, vector.child, offset + c);
-      }
-      return start;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector,
-                        int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNullAt(column);
-      } else {
-        int start = writeList(writer, (ListColumnVector) vector, rowIndex);
-        writer.setOffsetAndSizeFromPreviousCursor(column, start);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element,
-                        ColumnVector vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNull(element);
-      } else {
-        int start = writeList(writer, (ListColumnVector) vector, rowIndex);
-        writer.setOffsetAndSizeFromPreviousCursor(element, start);
+        childConverter.convert(childWriter, c, listVector.child, offset + c);
       }
+      writer.setOffsetAndSizeFromPreviousCursor(ordinal, start);
     }
   }
 
@@ -627,11 +403,13 @@ public class SparkOrcReader implements 
OrcValueReader<InternalRow> {
       valueSize = getArrayElementSize(valueType);
     }
 
-    int writeMap(UnsafeWriter parentWriter, MapColumnVector vector, int row) {
-      final int offset = (int) vector.offsets[row];
-      final int length = (int) vector.lengths[row];
+    @Override
+    public void convertNonNullValue(UnsafeWriter writer, int ordinal, 
ColumnVector vector, int row) {
+      MapColumnVector mapVector = (MapColumnVector) vector;
+      final int offset = (int) mapVector.offsets[row];
+      final int length = (int) mapVector.lengths[row];
 
-      UnsafeArrayWriter keyWriter = new UnsafeArrayWriter(parentWriter, 
keySize);
+      UnsafeArrayWriter keyWriter = new UnsafeArrayWriter(writer, keySize);
       final int start = keyWriter.cursor();
       // save room for the key size
       keyWriter.grow(KEY_SIZE_BYTES);
@@ -640,41 +418,19 @@ public class SparkOrcReader implements 
OrcValueReader<InternalRow> {
       // serialize the keys
       keyWriter.initialize(length);
       for (int c = 0; c < length; ++c) {
-        keyConvert.convert(keyWriter, c, vector.keys, offset + c);
+        keyConvert.convert(keyWriter, c, mapVector.keys, offset + c);
       }
       // store the serialized size of the keys
       Platform.putLong(keyWriter.getBuffer(), start,
-                keyWriter.cursor() - start - KEY_SIZE_BYTES);
+          keyWriter.cursor() - start - KEY_SIZE_BYTES);
 
       // serialize the values
-      UnsafeArrayWriter valueWriter = new UnsafeArrayWriter(parentWriter, 
valueSize);
+      UnsafeArrayWriter valueWriter = new UnsafeArrayWriter(writer, valueSize);
       valueWriter.initialize(length);
       for (int c = 0; c < length; ++c) {
-        valueConvert.convert(valueWriter, c, vector.values, offset + c);
-      }
-      return start;
-    }
-
-    @Override
-    public void convert(UnsafeRowWriter writer, int column, ColumnVector 
vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNullAt(column);
-      } else {
-        int start = writeMap(writer, (MapColumnVector) vector, rowIndex);
-        writer.setOffsetAndSizeFromPreviousCursor(column, start);
-      }
-    }
-
-    @Override
-    public void convert(UnsafeArrayWriter writer, int element, ColumnVector 
vector, int row) {
-      int rowIndex = vector.isRepeating ? 0 : row;
-      if (!vector.noNulls && vector.isNull[rowIndex]) {
-        writer.setNull(element);
-      } else {
-        int start = writeMap(writer, (MapColumnVector) vector, rowIndex);
-        writer.setOffsetAndSizeFromPreviousCursor(element, start);
+        valueConvert.convert(valueWriter, c, mapVector.values, offset + c);
       }
+      writer.setOffsetAndSizeFromPreviousCursor(ordinal, start);
     }
   }
 

Reply via email to