rdblue commented on a change in pull request #1124:
URL: https://github.com/apache/iceberg/pull/1124#discussion_r447159974



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.expressions.BoundPredicate;
+import org.apache.iceberg.expressions.ExpressionVisitors;
+import org.apache.iceberg.expressions.UnboundPredicate;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.transforms.PartitionSpecVisitor;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Expression;
+import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.Literal;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class Spark3Util {
+
+  private static final ImmutableSet<String> LOCALITY_WHITELIST_FS = 
ImmutableSet.of("hdfs");
+  private static final Joiner DOT = Joiner.on(".");
+
+  private Spark3Util() {
+  }
+
+  /**
+   * Applies a list of Spark table changes to an {@link UpdateProperties} 
operation.
+   * <p>
+   * All non-property changes in the list are ignored.
+   *
+   * @param pendingUpdate an uncommitted UpdateProperties operation to 
configure
+   * @param changes a list of Spark table changes
+   * @return the UpdateProperties operation configured with the changes
+   */
+  public static UpdateProperties applyPropertyChanges(UpdateProperties 
pendingUpdate, List<TableChange> changes) {
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.SetProperty) {
+        TableChange.SetProperty set = (TableChange.SetProperty) change;
+        pendingUpdate.set(set.property(), set.value());
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        TableChange.RemoveProperty remove = (TableChange.RemoveProperty) 
change;
+        pendingUpdate.remove(remove.property());
+
+      } else {
+        throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + change);
+      }
+    }
+
+    return pendingUpdate;
+  }
+
+  /**
+   * Applies a list of Spark table changes to an {@link UpdateSchema} 
operation.
+   * <p>
+   * All non-schema changes in the list are ignored.
+   *
+   * @param pendingUpdate an uncommitted UpdateSchema operation to configure
+   * @param changes a list of Spark table changes
+   * @return the UpdateSchema operation configured with the changes
+   */
+  public static UpdateSchema applySchemaChanges(UpdateSchema pendingUpdate, 
List<TableChange> changes) {
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.AddColumn) {
+        apply(pendingUpdate, (TableChange.AddColumn) change);
+
+      } else if (change instanceof TableChange.UpdateColumnType) {
+        TableChange.UpdateColumnType update = (TableChange.UpdateColumnType) 
change;
+        Type newType = SparkSchemaUtil.convert(update.newDataType());
+        Preconditions.checkArgument(newType.isPrimitiveType(),
+            "Cannot update '%s', not a primitive type: %s", 
DOT.join(update.fieldNames()), update.newDataType());
+        pendingUpdate.updateColumn(DOT.join(update.fieldNames()), 
newType.asPrimitiveType());
+
+      } else if (change instanceof TableChange.UpdateColumnComment) {
+        TableChange.UpdateColumnComment update = 
(TableChange.UpdateColumnComment) change;
+        pendingUpdate.updateColumnDoc(DOT.join(update.fieldNames()), 
update.newComment());
+
+      } else if (change instanceof TableChange.RenameColumn) {
+        TableChange.RenameColumn rename = (TableChange.RenameColumn) change;
+        pendingUpdate.renameColumn(DOT.join(rename.fieldNames()), 
rename.newName());
+
+      } else if (change instanceof TableChange.DeleteColumn) {
+        TableChange.DeleteColumn delete = (TableChange.DeleteColumn) change;
+        pendingUpdate.deleteColumn(DOT.join(delete.fieldNames()));
+
+      } else if (change instanceof TableChange.UpdateColumnNullability) {
+        TableChange.UpdateColumnNullability update = 
(TableChange.UpdateColumnNullability) change;
+        if (update.nullable()) {
+          pendingUpdate.makeColumnOptional(DOT.join(update.fieldNames()));
+        } else {
+          pendingUpdate.requireColumn(DOT.join(update.fieldNames()));
+        }
+
+      } else if (change instanceof TableChange.UpdateColumnPosition) {
+        apply(pendingUpdate, (TableChange.UpdateColumnPosition) change);
+
+      } else {
+        throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + change);
+      }
+    }
+
+    return pendingUpdate;
+  }
+
+  private static void apply(UpdateSchema pendingUpdate, 
TableChange.UpdateColumnPosition update) {
+    Preconditions.checkArgument(update.position() != null, "Invalid position: 
null");
+
+    if (update.position() instanceof TableChange.After) {
+      TableChange.After after = (TableChange.After) update.position();
+      String referenceField = peerName(update.fieldNames(), after.column());
+      pendingUpdate.moveAfter(DOT.join(update.fieldNames()), referenceField);
+
+    } else if (update.position() instanceof TableChange.First) {
+      pendingUpdate.moveFirst(DOT.join(update.fieldNames()));
+
+    } else {
+      throw new IllegalArgumentException("Unknown position for reorder: " + 
update.position());
+    }
+  }
+
+  private static void apply(UpdateSchema pendingUpdate, TableChange.AddColumn 
add) {
+    Type type = SparkSchemaUtil.convert(add.dataType());
+    pendingUpdate.addColumn(parentName(add.fieldNames()), 
leafName(add.fieldNames()), type, add.comment());
+
+    if (add.position() instanceof TableChange.After) {
+      TableChange.After after = (TableChange.After) add.position();
+      String referenceField = peerName(add.fieldNames(), after.column());
+      pendingUpdate.moveAfter(DOT.join(add.fieldNames()), referenceField);
+
+    } else if (add.position() instanceof TableChange.First) {
+      pendingUpdate.moveFirst(DOT.join(add.fieldNames()));
+
+    } else {
+      Preconditions.checkArgument(add.position() == null,
+          "Cannot add '%s' at unknown position: %s", 
DOT.join(add.fieldNames()), add.position());
+    }
+  }
+
+  /**
+   * Converts a PartitionSpec to Spark transforms.
+   *
+   * @param spec a PartitionSpec
+   * @return an array of Transforms
+   */
+  public static Transform[] toTransforms(PartitionSpec spec) {
+    List<Transform> transforms = PartitionSpecVisitor.visit(spec.schema(), 
spec,
+        new PartitionSpecVisitor<Transform>() {
+          @Override
+          public Transform identity(String sourceName, int sourceId) {
+            return Expressions.identity(sourceName);
+          }
+
+          @Override
+          public Transform bucket(String sourceName, int sourceId, int width) {
+            return Expressions.bucket(width, sourceName);
+          }
+
+          @Override
+          public Transform truncate(String sourceName, int sourceId, int 
width) {
+            return Expressions.apply("truncate", 
Expressions.column(sourceName), Expressions.literal(width));
+          }
+
+          @Override
+          public Transform year(String sourceName, int sourceId) {
+            return Expressions.years(sourceName);
+          }
+
+          @Override
+          public Transform month(String sourceName, int sourceId) {
+            return Expressions.months(sourceName);
+          }
+
+          @Override
+          public Transform day(String sourceName, int sourceId) {
+            return Expressions.days(sourceName);
+          }
+
+          @Override
+          public Transform hour(String sourceName, int sourceId) {
+            return Expressions.hours(sourceName);
+          }
+        });
+
+    return transforms.toArray(new Transform[0]);
+  }
+
+  /**
+   * Converts Spark transforms into a {@link PartitionSpec}.
+   *
+   * @param schema the table schema
+   * @param partitioning Spark Transforms
+   * @return a PartitionSpec
+   */
+  public static PartitionSpec toPartitionSpec(Schema schema, Transform[] 
partitioning) {
+    if (partitioning == null || partitioning.length == 0) {
+      return PartitionSpec.unpartitioned();
+    }
+
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+    for (Transform transform : partitioning) {
+      Preconditions.checkArgument(transform.references().length == 1,
+          "Cannot convert transform with more than one column reference: %s", 
transform);
+      String colName = DOT.join(transform.references()[0].fieldNames());
+      switch (transform.name()) {
+        case "identity":
+          builder.identity(colName);
+          break;
+        case "bucket":
+          builder.bucket(colName, findWidth(transform));
+          break;
+        case "years":
+          builder.year(colName);
+          break;
+        case "months":
+          builder.month(colName);
+          break;
+        case "date":
+        case "days":
+          builder.day(colName);
+          break;
+        case "date_hour":

Review comment:
       This was what we called ours internally before the Spark community 
decided to go with `hours`. Using `hour` doesn't quite describe it.
   
   My thinking is that more alternatives are good here, but I can remove it if 
you'd like.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to