danny0405 commented on a change in pull request #3134:
URL: https://github.com/apache/hudi/pull/3134#discussion_r657657299



##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
##########
@@ -83,10 +92,76 @@ protected void checkWrittenData(File baseFile, Map<String, 
String> expected, int
     TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, 
partitions, schema);
   }
 
-  @Disabled
   @Test
-  public void testIndexStateBootstrap() {
-    // Ignore the index bootstrap because we only support parquet load now.
+  public void testIndexStateBootstrap() throws Exception {
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_INSERT) {
+      funcWrapper.invoke(rowData);
+    }
+
+    assertEmptyDataFiles();
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", nextEvent, 
instanceOf(BatchWriteSuccessEvent.class));
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the 
event");
+
+    funcWrapper.checkpointComplete(1);
+
+    // the data is not flushed yet
+    checkWrittenData(tempFile, EXPECTED1, 4);
+
+    // reset the config option
+    conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), 
conf);
+
+    // upsert another data buffer
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
+      funcWrapper.invoke(rowData);
+    }
+
+    assertTrue(funcWrapper.isAlreadyBootstrap());
+
+    checkIndexLoaded(
+            new HoodieKey("id1", "par1"),
+            new HoodieKey("id2", "par1"),
+            new HoodieKey("id3", "par2"),
+            new HoodieKey("id4", "par2"),
+            new HoodieKey("id5", "par3"),
+            new HoodieKey("id6", "par3"),
+            new HoodieKey("id7", "par4"),
+            new HoodieKey("id8", "par4"),
+            new HoodieKey("id9", "par3"),
+            new HoodieKey("id10", "par4"),
+            new HoodieKey("id11", "par4"));
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    String instant = funcWrapper.getWriteClient()
+            .getLastPendingInstant(getTableType());
+
+    nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", nextEvent, 
instanceOf(BatchWriteSuccessEvent.class));
+
+    // the data is not compact yet
+    checkWrittenData(tempFile, EXPECTED1, 4);
+

Review comment:
       This seems to be the only difference from the parent method; can we 
abstract it out into a method that the subclass overrides?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to