This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d4dee8e Flink: Support CREATE and ALTER TABLE in Flink SQL (#1393)
d4dee8e is described below
commit d4dee8e667368945d00ad6f0051e2f492c9d59a5
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Sep 3 06:18:17 2020 +0800
Flink: Support CREATE and ALTER TABLE in Flink SQL (#1393)
---
.../org/apache/iceberg/flink/FlinkCatalog.java | 200 +++++++++++++++++++--
.../iceberg/flink/TestFlinkCatalogTable.java | 187 ++++++++++++++++++-
2 files changed, 371 insertions(+), 16 deletions(-)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 4182964..e37db84 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.table.api.TableSchema;
@@ -34,6 +35,7 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -49,7 +51,12 @@ import org.apache.flink.table.expressions.Expression;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -57,6 +64,9 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -277,15 +287,14 @@ public class FlinkCatalog extends AbstractCatalog {
}
@Override
-  public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
-    try {
-      Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
-      TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+  public CatalogTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+    Table table = loadIcebergTable(tablePath);
+    return toCatalogTable(table);
+  }
-      // NOTE: We can not create a IcebergCatalogTable, because Flink optimizer may use CatalogTableImpl to copy a new
-      // catalog table.
-      // Let's re-loading table from Iceberg catalog when creating source/sink operators.
-      return new CatalogTableImpl(tableSchema, table.properties(), null);
+  private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
+    try {
+      return icebergCatalog.loadTable(toIdentifier(tablePath));
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new TableNotExistException(getName(), tablePath, e);
}
@@ -320,19 +329,180 @@ public class FlinkCatalog extends AbstractCatalog {
}
}
- /**
- * TODO Add partitioning to the Flink DDL parser.
- */
@Override
   public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support createTable now.");
+      throws CatalogException, TableAlreadyExistException {
+    validateFlinkTable(table);
+
+    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
+    PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);
+
+ ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+ String location = null;
+ for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
+ if ("location".equalsIgnoreCase(entry.getKey())) {
+ location = entry.getValue();
+ } else {
+ properties.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ try {
+ icebergCatalog.createTable(
+ toIdentifier(tablePath),
+ icebergSchema,
+ spec,
+ location,
+ properties.build());
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistException(getName(), tablePath, e);
+ }
}
@Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
-      throws CatalogException {
-    throw new UnsupportedOperationException("Not support alterTable now.");
+      throws CatalogException, TableNotExistException {
+    validateFlinkTable(newTable);
+    Table icebergTable = loadIcebergTable(tablePath);
+    CatalogTable table = toCatalogTable(icebergTable);
+
+    // Currently, Flink SQL only support altering table properties.
+
+    // For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by comparing
+    // CatalogTable instances, unless the Flink schema contains Iceberg column IDs.
+    if (!table.getSchema().equals(newTable.getSchema())) {
+      throw new UnsupportedOperationException("Altering schema is not supported yet.");
+    }
+
+    if (!table.getPartitionKeys().equals(((CatalogTable) newTable).getPartitionKeys())) {
+      throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
+    }
+
+ Map<String, String> oldOptions = table.getOptions();
+ Map<String, String> setProperties = Maps.newHashMap();
+
+ String setLocation = null;
+ String setSnapshotId = null;
+ String pickSnapshotId = null;
+
+ for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+
+ if (Objects.equals(value, oldOptions.get(key))) {
+ continue;
+ }
+
+ if ("location".equalsIgnoreCase(key)) {
+ setLocation = value;
+ } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
+ setSnapshotId = value;
+ } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
+ pickSnapshotId = value;
+ } else {
+ setProperties.put(key, value);
+ }
+ }
+
+ oldOptions.keySet().forEach(k -> {
+ if (!newTable.getOptions().containsKey(k)) {
+ setProperties.put(k, null);
+ }
+ });
+
+    commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
+  }
+
+  private static void validateFlinkTable(CatalogBaseTable table) {
+    Preconditions.checkArgument(table instanceof CatalogTable, "The Table should be a CatalogTable.");
+
+    TableSchema schema = table.getSchema();
+    schema.getTableColumns().forEach(column -> {
+      if (column.isGenerated()) {
+        throw new UnsupportedOperationException("Creating table with computed columns is not supported yet.");
+      }
+    });
+
+    if (!schema.getWatermarkSpecs().isEmpty()) {
+      throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
+    }
+
+    if (schema.getPrimaryKey().isPresent()) {
+      throw new UnsupportedOperationException("Creating table with primary key is not supported yet.");
+    }
+  }
+
+  private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
+    partitionKeys.forEach(builder::identity);
+    return builder.build();
+  }
+
+  private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
+ List<String> partitionKeys = Lists.newArrayList();
+ for (PartitionField field : spec.fields()) {
+ if (field.transform().isIdentity()) {
+ partitionKeys.add(icebergSchema.findColumnName(field.sourceId()));
+ } else {
+ // Not created by Flink SQL.
+ // For compatibility with iceberg tables, return empty.
+ // TODO modify this after Flink support partition transform.
+ return Collections.emptyList();
+ }
+ }
+ return partitionKeys;
+ }
+
+  private static void commitChanges(Table table, String setLocation, String setSnapshotId,
+                                    String pickSnapshotId, Map<String, String> setProperties) {
+    // don't allow setting the snapshot and picking a commit at the same time because order is ambiguous and choosing
+    // one order leads to different results
+    Preconditions.checkArgument(setSnapshotId == null || pickSnapshotId == null,
+        "Cannot set the current snapshot ID and cherry-pick snapshot changes");
+
+ if (setSnapshotId != null) {
+ long newSnapshotId = Long.parseLong(setSnapshotId);
+ table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
+ }
+
+    // if updating the table snapshot, perform that update first in case it fails
+ if (pickSnapshotId != null) {
+ long newSnapshotId = Long.parseLong(pickSnapshotId);
+ table.manageSnapshots().cherrypick(newSnapshotId).commit();
+ }
+
+ Transaction transaction = table.newTransaction();
+
+ if (setLocation != null) {
+ transaction.updateLocation()
+ .setLocation(setLocation)
+ .commit();
+ }
+
+ if (!setProperties.isEmpty()) {
+ UpdateProperties updateProperties = transaction.updateProperties();
+ setProperties.forEach((k, v) -> {
+ if (v == null) {
+ updateProperties.remove(k);
+ } else {
+ updateProperties.set(k, v);
+ }
+ });
+ updateProperties.commit();
+ }
+
+ transaction.commitTransaction();
+ }
+
+  static CatalogTable toCatalogTable(Table table) {
+    TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema()));
+    List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
+
+    // NOTE: We can not create a IcebergCatalogTable extends CatalogTable, because Flink optimizer may use
+    // CatalogTableImpl to copy a new catalog table.
+    // Let's re-loading table from Iceberg catalog when creating source/sink operators.
+    // Iceberg does not have Table comment, so pass a null (Default comment value in Flink).
+    return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
}
   // ------------------------------ Unsupported methods ---------------------------------------------
