danny0405 commented on a change in pull request #3134:
URL: https://github.com/apache/hudi/pull/3134#discussion_r657673459



##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
##########
@@ -83,10 +92,76 @@ protected void checkWrittenData(File baseFile, Map<String, String> expected, int
     TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
   }
 
-  @Disabled
   @Test
-  public void testIndexStateBootstrap() {
-    // Ignore the index bootstrap because we only support parquet load now.
+  public void testIndexStateBootstrap() throws Exception {
+    // open the function and ingest data
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_INSERT) {
+      funcWrapper.invoke(rowData);
+    }
+
+    assertEmptyDataFiles();
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    OperatorEvent nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", nextEvent, 
instanceOf(BatchWriteSuccessEvent.class));
+
+    funcWrapper.getCoordinator().handleEventFromOperator(0, nextEvent);
+    assertNotNull(funcWrapper.getEventBuffer()[0], "The coordinator missed the event");
+
+    funcWrapper.checkpointComplete(1);
+
+    // the data is flushed and committed, check the written data
+    checkWrittenData(tempFile, EXPECTED1, 4);
+
+    // reset the config option
+    conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+    funcWrapper = new StreamWriteFunctionWrapper<>(tempFile.getAbsolutePath(), conf);
+
+    // upsert another data buffer
+    funcWrapper.openFunction();
+    for (RowData rowData : TestData.DATA_SET_UPDATE_INSERT) {
+      funcWrapper.invoke(rowData);
+    }
+
+    assertTrue(funcWrapper.isAlreadyBootstrap());
+
+    checkIndexLoaded(
+            new HoodieKey("id1", "par1"),
+            new HoodieKey("id2", "par1"),
+            new HoodieKey("id3", "par2"),
+            new HoodieKey("id4", "par2"),
+            new HoodieKey("id5", "par3"),
+            new HoodieKey("id6", "par3"),
+            new HoodieKey("id7", "par4"),
+            new HoodieKey("id8", "par4"),
+            new HoodieKey("id9", "par3"),
+            new HoodieKey("id10", "par4"),
+            new HoodieKey("id11", "par4"));
+
+    // this triggers the data write and event send
+    funcWrapper.checkpointFunction(1);
+
+    String instant = funcWrapper.getWriteClient()
+            .getLastPendingInstant(getTableType());
+
+    nextEvent = funcWrapper.getNextEvent();
+    assertThat("The operator expect to send an event", nextEvent, 
instanceOf(BatchWriteSuccessEvent.class));
+
+    // the data is not compacted yet
+    checkWrittenData(tempFile, EXPECTED1, 4);
+

Review comment:
       We overrode this method before to skip the test; now that you enable it
again, we should reuse the code as much as possible.
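
       For illustration, a minimal sketch of the kind of reuse this asks for,
assuming TestWriteMergeOnRead extends TestWriteCopyOnWrite and the full
testIndexStateBootstrap() body stays in the base class (class and member names
beyond those visible in the diff are assumptions):

```java
import java.io.File;
import java.util.Map;

// Sketch only: relies on the Hudi Flink test harness used by the existing
// tests (TestData, fs, latestInstant, schema) being available as today.
public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {

  // No testIndexStateBootstrap() override here: the test body lives once in
  // TestWriteCopyOnWrite and runs against the MOR table through inheritance.

  @Override
  protected void checkWrittenData(File baseFile, Map<String, String> expected, int partitions) throws Exception {
    // MOR-specific verification reads through the merged view of base and log files.
    TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
  }

  // If the MOR run needs an extra assertion (e.g. "the data is not compacted
  // yet"), the base test could call a protected no-op hook that this class
  // overrides, rather than duplicating the whole test method.
}
```

       Whether the base test can be shared as-is or needs such a hook is a call
for the PR author; the point is to keep the test body in one place.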




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

