xushiyan commented on code in PR #12678:
URL: https://github.com/apache/hudi/pull/12678#discussion_r1931069877


##########
hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java:
##########
@@ -21,101 +21,255 @@
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+//import org.apache.hudi.hive.SchemaDifference;
 import org.apache.hudi.sync.common.HoodieSyncClient;
 import org.apache.hudi.sync.common.HoodieSyncException;
 import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
+import org.apache.hudi.sync.datahub.config.HoodieDataHubDatasetIdentifier;
+import org.apache.hudi.sync.datahub.util.SchemaFieldsUtil;
 
+import com.linkedin.common.BrowsePathEntry;
+import com.linkedin.common.BrowsePathEntryArray;
+import com.linkedin.common.BrowsePathsV2;
 import com.linkedin.common.Status;
+import com.linkedin.common.SubTypes;
+import com.linkedin.common.UrnArray;
 import com.linkedin.common.urn.DatasetUrn;
-import com.linkedin.data.template.SetMode;
-import com.linkedin.data.template.StringMap;
-import com.linkedin.dataset.DatasetProperties;
-import com.linkedin.schema.ArrayType;
-import com.linkedin.schema.BooleanType;
-import com.linkedin.schema.BytesType;
-import com.linkedin.schema.EnumType;
-import com.linkedin.schema.FixedType;
-import com.linkedin.schema.MapType;
-import com.linkedin.schema.NullType;
-import com.linkedin.schema.NumberType;
-import com.linkedin.schema.OtherSchema;
-import com.linkedin.schema.RecordType;
-import com.linkedin.schema.SchemaField;
-import com.linkedin.schema.SchemaFieldArray;
-import com.linkedin.schema.SchemaFieldDataType;
-import com.linkedin.schema.SchemaMetadata;
-import com.linkedin.schema.StringType;
-import com.linkedin.schema.UnionType;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.container.Container;
+import com.linkedin.container.ContainerProperties;
+import com.linkedin.data.template.StringArray;
+import com.linkedin.domain.Domains;
+import com.linkedin.metadata.aspect.patch.builder.DatasetPropertiesPatchBuilder;
+import com.linkedin.mxe.MetadataChangeProposal;
+import datahub.client.MetadataWriteResponse;
 import datahub.client.rest.RestEmitter;
 import datahub.event.MetadataChangeProposalWrapper;
-import org.apache.avro.AvroTypeException;
+import io.datahubproject.schematron.converters.avro.AvroSchemaConverter;
 import org.apache.avro.Schema;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class DataHubSyncClient extends HoodieSyncClient {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncClient.class);
+
   protected final DataHubSyncConfig config;
   private final DatasetUrn datasetUrn;
+  private final Urn databaseUrn;
+  private final String tableName;
+  private final String databaseName;
   private static final Status SOFT_DELETE_FALSE = new Status().setRemoved(false);
 
-  public DataHubSyncClient(DataHubSyncConfig config) {
+  public DataHubSyncClient(DataHubSyncConfig config, HoodieTableMetaClient metaClient) {
+    //super(config, metaClient);
     super(config);
     this.config = config;
-    this.datasetUrn = config.datasetIdentifier.getDatasetUrn();
+    HoodieDataHubDatasetIdentifier datasetIdentifier =
+        config.getDatasetIdentifier();
+    this.datasetUrn = datasetIdentifier.getDatasetUrn();
+    this.databaseUrn = datasetIdentifier.getDatabaseUrn();
+    this.tableName = datasetIdentifier.getTableName();
+    this.databaseName = datasetIdentifier.getDatabaseName();
   }
 
   @Override
   public Option<String> getLastCommitTimeSynced(String tableName) {
     throw new UnsupportedOperationException("Not supported: `getLastCommitTimeSynced`");
   }
 
+  protected Option<String> getLastCommitTime() {
+    return getActiveTimeline().lastInstant()
+        //.map(HoodieInstant::requestedTime);
+        .map(HoodieInstant::getTimestamp);
+  }
+
+  protected Option<String> getLastCommitCompletionTime() {
+    int countInstants = getActiveTimeline().countInstants();
+    // return getActiveTimeline()
+    //     .getInstantsOrderedByCompletionTime()
+    //     .skip(countInstants - 1)
+    //     .findFirst()
+    //     .map(HoodieInstant::getCompletionTime)
+    //     .map(Option::of).orElseGet(Option::empty);
+    return getActiveTimeline()
+        .getInstantsOrderedByStateTransitionTime()
+        .skip(countInstants - 1)
+        .findFirst()
+        .map(HoodieInstant::getTimestamp)
+        .map(Option::of).orElseGet(Option::empty);
+  }
+
   @Override
   public void updateLastCommitTimeSynced(String tableName) {
-    updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, getActiveTimeline().lastInstant().get().getTimestamp()));
+    Option<String> lastCommitTime = getLastCommitTime();
+    if (lastCommitTime.isPresent()) {
+      updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTime.get()));
+    } else {
+      LOG.error("Failed to get last commit time");
+    }
+
+    Option<String> lastCommitCompletionTime = getLastCommitCompletionTime();
+    if (lastCommitCompletionTime.isPresent()) {
+      updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC, lastCommitCompletionTime.get()));
+    } else {
+      LOG.error("Failed to get last commit completion time");
+    }
+  }
+
+  private MetadataChangeProposal createDatasetPropertiesAspect(String tableName, Map<String, String> tableProperties) {
+    DatasetPropertiesPatchBuilder datasetPropertiesPatchBuilder = new DatasetPropertiesPatchBuilder().urn(datasetUrn);
+    if (tableProperties != null) {
+      tableProperties.forEach(datasetPropertiesPatchBuilder::addCustomProperty);
+    }
+    if (tableName != null) {
+      datasetPropertiesPatchBuilder.setName(tableName);
+    }
+    return datasetPropertiesPatchBuilder.build();
   }
 
   @Override
   public boolean updateTableProperties(String tableName, Map<String, String> tableProperties) {
-    MetadataChangeProposalWrapper propertiesChangeProposal = MetadataChangeProposalWrapper.builder()
-            .entityType("dataset")
-            .entityUrn(datasetUrn)
-            .upsert()
-            .aspect(new DatasetProperties().setCustomProperties(new StringMap(tableProperties)))
-            .build();
-
-    DatahubResponseLogger responseLogger = new DatahubResponseLogger();
+    // Use PATCH API to avoid overwriting existing properties
+    MetadataChangeProposal proposal = createDatasetPropertiesAspect(tableName, tableProperties);
+    DataHubResponseLogger responseLogger = new DataHubResponseLogger();
 
     try (RestEmitter emitter = config.getRestEmitter()) {
-      emitter.emit(propertiesChangeProposal, responseLogger).get();
+      Future<MetadataWriteResponse> future = emitter.emit(proposal, responseLogger);
+      future.get();
       return true;
     } catch (Exception e) {
-      throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": "
-              + tableProperties, e);
+      if (!config.suppressExceptions()) {

Review Comment:
   negate the boolean condition for a smoother read
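   For example, something along these lines (a sketch only: the rest of the catch block isn't shown in this hunk, so the branch bodies below are illustrative):
   ```java
   } catch (Exception e) {
     if (config.suppressExceptions()) {
       // assumed suppression behavior: log the failure and report it via the return value
       LOG.error("Fail to change properties for Dataset " + datasetUrn + ": " + tableProperties, e);
       return false;
     }
     throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": "
         + tableProperties, e);
   }
   ```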



##########
hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java:
##########
@@ -21,101 +21,255 @@
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+//import org.apache.hudi.hive.SchemaDifference;
 import org.apache.hudi.sync.common.HoodieSyncClient;
 import org.apache.hudi.sync.common.HoodieSyncException;
 import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
+import org.apache.hudi.sync.datahub.config.HoodieDataHubDatasetIdentifier;
+import org.apache.hudi.sync.datahub.util.SchemaFieldsUtil;
 
+import com.linkedin.common.BrowsePathEntry;
+import com.linkedin.common.BrowsePathEntryArray;
+import com.linkedin.common.BrowsePathsV2;
 import com.linkedin.common.Status;
+import com.linkedin.common.SubTypes;
+import com.linkedin.common.UrnArray;
 import com.linkedin.common.urn.DatasetUrn;
-import com.linkedin.data.template.SetMode;
-import com.linkedin.data.template.StringMap;
-import com.linkedin.dataset.DatasetProperties;
-import com.linkedin.schema.ArrayType;
-import com.linkedin.schema.BooleanType;
-import com.linkedin.schema.BytesType;
-import com.linkedin.schema.EnumType;
-import com.linkedin.schema.FixedType;
-import com.linkedin.schema.MapType;
-import com.linkedin.schema.NullType;
-import com.linkedin.schema.NumberType;
-import com.linkedin.schema.OtherSchema;
-import com.linkedin.schema.RecordType;
-import com.linkedin.schema.SchemaField;
-import com.linkedin.schema.SchemaFieldArray;
-import com.linkedin.schema.SchemaFieldDataType;
-import com.linkedin.schema.SchemaMetadata;
-import com.linkedin.schema.StringType;
-import com.linkedin.schema.UnionType;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.container.Container;
+import com.linkedin.container.ContainerProperties;
+import com.linkedin.data.template.StringArray;
+import com.linkedin.domain.Domains;
+import com.linkedin.metadata.aspect.patch.builder.DatasetPropertiesPatchBuilder;
+import com.linkedin.mxe.MetadataChangeProposal;
+import datahub.client.MetadataWriteResponse;
 import datahub.client.rest.RestEmitter;
 import datahub.event.MetadataChangeProposalWrapper;
-import org.apache.avro.AvroTypeException;
+import io.datahubproject.schematron.converters.avro.AvroSchemaConverter;
 import org.apache.avro.Schema;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class DataHubSyncClient extends HoodieSyncClient {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncClient.class);
+
   protected final DataHubSyncConfig config;
   private final DatasetUrn datasetUrn;
+  private final Urn databaseUrn;
+  private final String tableName;
+  private final String databaseName;
   private static final Status SOFT_DELETE_FALSE = new Status().setRemoved(false);
 
-  public DataHubSyncClient(DataHubSyncConfig config) {
+  public DataHubSyncClient(DataHubSyncConfig config, HoodieTableMetaClient metaClient) {
+    //super(config, metaClient);
     super(config);
     this.config = config;
-    this.datasetUrn = config.datasetIdentifier.getDatasetUrn();
+    HoodieDataHubDatasetIdentifier datasetIdentifier =
+        config.getDatasetIdentifier();
+    this.datasetUrn = datasetIdentifier.getDatasetUrn();
+    this.databaseUrn = datasetIdentifier.getDatabaseUrn();
+    this.tableName = datasetIdentifier.getTableName();
+    this.databaseName = datasetIdentifier.getDatabaseName();
   }
 
   @Override
   public Option<String> getLastCommitTimeSynced(String tableName) {
     throw new UnsupportedOperationException("Not supported: `getLastCommitTimeSynced`");
   }
 
+  protected Option<String> getLastCommitTime() {
+    return getActiveTimeline().lastInstant()
+        //.map(HoodieInstant::requestedTime);
+        .map(HoodieInstant::getTimestamp);
+  }
+
+  protected Option<String> getLastCommitCompletionTime() {
+    int countInstants = getActiveTimeline().countInstants();
+    // return getActiveTimeline()
+    //     .getInstantsOrderedByCompletionTime()
+    //     .skip(countInstants - 1)
+    //     .findFirst()
+    //     .map(HoodieInstant::getCompletionTime)
+    //     .map(Option::of).orElseGet(Option::empty);
+    return getActiveTimeline()
+        .getInstantsOrderedByStateTransitionTime()
+        .skip(countInstants - 1)
+        .findFirst()
+        .map(HoodieInstant::getTimestamp)

Review Comment:
   I think you'd need to get the `stateTransitionTime` for this instant
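   i.e., something like the following (assuming `HoodieInstant#getStateTransitionTime` is the accessor available on this branch):
   ```java
   return getActiveTimeline()
       .getInstantsOrderedByStateTransitionTime()
       .skip(countInstants - 1)
       .findFirst()
       // map to the state transition (completion) time instead of the requested timestamp
       .map(HoodieInstant::getStateTransitionTime)
       .map(Option::of).orElseGet(Option::empty);
   ```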



##########
hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java:
##########
@@ -21,101 +21,250 @@
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+//import org.apache.hudi.hive.SchemaDifference;
 import org.apache.hudi.sync.common.HoodieSyncClient;
 import org.apache.hudi.sync.common.HoodieSyncException;
 import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
+import org.apache.hudi.sync.datahub.config.HoodieDataHubDatasetIdentifier;
+import org.apache.hudi.sync.datahub.util.SchemaFieldsUtil;
 
+import com.linkedin.common.BrowsePathEntry;
+import com.linkedin.common.BrowsePathEntryArray;
+import com.linkedin.common.BrowsePathsV2;
 import com.linkedin.common.Status;
+import com.linkedin.common.SubTypes;
+import com.linkedin.common.UrnArray;
 import com.linkedin.common.urn.DatasetUrn;
-import com.linkedin.data.template.SetMode;
-import com.linkedin.data.template.StringMap;
-import com.linkedin.dataset.DatasetProperties;
-import com.linkedin.schema.ArrayType;
-import com.linkedin.schema.BooleanType;
-import com.linkedin.schema.BytesType;
-import com.linkedin.schema.EnumType;
-import com.linkedin.schema.FixedType;
-import com.linkedin.schema.MapType;
-import com.linkedin.schema.NullType;
-import com.linkedin.schema.NumberType;
-import com.linkedin.schema.OtherSchema;
-import com.linkedin.schema.RecordType;
-import com.linkedin.schema.SchemaField;
-import com.linkedin.schema.SchemaFieldArray;
-import com.linkedin.schema.SchemaFieldDataType;
-import com.linkedin.schema.SchemaMetadata;
-import com.linkedin.schema.StringType;
-import com.linkedin.schema.UnionType;
+import com.linkedin.common.urn.Urn;
+import com.linkedin.container.Container;
+import com.linkedin.container.ContainerProperties;
+import com.linkedin.data.template.StringArray;
+import com.linkedin.domain.Domains;
+import com.linkedin.metadata.aspect.patch.builder.DatasetPropertiesPatchBuilder;
+import com.linkedin.mxe.MetadataChangeProposal;
+import datahub.client.MetadataWriteResponse;
 import datahub.client.rest.RestEmitter;
 import datahub.event.MetadataChangeProposalWrapper;
-import org.apache.avro.AvroTypeException;
+import io.datahubproject.schematron.converters.avro.AvroSchemaConverter;
 import org.apache.avro.Schema;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class DataHubSyncClient extends HoodieSyncClient {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncClient.class);
+
   protected final DataHubSyncConfig config;
   private final DatasetUrn datasetUrn;
+  private final Urn databaseUrn;
+  private final String tableName;
+  private final String databaseName;
   private static final Status SOFT_DELETE_FALSE = new Status().setRemoved(false);
 
-  public DataHubSyncClient(DataHubSyncConfig config) {
+  public DataHubSyncClient(DataHubSyncConfig config, HoodieTableMetaClient metaClient) {
+    //super(config, metaClient);
     super(config);
     this.config = config;
-    this.datasetUrn = config.datasetIdentifier.getDatasetUrn();
+    HoodieDataHubDatasetIdentifier datasetIdentifier =
+        config.getDatasetIdentifier();
+    this.datasetUrn = datasetIdentifier.getDatasetUrn();
+    this.databaseUrn = datasetIdentifier.getDatabaseUrn();
+    this.tableName = datasetIdentifier.getTableName();
+    this.databaseName = datasetIdentifier.getDatabaseName();
   }
 
   @Override
   public Option<String> getLastCommitTimeSynced(String tableName) {
     throw new UnsupportedOperationException("Not supported: `getLastCommitTimeSynced`");
   }
 
+  protected Option<String> getLastCommitTime() {
+    return getActiveTimeline().lastInstant()
+        //.map(HoodieInstant::requestedTime);
+        .map(HoodieInstant::getTimestamp);

Review Comment:
   correct



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