diff --git
a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 2c77a66..ef8d72a 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -21,13 +21,30 @@ package org.apache.iceberg.flink;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
@@ -82,10 +99,178 @@ public class TestFlinkCatalogTable extends
FlinkCatalogTestBase {
"Should fail if trying to get a nonexistent table",
ValidationException.class,
"Table `tl` was not found.",
- () -> tEnv.from("tl")
+ () -> tEnv.from("tl")
);
Assert.assertEquals(
Collections.singletonList(TableColumn.of("id", DataTypes.BIGINT())),
tEnv.from("tl2").getSchema().getTableColumns());
}
+
+ @Test
+ public void testCreateTable() throws TableNotExistException {
+ tEnv.executeSql("CREATE TABLE tl(id BIGINT)");
+
+ Table table = table("tl");
+ Assert.assertEquals(
+ new Schema(Types.NestedField.optional(1, "id",
Types.LongType.get())).asStruct(),
+ table.schema().asStruct());
+ Assert.assertEquals(Maps.newHashMap(), table.properties());
+
+ CatalogTable catalogTable = catalogTable("tl");
+ Assert.assertEquals(TableSchema.builder().field("id",
DataTypes.BIGINT()).build(), catalogTable.getSchema());
+ Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
+ }
+
+ @Test
+ public void testCreateTableLocation() {
+ Assume.assumeFalse("HadoopCatalog does not support creating table with
location", isHadoopCatalog);
+
+ tEnv.executeSql("CREATE TABLE tl(id BIGINT) WITH
('location'='/tmp/location')");
+
+ Table table = table("tl");
+ Assert.assertEquals(
+ new Schema(Types.NestedField.optional(1, "id",
Types.LongType.get())).asStruct(),
+ table.schema().asStruct());
+ Assert.assertEquals("/tmp/location", table.location());
+ Assert.assertEquals(Maps.newHashMap(), table.properties());
+ }
+
+ @Test
+ public void testCreatePartitionTable() throws TableNotExistException {
+ tEnv.executeSql("CREATE TABLE tl(id BIGINT, dt STRING) PARTITIONED
BY(dt)");
+
+ Table table = table("tl");
+ Assert.assertEquals(
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "dt",
Types.StringType.get())).asStruct(),
+ table.schema().asStruct());
+
Assert.assertEquals(PartitionSpec.builderFor(table.schema()).identity("dt").build(),
table.spec());
+ Assert.assertEquals(Maps.newHashMap(), table.properties());
+
+ CatalogTable catalogTable = catalogTable("tl");
+ Assert.assertEquals(
+ TableSchema.builder().field("id", DataTypes.BIGINT()).field("dt",
DataTypes.STRING()).build(),
+ catalogTable.getSchema());
+ Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
+ Assert.assertEquals(Collections.singletonList("dt"),
catalogTable.getPartitionKeys());
+ }
+
+ @Test
+ public void testLoadTransformPartitionTable() throws TableNotExistException {
+ Schema schema = new Schema(Types.NestedField.optional(0, "id",
Types.LongType.get()));
+ validationCatalog.createTable(
+ TableIdentifier.of(icebergNamespace, "tl"), schema,
+ PartitionSpec.builderFor(schema).bucket("id", 100).build());
+
+ CatalogTable catalogTable = catalogTable("tl");
+ Assert.assertEquals(
+ TableSchema.builder().field("id", DataTypes.BIGINT()).build(),
+ catalogTable.getSchema());
+ Assert.assertEquals(Maps.newHashMap(), catalogTable.getOptions());
+ Assert.assertEquals(Collections.emptyList(),
catalogTable.getPartitionKeys());
+ }
+
+ @Test
+ public void testAlterTable() throws TableNotExistException {
+ tEnv.executeSql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')");
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("oldK", "oldV");
+
+ // new
+ tEnv.executeSql("ALTER TABLE tl SET('newK'='newV')");
+ properties.put("newK", "newV");
+ Assert.assertEquals(properties, table("tl").properties());
+
+ // update old
+ tEnv.executeSql("ALTER TABLE tl SET('oldK'='oldV2')");
+ properties.put("oldK", "oldV2");
+ Assert.assertEquals(properties, table("tl").properties());
+
+ // remove property
+ CatalogTable catalogTable = catalogTable("tl");
+ properties.remove("oldK");
+ tEnv.getCatalog(tEnv.getCurrentCatalog()).get().alterTable(
+ new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false);
+ Assert.assertEquals(properties, table("tl").properties());
+ }
+
+ @Test
+ public void testRelocateTable() {
+ Assume.assumeFalse("HadoopCatalog does not support relocate table",
isHadoopCatalog);
+
+ tEnv.executeSql("CREATE TABLE tl(id BIGINT)");
+ tEnv.executeSql("ALTER TABLE tl SET('location'='/tmp/location')");
+ Assert.assertEquals("/tmp/location", table("tl").location());
+ }
+
+ @Test
+ public void testSetCurrentAndCherryPickSnapshotId() {
+ tEnv.executeSql("CREATE TABLE tl(c1 INT, c2 STRING, c3 STRING) PARTITIONED
BY (c1)");
+
+ Table table = table("tl");
+
+ DataFile fileA = DataFiles.builder(table.spec())
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=0") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ DataFile fileB = DataFiles.builder(table.spec())
+ .withPath("/path/to/data-b.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=1") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+ DataFile replacementFile = DataFiles.builder(table.spec())
+ .withPath("/path/to/data-a-replacement.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("c1=0") // easy way to set partition data for now
+ .withRecordCount(1)
+ .build();
+
+ table.newAppend()
+ .appendFile(fileA)
+ .commit();
+ long snapshotId = table.currentSnapshot().snapshotId();
+
+ // stage an overwrite that replaces FILE_A
+ table.newReplacePartitions()
+ .addFile(replacementFile)
+ .stageOnly()
+ .commit();
+
+ Snapshot staged = Iterables.getLast(table.snapshots());
+ Assert.assertEquals("Should find the staged overwrite snapshot",
DataOperations.OVERWRITE, staged.operation());
+
+ // add another append so that the original commit can't be fast-forwarded
+ table.newAppend()
+ .appendFile(fileB)
+ .commit();
+
+ // test cherry pick
+ tEnv.executeSql(String.format("ALTER TABLE tl
SET('cherry-pick-snapshot-id'='%s')", staged.snapshotId()));
+ validateTableFiles(table, fileB, replacementFile);
+
+ // test set current snapshot
+ tEnv.executeSql(String.format("ALTER TABLE tl
SET('current-snapshot-id'='%s')", snapshotId));
+ validateTableFiles(table, fileA);
+ }
+
+ private void validateTableFiles(Table tbl, DataFile... expectedFiles) {
+ tbl.refresh();
+ Set<CharSequence> expectedFilePaths =
Arrays.stream(expectedFiles).map(DataFile::path).collect(Collectors.toSet());
+ Set<CharSequence> actualFilePaths =
StreamSupport.stream(tbl.newScan().planFiles().spliterator(), false)
+ .map(FileScanTask::file).map(ContentFile::path)
+ .collect(Collectors.toSet());
+ Assert.assertEquals("Files should match", expectedFilePaths,
actualFilePaths);
+ }
+
+ private Table table(String name) {
+ return validationCatalog.loadTable(TableIdentifier.of(icebergNamespace,
name));
+ }
+
+ private CatalogTable catalogTable(String name) throws TableNotExistException
{
+ return (CatalogTable)
tEnv.getCatalog(tEnv.getCurrentCatalog()).get().getTable(new
ObjectPath(DATABASE, name));
+ }
}