[ 
https://issues.apache.org/jira/browse/GOBBLIN-2163?focusedWorklogId=939865&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939865
 ]

ASF GitHub Bot logged work on GOBBLIN-2163:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Oct/24 05:58
            Start Date: 24/Oct/24 05:58
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4064:
URL: https://github.com/apache/gobblin/pull/4064#discussion_r1814301437


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Validator for Iceberg table metadata, ensuring that the source and destination tables have
+ * compatible schemas and partition specifications.
+ */
+@Slf4j
+public class IcebergTableMetadataValidatorUtils {
+
+  private IcebergTableMetadataValidatorUtils() {
+    // Do not instantiate
+  }
+
+  /**
+   * Validates the metadata of the source and destination Iceberg tables.
+   *
+   * @param srcTableMetadata  the metadata of the source table
+   * @param destTableMetadata the metadata of the destination table
+   * @throws IOException if the schemas or partition specifications do not match
+   */
+  public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata,

Review Comment:
   thoughts on the name and method semantics:
   
   a. it's uncommon for a method's purpose to be throwing an exception.  when
   that is the purpose, it's worth naming it more explicitly, such as
   `failOnMetadataMismatch` or `throwIfTableMetadataMismatch`
   
   b. we're not validating anything about the metadata.  rather we're checking 
for equality/compatibility (you tell me which).  specifically BOTH could be 
invalid (but equal)
   
   c. why codify the role "source" and "destination" in this utility method?  
`tableA` and `tableB` are acceptable operands that don't presume too much about 
how this is to be used.
   
   taken together, I might name:
   ```
   failUnlessCompatibleStructure(TableMetadata tableA, TableMetadata tableB)
   ```
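
A minimal sketch of what that name implies, written against hypothetical stand-in types so it runs without Iceberg on the classpath. `TableStructure`, `StructureChecks`, and the string placeholders for schema and partition spec are assumptions for illustration only; a real version would take `TableMetadata` and delegate to Iceberg's own schema and partition-spec comparisons.

```java
import java.io.IOException;
import java.util.Objects;

/** Hypothetical stand-in for the structural parts of table metadata; not an Iceberg type. */
final class TableStructure {
  final String location;
  final String schema;        // placeholder for a real schema
  final String partitionSpec; // placeholder for a real partition spec

  TableStructure(String location, String schema, String partitionSpec) {
    this.location = location;
    this.schema = schema;
    this.partitionSpec = partitionSpec;
  }
}

final class StructureChecks {
  private StructureChecks() {
    // Do not instantiate
  }

  /**
   * Throws if the two tables differ in schema or partition spec.
   * The operands are symmetric: neither is presumed to be a "source" or a "destination".
   */
  static void failUnlessCompatibleStructure(TableStructure tableA, TableStructure tableB)
      throws IOException {
    if (!Objects.equals(tableA.schema, tableB.schema)) {
      throw new IOException(
          "Schema mismatch between " + tableA.location + " and " + tableB.location);
    }
    if (!Objects.equals(tableA.partitionSpec, tableB.partitionSpec)) {
      throw new IOException(
          "Partition spec mismatch between " + tableA.location + " and " + tableB.location);
    }
  }
}
```

The name states the failure mode up front, and the neutral `tableA`/`tableB` operands leave the caller free to decide which table plays which role.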



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java:
##########
@@ -0,0 +1,96 @@
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Validator for Iceberg table metadata, ensuring that the source and destination tables have
+ * compatible schemas and partition specifications.
+ */
+@Slf4j
+public class IcebergTableMetadataValidatorUtils {
+
+  private IcebergTableMetadataValidatorUtils() {
+    // Do not instantiate
+  }
+
+  /**
+   * Validates the metadata of the source and destination Iceberg tables.
+   *
+   * @param srcTableMetadata  the metadata of the source table
+   * @param destTableMetadata the metadata of the destination table
+   * @throws IOException if the schemas or partition specifications do not match
+   */
+  public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata,
+      TableMetadata destTableMetadata) throws IOException {
+    log.info("Starting validation of Source : {} and Destination : {} Iceberg Tables Metadata",
+        srcTableMetadata.location(),
+        destTableMetadata.location());
+    Schema srcTableSchema = srcTableMetadata.schema();
+    Schema destTableSchema = destTableMetadata.schema();
+    PartitionSpec srcPartitionSpec = srcTableMetadata.spec();
+    PartitionSpec destPartitionSpec = destTableMetadata.spec();
+    validateSchemaForEquality(srcTableSchema, destTableSchema);
+    validatePartitionSpecForEquality(srcPartitionSpec, destPartitionSpec);
+    log.info("Validation of Source : {} and Destination : {} Iceberg Tables Metadata completed successfully",
+        srcTableMetadata.location(),
+        destTableMetadata.location());
+  }
+
+  private static void validateSchemaForEquality(Schema srcTableSchema, Schema destTableSchema) throws IOException {
+    // TODO: Need to add support for schema evolution, currently only supporting copying
+    //  between iceberg tables with same schema.
+    //  This function needs to be broken down into multiple functions to support schema evolution
+    //  Possible cases - Src Schema == Dest Schema,
+    //  - Src Schema is subset of Dest Schema [ Destination Schema Evolved ],
+    //  - Src Schema is superset of Dest Schema [ Source Schema Evolved ],
+    //  - Other cases?
+    //  Also consider using Strategy or any other design pattern for this to make it a better solution
+    if (!srcTableSchema.sameSchema(destTableSchema)) {
+      String errMsg = String.format(
+          "Schema Mismatch between Source and Destination Iceberg Tables Schema - Source-Schema-Id : {%s} and "
+              + "Destination-Schema-Id : {%s}",
+          srcTableSchema.schemaId(),
+          destTableSchema.schemaId()
+      );
+      log.error(errMsg);
+      throw new IOException(errMsg);
+    }
+  }
+
+  private static void validatePartitionSpecForEquality(PartitionSpec srcPartitionSpec, PartitionSpec destPartitionSpec)
+      throws IOException {
+    // Currently, only supporting copying between iceberg tables with same partition spec
+    if (!srcPartitionSpec.compatibleWith(destPartitionSpec)) {

Review Comment:
   the method name says equality.  is that the same as `compatibleWith`?  it 
sounds more lenient than strict equality.
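
A minimal sketch of the distinction in question, using hypothetical stand-in types (`FieldSketch`, `SpecSketch`) rather than Iceberg's classes. It assumes, and this should be verified against the Iceberg Javadoc, that `compatibleWith` compares field order, names, source columns, and transforms while ignoring the spec ID and partition field IDs, whereas strict equality also compares those IDs.

```java
import java.util.List;

/** Hypothetical stand-in for one partition field; not Iceberg's PartitionField. */
final class FieldSketch {
  final int sourceId;     // id of the source column
  final int fieldId;      // id assigned to the partition field itself
  final String name;
  final String transform;

  FieldSketch(int sourceId, int fieldId, String name, String transform) {
    this.sourceId = sourceId;
    this.fieldId = fieldId;
    this.name = name;
    this.transform = transform;
  }

  /** True when everything except the partition field id matches. */
  boolean sameIgnoringFieldId(FieldSketch other) {
    return sourceId == other.sourceId
        && name.equals(other.name)
        && transform.equals(other.transform);
  }
}

/** Hypothetical stand-in for a partition spec; not Iceberg's PartitionSpec. */
final class SpecSketch {
  final int specId;
  final List<FieldSketch> fields;

  SpecSketch(int specId, List<FieldSketch> fields) {
    this.specId = specId;
    this.fields = fields;
  }

  /** Strict check: spec id and every field, including field ids, must match. */
  boolean strictlyEquals(SpecSketch other) {
    if (specId != other.specId || !compatibleWith(other)) {
      return false;
    }
    for (int i = 0; i < fields.size(); i++) {
      if (fields.get(i).fieldId != other.fields.get(i).fieldId) {
        return false;
      }
    }
    return true;
  }

  /** Lenient check: same fields in the same order, ignoring spec id and field ids. */
  boolean compatibleWith(SpecSketch other) {
    if (fields.size() != other.fields.size()) {
      return false;
    }
    for (int i = 0; i < fields.size(); i++) {
      if (!fields.get(i).sameIgnoringFieldId(other.fields.get(i))) {
        return false;
      }
    }
    return true;
  }
}
```

Under that assumption, two specs that partition the same way but carry different IDs pass the lenient check and fail the strict one, so a method named `...ForEquality` that calls `compatibleWith` would be checking less than its name promises.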



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtils.java:
##########
@@ -0,0 +1,96 @@
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Validator for Iceberg table metadata, ensuring that the source and destination tables have
+ * compatible schemas and partition specifications.
+ */
+@Slf4j
+public class IcebergTableMetadataValidatorUtils {
+
+  private IcebergTableMetadataValidatorUtils() {
+    // Do not instantiate
+  }
+
+  /**
+   * Validates the metadata of the source and destination Iceberg tables.
+   *
+   * @param srcTableMetadata  the metadata of the source table
+   * @param destTableMetadata the metadata of the destination table
+   * @throws IOException if the schemas or partition specifications do not match
+   */
+  public static void validateSourceAndDestinationTablesMetadata(TableMetadata srcTableMetadata,
+      TableMetadata destTableMetadata) throws IOException {
+    log.info("Starting validation of Source : {} and Destination : {} Iceberg Tables Metadata",
+        srcTableMetadata.location(),
+        destTableMetadata.location());
+    Schema srcTableSchema = srcTableMetadata.schema();
+    Schema destTableSchema = destTableMetadata.schema();
+    PartitionSpec srcPartitionSpec = srcTableMetadata.spec();
+    PartitionSpec destPartitionSpec = destTableMetadata.spec();
+    validateSchemaForEquality(srcTableSchema, destTableSchema);
+    validatePartitionSpecForEquality(srcPartitionSpec, destPartitionSpec);
+    log.info("Validation of Source : {} and Destination : {} Iceberg Tables Metadata completed successfully",
+        srcTableMetadata.location(),
+        destTableMetadata.location());
+  }
+
+  private static void validateSchemaForEquality(Schema srcTableSchema, Schema destTableSchema) throws IOException {
+    // TODO: Need to add support for schema evolution, currently only supporting copying
+    //  between iceberg tables with same schema.
+    //  This function needs to be broken down into multiple functions to support schema evolution
+    //  Possible cases - Src Schema == Dest Schema,
+    //  - Src Schema is subset of Dest Schema [ Destination Schema Evolved ],
+    //  - Src Schema is superset of Dest Schema [ Source Schema Evolved ],
+    //  - Other cases?
+    //  Also consider using Strategy or any other design pattern for this to make it a better solution
+    if (!srcTableSchema.sameSchema(destTableSchema)) {
+      String errMsg = String.format(
+          "Schema Mismatch between Source and Destination Iceberg Tables Schema - Source-Schema-Id : {%s} and "
+              + "Destination-Schema-Id : {%s}",
+          srcTableSchema.schemaId(),
+          destTableSchema.schemaId()
+      );

Review Comment:
   for context, the `.location()` of each belongs in both of the two messages
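
A minimal sketch of messages that carry both locations, assuming a hypothetical helper class `MismatchMessages`. The method names, parameters, and wording are illustrative only and are not part of the PR; in the real code the values would come from `.location()`, `schemaId()`, and the partition spec on each `TableMetadata`.

```java
/** Hypothetical helper; names and message wording are illustrative only. */
final class MismatchMessages {
  private MismatchMessages() {
    // Do not instantiate
  }

  /** Describes a schema mismatch, identifying both tables by location. */
  static String schemaMismatch(String locationA, int schemaIdA, String locationB, int schemaIdB) {
    return String.format(
        "Schema mismatch between Iceberg tables - %s (schema-id %d) and %s (schema-id %d)",
        locationA, schemaIdA, locationB, schemaIdB);
  }

  /** Describes a partition spec mismatch, identifying both tables by location. */
  static String partitionSpecMismatch(String locationA, int specIdA, String locationB, int specIdB) {
    return String.format(
        "Partition spec mismatch between Iceberg tables - %s (spec-id %d) and %s (spec-id %d)",
        locationA, specIdA, locationB, specIdB);
  }
}
```

With the location in each message, a failure in the logs identifies the affected tables without needing the surrounding "Starting validation" line.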



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class IcebergTableMetadataValidatorUtilsTest {
+  private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema1 =
+      SchemaBuilder.record("schema1")
+          .fields()
+          .requiredString("field1")
+          .requiredString("field2")
+          .endRecord();
+  private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema2 =
+      SchemaBuilder.record("schema2")
+          .fields()
+          .requiredString("field2")
+          .requiredString("field1")
+          .endRecord();
+  private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema3 =
+      SchemaBuilder.record("schema3")
+          .fields()
+          .requiredString("field1")
+          .requiredString("field2")
+          .requiredInt("field3")
+          .endRecord();
+  private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema4 =
+      SchemaBuilder.record("schema4")
+          .fields()
+          .requiredInt("field1")
+          .requiredString("field2")
+          .requiredInt("field3")
+          .endRecord();

Review Comment:
   naming-wise, with numbered names it's hard to recall what's what, so can we
   capture the intent/purpose of each?
   ```
   schema1 = ...
   schema2SameAsSchema1 = ...
   schema3ExtendsSchema1 = ...
   schema4IncompatWithOthers = ...
   ```



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableMetadataValidatorUtilsTest.java:
##########
@@ -0,0 +1,186 @@
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class IcebergTableMetadataValidatorUtilsTest {
+  private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema1 =
+      SchemaBuilder.record("schema1")
+          .fields()
+          .requiredString("field1")
+          .requiredString("field2")
+          .endRecord();
+  private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema2 =
+      SchemaBuilder.record("schema2")
+          .fields()
+          .requiredString("field2")
+          .requiredString("field1")
+          .endRecord();
+  private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema3 =
+      SchemaBuilder.record("schema3")
+          .fields()
+          .requiredString("field1")
+          .requiredString("field2")
+          .requiredInt("field3")
+          .endRecord();
+  private static final org.apache.iceberg.shaded.org.apache.avro.Schema avroDataSchema4 =
+      SchemaBuilder.record("schema4")
+          .fields()
+          .requiredInt("field1")
+          .requiredString("field2")
+          .requiredInt("field3")
+          .endRecord();
+  private static final PartitionSpec unpartitionedPartitionSpec = PartitionSpec.unpartitioned();
+  private static final Schema schema1 = AvroSchemaUtil.toIceberg(avroDataSchema1);
+  private static final Schema schema2IsNotSchema1Compat = AvroSchemaUtil.toIceberg(avroDataSchema2);
+  private static final Schema schema3 = AvroSchemaUtil.toIceberg(avroDataSchema3);
+  private static final Schema schema4IsNotSchema3Compat = AvroSchemaUtil.toIceberg(avroDataSchema4);
+  private static final PartitionSpec partitionSpec1 = PartitionSpec.builderFor(schema1)

Review Comment:
   actually you did it here 

Issue Time Tracking
-------------------

            Worklog Id:     (was: 939865)
    Remaining Estimate: 0h
            Time Spent: 10m

> Add IcebergTable Metadata Validator 
> ------------------------------------
>
>                 Key: GOBBLIN-2163
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2163
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vivek Rai
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Add a new class IcebergTableMetadataValidator which should validate table 
> metadata, such as schema and partition spec, between two Iceberg tables



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
