danny0405 commented on a change in pull request #2640:
URL: https://github.com/apache/hudi/pull/2640#discussion_r589907439



##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.operator.FlinkOptions;
+import org.apache.hudi.operator.StreamReadMonitoringFunction;
+import org.apache.hudi.operator.utils.TestConfigurations;
+import org.apache.hudi.operator.utils.TestData;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestUtils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link StreamReadMonitoringFunction}.
+ */
+public class TestStreamReadMonitoringFunction {
+  private static final long WAIT_TIME_MILLIS = 5 * 1000L;
+
+  private Configuration conf;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  public void before() throws Exception {
+    final String basePath = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(basePath);
+    conf.setString(FlinkOptions.TABLE_TYPE, 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    conf.setInteger(FlinkOptions.STREAMING_CHECK_INTERVAL, 2); // check every 
2 seconds
+
+    StreamerUtil.initTableIfNotExists(conf);
+  }
+
+  @Test
+  public void testConsumeFromLatestCommit() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    StreamReadMonitoringFunction function = getMonitorFunc();
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().noneMatch(split -> 
split.getInstantRange().isPresent()),
+          "No instants should have range limit");
+
+      Thread.sleep(1000L);
+
+      // reset the source context
+      latch = new CountDownLatch(4);
+      sourceContext.reset(latch);
+
+      // write another instant and validate
+      TestData.writeData(TestData.DATA_SET_TWO, conf);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
+  @Test
+  public void testConsumeFromSpecifiedCommit() throws Exception {
+    // write 2 commits first, use the second commit time as the specified 
start instant,
+    // all the splits should come from the second commit.
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+    TestData.writeData(TestData.DATA_SET_TWO, conf);
+    String specifiedCommit = 
TestUtils.getLatestCommit(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.STREAMING_START_COMMIT, specifiedCommit);
+    StreamReadMonitoringFunction function = getMonitorFunc();
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getLatestCommit().equals(specifiedCommit)),
+          "All the splits should be with specified instant time");
+
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
+  @Test
+  public void testCheckpointRestore() throws Exception {
+    TestData.writeData(TestData.DATA_SET_ONE, conf);
+
+    StreamReadMonitoringFunction function = getMonitorFunc();
+    OperatorSubtaskState state;
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      Thread.sleep(1000L);
+
+      state = harness.snapshot(1, 1);
+
+      // Stop the stream task.
+      function.close();
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().noneMatch(split -> 
split.getInstantRange().isPresent()),
+          "No instants should have range limit");
+
+    }
+
+    TestData.writeData(TestData.DATA_SET_TWO, conf);
+    StreamReadMonitoringFunction function2 = getMonitorFunc();
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function2)) {
+      harness.setup();
+      // Recover to process the remaining snapshots.
+      harness.initializeState(state);
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      TestSourceContext sourceContext = new TestSourceContext(latch);
+      runAsync(sourceContext, function2);
+
+      // Stop the stream task.
+      function.close();
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should 
finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> 
split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+    }
+  }
+
+  private StreamReadMonitoringFunction getMonitorFunc() {
+    final String basePath = tempFile.getAbsolutePath();
+    final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(StreamerUtil.getHadoopConf()).setBasePath(basePath).build();
+    final List<String> partitionKeys = Collections.singletonList("partition");
+    return new StreamReadMonitoringFunction(conf, new Path(basePath), 
metaClient,
+        partitionKeys, 1024 * 1024L);
+  }
+
+  private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> 
createHarness(
+      StreamReadMonitoringFunction function) throws Exception {
+    StreamSource<MergeOnReadInputSplit, StreamReadMonitoringFunction> 
streamSource = new StreamSource<>(function);
+    return new AbstractStreamOperatorTestHarness<>(streamSource, 1, 1, 0);
+  }
+
+  private void runAsync(
+      TestSourceContext sourceContext,
+      StreamReadMonitoringFunction function) {
+    Thread task = new Thread(() -> {
+      try {
+        function.run(sourceContext);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    task.start();
+  }
+
+  private static class TestSourceContext implements 
SourceFunction.SourceContext<MergeOnReadInputSplit> {

Review comment:
       Sorry, i think the original name is better.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to