This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9804662  [HUDI-1738] Emit deletes for flink MOR table streaming read 
(#2742)
9804662 is described below

commit 9804662bc8e17d6936c20326f17ec7c0360dcaf6
Author: Danny Chan <yuzhao....@gmail.com>
AuthorDate: Thu Apr 1 15:25:31 2021 +0800

    [HUDI-1738] Emit deletes for flink MOR table streaming read (#2742)
    
    Current we did a soft delete for DELETE row data when writes into hoodie
    table. For streaming read of MOR table, the Flink reader detects the
    delete records and still emit them if the record key semantics are still
    kept.
    
    This is useful and actually a must for streaming ETL pipeline
    incremental computation.
---
 .../java/org/apache/hudi/keygen/KeyGenUtils.java   |  27 +++++
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   4 +-
 .../table/timeline/HoodieActiveTimeline.java       |  23 +++-
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  14 +--
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |  73 +++----------
 .../apache/hudi/sink/compact/CompactFunction.java  |  14 +--
 .../hudi/sink/compact/CompactionCommitSink.java    |  14 +--
 .../hudi/sink/compact/CompactionPlanOperator.java  |  45 +++++---
 .../hudi/sink/event/BatchWriteSuccessEvent.java    |   5 +-
 .../org/apache/hudi/table/HoodieTableSource.java   |  27 +++--
 .../table/format/mor/MergeOnReadInputFormat.java   | 119 +++++++++++++++++++--
 .../table/format/mor/MergeOnReadTableState.java    |  33 +++++-
 .../apache/hudi/util/StringToRowDataConverter.java | 107 ++++++++++++++++++
 .../sink/TestStreamWriteOperatorCoordinator.java   |   6 +-
 .../apache/hudi/source/TestStreamReadOperator.java |  18 ++--
 .../apache/hudi/table/HoodieDataSourceITCase.java  |  35 ++++++
 .../apache/hudi/table/format/TestInputFormat.java  |  25 +++++
 .../test/java/org/apache/hudi/utils/TestData.java  |   8 +-
 .../hudi/utils/TestStringToRowDataConverter.java   | 107 ++++++++++++++++++
 .../utils/factory/CollectSinkTableFactory.java     |  11 +-
 20 files changed, 557 insertions(+), 158 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 1f59bab..4773ef1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.List;
 
 public class KeyGenUtils {
@@ -41,6 +42,32 @@ public class KeyGenUtils {
   protected static final String DEFAULT_PARTITION_PATH = "default";
   protected static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
 
+  /**
+   * Extracts the record key fields in strings out of the given record key,
+   * this is the reverse operation of {@link #getRecordKey(GenericRecord, 
String)}.
+   *
+   * @see SimpleAvroKeyGenerator
+   * @see org.apache.hudi.keygen.ComplexAvroKeyGenerator
+   */
+  public static String[] extractRecordKeys(String recordKey) {
+    String[] fieldKV = recordKey.split(",");
+    if (fieldKV.length == 1) {
+      return fieldKV;
+    } else {
+      // a complex key
+      return Arrays.stream(fieldKV).map(kv -> {
+        final String[] kvArray = kv.split(":");
+        if (kvArray[1].equals(NULL_RECORDKEY_PLACEHOLDER)) {
+          return null;
+        } else if (kvArray[1].equals(EMPTY_RECORDKEY_PLACEHOLDER)) {
+          return "";
+        } else {
+          return kvArray[1];
+        }
+      }).toArray(String[]::new);
+    }
+  }
+
   public static String getRecordKey(GenericRecord record, List<String> 
recordKeyFields) {
     boolean keyIsNullEmpty = true;
     StringBuilder recordKey = new StringBuilder();
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index de4d8ca..6a6bced 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -429,8 +429,8 @@ public class HoodieFlinkWriteClient<T extends 
HoodieRecordPayload> extends
     HoodieFlinkTable<T> table = getHoodieTable();
     String commitType = 
CommitUtils.getCommitActionType(HoodieTableType.valueOf(tableType));
     HoodieActiveTimeline activeTimeline = 
table.getMetaClient().getActiveTimeline();
-    activeTimeline.deletePending(HoodieInstant.State.INFLIGHT, commitType, 
instant);
-    activeTimeline.deletePending(HoodieInstant.State.REQUESTED, commitType, 
instant);
+    activeTimeline.deletePendingIfExists(HoodieInstant.State.INFLIGHT, 
commitType, instant);
+    activeTimeline.deletePendingIfExists(HoodieInstant.State.REQUESTED, 
commitType, instant);
   }
 
   public void transitionRequestedToInflight(String tableType, String 
inFlightInstant) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 25ea95e..9ff50fe 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -173,10 +173,10 @@ public class HoodieActiveTimeline extends 
HoodieDefaultTimeline {
     deleteInstantFile(instant);
   }
 
-  public void deletePending(HoodieInstant.State state, String action, String 
instantStr) {
+  public void deletePendingIfExists(HoodieInstant.State state, String action, 
String instantStr) {
     HoodieInstant instant = new HoodieInstant(state, action, instantStr);
     ValidationUtils.checkArgument(!instant.isCompleted());
-    deleteInstantFile(instant);
+    deleteInstantFileIfExists(instant);
   }
 
   public void deleteCompactionRequested(HoodieInstant instant) {
@@ -185,6 +185,25 @@ public class HoodieActiveTimeline extends 
HoodieDefaultTimeline {
     deleteInstantFile(instant);
   }
 
+  private void deleteInstantFileIfExists(HoodieInstant instant) {
+    LOG.info("Deleting instant " + instant);
+    Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), 
instant.getFileName());
+    try {
+      if (metaClient.getFs().exists(inFlightCommitFilePath)) {
+        boolean result = metaClient.getFs().delete(inFlightCommitFilePath, 
false);
+        if (result) {
+          LOG.info("Removed instant " + instant);
+        } else {
+          throw new HoodieIOException("Could not delete instant " + instant);
+        }
+      } else {
+        LOG.warn("The commit " + inFlightCommitFilePath + " to remove does not 
exist");
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not remove inflight commit " + 
inFlightCommitFilePath, e);
+    }
+  }
+
   private void deleteInstantFile(HoodieInstant instant) {
     LOG.info("Deleting instant " + instant);
     Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), 
instant.getFileName());
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 53104fb..364d28e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -18,11 +18,8 @@
 
 package org.apache.hudi.sink;
 
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.ObjectSizeCalculator;
@@ -160,8 +157,8 @@ public class StreamWriteFunction<K, I, O>
   @Override
   public void open(Configuration parameters) throws IOException {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+    this.writeClient = StreamerUtil.createWriteClient(this.config, 
getRuntimeContext());
     initBuffer();
-    initWriteClient();
     initWriteFunction();
   }
 
@@ -254,15 +251,6 @@ public class StreamWriteFunction<K, I, O>
     this.addToBufferCondition = this.bufferLock.newCondition();
   }
 
-  private void initWriteClient() {
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-
-    writeClient = new HoodieFlinkWriteClient<>(context, 
StreamerUtil.getHoodieClientConfig(this.config));
-  }
-
   private void initWriteFunction() {
     final String writeOperation = this.config.get(FlinkOptions.OPERATION);
     switch (WriteOperationType.fromValue(writeOperation)) {
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index ad06617..51149e2 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -18,11 +18,11 @@
 
 package org.apache.hudi.sink;
 
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.BatchWriteSuccessEvent;
@@ -54,7 +54,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
@@ -138,7 +137,7 @@ public class StreamWriteOperatorCoordinator
     // initialize event buffer
     reset();
     // writeClient
-    initWriteClient();
+    this.writeClient = StreamerUtil.createWriteClient(conf, null);
     // init table, create it if not exists.
     initTableIfNotExists(this.conf);
     // start a new instant
@@ -178,7 +177,10 @@ public class StreamWriteOperatorCoordinator
   @Override
   public void notifyCheckpointComplete(long checkpointId) {
     // start to commit the instant.
-    checkAndCommitWithRetry();
+    final String errorMsg = String.format("Instant [%s] has a complete 
checkpoint [%d],\n"
+        + "but the coordinator has not received full write success events,\n"
+        + "rolls back the instant and rethrow", this.instant, checkpointId);
+    checkAndForceCommit(errorMsg);
     // if async compaction is on, schedule the compaction
     if (needsScheduleCompaction) {
       writeClient.scheduleCompaction(Option.empty());
@@ -226,10 +228,14 @@ public class StreamWriteOperatorCoordinator
   @Override
   public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
     // no event to handle
-    Preconditions.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
+    ValidationUtils.checkState(operatorEvent instanceof BatchWriteSuccessEvent,
         "The coordinator can only handle BatchWriteSuccessEvent");
     BatchWriteSuccessEvent event = (BatchWriteSuccessEvent) operatorEvent;
-    Preconditions.checkState(event.getInstantTime().equals(this.instant),
+    // the write task does not block after checkpointing(and before it 
receives a checkpoint success event),
+    // if it it checkpoints succeed then flushes the data buffer again before 
this coordinator receives a checkpoint
+    // success event, the data buffer would flush with an older instant time.
+    ValidationUtils.checkState(
+        HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.GREATER_THAN_OR_EQUALS, event.getInstantTime()),
         String.format("Receive an unexpected event for instant %s from task 
%d",
             event.getInstantTime(), event.getTaskID()));
     if (this.eventBuffer[event.getTaskID()] != null) {
@@ -258,14 +264,6 @@ public class StreamWriteOperatorCoordinator
   //  Utilities
   // -------------------------------------------------------------------------
 
-  @SuppressWarnings("rawtypes")
-  private void initWriteClient() {
-    writeClient = new HoodieFlinkWriteClient(
-        new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)),
-        StreamerUtil.getHoodieClientConfig(this.conf),
-        true);
-  }
-
   private void initHiveSync() {
     this.executor = new NonThrownExecutor();
     this.hiveSyncContext = HiveSyncContext.create(conf);
@@ -338,51 +336,6 @@ public class StreamWriteOperatorCoordinator
     doCommit();
   }
 
-  @SuppressWarnings("unchecked")
-  private void checkAndCommitWithRetry() {
-    int retryTimes = this.conf.getInteger(FlinkOptions.RETRY_TIMES);
-    if (retryTimes < 0) {
-      retryTimes = 1;
-    }
-    long retryIntervalMillis = 
this.conf.getLong(FlinkOptions.RETRY_INTERVAL_MS);
-    int tryTimes = 0;
-    while (tryTimes++ < retryTimes) {
-      try {
-        if (!checkReady()) {
-          // Do not throw if the try times expires but the event buffer are 
still not ready,
-          // because we have a force check when next checkpoint starts.
-          if (tryTimes == retryTimes) {
-            // Throw if the try times expires but the event buffer are still 
not ready
-            throw new HoodieException("Try " + retryTimes + " to commit 
instant [" + this.instant + "] failed");
-          }
-          sleepFor(retryIntervalMillis);
-          continue;
-        }
-        doCommit();
-        return;
-      } catch (Throwable throwable) {
-        String cause = throwable.getCause() == null ? "" : 
throwable.getCause().toString();
-        LOG.warn("Try to commit the instant {} failed, with times {} and cause 
{}", this.instant, tryTimes, cause);
-        if (tryTimes == retryTimes) {
-          throw new HoodieException("Not all write tasks finish the batch 
write to commit", throwable);
-        }
-        sleepFor(retryIntervalMillis);
-      }
-    }
-  }
-
-  /**
-   * Sleep {@code intervalMillis} milliseconds in current thread.
-   */
-  private void sleepFor(long intervalMillis) {
-    try {
-      TimeUnit.MILLISECONDS.sleep(intervalMillis);
-    } catch (InterruptedException e) {
-      LOG.error("Thread interrupted while waiting to retry the instant 
commits");
-      throw new HoodieException(e);
-    }
-  }
-
   /** Checks the buffer is ready to commit. */
   private boolean checkReady() {
     return Arrays.stream(eventBuffer)
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
index d0fa51b..7f4f7b9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java
@@ -18,11 +18,8 @@
 
 package org.apache.hudi.sink.compact;
 
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
 import 
org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
@@ -62,7 +59,7 @@ public class CompactFunction extends 
KeyedProcessFunction<Long, CompactionPlanEv
   @Override
   public void open(Configuration parameters) throws Exception {
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
-    initWriteClient();
+    this.writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext());
   }
 
   @Override
@@ -82,13 +79,4 @@ public class CompactFunction extends 
KeyedProcessFunction<Long, CompactionPlanEv
         instantTime);
     collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, 
taskID));
   }
-
-  private void initWriteClient() {
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-
-    writeClient = new HoodieFlinkWriteClient<>(context, 
StreamerUtil.getHoodieClientConfig(conf));
-  }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 86529a1..86dae20 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -19,11 +19,8 @@
 package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -86,7 +83,7 @@ public class CompactionCommitSink extends 
CleanFunction<CompactionCommitEvent> {
   @Override
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
-    initWriteClient();
+    this.writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext());
     this.commitBuffer = new ArrayList<>();
   }
 
@@ -142,13 +139,4 @@ public class CompactionCommitSink extends 
CleanFunction<CompactionCommitEvent> {
     this.commitBuffer.clear();
     this.compactionInstantTime = null;
   }
-
-  private void initWriteClient() {
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-
-    writeClient = new HoodieFlinkWriteClient<>(context, 
StreamerUtil.getHoodieClientConfig(this.conf));
-  }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index 6eebe94..e48f4ed 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -19,15 +19,14 @@
 package org.apache.hudi.sink.compact;
 
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -37,6 +36,7 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.List;
@@ -74,7 +74,7 @@ public class CompactionPlanOperator extends 
AbstractStreamOperator<CompactionPla
   @Override
   public void open() throws Exception {
     super.open();
-    initWriteClient();
+    this.writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext());
   }
 
   @Override
@@ -113,8 +113,16 @@ public class CompactionPlanOperator extends 
AbstractStreamOperator<CompactionPla
       HoodieInstant instant = 
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
       HoodieTimeline pendingCompactionTimeline = 
table.getActiveTimeline().filterPendingCompactionTimeline();
       if (!pendingCompactionTimeline.containsInstant(instant)) {
-        throw new IllegalStateException(
-            "No Compaction request available at " + compactionInstantTime + " 
to run compaction");
+        // this means that the compaction plan was written to auxiliary 
path(.tmp)
+        // but not the meta path(.hoodie), this usually happens when the job 
crush
+        // exceptionally.
+
+        // clean the compaction plan in auxiliary path and cancels the 
compaction.
+
+        LOG.warn("The compaction plan was fetched through the auxiliary 
path(.tmp) but not the meta path(.hoodie).\n"
+            + "Clean the compaction plan in auxiliary path and cancels the 
compaction");
+        cleanInstant(table.getMetaClient(), instant);
+        return;
       }
 
       // Mark instant as compaction inflight
@@ -130,17 +138,24 @@ public class CompactionPlanOperator extends 
AbstractStreamOperator<CompactionPla
     }
   }
 
+  private void cleanInstant(HoodieTableMetaClient metaClient, HoodieInstant 
instant) {
+    Path commitFilePath = new Path(metaClient.getMetaAuxiliaryPath(), 
instant.getFileName());
+    try {
+      if (metaClient.getFs().exists(commitFilePath)) {
+        boolean deleted = metaClient.getFs().delete(commitFilePath, false);
+        if (deleted) {
+          LOG.info("Removed instant " + instant);
+        } else {
+          throw new HoodieIOException("Could not delete instant " + instant);
+        }
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not remove requested commit " + 
commitFilePath, e);
+    }
+  }
+
   @VisibleForTesting
   public void setOutput(Output<StreamRecord<CompactionPlanEvent>> output) {
     this.output = output;
   }
-
-  private void initWriteClient() {
-    HoodieFlinkEngineContext context =
-        new HoodieFlinkEngineContext(
-            new SerializableConfiguration(StreamerUtil.getHadoopConf()),
-            new FlinkTaskContextSupplier(getRuntimeContext()));
-
-    writeClient = new HoodieFlinkWriteClient<>(context, 
StreamerUtil.getHoodieClientConfig(this.conf));
-  }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java
index dd40de2..186a470 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/event/BatchWriteSuccessEvent.java
@@ -35,7 +35,7 @@ public class BatchWriteSuccessEvent implements OperatorEvent {
 
   private List<WriteStatus> writeStatuses;
   private final int taskID;
-  private final String instantTime;
+  private String instantTime;
   private boolean isLastBatch;
   /**
    * Flag saying whether the event comes from the end of input, e.g. the source
@@ -102,8 +102,9 @@ public class BatchWriteSuccessEvent implements 
OperatorEvent {
    * @param other The event to be merged
    */
   public void mergeWith(BatchWriteSuccessEvent other) {
-    ValidationUtils.checkArgument(this.instantTime.equals(other.instantTime));
     ValidationUtils.checkArgument(this.taskID == other.taskID);
+    // the instant time could be monotonically increasing
+    this.instantTime = other.instantTime;
     this.isLastBatch |= other.isLastBatch; // true if one of the event 
isLastBatch true.
     List<WriteStatus> statusList = new ArrayList<>();
     statusList.addAll(this.writeStatuses);
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index f92e1a8..ada9e01 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -191,9 +191,9 @@ public class HoodieTableSource implements
         } else {
           InputFormatSourceFunction<RowData> func = new 
InputFormatSourceFunction<>(getInputFormat(), typeInfo);
           DataStreamSource<RowData> source = execEnv.addSource(func, 
asSummaryString(), typeInfo);
-          return source.name("streaming_source")
+          return source.name("bounded_source")
               .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS))
-              .uid("uid_streaming_source");
+              .uid("uid_bounded_source");
         }
       }
     };
