nsivabalan commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r466385455



##########
File path: 
hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriter;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Create handle with InternalRow for datasource implemention of bulk insert.
+ */
+public class HoodieRowCreateHandle implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LogManager.getLogger(HoodieRowCreateHandle.class);
+  private static final AtomicLong SEQGEN = new AtomicLong(1);
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+  private final HoodieInternalRowFileWriter fileWriter;
+  private final String partitionPath;
+  private final Path path;
+  private final String fileId;
+  private final FileSystem fs;
+  private final HoodieInternalWriteStatus writeStatus;
+  private final HoodieTimer currTimer;
+
+  public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig 
writeConfig, String partitionPath, String fileId,
+      String instantTime, int taskPartitionId, long taskId, long taskEpochId,
+      StructType structType) {
+    this.partitionPath = partitionPath;
+    this.table = table;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.fileId = fileId;
+    this.currTimer = new HoodieTimer();
+    this.currTimer.startTimer();
+    this.fs = table.getMetaClient().getFs();
+    this.path = makeNewPath(partitionPath);
+    this.writeStatus = new 
HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+        writeConfig.getWriteStatusFailureFraction());
+    writeStatus.setPartitionPath(partitionPath);
+    writeStatus.setFileId(fileId);
+    try {
+      HoodiePartitionMetadata partitionMetadata =
+          new HoodiePartitionMetadata(
+              fs,
+              instantTime,
+              new Path(writeConfig.getBasePath()),
+              FSUtils.getPartitionPath(writeConfig.getBasePath(), 
partitionPath));
+      partitionMetadata.trySave(taskPartitionId);
+      createMarkerFile(partitionPath, 
FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, 
table.getBaseFileExtension()));
+      this.fileWriter = createNewFileWriter(path, table, writeConfig, 
structType);
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to initialize file writer for 
path " + path, e);
+    }
+    LOG.info("New handle created for partition :" + partitionPath + " with 
fileId " + fileId);
+  }
+
+  /**
+   * Writes an {@link InternalRow} to the underlying 
HoodieInternalRowFileWriter. Before writing, value for meta columns are 
computed as required
+   * and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is 
what gets written to HoodieInternalRowFileWriter.
+   * @param record instance of {@link InternalRow} that needs to be written to 
the fileWriter.
+   * @throws IOException
+   */
+  public void write(InternalRow record) throws IOException {
+    try {
+      String partitionPath = 
record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
+          HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
+      String seqId = HoodieRecord.generateSequenceId(instantTime, 
taskPartitionId, SEQGEN.getAndIncrement());
+      String recordKey = 
record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
+          HoodieRecord.RECORD_KEY_METADATA_FIELD)).toString();
+      HoodieInternalRow internalRow = new HoodieInternalRow(instantTime, 
seqId, recordKey, partitionPath, path.getName(),
+          record);
+      try {
+        fileWriter.writeRow(recordKey, internalRow);
+        writeStatus.markSuccess(recordKey);
+      } catch (Throwable t) {
+        writeStatus.markFailure(recordKey, t);
+      }
+    } catch (Throwable ge) {
+      writeStatus.setGlobalError(ge);
+      throw ge;
+    }
+  }
+
+  /**
+   * @returns {@code true} if this handle can take in more writes. else {@code 
false}.
+   */
+  public boolean canWrite() {
+    return fileWriter.canWrite();
+  }
+
+  /**
+   * Closes the {@link HoodieRowCreateHandle} and returns an instance of 
{@link HoodieInternalWriteStatus} containing the stats and
+   * status of the writes to this handle.
+   * @return the {@link HoodieInternalWriteStatus} containing the stats and 
status of the writes to this handle.
+   * @throws IOException
+   */
+  public HoodieInternalWriteStatus close() throws IOException {
+    fileWriter.close();
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setPartitionPath(partitionPath);
+    stat.setNumWrites(writeStatus.getTotalRecords());
+    stat.setNumDeletes(0);
+    stat.setNumInserts(writeStatus.getTotalRecords());
+    stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
+    stat.setFileId(fileId);
+    stat.setPath(new Path(writeConfig.getBasePath()), path);
+    long fileSizeInBytes = FSUtils.getFileSize(table.getMetaClient().getFs(), 
path);
+    stat.setTotalWriteBytes(fileSizeInBytes);
+    stat.setFileSizeInBytes(fileSizeInBytes);
+    stat.setTotalWriteErrors(writeStatus.getFailedRowsSize());
+    HoodieWriteStat.RuntimeStats runtimeStats = new 
HoodieWriteStat.RuntimeStats();
+    runtimeStats.setTotalCreateTime(currTimer.endTimer());
+    stat.setRuntimeStats(runtimeStats);
+    writeStatus.setStat(stat);
+    return writeStatus;
+  }
+
+  public String getFileName() {
+    return path.getName();
+  }
+
+  private Path makeNewPath(String partitionPath) {
+    Path path = FSUtils.getPartitionPath(writeConfig.getBasePath(), 
partitionPath);
+    try {
+      fs.mkdirs(path); // create a new partition as needed.
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to make dir " + path, e);
+    }
+    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, 
getWriteToken(), fileId,
+        tableConfig.getBaseFileFormat().getFileExtension()));
+  }
+
+  /**
+   * Creates an empty marker file corresponding to storage writer path.
+   *
+   * @param partitionPath Partition path
+   */
+  private void createMarkerFile(String partitionPath, String dataFileName) {

Review comment:
       Note to reviewer: these methods are copied from HoodieWriteHandle for 
now. 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/io/HoodieRowCreateHandle.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriter;
+import org.apache.hudi.io.storage.HoodieInternalRowFileWriterFactory;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.MarkerFiles;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Create handle with InternalRow for datasource implemention of bulk insert.
+ */
+public class HoodieRowCreateHandle implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LogManager.getLogger(HoodieRowCreateHandle.class);
+  private static final AtomicLong SEQGEN = new AtomicLong(1);
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+  private final HoodieInternalRowFileWriter fileWriter;
+  private final String partitionPath;
+  private final Path path;
+  private final String fileId;
+  private final FileSystem fs;
+  private final HoodieInternalWriteStatus writeStatus;
+  private final HoodieTimer currTimer;
+
+  public HoodieRowCreateHandle(HoodieTable table, HoodieWriteConfig 
writeConfig, String partitionPath, String fileId,
+      String instantTime, int taskPartitionId, long taskId, long taskEpochId,
+      StructType structType) {
+    this.partitionPath = partitionPath;
+    this.table = table;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.fileId = fileId;
+    this.currTimer = new HoodieTimer();
+    this.currTimer.startTimer();
+    this.fs = table.getMetaClient().getFs();
+    this.path = makeNewPath(partitionPath);
+    this.writeStatus = new 
HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+        writeConfig.getWriteStatusFailureFraction());
+    writeStatus.setPartitionPath(partitionPath);
+    writeStatus.setFileId(fileId);
+    try {
+      HoodiePartitionMetadata partitionMetadata =
+          new HoodiePartitionMetadata(
+              fs,
+              instantTime,
+              new Path(writeConfig.getBasePath()),
+              FSUtils.getPartitionPath(writeConfig.getBasePath(), 
partitionPath));
+      partitionMetadata.trySave(taskPartitionId);
+      createMarkerFile(partitionPath, 
FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, 
table.getBaseFileExtension()));
+      this.fileWriter = createNewFileWriter(path, table, writeConfig, 
structType);
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to initialize file writer for 
path " + path, e);
+    }
+    LOG.info("New handle created for partition :" + partitionPath + " with 
fileId " + fileId);
+  }
+
+  /**
+   * Writes an {@link InternalRow} to the underlying 
HoodieInternalRowFileWriter. Before writing, value for meta columns are 
computed as required
+   * and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is 
what gets written to HoodieInternalRowFileWriter.
+   * @param record instance of {@link InternalRow} that needs to be written to 
the fileWriter.
+   * @throws IOException
+   */
+  public void write(InternalRow record) throws IOException {
+    try {
+      String partitionPath = 
record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(

Review comment:
       Note to reviewer: if this statement fails, we have to treat it as a 
global error rather than a per-record error, since we don't have the record key 
yet. This is different from how a HoodieRecord write happens. So, in these 
cases, rowCreateHandle will throw an exception and the caller is expected to 
close the rowCreateHandle.

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured 
for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {

Review comment:
       Note to reviewer: had to move this to hudi-spark, as we need to access 
AvroConversionUtils for the Row-to-GenericRecord converter function.

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java
##########
@@ -129,45 +134,54 @@ public String getPartitionPath(GenericRecord record) {
     if (partitionVal == null) {
       partitionVal = 1L;
     }
+    try {
+      return getPartitionPath(partitionVal);
+    } catch (Exception e) {
+      throw new HoodieDeltaStreamerException("Unable to parse input partition 
field :" + partitionVal, e);
+    }
+  }
 
+  /**
+   * Parse and fetch partition path based on data type.
+   *
+   * @param partitionVal partition path object value fetched from record/row
+   * @return the parsed partition path based on data type
+   * @throws ParseException on any parse exception
+   */
+  private String getPartitionPath(Object partitionVal) throws ParseException {

Review comment:
       Note to reviewer: no changes here; the code was just moved to a private 
method for re-use.

##########
File path: 
hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java
##########
@@ -116,20 +165,26 @@ public void testScalar() throws IOException {
 
     // timezone is GMT
     properties = getBaseKeyConfig("SCALAR", "yyyy-MM-dd hh", "GMT", "days");
-    HoodieKey hk5 = new 
TimestampBasedKeyGenerator(properties).getKey(baseRecord);
+    TimestampBasedKeyGenerator keyGen = new 
TimestampBasedKeyGenerator(properties);
+    HoodieKey hk5 = keyGen.getKey(baseRecord);
     assertEquals(hk5.getPartitionPath(), "2024-10-04 12");
+
+    // test w/ Row
+    baseRow = genericRecordToRow(baseRecord);
+    keyGen.initializeRowKeyGenerator(structType, testStructName, 
testNamespace);
+    assertEquals("2024-10-04 12", keyGen.getPartitionPathFromRow(baseRow));
   }
 
   @Test
   public void 
test_ExpectsMatch_SingleInputFormat_ISO8601WithMsZ_OutputTimezoneAsUTC() throws 
IOException {
     baseRecord.put("createTime", "2020-04-01T13:01:33.428Z");
     properties = this.getBaseKeyConfig(
-      "DATE_STRING",
-      "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
-      "",
-      "",
-      "yyyyMMddHH",
-      "GMT");
+        "DATE_STRING",

Review comment:
       Note to reviewer: I have yet to add tests to these new methods; I got 
them as part of the rebase. Also, I noticed a few other test classes, one for 
each key generator, after rebasing. I will add tests to those new test classes 
by tomorrow.

##########
File path: 
hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
##########
@@ -34,13 +36,28 @@ import org.scalatest.Assertions.fail
 class TestDataSourceDefaults {
 
   val schema = SchemaTestUtil.getComplexEvolvedSchema
+  val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)

Review comment:
       Note to reviewer: this is the test class where all key generators are 
tested for the Row APIs as well. I found new test classes for each key 
generator after rebasing, and have yet to add tests for the Row-based APIs to 
those new key generator test classes.

##########
File path: 
hudi-client/src/test/java/org/apache/hudi/testutils/HoodieDatasetTestUtils.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.testutils;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
+
+import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
+
+/**
+ * Dataset test utils.
+ */
+public class HoodieDatasetTestUtils {
+
+  public static final StructType STRUCT_TYPE = new StructType(new 
StructField[] {

Review comment:
       Note to reviewer: we can't leverage HoodieTestDataGenerator since each 
record is expected to be in a certain format (meta columns followed by data 
columns). Hence, I introduced a new schema for testing "bulk insert dataset".

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.keygen;
-
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.avro.generic.GenericRecord;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Base class for all the built-in key generators. Contains methods structured 
for
- * code reuse amongst them.
- */
-public abstract class BuiltinKeyGenerator extends KeyGenerator {

Review comment:
       Note to reviewer: moved this file to hudi-spark 

##########
File path: 
hudi-client/src/test/java/org/apache/hudi/io/TestHoodieRowCreateHandle.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.ENCODER;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.STRUCT_TYPE;
+import static 
org.apache.hudi.testutils.HoodieDatasetTestUtils.getConfigBuilder;
+import static 
org.apache.hudi.testutils.HoodieDatasetTestUtils.getInternalRowWithError;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getRandomRows;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.toInternalRows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Unit tests {@link HoodieRowCreateHandle}.
+ */
+public class TestHoodieRowCreateHandle extends HoodieClientTestHarness {
+
+  private static final Random RANDOM = new Random();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts("TestHoodieRowCreateHandle");
+    initPath();
+    initFileSystem();
+    initTestDataGenerator();
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  @Test
+  public void testRowCreateHandle() throws IOException {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
+    List<String> fileNames = new ArrayList<>();
+    List<String> fileAbsPaths = new ArrayList<>();
+
+    Dataset<Row> totalInputRows = null;
+    // one round per partition
+    for (int i = 0; i < 5; i++) {
+      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[i 
% 3];
+
+      // init some args
+      String fileId = UUID.randomUUID().toString();
+      String instantTime = "000";
+
+      HoodieRowCreateHandle handle = new HoodieRowCreateHandle(table, cfg, 
partitionPath, fileId, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), 
RANDOM.nextLong(), STRUCT_TYPE);
+      int size = 10 + RANDOM.nextInt(1000);
+      // Generate inputs
+      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, 
false);
+      if (totalInputRows == null) {
+        totalInputRows = inputRows;
+      } else {
+        totalInputRows = totalInputRows.union(inputRows);
+      }
+
+      // issue writes
+      HoodieInternalWriteStatus writeStatus = 
writeAndGetWriteStatus(inputRows, handle);
+
+      fileAbsPaths.add(basePath + "/" + writeStatus.getStat().getPath());
+      fileNames.add(handle.getFileName());
+      // verify output
+      assertOutput(writeStatus, size, fileId, partitionPath, instantTime, 
totalInputRows, fileNames, fileAbsPaths);
+    }
+  }
+
+  /**
+   * Issue some corrupted or wrong schematized InternalRow after few valid 
InternalRows so that global error is thrown. write batch 1 of valid records 
write batch 2 of invalid records Global Error
+   * should be thrown.
+   */
+  @Test
+  public void testGlobalFailure() throws IOException {

Review comment:
       Note to reviewer: as mentioned above, if there is an error parsing the 
partition path or record key, it will result in a global error for the handle 
and not a per-record/row error. 
   I couldn't repro/test a per-record error. I tried writing a different 
datatype to one of the data columns, expecting the write to fail, but it didn't 
fail. So, as of now, there are no tests for per-record failures. The same 
applies to RowFileWriter, InternalWriter, etc. 

##########
File path: hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenerator.java
##########
@@ -51,4 +53,32 @@ protected KeyGenerator(TypedProperties config) {
     throw new UnsupportedOperationException("Bootstrap not supported for key 
generator. "
         + "Please override this method in your custom key generator.");
   }
+
+  /**
+   * Initializes {@link KeyGenerator} for {@link Row} based operations.
+   * @param structType structype of the dataset.
+   * @param structName struct name of the dataset.
+   * @param recordNamespace record namespace of the dataset.
+   */
+  public void initializeRowKeyGenerator(StructType structType, String 
structName, String recordNamespace) {

Review comment:
       Note to reviewer: introduced these new APIs for Row-based KeyGen. All 
built-in generators have implemented them. Users with a custom key generator 
don't need to implement these APIs unless they want to use 
"bulk_insert_dataset"; if they do wish to use "bulk_insert_dataset", they 
might have to provide implementations of these methods.

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Option;
+
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
+
+/**
+ * Helper class to fetch fields from Row.
+ */
+public class RowKeyGeneratorHelper {
+
+  /**
+   * Generates record key for the corresponding {@link Row}.
+   * @param row instance of {@link Row} of interest
+   * @param recordKeyFields record key fields as a list
+   * @param recordKeyPositions record key positions for the corresponding 
record keys in {@code recordKeyFields}
+   * @param prefixFieldName {@code true} if field name need to be prefixed in 
the returned result. {@code false} otherwise.
+   * @return the record key thus generated
+   */
+  public static String getRecordKeyFromRow(Row row, List<String> 
recordKeyFields, Map<String, List<Integer>> recordKeyPositions, boolean 
prefixFieldName) {
+    AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
+    String toReturn = IntStream.range(0, recordKeyFields.size()).mapToObj(idx 
-> {
+      String field = recordKeyFields.get(idx);
+      String val = null;
+      List<Integer> fieldPositions = recordKeyPositions.get(field);
+      if (fieldPositions.size() == 1) { // simple field
+        Integer fieldPos = fieldPositions.get(0);
+        if (row.isNullAt(fieldPos)) {
+          val = NULL_RECORDKEY_PLACEHOLDER;
+        } else {
+          val = row.getAs(field).toString();
+          if (val.isEmpty()) {
+            val = EMPTY_RECORDKEY_PLACEHOLDER;
+          } else {
+            keyIsNullOrEmpty.set(false);
+          }
+        }
+      } else { // nested fields
+        val = getNestedFieldVal(row, recordKeyPositions.get(field)).toString();
+        if (!val.contains(NULL_RECORDKEY_PLACEHOLDER) && 
!val.contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          keyIsNullOrEmpty.set(false);
+        }
+      }
+      return prefixFieldName ? (field + ":" + val) : val;
+    }).collect(Collectors.joining(","));
+    if (keyIsNullOrEmpty.get()) {
+      throw new HoodieKeyException("recordKey value: \"" + toReturn + "\" for 
fields: \"" + Arrays.toString(recordKeyFields.toArray()) + "\" cannot be null 
or empty.");
+    }
+    return toReturn;
+  }
+
+  /**
+   * Generates partition path for the corresponding {@link Row}.
+   * @param row instance of {@link Row} of interest
+   * @param partitionPathFields partition path fields as a list
+   * @param hiveStylePartitioning {@code true} if hive style partitioning is 
set. {@code false} otherwise
+   * @param partitionPathPositions partition path positions for the 
corresponding fields in {@code partitionPathFields}
+   * @return the generated partition path for the row
+   */
+  public static String getPartitionPathFromRow(Row row, List<String> 
partitionPathFields, boolean hiveStylePartitioning, Map<String, List<Integer>> 
partitionPathPositions) {
+    return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> {
+      String field = partitionPathFields.get(idx);
+      String val = null;
+      List<Integer> fieldPositions = partitionPathPositions.get(field);
+      if (fieldPositions.size() == 1) { // simple
+        Integer fieldPos = fieldPositions.get(0);
+        // for partition path, if field is not found, index will be set to -1
+        if (fieldPos == -1 || row.isNullAt(fieldPos)) {
+          val = DEFAULT_PARTITION_PATH;
+        } else {
+          val = row.getAs(field).toString();
+          if (val.isEmpty()) {
+            val = DEFAULT_PARTITION_PATH;
+          }
+        }
+        if (hiveStylePartitioning) {
+          val = field + "=" + val;
+        }
+      } else { // nested
+        Object nestedVal = getNestedFieldVal(row, 
partitionPathPositions.get(field));
+        if (nestedVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || 
nestedVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          val = hiveStylePartitioning ? field + "=" + DEFAULT_PARTITION_PATH : 
DEFAULT_PARTITION_PATH;
+        } else {
+          val = hiveStylePartitioning ? field + "=" + nestedVal.toString() : 
nestedVal.toString();
+        }
+      }
+      return val;
+    }).collect(Collectors.joining(DEFAULT_PARTITION_PATH_SEPARATOR));
+  }
+
+  /**
+   * Fetch the field value located at the positions requested for.
+   * @param row instance of {@link Row} of interest
+   * @param positions tree style positions where the leaf node need to be 
fetched and returned
+   * @return the field value as per the positions requested for.
+   */
+  public static Object getNestedFieldVal(Row row, List<Integer> positions) {
+    if (positions.size() == 1 && positions.get(0) == -1) {

Review comment:
       Note to reviewer: getNestedFieldIndices(StructType structType, String 
field, boolean isRecordKey) returns -1 for partitionPathIndices if the 
partition path field is not found.

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.AvroConversionHelper;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Function1;
+
+/**
+ * Base class for all the built-in key generators. Contains methods structured 
for
+ * code reuse amongst them.
+ */
+public abstract class BuiltinKeyGenerator extends KeyGenerator {
+
+  private List<String> recordKeyFields;

Review comment:
       Note to reviewer: I have unified the code across the Simple and Complex key gens. 

##########
File path: 
hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.hudi.exception.HoodieKeyException;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import scala.Option;
+
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH;
+import static 
org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
+
+/**
+ * Helper class to fetch fields from Row.
+ */
+public class RowKeyGeneratorHelper {
+
+  /**
+   * Generates record key for the corresponding {@link Row}.
+   * @param row instance of {@link Row} of interest
+   * @param recordKeyFields record key fields as a list
+   * @param recordKeyPositions record key positions for the corresponding 
record keys in {@code recordKeyFields}
+   * @param prefixFieldName {@code true} if field name need to be prefixed in 
the returned result. {@code false} otherwise.
+   * @return the record key thus generated
+   */
+  public static String getRecordKeyFromRow(Row row, List<String> 
recordKeyFields, Map<String, List<Integer>> recordKeyPositions, boolean 
prefixFieldName) {
+    AtomicBoolean keyIsNullOrEmpty = new AtomicBoolean(true);
+    String toReturn = IntStream.range(0, recordKeyFields.size()).mapToObj(idx 
-> {
+      String field = recordKeyFields.get(idx);
+      String val = null;
+      List<Integer> fieldPositions = recordKeyPositions.get(field);
+      if (fieldPositions.size() == 1) { // simple field
+        Integer fieldPos = fieldPositions.get(0);
+        if (row.isNullAt(fieldPos)) {
+          val = NULL_RECORDKEY_PLACEHOLDER;
+        } else {
+          val = row.getAs(field).toString();
+          if (val.isEmpty()) {
+            val = EMPTY_RECORDKEY_PLACEHOLDER;
+          } else {
+            keyIsNullOrEmpty.set(false);
+          }
+        }
+      } else { // nested fields
+        val = getNestedFieldVal(row, recordKeyPositions.get(field)).toString();
+        if (!val.contains(NULL_RECORDKEY_PLACEHOLDER) && 
!val.contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          keyIsNullOrEmpty.set(false);
+        }
+      }
+      return prefixFieldName ? (field + ":" + val) : val;
+    }).collect(Collectors.joining(","));
+    if (keyIsNullOrEmpty.get()) {
+      throw new HoodieKeyException("recordKey value: \"" + toReturn + "\" for 
fields: \"" + Arrays.toString(recordKeyFields.toArray()) + "\" cannot be null 
or empty.");
+    }
+    return toReturn;
+  }
+
+  /**
+   * Generates partition path for the corresponding {@link Row}.
+   * @param row instance of {@link Row} of interest
+   * @param partitionPathFields partition path fields as a list
+   * @param hiveStylePartitioning {@code true} if hive style partitioning is 
set. {@code false} otherwise
+   * @param partitionPathPositions partition path positions for the 
corresponding fields in {@code partitionPathFields}
+   * @return the generated partition path for the row
+   */
+  public static String getPartitionPathFromRow(Row row, List<String> 
partitionPathFields, boolean hiveStylePartitioning, Map<String, List<Integer>> 
partitionPathPositions) {
+    return IntStream.range(0, partitionPathFields.size()).mapToObj(idx -> {
+      String field = partitionPathFields.get(idx);
+      String val = null;
+      List<Integer> fieldPositions = partitionPathPositions.get(field);
+      if (fieldPositions.size() == 1) { // simple
+        Integer fieldPos = fieldPositions.get(0);
+        // for partition path, if field is not found, index will be set to -1
+        if (fieldPos == -1 || row.isNullAt(fieldPos)) {
+          val = DEFAULT_PARTITION_PATH;
+        } else {
+          val = row.getAs(field).toString();
+          if (val.isEmpty()) {
+            val = DEFAULT_PARTITION_PATH;
+          }
+        }
+        if (hiveStylePartitioning) {
+          val = field + "=" + val;
+        }
+      } else { // nested
+        Object nestedVal = getNestedFieldVal(row, 
partitionPathPositions.get(field));
+        if (nestedVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) || 
nestedVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          val = hiveStylePartitioning ? field + "=" + DEFAULT_PARTITION_PATH : 
DEFAULT_PARTITION_PATH;
+        } else {
+          val = hiveStylePartitioning ? field + "=" + nestedVal.toString() : 
nestedVal.toString();
+        }
+      }
+      return val;
+    }).collect(Collectors.joining(DEFAULT_PARTITION_PATH_SEPARATOR));
+  }
+
+  /**
+   * Fetch the field value located at the positions requested for.
+   * @param row instance of {@link Row} of interest
+   * @param positions tree style positions where the leaf node need to be 
fetched and returned
+   * @return the field value as per the positions requested for.
+   */
+  public static Object getNestedFieldVal(Row row, List<Integer> positions) {
+    if (positions.size() == 1 && positions.get(0) == -1) {
+      return DEFAULT_PARTITION_PATH;
+    }
+    int index = 0;
+    int totalCount = positions.size();
+    Row valueToProcess = row;
+    Object toReturn = null;
+
+    while (index < totalCount) {
+      if (index < totalCount - 1) {
+        if (valueToProcess.isNullAt(positions.get(index))) {
+          toReturn = NULL_RECORDKEY_PLACEHOLDER;
+          break;
+        }
+        valueToProcess = (Row) valueToProcess.get(positions.get(index));
+      } else { // last index
+        if (valueToProcess.getAs(positions.get(index)).toString().isEmpty()) {
+          toReturn = EMPTY_RECORDKEY_PLACEHOLDER;
+          break;
+        }
+        toReturn = valueToProcess.getAs(positions.get(index));
+      }
+      index++;
+    }
+    return toReturn;
+  }
+
+  /**
+   * Generate the tree style positions for the field requested for as per the 
defined struct type.
+   * @param structType schema of interest
+   * @param field field of interest for which the positions are requested for
+   * @param isRecordKey {@code true} if the field requested for is a record 
key. {@code false} incase of a partition path.
+   * @return the positions of the field as per the struct type.
+   */
+  public static List<Integer> getNestedFieldIndices(StructType structType, 
String field, boolean isRecordKey) {
+    String[] slices = field.split("\\.");
+    List<Integer> positions = new ArrayList<>();
+    int index = 0;
+    int totalCount = slices.length;
+    while (index < totalCount) {
+      String slice = slices[index];
+      Option<Object> curIndexOpt = structType.getFieldIndex(slice);
+      if (curIndexOpt.isDefined()) {
+        int curIndex = (int) curIndexOpt.get();
+        positions.add(curIndex);
+        final StructField nestedField = structType.fields()[curIndex];
+        if (index < totalCount - 1) {
+          if (!(nestedField.dataType() instanceof StructType)) {
+            if (isRecordKey) {
+              throw new HoodieKeyException("Nested field should be of type 
StructType " + nestedField);
+            } else {
+              positions = Collections.singletonList(-1);

Review comment:
       Note to reviewer: returning -1 only in the case of a partition path, so 
that getNestedFieldVal(Row row, List<Integer> positions) will return 
DEFAULT_PARTITION_PATH if the partition path field is not found. 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/client/HoodieInternalWriteStatus.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Hoodie's internal write status used in datasource implementation of bulk 
insert.
+ */
+public class HoodieInternalWriteStatus implements Serializable {

Review comment:
       will address all feedback together. 

##########
File path: 
hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.ENCODER;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.STRUCT_TYPE;
+import static 
org.apache.hudi.testutils.HoodieDatasetTestUtils.getConfigBuilder;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.getRandomRows;
+import static org.apache.hudi.testutils.HoodieDatasetTestUtils.toInternalRows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Unit tests {@link HoodieDataSourceInternalWriter}.
+ */
+public class TestHoodieDataSourceInternalWriter extends 
HoodieClientTestHarness {
+
+  private static final Random RANDOM = new Random();
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts("TestHoodieDataSourceInternalWriter");
+    initPath();
+    initFileSystem();
+    initTestDataGenerator();
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanupResources();
+  }
+
+  @Test
+  public void testDataSourceWriter() throws IOException {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    String instantTime = "001";
+    // init writer
+    HoodieDataSourceInternalWriter dataSourceInternalWriter =
+        new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, 
sqlContext.sparkSession(), hadoopConf, WriteOperationType.BULK_INSERT_DATASET);
+    DataWriter<InternalRow> writer = 
dataSourceInternalWriter.createWriterFactory().createDataWriter(0, 
RANDOM.nextLong(), RANDOM.nextLong());
+
+    List<String> partitionPaths = 
Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
+    List<String> partitionPathsAbs = new ArrayList<>();
+    for (String partitionPath : partitionPaths) {
+      partitionPathsAbs.add(basePath + "/" + partitionPath + "/*");
+    }
+
+    int size = 10 + RANDOM.nextInt(1000);
+    int batches = 5;
+    Dataset<Row> totalInputRows = null;
+
+    for (int j = 0; j < batches; j++) {
+      String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j 
% 3];
+      Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, 
false);
+      writeRows(inputRows, writer);
+      if (totalInputRows == null) {
+        totalInputRows = inputRows;
+      } else {
+        totalInputRows = totalInputRows.union(inputRows);
+      }
+    }
+
+    HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) 
writer.commit();
+    List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
+    commitMessages.add(commitMetadata);
+    dataSourceInternalWriter.commit(commitMessages.toArray(new 
HoodieWriterCommitMessage[0]));
+    metaClient.reloadActiveTimeline();
+    Dataset<Row> result = HoodieClientTestUtils.read(jsc, basePath, 
sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
+    // verify output
+    assertOutput(totalInputRows, result, instantTime);
+    assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, 
size);
+  }
+
+  @Test
+  public void testMultipleDataSourceWrites() throws IOException {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    int partitionCounter = 0;
+
+    // execute N rounds
+    for (int i = 0; i < 5; i++) {
+      String instantTime = "00" + i;
+      // init writer
+      HoodieDataSourceInternalWriter dataSourceInternalWriter =
+          new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, 
sqlContext.sparkSession(), hadoopConf, WriteOperationType.BULK_INSERT_DATASET);
+
+      List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
+      Dataset<Row> totalInputRows = null;
+      DataWriter<InternalRow> writer = 
dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++,
 RANDOM.nextLong(), RANDOM.nextLong());
+
+      int size = 10 + RANDOM.nextInt(1000);
+      int batches = 5; // one batch per partition
+
+      for (int j = 0; j < batches; j++) {
+        String partitionPath = 
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+        Dataset<Row> inputRows = getRandomRows(sqlContext, size, 
partitionPath, false);
+        writeRows(inputRows, writer);
+        if (totalInputRows == null) {
+          totalInputRows = inputRows;
+        } else {
+          totalInputRows = totalInputRows.union(inputRows);
+        }
+      }
+
+      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) 
writer.commit();
+      commitMessages.add(commitMetadata);
+      dataSourceInternalWriter.commit(commitMessages.toArray(new 
HoodieWriterCommitMessage[0]));
+      metaClient.reloadActiveTimeline();
+
+      Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, 
sqlContext, metaClient.getCommitTimeline(), instantTime);
+
+      // verify output
+      assertOutput(totalInputRows, result, instantTime);
+      assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, 
size);
+    }
+  }
+
+  @Test
+  public void testLargeWrites() throws IOException {
+    // init config and table
+    HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
+    int partitionCounter = 0;
+
+    // execute N rounds
+    for (int i = 0; i < 3; i++) {
+      String instantTime = "00" + i;
+      // init writer
+      HoodieDataSourceInternalWriter dataSourceInternalWriter =
+          new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, 
sqlContext.sparkSession(), hadoopConf, WriteOperationType.BULK_INSERT_DATASET);
+
+      List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
+      Dataset<Row> totalInputRows = null;
+      DataWriter<InternalRow> writer = 
dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++,
 RANDOM.nextLong(), RANDOM.nextLong());
+
+      int size = 10000 + RANDOM.nextInt(10000);
+      int batches = 3; // one batch per partition
+
+      for (int j = 0; j < batches; j++) {
+        String partitionPath = 
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+        Dataset<Row> inputRows = getRandomRows(sqlContext, size, 
partitionPath, false);
+        writeRows(inputRows, writer);
+        if (totalInputRows == null) {
+          totalInputRows = inputRows;
+        } else {
+          totalInputRows = totalInputRows.union(inputRows);
+        }
+      }
+
+      HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) 
writer.commit();
+      commitMessages.add(commitMetadata);
+      dataSourceInternalWriter.commit(commitMessages.toArray(new 
HoodieWriterCommitMessage[0]));
+      metaClient.reloadActiveTimeline();
+
+      Dataset<Row> result = HoodieClientTestUtils.readCommit(basePath, 
sqlContext, metaClient.getCommitTimeline(), instantTime);
+
+      // verify output
+      assertOutput(totalInputRows, result, instantTime);
+      assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, 
size);
+    }
+  }
+
+  /**
+   * Tests that DataSourceWriter.abort() will abort the written records of 
interest write and commit batch1 write and abort batch2 Read of entire dataset 
should show only records from batch1.
+   * commit batch1
+   * abort batch2
+   * verify only records from batch1 is available to read
+   */
+  @Test
+  public void testAbort() throws IOException {

Review comment:
       Note to reviewer: this is the only place where we test abort for the 
Datasource path. We couldn't test it elsewhere (TestHoodieRowCreateHandle, 
TestHoodieInternalRowParquetWriter, TestHoodieBulkInsertDataInternalWriter).

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -108,262 +106,280 @@ private[hudi] object HoodieSparkSqlWriter {
           throw new HoodieException(s"hoodie table with name 
$existingTableName already exist at $basePath")
         }
       }
-      val (writeStatuses, writeClient: 
HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
-        if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
-          // register classes & schemas
-          val structName = s"${tblName}_record"
-          val nameSpace = s"hoodie.${tblName}"
-          sparkContext.getConf.registerKryoClasses(
-            Array(classOf[org.apache.avro.generic.GenericData],
-              classOf[org.apache.avro.Schema]))
-          val schema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
-          sparkContext.getConf.registerAvroSchemas(schema)
-          log.info(s"Registered avro schema : ${schema.toString(true)}")
-
-          // Convert to RDD[HoodieRecord]
-          val keyGenerator = 
DataSourceUtils.createKeyGenerator(toProperties(parameters))
-          val genericRecords: RDD[GenericRecord] = 
AvroConversionUtils.createRdd(df, structName, nameSpace)
-          val hoodieAllIncomingRecords = genericRecords.map(gr => {
-            val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, 
parameters(PRECOMBINE_FIELD_OPT_KEY), false)
-              .asInstanceOf[Comparable[_]]
-            DataSourceUtils.createHoodieRecord(gr,
-              orderingVal, keyGenerator.getKey(gr),
-              parameters(PAYLOAD_CLASS_OPT_KEY))
-          }).toJavaRDD()
-
-          // Handle various save modes
-          if (mode == SaveMode.ErrorIfExists && exists) {
-            throw new HoodieException(s"hoodie table at $basePath already 
exists.")
-          }
 
-          if (mode == SaveMode.Overwrite && exists) {
-            log.warn(s"hoodie table at $basePath already exists. Deleting 
existing data & overwriting with new data.")
-            fs.delete(basePath, true)
-            exists = false
-          }
+      val (writeSuccessfulRetVal: Boolean, commitTimeRetVal: 
common.util.Option[String], compactionInstantRetVal: common.util.Option[String],
+      writeClientRetVal: HoodieWriteClient[HoodieRecordPayload[Nothing]], 
tableConfigRetVal: HoodieTableConfig) =
+         if 
(operation.equalsIgnoreCase(BULK_INSERT_DATASET_OPERATION_OPT_VAL)) {
+        // register classes & schemas
+        val structName = s"${tblName}_record"
+        val nameSpace = s"hoodie.${tblName}"
 
-          // Create the table if not present
-          if (!exists) {
-            //FIXME(bootstrap): bootstrapIndexClass needs to be set when 
bootstrap index class is integrated.
-            val tableMetaClient = 
HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
-              path.get, HoodieTableType.valueOf(tableType),
-              tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, 
null, null)
-            tableConfig = tableMetaClient.getTableConfig
-          }
+        // Handle various save modes
+        if (mode == SaveMode.ErrorIfExists && exists) {
+          throw new HoodieException(s"hoodie table at $basePath already 
exists.")
+        }
 
-          // Create a HoodieWriteClient & issue the write.
-          val client = 
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, 
schema.toString, path.get,
-            tblName, mapAsJavaMap(parameters)
-          )).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
+        val (success, commitTime: common.util.Option[String]) =
+          if (mode == SaveMode.Ignore && exists) {
+            log.warn(s"hoodie table at $basePath already exists. Ignoring & 
not performing actual writes.")
+            (false, common.util.Option.ofNullable(instantTime))
+          } else {
+            if (mode == SaveMode.Overwrite && exists) {
+              log.warn(s"hoodie table at $basePath already exists. Deleting 
existing data & overwriting with new data.")
+              fs.delete(basePath, true)
+              exists = false
+            }
 
-          if (asyncCompactionTriggerFn.isDefined &&
-            isAsyncCompactionEnabled(client, tableConfig, parameters, 
jsc.hadoopConfiguration())) {
-            asyncCompactionTriggerFn.get.apply(client)
-          }
+            // Create the table if not present
+            if (!exists) {
+              //FIXME(bootstrap): bootstrapIndexClass needs to be set when 
bootstrap index class is integrated.
+              val tableMetaClient = 
HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
+                path.get, HoodieTableType.valueOf(tableType),
+                tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, 
null, null)
+              tableConfig = tableMetaClient.getTableConfig
+            }
 
-          val hoodieRecords =
-            if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-              DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, 
mapAsJavaMap(parameters))
+            val writeConfig = DataSourceUtils.createHoodieConfig(null, 
path.get, tblName,
+              mapAsJavaMap(parameters))
+
+            val hoodieDF = 
HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, 
writeConfig, df, structName, nameSpace)
+            
hoodieDF.write.format("org.apache.hudi.internal").option(INSTANT_TIME, 
instantTime)
+              .options(parameters).save()
+            val hiveSyncEnabled = 
parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+            val syncHiveSucess = if (hiveSyncEnabled) {
+              log.info("Syncing to Hive Metastore (URL: " + 
parameters(HIVE_URL_OPT_KEY) + ")")
+              val fs = FSUtils.getFs(basePath.toString, 
jsc.hadoopConfiguration)
+              syncHive(basePath, fs, parameters)
             } else {
-              hoodieAllIncomingRecords
+              true
             }
-
-          if (hoodieRecords.isEmpty()) {
-            log.info("new batch has no new records, skipping...")
-            (true, common.util.Option.empty())
+            (syncHiveSucess, common.util.Option.ofNullable(instantTime))
           }
-          client.startCommitWithTime(instantTime)
-          val writeStatuses = DataSourceUtils.doWriteOperation(client, 
hoodieRecords, instantTime, operation)
-          (writeStatuses, client)
-        } else {
+        (success, commitTime, common.util.Option.of(""), 
hoodieWriteClient.orNull, tableConfig)
+       } else {
+        val (writeStatuses, writeClient: 
HoodieWriteClient[HoodieRecordPayload[Nothing]]) =

Review comment:
       Note to reviewer: there are no changes in the else section; it is the same 
as before for all write operations. GitHub does not show the difference well. 
Anyway, if possible, please verify that there are no changes, as I had to 
manually resolve a lot of conflicts during the rebase.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to