aokolnychyi commented on a change in pull request #1124: URL: https://github.com/apache/iceberg/pull/1124#discussion_r446773131
########## File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java ##########
@@ -0,0 +1,514 @@
+  /**
+   * Applies a list of Spark table changes to an {@link UpdateProperties} operation.
+   * <p>
+   * All non-property changes in the list are ignored.
+   *
+   * @param pendingUpdate an uncommitted UpdateProperties operation to configure
+   * @param changes a list of Spark table changes
+   * @return the UpdateProperties operation configured with the changes
+   */
+  public static UpdateProperties applyPropertyChanges(UpdateProperties pendingUpdate, List<TableChange> changes) {
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.SetProperty) {
+        TableChange.SetProperty set = (TableChange.SetProperty) change;
+        pendingUpdate.set(set.property(), set.value());
+
+      } else if (change instanceof TableChange.RemoveProperty) {
+        TableChange.RemoveProperty remove = (TableChange.RemoveProperty) change;
+        pendingUpdate.remove(remove.property());
+
+      } else {
+        throw new UnsupportedOperationException("Cannot apply unknown table change: " + change);
+      }
+    }
+
+    return pendingUpdate;
+  }

Review comment:
nit: seems like we actually throw an exception for non-property changes instead of ignoring them, so the Javadoc is misleading.
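A possible rewording of the Javadoc to match the behavior (illustrative, not part of the diff):

```java
/**
 * Applies a list of Spark table changes to an {@link UpdateProperties} operation.
 * <p>
 * Changes that are not property changes cause an {@link UnsupportedOperationException}.
 */
```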
########## File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java ##########
@@ -0,0 +1,514 @@
+  /**
+   * Applies a list of Spark table changes to an {@link UpdateSchema} operation.
+   * <p>
+   * All non-schema changes in the list are ignored.

Review comment:
nit: same here; non-schema changes also throw rather than being ignored.
########## File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java ##########
@@ -0,0 +1,514 @@
+  /**
+   * Converts Spark transforms into a {@link PartitionSpec}.
+   *
+   * @param schema the table schema
+   * @param partitioning Spark Transforms
+   * @return a PartitionSpec
+   */
+  public static PartitionSpec toPartitionSpec(Schema schema, Transform[] partitioning) {
+    if (partitioning == null || partitioning.length == 0) {
+      return PartitionSpec.unpartitioned();
+    }
+
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+    for (Transform transform : partitioning) {
+      Preconditions.checkArgument(transform.references().length == 1,
+          "Cannot convert transform with more than one column reference: %s", transform);
+      String colName = DOT.join(transform.references()[0].fieldNames());
+      switch (transform.name()) {
+        case "identity":
+          builder.identity(colName);
+          break;
+        case "bucket":
+          builder.bucket(colName, findWidth(transform));

Review comment:
This won't work until we expose a custom bucketing function in Spark, right? The data has to be sorted by the partition columns?
########## File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java ##########
@@ -0,0 +1,514 @@
+      switch (transform.name()) {
+        case "identity":
+          builder.identity(colName);
+          break;
+        case "bucket":
+          builder.bucket(colName, findWidth(transform));
+          break;
+        case "years":
+          builder.year(colName);
+          break;
+        case "months":
+          builder.month(colName);
+          break;
+        case "date":
+        case "days":
+          builder.day(colName);
+          break;
+        case "date_hour":

Review comment:
This is an Iceberg-specific alias that we can consume via `ApplyTransform`?
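For context, a sketch of how such an alias would reach this switch, assuming Spark parses transform names it has no builtin for into a generic apply transform whose `name()` is returned verbatim:

```java
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Transform;

// "date_hour" is not a builtin Spark transform name, so it surfaces as a
// generic apply transform; toPartitionSpec then matches it by name in the
// switch above. The column name "ts" is illustrative.
Transform dateHour = Expressions.apply("date_hour", Expressions.column("ts"));
// dateHour.name() returns "date_hour"
```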
########## File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java ##########
@@ -0,0 +1,514 @@
+      String colName = DOT.join(transform.references()[0].fieldNames());
+      switch (transform.name()) {
+        case "identity":
+          builder.identity(colName);
+          break;
+        case "bucket":
+          builder.bucket(colName, findWidth(transform));

Review comment:
I see below that we can make this work using UDFs.
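A minimal sketch of that UDF approach, assuming Iceberg's `Transforms.bucket` is used to pre-cluster rows before the write; the UDF, table, and column names are illustrative:

```java
import static org.apache.spark.sql.functions.expr;

import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

public class BucketedWriteSketch {
  // Cluster rows by Iceberg's bucket transform before writing, so each task
  // produces files for a single bucket partition.
  public static void write(SparkSession spark, Dataset<Row> df) {
    Transform<Long, Integer> bucket = Transforms.bucket(Types.LongType.get(), 16);
    spark.udf().register("iceberg_bucket16",
        (UDF1<Long, Integer>) bucket::apply, DataTypes.IntegerType);
    df.sortWithinPartitions(expr("iceberg_bucket16(id)"))
        .write().format("iceberg").mode("append").save("db.table");
  }
}
```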
########## File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java ##########
@@ -0,0 +1,514 @@
+  public static String describe(org.apache.iceberg.expressions.Expression expr) {
+    return ExpressionVisitors.visit(expr, DescribeExpressionVisitor.INSTANCE);
+  }
+
+  public static String describe(Schema schema) {

Review comment:
It seems this one is not used yet. Does DS V2 not support DESCRIBE yet?
########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java ##########
@@ -0,0 +1,109 @@
+  @Override
+  public SparkTable getTable(StructType schema, Transform[] partitioning, Map<String, String> options) {
+    // TODO: if partitioning is non-null, the table is being created?
+    // Get Iceberg table from options
+    Configuration conf = new Configuration(SparkSession.active().sparkContext().hadoopConfiguration());
+    Table icebergTable = getTableAndResolveHadoopConfiguration(options, conf);
+
+    // Build Spark table based on Iceberg table, and return it
+    if (schema != null) {
+      return new SparkTable(icebergTable, schema);
+    } else {
+      return new SparkTable(icebergTable);
+    }
+  }

Review comment:
nit: do we need this `if`, given that `SparkTable` handles a null schema correctly?
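If so, the branch could collapse to a single call (a sketch, assuming the two-argument constructor falls back to the table's own schema when the requested schema is null, as the review comment suggests):

```java
// assumes SparkTable(Table, StructType) treats a null schema as "use the
// Iceberg table schema"; hypothetical simplification, not part of the PR
return new SparkTable(icebergTable, schema);
```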
########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java ##########
@@ -0,0 +1,109 @@
+  private static void mergeIcebergHadoopConfs(
+      Configuration baseConf, Map<String, String> options) {

Review comment:
nit: can be on one line.
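That is, something like the following, assuming it stays within the project's line-length limit:

```java
private static void mergeIcebergHadoopConfs(Configuration baseConf, Map<String, String> options) {
```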
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark; + +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CachingCatalog; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.source.SparkTable; +import org.apache.iceberg.spark.source.StagedSparkTable; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.StagedTable; +import org.apache.spark.sql.connector.catalog.StagingTableCatalog; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.TableChange; +import org.apache.spark.sql.connector.catalog.TableChange.ColumnChange; +import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty; +import org.apache.spark.sql.connector.catalog.TableChange.SetProperty; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * A Spark TableCatalog implementation that wraps Iceberg's {@link Catalog} interface. + */ +public class SparkCatalog implements StagingTableCatalog { + private String catalogName = null; + private Catalog icebergCatalog = null; + + /** + * Build an Iceberg {@link Catalog} to be used by this Spark catalog adapter. + * + * @param name Spark's catalog name + * @param options Spark's catalog options + * @return an Iceberg catalog + */ + protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap options) { + // TODO: add name to catalogs + Configuration conf = SparkSession.active().sparkContext().hadoopConfiguration(); + String catalogType = options.getOrDefault("type", "hive"); + switch (catalogType) { + case "hive": + int clientPoolSize = options.getInt("clients", 2); + String uri = options.get("uri"); + return new HiveCatalog(uri, clientPoolSize, conf); + + case "hadoop": + String warehouseLocation = options.get("warehouse"); + return new HadoopCatalog(conf, warehouseLocation); + + default: + throw new UnsupportedOperationException("Unknown catalog type: " + catalogType); + } + } + + /** + * Build an Iceberg {@link TableIdentifier} for the given Spark identifier. 
+  /**
+   * Build an Iceberg {@link TableIdentifier} for the given Spark identifier.
+   *
+   * @param identifier Spark's identifier
+   * @return an Iceberg identifier
+   */
+  protected TableIdentifier buildIdentifier(Identifier identifier) {
+    return TableIdentifier.of(Namespace.of(identifier.namespace()), identifier.name());
+  }
+
+  @Override
+  public Identifier[] listTables(String[] namespace) {
+    // TODO: handle namespaces

Review comment:
   Will we implement this in a follow-up?
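The sketch below is only an editorial illustration of what such a follow-up might look like, not code from this PR; it assumes the wrapped Iceberg catalog's `listTables(Namespace)` is the intended delegate:

  @Override
  public Identifier[] listTables(String[] namespace) {
    // Hypothetical follow-up: list tables via the wrapped Iceberg catalog and
    // convert each Iceberg TableIdentifier back into a Spark Identifier.
    return icebergCatalog.listTables(Namespace.of(namespace))
        .stream()
        .map(ident -> Identifier.of(ident.namespace().levels(), ident.name()))
        .toArray(Identifier[]::new);
  }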
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
##########
@@ -0,0 +1,283 @@
+  @Override
+  public Identifier[] listTables(String[] namespace) {
+    // TODO: handle namespaces
+    return new Identifier[0];
+  }
+
+  @Override
+  public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
+    try {
+      return new SparkTable(icebergCatalog.loadTable(buildIdentifier(ident)));
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      throw new NoSuchTableException(ident);
+    }
+  }
+
+  @Override
+  public SparkTable createTable(Identifier ident, StructType schema,
+                                Transform[] transforms,
+                                Map<String, String> properties)
+      throws TableAlreadyExistsException {

Review comment:
   nit: would it make sense to move the `throws` clause to the line above? It should fit.
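Concretely, the suggested layout (formatting only; the body is unchanged):

  @Override
  public SparkTable createTable(Identifier ident, StructType schema,
                                Transform[] transforms,
                                Map<String, String> properties) throws TableAlreadyExistsException {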
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -0,0 +1,109 @@
+  private Table getTableAndResolveHadoopConfiguration(
+      Map<String, String> options, Configuration conf) {

Review comment:
   nit: can be on one line

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
