This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d4dee8e  Flink: Support CREATE and ALTER TABLE in Flink SQL (#1393)
d4dee8e is described below

commit d4dee8e667368945d00ad6f0051e2f492c9d59a5
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Sep 3 06:18:17 2020 +0800

    Flink: Support CREATE and ALTER TABLE in Flink SQL (#1393)
---
 .../org/apache/iceberg/flink/FlinkCatalog.java     | 200 +++++++++++++++++++--
 .../iceberg/flink/TestFlinkCatalogTable.java       | 187 ++++++++++++++++++-
 2 files changed, 371 insertions(+), 16 deletions(-)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java 
b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 4182964..e37db84 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.flink.table.api.TableSchema;
@@ -34,6 +35,7 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -49,7 +51,12 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.util.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -57,6 +64,9 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 
@@ -277,15 +287,14 @@ public class FlinkCatalog extends AbstractCatalog {
   }
 
   @Override
-  public CatalogBaseTable getTable(ObjectPath tablePath) throws 
TableNotExistException, CatalogException {
-    try {
-      Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
-      TableSchema tableSchema = 
FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+  public CatalogTable getTable(ObjectPath tablePath) throws 
TableNotExistException, CatalogException {
+    Table table = loadIcebergTable(tablePath);
+    return toCatalogTable(table);
+  }
 
-      // NOTE: We can not create a IcebergCatalogTable, because Flink 
optimizer may use CatalogTableImpl to copy a new
-      // catalog table.
-      // Let's re-loading table from Iceberg catalog when creating source/sink 
operators.
-      return new CatalogTableImpl(tableSchema, table.properties(), null);
+  private Table loadIcebergTable(ObjectPath tablePath) throws 
TableNotExistException {
+    try {
+      return icebergCatalog.loadTable(toIdentifier(tablePath));
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       throw new TableNotExistException(getName(), tablePath, e);
     }
@@ -320,19 +329,180 @@ public class FlinkCatalog extends AbstractCatalog {
     }
   }
 
-  /**
-   * TODO Add partitioning to the Flink DDL parser.
-   */
   @Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support createTable now.");
+      throws CatalogException, TableAlreadyExistException {
+    validateFlinkTable(table);
+
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    PartitionSpec spec = toPartitionSpec(((CatalogTable) 
table).getPartitionKeys(), icebergSchema);
+
+    ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+    String location = null;
+    for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+      if ("location".equalsIgnoreCase(entry.getKey())) {
+        location = entry.getValue();
+      } else {
+        properties.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    try {
+      icebergCatalog.createTable(
+          toIdentifier(tablePath),
+          icebergSchema,
+          spec,
+          location,
+          properties.build());
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistException(getName(), tablePath, e);
+    }
   }
 
   @Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support alterTable now.");
+      throws CatalogException, TableNotExistException {
+    validateFlinkTable(newTable);
+    Table icebergTable = loadIcebergTable(tablePath);
+    CatalogTable table = toCatalogTable(icebergTable);
+
+    // Currently, Flink SQL only supports altering table properties.
+
+    // With the current Flink Catalog API, adding/removing/renaming columns 
cannot be supported by comparing
+    // CatalogTable instances, unless the Flink schema contains Iceberg column 
IDs.
+    if (!table.getSchema().equals(newTable.getSchema())) {
+      throw new UnsupportedOperationException("Altering schema is not 
supported yet.");
+    }
+
+    if (!table.getPartitionKeys().equals(((CatalogTable) 
newTable).getPartitionKeys())) {
+      throw new UnsupportedOperationException("Altering partition keys is not 
supported yet.");
+    }
+
+    Map<String, String> oldOptions = table.getOptions();
+    Map<String, String> setProperties = Maps.newHashMap();
+
+    String setLocation = null;
+    String setSnapshotId = null;
+    String pickSnapshotId = null;
+
+    for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+
+      if (Objects.equals(value, oldOptions.get(key))) {
+        continue;
+      }
+
+      if ("location".equalsIgnoreCase(key)) {
+        setLocation = value;
+      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+        setSnapshotId = value;
+      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+        pickSnapshotId = value;
+      } else {
+        setProperties.put(key, value);
+      }
+    }
+
+    oldOptions.keySet().forEach(k -> {
+      if (!newTable.getOptions().containsKey(k)) {
+        setProperties.put(k, null);
+      }
+    });
+
+    commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, 
setProperties);
+  }
+
+  private static void validateFlinkTable(CatalogBaseTable table) {
+    Preconditions.checkArgument(table instanceof CatalogTable, "The Table 
should be a CatalogTable.");
+
+    TableSchema schema = table.getSchema();
+    schema.getTableColumns().forEach(column -> {
+      if (column.isGenerated()) {
+        throw new UnsupportedOperationException("Creating table with computed 
columns is not supported yet.");
+      }
+    });
+
+    if (!schema.getWatermarkSpecs().isEmpty()) {
+      throw new UnsupportedOperationException("Creating table with watermark 
specs is not supported yet.");
+    }
+
+    if (schema.getPrimaryKey().isPresent()) {
+      throw new UnsupportedOperationException("Creating table with primary key 
is not supported yet.");
+    }
+  }
+
+  private static PartitionSpec toPartitionSpec(List<String> partitionKeys, 
Schema icebergSchema) {
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
+    partitionKeys.forEach(builder::identity);
+    return builder.build();
+  }
+
+  private static List<String> toPartitionKeys(PartitionSpec spec, Schema 
icebergSchema) {
+    List<String> partitionKeys = Lists.newArrayList();
+    for (PartitionField field : spec.fields()) {
+      if (field.transform().isIdentity()) {
+        partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
+      } else {
+        // Not created by Flink SQL.
+        // For compatibility with iceberg tables, return empty.
+        // TODO modify this after Flink supports partition transforms.
+        return Collections.emptyList();
+      }
+    }
+    return partitionKeys;
+  }
+
+  private static void commitChanges(Table table, String setLocation, String 
setSnapshotId,
+                                    String pickSnapshotId, Map<String, String> 
setProperties) {
+    // don't allow setting the snapshot and picking a commit at the same time 
because order is ambiguous and choosing
+    // one order leads to different results
+    Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId == 
null,
+        "Cannot set the current snapshot ID and cherry-pick snapshot changes");
+
+    if (setSnapshotId != null) {
+      long newSnapshotId = Long.parseLong(setSnapshotId);
+      table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
+    }
+
+    // cherry-picking also updates the table snapshot, so commit it before the 
location/property transaction in case it fails
+    if (pickSnapshotId != null) {
+      long newSnapshotId = Long.parseLong(pickSnapshotId);
+      table.manageSnapshots().cherrypick(newSnapshotId).commit();
+    }
+
+    Transaction transaction = table.newTransaction();
+
+    if (setLocation != null) {
+      transaction.updateLocation()
+          .setLocation(setLocation)
+          .commit();
+    }
+
+    if (!setProperties.isEmpty()) {
+      UpdateProperties updateProperties = transaction.updateProperties();
+      setProperties.forEach((k, v) -> {
+        if (v == null) {
+          updateProperties.remove(k);
+        } else {
+          updateProperties.set(k, v);
+        }
+      });
+      updateProperties.commit();
+    }
+
+    transaction.commitTransaction();
+  }
+
+  static CatalogTable toCatalogTable(Table table) {
+    TableSchema schema = 
FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+    List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
+
+    // NOTE: We cannot create an IcebergCatalogTable that extends CatalogTable, 
because the Flink optimizer may use
+    // CatalogTableImpl to copy a new catalog table.
+    // Instead, reload the table from the Iceberg catalog when creating source/sink 
operators.
+    // Iceberg does not have a table comment, so pass null (the default comment 
value in Flink).
+    return new CatalogTableImpl(schema, partitionKeys, table.properties(), 
null);
   }
 
   // ------------------------------ Unsupported methods 
---------------------------------------------
diff --git 
a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java 
b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 2c77a66..ef8d72a 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -21,13 +21,30 @@ package org.apache.iceberg.flink;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.junit.After;
 import org.junit.Assert;
@@ -82,10 +99,178 @@ public class TestFlinkCatalogTable extends 
FlinkCatalogTestBase {
         "Should fail if trying to get a nonexistent table",
         ValidationException.class,
         "Table `tl` was not found.",
-        () ->  tEnv.from("tl")
+        () -> tEnv.from("tl")
     );
     Assert.assertEquals(
         Collections.singletonList(TableColumn.of("id", DataTypes.BIGINT())),
         tEnv.from("tl2").getSchema().getTableColumns());
   }
+
+  @Test
+  public void testCreateTable() throws TableNotExistException {
+    tEnv.executeSql("CREATE TABLE tl(id BIGINT)");
+
+    Table table = table("tl");
+    Assert.assertEquals(
+        new Schema(Types.NestedField.optional(1, "id", 
Types.LongType.get())).asStruct(),
+        table.schema().asStruct());
+    Assert.assertEquals(Maps.newHashMap(), table.properties());
+
+    CatalogTable catalogTable = catalogTable("tl");
+    Assert.assertEquals(TableSchema.builder().field("id", 
DataTypes.BIGINT()).build(), catalogTable.getSchema());
+    Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
+  }
+
+  @Test
+  public void testCreateTableLocation() {
+    Assume.assumeFalse("HadoopCatalog does not support creating table with 
location", isHadoopCatalog);
+
+    tEnv.executeSql("CREATE TABLE tl(id BIGINT) WITH 
('location'='/tmp/location')");
+
+    Table table = table("tl");
+    Assert.assertEquals(
+        new Schema(Types.NestedField.optional(1, "id", 
Types.LongType.get())).asStruct(),
+        table.schema().asStruct());
+    Assert.assertEquals("/tmp/location", table.location());
+    Assert.assertEquals(Maps.newHashMap(), table.properties());
+  }
+
+  @Test
+  public void testCreatePartitionTable() throws TableNotExistException {
+    tEnv.executeSql("CREATE TABLE tl(id BIGINT, dt STRING) PARTITIONED 
BY(dt)");
+
+    Table table = table("tl");
+    Assert.assertEquals(
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.LongType.get()),
+            Types.NestedField.optional(2, "dt", 
Types.StringType.get())).asStruct(),
+        table.schema().asStruct());
+    
Assert.assertEquals(PartitionSpec.builderFor(table.schema()).identity("dt").build(),
 table.spec());
+    Assert.assertEquals(Maps.newHashMap(), table.properties());
+
+    CatalogTable catalogTable = catalogTable("tl");
+    Assert.assertEquals(
+        TableSchema.builder().field("id", DataTypes.BIGINT()).field("dt", 
DataTypes.STRING()).build(),
+        catalogTable.getSchema());
+    Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
+    Assert.assertEquals(Collections.singletonList("dt"), 
catalogTable.getPartitionKeys());
+  }
+
+  @Test
+  public void testLoadTransformPartitionTable() throws TableNotExistException {
+    Schema schema = new Schema(Types.NestedField.optional(0, "id", 
Types.LongType.get()));
+    validationCatalog.createTable(
+        TableIdentifier.of(icebergNamespace, "tl"), schema,
+        PartitionSpec.builderFor(schema).bucket("id", 100).build());
+
+    CatalogTable catalogTable = catalogTable("tl");
+    Assert.assertEquals(
+        TableSchema.builder().field("id", DataTypes.BIGINT()).build(),
+        catalogTable.getSchema());
+    Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
+    Assert.assertEquals(Collections.emptyList(), 
catalogTable.getPartitionKeys());
+  }
+
+  @Test
+  public void testAlterTable() throws TableNotExistException {
+    tEnv.executeSql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')");
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("oldK", "oldV");
+
+    // new
+    tEnv.executeSql("ALTER TABLE tl SET('newK'='newV')");
+    properties.put("newK", "newV");
+    Assert.assertEquals(properties, table("tl").properties());
+
+    // update old
+    tEnv.executeSql("ALTER TABLE tl SET('oldK'='oldV2')");
+    properties.put("oldK", "oldV2");
+    Assert.assertEquals(properties, table("tl").properties());
+
+    // remove property
+    CatalogTable catalogTable = catalogTable("tl");
+    properties.remove("oldK");
+    tEnv.getCatalog(tEnv.getCurrentCatalog()).get().alterTable(
+        new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false);
+    Assert.assertEquals(properties, table("tl").properties());
+  }
+
+  @Test
+  public void testRelocateTable() {
+    Assume.assumeFalse("HadoopCatalog does not support relocate table", 
isHadoopCatalog);
+
+    tEnv.executeSql("CREATE TABLE tl(id BIGINT)");
+    tEnv.executeSql("ALTER TABLE tl SET('location'='/tmp/location')");
+    Assert.assertEquals("/tmp/location", table("tl").location());
+  }
+
+  @Test
+  public void testSetCurrentAndCherryPickSnapshotId() {
+    tEnv.executeSql("CREATE TABLE tl(c1 INT, c2 STRING, c3 STRING) PARTITIONED 
BY (c1)");
+
+    Table table = table("tl");
+
+    DataFile fileA = DataFiles.builder(table.spec())
+        .withPath("/path/to/data-a.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("c1=0") // easy way to set partition data for now
+        .withRecordCount(1)
+        .build();
+    DataFile fileB = DataFiles.builder(table.spec())
+        .withPath("/path/to/data-b.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("c1=1") // easy way to set partition data for now
+        .withRecordCount(1)
+        .build();
+    DataFile replacementFile = DataFiles.builder(table.spec())
+        .withPath("/path/to/data-a-replacement.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("c1=0") // easy way to set partition data for now
+        .withRecordCount(1)
+        .build();
+
+    table.newAppend()
+        .appendFile(fileA)
+        .commit();
+    long snapshotId = table.currentSnapshot().snapshotId();
+
+    // stage an overwrite that replaces fileA
+    table.newReplacePartitions()
+        .addFile(replacementFile)
+        .stageOnly()
+        .commit();
+
+    Snapshot staged = Iterables.getLast(table.snapshots());
+    Assert.assertEquals("Should find the staged overwrite snapshot", 
DataOperations.OVERWRITE, staged.operation());
+
+    // add another append so that the original commit can't be fast-forwarded
+    table.newAppend()
+        .appendFile(fileB)
+        .commit();
+
+    // test cherry pick
+    tEnv.executeSql(String.format("ALTER TABLE tl 
SET('cherry-pick-snapshot-id'='%s')", staged.snapshotId()));
+    validateTableFiles(table, fileB, replacementFile);
+
+    // test set current snapshot
+    tEnv.executeSql(String.format("ALTER TABLE tl 
SET('current-snapshot-id'='%s')", snapshotId));
+    validateTableFiles(table, fileA);
+  }
+
+  private void validateTableFiles(Table tbl, DataFile... expectedFiles) {
+    tbl.refresh();
+    Set<CharSequence> expectedFilePaths = 
Arrays.stream(expectedFiles).map(DataFile::path).collect(Collectors.toSet());
+    Set<CharSequence> actualFilePaths = 
StreamSupport.stream(tbl.newScan().planFiles().spliterator(), false)
+        .map(FileScanTask::file).map(ContentFile::path)
+        .collect(Collectors.toSet());
+    Assert.assertEquals("Files should match", expectedFilePaths, 
actualFilePaths);
+  }
+
+  private Table table(String name) {
+    return validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, 
name));
+  }
+
+  private CatalogTable catalogTable(String name) throws TableNotExistException 
{
+    return (CatalogTable) 
tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTable(new 
ObjectPath(DATABASE, name));
+  }
 }

Reply via email to