@@ -363,21 +363,26 @@ public class HoodieTableSource implements
                 requiredRowType,
                 tableAvroSchema.toString(),
                 
AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
-                inputSplits);
-            return new MergeOnReadInputFormat(
-                this.conf,
-                FilePathUtils.toFlinkPaths(paths),
-                hoodieTableState,
-                rowDataType.getChildren(), // use the explicit fields data 
type because the AvroSchemaConverter is not very stable.
-                "default",
-                this.limit);
+                inputSplits,
+                conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+            return MergeOnReadInputFormat.builder()
+                .config(this.conf)
+                .paths(FilePathUtils.toFlinkPaths(paths))
+                .tableState(hoodieTableState)
+                // use the explicit fields data type because the 
AvroSchemaConverter
+                // is not very stable.
+                .fieldTypes(rowDataType.getChildren())
+                
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+                .limit(this.limit)
+                .emitDelete(isStreaming)
+                .build();
           case COPY_ON_WRITE:
             FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
                 FilePathUtils.toFlinkPaths(paths),
                 this.schema.getFieldNames(),
                 this.schema.getFieldDataTypes(),
                 this.requiredPos,
-                "default",
+                this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
                 this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, 
// ParquetInputFormat always uses the limit value
                 getParquetConf(this.conf, this.hadoopConf),
                 this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index a3a6f85..12bebdf 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader;
