This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 8ba01dc70a718ddab3044b45f96a545f0aae4084
Author: XuQianJin-Stars <forwar...@apache.com>
AuthorDate: Fri Oct 28 12:33:22 2022 +0800

    [HUDI-2624] Implement Non Index type for HUDI
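
    Illustrative sketch of enabling the new index type through the write config
    builder methods added in this patch; the table path and option values below
    are assumptions for the example, not defaults:

        import org.apache.hudi.config.HoodieIndexConfig;
        import org.apache.hudi.config.HoodieWriteConfig;
        import org.apache.hudi.index.HoodieIndex;

        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_non_index_table")                    // assumed table path
            .withIndexConfig(HoodieIndexConfig.newBuilder()
                .withIndexType(HoodieIndex.IndexType.NON_INDEX)
                .withPartitionFileGroupCacheInterval(1800)            // minutes
                .withPartitionFileGroupStorageType("IN_MEMORY")       // or FILE_SYSTEM
                .withPartitionFileGroupCacheSize(1024 * 1024 * 1024L) // bytes per file group
                .build())
            .build();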
---
 .../org/apache/hudi/config/HoodieIndexConfig.java  |  43 ++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  12 +
 .../java/org/apache/hudi/index/HoodieIndex.java    |   2 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |   3 +-
 .../apache/hudi/keygen/EmptyAvroKeyGenerator.java  |  65 +++++
 .../hudi/table/action/commit/BucketInfo.java       |   4 +
 .../hudi/table/action/commit/BucketType.java       |   2 +-
 .../apache/hudi/index/FlinkHoodieIndexFactory.java |   2 +
 .../org/apache/hudi/index/FlinkHoodieNonIndex.java |  65 +++++
 .../apache/hudi/index/SparkHoodieIndexFactory.java |   3 +
 .../hudi/index/nonindex/SparkHoodieNonIndex.java   |  73 ++++++
 .../hudi/io/storage/row/HoodieRowCreateHandle.java |   5 +-
 .../org/apache/hudi/keygen/EmptyKeyGenerator.java  |  80 +++++++
 .../commit/BaseSparkCommitActionExecutor.java      |  17 ++
 .../table/action/commit/UpsertPartitioner.java     |  35 ++-
 .../org/apache/hudi/common/model/FileSlice.java    |  13 +
 .../org/apache/hudi/common/model/HoodieKey.java    |   2 +
 .../table/log/HoodieMergedLogRecordScanner.java    |   3 +-
 .../apache/hudi/configuration/OptionsResolver.java |   4 +
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  32 ++-
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |   5 +
 .../sink/nonindex/NonIndexStreamWriteFunction.java | 265 +++++++++++++++++++++
 .../sink/nonindex/NonIndexStreamWriteOperator.java |  25 +-
 .../java/org/apache/hudi/sink/utils/Pipelines.java |   7 +
 .../org/apache/hudi/table/HoodieTableFactory.java  |  12 +
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java |  54 +++++
 .../hudi/sink/utils/InsertFunctionWrapper.java     |   6 +
 .../sink/utils/StreamWriteFunctionWrapper.java     |  23 +-
 .../hudi/sink/utils/TestFunctionWrapper.java       |   6 +
 .../org/apache/hudi/sink/utils/TestWriteBase.java  |  48 ++++
 .../test/java/org/apache/hudi/utils/TestData.java  |  34 +++
 .../test/scala/org/apache/hudi/TestNonIndex.scala  | 110 +++++++++
 32 files changed, 1030 insertions(+), 30 deletions(-)
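
On the Flink side, NON_INDEX routes writes through NonIndexStreamWriteOperator
(see the Pipelines and OptionsResolver changes below). A minimal sketch of the
write options involved, mirroring what TestWriteMergeOnRead configures; the
concrete values here are illustrative assumptions:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.index.HoodieIndex;

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.NON_INDEX.name());
    conf.setString(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_STORAGE_TYPE.key(), "FILE_SYSTEM");
    conf.setLong(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_CACHE_SIZE.key(), 2000L);
    // When the table is created through HoodieTableFactory, setupDefaultOptionsForNonIndex()
    // additionally forces EmptyAvroKeyGenerator, the INSERT operation and skip-merge reads.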

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index ee5b83a43a..b5edaf4abc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -270,6 +270,34 @@ public class HoodieIndexConfig extends HoodieConfig {
       .withDocumentation("Index key. It is used to index the record and find 
its file group. "
           + "If not set, use record key field as default");
 
+  public static final ConfigProperty<Integer> NON_INDEX_PARTITION_FILE_GROUP_CACHE_INTERVAL_MINUTE = ConfigProperty
+      .key("hoodie.non.index.partition.file.group.cache.interval.minute")
+      .defaultValue(1800)
+      .withDocumentation("Only applies if index type is NON_INDEX with IN_MEMORY file group storage. "
+          + "Interval in minutes after which a partition's cached file group expires and subsequent "
+          + "records start a new file group.");
+
+  public static final ConfigProperty<String> NON_INDEX_PARTITION_FILE_GROUP_STORAGE_TYPE = ConfigProperty
+      .key("hoodie.non.index.partition.file.group.storage.type")
+      .defaultValue("IN_MEMORY")
+      .withDocumentation("Only applies if index type is NON_INDEX. Storage type used to track the "
+          + "active file group per partition, either IN_MEMORY or FILE_SYSTEM.");
+
+  public static final ConfigProperty<Long> NON_INDEX_PARTITION_FILE_GROUP_CACHE_SIZE = ConfigProperty
+      .key("hoodie.non.index.partition.file.group.cache.size")
+      .defaultValue(1024 * 1024 * 1024L)
+      .withDocumentation("Only applies if index type is NON_INDEX. Maximum size in bytes a file group "
+          + "can grow to before new records are routed to a new file group.");
+
   /**
    * Deprecated configs. These are now part of {@link HoodieHBaseIndexConfig}.
    */
@@ -606,6 +634,21 @@ public class HoodieIndexConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withPartitionFileGroupCacheInterval(int cacheInterval) {
+      
hoodieIndexConfig.setValue(NON_INDEX_PARTITION_FILE_GROUP_CACHE_INTERVAL_MINUTE,
 String.valueOf(cacheInterval));
+      return this;
+    }
+
+    public Builder withPartitionFileGroupStorageType(String storageType) {
+      hoodieIndexConfig.setValue(NON_INDEX_PARTITION_FILE_GROUP_STORAGE_TYPE, 
storageType);
+      return this;
+    }
+
+    public Builder withPartitionFileGroupCacheSize(long size) {
+      hoodieIndexConfig.setValue(NON_INDEX_PARTITION_FILE_GROUP_CACHE_SIZE, 
String.valueOf(size));
+      return this;
+    }
+
     public HoodieIndexConfig build() {
       hoodieIndexConfig.setDefaultValue(INDEX_TYPE, 
getDefaultIndexType(engineType));
       hoodieIndexConfig.setDefaults(HoodieIndexConfig.class.getName());
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9610ad382b..034519e64a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1673,6 +1673,18 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD);
   }
 
+  public int getNonIndexPartitionFileGroupCacheIntervalMinute() {
+    return 
getIntOrDefault(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_CACHE_INTERVAL_MINUTE);
+  }
+
+  public String getNonIndexPartitionFileGroupStorageType() {
+    return 
getString(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_STORAGE_TYPE);
+  }
+
+  public long getNonIndexPartitionFileGroupCacheSize() {
+    return 
getLong(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_CACHE_SIZE);
+  }
+
   /**
    * storage properties.
    */
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
index 7ebd94748a..074a90fe5d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java
@@ -141,7 +141,7 @@ public abstract class HoodieIndex<I, O> implements 
Serializable {
   }
 
   public enum IndexType {
-    HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET, 
FLINK_STATE
+    HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, BUCKET, 
FLINK_STATE, NON_INDEX
   }
 
   public enum BucketIndexEngineType {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 82c6de5761..e629c6a51e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -333,8 +333,9 @@ public class HoodieMergeHandle<T extends 
HoodieRecordPayload, I, K, O> extends H
    */
   public void write(GenericRecord oldRecord) {
     String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, 
keyGeneratorOpt);
+    boolean isEmptyRecordKey = key.equals(HoodieKey.EMPTY_RECORD_KEY);
     boolean copyOldRecord = true;
-    if (keyToNewRecords.containsKey(key)) {
+    if (!isEmptyRecordKey && keyToNewRecords.containsKey(key)) {
       // If we have duplicate records that we are updating, then the hoodie 
record will be deflated after
       // writing the first record. So make a copy of the record to be merged
       HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/EmptyAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/EmptyAvroKeyGenerator.java
new file mode 100644
index 0000000000..01536f95e4
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/EmptyAvroKeyGenerator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.hudi.common.model.HoodieKey;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/**
+ * Avro key generator for empty record key Hudi tables.
+ */
+public class EmptyAvroKeyGenerator extends BaseKeyGenerator {
+
+  private static final Logger LOG = 
LogManager.getLogger(EmptyAvroKeyGenerator.class);
+  public static final String EMPTY_RECORD_KEY = HoodieKey.EMPTY_RECORD_KEY;
+  private static final List<String> EMPTY_RECORD_KEY_FIELD_LIST = 
Collections.emptyList();
+
+  public EmptyAvroKeyGenerator(TypedProperties props) {
+    super(props);
+    if (config.containsKey(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())) {
+      LOG.warn(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() + " will be 
ignored while using "
+          + this.getClass().getSimpleName());
+    }
+    this.recordKeyFields = EMPTY_RECORD_KEY_FIELD_LIST;
+    this.partitionPathFields = 
Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+        .split(",")).map(String::trim).filter(s -> 
!s.isEmpty()).collect(Collectors.toList());
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return EMPTY_RECORD_KEY;
+  }
+
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    return KeyGenUtils.getRecordPartitionPath(record, 
getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath, 
isConsistentLogicalTimestampEnabled());
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
index 6547da6425..6db5b270b0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketInfo.java
@@ -48,6 +48,10 @@ public class BucketInfo implements Serializable {
     return partitionPath;
   }
 
+  public void setBucketType(BucketType bucketType) {
+    this.bucketType = bucketType;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("BucketInfo {");
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
index 70ee473d24..b32d32db40 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
@@ -19,5 +19,5 @@
 package org.apache.hudi.table.action.commit;
 
 public enum BucketType {
-  UPDATE, INSERT
+  UPDATE, INSERT, APPEND
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
index b10014b918..ba2f5a39fd 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
@@ -60,6 +60,8 @@ public final class FlinkHoodieIndexFactory {
         return new HoodieGlobalSimpleIndex(config, Option.empty());
       case BUCKET:
         return new HoodieSimpleBucketIndex(config);
+      case NON_INDEX:
+        return new FlinkHoodieNonIndex(config);
       default:
         throw new HoodieIndexException("Unsupported index type " + 
config.getIndexType());
     }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieNonIndex.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieNonIndex.java
new file mode 100644
index 0000000000..a7eb2c2262
--- /dev/null
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieNonIndex.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index;
+
+import java.util.List;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+public class FlinkHoodieNonIndex<T extends HoodieRecordPayload> extends 
FlinkHoodieIndex<T> {
+
+  public FlinkHoodieNonIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  @Override
+  public boolean isGlobal() {
+    throw new UnsupportedOperationException("Unsupported operation.");
+  }
+
+  @Override
+  public boolean canIndexLogFiles() {
+    return false;
+  }
+
+  @Override
+  public boolean isImplicitWithStorage() {
+    return true;
+  }
+
+  @Override
+  public List<WriteStatus> updateLocation(List<WriteStatus> writeStatuses, 
HoodieEngineContext context, HoodieTable hoodieTable) throws 
HoodieIndexException {
+    return null;
+  }
+
+  @Override
+  public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> 
hoodieRecords, HoodieEngineContext context, HoodieTable hoodieTable) throws 
HoodieIndexException {
+    return null;
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
index 4525490c8d..4f07a7f2aa 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java
@@ -32,6 +32,7 @@ import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex;
 import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
 import org.apache.hudi.index.hbase.SparkHoodieHBaseIndex;
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex;
+import org.apache.hudi.index.nonindex.SparkHoodieNonIndex;
 import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
 import org.apache.hudi.index.simple.HoodieSimpleIndex;
 import org.apache.hudi.keygen.BaseKeyGenerator;
@@ -74,6 +75,8 @@ public final class SparkHoodieIndexFactory {
           default:
             throw new HoodieIndexException("Unknown bucket index engine type: 
" + config.getBucketIndexEngineType());
         }
+      case NON_INDEX:
+        return new SparkHoodieNonIndex<>(config);
       default:
         throw new HoodieIndexException("Index type unspecified, set " + 
config.getIndexType());
     }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/nonindex/SparkHoodieNonIndex.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/nonindex/SparkHoodieNonIndex.java
new file mode 100644
index 0000000000..d239b7d6f7
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/nonindex/SparkHoodieNonIndex.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.nonindex;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.SparkHoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.spark.api.java.JavaRDD;
+
+public class SparkHoodieNonIndex<T extends HoodieRecordPayload<T>> extends 
SparkHoodieIndex<T> {
+
+  public SparkHoodieNonIndex(HoodieWriteConfig config) {
+    super(config);
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> 
writeStatusRDD,
+                                             HoodieEngineContext context,
+                                             HoodieTable hoodieTable)
+      throws HoodieIndexException {
+    return writeStatusRDD;
+  }
+
+  @Override
+  public boolean rollbackCommit(String instantTime) {
+    return true;
+  }
+
+  @Override
+  public boolean isGlobal() {
+    return false;
+  }
+
+  @Override
+  public boolean canIndexLogFiles() {
+    return false;
+  }
+
+  @Override
+  public boolean isImplicitWithStorage() {
+    return true;
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> records,
+                                              HoodieEngineContext context,
+                                              HoodieTable hoodieTable)
+      throws HoodieIndexException {
+    throw new UnsupportedOperationException("Unsupported operation.");
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index 9da04f7260..da63a0f2c5 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -169,7 +169,10 @@ public class HoodieRowCreateHandle implements Serializable 
{
       //          and [[String]])
       //          - Repeated computations (for ex, converting file-path to 
[[UTF8String]] over and
       //          over again)
-      UTF8String recordKey = 
row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD);
+      UTF8String recordKey = null;
+      if (!row.isNullAt(HoodieRecord.RECORD_KEY_META_FIELD_ORD)) {
+        recordKey = row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD);
+      }
       UTF8String partitionPath = 
row.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
       // This is the only meta-field that is generated dynamically, hence 
conversion b/w
       // [[String]] and [[UTF8String]] is unavoidable if 
preserveHoodieMetadata is false
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/EmptyKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/EmptyKeyGenerator.java
new file mode 100644
index 0000000000..9e4090a537
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/EmptyKeyGenerator.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+/**
+ * Key generator for Hudi tables without record key.
+ */
+public class EmptyKeyGenerator extends BuiltinKeyGenerator {
+
+  public static final String EMPTY_RECORD_KEY = 
EmptyAvroKeyGenerator.EMPTY_RECORD_KEY;
+
+  private final EmptyAvroKeyGenerator emptyAvroKeyGenerator;
+
+  public EmptyKeyGenerator(TypedProperties config) {
+    super(config);
+    this.emptyAvroKeyGenerator = new EmptyAvroKeyGenerator(config);
+    this.recordKeyFields = emptyAvroKeyGenerator.getRecordKeyFieldNames();
+    this.partitionPathFields = 
Arrays.stream(config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
+        .split(",")).map(String::trim).filter(s -> 
!s.isEmpty()).collect(Collectors.toList());
+  }
+
+  @Override
+  public String getRecordKey(GenericRecord record) {
+    return EMPTY_RECORD_KEY;
+  }
+
+  @Override
+  public String getRecordKey(Row row) {
+    return EMPTY_RECORD_KEY;
+  }
+
+  @Override
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    return combineCompositeRecordKeyUnsafe(EMPTY_RECORD_KEY);
+  }
+
+  @Override
+  public String getPartitionPath(GenericRecord record) {
+    return emptyAvroKeyGenerator.getPartitionPath(record);
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    tryInitRowAccessor(row.schema());
+    return combinePartitionPath(rowAccessor.getRecordPartitionPathValues(row));
+  }
+
+  @Override
+  public UTF8String getPartitionPath(InternalRow row, StructType schema) {
+    tryInitRowAccessor(schema);
+    return 
combinePartitionPathUnsafe(rowAccessor.getRecordPartitionPathValues(row));
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index f8e4b31ff6..bb9e00e74e 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -45,6 +45,7 @@ import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.execution.SparkLazyInsertIterable;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.HoodieConcatHandle;
+import org.apache.hudi.io.HoodieAppendHandle;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.HoodieSortedMergeHandle;
 import org.apache.hudi.keygen.BaseKeyGenerator;
@@ -320,6 +321,8 @@ public abstract class BaseSparkCommitActionExecutor<T 
extends HoodieRecordPayloa
         return handleInsert(binfo.fileIdPrefix, recordItr);
       } else if (btype.equals(BucketType.UPDATE)) {
         return handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, 
recordItr);
+      } else if (btype.equals(BucketType.APPEND)) {
+        return handleAppend(binfo.partitionPath, binfo.fileIdPrefix, 
recordItr);
       } else {
         throw new HoodieUpsertException("Unknown bucketType " + btype + " for 
partition :" + partition);
       }
@@ -335,6 +338,20 @@ public abstract class BaseSparkCommitActionExecutor<T 
extends HoodieRecordPayloa
     return handleUpsertPartition(instantTime, partition, recordItr, 
partitioner);
   }
 
+  @SuppressWarnings("unchecked")
+  protected Iterator<List<WriteStatus>> handleAppend(String partitionPath, 
String fileId,
+                                                    Iterator<HoodieRecord<T>> 
recordItr) throws IOException {
+    // This is needed since sometimes some buckets are never picked in 
getPartition() and end up with 0 records
+    if (!recordItr.hasNext()) {
+      LOG.info("Empty partition with fileId => " + fileId);
+      return Collections.singletonList((List<WriteStatus>) 
Collections.EMPTY_LIST).iterator();
+    }
+    HoodieAppendHandle<?, ?, ?, ?> appendHandle = new 
HoodieAppendHandle<>(config, instantTime,
+        table, partitionPath, fileId, recordItr, taskContextSupplier);
+    appendHandle.doAppend();
+    return Collections.singletonList(appendHandle.close()).iterator();
+  }
+
   @Override
   public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String 
fileId,
                                                   Iterator<HoodieRecord<T>> 
recordItr)
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index c2f5a43066..66cef31ff4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.NumericUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.nonindex.SparkHoodieNonIndex;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
@@ -84,6 +85,8 @@ public class UpsertPartitioner<T extends 
HoodieRecordPayload<T>> extends SparkHo
 
   protected final HoodieWriteConfig config;
 
+  private int recordCnt;
+
   public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext 
context, HoodieTable table,
       HoodieWriteConfig config) {
     super(profile, table);
@@ -106,7 +109,7 @@ public class UpsertPartitioner<T extends 
HoodieRecordPayload<T>> extends SparkHo
       WorkloadStat outputWorkloadStats = 
profile.getOutputPartitionPathStatMap().getOrDefault(partitionStat.getKey(), 
new WorkloadStat());
       for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
           partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
-        addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
+        addNewBucket(partitionStat.getKey(), updateLocEntry.getKey());
         if (profile.hasOutputWorkLoadStats()) {
           HoodieRecordLocation hoodieRecordLocation = new 
HoodieRecordLocation(updateLocEntry.getValue().getKey(), 
updateLocEntry.getKey());
           outputWorkloadStats.addUpdates(hoodieRecordLocation, 
updateLocEntry.getValue().getValue());
@@ -118,10 +121,16 @@ public class UpsertPartitioner<T extends 
HoodieRecordPayload<T>> extends SparkHo
     }
   }
 
-  private int addUpdateBucket(String partitionPath, String fileIdHint) {
+  private int addNewBucket(String partitionPath, String fileIdHint) {
+    BucketInfo bucketInfo;
+    if (table.getIndex() instanceof SparkHoodieNonIndex) {
+      bucketInfo = new BucketInfo(BucketType.APPEND, fileIdHint, 
partitionPath);
+    } else {
+      bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, 
partitionPath);
+    }
+
     int bucket = totalBuckets;
     updateLocationToBucket.put(fileIdHint, bucket);
-    BucketInfo bucketInfo = new BucketInfo(BucketType.UPDATE, fileIdHint, 
partitionPath);
     bucketInfoMap.put(totalBuckets, bucketInfo);
     totalBuckets++;
     return bucket;
@@ -196,7 +205,7 @@ public class UpsertPartitioner<T extends 
HoodieRecordPayload<T>> extends SparkHo
               bucket = 
updateLocationToBucket.get(smallFile.location.getFileId());
               LOG.info("Assigning " + recordsToAppend + " inserts to existing 
update bucket " + bucket);
             } else {
-              bucket = addUpdateBucket(partitionPath, 
smallFile.location.getFileId());
+              bucket = addNewBucket(partitionPath, 
smallFile.location.getFileId());
               LOG.info("Assigning " + recordsToAppend + " inserts to new 
update bucket " + bucket);
             }
             if (profile.hasOutputWorkLoadStats()) {
@@ -336,12 +345,18 @@ public class UpsertPartitioner<T extends 
HoodieRecordPayload<T>> extends SparkHo
     } else {
       String partitionPath = keyLocation._1().getPartitionPath();
       List<InsertBucketCumulativeWeightPair> targetBuckets = 
partitionPathToInsertBucketInfos.get(partitionPath);
-      // pick the target bucket to use based on the weights.
-      final long totalInserts = Math.max(1, 
profile.getWorkloadStat(partitionPath).getNumInserts());
-      final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", 
keyLocation._1().getRecordKey());
-      final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / 
totalInserts;
-
-      int index = Collections.binarySearch(targetBuckets, new 
InsertBucketCumulativeWeightPair(new InsertBucket(), r));
+      int index;
+      if (keyLocation._1.getRecordKey().equals(HoodieKey.EMPTY_RECORD_KEY)) {
+        // round robin
+        index = (++recordCnt) % targetBuckets.size();
+      } else {
+        // pick the target bucket to use based on the weights.
+        final long totalInserts = Math.max(1, 
profile.getWorkloadStat(partitionPath).getNumInserts());
+        final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", 
keyLocation._1().getRecordKey());
+        final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / 
totalInserts;
+
+        index = Collections.binarySearch(targetBuckets, new 
InsertBucketCumulativeWeightPair(new InsertBucket(), r));
+      }
 
       if (index >= 0) {
         return targetBuckets.get(index).getKey().bucketNumber;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
index 0fc580db0b..6cb1ed90a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
@@ -116,6 +116,19 @@ public class FileSlice implements Serializable {
     return (baseFile == null) && (logFiles.isEmpty());
   }
 
+  public long getFileGroupSize() {
+    long totalSize = 0;
+    if (baseFile != null) {
+      totalSize += baseFile.getFileSize();
+    }
+    if (logFiles != null) {
+      for (HoodieLogFile logFile : logFiles) {
+        totalSize += logFile.getFileSize();
+      }
+    }
+    return totalSize;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("FileSlice {");
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
index 9030204099..68ae58e5f8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
@@ -29,6 +29,8 @@ import java.util.Objects;
  */
 public class HoodieKey implements Serializable {
 
+  public static final String EMPTY_RECORD_KEY = "_hoodie_empty_record_key_";
+
   private String recordKey;
   private String partitionPath;
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index e3d8554d00..5ef0a6821f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -77,6 +77,7 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
   private long maxMemorySizeInBytes;
   // Stores the total time taken to perform reading and merging of log blocks
   private long totalTimeTakenToReadAndMergeBlocks;
+  private int emptyKeySuffix;
 
   @SuppressWarnings("unchecked")
   protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, 
List<String> logFilePaths, Schema readerSchema,
@@ -158,7 +159,7 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
       }
     } else {
       // Put the record as is
-      records.put(key, hoodieRecord);
+      records.put(key.equals(HoodieKey.EMPTY_RECORD_KEY) ? key + 
(emptyKeySuffix++) : key, hoodieRecord);
     }
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 00ebf09426..72f8f39011 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -119,6 +119,10 @@ public class OptionsResolver {
     return 
conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name());
   }
 
+  public static boolean isNonIndexType(Configuration conf) {
+    return 
conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.NON_INDEX.name());
+  }
+
   /**
    * Returns whether the source should emit changelog.
    *
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 2748af5290..a0f994f04a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -202,7 +202,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
    * <p>A {@link HoodieRecord} was firstly transformed into a {@link DataItem}
    * for buffering, it then transforms back to the {@link HoodieRecord} before 
flushing.
    */
-  private static class DataItem {
+  protected static class DataItem {
     private final String key; // record key
     private final String instant; // 'U' or 'I'
     private final HoodieRecordPayload<?> data; // record payload
@@ -283,7 +283,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
    * Tool to detect if to flush out the existing buffer.
    * Sampling the record to compute the size with 0.01 percentage.
    */
-  private static class BufferSizeDetector {
+  protected static class BufferSizeDetector {
     private final Random random = new Random(47);
     private static final int DENOMINATOR = 100;
 
@@ -292,11 +292,11 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     private long lastRecordSize = -1L;
     private long totalSize = 0L;
 
-    BufferSizeDetector(double batchSizeMb) {
+    public BufferSizeDetector(double batchSizeMb) {
       this.batchSizeBytes = batchSizeMb * 1024 * 1024;
     }
 
-    boolean detect(Object record) {
+    public boolean detect(Object record) {
       if (lastRecordSize == -1 || sampling()) {
         lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
       }
@@ -304,15 +304,29 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
       return totalSize > this.batchSizeBytes;
     }
 
-    boolean sampling() {
+    public boolean detect(long recordSize) {
+      lastRecordSize = recordSize;
+      totalSize += lastRecordSize;
+      return totalSize > this.batchSizeBytes;
+    }
+
+    public boolean sampling() {
       // 0.01 sampling percentage
       return random.nextInt(DENOMINATOR) == 1;
     }
 
-    void reset() {
+    public void reset() {
       this.lastRecordSize = -1L;
       this.totalSize = 0L;
     }
+
+    public void setTotalSize(long totalSize) {
+      this.totalSize = totalSize;
+    }
+
+    public long getLastRecordSize() {
+      return lastRecordSize;
+    }
   }
 
   /**
@@ -381,7 +395,7 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
 
     bucket.records.add(item);
 
-    boolean flushBucket = bucket.detector.detect(item);
+    boolean flushBucket = shouldFlushBucket(bucket.detector, item, 
value.getPartitionPath());
     boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
     if (flushBucket) {
       if (flushBucket(bucket)) {
@@ -484,4 +498,8 @@ public class StreamWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     // blocks flushing until the coordinator starts a new instant
     this.confirming = true;
   }
+
+  protected boolean shouldFlushBucket(BufferSizeDetector detector, DataItem 
item, String partitionPath) {
+    return detector.detect(item);
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index c87d5b2443..670748b90f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -562,6 +562,11 @@ public class StreamWriteOperatorCoordinator
     return instant;
   }
 
+  @VisibleForTesting
+  public HoodieFlinkWriteClient getWriteClient() {
+    return writeClient;
+  }
+
   @VisibleForTesting
   public Context getContext() {
     return context;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/nonindex/NonIndexStreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/nonindex/NonIndexStreamWriteFunction.java
new file mode 100644
index 0000000000..517234fc92
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/nonindex/NonIndexStreamWriteFunction.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.nonindex;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.hudi.client.FlinkTaskContextSupplier;
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.sink.StreamWriteFunction;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Stream write function used for MOR tables when the index type is NON_INDEX.
+ */
+public class NonIndexStreamWriteFunction<I>
+    extends StreamWriteFunction<I> {
+
+  private static final Logger LOG = 
LogManager.getLogger(NonIndexStreamWriteFunction.class);
+
+  private transient Map<String, FileGroupInfo> partitionPathToFileId;
+
+  private transient HoodieWriteConfig writeConfig;
+
+  private transient PartitionFileGroupHandle partitionFileGroupHandle;
+
+  /**
+   * Constructs a NonIndexStreamWriteFunction.
+   *
+   * @param config The config options
+   */
+  public NonIndexStreamWriteFunction(Configuration config) {
+    super(config);
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+    partitionPathToFileId = new ConcurrentHashMap<>();
+    writeConfig = StreamerUtil.getHoodieClientConfig(this.config);
+    partitionFileGroupHandle = 
getPartitionHandle(PartitionFileGroupStorageType.valueOf(writeConfig.getNonIndexPartitionFileGroupStorageType()));
+    partitionFileGroupHandle.init();
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) 
throws Exception {
+    super.snapshotState(functionSnapshotContext);
+    partitionFileGroupHandle.reset();
+  }
+
+  @Override
+  public void processElement(I value, ProcessFunction<I, Object>.Context 
context, Collector<Object> collector) throws Exception {
+    HoodieRecord<?> record = (HoodieRecord<?>) value;
+    assignFileId(record);
+    bufferRecord(record);
+  }
+
+  private void assignFileId(HoodieRecord<?> record) {
+    String partitionPath = record.getPartitionPath();
+    FileGroupInfo fileGroupInfo = 
partitionPathToFileId.computeIfAbsent(partitionPath,
+        key -> getLatestFileGroupInfo(record.getPartitionPath()));
+    if (fileGroupInfo == null || !fileGroupInfo.canAssign(record)) {
+      fileGroupInfo = new 
FileGroupInfo(FSUtils.getFileId(FSUtils.createNewFileIdPfx()));
+      partitionPathToFileId.put(partitionPath, fileGroupInfo);
+    }
+    record.unseal();
+    record.setCurrentLocation(new HoodieRecordLocation("U", 
fileGroupInfo.getFileId()));
+    record.seal();
+  }
+
+  private FileGroupInfo getLatestFileGroupInfo(String partitionPath) {
+    return partitionFileGroupHandle.getLatestFileGroupInfo(partitionPath);
+  }
+
+  @Override
+  public void endInput() {
+    super.endInput();
+    partitionPathToFileId.clear();
+  }
+
+  private abstract class PartitionFileGroupHandle {
+
+    abstract void init();
+
+    abstract FileGroupInfo getLatestFileGroupInfo(String partitionPath);
+
+    abstract void reset();
+
+  }
+
+  private class InMemoryPartitionFileGroupHandle extends 
PartitionFileGroupHandle {
+
+    private long cacheIntervalMills;
+
+    @Override
+    public void init() {
+      cacheIntervalMills = writeConfig.getNonIndexPartitionFileGroupCacheIntervalMinute() * 60 * 1000L;
+    }
+
+    @Override
+    public FileGroupInfo getLatestFileGroupInfo(String partitionPath) {
+      return null;
+    }
+
+    @Override
+    public void reset() {
+      long curTime = System.currentTimeMillis();
+      for (Entry<String, FileGroupInfo> entry : 
partitionPathToFileId.entrySet()) {
+        if (curTime - entry.getValue().getCreateTime() >= cacheIntervalMills) {
+          partitionPathToFileId.remove(entry.getKey());
+        }
+      }
+    }
+  }
+
+  @Override
+  protected boolean shouldFlushBucket(BufferSizeDetector detector, DataItem 
item, String partitionPath) {
+    FileGroupInfo fileGroupInfo = partitionPathToFileId.get(partitionPath);
+    return fileGroupInfo == null || fileGroupInfo.getLastRecordSize() == -1 ? 
detector.detect(item)
+        : detector.detect(fileGroupInfo.getLastRecordSize());
+  }
+
+  private class FileSystemPartitionFileGroupHandle extends 
PartitionFileGroupHandle {
+
+    private String writeToken;
+
+    private HoodieTable<?, ?, ?, ?> table;
+
+    private HoodieFlinkEngineContext engineContext;
+
+    private SyncableFileSystemView fileSystemView;
+
+    private boolean needSync = false;
+
+    @Override
+    public void init() {
+      this.engineContext = new HoodieFlinkEngineContext(
+          new 
SerializableConfiguration(HadoopConfigurations.getHadoopConf(config)),
+          new FlinkTaskContextSupplier(getRuntimeContext()));
+      this.writeToken = FSUtils.makeWriteToken(
+          
engineContext.getTaskContextSupplier().getPartitionIdSupplier().get(),
+          engineContext.getTaskContextSupplier().getStageIdSupplier().get(),
+          engineContext.getTaskContextSupplier().getAttemptIdSupplier().get());
+      this.table = HoodieFlinkTable.create(writeConfig, engineContext);
+      this.fileSystemView = table.getHoodieView();
+    }
+
+    @Override
+    public FileGroupInfo getLatestFileGroupInfo(String partitionPath) {
+      if (needSync) {
+        fileSystemView.sync();
+        needSync = false;
+      }
+      List<HoodieLogFile> hoodieLogFiles = 
fileSystemView.getAllFileSlices(partitionPath)
+          .map(FileSlice::getLatestLogFile)
+          .filter(Option::isPresent).map(Option::get)
+          .filter(logFile -> 
FSUtils.getWriteTokenFromLogPath(logFile.getPath()).equals(writeToken))
+          
.sorted(HoodieLogFile.getReverseLogFileComparator()).collect(Collectors.toList());
+      if (hoodieLogFiles.size() > 0) {
+        HoodieLogFile hoodieLogFile = hoodieLogFiles.get(0);
+        Option<FileSlice> fileSlice = 
fileSystemView.getLatestFileSlice(partitionPath, hoodieLogFile.getFileId());
+        if (fileSlice.isPresent()) {
+          return new 
FileGroupInfo(FSUtils.getFileIdFromFilePath(hoodieLogFile.getPath()),
+              fileSlice.get().getFileGroupSize());
+        }
+        LOG.warn("Cannot locate fileSlice, partitionPath: " + partitionPath + ", fileId: "
+            + hoodieLogFile.getFileId());
+      }
+      return null;
+    }
+
+    @Override
+    public void reset() {
+      partitionPathToFileId.clear();
+      needSync = true;
+    }
+  }
+
+  private PartitionFileGroupHandle 
getPartitionHandle(PartitionFileGroupStorageType storageType) {
+    switch (storageType) {
+      case IN_MEMORY:
+        return new InMemoryPartitionFileGroupHandle();
+      case FILE_SYSTEM:
+        return new FileSystemPartitionFileGroupHandle();
+      default:
+        throw new IllegalArgumentException("Unsupported storage type: " + storageType.name());
+    }
+  }
+
+  private enum PartitionFileGroupStorageType {
+    IN_MEMORY, FILE_SYSTEM
+  }
+
+  private class FileGroupInfo {
+
+    private final String fileId;
+    private final long createTime;
+    private final BufferSizeDetector detector;
+
+    public FileGroupInfo(String fileId) {
+      this.fileId = fileId;
+      this.createTime = System.currentTimeMillis();
+      this.detector = new BufferSizeDetector((double) 
writeConfig.getNonIndexPartitionFileGroupCacheSize() / 1024 / 1024);
+    }
+
+    public FileGroupInfo(String fileId, long initFileSize) {
+      this(fileId);
+      detector.setTotalSize(initFileSize);
+    }
+
+    public boolean canAssign(HoodieRecord<?> record) {
+      return !detector.detect(record);
+    }
+
+    public String getFileId() {
+      return fileId;
+    }
+
+    public long getCreateTime() {
+      return createTime;
+    }
+
+    public long getLastRecordSize() {
+      return detector.getLastRecordSize();
+    }
+  }
+
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/nonindex/NonIndexStreamWriteOperator.java
similarity index 51%
copy from 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
copy to 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/nonindex/NonIndexStreamWriteOperator.java
index 70ee473d24..560716aa9c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BucketType.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/nonindex/NonIndexStreamWriteOperator.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,8 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.table.action.commit;
+package org.apache.hudi.sink.nonindex;
 
-public enum BucketType {
-  UPDATE, INSERT
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * Operator for {@link NonIndexStreamWriteFunction}.
+ *
+ * @param <I> The input type
+ */
+public class NonIndexStreamWriteOperator<I> extends AbstractWriteOperator<I> {
+
+  public NonIndexStreamWriteOperator(Configuration conf) {
+    super(new NonIndexStreamWriteFunction<>(conf));
+  }
+
+  public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
+    return WriteOperatorFactory.instance(conf, new 
NonIndexStreamWriteOperator<>(conf));
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 82761adf73..55428610d9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -44,6 +44,7 @@ import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
 import org.apache.hudi.sink.compact.CompactionPlanEvent;
 import org.apache.hudi.sink.compact.CompactionPlanOperator;
+import org.apache.hudi.sink.nonindex.NonIndexStreamWriteOperator;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.partitioner.BucketIndexPartitioner;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
@@ -323,6 +324,12 @@ public class Pipelines {
           .transform(opIdentifier("bucket_write", conf), 
TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+    } else if (OptionsResolver.isNonIndexType(conf)) {
+      WriteOperatorFactory<HoodieRecord> operatorFactory = 
NonIndexStreamWriteOperator.getFactory(conf);
+      return dataStream.transform("non_index_write",
+              TypeInformation.of(Object.class), operatorFactory)
+          .uid("uid_non_index_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     } else {
       WriteOperatorFactory<HoodieRecord> operatorFactory = 
StreamWriteOperator.getFactory(conf);
       return dataStream
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 1718175240..b0380c5878 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,12 +19,14 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
+import org.apache.hudi.keygen.EmptyAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -56,6 +58,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.configuration.FlinkOptions.REALTIME_SKIP_MERGE;
 
 /**
  * Hoodie data source/sink factory.
@@ -185,6 +188,15 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
     setupWriteOptions(conf);
     // infer avro schema from physical DDL schema
     inferAvroSchema(conf, 
schema.toPhysicalRowDataType().notNull().getLogicalType());
+    setupDefaultOptionsForNonIndex(conf);
+  }
+
+  private static void setupDefaultOptionsForNonIndex(Configuration conf) {
+    if (OptionsResolver.isNonIndexType(conf)) {
+      conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, EmptyAvroKeyGenerator.class.getName());
+      conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+      conf.setString(FlinkOptions.MERGE_TYPE, REALTIME_SKIP_MERGE);
+    }
   }
 
   /**
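With the override above, a SQL user only has to switch the index type; the key generator, write operation, and merge type are then derived by the factory. A minimal Flink SQL sketch of such a table follows, assuming the standard connector option keys ('connector', 'path', 'table.type', 'index.type'); the table name, columns, and path are hypothetical.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class NonIndexTableDdlExample {
      public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'index.type' = 'NON_INDEX' is the only non-default switch; the factory change above
        // then forces EmptyAvroKeyGenerator, the INSERT operation and skip-merge reads,
        // so no record key field needs to be declared.
        tableEnv.executeSql(
            "CREATE TABLE hudi_non_index_sink (\n"
                + "  uuid STRING,\n"
                + "  name STRING,\n"
                + "  ts TIMESTAMP(3),\n"
                + "  `partition` STRING\n"
                + ") PARTITIONED BY (`partition`) WITH (\n"
                + "  'connector' = 'hudi',\n"
                + "  'path' = 'file:///tmp/hudi_non_index_sink',\n"
                + "  'table.type' = 'MERGE_ON_READ',\n"
                + "  'index.type' = 'NON_INDEX'\n"
                + ")");
      }
    }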
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index df01fc9076..7efdc181e9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -20,7 +20,13 @@ package org.apache.hudi.sink;
 
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.EmptyAvroKeyGenerator;
 import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
@@ -141,6 +147,54 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
         .end();
   }
 
+  @Test
+  public void testInsertNonIndexOnMemoryMode() throws Exception {
+    testInsertNonIndex("IN_MEMORY");
+  }
+
+  @Test
+  public void testInsertNonIndexOnFileSystemMode() throws Exception {
+    testInsertNonIndex("FILE_SYSTEM");
+  }
+
+  public void testInsertNonIndex(String storageType) throws Exception {
+    // open the function and ingest data
+    conf.setString(FlinkOptions.INDEX_TYPE, HoodieIndex.IndexType.NON_INDEX.name());
+    conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.name());
+    conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, EmptyAvroKeyGenerator.class.getName());
+    conf.setLong(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_CACHE_SIZE.key(), 2000L);
+    conf.setInteger(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_CACHE_INTERVAL_MINUTE.key(), 0);
+    conf.setString(HoodieIndexConfig.NON_INDEX_PARTITION_FILE_GROUP_STORAGE_TYPE.key(), storageType);
+    conf.setString(FileSystemViewStorageConfig.VIEW_TYPE.key(), FileSystemViewStorageConfig.VIEW_TYPE.defaultValue().name());
+    conf.setString(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key(), "false");
+
+    TestHarness harness = TestHarness.instance().preparePipeline(tempFile, conf)
+        .consume(TestData.DATA_SET_INSERT_NON_INDEX)
+        .emptyEventBuffer()
+        .checkpoint(1)
+        .assertPartitionFileGroups("par1", 2)
+        .assertPartitionFileGroups("par2", 1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .consume(TestData.DATA_SET_INSERT_NON_INDEX)
+        .emptyEventBuffer()
+        .checkpoint(2);
+
+    if (storageType.equals("FILE_SYSTEM")) {
+      harness = harness.assertPartitionFileGroups("par2", 1);
+    } else if (storageType.equals("IN_MEMORY")) {
+      harness = harness
+          .assertPartitionFileGroups("par1", 4)
+          .assertPartitionFileGroups("par2", 2);
+    }
+
+    harness
+        .assertNextEvent()
+        .checkpointComplete(2)
+        .checkWrittenData(EXPECTED6, 4)
+        .end();
+  }
+
   @Override
   public void testInsertClustering() {
     // insert clustering is only valid for cow table.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
index 707fe45c47..3666940b4e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/InsertFunctionWrapper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
@@ -144,6 +145,11 @@ public class InsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
     return coordinator;
   }
 
+  @Override
+  public HoodieFlinkWriteClient getWriteClient() {
+    return coordinator.getWriteClient();
+  }
+
   @Override
   public void close() throws Exception {
     this.coordinator.close();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index a1a14456e3..b83f3cc478 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -27,6 +28,7 @@ import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.nonindex.NonIndexStreamWriteFunction;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
 import org.apache.hudi.utils.TestConfigurations;
@@ -164,9 +166,13 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
   public void invoke(I record) throws Exception {
     HoodieRecord<?> hoodieRecord = toHoodieFunction.map((RowData) record);
     ScalaCollector<HoodieRecord<?>> collector = ScalaCollector.getInstance();
-    bucketAssignerFunction.processElement(hoodieRecord, null, collector);
-    bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
-    writeFunction.processElement(collector.getVal(), null, null);
+    if (OptionsResolver.isNonIndexType(conf)) {
+      writeFunction.processElement(hoodieRecord, null, null);
+    } else {
+      bucketAssignerFunction.processElement(hoodieRecord, null, collector);
+      bucketAssignFunctionContext.setCurrentKey(hoodieRecord.getRecordKey());
+      writeFunction.processElement(collector.getVal(), null, null);
+    }
   }
 
   public WriteMetadataEvent[] getEventBuffer() {
@@ -233,6 +239,11 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
     return coordinator;
   }
 
+  @Override
+  public HoodieFlinkWriteClient getWriteClient() {
+    return coordinator.getWriteClient();
+  }
+
   public MockOperatorCoordinatorContext getCoordinatorContext() {
     return coordinatorContext;
   }
@@ -254,7 +265,11 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
   // -------------------------------------------------------------------------
 
   private void setupWriteFunction() throws Exception {
-    writeFunction = new StreamWriteFunction<>(conf);
+    if (OptionsResolver.isNonIndexType(conf)) {
+      writeFunction = new NonIndexStreamWriteFunction<>(conf);
+    } else {
+      writeFunction = new StreamWriteFunction<>(conf);
+    }
     writeFunction.setRuntimeContext(runtimeContext);
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.initializeState(this.stateInitializationContext);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
index d2fe819650..967fdd3b24 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestFunctionWrapper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
@@ -68,6 +69,11 @@ public interface TestFunctionWrapper<I> {
    */
   StreamWriteOperatorCoordinator getCoordinator();
 
+  /**
+   * Returns the write client.
+   */
+  HoodieFlinkWriteClient getWriteClient();
+
   /**
    * Returns the data buffer of the write task.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index b6ae0767d6..05ce879bcb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -18,12 +18,15 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
@@ -69,6 +72,8 @@ public class TestWriteBase {
 
   protected static final Map<String, List<String>> EXPECTED5 = new HashMap<>();
 
+  protected static final Map<String, String> EXPECTED6 = new HashMap<>();
+
   static {
     EXPECTED1.put("par1", "[id1,par1,id1,Danny,23,1,par1, 
id2,par1,id2,Stephen,33,2,par1]");
     EXPECTED1.put("par2", "[id3,par2,id3,Julian,53,3,par2, 
id4,par2,id4,Fabian,31,4,par2]");
@@ -102,6 +107,38 @@ public class TestWriteBase {
         "id1,par1,id1,Danny,23,3,par1",
         "id1,par1,id1,Danny,23,4,par1",
         "id1,par1,id1,Danny,23,4,par1"));
+
+    EXPECTED6.put("par1", "["
+        + "_hoodie_empty_record_key_,par1,id1,Danny,23,1,par1, "
+        + "_hoodie_empty_record_key_,par1,id1,Danny,23,1,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1, "
+        + "_hoodie_empty_record_key_,par1,id2,Stephen,33,2,par1]");
+
+    EXPECTED6.put("par2", "["
+        + "_hoodie_empty_record_key_,par2,id3,Julian,53,3,par2, "
+        + "_hoodie_empty_record_key_,par2,id3,Julian,53,3,par2]");
+
+    EXPECTED6.put("par3", "["
+        + "_hoodie_empty_record_key_,par3,id5,Sophia,18,5,par3, "
+        + "_hoodie_empty_record_key_,par3,id5,Sophia,18,5,par3]");
+
+    EXPECTED6.put("par4", "["
+        + "_hoodie_empty_record_key_,par4,id7,Bob,44,7,par4, "
+        + "_hoodie_empty_record_key_,par4,id7,Bob,44,7,par4]");
   }
 
   // -------------------------------------------------------------------------
@@ -224,6 +261,17 @@ public class TestWriteBase {
       return this;
     }
 
+    public TestHarness assertPartitionFileGroups(String partitionPath, int numFileGroups) {
+      HoodieFlinkWriteClient writeClient = this.pipeline.getWriteClient();
+      SyncableFileSystemView fileSystemView = writeClient.getHoodieTable().getHoodieView();
+      fileSystemView.sync();
+      // check that the given partition has the expected number of file groups
+      List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(
+          Collectors.toList());
+      assertEquals(numFileGroups, fileGroups.size());
+      return this;
+    }
+
     /**
      * Checkpoints the pipeline, which triggers the data write and event send.
      */
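The assertion helper added above goes through the write client's SyncableFileSystemView. The same file-group count can be read outside the test harness; the following is a small sketch using only the APIs referenced in this patch (the example class and method names are illustrative):

    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.common.model.HoodieFileGroup;
    import org.apache.hudi.common.table.view.SyncableFileSystemView;

    public class FileGroupCountExample {
      // Mirrors TestHarness#assertPartitionFileGroups: syncs the file system view and
      // counts the file groups under a single partition.
      @SuppressWarnings("rawtypes")
      public static int fileGroupCount(HoodieFlinkWriteClient writeClient, String partitionPath) {
        SyncableFileSystemView view = writeClient.getHoodieTable().getHoodieView();
        view.sync(); // refresh so file groups committed by the last checkpoint are visible
        List<HoodieFileGroup> fileGroups =
            view.getAllFileGroups(partitionPath).collect(Collectors.toList());
        return fileGroups.size();
      }
    }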
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 8e1dd9964c..6d0183053f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -286,6 +286,40 @@ public class TestData {
           TimestampData.fromEpochMillis(5), StringData.fromString("par1"))
   );
 
+  public static List<RowData> DATA_SET_INSERT_NON_INDEX = Arrays.asList(
+      //partition1
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 
23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id2"), 
StringData.fromString("Stephen"), 33,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+
+      //partition2
+      insertRow(StringData.fromString("id3"), StringData.fromString("Julian"), 
53,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par2")),
+
+      //partition3
+      insertRow(StringData.fromString("id5"), StringData.fromString("Sophia"), 
18,
+          TimestampData.fromEpochMillis(5), StringData.fromString("par3")),
+
+      //partition4
+      insertRow(StringData.fromString("id7"), StringData.fromString("Bob"), 44,
+          TimestampData.fromEpochMillis(7), StringData.fromString("par4"))
+  );
+
   public static List<RowData> DATA_SET_SINGLE_INSERT = Collections.singletonList(
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestNonIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestNonIndex.scala
new file mode 100644
index 0000000000..cfe9564a35
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestNonIndex.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.keygen.EmptyKeyGenerator
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieClientTestBase}
+import org.apache.spark.sql.{Dataset, Row, SaveMode}
+import org.junit.jupiter.api.Test
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters
+
+class TestNonIndex extends HoodieClientTestBase {
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp",
+    HoodieIndexConfig.INDEX_TYPE.key() -> HoodieIndex.IndexType.NON_INDEX.name(),
+    HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() -> classOf[EmptyKeyGenerator].getName
+  )
+
+  @Test
+  def testNonIndexMORInsert(): Unit = {
+    val spark = sqlContext.sparkSession
+
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    // first insert, parquet files
+    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    val records2 = recordsToStrings(dataGen.generateInserts("002", 100)).toList
+    // second insert, log files
+    val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+    assert(fs.globStatus(new Path(basePath, "201*/*/*/.*.log*")).length > 0)
+    assert(fs.globStatus(new Path(basePath, "201*/*/*/*.parquet")).length > 0)
+
+    // check data
+    val result = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
+      .selectExpr(inputDF2.schema.map(_.name): _*)
+    val inputAll = inputDF1.unionAll(inputDF2)
+    assert(result.except(inputAll).count() == 0)
+    assert(inputAll.except(result).count == 0)
+  }
+
+  @Test
+  def testBulkInsertDatasetWithOutIndex(): Unit = {
+    val spark = sqlContext.sparkSession
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+
+    // create a new table
+    val fooTableModifier = commonOpts
+      .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .updated(DataSourceWriteOptions.ENABLE_ROW_WRITER.key, "true")
+      .updated(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.NON_INDEX.toString)
+      .updated(HoodieWriteConfig.AVRO_SCHEMA_STRING.key(), schema.toString)
+      .updated(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), "4")
+      .updated("path", basePath)
+    val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    for (_ <- 0 to 0) {
+      // generate the inserts
+      val records = DataSourceTestUtils.generateRandomRows(200)
+      val recordsSeq = JavaConverters.asScalaIteratorConverter(records.iterator).asScala.toSeq
+      val df = spark.createDataFrame(spark.sparkContext.parallelize(recordsSeq), structType)
+      // write to Hudi
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams ++ Seq(), df)
+
+      // Fetch records from entire dataset
+      val actualDf = sqlContext.read.format("org.apache.hudi").load(basePath + "/*/*/*")
+
+      assert(actualDf.where("_hoodie_record_key = '_hoodie_empty_record_key_'").count() == 200)
+    }
+  }
+}
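For reference, the Spark tests above boil down to a plain datasource write with the non-index options. Below is a sketch in Java of an equivalent write; the raw 'hoodie.table.name' / 'hoodie.datasource.write.*' option keys and the table name are assumptions of this example, while the index type and key generator mirror the test.

    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.index.HoodieIndex;
    import org.apache.hudi.keygen.EmptyKeyGenerator;
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class NonIndexSparkWriteExample {
      // Appends df to a MOR table without any record-level index; every row carries the
      // placeholder record key produced by EmptyKeyGenerator.
      public static void writeNonIndex(Dataset<Row> df, String basePath) {
        df.write().format("org.apache.hudi")
            .option("hoodie.table.name", "non_index_trips")
            .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
            .option("hoodie.datasource.write.operation", "insert")
            .option("hoodie.datasource.write.precombine.field", "timestamp")
            .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition")
            .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.NON_INDEX.name())
            .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), EmptyKeyGenerator.class.getName())
            .mode(SaveMode.Append)
            .save(basePath);
      }
    }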
