This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cb874fd533 [HUDI-8616] DataHub meta sync improvements (#12456)
3cb874fd533 is described below

commit 3cb874fd53390b4465a8a8b9acd615b1b7f362cf
Author: Sergio Gómez Villamor <[email protected]>
AuthorDate: Wed Dec 18 05:17:14 2024 +0100

    [HUDI-8616] DataHub meta sync improvements (#12456)
---
 hudi-sync/hudi-datahub-sync/pom.xml                |  20 +-
 ...ponseLogger.java => DataHubResponseLogger.java} |   6 +-
 .../hudi/sync/datahub/DataHubSyncClient.java       | 320 +++++++++++++++------
 .../apache/hudi/sync/datahub/DataHubSyncTool.java  |  72 ++++-
 .../hudi/sync/datahub/DataHubTableProperties.java  | 134 +++++++++
 .../sync/datahub/config/DataHubSyncConfig.java     |  76 ++++-
 .../config/HoodieDataHubDatasetIdentifier.java     |  43 ++-
 .../hudi/sync/datahub/util/SchemaFieldsUtil.java   |  56 ++++
 .../hudi/sync/datahub/TestDataHubSyncClient.java   | 142 ++++++++-
 .../sync/datahub/config/TestDataHubSyncConfig.java |   2 +-
 .../config/TestHoodieDataHubDatasetIdentifier.java | 147 ++++++++++
 packaging/hudi-datahub-sync-bundle/pom.xml         |   2 +-
 12 files changed, 891 insertions(+), 129 deletions(-)

diff --git a/hudi-sync/hudi-datahub-sync/pom.xml b/hudi-sync/hudi-datahub-sync/pom.xml
index a03c12af80c..733d1090d98 100644
--- a/hudi-sync/hudi-datahub-sync/pom.xml
+++ b/hudi-sync/hudi-datahub-sync/pom.xml
@@ -34,14 +34,14 @@
   <packaging>jar</packaging>
 
   <properties>
-    <datahub.version>0.8.45</datahub.version>
+    <datahub.version>0.15.0rc20</datahub.version>
     <httpasync.version>4.1.5</httpasync.version>
   </properties>
 
   <dependencies>
     <dependency>
       <groupId>io.acryl</groupId>
-      <artifactId>datahub-client</artifactId>
+      <artifactId>datahub-client-java8</artifactId>
       <version>${datahub.version}</version>
     </dependency>
 
@@ -85,6 +85,14 @@
       <version>${project.version}</version>
     </dependency>
 
+    <!-- Hadoop to extract Properties -->
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-hadoop-mr</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
     <!-- Testing -->
     <dependency>
       <groupId>org.apache.hudi</groupId>
@@ -113,6 +121,14 @@
       <artifactId>kryo-shaded</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-hive-sync</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubResponseLogger.java
similarity index 91%
rename from hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java
rename to hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubResponseLogger.java
index d7d6945f515..277cffd2210 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubResponseLogger.java
@@ -27,12 +27,12 @@ import org.slf4j.LoggerFactory;
 /**
  * Handle responses to requests to Datahub Metastore. Just logs them.
  */
-public class DatahubResponseLogger implements Callback {
-  private static final Logger LOG = LoggerFactory.getLogger(DatahubResponseLogger.class);
+public class DataHubResponseLogger implements Callback {
+  private static final Logger LOG = LoggerFactory.getLogger(DataHubResponseLogger.class);
 
   @Override
   public void onCompletion(MetadataWriteResponse response) {
-    LOG.info("Completed Datahub RestEmitter request. "
+    LOG.info("Completed DataHub RestEmitter request. "
             + "Status: " + (response.isSuccess() ? " succeeded" : " failed"));
     if (!response.isSuccess()) {
       LOG.error("Request failed. " + response);
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index ac3f474d676..44ce04f59e6 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -21,54 +21,72 @@ package org.apache.hudi.sync.datahub;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hive.SchemaDifference;
 import org.apache.hudi.sync.common.HoodieSyncClient;
 import org.apache.hudi.sync.common.HoodieSyncException;
 import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
+import org.apache.hudi.sync.datahub.config.HoodieDataHubDatasetIdentifier;
+import org.apache.hudi.sync.datahub.util.SchemaFieldsUtil;
 
+import com.linkedin.common.BrowsePathEntry;
+import com.linkedin.common.BrowsePathEntryArray;
+import com.linkedin.common.BrowsePathsV2;
 import com.linkedin.common.Status;
+import com.linkedin.common.SubTypes;
+import com.linkedin.common.UrnArray;
 import com.linkedin.common.urn.DatasetUrn;
-import com.linkedin.data.template.SetMode;
-import com.linkedin.data.template.StringMap;
-import com.linkedin.dataset.DatasetProperties;
-import com.linkedin.schema.ArrayType;
-import com.linkedin.schema.BooleanType;
-import com.linkedin.schema.BytesType;
-import com.linkedin.schema.EnumType;
-import com.linkedin.schema.FixedType;
-import com.linkedin.schema.MapType;
-import com.linkedin.schema.NullType;
-import com.linkedin.schema.NumberType;
-import com.linkedin.schema.OtherSchema;
-import com.linkedin.schema.RecordType;
-import com.linkedin.schema.SchemaField;
-import com.linkedin.schema.SchemaFieldArray;
-import com.linkedin.schema.SchemaFieldDataType;
-import com.linkedin.schema.SchemaMetadata;
-import com.linkedin.schema.StringType;
-import com.linkedin.schema.UnionType;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.container.Container;
+import com.linkedin.container.ContainerProperties;
+import com.linkedin.data.template.StringArray;
+import com.linkedin.domain.Domains;
+import com.linkedin.metadata.aspect.patch.builder.DatasetPropertiesPatchBuilder;
+import com.linkedin.mxe.MetadataChangeProposal;
+import datahub.client.MetadataWriteResponse;
 import datahub.client.rest.RestEmitter;
 import datahub.event.MetadataChangeProposalWrapper;
-import org.apache.avro.AvroTypeException;
+import io.datahubproject.schematron.converters.avro.AvroSchemaConverter;
 import org.apache.avro.Schema;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class DataHubSyncClient extends HoodieSyncClient {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncClient.class);
+
   protected final DataHubSyncConfig config;
   private final DatasetUrn datasetUrn;
+  private final Urn databaseUrn;
+  private final String tableName;
+  private final String databaseName;
   private static final Status SOFT_DELETE_FALSE = new Status().setRemoved(false);
 
   public DataHubSyncClient(DataHubSyncConfig config, HoodieTableMetaClient metaClient) {
     super(config, metaClient);
     this.config = config;
-    this.datasetUrn = config.datasetIdentifier.getDatasetUrn();
+    HoodieDataHubDatasetIdentifier datasetIdentifier =
+            config.getDatasetIdentifier();
+    this.datasetUrn = datasetIdentifier.getDatasetUrn();
+    this.databaseUrn = datasetIdentifier.getDatabaseUrn();
+    this.tableName = datasetIdentifier.getTableName();
+    this.databaseName = datasetIdentifier.getDatabaseName();
   }
 
   @Override
@@ -76,45 +94,171 @@ public class DataHubSyncClient extends HoodieSyncClient {
     throw new UnsupportedOperationException("Not supported: `getLastCommitTimeSynced`");
   }
 
+  protected Option<String> getLastCommitTime() {
+    return getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime);
+  }
+
+  protected Option<String> getLastCommitCompletionTime() {
+    int countInstants = getActiveTimeline().countInstants();
+    return getActiveTimeline()
+        .getInstantsOrderedByCompletionTime()
+        .skip(countInstants - 1)
+        .findFirst()
+        .map(HoodieInstant::getCompletionTime)
+        .map(Option::of).orElseGet(Option::empty);
+  }
+
   @Override
   public void updateLastCommitTimeSynced(String tableName) {
-    updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, getActiveTimeline().lastInstant().get().requestedTime()));
+    Option<String> lastCommitTime = getLastCommitTime();
+    if (lastCommitTime.isPresent()) {
+      updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTime.get()));
+    } else {
+      LOG.error("Failed to get last commit time");
+    }
+
+    Option<String> lastCommitCompletionTime = getLastCommitCompletionTime();
+    if (lastCommitCompletionTime.isPresent()) {
+      updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC, lastCommitCompletionTime.get()));
+    } else {
+      LOG.error("Failed to get last commit completion time");
+    }
+  }
+
+  private MetadataChangeProposal createDatasetPropertiesAspect(String tableName, Map<String, String> tableProperties) {
+    DatasetPropertiesPatchBuilder datasetPropertiesPatchBuilder = new DatasetPropertiesPatchBuilder().urn(datasetUrn);
+    if (tableProperties != null) {
+      tableProperties.forEach(datasetPropertiesPatchBuilder::addCustomProperty);
+    }
+    if (tableName != null) {
+      datasetPropertiesPatchBuilder.setName(tableName);
+    }
+    return datasetPropertiesPatchBuilder.build();
   }
 
   @Override
   public boolean updateTableProperties(String tableName, Map<String, String> tableProperties) {
-    MetadataChangeProposalWrapper propertiesChangeProposal = MetadataChangeProposalWrapper.builder()
-            .entityType("dataset")
-            .entityUrn(datasetUrn)
-            .upsert()
-            .aspect(new DatasetProperties().setCustomProperties(new StringMap(tableProperties)))
-            .build();
-
-    DatahubResponseLogger responseLogger = new DatahubResponseLogger();
+    // Use PATCH API to avoid overwriting existing properties
+    MetadataChangeProposal proposal = createDatasetPropertiesAspect(tableName, tableProperties);
+    DataHubResponseLogger responseLogger = new DataHubResponseLogger();
 
     try (RestEmitter emitter = config.getRestEmitter()) {
-      emitter.emit(propertiesChangeProposal, responseLogger).get();
+      Future<MetadataWriteResponse> future = emitter.emit(proposal, responseLogger);
+      future.get();
+      future.get();
       return true;
     } catch (Exception e) {
-      throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": "
-              + tableProperties, e);
+      if (!config.suppressExceptions()) {
+        throw new HoodieDataHubSyncException(
+                "Failed to sync properties for Dataset " + datasetUrn + ": " + tableProperties, e);
+      } else {
+        LOG.error("Failed to sync properties for Dataset {}: {}", datasetUrn, tableProperties, e);
+        return false;
+      }
     }
   }
 
   @Override
   public void updateTableSchema(String tableName, MessageType schema, SchemaDifference schemaDifference) {
     try (RestEmitter emitter = config.getRestEmitter()) {
-      DatahubResponseLogger responseLogger = new DatahubResponseLogger();
-      MetadataChangeProposalWrapper schemaChange = createSchemaMetadataUpdate(tableName);
-      emitter.emit(schemaChange, responseLogger).get();
-
-      // When updating an entity, it is necessary to set its soft-delete status to false, or else the update won't get
-      // reflected in the UI.
-      MetadataChangeProposalWrapper softDeleteUndoProposal = createUndoSoftDelete();
-      emitter.emit(softDeleteUndoProposal, responseLogger).get();
+      DataHubResponseLogger responseLogger = new DataHubResponseLogger();
+
+      Stream<MetadataChangeProposalWrapper> proposals =
+              Stream.of(createContainerEntity(), createDatasetEntity()).flatMap(stream -> stream);
+
+      // Execute all proposals in parallel and collect futures
+      List<Future<MetadataWriteResponse>> futures = proposals.map(
+              p -> {
+                try {
+                  return emitter.emit(p, responseLogger);
+                } catch (IOException e) {
+                  throw new RuntimeException(e);
+                }
+              }
+      ).collect(Collectors.toList());
+
+      List<MetadataWriteResponse> successfulResults = new ArrayList<>();
+      List<Throwable> failures = new ArrayList<>();
+
+      for (Future<MetadataWriteResponse> future : futures) {
+        try {
+          successfulResults.add(future.get(30, TimeUnit.SECONDS));
+        } catch (TimeoutException e) {
+          failures.add(new HoodieDataHubSyncException("Operation timed out", e));
+        } catch (InterruptedException | ExecutionException e) {
+          failures.add(e);
+        }
+      }
+
+      if (!failures.isEmpty()) {
+        if (!config.suppressExceptions()) {
+          throw new HoodieDataHubSyncException("Failed to sync " + failures.size() + " operations", failures.get(0));
+        } else {
+          for (Throwable failure : failures) {
+            LOG.error("Failed to sync operation", failure);
+          }
+        }
+      }
     } catch (Exception e) {
-      throw new HoodieDataHubSyncException("Fail to change schema for Dataset " + datasetUrn, e);
+      if (!config.suppressExceptions()) {
+        throw new HoodieDataHubSyncException(String.format("Failed to sync metadata for dataset %s", tableName), e);
+      } else {
+        LOG.error("Failed to sync metadata for dataset {}", tableName, e);
+      }
+    }
+  }
+
+  private MetadataChangeProposalWrapper createContainerAspect(Urn entityUrn, Urn containerUrn) {
+    MetadataChangeProposalWrapper containerProposal = MetadataChangeProposalWrapper.builder()
+            .entityType(entityUrn.getEntityType())
+            .entityUrn(entityUrn)
+            .upsert()
+            .aspect(new Container().setContainer(containerUrn))
+            .build();
+    return containerProposal;
+  }
+
+  private MetadataChangeProposalWrapper createBrowsePathsAspect(Urn entityUrn, List<BrowsePathEntry> path) {
+    BrowsePathEntryArray browsePathEntryArray = new BrowsePathEntryArray(path);
+    MetadataChangeProposalWrapper browsePathsProposal = MetadataChangeProposalWrapper.builder()
+            .entityType(entityUrn.getEntityType())
+            .entityUrn(entityUrn)
+            .upsert()
+            .aspect(new BrowsePathsV2().setPath(browsePathEntryArray))
+            .build();
+    return browsePathsProposal;
+  }
+
+  private MetadataChangeProposalWrapper createDomainAspect(Urn entityUrn) {
+    try {
+      Urn domainUrn = Urn.createFromString(config.getDomainIdentifier());
+      MetadataChangeProposalWrapper attachDomainProposal = MetadataChangeProposalWrapper.builder()
+              .entityType(entityUrn.getEntityType())
+              .entityUrn(entityUrn)
+              .upsert()
+              .aspect(new Domains().setDomains(new UrnArray(domainUrn)))
+              .build();
+      return attachDomainProposal;
+    } catch (URISyntaxException e) {
+      LOG.warn("Failed to create domain URN from string: {}", config.getDomainIdentifier());
     }
+    return null;
+  }
+
+  private Stream<MetadataChangeProposalWrapper> createContainerEntity() {
+    MetadataChangeProposalWrapper containerEntityProposal = MetadataChangeProposalWrapper.builder()
+            .entityType("container")
+            .entityUrn(databaseUrn)
+            .upsert()
+            .aspect(new ContainerProperties().setName(databaseName))
+            .build();
+
+    Stream<MetadataChangeProposalWrapper> resultStream = Stream.of(
+            containerEntityProposal,
+            createSubTypeAspect(databaseUrn, "Database"),
+            createBrowsePathsAspect(databaseUrn, Collections.emptyList()), createStatusAspect(databaseUrn),
+            config.attachDomain() ? createDomainAspect(databaseUrn) : null
+        ).filter(Objects::nonNull);
+    return resultStream;
   }
 
   @Override
@@ -127,42 +271,60 @@ public class DataHubSyncClient extends HoodieSyncClient {
     // no op;
   }
 
-  private MetadataChangeProposalWrapper createUndoSoftDelete() {
-    MetadataChangeProposalWrapper softDeleteUndoProposal = MetadataChangeProposalWrapper.builder()
-            .entityType("dataset")
-            .entityUrn(datasetUrn)
+  private MetadataChangeProposalWrapper<Status> createStatusAspect(Urn urn) {
+    MetadataChangeProposalWrapper<Status> softDeleteUndoProposal = MetadataChangeProposalWrapper.builder()
+            .entityType(urn.getEntityType())
+            .entityUrn(urn)
             .upsert()
             .aspect(SOFT_DELETE_FALSE)
-            .aspectName("status")
             .build();
     return softDeleteUndoProposal;
   }
 
-  private MetadataChangeProposalWrapper createSchemaMetadataUpdate(String tableName) {
+  private MetadataChangeProposalWrapper<SubTypes> createSubTypeAspect(Urn urn, String subType) {
+    MetadataChangeProposalWrapper subTypeProposal = MetadataChangeProposalWrapper.builder()
+            .entityType(urn.getEntityType())
+            .entityUrn(urn)
+            .upsert()
+            .aspect(new SubTypes().setTypeNames(new StringArray(subType)))
+            .build();
+    return subTypeProposal;
+  }
+
+  private MetadataChangeProposalWrapper createSchemaMetadataAspect(String tableName) {
     Schema avroSchema = getAvroSchemaWithoutMetadataFields(metaClient);
-    List<SchemaField> fields = avroSchema.getFields().stream().map(f -> new SchemaField()
-            .setFieldPath(f.name())
-            .setType(toSchemaFieldDataType(f.schema().getType()))
-            .setDescription(f.doc(), SetMode.IGNORE_NULL)
-            .setNativeDataType(f.schema().getType().getName())).collect(Collectors.toList());
+    AvroSchemaConverter avroSchemaConverter = AvroSchemaConverter.builder().build();
+    com.linkedin.schema.SchemaMetadata schemaMetadata = avroSchemaConverter.toDataHubSchema(
+            avroSchema,
+            false,
+            false,
+            datasetUrn.getPlatformEntity(),
+            null
+    );
 
-    final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
-    platformSchema.setOtherSchema(new OtherSchema().setRawSchema(avroSchema.toString()));
+    // Reorder fields to relocate _hoodie_ metadata fields to the end
+    schemaMetadata.setFields(SchemaFieldsUtil.reorderPrefixedFields(schemaMetadata.getFields(), "_hoodie_"));
 
     return MetadataChangeProposalWrapper.builder()
             .entityType("dataset")
             .entityUrn(datasetUrn)
             .upsert()
-            .aspect(new SchemaMetadata()
-                    .setSchemaName(tableName)
-                    .setVersion(0)
-                    .setHash("")
-                    .setPlatform(datasetUrn.getPlatformEntity())
-                    .setPlatformSchema(platformSchema)
-                    .setFields(new SchemaFieldArray(fields)))
+            .aspect(schemaMetadata)
             .build();
   }
 
+  private Stream<MetadataChangeProposalWrapper> createDatasetEntity() {
+    Stream<MetadataChangeProposalWrapper> result = Stream.of(
+            createStatusAspect(datasetUrn),
+            createSubTypeAspect(datasetUrn, "Table"),
+            createBrowsePathsAspect(datasetUrn, Collections.singletonList(new BrowsePathEntry().setUrn(databaseUrn).setId(databaseName))),
+            createContainerAspect(datasetUrn, databaseUrn),
+            createSchemaMetadataAspect(tableName),
+            config.attachDomain() ? createDomainAspect(datasetUrn) : null
+    ).filter(Objects::nonNull);
+    return result;
+  }
+
   Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
     try {
       return new TableSchemaResolver(metaClient).getTableAvroSchema(true);
@@ -170,36 +332,4 @@ public class DataHubSyncClient extends HoodieSyncClient {
       throw new HoodieSyncException("Failed to read avro schema", e);
     }
   }
-
-  static SchemaFieldDataType toSchemaFieldDataType(Schema.Type type) {
-    switch (type) {
-      case BOOLEAN:
-        return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new BooleanType()));
-      case INT:
-      case LONG:
-      case FLOAT:
-      case DOUBLE:
-        return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new NumberType()));
-      case MAP:
-        return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new MapType()));
-      case ENUM:
-        return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new EnumType()));
-      case NULL:
-        return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new NullType()));
-      case ARRAY:
-        return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new ArrayType()));
-      case BYTES:
-        return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new BytesType()));
-      case FIXED:
-        return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new FixedType()));
-      case UNION:
-        return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new UnionType()));
-      case RECORD:
-        return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new RecordType()));
-      case STRING:
-        return new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType()));
-      default:
-        throw new AvroTypeException("Unexpected type: " + type.getName());
-    }
-  }
-}
+}
\ No newline at end of file
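
The rewritten updateTableSchema above emits every aspect proposal (container, status, sub-type, browse paths, schema, optional domain) before waiting on any of them, then drains the futures with a 30-second timeout each, collecting failures instead of aborting on the first one. A minimal, self-contained sketch of that submit-then-collect pattern, using only java.util.concurrent (the task payloads are hypothetical):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class SubmitThenCollect {
      public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Submit everything up front, mirroring how the client emits all proposals first.
        List<Future<String>> futures = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
          final int id = i;
          futures.add(pool.submit(() -> "aspect-" + id));
        }
        // Then drain the futures, recording failures instead of failing fast.
        List<String> succeeded = new ArrayList<>();
        List<Throwable> failures = new ArrayList<>();
        for (Future<String> f : futures) {
          try {
            succeeded.add(f.get(30, TimeUnit.SECONDS));
          } catch (TimeoutException | InterruptedException | ExecutionException e) {
            failures.add(e);
          }
        }
        System.out.println("succeeded=" + succeeded.size() + ", failed=" + failures.size());
        pool.shutdown();
      }
    }

With hoodie.meta.sync.datahub.sync.suppress.exceptions enabled (the default), the client logs the collected failures; otherwise it rethrows the first one wrapped in HoodieDataHubSyncException.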
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
index 182506fadec..148fc61dd26 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
@@ -27,10 +27,18 @@ import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
 
 import com.beust.jcommander.JCommander;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static org.apache.hudi.sync.datahub.DataHubTableProperties.HoodieTableMetadata;
+import static org.apache.hudi.sync.datahub.DataHubTableProperties.getTableProperties;
 
 /**
  * To sync with DataHub via REST APIs.
@@ -39,9 +47,12 @@ import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
  * @see <a href="https://datahubproject.io/">https://datahubproject.io/</a>
  */
 public class DataHubSyncTool extends HoodieSyncTool {
+  private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncTool.class);
 
   protected final DataHubSyncConfig config;
-  private final HoodieTableMetaClient metaClient;
+  protected final HoodieTableMetaClient metaClient;
+  protected DataHubSyncClient syncClient;
+  private final String tableName;
 
   public DataHubSyncTool(Properties props) {
     this(props, HadoopConfigUtils.createHadoopConf(props), Option.empty());
@@ -50,20 +61,59 @@ public class DataHubSyncTool extends HoodieSyncTool {
   public DataHubSyncTool(Properties props, Configuration hadoopConf, Option<HoodieTableMetaClient> metaClientOption) {
     super(props, hadoopConf);
     this.config = new DataHubSyncConfig(props);
+    this.tableName = config.getString(META_SYNC_TABLE_NAME);
     this.metaClient = metaClientOption.orElseGet(() -> buildMetaClient(config));
+    this.syncClient = new DataHubSyncClient(config, metaClient);
   }
 
-  /**
-   * Sync to a DataHub Dataset.
-   *
-   * @implNote DataHub sync is an experimental feature, which overwrites the DataHub Dataset's schema
-   * and last commit time sync'ed upon every invocation.
-   */
   @Override
   public void syncHoodieTable() {
-    try (DataHubSyncClient syncClient = new DataHubSyncClient(config, metaClient)) {
-      syncClient.updateTableSchema(config.getString(META_SYNC_TABLE_NAME), null, null);
-      syncClient.updateLastCommitTimeSynced(config.getString(META_SYNC_TABLE_NAME));
+    try {
+      LOG.info("Syncing target Hoodie table with DataHub dataset({}). DataHub URL: {}, basePath: {}",
+          tableName, config.getDataHubServerEndpoint(), config.getString(META_SYNC_BASE_PATH));
+
+      syncSchema();
+      syncTableProperties();
+      updateLastCommitTimeIfNeeded();
+
+      LOG.info("Sync completed for table {}", tableName);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to sync table " + tableName + " to DataHub", e);
+    } finally {
+      close();
+    }
+  }
+
+  private void syncSchema() throws Exception {
+    syncClient.updateTableSchema(tableName, null, null);
+    LOG.info("Schema synced for table {}", tableName);
+  }
+
+  private void syncTableProperties() throws Exception {
+    MessageType storageSchema = syncClient.getStorageSchema();
+    HoodieTableMetadata tableMetadata = new HoodieTableMetadata(metaClient, storageSchema);
+    Map<String, String> tableProperties = getTableProperties(config, tableMetadata);
+    syncClient.updateTableProperties(tableName, tableProperties);
+    LOG.info("Properties synced for table {}", tableName);
+  }
+
+  private void updateLastCommitTimeIfNeeded() throws Exception {
+    boolean shouldUpdateLastCommitTime = !config.getBoolean(META_SYNC_CONDITIONAL_SYNC);
+    if (shouldUpdateLastCommitTime) {
+      syncClient.updateLastCommitTimeSynced(tableName);
+      LOG.info("Updated last sync time for table {}", tableName);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (syncClient != null) {
+      try {
+        syncClient.close();
+        syncClient = null;
+      } catch (Exception e) {
+        LOG.error("Error closing DataHub sync client", e);
+      }
     }
   }
 
@@ -77,4 +127,4 @@ public class DataHubSyncTool extends HoodieSyncTool {
     new DataHubSyncTool(params.toProps()).syncHoodieTable();
   }
-}
+}
\ No newline at end of file
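
The reworked syncHoodieTable() above runs three ordered steps: syncSchema, then syncTableProperties, then updateLastCommitTimeIfNeeded (skipped when conditional sync is enabled), closing the client in a finally block. A hedged sketch of driving the tool programmatically, using only the constructor and config constants that appear in this commit (the paths, names, and endpoint value are placeholders):

    import org.apache.hudi.sync.datahub.DataHubSyncTool;

    import java.util.Properties;

    import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
    import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
    import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
    import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_EMITTER_SERVER;

    public class DataHubSyncExample {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(META_SYNC_BASE_PATH.key(), "/tmp/hudi/some_table");  // placeholder base path
        props.setProperty(META_SYNC_DATABASE_NAME.key(), "some_db");           // placeholder names
        props.setProperty(META_SYNC_TABLE_NAME.key(), "some_table");
        props.setProperty(META_SYNC_DATAHUB_EMITTER_SERVER.key(), "http://localhost:8080"); // assumed endpoint
        // Runs syncSchema -> syncTableProperties -> updateLastCommitTimeIfNeeded, then close().
        new DataHubSyncTool(props).syncHoodieTable();
      }
    }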
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java
new file mode 100644
index 00000000000..f951d67a4ff
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubTableProperties.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
+import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getInputFormatClassName;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getOutputFormatClassName;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getSerDeClassName;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.HIVE_TABLE_SERDE_PROPERTIES;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_BASE_FILE_FORMAT;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_BASE_PATH;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_PARTITION_FIELDS;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_SPARK_VERSION;
+
+public class DataHubTableProperties {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataHubTableProperties.class);
+
+  public static final String HOODIE_META_SYNC_DATAHUB_TABLE_PROPERTIES = "hoodie.meta.sync.datahub.table.properties";
+  public static final String HUDI_TABLE_TYPE = "hudi.table.type";
+  public static final String HUDI_TABLE_VERSION = "hudi.table.version";
+  public static final String HUDI_BASE_PATH = "hudi.base.path";
+  public static final String HUDI_PARTITION_FIELDS = "hudi.partition.fields";
+
+  public static final ConfigProperty<String> DATAHUB_TABLE_PROPERTIES =
+      ConfigProperty.key(HOODIE_META_SYNC_DATAHUB_TABLE_PROPERTIES)
+          .defaultValue("")
+          .withDocumentation("Additional properties to be attached to the DataHub dataset, specified as key1=val1,key2=val2");
+
+  public static Map<String, String> getTableProperties(DataHubSyncConfig config, HoodieTableMetadata tableMetadata) {
+    Map<String, String> properties = new HashMap<>();
+    addBasicHudiTableProperties(properties, config, tableMetadata);
+    addPartitioningInformation(properties, config);
+    addUserDefinedProperties(properties, config);
+    addSparkRelatedProperties(properties, config, tableMetadata);
+    return properties;
+  }
+
+  private static void addBasicHudiTableProperties(Map<String, String> properties, DataHubSyncConfig config, HoodieTableMetadata tableMetadata) {
+    properties.put(HUDI_TABLE_TYPE, tableMetadata.getTableType());
+    properties.put(HUDI_TABLE_VERSION, tableMetadata.getTableVersion());
+    properties.put(HUDI_BASE_PATH, config.getString(META_SYNC_BASE_PATH));
+  }
+
+  private static void addPartitioningInformation(Map<String, String> properties, DataHubSyncConfig config) {
+    if (!config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
+      properties.put(HUDI_PARTITION_FIELDS, String.join(",", config.getSplitStrings(META_SYNC_PARTITION_FIELDS)));
+    }
+  }
+
+  private static void addUserDefinedProperties(Map<String, String> properties, DataHubSyncConfig config) {
+    Map<String, String> userDefinedProps = ConfigUtils.toMap(config.getString(DATAHUB_TABLE_PROPERTIES));
+    properties.putAll(userDefinedProps);
+  }
+
+  private static void addSparkRelatedProperties(Map<String, String> properties, DataHubSyncConfig config, HoodieTableMetadata tableMetadata) {
+    Map<String, String> sparkProperties = SparkDataSourceTableUtils.getSparkTableProperties(
+        config.getSplitStrings(META_SYNC_PARTITION_FIELDS),
+        config.getString(META_SYNC_SPARK_VERSION),
+        config.getInt(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD),
+        tableMetadata.getSchema()
+    );
+    properties.putAll(sparkProperties);
+    properties.putAll(getSerdeProperties(config, false));
+  }
+
+  private static Map<String, String> getSerdeProperties(DataHubSyncConfig config, boolean readAsOptimized) {
+    HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(config.getStringOrDefault(META_SYNC_BASE_FILE_FORMAT).toUpperCase());
+    String inputFormatClassName = getInputFormatClassName(baseFileFormat, false, false);
+    String outputFormatClassName = getOutputFormatClassName(baseFileFormat);
+    String serDeFormatClassName = getSerDeClassName(baseFileFormat);
+
+    Map<String, String> serdeProperties = ConfigUtils.toMap(config.getString(HIVE_TABLE_SERDE_PROPERTIES));
+    serdeProperties.put("inputFormat", inputFormatClassName);
+    serdeProperties.put("outputFormat", outputFormatClassName);
+    serdeProperties.put("serdeClass", serDeFormatClassName);
+    Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, config.getString(META_SYNC_BASE_PATH));
+    sparkSerdeProperties.forEach((k, v) -> serdeProperties.putIfAbsent(k.startsWith("spark.") ? k : "spark." + k, v));
+    LOG.info("Serde Properties : {}", serdeProperties);
+    return serdeProperties;
+  }
+
+  public static class HoodieTableMetadata {
+    private final HoodieTableMetaClient metaClient;
+    private final MessageType schema;
+
+    public HoodieTableMetadata(HoodieTableMetaClient metaClient, MessageType schema) {
+      this.metaClient = metaClient;
+      this.schema = schema;
+    }
+
+    public String getTableType() {
+      return metaClient.getTableType().name();
+    }
+
+    public String getTableVersion() {
+      return metaClient.getTableConfig().getTableVersion().toString();
+    }
+
+    public MessageType getSchema() {
+      return schema;
+    }
+  }
+}
\ No newline at end of file
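
getSerdeProperties() above normalizes Spark serde keys before merging: any incoming key that does not already start with "spark." gets that prefix, and putIfAbsent keeps explicitly configured serde properties from being overwritten. The merge rule in isolation (plain JDK maps; the sample keys are invented):

    import java.util.HashMap;
    import java.util.Map;

    public class SerdeMergeDemo {
      public static void main(String[] args) {
        Map<String, String> serde = new HashMap<>();
        serde.put("spark.sql.sources.provider", "hudi"); // pre-existing entry, must win

        Map<String, String> incoming = new HashMap<>();
        incoming.put("sql.sources.provider", "parquet"); // unprefixed, collides once prefixed
        incoming.put("path", "/tmp/hudi/some_table");

        // Prefix unprefixed keys with "spark." and keep existing entries intact.
        incoming.forEach((k, v) ->
            serde.putIfAbsent(k.startsWith("spark.") ? k : "spark." + k, v));

        System.out.println(serde);
        // e.g. {spark.path=/tmp/hudi/some_table, spark.sql.sources.provider=hudi}
      }
    }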
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
index 2a96171b3c1..a54c7c85e48 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
@@ -29,6 +29,8 @@ import org.apache.hudi.sync.common.HoodieSyncConfig;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParametersDelegate;
 import datahub.client.rest.RestEmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.Immutable;
 
@@ -43,6 +45,8 @@ import static org.apache.hudi.sync.datahub.config.HoodieDataHubDatasetIdentifier
     description = "Configurations used by the Hudi to sync metadata to DataHub.")
 public class DataHubSyncConfig extends HoodieSyncConfig {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncConfig.class);
+
   public static final ConfigProperty<String> META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS = ConfigProperty
       .key("hoodie.meta.sync.datahub.dataset.identifier.class")
       .defaultValue(HoodieDataHubDatasetIdentifier.class.getName())
@@ -80,12 +84,47 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
       .markAdvanced()
       .withDocumentation("Environment to use when pushing entities to Datahub");
 
-  public final HoodieDataHubDatasetIdentifier datasetIdentifier;
+  public static final ConfigProperty<String> META_SYNC_DATAHUB_DOMAIN_IDENTIFIER = ConfigProperty
+      .key("hoodie.meta.sync.datahub.domain.identifier")
+      .noDefaultValue()
+      .markAdvanced()
+      .withDocumentation("Domain identifier for the dataset. When provided all datasets will be attached to the provided domain. Must be in urn form (e.g., urn:li:domain:_domain_id).");
+
+  public static final ConfigProperty<String> HIVE_TABLE_SERDE_PROPERTIES = ConfigProperty
+      .key("hoodie.datasource.hive_sync.serde_properties")
+      .noDefaultValue()
+      .markAdvanced()
+      .withDocumentation("Serde properties to hive table.");
+
+
+  public static final ConfigProperty<Integer> HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD = ConfigProperty
+      .key("hoodie.datasource.hive_sync.schema_string_length_thresh")
+      .defaultValue(4000)
+      .markAdvanced()
+      .withDocumentation("");
+
+  public static final ConfigProperty<Boolean> META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS = ConfigProperty
+      .key("hoodie.meta.sync.datahub.sync.suppress.exceptions")
+      .defaultValue(true)
+      .markAdvanced()
+      .withDocumentation("Suppress exceptions during DataHub sync. This is true by default to ensure that when running inline with other jobs, the sync does not fail the job.");
 
   public DataHubSyncConfig(Properties props) {
     super(props);
+    // Log warning if the domain identifier is provided but is not in urn form
+    if (contains(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER) && !getString(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER).startsWith("urn:li:domain:")) {
+      LOG.warn(
+          "Domain identifier must be in urn form (e.g., urn:li:domain:_domain_id). Provided {}. Will remove this from configuration.",
+          getString(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER));
+      this.props.remove(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER.key());
+    }
+  }
+
+  public HoodieDataHubDatasetIdentifier getDatasetIdentifier() {
     String identifierClass = getStringOrDefault(META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS);
-    datasetIdentifier = (HoodieDataHubDatasetIdentifier) ReflectionUtils.loadClass(identifierClass, new Class<?>[] {Properties.class}, props);
+    // Use reflection to instantiate the class
+    HoodieDataHubDatasetIdentifier datasetIdentifier = (HoodieDataHubDatasetIdentifier) ReflectionUtils.loadClass(identifierClass, new Class<?>[] {Properties.class}, props);
+    return datasetIdentifier;
   }
 
   public RestEmitter getRestEmitter() {
@@ -98,6 +137,22 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
     }
   }
 
+  public Boolean suppressExceptions() {
+    return getBoolean(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS);
+  }
+
+  public String getDataHubServerEndpoint() {
+    return getString(META_SYNC_DATAHUB_EMITTER_SERVER);
+  }
+
+  public boolean attachDomain() {
+    return contains(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER);
+  }
+
+  public String getDomainIdentifier() {
+    return getString(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER);
+  }
+
   public static class DataHubSyncConfigParams {
 
     @ParametersDelegate()
@@ -122,6 +177,14 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
     @Parameter(names = {"--dataset-env"}, description = "Which Datahub Environment to use when pushing entities")
     public String datasetEnv;
 
+    @Parameter(names = {
+        "--domain"}, description = "Domain identifier for the dataset. When provided all datasets will be attached to the provided domain. Must be in urn form (e.g., urn:li:domain:_domain_id).")
+    public String domainIdentifier;
+
+    @Parameter(names = {
+        "--suppress-exceptions"}, description = "Suppress exceptions during DataHub sync.")
+    public String suppressExceptions;
+
     public boolean isHelp() {
       return hoodieSyncConfigParams.isHelp();
     }
@@ -134,7 +197,14 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), emitterSupplierClass);
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), dataPlatformName);
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATASET_ENV.key(), datasetEnv);
+      props.setPropertyIfNonNull(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER.key(), domainIdentifier);
+      // We want the default behavior of DataHubSync Tool when run as command line to NOT suppress exceptions
+      if (suppressExceptions == null) {
+        props.setProperty(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS.key(), "false");
+      } else {
+        props.setProperty(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS.key(), String.valueOf(suppressExceptions));
+      }
       return props;
     }
   }
-}
+}
\ No newline at end of file
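
The constructor above degrades gracefully on a malformed domain identifier: instead of failing the sync it logs a warning and removes the key, so attachDomain() later reports false. A minimal sketch of that guard (pure JDK; the key string is the one defined in this file, the value is hypothetical):

    import java.util.Properties;

    public class DomainGuardDemo {
      static final String KEY = "hoodie.meta.sync.datahub.domain.identifier";

      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(KEY, "marketing"); // not a URN

        String domain = props.getProperty(KEY);
        // Mirror of the constructor's check: keep only well-formed domain URNs.
        if (domain != null && !domain.startsWith("urn:li:domain:")) {
          System.out.println("Dropping malformed domain identifier: " + domain);
          props.remove(KEY);
        }
        System.out.println("attachDomain=" + props.containsKey(KEY)); // false
      }
    }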
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
index 6c8ea076ffc..664fab31e8e 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
@@ -22,6 +22,8 @@ package org.apache.hudi.sync.datahub.config;
 import com.linkedin.common.FabricType;
 import com.linkedin.common.urn.DataPlatformUrn;
 import com.linkedin.common.urn.DatasetUrn;
+import com.linkedin.common.urn.Urn;
+import io.datahubproject.models.util.DatabaseKey;
 
 import java.util.Properties;
 
@@ -41,19 +43,50 @@ public class HoodieDataHubDatasetIdentifier {
   public static final FabricType DEFAULT_DATAHUB_ENV = FabricType.DEV;
 
   protected final Properties props;
+  private final DatasetUrn datasetUrn;
+  private final Urn databaseUrn;
+  private final String tableName;
+  private final String databaseName;
 
   public HoodieDataHubDatasetIdentifier(Properties props) {
     this.props = props;
-  }
-
-  public DatasetUrn getDatasetUrn() {
+    if (props == null || props.isEmpty()) {
+      throw new IllegalArgumentException("Properties cannot be null or empty");
+    }
     DataHubSyncConfig config = new DataHubSyncConfig(props);
 
-    return new DatasetUrn(
+    this.datasetUrn = new DatasetUrn(
             createDataPlatformUrn(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME)),
             createDatasetName(config.getString(META_SYNC_DATABASE_NAME), config.getString(META_SYNC_TABLE_NAME)),
             FabricType.valueOf(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))
     );
+
+    this.tableName = config.getString(META_SYNC_TABLE_NAME);
+    this.databaseName = config.getString(META_SYNC_DATABASE_NAME);
+
+    DatabaseKey databaseKey = DatabaseKey.builder()
+            .platform(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME))
+            .instance(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))
+            .database(this.databaseName)
+            .build();
+
+    this.databaseUrn = databaseKey.asUrn();
+  }
+
+  public DatasetUrn getDatasetUrn() {
+    return this.datasetUrn;
+  }
+
+  public Urn getDatabaseUrn() {
+    return this.databaseUrn;
+  }
+
+  public String getTableName() {
+    return this.tableName;
+  }
+
+  public String getDatabaseName() {
+    return this.databaseName;
   }
 
   private static DataPlatformUrn createDataPlatformUrn(String platformUrn) {
@@ -63,4 +96,4 @@ public class HoodieDataHubDatasetIdentifier {
   private static String createDatasetName(String databaseName, String tableName) {
     return String.format("%s.%s", databaseName, tableName);
   }
-}
+}
\ No newline at end of file
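
The identifier above now builds the dataset and database URNs eagerly in the constructor; the dataset name component itself is still the plain "<database>.<table>" string. A tiny illustration of the naming scheme (values hypothetical; the URN in the comment shows the usual DataHub dataset URN shape, not something asserted by this commit):

    public class DatasetNameDemo {
      // Mirrors createDatasetName() above: "<database>.<table>".
      static String createDatasetName(String databaseName, String tableName) {
        return String.format("%s.%s", databaseName, tableName);
      }

      public static void main(String[] args) {
        // Feeds into a DatasetUrn typically rendered as
        // urn:li:dataset:(urn:li:dataPlatform:hudi,some_db.some_table,DEV)
        System.out.println(createDatasetName("some_db", "some_table")); // some_db.some_table
      }
    }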
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/util/SchemaFieldsUtil.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/util/SchemaFieldsUtil.java
new file mode 100644
index 00000000000..0b3229ac38c
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/util/SchemaFieldsUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub.util;
+
+import com.linkedin.schema.SchemaField;
+import com.linkedin.schema.SchemaFieldArray;
+import io.datahubproject.models.util.FieldPath;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SchemaFieldsUtil {
+  public static SchemaFieldArray reorderPrefixedFields(SchemaFieldArray fields, String prefix) {
+    if (fields == null || fields.isEmpty()) {
+      return fields;
+    }
+
+    // Split the list into underscore and non-underscore fields while preserving order
+    List<SchemaField> prefixedFields = new ArrayList<>();
+    List<SchemaField> normalFields = new ArrayList<>();
+
+    for (SchemaField field : fields) {
+      FieldPath fieldPath = new FieldPath(field.getFieldPath());
+
+      if (fieldPath.isTopLevel() && fieldPath.leafFieldName().startsWith(prefix)) {
+        prefixedFields.add(field);
+      } else {
+        normalFields.add(field);
+      }
+    }
+
+    // Combine the lists with underscore fields at the end
+    List<SchemaField> result = new ArrayList<>(normalFields.size() + prefixedFields.size());
+    result.addAll(normalFields);
+    result.addAll(prefixedFields);
+
+    return new SchemaFieldArray(result);
+  }
+}
\ No newline at end of file
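
reorderPrefixedFields() above is a stable partition: relative order is preserved within each group, and only top-level fields whose leaf name carries the prefix move to the back. The same logic on plain strings (field paths invented):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ReorderDemo {
      public static void main(String[] args) {
        List<String> fields = Arrays.asList(
            "_hoodie_commit_time", "ts", "_hoodie_record_key", "rider", "driver");

        // Stable split: each group keeps its original relative order.
        List<String> prefixed = new ArrayList<>();
        List<String> normal = new ArrayList<>();
        for (String f : fields) {
          (f.startsWith("_hoodie_") ? prefixed : normal).add(f);
        }
        List<String> result = new ArrayList<>(normal);
        result.addAll(prefixed); // metadata columns end up last

        System.out.println(result);
        // [ts, rider, driver, _hoodie_commit_time, _hoodie_record_key]
      }
    }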
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
index a19e035ad1d..837536fb795 100644
--- a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
@@ -21,9 +21,11 @@ package org.apache.hudi.sync.datahub;
 
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
 
+import com.linkedin.mxe.MetadataChangeProposal;
 import datahub.client.MetadataWriteResponse;
 import datahub.client.rest.RestEmitter;
 import datahub.event.MetadataChangeProposalWrapper;
@@ -40,15 +42,22 @@ import org.mockito.MockitoAnnotations;
 
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestDataHubSyncClient {
 
@@ -65,7 +74,7 @@ public class TestDataHubSyncClient {
   @BeforeAll
   public static void beforeAll() throws IOException {
     TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
-        + "{\"name\": \"ts\",\"type\": \"long\"}]}";
+            + "{\"name\": \"ts\",\"type\": \"long\"}]}";
 
     avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
 
@@ -73,9 +82,9 @@ public class TestDataHubSyncClient {
     props.put("hoodie.table.name", "some_table");
     tableBasePath = Paths.get(tmpDir.toString(), "some_table").toString();
     HoodieTableMetaClient.newTableBuilder()
-        .fromProperties(props)
-        .setTableType(HoodieTableType.MERGE_ON_READ.name())
-        .initTable(HadoopFSUtils.getStorageConf(new Configuration()), tableBasePath);
+            .fromProperties(props)
+            .setTableType(HoodieTableType.MERGE_ON_READ.name())
+            .initTable(HadoopFSUtils.getStorageConf(new Configuration()), tableBasePath);
   }
 
   @BeforeEach
@@ -94,19 +103,125 @@ public class TestDataHubSyncClient {
     props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
 
     Mockito.when(
-        restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), Mockito.any())
+            restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), Mockito.any())
     ).thenReturn(
-        CompletableFuture.completedFuture(MetadataWriteResponse.builder().build())
+            CompletableFuture.completedFuture(MetadataWriteResponse.builder().build())
     );
 
     DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
     DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
 
     dhClient.updateTableSchema("some_table", null, null);
-    verify(restEmitterMock, times(2)).emit(any(MetadataChangeProposalWrapper.class),
+    verify(restEmitterMock, times(9)).emit(any(MetadataChangeProposalWrapper.class),
             Mockito.any());
   }
 
+  @Test
+  public void testUpdateTableProperties() throws Exception {
+    Properties props = new Properties();
+    props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+    props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+
+    when(restEmitterMock.emit(any(MetadataChangeProposal.class), any()))
+            .thenReturn(CompletableFuture.completedFuture(MetadataWriteResponse.builder().build()));
+
+    DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+    DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("key1", "value1");
+    properties.put("key2", "value2");
+
+    boolean result = dhClient.updateTableProperties("some_table", properties);
+    assertTrue(result);
+    verify(restEmitterMock, times(1)).emit(any(MetadataChangeProposal.class), any());
+  }
+
+  @Test
+  public void testUpdateTablePropertiesFailure() throws Exception {
+    Properties props = new Properties();
+    props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+    props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+    props.put(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS.key(), "false");
+
+    CompletableFuture<MetadataWriteResponse> failedFuture = new CompletableFuture<>();
+    failedFuture.completeExceptionally(new IOException("Emission failed"));
+    when(restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), any()))
+        .thenReturn(failedFuture);
+
+    DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+    DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("key1", "value1");
+
+    assertThrows(HoodieDataHubSyncException.class, () ->
+            dhClient.updateTableProperties("some_table", properties));
+  }
+
+  @Test
+  public void testGetLastCommitTimeSynced() {
+    Properties props = new Properties();
+    props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+    props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+
+    DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+    DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+    assertThrows(UnsupportedOperationException.class, () ->
+            dhClient.getLastCommitTimeSynced("some_table"));
+  }
+
+  @Test
+  public void testGetMetastoreSchema() {
+    Properties props = new Properties();
+    props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+    props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+
+    DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+    DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+    assertThrows(UnsupportedOperationException.class, () ->
+            dhClient.getMetastoreSchema("some_table"));
+  }
+
+  @Test
+  public void testUpdateTableSchemaWithEmitterFailure() throws Exception {
+    Properties props = new Properties();
+    props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+    props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+    props.put(META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS.key(), "false");
+
+    // Create a failed future that will throw when accessed
+    CompletableFuture<MetadataWriteResponse> future = new CompletableFuture<>();
+    future.completeExceptionally(new ExecutionException("Emission failed", new IOException()));
+
+    // Configure mock to return the failed future for ALL calls
+    when(restEmitterMock.emit((MetadataChangeProposalWrapper) any(), any())).thenReturn(future);
+
+    DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+    DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+    assertThrows(HoodieDataHubSyncException.class, () ->
+            dhClient.updateTableSchema("some_table", null, null));
+  }
+
+  @Test
+  public void testUpdateLastCommitTimeSynced() throws Exception {
+    Properties props = new Properties();
+    props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+    props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+
+    when(restEmitterMock.emit(any(MetadataChangeProposal.class), any()))
+            .thenReturn(CompletableFuture.completedFuture(MetadataWriteResponse.builder().build()));
+
+    DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+    DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+    dhClient.updateLastCommitTimeSynced("some_table");
+    verify(restEmitterMock, times(2)).emit(any(MetadataChangeProposal.class), any());
+  }
+
   public class DataHubSyncClientStub extends DataHubSyncClient {
 
     public DataHubSyncClientStub(DataHubSyncConfig config) {
@@ -118,6 +233,16 @@ public class TestDataHubSyncClient {
       return avroSchema;
     }
 
+    @Override
+    protected Option<String> getLastCommitTime() {
+      return Option.of("1000");
+    }
+
+    @Override
+    protected Option<String> getLastCommitCompletionTime() {
+      return Option.of("1000");
+    }
+
   }
 
   public class DatahubSyncConfigStub extends DataHubSyncConfig {
@@ -133,6 +258,7 @@ public class TestDataHubSyncClient {
     public RestEmitter getRestEmitter() {
       return emitterMock;
     }
+
   }
 
-}
+}
\ No newline at end of file
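
Several of the new tests above hand the mocked emitter a future that is already completed exceptionally, so the failure surfaces from Future.get() rather than from emit() itself; with suppression disabled that is what trips HoodieDataHubSyncException. The pattern in isolation (JDK only):

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class FailedFutureDemo {
      public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IOException("Emission failed"));

        try {
          failed.get(); // throws ExecutionException wrapping the IOException
        } catch (InterruptedException | ExecutionException e) {
          System.out.println("surfaced on get(): " + e.getCause());
        }
      }
    }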
diff --git 
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
 
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
index 9d92970c3b2..f693389fa4d 100644
--- 
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestDataHubSyncConfig.java
@@ -47,7 +47,7 @@ class TestDataHubSyncConfig {
     Properties props = new Properties();
     props.setProperty(META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS.key(), DummyIdentifier.class.getName());
     DataHubSyncConfig syncConfig = new DataHubSyncConfig(props);
-    DatasetUrn datasetUrn = syncConfig.datasetIdentifier.getDatasetUrn();
+    DatasetUrn datasetUrn = syncConfig.getDatasetIdentifier().getDatasetUrn();
     assertEquals("foo", 
datasetUrn.getPlatformEntity().getPlatformNameEntity());
     assertEquals("project.database.table", datasetUrn.getDatasetNameEntity());
     assertEquals(FabricType.PROD, datasetUrn.getOriginEntity());
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java
new file mode 100644
index 00000000000..52af11f0a85
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub.config;
+
+import com.linkedin.common.FabricType;
+import com.linkedin.common.urn.DatasetUrn;
+import com.linkedin.common.urn.Urn;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_NAME;
+import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_ENV;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestHoodieDataHubDatasetIdentifier {
+
+  private Properties props;
+
+  @BeforeEach
+  void setUp() {
+    props = new Properties();
+  }
+
+  @Test
+  @DisplayName("Test constructor with default values")
+  void testConstructorWithDefaultValues() {
+    // Given
+    props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db");
+    props.setProperty(META_SYNC_TABLE_NAME.key(), "test_table");
+
+    // When
+    HoodieDataHubDatasetIdentifier identifier = new HoodieDataHubDatasetIdentifier(props);
+
+    // Then
+    DatasetUrn datasetUrn = identifier.getDatasetUrn();
+    assertNotNull(datasetUrn);
+    assertEquals(HoodieDataHubDatasetIdentifier.DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME,
+        datasetUrn.getPlatformEntity().getId());
+    assertEquals("test_db.test_table", datasetUrn.getDatasetNameEntity());
+    assertEquals(HoodieDataHubDatasetIdentifier.DEFAULT_DATAHUB_ENV, datasetUrn.getOriginEntity());
+  }
+
+  @Test
+  @DisplayName("Test constructor with custom values")
+  void testConstructorWithCustomValues() {
+    // Given
+    props.setProperty(META_SYNC_DATABASE_NAME.key(), "custom_db");
+    props.setProperty(META_SYNC_TABLE_NAME.key(), "custom_table");
+    props.setProperty(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), "custom_platform");
+    props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "PROD");
+
+    // When
+    HoodieDataHubDatasetIdentifier identifier = new HoodieDataHubDatasetIdentifier(props);
+
+    // Then
+    DatasetUrn datasetUrn = identifier.getDatasetUrn();
+    assertNotNull(datasetUrn);
+    assertEquals("custom_platform", datasetUrn.getPlatformEntity().getId());
+    assertEquals("custom_db.custom_table", datasetUrn.getDatasetNameEntity());
+    assertEquals(FabricType.PROD, datasetUrn.getOriginEntity());
+  }
+
+  @Test
+  @DisplayName("Test getDatabaseUrn")
+  void testGetDatabaseUrn() {
+    // Given
+    props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db");
+    props.setProperty(META_SYNC_TABLE_NAME.key(), "test_table");
+    props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "PROD");
+
+    // When
+    HoodieDataHubDatasetIdentifier identifier = new HoodieDataHubDatasetIdentifier(props);
+
+    // Then
+    Urn databaseUrn = identifier.getDatabaseUrn();
+    assertNotNull(databaseUrn);
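+    // Container URNs are GUID-based, so neither the database name nor the env should appear verbatim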
+    assertFalse(databaseUrn.toString().contains("test_db"));
+    assertFalse(databaseUrn.toString().contains("PROD"));
+    assertTrue(databaseUrn.toString().startsWith("urn:li:container:"));
+  }
+
+  @Test
+  @DisplayName("Test getTableName")
+  void testGetTableName() {
+    // Given
+    String tableName = "test_table";
+    props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db");
+    props.setProperty(META_SYNC_TABLE_NAME.key(), tableName);
+
+    // When
+    HoodieDataHubDatasetIdentifier identifier = new HoodieDataHubDatasetIdentifier(props);
+
+    // Then
+    assertEquals(tableName, identifier.getTableName());
+  }
+
+  @Test
+  @DisplayName("Test constructor with missing required properties")
+  void testConstructorWithMissingProperties() {
+    // Given empty properties
+
+    // Then
+    assertThrows(IllegalArgumentException.class, () -> {
+      new HoodieDataHubDatasetIdentifier(props);
+    });
+  }
+
+  @Test
+  @DisplayName("Test constructor with invalid environment")
+  void testConstructorWithInvalidEnvironment() {
+    // Given
+    props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db");
+    props.setProperty(META_SYNC_TABLE_NAME.key(), "test_table");
+    props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "INVALID_ENV");
+
+    // Then
+    assertThrows(IllegalArgumentException.class, () -> {
+      new HoodieDataHubDatasetIdentifier(props);
+    });
+  }
+}
\ No newline at end of file
diff --git a/packaging/hudi-datahub-sync-bundle/pom.xml b/packaging/hudi-datahub-sync-bundle/pom.xml
index fc3f0afa525..a918164ced9 100644
--- a/packaging/hudi-datahub-sync-bundle/pom.xml
+++ b/packaging/hudi-datahub-sync-bundle/pom.xml
@@ -73,7 +73,7 @@
                   <include>org.apache.hudi:hudi-sync-common</include>
                   <include>org.apache.hudi:hudi-datahub-sync</include>
 
-                  <include>io.acryl:datahub-client</include>
+                  <include>io.acryl:datahub-client-java8</include>
                   <include>com.beust:jcommander</include>
                   <include>commons-io:commons-io</include>
                   <include>org.apache.httpcomponents:httpasyncclient</include>