@@ -30,22 +31,28 @@ import 
org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.RowDataToAvroConverters;
 import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.util.StringToRowDataConverter;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -116,13 +123,20 @@ public class MergeOnReadInputFormat
    */
   private long currentReadCount = 0;
 
-  public MergeOnReadInputFormat(
+  /**
+   * Flag saying whether to emit the deletes. In streaming read mode, 
downstream
+   * operators need the delete messages to retract the legacy accumulator.
+   */
+  private boolean emitDelete;
+
+  private MergeOnReadInputFormat(
       Configuration conf,
       Path[] paths,
       MergeOnReadTableState tableState,
       List<DataType> fieldTypes,
       String defaultPartName,
-      long limit) {
+      long limit,
+      boolean emitDelete) {
     this.conf = conf;
     this.paths = paths;
     this.tableState = tableState;
@@ -133,6 +147,14 @@ public class MergeOnReadInputFormat
     // because we need to
     this.requiredPos = tableState.getRequiredPositions();
     this.limit = limit;
+    this.emitDelete = emitDelete;
+  }
+
+  /**
+   * Returns the builder for {@link MergeOnReadInputFormat}.
+   */
+  public static Builder builder() {
+    return new Builder();
   }
 
   @Override
@@ -177,7 +199,7 @@ public class MergeOnReadInputFormat
       if (filePath == null) {
         throw new IllegalArgumentException("File path was not specified in 
input format or configuration.");
       } else {
-        this.paths = new Path[] { new Path(filePath) };
+        this.paths = new Path[] {new Path(filePath)};
       }
     }
     // may supports nested files in the future.
@@ -269,6 +291,13 @@ public class MergeOnReadInputFormat
     final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
         FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
     final Iterator<String> logRecordsKeyIterator = 
logRecords.keySet().iterator();
+    final int[] pkOffset = tableState.getPkOffsetsInRequired();
+    // flag saying whether the pk semantics has been dropped by user specified
+    // projections. For e.g, if the pk fields are [a, b] but user only select 
a,
+    // then the pk semantics is lost.
+    final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> 
offset == -1);
+    final LogicalType[] pkTypes = pkSemanticLost ? null : 
tableState.getPkTypes(pkOffset);
+    final StringToRowDataConverter converter = pkSemanticLost ? null : new 
StringToRowDataConverter(pkTypes);
 
     return new Iterator<RowData>() {
       private RowData currentRecord;
@@ -278,14 +307,30 @@ public class MergeOnReadInputFormat
         if (logRecordsKeyIterator.hasNext()) {
           String curAvrokey = logRecordsKeyIterator.next();
           Option<IndexedRecord> curAvroRecord = null;
+          final HoodieRecord<?> hoodieRecord = logRecords.get(curAvrokey);
           try {
-            curAvroRecord = 
logRecords.get(curAvrokey).getData().getInsertValue(tableSchema);
+            curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
           } catch (IOException e) {
             throw new HoodieException("Get avro insert value error for key: " 
+ curAvrokey, e);
           }
           if (!curAvroRecord.isPresent()) {
-            // delete record found, skipping
-            return hasNext();
+            if (emitDelete && !pkSemanticLost) {
+              GenericRowData delete = new 
GenericRowData(tableState.getRequiredRowType().getFieldCount());
+
+              final String recordKey = hoodieRecord.getRecordKey();
+              final String[] pkFields = 
KeyGenUtils.extractRecordKeys(recordKey);
+              final Object[] converted = converter.convert(pkFields);
+              for (int i = 0; i < pkOffset.length; i++) {
+                delete.setField(pkOffset[i], converted[i]);
+              }
+              delete.setRowKind(RowKind.DELETE);
+
+              this.currentRecord =  delete;
+              return true;
+            } else {
+              // delete record found, skipping
+              return hasNext();
+            }
           } else {
             // should improve the code when log scanner supports
             // seeking by log blocks with commit time which is more
@@ -318,6 +363,10 @@ public class MergeOnReadInputFormat
     };
   }
 
+  // -------------------------------------------------------------------------
+  //  Inner Class
+  // -------------------------------------------------------------------------
+
   private interface RecordIterator {
     boolean reachedEnd() throws IOException;
 
@@ -521,4 +570,62 @@ public class MergeOnReadInputFormat
       return 
logRecords.get(curKey).getData().combineAndGetUpdateValue(historyAvroRecord, 
tableSchema);
     }
   }
