yanghua commented on a change in pull request #2640: URL: https://github.com/apache/hudi/pull/2640#discussion_r589959661
########## File path: hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java ########## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.source; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.operator.FlinkOptions; +import org.apache.hudi.operator.StreamReadMonitoringFunction; +import org.apache.hudi.operator.StreamReadOperator; +import org.apache.hudi.operator.utils.TestConfigurations; +import org.apache.hudi.operator.utils.TestData; +import org.apache.hudi.source.format.FilePathUtils; +import org.apache.hudi.source.format.mor.MergeOnReadInputFormat; +import org.apache.hudi.source.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.source.format.mor.MergeOnReadTableState; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.utils.TestUtils; + +import org.apache.avro.Schema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; +import org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor; +import org.apache.flink.streaming.util.CollectingSourceContext; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test cases for {@link StreamReadOperator}. + */ +public class TestStreamReadOperator { + private Configuration conf; + + @TempDir + File tempFile; + + @BeforeEach + public void before() throws Exception { + final String basePath = tempFile.getAbsolutePath(); + conf = TestConfigurations.getDefaultConf(basePath); + conf.setString(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + + StreamerUtil.initTableIfNotExists(conf); + } + + @Test + void testWriteRecords() throws Exception { + TestData.writeData(TestData.DATA_SET_ONE, conf); + try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = createReader()) { + harness.setup(); + harness.open(); + + SteppingMailboxProcessor processor = createLocalMailbox(harness); + StreamReadMonitoringFunction func = getMonitorFunc(); + + List<MergeOnReadInputSplit> splits = generateSplits(func); + assertThat("Should have 4 splits", splits.size(), is(4)); + for (MergeOnReadInputSplit split : splits) { + // Process this element to enqueue to mail-box. + harness.processElement(split, -1); + + // Run the mail-box once to read all records from the given split. + assertThat("Should process 1 split", processor.runMailboxStep()); + } + // Assert the output has expected elements. + TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE); + + TestData.writeData(TestData.DATA_SET_TWO, conf); + final List<MergeOnReadInputSplit> splits2 = generateSplits(func); + assertThat("Should have 4 splits", splits2.size(), is(4)); + for (MergeOnReadInputSplit split : splits2) { + // Process this element to enqueue to mail-box. + harness.processElement(split, -1); + + // Run the mail-box once to read all records from the given split. + assertThat("Should processed 1 split", processor.runMailboxStep()); + } + // The result sets behaves like append only: DATA_SET_ONE + DATA_SET_TWO + List<RowData> expected = new ArrayList<>(TestData.DATA_SET_ONE); + expected.addAll(TestData.DATA_SET_TWO); + TestData.assertRowDataEquals(harness.extractOutputValues(), expected); + } + } + + @Test + public void testCheckpoint() throws Exception { + // Received emitted splits: split1, split2, split3, split4, checkpoint request is triggered + // when reading records from split1. + TestData.writeData(TestData.DATA_SET_ONE, conf); + long timestamp = 0; + try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = createReader()) { + harness.setup(); + harness.open(); + + SteppingMailboxProcessor processor = createLocalMailbox(harness); + StreamReadMonitoringFunction func = getMonitorFunc(); + + List<MergeOnReadInputSplit> splits = generateSplits(func); + assertThat("Should have 4 splits", splits.size(), is(4)); + + harness.processElement(splits.get(0), ++timestamp); + harness.processElement(splits.get(1), ++timestamp); + harness.processElement(splits.get(2), ++timestamp); + harness.processElement(splits.get(3), ++timestamp); + + // Trigger snapshot state, it will start to work once all records from split0 are read. + processor.getMainMailboxExecutor() + .execute(() -> harness.snapshot(1, 3), "Trigger snapshot"); + + assertTrue(processor.runMailboxStep(), "Should have processed the split0"); + assertTrue(processor.runMailboxStep(), "Should have processed the snapshot state action"); + + assertThat(TestData.rowDataToString(harness.extractOutputValues()), + is("[id3,Julian,53,1970-01-01T00:00:00.003,par2, id4,Fabian,31,1970-01-01T00:00:00.004,par2]")); + + // Read records from split1. + assertTrue(processor.runMailboxStep(), "Should have processed the split1"); + + // Read records from split2. + assertTrue(processor.runMailboxStep(), "Should have processed the split2"); + + // Read records from split3. + assertTrue(processor.runMailboxStep(), "Should have processed the split3"); + + // Assert the output has expected elements. + TestData.assertRowDataEquals(harness.extractOutputValues(), TestData.DATA_SET_ONE); + } + } + + @Test + public void testCheckpointRestore() throws Exception { + TestData.writeData(TestData.DATA_SET_ONE, conf); + + final Map<String, String> expected = new HashMap<>(); + expected.put("par1", "id1,Danny,23,1970-01-01T00:00:00.001,par1, id2,Stephen,33,1970-01-01T00:00:00.002,par1"); + expected.put("par2", "id3,Julian,53,1970-01-01T00:00:00.003,par2, id4,Fabian,31,1970-01-01T00:00:00.004,par2"); + expected.put("par3", "id5,Sophia,18,1970-01-01T00:00:00.005,par3, id6,Emma,20,1970-01-01T00:00:00.006,par3"); + expected.put("par4", "id7,Bob,44,1970-01-01T00:00:00.007,par4, id8,Han,56,1970-01-01T00:00:00.008,par4"); + + OperatorSubtaskState state; + final List<MergeOnReadInputSplit> splits; + try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = createReader()) { + harness.setup(); + harness.open(); + + StreamReadMonitoringFunction func = getMonitorFunc(); + + splits = generateSplits(func); + assertThat("Should have 4 splits", splits.size(), is(4)); + + // Enqueue all the splits. + for (MergeOnReadInputSplit split : splits) { + harness.processElement(split, -1); + } + + // Read all records from the first 2 splits. + SteppingMailboxProcessor localMailbox = createLocalMailbox(harness); + for (int i = 0; i < 2; i++) { + assertTrue(localMailbox.runMailboxStep(), "Should have processed the split#" + i); + } + + assertThat(TestData.rowDataToString(harness.extractOutputValues()), + is(getSplitExpected(splits.subList(0, 2), expected))); + + // Snapshot state now, there are 2 splits left in the state. + state = harness.snapshot(1, 1); + } + + try (OneInputStreamOperatorTestHarness<MergeOnReadInputSplit, RowData> harness = createReader()) { + harness.setup(); + // Recover to process the remaining splits. + harness.initializeState(state); + harness.open(); + + SteppingMailboxProcessor localMailbox = createLocalMailbox(harness); + + for (int i = 2; i < 4; i++) { + assertTrue(localMailbox.runMailboxStep(), "Should have processed one split#" + i); + } + + // expect to output the left data + assertThat(TestData.rowDataToString(harness.extractOutputValues()), + is(getSplitExpected(splits.subList(2, 4), expected))); + } + } + + private static String getSplitExpected(List<MergeOnReadInputSplit> splits, Map<String, String> expected) { + return splits.stream() + .map(TestUtils::getSplitPartitionPath) + .map(expected::get) + .sorted(Comparator.naturalOrder()) + .collect(Collectors.toList()).toString(); + } + + private StreamReadMonitoringFunction getMonitorFunc() { Review comment: ok, the difference between thoughts. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org