[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6476: [HUDI-3478] Support CDC for Spark in Hudi

2022-09-15 Thread GitBox


prasannarajaperumal commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r972627223


##
hudi-common/src/main/java/org/apache/hudi/avro/SerializableRecord.java:
##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.generic.GenericData;
+
+import java.io.Serializable;
+
+/**
+ * In some cases, like putting the [[GenericData.Record]] into [[ExternalSpillableMap]],
+ * objects are asked to implement [[Serializable]].
+ *
+ * This class wraps [[GenericData.Record]].
+ */
+public class SerializableRecord implements Serializable {

Review Comment:
   The serialization of the external map is done by Kryo here. Is there a Serializer registered for the Avro schema with Kryo? I don't see the test for this with ExternalSpillableMap anymore - @YannByron - can you check?
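
   A minimal sketch of what such a registration could look like, assuming Kryo 5.x and plain Avro APIs; the class name AvroRecordKryoSerializer is illustrative, not an existing Hudi or Kryo class:

   import com.esotericsoftware.kryo.Kryo;
   import com.esotericsoftware.kryo.Serializer;
   import com.esotericsoftware.kryo.io.Input;
   import com.esotericsoftware.kryo.io.Output;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.io.BinaryDecoder;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;

   import java.io.ByteArrayOutputStream;
   import java.io.IOException;

   // Round-trips a GenericData.Record through Avro binary encoding, writing the
   // schema JSON alongside the bytes (a real implementation would cache schemas).
   // Note: read() below uses the Kryo 5 signature; Kryo 4 uses Class<T> instead.
   public class AvroRecordKryoSerializer extends Serializer<GenericData.Record> {
     @Override
     public void write(Kryo kryo, Output output, GenericData.Record record) {
       try {
         output.writeString(record.getSchema().toString());
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
         new GenericDatumWriter<GenericData.Record>(record.getSchema()).write(record, encoder);
         encoder.flush();
         byte[] bytes = baos.toByteArray();
         output.writeInt(bytes.length);
         output.writeBytes(bytes);
       } catch (IOException e) {
         throw new RuntimeException("Failed to Kryo-serialize Avro record", e);
       }
     }

     @Override
     public GenericData.Record read(Kryo kryo, Input input, Class<? extends GenericData.Record> type) {
       try {
         Schema schema = new Schema.Parser().parse(input.readString());
         byte[] bytes = input.readBytes(input.readInt());
         BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
         return new GenericDatumReader<GenericData.Record>(schema).read(null, decoder);
       } catch (IOException e) {
         throw new RuntimeException("Failed to Kryo-deserialize Avro record", e);
       }
     }
   }

   // Registration, e.g. inside a custom Kryo registrator:
   // kryo.register(GenericData.Record.class, new AvroRecordKryoSerializer());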



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6476: [HUDI-3478] Support CDC for Spark in Hudi

2022-09-13 Thread GitBox


prasannarajaperumal commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r969357231


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##
@@ -399,9 +451,65 @@ protected void writeIncomingRecords() throws IOException {
 }
   }
 
+  protected SerializableRecord cdcRecord(HoodieCDCOperation operation, String recordKey, String partitionPath,
+                                         GenericRecord oldRecord, GenericRecord newRecord) {
+    GenericData.Record record;
+    if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)) {
+      record = CDCUtils.cdcRecord(operation.getValue(), instantTime,
+          oldRecord, addCommitMetadata(newRecord, recordKey, partitionPath));
+    } else if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE)) {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey, oldRecord);
+    } else {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return new SerializableRecord(record);
+  }
+
+  protected GenericRecord addCommitMetadata(GenericRecord record, String recordKey, String partitionPath) {
+    if (record != null && config.populateMetaFields()) {
+      GenericRecord rewriteRecord = rewriteRecord(record);
+      String seqId = HoodieRecord.generateSequenceId(instantTime, getPartitionId(), writtenRecordCount.get());
+      HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId);
+      HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, recordKey, partitionPath, newFilePath.getName());
+      return rewriteRecord;
+    }
+    return record;
+  }
+
+  protected Option writeCDCData() {
+    if (!cdcEnabled || cdcData.isEmpty() || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
+      // the following are cases where we do not need to write out the cdc file:
+      // case 1: all the data from the previous file slice are deleted, and no new data is inserted;
+      // case 2: all the data are new-coming.
+      return Option.empty();
+    }

Review Comment:
   If the inheritance hierarchy is getting complicated, let's not do that. Instead, can we create a HoodieCDCLogger helper class and move all the code related to writing CDC data into this abstraction? You could then just initialize a HoodieCDCLogger from HoodieMergeHandle - a rough sketch follows below.
   
   Would that work?
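
   A rough sketch of the suggested shape, with hypothetical field and method names (not the PR's actual API):

   import org.apache.avro.generic.GenericData;

   import java.util.ArrayList;
   import java.util.List;
   import java.util.Optional;

   // Hypothetical helper that owns all CDC-write state, so HoodieMergeHandle
   // only needs to construct it and forward change records to it.
   public class HoodieCDCLogger {
     private final String instantTime;
     // CDC records buffered for the file group being merged.
     private final List<GenericData.Record> cdcRecords = new ArrayList<>();

     public HoodieCDCLogger(String instantTime) {
       this.instantTime = instantTime;
     }

     // Called once per changed record during the merge.
     public void log(GenericData.Record cdcRecord) {
       cdcRecords.add(cdcRecord);
     }

     // Flush the buffered records to a CDC log file; empty when there is nothing to write.
     public Optional<String> writeCDCData() {
       if (cdcRecords.isEmpty()) {
         return Optional.empty();
       }
       // ... append cdcRecords to a CDC log block and return its file path ...
       return Optional.of("cdc-log-for-" + instantTime);
     }
   }

   HoodieMergeHandle would then construct one logger per handle, call log(...) from its write path, and call writeCDCData() on close - no inheritance needed.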



##
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java:
##
@@ -236,6 +241,43 @@ public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Exception
 return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
   }
 
+  /**
+   * Parse the bytes of a deltacommit, and get the base file and the log files belonging to the
+   * provided file group.
+   */
+  public static Option>> getFileSliceForFileGroupFromDeltaCommit(

Review Comment:
   Can you leave a TODO in the code for the refactor and link the JIRA?






[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6476: [HUDI-3478] Support CDC for Spark in Hudi

2022-09-05 Thread GitBox


prasannarajaperumal commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r962517070


##
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCExtractor.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.ADD_BASE_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.CDC_LOG_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.MOR_LOG_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REMOVE_BASE_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REPLACED_FILE_GROUP;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.isInRange;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+public class CDCExtractor {
+
+  private final HoodieTableMetaClient metaClient;
+
+  private final Path basePath;
+
+  private final FileSystem fs;
+
+  private final String supplementalLoggingMode;
+
+  private final String startInstant;
+
+  private final String endInstant;
+
+  // TODO: this will be used when supporting the cdc query type of 'read_optimized'.
+  private final String cdcQueryType;
+
+  private Map commits;
+
+  private HoodieTableFileSystemView fsView;
+
+  public CDCExtractor(
+      HoodieTableMetaClient metaClient,
+      String startInstant,
+      String endInstant,
+      String cdcQueryType) {
+    this.metaClient = metaClient;
+    this.basePath = metaClient.getBasePathV2();
+    this.fs = metaClient.getFs().getFileSystem();
+    this.supplementalLoggingMode = metaClient.getTableConfig().cdcSupplementalLoggingMode();
+    this.startInstant = startInstant;
+    this.endInstant = endInstant;
+    if (HoodieTableType.MERGE_ON_READ == metaClient.getTableType()
+        && cdcQueryType.equals("read_optimized")) {
+      throw new HoodieNotSupportedException("The 'read_optimized' cdc query type is not supported yet.");
+    }
+    this.cdcQueryType = cdcQueryType;
+    init();
+  }
+
+  private void init() {
+    initInstantAndCommitMetadatas();
+    initFSView();
+  }
+
+  /**
+   * At the granularity of a file group, trace the mapping between
+   * 

[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6476: [HUDI-3478] Support CDC for Spark in Hudi

2022-09-05 Thread GitBox


prasannarajaperumal commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r962510356


##
hudi-common/src/main/java/org/apache/hudi/avro/SerializableRecord.java:
##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.generic.GenericData;
+
+import java.io.Serializable;
+
+/**
+ * In some cases, like putting the [[GenericData.Record]] into [[ExternalSpillableMap]],
+ * objects are asked to implement [[Serializable]].
+ *
+ * This class wraps [[GenericData.Record]].
+ */
+public class SerializableRecord implements Serializable {

Review Comment:
   How does this work? GenericData.Record is not serializable, so how would storing this in the SpillableMap actually serialize and de-serialize the data when spilled?
   1. We should write a test on the spillable property in the CDC context.
   2. If the serialization is not thought through, create something similar to HoodieAvroPayload (a HoodieCDCPayload) and store the contents as byte[] - see the sketch below.
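
   A minimal sketch of that byte[]-backed shape, assuming plain Avro APIs; the class name HoodieCDCPayload and its methods are hypothetical:

   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.io.BinaryDecoder;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;

   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.Serializable;

   // Stores the record as Avro binary plus its schema JSON; both fields are
   // plain Serializable types, so Java or Kryo serialization works unchanged.
   public class HoodieCDCPayload implements Serializable {
     private final byte[] recordBytes;
     private final String schemaJson;

     public HoodieCDCPayload(GenericData.Record record) throws IOException {
       this.schemaJson = record.getSchema().toString();
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
       new GenericDatumWriter<GenericData.Record>(record.getSchema()).write(record, encoder);
       encoder.flush();
       this.recordBytes = baos.toByteArray();
     }

     // Rebuilds the Avro record on read (a real implementation would cache the parsed schema).
     public GenericData.Record toRecord() throws IOException {
       Schema schema = new Schema.Parser().parse(schemaJson);
       BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(recordBytes, null);
       return new GenericDatumReader<GenericData.Record>(schema).read(null, decoder);
     }
   }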
   