+
+  /**
+   * Builder for {@link MergeOnReadInputFormat}.
+   */
+  public static class Builder {
+    private Configuration conf;
+    private Path[] paths;
+    private MergeOnReadTableState tableState;
+    private List<DataType> fieldTypes;
+    private String defaultPartName;
+    private long limit = -1;
+    private boolean emitDelete = false;
+
+    public Builder config(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder paths(Path[] paths) {
+      this.paths = paths;
+      return this;
+    }
+
+    public Builder tableState(MergeOnReadTableState tableState) {
+      this.tableState = tableState;
+      return this;
+    }
+
+    public Builder fieldTypes(List<DataType> fieldTypes) {
+      this.fieldTypes = fieldTypes;
+      return this;
+    }
+
+    public Builder defaultPartName(String defaultPartName) {
+      this.defaultPartName = defaultPartName;
+      return this;
+    }
+
+    public Builder limit(long limit) {
+      this.limit = limit;
+      return this;
+    }
+
+    public Builder emitDelete(boolean emitDelete) {
+      this.emitDelete = emitDelete;
+      return this;
+    }
+
+    public MergeOnReadInputFormat build() {
+      return new MergeOnReadInputFormat(conf, paths, tableState,
+          fieldTypes, defaultPartName, limit, emitDelete);
+    }
+  }
+
+  @VisibleForTesting
+  public void isEmitDelete(boolean emitDelete) {
+    this.emitDelete = emitDelete;
+  }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 7dedcda..9a32af6 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.table.format.mor;
 
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -35,18 +37,21 @@ public class MergeOnReadTableState implements Serializable {
   private final String avroSchema;
   private final String requiredAvroSchema;
   private final List<MergeOnReadInputSplit> inputSplits;
+  private final String[] pkFields;
 
   public MergeOnReadTableState(
       RowType rowType,
       RowType requiredRowType,
       String avroSchema,
       String requiredAvroSchema,
-      List<MergeOnReadInputSplit> inputSplits) {
+      List<MergeOnReadInputSplit> inputSplits,
+      String[] pkFields) {
     this.rowType = rowType;
     this.requiredRowType = requiredRowType;
     this.avroSchema = avroSchema;
     this.requiredAvroSchema = requiredAvroSchema;
     this.inputSplits = inputSplits;
+    this.pkFields = pkFields;
   }
 
   public RowType getRowType() {
@@ -76,4 +81,30 @@ public class MergeOnReadTableState implements Serializable {
         .mapToInt(i -> i)
         .toArray();
   }
+
+  /**
+   * Get the primary key positions in required row type.
+   */
+  public int[] getPkOffsetsInRequired() {
+    final List<String> fieldNames = requiredRowType.getFieldNames();
+    return Arrays.stream(pkFields)
+        .map(fieldNames::indexOf)
+        .mapToInt(i -> i)
+        .toArray();
+  }
+
+  /**
+   * Returns the primary key fields logical type with given offsets.
+   *
+   * @param pkOffsets the pk offsets in required row type
+   * @return pk field logical types
+   *
+   * @see #getPkOffsetsInRequired()
+   */
+  public LogicalType[] getPkTypes(int[] pkOffsets) {
+    final LogicalType[] requiredTypes = requiredRowType.getFields().stream()
+        .map(RowType.RowField::getType).toArray(LogicalType[]::new);
+    return Arrays.stream(pkOffsets).mapToObj(offset -> requiredTypes[offset])
+        .toArray(LogicalType[]::new);
+  }
 }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java 
b/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
new file mode 100644
index 0000000..b30d6d6
--- /dev/null
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.util.Arrays;
+
+/**
+ * A converter that converts a string array into internal row data fields.
+ * The converter is designed to be stateful(not pure stateless tool)
+ * in order to reuse the specific converters.
+ */
+@Internal
+public class StringToRowDataConverter {
+  private final Converter[] converters;
+
+  public StringToRowDataConverter(LogicalType[] fieldTypes) {
+    this.converters = Arrays.stream(fieldTypes)
+        .map(StringToRowDataConverter::getConverter)
+        .toArray(Converter[]::new);
+  }
+
+  public Object[] convert(String[] fields) {
+    ValidationUtils.checkArgument(converters.length == fields.length,
+        "Field types and values should equal with number");
+
+    Object[] converted = new Object[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      converted[i] = converters[i].convert(fields[i]);
+    }
+    return converted;
+  }
+
+  private interface Converter {
+    Object convert(String field);
+  }
+
+  private static Converter getConverter(LogicalType logicalType) {
+    switch (logicalType.getTypeRoot()) {
+      case NULL:
+        return field -> null;
+      case TINYINT:
+        return Byte::parseByte;
+      case SMALLINT:
+        return Short::parseShort;
+      case BOOLEAN:
+        return Boolean::parseBoolean;
+      case INTEGER:
+      case TIME_WITHOUT_TIME_ZONE:
+        return Integer::parseInt;
+      case BIGINT:
+        return Long::parseLong;
+      case FLOAT:
+        return Float::parseFloat;
+      case DOUBLE:
+        return Double::parseDouble;
+      case DATE:
+        // see HoodieAvroUtils#convertValueForAvroLogicalTypes
+        return field -> (int) LocalDate.parse(field).toEpochDay();
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        return field -> TimestampData.fromEpochMillis(Long.parseLong(field));
+      case CHAR:
+      case VARCHAR:
+        return StringData::fromString;
+      case BINARY:
+      case VARBINARY:
+        return field -> field.getBytes(StandardCharsets.UTF_8);
+      case DECIMAL:
+        DecimalType decimalType = (DecimalType) logicalType;
+        return field ->
+            DecimalData.fromBigDecimal(
+                new BigDecimal(field),
+                decimalType.getPrecision(),
+                decimalType.getScale());
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported type " + logicalType.getTypeRoot() + " for " + 
StringToRowDataConverter.class.getName());
+    }
+  }
+}
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
 
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 90874f1..321abfa 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -137,7 +137,7 @@ public class TestStreamWriteOperatorCoordinator {
   }
 
   @Test
-  public void testCheckpointCompleteWithRetry() {
+  public void testCheckpointCompleteWithException() {
     final CompletableFuture<byte[]> future = new CompletableFuture<>();
     coordinator.checkpointCoordinator(1, future);
     String inflightInstant = coordinator.getInstant();
@@ -149,7 +149,9 @@ public class TestStreamWriteOperatorCoordinator {
     coordinator.handleEventFromOperator(0, event);
     assertThrows(HoodieException.class,
         () -> coordinator.notifyCheckpointComplete(1),
-        "Try 3 to commit instant");
+        "org.apache.hudi.exception.HoodieException: Instant [20210330153432] 
has a complete checkpoint [1],\n"
+            + "but the coordinator has not received full write success 
events,\n"
+            + "rolls back the instant and rethrow");
   }
 
   @Test
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index aaeed94..4abc79a 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -260,15 +260,17 @@ public class TestStreamReadOperator {
         TestConfigurations.ROW_TYPE,
         tableAvroSchema.toString(),
         
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
-        Collections.emptyList());
+        Collections.emptyList(),
+        new String[0]);
     Path[] paths = FilePathUtils.getReadPaths(new Path(basePath), conf, 
hadoopConf, partitionKeys);
-    MergeOnReadInputFormat inputFormat = new MergeOnReadInputFormat(
-        conf,
-        FilePathUtils.toFlinkPaths(paths),
-        hoodieTableState,
-        rowDataType.getChildren(),
-        "default",
-        1000L);
+    MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder()
+        .config(conf)
+        .paths(FilePathUtils.toFlinkPaths(paths))
+        .tableState(hoodieTableState)
+        .fieldTypes(rowDataType.getChildren())
+        .defaultPartName("default").limit(1000L)
+        .emitDelete(true)
+        .build();
 
     OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = 
StreamReadOperator.factory(inputFormat);
     OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness 
= new OneInputStreamOperatorTestHarness<>(
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 218c742..ffb9859 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -208,6 +208,41 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
         "some commits should be cleaned");
   }
 
