[GitHub] [hudi] danny0405 commented on a diff in pull request #5445: [HUDI-3953]Flink Hudi module should support low-level source and sink…

2022-05-09 Thread GitBox


danny0405 commented on code in PR #5445:
URL: https://github.com/apache/hudi/pull/5445#discussion_r868790677


##
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java:
##
@@ -248,8 +221,129 @@ private void testWriteToHoodie(
   // wait for the streaming job to finish
   client.getJobExecutionResult().get();
 }
+  }
 
-TestData.checkWrittenFullData(tempFile, expected);
+  public DataStream<RowData> dataStreamGen(StreamExecutionEnvironment execEnv, boolean isMor, RowType rowType, Option<Transformer> transformer, int checkpoints) {
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+        rowType,
+        InternalTypeInfo.of(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+
+    DataStream<RowData> dataStream;
+    if (isMor) {
+      TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+      format.setFilesFilter(FilePathFilter.createDefaultFilter());
+      TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+      format.setCharsetName("UTF-8");
+
+      dataStream = execEnv
+          // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+          .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+          .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+          .setParallelism(1);
+    } else {
+      dataStream = execEnv
+          // use continuous file source to trigger checkpoint
+          .addSource(new ContinuousFileSource.BoundedSourceFunction(new Path(sourcePath), checkpoints))
+          .name("continuous_file_source")
+          .setParallelism(1)
+          .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+          .setParallelism(4);
+    }
 
+    if (transformer.isPresent()) {
+      dataStream = transformer.get().apply(dataStream);
+    }
+    return dataStream;
+  }
+
+  @Test
+  public void testSource() throws Exception {
+    // create a StreamExecutionEnvironment instance
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(1);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_NAME, "t1");
+    conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
+
+    // write 3 batches of data set
+    TestData.writeData(TestData.dataSetInsert(1, 2), conf);
+    TestData.writeData(TestData.dataSetInsert(3, 4), conf);
+    TestData.writeData(TestData.dataSetInsert(5, 6), conf);
+
+    String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);
+
+    // read a hoodie table using the low-level source API
+    HoodiePipeline.Builder builder = HoodiePipeline.builder("test_source")
+        .column("uuid string not null")
+        .column("name string")
+        .column("age int")
+        .column("`ts` timestamp(3)")
+        .column("`partition` string")
+        .pk("uuid")
+        .partition("partition")
+        .options(options);
+    DataStream<RowData> rowDataDataStream = builder.source(execEnv);
+    List<RowData> result = new ArrayList<>();
+    rowDataDataStream.executeAndCollect().forEachRemaining(result::add);
+    TimeUnit.SECONDS.sleep(2); // sleep 2 seconds to collect the data
+    assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
+  }
+
+  @Test
+  public void testSink() throws Exception {
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();

Review Comment:
   `testPipelineBuilderSink()`
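
For illustration, a minimal sketch (not the PR's code) of what the renamed test could look like, reusing the `dataStreamGen` helper from this diff and the `sink(input, bounded)` call shown in the `HoodiePipeline` javadoc; the option set and the `TestConfigurations.ROW_TYPE` schema reference are assumptions made for the sketch.

```java
@Test
public void testPipelineBuilderSink() throws Exception {
  StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  execEnv.setParallelism(4);
  execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);

  // assumed option setup: write into the temp table path as MERGE_ON_READ
  Map<String, String> options = new HashMap<>();
  options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
  options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");

  // bounded source (isMor = false) so the job terminates; the row type is assumed to be the default test schema
  DataStream<RowData> dataStream = dataStreamGen(execEnv, false, TestConfigurations.ROW_TYPE, Option.empty(), 4);

  HoodiePipeline.builder("test_sink")
      .column("uuid string not null")
      .column("name string")
      .column("age int")
      .column("`ts` timestamp(3)")
      .column("`partition` string")
      .pk("uuid")
      .partition("partition")
      .options(options)
      .sink(dataStream, false); // signature as shown in the HoodiePipeline javadoc example

  execEnv.execute("test_sink");
  TestData.checkWrittenFullData(tempFile, EXPECTED);
}
```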



##
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java:
##
@@ -57,13 +59,16 @@
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hudi.utils.TestData.assertRowDataEquals;
+

Review Comment:
   Can we avoid importing static methods and use `TestData.assertRowDataEquals` directly?
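
A minimal sketch of the suggested change: drop the static import in favor of a qualified call.

```java
// instead of: import static org.apache.hudi.utils.TestData.assertRowDataEquals;
// and:        assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
TestData.assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
```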



##

[GitHub] [hudi] danny0405 commented on a diff in pull request #5445: [HUDI-3953]Flink Hudi module should support low-level source and sink…

2022-05-09 Thread GitBox


danny0405 commented on code in PR #5445:
URL: https://github.com/apache/hudi/pull/5445#discussion_r86940


##
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java:
##
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A tool class to construct a hoodie Flink pipeline.
+ *
+ * How to use?
+ * Method {@link #builder(String)} returns a pipeline builder. The builder
+ * can then define the hudi table columns, primary keys and partitions.
+ *
+ * An example:
+ * <pre>
+ *   HoodiePipeline.Builder builder = HoodiePipeline.builder("myTable");
+ *   DataStreamSink<?> sinkStream = builder
+ *       .column("f0 int")
+ *       .column("f1 varchar(10)")
+ *       .column("f2 varchar(20)")
+ *       .pk("f0,f1")
+ *       .partition("f2")
+ *       .sink(input, false);
+ * </pre>
+ */
+public class HoodiePipeline {
+
+  private static final Logger LOG = LogManager.getLogger(HoodiePipeline.class);
+
+  /**
+   * Returns the builder for hoodie pipeline construction.
+   */
+  public static Builder builder(String tableName) {
+return new Builder(tableName);
+  }
+
+  /**
+   * Builder for hudi source/sink pipeline construction.
+   */
+  public static class Builder {
+private final String tableName;
+private final List<String> columns;
+private final Map<String, String> options;
+
+private String pk;
+private List<String> partitions;
+
+private Builder(String tableName) {
+  this.tableName = tableName;
+  this.columns = new ArrayList<>();
+  this.options = new HashMap<>();
+  this.partitions = new ArrayList<>();
+}
+
+/**
+ * Add a table column definition.
+ *
+ * @param column the column format should be in the form like 'f0 int'
+ */
+public Builder column(String column) {
+  this.columns.add(column);
+  return this;
+}
+
+/**
+ * Add primary keys.
+ */
+public Builder pk(String... pks) {
+  this.pk = String.join(",", pks);
+  return this;
+}
+
+/**
+ * Add partition fields.
+ */
+public Builder partition(String... partitions) {
+  this.partitions = new ArrayList<>(Arrays.asList(partitions));
+  return this;
+}
+
+/**
+ * Add a config option.
+ */
+public Builder option(ConfigOption<?> option, Object val) {
+  this.options.put(option.key(), val.toString());
+  return this;
+}
+
+public Builder option(String key, Object val) {
+  this.options.put(key, val.toString());
+  return this;
+}
+
+public Builder options(Map<String, String> options) {
+  this.options.putAll(options);
+  return 

[GitHub] [hudi] danny0405 commented on a diff in pull request #5445: [HUDI-3953]Flink Hudi module should support low-level source and sink…

2022-05-09 Thread GitBox


danny0405 commented on code in PR #5445:
URL: https://github.com/apache/hudi/pull/5445#discussion_r86363


##
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java:
##
@@ -57,19 +59,22 @@
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hudi.utils.TestData.assertRowDataEquals;
+
 /**
  * Integration test for Flink Hoodie stream sink.
  */
 public class ITTestDataStreamWrite extends TestLogger {
 
-  private static final Map<String, List<String>> EXPECTED = new HashMap<>();
+  public static final Map<String, List<String>> EXPECTED = new HashMap<>();
   private static final Map<String, List<String>> EXPECTED_TRANSFORMER = new HashMap<>();

Review Comment:
   Why make the `EXPECTED` public ?
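
One possible alternative, sketched here under the assumption that the other test class only needs read access (the `expectedRows` accessor name is hypothetical, not part of the PR): keep the field private and expose a read-only view instead of widening its visibility.

```java
private static final Map<String, List<String>> EXPECTED = new HashMap<>();

// read-only view for tests in other classes, instead of making the field itself public
public static Map<String, List<String>> expectedRows() {
  return Collections.unmodifiableMap(EXPECTED);
}
```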



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] danny0405 commented on a diff in pull request #5445: [HUDI-3953]Flink Hudi module should support low-level source and sink…

2022-05-04 Thread GitBox


danny0405 commented on code in PR #5445:
URL: https://github.com/apache/hudi/pull/5445#discussion_r865571243


##
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java:
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A tool class to construct a hoodie Flink pipeline.
+ *
+ * How to use?
+ * Method {@link #builder(String)} returns a pipeline builder. The builder
+ * can then define the hudi table columns, primary keys and partitions.
+ *
+ * An example:
+ * <pre>
+ *   HoodiePipeline.Builder builder = HoodiePipeline.builder("myTable");
+ *   DataStreamSink<?> sinkStream = builder
+ *       .column("f0 int")
+ *       .column("f1 varchar(10)")
+ *       .column("f2 varchar(20)")
+ *       .pk("f0,f1")
+ *       .partition("f2")
+ *       .sink(input, false);
+ * </pre>
+ */
+public class HoodiePipeline {
+
+  private static final Logger LOG = LogManager.getLogger(HoodiePipeline.class);
+
+  /**
+   * Returns the builder for hoodie pipeline construction.
+   */
+  public static Builder builder(String tableName) {
+return new Builder(tableName);
+  }
+
+  /**
+   * Builder for hudi source/sink pipeline construction.
+   */
+  public static class Builder {
+private final String tableName;
+private final List<String> columns;
+private final Map<String, String> options;
+
+private String pk;
+private List<String> partitions;
+
+public Builder self() {
+  return this;
+}
+
+private Builder(String tableName) {
+  this.tableName = tableName;
+  this.columns = new ArrayList<>();
+  this.options = new HashMap<>();
+  this.partitions = new ArrayList<>();
+}
+
+/**
+ * Add a table column definition.
+ *
+ * @param column the column format should be in the form like 'f0 int'
+ */
+public Builder column(String column) {
+  this.columns.add(column);
+  return self();
+}
+
+/**
+ * Add primary keys.
+ */
+public Builder pk(String... pks) {
+  this.pk = String.join(",", pks);
+  return self();
+}
+
+/**
+ * Add partition fields.
+ */
+public Builder partition(String... partitions) {
+  this.partitions = new ArrayList<>(Arrays.asList(partitions));
+  return self();
+}
+
+/**
+ * Add a config option.
+ */
+public Builder option(ConfigOption<?> option, Object val) {
+  this.options.put(option.key(), val.toString());
+  return self();
+}
+
+public Builder option(String key, Object val) {
+  this.options.put(key, val.toString());
+  return self();
+}
+
+public Builder 

[GitHub] [hudi] danny0405 commented on a diff in pull request #5445: [HUDI-3953]Flink Hudi module should support low-level source and sink…

2022-05-04 Thread GitBox


danny0405 commented on code in PR #5445:
URL: https://github.com/apache/hudi/pull/5445#discussion_r865567582


##
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##
@@ -1217,6 +1234,42 @@ void testBuiltinFunctionWithCatalog(String operation) {
 assertRowsEquals(partitionResult, "[+I[1, 2022-02-02]]");
   }
 
+  @Test
+  public void testSource() throws Exception {
+
+Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+conf.setString(FlinkOptions.TABLE_NAME, "t1");
+conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
+
+// write 3 batches of data set
+TestData.writeData(TestData.dataSetInsert(1, 2), conf);
+TestData.writeData(TestData.dataSetInsert(3, 4), conf);
+TestData.writeData(TestData.dataSetInsert(5, 6), conf);
+
+Map<String, String> options = new HashMap<>();
+String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+
+options.clear();

Review Comment:
   Can this test also be moved to `ITTestDataStreamWrite`?
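
For reference, the moved test could look roughly like the DataStream-level `testSource` already added in `ITTestDataStreamWrite`; this is a sketch reusing that code, and the method name is illustrative only.

```java
@Test
public void testSourceMovedFromSqlTest() throws Exception {
  StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  execEnv.setParallelism(1);

  Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
  conf.setString(FlinkOptions.TABLE_NAME, "t1");
  conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");

  // write 3 batches of data, then read back only the latest commit
  TestData.writeData(TestData.dataSetInsert(1, 2), conf);
  TestData.writeData(TestData.dataSetInsert(3, 4), conf);
  TestData.writeData(TestData.dataSetInsert(5, 6), conf);
  String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());

  Map<String, String> options = new HashMap<>();
  options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
  options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);

  DataStream<RowData> rowDataStream = HoodiePipeline.builder("test_source")
      .column("uuid string not null")
      .column("name string")
      .column("age int")
      .column("`ts` timestamp(3)")
      .column("`partition` string")
      .pk("uuid")
      .partition("partition")
      .options(options)
      .source(execEnv);

  List<RowData> result = new ArrayList<>();
  rowDataStream.executeAndCollect().forEachRemaining(result::add);
  TestData.assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
}
```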






[GitHub] [hudi] danny0405 commented on a diff in pull request #5445: [HUDI-3953]Flink Hudi module should support low-level source and sink…

2022-04-29 Thread GitBox


danny0405 commented on code in PR #5445:
URL: https://github.com/apache/hudi/pull/5445#discussion_r862272988


##
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java:
##
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A tool class to construct a hoodie Flink pipeline.
+ *
+ * How to use?
+ * Method {@link #builder(String)} returns a pipeline builder. The builder
+ * can then define the hudi table columns, primary keys and partitions.
+ *
+ * An example:
+ * <pre>
+ *   HoodiePipeline.Builder builder = HoodiePipeline.builder("myTable");
+ *   DataStreamSink<?> sinkStream = builder
+ *       .column("f0 int")
+ *       .column("f1 varchar(10)")
+ *       .column("f2 varchar(20)")
+ *       .pk("f0,f1")
+ *       .partition("f2")
+ *       .sink(input, false);
+ * </pre>
+ */
+public class HoodiePipeline {
+
+  private static final Logger LOG = LogManager.getLogger(HoodiePipeline.class);
+
+  /**
+   * Returns the builder for hoodie pipeline construction.
+   */
+  public static Builder builder(String tableName) {
+return new Builder(tableName);
+  }
+
+  /**
+   * Builder for hudi source/sink pipeline construction.
+   */
+  public static class Builder {
+private final String tableName;
+private final List<String> columns;

Review Comment:
   The builder is very similar to `TestConfigurations.Sql`; we could plan to refactor the code.
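
To make the overlap concrete, here is a hypothetical sketch (illustrative names only, not the actual refactor, and assuming the usual `java.util` and `Collectors` imports) of the shared shape both builders reduce to: columns, primary key, partitions and options rendered into a CREATE TABLE DDL string.

```java
// hypothetical shared helper: both HoodiePipeline.Builder and TestConfigurations.Sql
// essentially collect columns, pk, partitions and options, then render a DDL
static String toCreateTableDdl(String tableName, List<String> columns, String pk,
    List<String> partitions, Map<String, String> options) {
  String fields = String.join(",\n  ", columns);
  String withClause = options.entrySet().stream()
      .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
      .collect(Collectors.joining(",\n  "));
  return "CREATE TABLE " + tableName + " (\n  " + fields
      + (pk == null ? "" : ",\n  PRIMARY KEY (" + pk + ") NOT ENFORCED")
      + "\n)"
      + (partitions.isEmpty() ? "" : "\nPARTITIONED BY (`" + String.join("`,`", partitions) + "`)")
      + "\nWITH (\n  " + withClause + "\n)";
}
```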






[GitHub] [hudi] danny0405 commented on a diff in pull request #5445: [HUDI-3953]Flink Hudi module should support low-level source and sink…

2022-04-29 Thread GitBox


danny0405 commented on code in PR #5445:
URL: https://github.com/apache/hudi/pull/5445#discussion_r862272886


##
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ITTestHoodiePipeline.java:
##
@@ -0,0 +1,220 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodiePipeline;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
+import org.apache.hudi.utils.source.ContinuousFileSource;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hudi.sink.ITTestDataStreamWrite.EXPECTED;
+import static org.apache.hudi.utils.TestData.assertRowDataEquals;
+
+/**
+ * IT test for the hoodie pipeline.
+ */
+public class ITTestHoodiePipeline {
+
+  public static final List<RowData> RESULT = new ArrayList<>();
+  @TempDir
+  File tempFile;
+  private StreamExecutionEnvironment execEnv;
+  private boolean isMor;
+  private Map<String, String> options = new HashMap<>();
+  private TableEnvironment streamTableEnv;
+
+  @BeforeEach
+  void beforEach() {
+    this.execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(4);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    options.put(FlinkOptions.INDEX_TYPE.key(), "FLINK_STATE");
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4");
+    options.put("table.type", HoodieTableType.MERGE_ON_READ.name());
+    options.put(FlinkOptions.INDEX_KEY_FIELD.key(), "id");
+    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
+    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+    options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
+
+    this.isMor = options.get(FlinkOptions.TABLE_TYPE.key()).equals(HoodieTableType.MERGE_ON_READ.name());
+
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+    streamTableEnv = TableEnvironmentImpl.create(settings);
+