[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6476: [HUDI-3478] Support CDC for Spark in Hudi
prasannarajaperumal commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r972627223

## hudi-common/src/main/java/org/apache/hudi/avro/SerializableRecord.java: ##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.generic.GenericData;
+
+import java.io.Serializable;
+
+/**
+ * In some cases like putting the [[GenericData.Record]] into [[ExternalSpillableMap]],
+ * objects is asked to extend [[Serializable]].
+ *
+ * This class wraps [[GenericData.Record]].
+ */
+public class SerializableRecord implements Serializable {

Review Comment:
The serialization of the external map is done by Kryo here. Is there a serializer registered with Kryo for the Avro Schema? I don't see the test for this with ExternalSpillableMap anymore. @YannByron, can you check?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
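One way to sidestep the serializer question for SerializableRecord is to keep only encoded bytes in the spillable map, which is the byte[] idea raised elsewhere in this review thread. The following is a minimal sketch under stated assumptions, not Hudi code: `PlainRecord`, `BytesPayload`, and `spillAndRestore` are hypothetical names, `PlainRecord` merely stands in for a non-serializable record such as GenericData.Record, and the spill step uses plain Java serialization as a stand-in because Kryo and Avro are not on the classpath here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class CdcPayloadSketch {

  // Stand-in for a record type that is NOT Serializable (hypothetical; not an Avro class).
  static final class PlainRecord {
    final String operation;
    final String recordKey;

    PlainRecord(String operation, String recordKey) {
      this.operation = operation;
      this.recordKey = recordKey;
    }
  }

  // Hypothetical payload: it keeps only encoded bytes, so no field needs its own serializer.
  static final class BytesPayload implements Serializable {
    private static final long serialVersionUID = 1L;
    private final byte[] bytes;

    BytesPayload(PlainRecord record) {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(buffer)) {
        out.writeUTF(record.operation);
        out.writeUTF(record.recordKey);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      this.bytes = buffer.toByteArray();
    }

    // Decodes the fields in the same order they were written.
    PlainRecord toRecord() {
      try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
        String operation = in.readUTF();
        String recordKey = in.readUTF();
        return new PlainRecord(operation, recordKey);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }

  // Simulates a spill and a read back; Java serialization is an assumption standing in for Kryo.
  static BytesPayload spillAndRestore(BytesPayload payload) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
      out.writeObject(payload);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
      return (BytesPayload) in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    PlainRecord original = new PlainRecord("u", "key-1");
    PlainRecord restored = spillAndRestore(new BytesPayload(original)).toRecord();
    System.out.println(restored.operation + " " + restored.recordKey);
  }
}
```

A round-trip check like the one in `main` is also roughly the shape of test the reviewer asks for, although a real test would need to force ExternalSpillableMap to spill rather than simulate it.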
[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6476: [HUDI-3478] Support CDC for Spark in Hudi
prasannarajaperumal commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r969357231

## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java: ##
@@ -399,9 +451,65 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected SerializableRecord cdcRecord(HoodieCDCOperation operation, String recordKey, String partitionPath,
+                                         GenericRecord oldRecord, GenericRecord newRecord) {
+    GenericData.Record record;
+    if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)) {
+      record = CDCUtils.cdcRecord(operation.getValue(), instantTime,
+          oldRecord, addCommitMetadata(newRecord, recordKey, partitionPath));
+    } else if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE)) {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey, oldRecord);
+    } else {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return new SerializableRecord(record);
+  }
+
+  protected GenericRecord addCommitMetadata(GenericRecord record, String recordKey, String partitionPath) {
+    if (record != null && config.populateMetaFields()) {
+      GenericRecord rewriteRecord = rewriteRecord(record);
+      String seqId = HoodieRecord.generateSequenceId(instantTime, getPartitionId(), writtenRecordCount.get());
+      HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId);
+      HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, recordKey, partitionPath, newFilePath.getName());
+      return rewriteRecord;
+    }
+    return record;
+  }
+
+  protected Option writeCDCData() {
+    if (!cdcEnabled || cdcData.isEmpty() || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
+      // the following cases where we do not need to write out the cdc file:
+      // case 1: all the data from the previous file slice are deleted. and no new data is inserted;
+      // case 2: all the data are new-coming,
+      return Option.empty();
+    }

Review Comment:
If the inheritance hierarchy is complicated, let us not do that. Instead, can we create a HoodieCDCLogger as a helper class and move all code related to writing CDC data into this abstraction? You can just initialize a HoodieCDCLogger from HoodieMergeHandle. Would that work?

## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java: ##
@@ -236,6 +241,43 @@ public static T fromJsonString(String jsonStr, Class clazz) throws Except
     return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
   }
 
+  /**
+   * parse the bytes of deltacommit, and get the base file and the log files belonging to this
+   * provided file group.
+   */
+  public static Option>> getFileSliceForFileGroupFromDeltaCommit(

Review Comment:
Can you leave a TODO in the code for the refactor and link the Jira?
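The helper-class suggestion for HoodieMergeHandle can be sketched as composition instead of inheritance. This is a minimal, hypothetical illustration rather than the eventual Hudi design: `CdcLogger` and `MergeHandle` are invented stand-ins, records are reduced to strings, and only the skip condition is taken from the quoted `writeCDCData()` diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class CdcLoggerSketch {

  // Hypothetical helper that owns all CDC buffering and the decision whether to write.
  static final class CdcLogger {
    private final boolean enabled;
    private final List<String> buffered = new ArrayList<>();

    CdcLogger(boolean enabled) {
      this.enabled = enabled;
    }

    // Buffers one change entry; a no-op when CDC is disabled.
    void put(String operation, String recordKey) {
      if (enabled) {
        buffered.add(operation + ":" + recordKey);
      }
    }

    // Mirrors the skip condition in the quoted diff: nothing to write when CDC is off,
    // nothing was buffered, nothing was written, or every written record was an insert.
    Optional<List<String>> close(long recordsWritten, long insertRecordsWritten) {
      if (!enabled || buffered.isEmpty() || recordsWritten == 0L
          || recordsWritten == insertRecordsWritten) {
        return Optional.empty();
      }
      return Optional.of(new ArrayList<>(buffered));
    }
  }

  // Stand-in merge handle: it holds a CdcLogger and delegates to it, with no CDC subclass.
  static final class MergeHandle {
    private final CdcLogger cdcLogger;
    private long recordsWritten;
    private long insertRecordsWritten;

    MergeHandle(boolean cdcEnabled) {
      this.cdcLogger = new CdcLogger(cdcEnabled);
    }

    void update(String recordKey) {
      recordsWritten++;
      cdcLogger.put("u", recordKey);
    }

    void insert(String recordKey) {
      recordsWritten++;
      insertRecordsWritten++;
      cdcLogger.put("i", recordKey);
    }

    Optional<List<String>> close() {
      return cdcLogger.close(recordsWritten, insertRecordsWritten);
    }
  }

  public static void main(String[] args) {
    MergeHandle handle = new MergeHandle(true);
    handle.update("k1");
    handle.insert("k2");
    System.out.println(handle.close());
  }
}
```

With this shape the merge handle keeps only counters and a single field for CDC, so the CDC write path can be tested on its own without instantiating a handle subclass.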
[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6476: [HUDI-3478] Support CDC for Spark in Hudi
prasannarajaperumal commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r962517070

## hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCExtractor.java: ##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.ADD_BASE_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.CDC_LOG_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.MOR_LOG_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REMOVE_BASE_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REPLACED_FILE_GROUP;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.isInRange;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+public class CDCExtractor {
+
+  private final HoodieTableMetaClient metaClient;
+
+  private final Path basePath;
+
+  private final FileSystem fs;
+
+  private final String supplementalLoggingMode;
+
+  private final String startInstant;
+
+  private final String endInstant;
+
+  // TODO: this will be used when support the cdc query type of 'read_optimized'.
+  private final String cdcQueryType;
+
+  private Map commits;
+
+  private HoodieTableFileSystemView fsView;
+
+  public CDCExtractor(
+      HoodieTableMetaClient metaClient,
+      String startInstant,
+      String endInstant,
+      String cdcqueryType) {
+    this.metaClient = metaClient;
+    this.basePath = metaClient.getBasePathV2();
+    this.fs = metaClient.getFs().getFileSystem();
+    this.supplementalLoggingMode = metaClient.getTableConfig().cdcSupplementalLoggingMode();
+    this.startInstant = startInstant;
+    this.endInstant = endInstant;
+    if (HoodieTableType.MERGE_ON_READ == metaClient.getTableType()
+        && cdcqueryType.equals("read_optimized")) {
+      throw new HoodieNotSupportedException("The 'read_optimized' cdc query type hasn't been supported for now.");
+    }
+    this.cdcQueryType = cdcqueryType;
+    init();
+  }
+
+  private void init() {
+    initInstantAndCommitMetadatas();
+    initFSView();
+  }
+
+  /**
+   * At the granularity of a file group, trace the mapping between
+   *
[GitHub] [hudi] prasannarajaperumal commented on a diff in pull request #6476: [HUDI-3478] Support CDC for Spark in Hudi
prasannarajaperumal commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r962510356

## hudi-common/src/main/java/org/apache/hudi/avro/SerializableRecord.java: ##
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.generic.GenericData;
+
+import java.io.Serializable;
+
+/**
+ * In some cases like putting the [[GenericData.Record]] into [[ExternalSpillableMap]],
+ * objects is asked to extend [[Serializable]].
+ *
+ * This class wraps [[GenericData.Record]].
+ */
+public class SerializableRecord implements Serializable {

Review Comment:
How does this work? GenericData.Record is not serializable, so how would storing this in SpillableMap actually serialize and deserialize the data when spilled?
1. We should write a test on the spillable property in the CDC context.
2. If the serialization has not been thought through, create something similar to HoodieAvroPayload (HoodieCDCPayload) and store the contents as byte[].

## hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCExtractor.java: ##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.ADD_BASE_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.CDC_LOG_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.MOR_LOG_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REMOVE_BASE_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REPLACED_FILE_GROUP;
+import static