+  @Test
+  void testStreamReadWithDeletes() throws Exception {
+    // create filesystem table named source
+
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_NAME, "t1");
+    conf.setString(FlinkOptions.TABLE_TYPE, 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+    // write one commit
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    // write another commit with deletes
+    TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+    String latestCommit = StreamerUtil.createWriteClient(conf, null)
+        .getLastCompletedInstant(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
+    options.put(FlinkOptions.READ_STREAMING_CHECK_INTERVAL.key(), "2");
+    options.put(FlinkOptions.READ_STREAMING_START_COMMIT.key(), latestCommit);
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", 
options);
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    List<Row> result = execSelectSql(streamTableEnv, "select * from t1", 10);
+    final String expected = "["
+        + "id1,Danny,24,1970-01-01T00:00:00.001,par1, "
+        + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
+        + "id3,null,null,null,null, "
+        + "id5,null,null,null,null, "
+        + "id9,null,null,null,null]";
+    assertRowsEquals(result, expected);
+  }
+
   @ParameterizedTest
   @EnumSource(value = ExecMode.class)
   void testWriteAndRead(ExecMode execMode) {
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index ae15aba..5e324de 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.format;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.table.HoodieTableSource;
+import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -43,6 +44,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -151,6 +153,29 @@ public class TestInputFormat {
     assertThat(actual, is(expected));
   }
 
+  @Test
+  void testReadWithDeletes() throws Exception {
+    beforeEach(HoodieTableType.MERGE_ON_READ);
+
+    // write another commit to read again
+    TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+    InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+    assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+    ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
+
+    List<RowData> result = readData(inputFormat);
+
+    final String actual = TestData.rowDataToString(result);
+    final String expected = "["
+        + "id1,Danny,24,1970-01-01T00:00:00.001,par1, "
+        + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
+        + "id3,null,null,null,null, "
+        + "id5,null,null,null,null, "
+        + "id9,null,null,null,null]";
+    assertThat(actual, is(expected));
+  }
+
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testReadWithPartitionPrune(HoodieTableType tableType) throws Exception {
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java 
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index f8d4e7d..2201aeb 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -224,6 +224,10 @@ public class TestData {
     funcWrapper.close();
   }
 
+  private static String toStringSafely(Object obj) {
+    return obj == null ? "null" : obj.toString();
+  }
+
   /**
    * Sort the {@code rows} using field at index 0 and asserts
    * it equals with the expected string {@code expected}.
@@ -233,7 +237,7 @@ public class TestData {
    */
   public static void assertRowsEquals(List<Row> rows, String expected) {
     String rowsString = rows.stream()
-        .sorted(Comparator.comparing(o -> o.getField(0).toString()))
+        .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0))))
         .collect(Collectors.toList()).toString();
     assertThat(rowsString, is(expected));
   }
