This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 851c62e  Add PartitionKey to the public API (#1195)
851c62e is described below

commit 851c62edd2d068eb8023311f4bbc4380ad4e8e1c
Author: Ryan Blue <[email protected]>
AuthorDate: Mon Jul 13 16:04:23 2020 -0700

    Add PartitionKey to the public API (#1195)
    
    * Add PartitionKey to the public API.
    
    * Handle null values.
---
 .../main/java/org/apache/iceberg/PartitionKey.java | 139 ++++++++
 .../apache/iceberg/spark/source/BaseWriter.java    |   1 +
 .../iceberg/spark/source/InternalRowWrapper.java   |  96 ++++++
 .../iceberg/spark/source/OutputFileFactory.java    |   1 +
 .../apache/iceberg/spark/source/PartitionKey.java  | 364 ---------------------
 .../iceberg/spark/source/PartitionedWriter.java    |   6 +-
 6 files changed, 242 insertions(+), 365 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/PartitionKey.java b/api/src/main/java/org/apache/iceberg/PartitionKey.java
new file mode 100644
index 0000000..71cdb27
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/PartitionKey.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.transforms.Transform;
+
+/**
+ * A struct of partition values.
+ * <p>
+ * Instances of this class can produce partition values from a data row passed to {@link #partition(StructLike)}.
+ */
+public class PartitionKey implements StructLike, Serializable {
+
+  private final PartitionSpec spec;
+  private final int size;
+  private final Object[] partitionTuple;
+  private final Transform[] transforms;
+  private final Accessor<StructLike>[] accessors;
+
+  @SuppressWarnings("unchecked")
+  public PartitionKey(PartitionSpec spec, Schema inputSchema) {
+    this.spec = spec;
+
+    List<PartitionField> fields = spec.fields();
+    this.size = fields.size();
+    this.partitionTuple = new Object[size];
+    this.transforms = new Transform[size];
+    this.accessors = (Accessor<StructLike>[]) Array.newInstance(Accessor.class, size);
+
+    Schema schema = spec.schema();
+    for (int i = 0; i < size; i += 1) {
+      PartitionField field = fields.get(i);
+      Accessor<StructLike> accessor = inputSchema.accessorForField(field.sourceId());
+      Preconditions.checkArgument(accessor != null,
+          "Cannot build accessor for field: " + schema.findField(field.sourceId()));
+      this.accessors[i] = accessor;
+      this.transforms[i] = field.transform();
+    }
+  }
+
+  private PartitionKey(PartitionKey toCopy) {
+    this.spec = toCopy.spec;
+    this.size = toCopy.size;
+    this.partitionTuple = new Object[toCopy.partitionTuple.length];
+    this.transforms = toCopy.transforms;
+    this.accessors = toCopy.accessors;
+
+    System.arraycopy(toCopy.partitionTuple, 0, this.partitionTuple, 0, partitionTuple.length);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    for (int i = 0; i < partitionTuple.length; i += 1) {
+      if (i > 0) {
+        sb.append(", ");
+      }
+      sb.append(partitionTuple[i]);
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+
+  public PartitionKey copy() {
+    return new PartitionKey(this);
+  }
+
+  public String toPath() {
+    return spec.partitionToPath(this);
+  }
+
+  /**
+   * Replace this key's partition values with the partition values for the row.
+   *
+   * @param row a StructLike row
+   */
+  @SuppressWarnings("unchecked")
+  public void partition(StructLike row) {
+    for (int i = 0; i < partitionTuple.length; i += 1) {
+      Transform<Object, Object> transform = transforms[i];
+      partitionTuple[i] = transform.apply(accessors[i].get(row));
+    }
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    return javaClass.cast(partitionTuple[pos]);
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    partitionTuple[pos] = value;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof PartitionKey)) {
+      return false;
+    }
+
+    PartitionKey that = (PartitionKey) o;
+    return Arrays.equals(partitionTuple, that.partitionTuple);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(partitionTuple);
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseWriter.java
index f3a1030..8c41e77 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseWriter.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.io.FileAppender;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java b/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
new file mode 100644
index 0000000..ef1eb08
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.nio.ByteBuffer;
+import java.util.function.BiFunction;
+import java.util.stream.Stream;
+import org.apache.iceberg.StructLike;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Class to adapt a Spark {@code InternalRow} to Iceberg {@link StructLike} for uses like
+ * {@link org.apache.iceberg.PartitionKey#partition(StructLike)}
+ */
+class InternalRowWrapper implements StructLike {
+  private final DataType[] types;
+  private final BiFunction<InternalRow, Integer, ?>[] getters;
+  private InternalRow row = null;
+
+  @SuppressWarnings("unchecked")
+  InternalRowWrapper(StructType rowType) {
+    this.types = Stream.of(rowType.fields())
+        .map(StructField::dataType)
+        .toArray(DataType[]::new);
+    this.getters = Stream.of(types)
+        .map(InternalRowWrapper::getter)
+        .toArray(BiFunction[]::new);
+  }
+
+  InternalRowWrapper wrap(InternalRow internalRow) {
+    this.row = internalRow;
+    return this;
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    if (row.isNullAt(pos)) {
+      return null;
+    } else if (getters[pos] != null) {
+      return javaClass.cast(getters[pos].apply(row, pos));
+    }
+
+    return javaClass.cast(row.get(pos, types[pos]));
+  }
+
+  @Override
+  public <T> void set(int pos, T value) {
+    row.update(pos, value);
+  }
+
+  private static BiFunction<InternalRow, Integer, ?> getter(DataType type) {
+    if (type instanceof StringType) {
+      return (row, pos) -> row.getUTF8String(pos).toString();
+    } else if (type instanceof DecimalType) {
+      DecimalType decimal = (DecimalType) type;
+      return (row, pos) ->
+          row.getDecimal(pos, decimal.precision(), decimal.scale()).toJavaBigDecimal();
+    } else if (type instanceof BinaryType) {
+      return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos));
+    } else if (type instanceof StructType) {
+      StructType structType = (StructType) type;
+      InternalRowWrapper nestedWrapper = new InternalRowWrapper(structType);
+      return (row, pos) -> nestedWrapper.wrap(row.getStruct(pos, structType.size()));
+    }
+
+    return null;
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/OutputFileFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/OutputFileFactory.java
index 1daf889..08e66df 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/OutputFileFactory.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/OutputFileFactory.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.spark.source;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionManager;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java
deleted file mode 100644
index 292be8a..0000000
--- a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionKey.java
+++ /dev/null
@@ -1,364 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark.source;
-
-import java.lang.reflect.Array;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.transforms.Transform;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.unsafe.types.UTF8String;
-
-class PartitionKey implements StructLike {
-
-  private final PartitionSpec spec;
-  private final int size;
-  private final Object[] partitionTuple;
-  private final Transform[] transforms;
-  private final Accessor<InternalRow>[] accessors;
-
-  @SuppressWarnings("unchecked")
-  PartitionKey(PartitionSpec spec, Schema inputSchema) {
-    this.spec = spec;
-
-    List<PartitionField> fields = spec.fields();
-    this.size = fields.size();
-    this.partitionTuple = new Object[size];
-    this.transforms = new Transform[size];
-    this.accessors = (Accessor<InternalRow>[]) Array.newInstance(Accessor.class, size);
-
-    Schema schema = spec.schema();
-    Map<Integer, Accessor<InternalRow>> newAccessors = buildAccessors(inputSchema);
-    for (int i = 0; i < size; i += 1) {
-      PartitionField field = fields.get(i);
-      Accessor<InternalRow> accessor = newAccessors.get(field.sourceId());
-      if (accessor == null) {
-        throw new RuntimeException(
-            "Cannot build accessor for field: " + schema.findField(field.sourceId()));
-      }
-      this.accessors[i] = accessor;
-      this.transforms[i] = field.transform();
-    }
-  }
-
-  private PartitionKey(PartitionKey toCopy) {
-    this.spec = toCopy.spec;
-    this.size = toCopy.size;
-    this.partitionTuple = new Object[toCopy.partitionTuple.length];
-    this.transforms = toCopy.transforms;
-    this.accessors = toCopy.accessors;
-
-    for (int i = 0; i < partitionTuple.length; i += 1) {
-      this.partitionTuple[i] = defensiveCopyIfNeeded(toCopy.partitionTuple[i]);
-    }
-  }
-
-  private Object defensiveCopyIfNeeded(Object obj) {
-    if (obj instanceof UTF8String) {
-      // bytes backing the UTF8 string might be reused
-      byte[] bytes = ((UTF8String) obj).getBytes();
-      return UTF8String.fromBytes(Arrays.copyOf(bytes, bytes.length));
-    }
-    return obj;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("[");
-    for (int i = 0; i < partitionTuple.length; i += 1) {
-      if (i > 0) {
-        sb.append(", ");
-      }
-      sb.append(partitionTuple[i]);
-    }
-    sb.append("]");
-    return sb.toString();
-  }
-
-  PartitionKey copy() {
-    return new PartitionKey(this);
-  }
-
-  String toPath() {
-    return spec.partitionToPath(this);
-  }
-
-  @SuppressWarnings("unchecked")
-  void partition(InternalRow row) {
-    for (int i = 0; i < partitionTuple.length; i += 1) {
-      Transform<Object, Object> transform = transforms[i];
-      partitionTuple[i] = transform.apply(accessors[i].get(row));
-    }
-  }
-
-  @Override
-  public int size() {
-    return size;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public <T> T get(int pos, Class<T> javaClass) {
-    return javaClass.cast(partitionTuple[pos]);
-  }
-
-  @Override
-  public <T> void set(int pos, T value) {
-    partitionTuple[pos] = value;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    } else if (!(o instanceof PartitionKey)) {
-      return false;
-    }
-
-    PartitionKey that = (PartitionKey) o;
-    return Arrays.equals(partitionTuple, that.partitionTuple);
-  }
-
-  @Override
-  public int hashCode() {
-    return Arrays.hashCode(partitionTuple);
-  }
-
-  private interface Accessor<T> {
-    Object get(T container);
-  }
-
-  private static Map<Integer, Accessor<InternalRow>> buildAccessors(Schema schema) {
-    return TypeUtil.visit(schema, new BuildPositionAccessors());
-  }
-
-  private static Accessor<InternalRow> newAccessor(int position, Type type) {
-    switch (type.typeId()) {
-      case STRING:
-        return new StringAccessor(position, SparkSchemaUtil.convert(type));
-      case DECIMAL:
-        return new DecimalAccessor(position, SparkSchemaUtil.convert(type));
-      case BINARY:
-        return new BytesAccessor(position, SparkSchemaUtil.convert(type));
-      default:
-        return new PositionAccessor(position, SparkSchemaUtil.convert(type));
-    }
-  }
-
-  private static Accessor<InternalRow> newAccessor(int position, boolean isOptional, Types.StructType type,
-                                                   Accessor<InternalRow> accessor) {
-    int size = type.fields().size();
-    if (isOptional) {
-      // the wrapped position handles null layers
-      return new WrappedPositionAccessor(position, size, accessor);
-    } else if (accessor.getClass() == PositionAccessor.class) {
-      return new Position2Accessor(position, size, (PositionAccessor) accessor);
-    } else if (accessor instanceof Position2Accessor) {
-      return new Position3Accessor(position, size, (Position2Accessor) accessor);
-    } else {
-      return new WrappedPositionAccessor(position, size, accessor);
-    }
-  }
-
-  private static class BuildPositionAccessors
-      extends TypeUtil.SchemaVisitor<Map<Integer, Accessor<InternalRow>>> {
-    @Override
-    public Map<Integer, Accessor<InternalRow>> schema(
-        Schema schema, Map<Integer, Accessor<InternalRow>> structResult) {
-      return structResult;
-    }
-
-    @Override
-    public Map<Integer, Accessor<InternalRow>> struct(
-        Types.StructType struct, List<Map<Integer, Accessor<InternalRow>>> fieldResults) {
-      Map<Integer, Accessor<InternalRow>> accessors = Maps.newHashMap();
-      List<Types.NestedField> fields = struct.fields();
-      for (int i = 0; i < fieldResults.size(); i += 1) {
-        Types.NestedField field = fields.get(i);
-        Map<Integer, Accessor<InternalRow>> result = fieldResults.get(i);
-        if (result != null) {
-          for (Map.Entry<Integer, Accessor<InternalRow>> entry : result.entrySet()) {
-            accessors.put(entry.getKey(), newAccessor(i, field.isOptional(),
-                field.type().asNestedType().asStructType(), entry.getValue()));
-          }
-        } else {
-          accessors.put(field.fieldId(), newAccessor(i, field.type()));
-        }
-      }
-
-      if (accessors.isEmpty()) {
-        return null;
-      }
-
-      return accessors;
-    }
-
-    @Override
-    public Map<Integer, Accessor<InternalRow>> field(
-        Types.NestedField field, Map<Integer, Accessor<InternalRow>> fieldResult) {
-      return fieldResult;
-    }
-  }
-
-  private static class PositionAccessor implements Accessor<InternalRow> {
-    private final DataType type;
-    private int position;
-
-    private PositionAccessor(int position, DataType type) {
-      this.position = position;
-      this.type = type;
-    }
-
-    @Override
-    public Object get(InternalRow row) {
-      if (row.isNullAt(position)) {
-        return null;
-      }
-      return row.get(position, type);
-    }
-
-    DataType type() {
-      return type;
-    }
-
-    int position() {
-      return position;
-    }
-  }
-
-  private static class StringAccessor extends PositionAccessor {
-    private StringAccessor(int position, DataType type) {
-      super(position, type);
-    }
-
-    @Override
-    public Object get(InternalRow row) {
-      if (row.isNullAt(position())) {
-        return null;
-      }
-      return row.get(position(), type()).toString();
-    }
-  }
-
-  private static class DecimalAccessor extends PositionAccessor {
-    private DecimalAccessor(int position, DataType type) {
-      super(position, type);
-    }
-
-    @Override
-    public Object get(InternalRow row) {
-      if (row.isNullAt(position())) {
-        return null;
-      }
-      return ((Decimal) row.get(position(), type())).toJavaBigDecimal();
-    }
-  }
-
-  private static class BytesAccessor extends PositionAccessor {
-    private BytesAccessor(int position, DataType type) {
-      super(position, type);
-    }
-
-    @Override
-    public Object get(InternalRow row) {
-      if (row.isNullAt(position())) {
-        return null;
-      }
-      return ByteBuffer.wrap((byte[]) row.get(position(), type()));
-    }
-  }
-
-  private static class Position2Accessor implements Accessor<InternalRow> {
-    private final int p0;
-    private final int size0;
-    private final int p1;
-    private final DataType type;
-
-    private Position2Accessor(int position, int size, PositionAccessor wrapped) {
-      this.p0 = position;
-      this.size0 = size;
-      this.p1 = wrapped.position;
-      this.type = wrapped.type;
-    }
-
-    @Override
-    public Object get(InternalRow row) {
-      return row.getStruct(p0, size0).get(p1, type);
-    }
-  }
-
-  private static class Position3Accessor implements Accessor<InternalRow> {
-    private final int p0;
-    private final int size0;
-    private final int p1;
-    private final int size1;
-    private final int p2;
-    private final DataType type;
-
-    private Position3Accessor(int position, int size, Position2Accessor wrapped) {
-      this.p0 = position;
-      this.size0 = size;
-      this.p1 = wrapped.p0;
-      this.size1 = wrapped.size0;
-      this.p2 = wrapped.p1;
-      this.type = wrapped.type;
-    }
-
-    @Override
-    public Object get(InternalRow row) {
-      return row.getStruct(p0, size0).getStruct(p1, size1).get(p2, type);
-    }
-  }
-
-  private static class WrappedPositionAccessor implements Accessor<InternalRow> {
-    private final int position;
-    private final int size;
-    private final Accessor<InternalRow> accessor;
-
-    private WrappedPositionAccessor(int position, int size, Accessor<InternalRow> accessor) {
-      this.position = position;
-      this.size = size;
-      this.accessor = accessor;
-    }
-
-    @Override
-    public Object get(InternalRow row) {
-      InternalRow inner = row.getStruct(position, size);
-      if (inner != null) {
-        return accessor.get(inner);
-      }
-      return null;
-    }
-  }
-}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionedWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionedWriter.java
index 7cc0c9f..0ead766 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionedWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionedWriter.java
@@ -22,11 +22,13 @@ package org.apache.iceberg.spark.source;
 import java.io.IOException;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,17 +37,19 @@ class PartitionedWriter extends BaseWriter {
   private static final Logger LOG = LoggerFactory.getLogger(PartitionedWriter.class);
 
   private final PartitionKey key;
+  private final InternalRowWrapper wrapper;
   private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
 
   PartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
                     OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema writeSchema) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.key = new PartitionKey(spec, writeSchema);
+    this.wrapper = new InternalRowWrapper(SparkSchemaUtil.convert(writeSchema));
   }
 
   @Override
   public void write(InternalRow row) throws IOException {
-    key.partition(row);
+    key.partition(wrapper.wrap(row));
 
     PartitionKey currentKey = getCurrentKey();
     if (!key.equals(currentKey)) {

Reply via email to