[
https://issues.apache.org/jira/browse/GOBBLIN-2163?focusedWorklogId=939865&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939865
]
ASF GitHub Bot logged work on GOBBLIN-2163:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 24/Oct/24 05:58
Start Date: 24/Oct/24 05:58
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4064:
URL: https://github.com/apache/gobblin/pull/4064#discussion_r1814301437
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Validator for Iceberg table metadata, ensuring that the source and destination tables have
+ * compatible schemas and partition specifications.
+ */
+@Slf4j
+public class IcebergTableMetadataValidatorUtils {
+
+ private IcebergTableMetadataValidatorUtils() {
+ // Do not instantiate
+ }
+
+ /**
+ * Validates the metadata of the source and destination Iceberg tables.
+ *
+ * @param srcTableMetadata the metadata of the source table
+ * @param destTableMetadata the metadata of the destination table
+ * @throws IOException if the schemas or partition specifications do not match
+ */
+ public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata,
Review Comment:
thoughts on the name and method semantics:
a. it's uncommon for the sole purpose of a method to be throwing an exception.
when it is, it's worth naming more explicitly, such as
`failOnMetadataMismatch` or `throwIfTableMetadataMismatch`
b. we're not validating anything about the metadata itself. rather, we're checking
the two for equality/compatibility (you tell me which). specifically, BOTH could be
invalid (but equal)
c. why codify the roles "source" and "destination" in this utility method?
`tableA` and `tableB` are acceptable operands that don't presume too much about
how this is to be used.
taken together, I might name it:
```
failUnlessCompatibleStructure(TableMetadata tableA, TableMetadata tableB)
```
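A minimal sketch of how a method with the suggested name might read. `TableInfo` is a hypothetical stand-in for Iceberg's `TableMetadata`, and plain list equality stands in for the real schema and partition-spec comparisons; neither is the PR's actual code.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical stand-in for org.apache.iceberg.TableMetadata, reduced to what the check reads.
final class TableInfo {
  final String location;
  final List<String> columns;        // stand-in for the table's Schema
  final List<String> partitionKeys;  // stand-in for the table's PartitionSpec

  TableInfo(String location, List<String> columns, List<String> partitionKeys) {
    this.location = location;
    this.columns = columns;
    this.partitionKeys = partitionKeys;
  }
}

final class TableStructureChecks {
  private TableStructureChecks() {
    // static helpers only
  }

  /** Throws if the two tables differ in schema or partition spec; returns normally otherwise. */
  static void failUnlessCompatibleStructure(TableInfo tableA, TableInfo tableB) throws IOException {
    if (!tableA.columns.equals(tableB.columns)) {
      throw new IOException(String.format(
          "Schema mismatch between tables at %s and %s", tableA.location, tableB.location));
    }
    if (!tableA.partitionKeys.equals(tableB.partitionKeys)) {
      throw new IOException(String.format(
          "Partition spec mismatch between tables at %s and %s", tableA.location, tableB.location));
    }
  }
}
```

The name states the failure behaviour up front, and the operands are symmetric, so a caller is free to pass a source/destination pair or any other two tables.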
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java:
##########
@@ -0,0 +1,96 @@
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Validator for Iceberg table metadata, ensuring that the source and destination tables have
+ * compatible schemas and partition specifications.
+ */
+@Slf4j
+public class IcebergTableMetadataValidatorUtils {
+
+ private IcebergTableMetadataValidatorUtils() {
+ // Do not instantiate
+ }
+
+ /**
+ * Validates the metadata of the source and destination Iceberg tables.
+ *
+ * @param srcTableMetadata the metadata of the source table
+ * @param destTableMetadata the metadata of the destination table
+ * @throws IOException if the schemas or partition specifications do not match
+ */
+ public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata,
+ TableMetadata destTableMetadata) throws IOException {
+ log.info("Starting validation of Source : {} and Destination : {} Iceberg Tables Metadata",
+ srcTableMetadata.location(),
+ destTableMetadata.location());
+ Schema srcTableSchema = srcTableMetadata.schema();
+ Schema destTableSchema = destTableMetadata.schema();
+ PartitionSpec srcPartitionSpec = srcTableMetadata.spec();
+ PartitionSpec destPartitionSpec = destTableMetadata.spec();
+ validateSchemaForEquality(srcTableSchema, destTableSchema);
+ validatePartitionSpecForEquality(srcPartitionSpec, destPartitionSpec);
+ log.info("Validation of Source : {} and Destination : {} Iceberg Tables Metadata completed successfully",
+ srcTableMetadata.location(),
+ destTableMetadata.location());
+ }
+
+ private static void validateSchemaForEquality(Schema srcTableSchema, Schema destTableSchema) throws IOException {
+ // TODO: Need to add support for schema evolution, currently only supporting copying
+ // between iceberg tables with same schema.
+ // This function needs to be broken down into multiple functions to support schema evolution
+ // Possible cases - Src Schema == Dest Schema,
+ // - Src Schema is subset of Dest Schema [ Destination Schema Evolved ],
+ // - Src Schema is superset of Dest Schema [ Source Schema Evolved ],
+ // - Other cases?
+ // Also consider using Strategy or any other design pattern for this to make it a better solution
+ if (!srcTableSchema.sameSchema(destTableSchema)) {
+ String errMsg = String.format(
+ "Schema Mismatch between Source and Destination Iceberg Tables Schema - Source-Schema-Id : {%s} and "
+ + "Destination-Schema-Id : {%s}",
+ srcTableSchema.schemaId(),
+ destTableSchema.schemaId()
+ );
+ log.error(errMsg);
+ throw new IOException(errMsg);
+ }
+ }
+
+ private static void validatePartitionSpecForEquality(PartitionSpec srcPartitionSpec, PartitionSpec destPartitionSpec)
+ throws IOException {
+ // Currently, only supporting copying between iceberg tables with same partition spec
+ if (!srcPartitionSpec.compatibleWith(destPartitionSpec)) {
Review Comment:
the method name says equality. is that the same as `compatibleWith`? it
sounds more lenient than strict equality.
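To make the distinction concrete: as I understand the Iceberg Javadoc, `PartitionSpec.compatibleWith` ignores partition field IDs and compares field count, order, name, source column, and transform, while `equals` also compares IDs. The sketch below reproduces that difference with hypothetical stand-in types (`PartField`, `PartSpecComparison`); it is not the Iceberg implementation.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for one partition field; not the Iceberg PartitionField class.
final class PartField {
  final int sourceId;      // id of the source column
  final int fieldId;       // id assigned to the partition field itself
  final String transform;  // e.g. "identity" or "day"
  final String name;

  PartField(int sourceId, int fieldId, String transform, String name) {
    this.sourceId = sourceId;
    this.fieldId = fieldId;
    this.transform = transform;
    this.name = name;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof PartField)) {
      return false;
    }
    PartField that = (PartField) o;
    return sourceId == that.sourceId && fieldId == that.fieldId
        && transform.equals(that.transform) && name.equals(that.name);
  }

  @Override
  public int hashCode() {
    return Objects.hash(sourceId, fieldId, transform, name);
  }
}

final class PartSpecComparison {
  private PartSpecComparison() {
    // static helpers only
  }

  /** Strict: same spec id and identical fields, field ids included. */
  static boolean strictlyEqual(int specIdA, List<PartField> fieldsA, int specIdB, List<PartField> fieldsB) {
    return specIdA == specIdB && fieldsA.equals(fieldsB);
  }

  /** Lenient: same field count, order, source column, transform, and name; ids ignored. */
  static boolean compatible(List<PartField> fieldsA, List<PartField> fieldsB) {
    if (fieldsA.size() != fieldsB.size()) {
      return false;
    }
    for (int i = 0; i < fieldsA.size(); i++) {
      PartField a = fieldsA.get(i);
      PartField b = fieldsB.get(i);
      if (a.sourceId != b.sourceId || !a.transform.equals(b.transform) || !a.name.equals(b.name)) {
        return false;
      }
    }
    return true;
  }
}
```

Under this reading, two specs whose fields differ only in assigned IDs are compatible but not equal, which is why a method named `...ForEquality` that calls `compatibleWith` invites the question above.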
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java:
##########
@@ -0,0 +1,96 @@
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Validator for Iceberg table metadata, ensuring that the source and destination tables have
+ * compatible schemas and partition specifications.
+ */
+@Slf4j
+public class IcebergTableMetadataValidatorUtils {
+
+ private IcebergTableMetadataValidatorUtils() {
+ // Do not instantiate
+ }
+
+ /**
+ * Validates the metadata of the source and destination Iceberg tables.
+ *
+ * @param srcTableMetadata the metadata of the source table
+ * @param destTableMetadata the metadata of the destination table
+ * @throws IOException if the schemas or partition specifications do not match
+ */
+ public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata,
+ TableMetadata destTableMetadata) throws IOException {
+ log.info("Starting validation of Source : {} and Destination : {} Iceberg Tables Metadata",
+ srcTableMetadata.location(),
+ destTableMetadata.location());
+ Schema srcTableSchema = srcTableMetadata.schema();
+ Schema destTableSchema = destTableMetadata.schema();
+ PartitionSpec srcPartitionSpec = srcTableMetadata.spec();
+ PartitionSpec destPartitionSpec = destTableMetadata.spec();
+ validateSchemaForEquality(srcTableSchema, destTableSchema);
+ validatePartitionSpecForEquality(srcPartitionSpec, destPartitionSpec);
+ log.info("Validation of Source : {} and Destination : {} Iceberg Tables Metadata completed successfully",
+ srcTableMetadata.location(),
+ destTableMetadata.location());
+ }
+
+ private static void validateSchemaForEquality(Schema srcTableSchema, Schema destTableSchema) throws IOException {
+ // TODO: Need to add support for schema evolution, currently only supporting copying
+ // between iceberg tables with same schema.
+ // This function needs to be broken down into multiple functions to support schema evolution
+ // Possible cases - Src Schema == Dest Schema,
+ // - Src Schema is subset of Dest Schema [ Destination Schema Evolved ],
+ // - Src Schema is superset of Dest Schema [ Source Schema Evolved ],
+ // - Other cases?
+ // Also consider using Strategy or any other design pattern for this to make it a better solution
+ if (!srcTableSchema.sameSchema(destTableSchema)) {
+ String errMsg = String.format(
+ "Schema Mismatch between Source and Destination Iceberg Tables Schema - Source-Schema-Id : {%s} and "
+ + "Destination-Schema-Id : {%s}",
+ srcTableSchema.schemaId(),
+ destTableSchema.schemaId()
+ );
Review Comment:
for context, the `.location()` of each table belongs in both messages
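One way to do that is a small shared helper, sketched here. It assumes "both messages" means the schema-mismatch and partition-spec-mismatch errors; `MismatchMessages` and `withLocations` are hypothetical names, and the location strings would come from each table's `.location()`.

```java
final class MismatchMessages {
  private MismatchMessages() {
    // static helpers only
  }

  /** Appends the locations of both tables to a mismatch detail message. */
  static String withLocations(String detail, String locationA, String locationB) {
    return String.format("%s [tableA: %s, tableB: %s]", detail, locationA, locationB);
  }
}
```

Routing both error messages through one helper keeps the location context consistent between them.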
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class IcebergTableMetadataValidatorUtilsTest {
+ private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema1 =
+ SchemaBuilder.record("schema1")
+ .fields()
+ .requiredString("field1")
+ .requiredString("field2")
+ .endRecord();
+ private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema2 =
+ SchemaBuilder.record("schema2")
+ .fields()
+ .requiredString("field2")
+ .requiredString("field1")
+ .endRecord();
+ private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema3 =
+ SchemaBuilder.record("schema3")
+ .fields()
+ .requiredString("field1")
+ .requiredString("field2")
+ .requiredInt("field3")
+ .endRecord();
+ private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema4 =
+ SchemaBuilder.record("schema4")
+ .fields()
+ .requiredInt("field1")
+ .requiredString("field2")
+ .requiredInt("field3")
+ .endRecord();
Review Comment:
naming-wise, the numbering makes it hard to recall what's what, so can we capture
the intent/purpose of each in the name?
```
schema1 = ...
schema2SameAsSchema1 = ...
schema3ExtendsSchema1 = ...
schema4IncompatWithOthers = ...
```
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java:
##########
@@ -0,0 +1,186 @@
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class IcebergTableMetadataValidatorUtilsTest {
+ private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema1 =
+ SchemaBuilder.record("schema1")
+ .fields()
+ .requiredString("field1")
+ .requiredString("field2")
+ .endRecord();
+ private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema2 =
+ SchemaBuilder.record("schema2")
+ .fields()
+ .requiredString("field2")
+ .requiredString("field1")
+ .endRecord();
+ private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema3 =
+ SchemaBuilder.record("schema3")
+ .fields()
+ .requiredString("field1")
+ .requiredString("field2")
+ .requiredInt("field3")
+ .endRecord();
+ private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema4 =
+ SchemaBuilder.record("schema4")
+ .fields()
+ .requiredInt("field1")
+ .requiredString("field2")
+ .requiredInt("field3")
+ .endRecord();
+ private static final PartitionSpec unpartitionedPartitionSpec = PartitionSpec.unpartitioned();
+ private static final Schema schema1 = AvroSchemaUtil.toIceberg(avroDataSchema1);
+ private static final Schema schema2IsNotSchema1Compat = AvroSchemaUtil.toIceberg(avroDataSchema2);
+ private static final Schema schema3 = AvroSchemaUtil.toIceberg(avroDataSchema3);
+ private static final Schema schema4IsNotSchema3Compat = AvroSchemaUtil.toIceberg(avroDataSchema4);
+ private static final PartitionSpec partitionSpec1 = PartitionSpec.builderFor(schema1)
Review Comment:
actually you did it here
Issue Time Tracking
-------------------
Worklog Id: (was: 939865)
Remaining Estimate: 0h
Time Spent: 10m
> Add IcebergTable Metadata Validator
> ------------------------------------
>
> Key: GOBBLIN-2163
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2163
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vivek Rai
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Add a new class IcebergTableMetadataValidator which should validate table
> metadata, such as schema and partition spec, between two Iceberg tables
--
This message was sent by Atlassian Jira
(v8.20.10#820010)