@@ -247,7 +251,7 @@ public class TestData {
    */
   public static void assertRowsEquals(List<Row> rows, List<RowData> expected) {
     String rowsString = rows.stream()
-        .sorted(Comparator.comparing(o -> o.getField(0).toString()))
+        .sorted(Comparator.comparing(o -> toStringSafely(o.getField(0))))
         .collect(Collectors.toList()).toString();
     assertThat(rowsString, is(rowDataToString(expected)));
   }
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
 
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
new file mode 100644
index 0000000..dde19b8
--- /dev/null
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.StringToRowDataConverter;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+
+/**
+ * Test cases for {@link StringToRowDataConverter}.
+ */
+public class TestStringToRowDataConverter {
+  @Test
+  void testConvert() {
+    String[] fields = new String[] {"1.1", "3.4", "2021-03-30", "56669000", 
"1617119069000", "12345.67"};
+    LogicalType[] fieldTypes = new LogicalType[] {
+        DataTypes.FLOAT().getLogicalType(),
+        DataTypes.DOUBLE().getLogicalType(),
+        DataTypes.DATE().getLogicalType(),
+        DataTypes.TIME(3).getLogicalType(),
+        DataTypes.TIMESTAMP().getLogicalType(),
+        DataTypes.DECIMAL(7, 2).getLogicalType()
+    };
+    StringToRowDataConverter converter = new 
StringToRowDataConverter(fieldTypes);
+    Object[] converted = converter.convert(fields);
+    Object[] expected = new Object[] {
+        1.1f, 3.4D, (int) LocalDate.parse("2021-03-30").toEpochDay(),
+        LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY),
+        
TimestampData.fromEpochMillis(Instant.parse("2021-03-30T15:44:29Z").toEpochMilli()),
+        DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2)
+    };
+    assertArrayEquals(expected, converted);
+  }
+
+  @Test
+  void testRowDataToAvroStringToRowData() {
+    GenericRowData rowData = new GenericRowData(6);
+    rowData.setField(0, 1.1f);
+    rowData.setField(1, 3.4D);
+    rowData.setField(2, (int) LocalDate.parse("2021-03-30").toEpochDay());
+    rowData.setField(3, 
LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY));
+    rowData.setField(4, 
TimestampData.fromEpochMillis(Instant.parse("2021-03-30T15:44:29Z").toEpochMilli()));
+    rowData.setField(5, DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 
7, 2));
+
+    DataType dataType = DataTypes.ROW(
+        DataTypes.FIELD("f_float", DataTypes.FLOAT()),
+        DataTypes.FIELD("f_double", DataTypes.DOUBLE()),
+        DataTypes.FIELD("f_date", DataTypes.DATE()),
+        DataTypes.FIELD("f_time", DataTypes.TIME(3)),
+        DataTypes.FIELD("f_timestamp", DataTypes.TIMESTAMP(3)),
+        DataTypes.FIELD("f_decimal", DataTypes.DECIMAL(7, 2))
+    );
+    RowType rowType = (RowType) dataType.getLogicalType();
+    RowDataToAvroConverters.RowDataToAvroConverter converter =
+        RowDataToAvroConverters.createConverter(rowType);
+    GenericRecord avroRecord =
+        (GenericRecord) 
converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
+    StringToRowDataConverter stringToRowDataConverter =
+        new StringToRowDataConverter(rowType.getChildren().toArray(new 
LogicalType[0]));
+    final String recordKey = KeyGenUtils.getRecordKey(avroRecord, 
rowType.getFieldNames());
+    final String[] recordKeys = KeyGenUtils.extractRecordKeys(recordKey);
+    Object[] convertedKeys = stringToRowDataConverter.convert(recordKeys);
+
+    GenericRowData converted = new GenericRowData(6);
+    for (int i = 0; i < 6; i++) {
+      converted.setField(i, convertedKeys[i]);
+    }
+    assertThat(converted, is(rowData));
+  }
+}
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
 
b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
index b8d7de6..68e183b 100644
--- 
a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
+++ 
b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
@@ -143,14 +143,9 @@ public class CollectSinkTableFactory implements 
DynamicTableSinkFactory {
 
     @Override
     public void invoke(RowData value, SinkFunction.Context context) {
-      if (value.getRowKind() == RowKind.INSERT) {
-        Row row = (Row) converter.toExternal(value);
-        assert row != null;
-        RESULT.get(taskID).add(row);
-      } else {
-        throw new RuntimeException(
-            "CollectSinkFunction received " + value.getRowKind() + " 
messages.");
-      }
+      Row row = (Row) converter.toExternal(value);
+      assert row != null;
+      RESULT.get(taskID).add(row);
     }
 
     @Override

Reply via email to