This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 36a9113aca2 [HUDI-7929] Create k8s example for flink hudi integration (#11570)
36a9113aca2 is described below

commit 36a9113aca280aed206dcfbdd317ccfb6b6a6382
Author: Peter Huang <huangzhenqiu0...@gmail.com>
AuthorDate: Sun Jul 7 21:16:34 2024 -0700

    [HUDI-7929] Create k8s example for flink hudi integration (#11570)
---
 hudi-examples/hudi-examples-k8s/Dockerfile         |  31 ++++
 hudi-examples/hudi-examples-k8s/README.md          | 113 ++++++++++++
 .../hudi-examples-k8s/config/hadoop/core-site.xml  |  62 +++++++
 .../config/k8s/flink-deployment.yml                |  59 ++++++
 .../config/k8s/minio-standalone.yaml               |  83 +++++++++
 hudi-examples/hudi-examples-k8s/pom.xml            | 200 +++++++++++++++++++++
 .../k8s/quickstart/HudiDataStreamWriter.java       | 170 ++++++++++++++++++
 .../k8s/quickstart/utils/DataGenerator.java        |  79 ++++++++
 hudi-examples/pom.xml                              |   1 +
 9 files changed, 798 insertions(+)

diff --git a/hudi-examples/hudi-examples-k8s/Dockerfile b/hudi-examples/hudi-examples-k8s/Dockerfile
new file mode 100644
index 00000000000..a3a7e7889f8
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/Dockerfile
@@ -0,0 +1,31 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+
+FROM flink:1.18
+
+RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
+RUN mv -v /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop
+RUN chown -R flink:flink /opt/flink/plugins/flink-s3-fs-hadoop
+
+ENV FLINK_HADOOP_CONF /etc/hadoop/conf
+RUN mkdir -p $FLINK_HADOOP_CONF
+COPY config/hadoop/core-site.xml $FLINK_HADOOP_CONF
+ENV HADOOP_CLASSPATH=$FLINK_HADOOP_CONF
+WORKDIR /opt/hudi/examples
+COPY target/hudi-examples-k8s-*-SNAPSHOT.jar streaming/hudi-examples-k8s.jar
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/README.md b/hudi-examples/hudi-examples-k8s/README.md
new file mode 100644
index 00000000000..96371b6d3eb
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/README.md
@@ -0,0 +1,113 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# Quick Start
+
+## Prerequisites
+
+Install the Helm chart for the Flink Kubernetes operator. See [Flink on Kubernetes](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/) for more details.
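+
+For example (a sketch, assuming the operator's Helm chart repository; substitute the current operator version for the placeholder):
+```shell
+helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR_VERSION>/
+helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
+```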
+
+Make sure your local Docker has at least 8 cores and 8 GB of memory assigned.
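+
+For example, with minikube as the local cluster:
+```shell
+minikube start --cpus 8 --memory 8g
+```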
+
+## Setting up a Docker registry
+
+- Point the Docker CLI to the Docker daemon running in minikube
+    ```sh
+    eval $(minikube -p minikube docker-env)
+    ```
+
+- Start a local Docker registry in minikube's Docker (the `docker` command now talks to minikube's Docker daemon)
+    ```sh
+    docker run -d -p 5001:5000 --restart=always --name registry registry:2
+    ```
+   This publishes the registry's port `5000` on port `5001`; you can choose any available port.
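+   To sanity-check the registry, probe its v2 API (the port is published on the minikube VM, hence `minikube ip`):
+    ```sh
+    curl http://$(minikube ip):5001/v2/_catalog
+    ```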
+
+## Building the Hudi Flink Docker Image
+
+We need to mount a local path on the host for all containers in minikube to work with.
+
+```shell
+mkdir /tmp/minikubedata
+minikube mount /tmp/minikubedata:/data &
+```
+
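+The Dockerfile copies the example jar from `target/`, so build the module first (a sketch, assuming you run it from the `hudi-examples/hudi-examples-k8s` directory with a standard Maven setup):
+```shell
+mvn clean package -DskipTests
+```
+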
+We need a custom Docker image that extends the Flink base image and adds the Hudi Flink example jar to it.
+You can build this Docker image by running the following command:
+```shell
+docker build -t localhost:5001/hudi/hudi-flink .
+```
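+
+If the image should be served from the local registry rather than minikube's Docker cache, also push it (assuming the registry container started above):
+```shell
+docker push localhost:5001/hudi/hudi-flink
+```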
+
+The image is now built and tagged inside minikube's Docker daemon.
+You can verify this by running
+```shell
+docker images
+```
+
+This should show the image `localhost:5001/hudi/hudi-flink:latest` in the list of images.
+```
+REPOSITORY                       TAG      IMAGE ID       CREATED          SIZE
+localhost:5001/hudi/hudi-flink   latest   87b936181d74   32 minutes ago   1.08GB
+```
+
+## Start Minio server
+
+Create a standalone Minio server in Kubernetes.
+
+```shell
+kubectl apply -f config/k8s/minio-standalone.yaml
+```
+
+This will create a pod in Kubernetes that runs the Minio server.
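+To watch the pod come up (assuming the labels from `minio-standalone.yaml`):
+```shell
+kubectl get pods -l app=minio --watch
+```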
+You can verify this by running
+```shell
+kubectl port-forward svc/minio-svc 9090
+```
+and then opening the Minio console in your browser at `http://localhost:9090`.
+
+### Prepare the testing bucket
+
+Log into the console with the default credentials: username `minioadmin`, password `minioadmin`.
+
+From the UI, manually create a bucket named `test`.
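+
+Alternatively, the bucket can be created from the command line (a sketch, assuming the MinIO client `mc` is installed locally and the API port is forwarded; the alias name is arbitrary):
+```shell
+kubectl port-forward svc/minio-svc 9000 &
+mc alias set localminio http://localhost:9000 minioadmin minioadmin
+mc mb localminio/test
+```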
+
+### Destroy or replace Minio server
+
+```shell
+kubectl delete deployment minio-deploy
+kubectl replace -f config/k8s/minio-standalone.yaml
+```
+
+## Run the Flink Job
+
+We can now submit the Flink job to the Flink cluster running in Kubernetes.
+```shell
+kubectl apply -f config/k8s/flink-deployment.yml
+```
+
+This will create the JobManager and TaskManager pods that run the Flink job.
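+To check the job status via the operator's custom resource:
+```shell
+kubectl get flinkdeployment basic-example
+```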
+You can verify this by running
+```shell
+kubectl port-forward svc/basic-example-rest 8081
+```
+and then opening the Flink UI in your browser at `http://localhost:8081`.
+
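+Once checkpoints complete, the Hudi table files appear under the target path. With the `mc` alias created in the Minio section (a hypothetical alias name), you can list them:
+```shell
+mc ls -r localminio/test/hudi
+```
+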
+### Destroy or replace Flink app
+
+```shell
+kubectl delete flinkdeployment basic-example
+kubectl replace -f config/k8s/flink-deployment.yml
+```
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/config/hadoop/core-site.xml b/hudi-examples/hudi-examples-k8s/config/hadoop/core-site.xml
new file mode 100644
index 00000000000..90e3ef5bb18
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/config/hadoop/core-site.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+
+    <property>
+        <name>fs.s3a.impl</name>
+        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+    </property>
+
+    <!-- Comma separated list of local directories used to buffer
+         large results prior to transmitting them to S3. -->
+    <property>
+        <name>fs.s3a.buffer.dir</name>
+        <value>/tmp</value>
+    </property>
+
+    <property>
+        <name>fs.s3a.endpoint</name>
+        <value>http://minio-svc:9000</value>
+    </property>
+
+    <property>
+        <name>fs.s3a.path.style.access</name>
+        <value>true</value>
+    </property>
+
+    <property>
+        <name>fs.s3a.access.key</name>
+        <description>AWS access key ID.
+            Omit for IAM role-based or provider-based authentication.</description>
+        <value>minioadmin</value>
+    </property>
+
+    <property>
+        <name>fs.s3a.secret.key</name>
+        <description>AWS secret key.
+            Omit for IAM role-based or provider-based authentication.</description>
+        <value>minioadmin</value>
+    </property>
+
+    <property>
+        <name>fs.s3a.aws.credentials.provider</name>
+        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
+    </property>
+
+</configuration>
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/config/k8s/flink-deployment.yml b/hudi-examples/hudi-examples-k8s/config/k8s/flink-deployment.yml
new file mode 100644
index 00000000000..733d2d04d7c
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/config/k8s/flink-deployment.yml
@@ -0,0 +1,59 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: basic-example
+spec:
+  image: localhost:5001/hudi/hudi-flink:latest
+  flinkVersion: v1_18
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "1"
+    execution.checkpointing.checkpoints-after-tasks-finish.enabled: "true"
+    state.backend.type: filesystem
+    state.savepoints.dir: s3a://test/savepoints/
+    state.checkpoints.dir: s3a://test/checkpoints/
+    s3.access-key: minioadmin
+    s3.secret-key: minioadmin
+    s3.endpoint: http://minio-svc:9000
+    s3.path.style.access: "true"
+    s3a.access-key: minioadmin
+    s3a.secret-key: minioadmin
+    s3a.endpoint: http://minio-svc:9000
+    s3a.path.style.access: "true"
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "1g"
+      cpu: 1
+    podTemplate:
+      spec:
+        containers:
+          - name: flink-main-container
+            env:
+              - name: TARGET_S3_PATH
+                value: 's3a://test/hudi'
+  taskManager:
+    resource:
+      memory: "1g"
+      cpu: 1
+  job:
+    jarURI: local:///opt/hudi/examples/streaming/hudi-examples-k8s.jar
+    parallelism: 2
+    upgradeMode: stateless
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/config/k8s/minio-standalone.yaml b/hudi-examples/hudi-examples-k8s/config/k8s/minio-standalone.yaml
new file mode 100644
index 00000000000..d04ec46f307
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/config/k8s/minio-standalone.yaml
@@ -0,0 +1,83 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  creationTimestamp: null
+  labels:
+    app: minio
+  name: minio-deploy
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: minio
+  strategy: {}
+  template:
+    metadata:
+      creationTimestamp: null
+      labels:
+        app: minio
+    spec:
+      volumes:
+        - name: hostvolume
+          hostPath:
+            path: /tmp/minikubedata
+            type: DirectoryOrCreate
+      initContainers:
+        - name: prepare
+          image: busybox:1.28
+          command: ['sh', '-c', 'mkdir -p /data/minio/ && chown 9999 /data/minio/']
+          volumeMounts:
+            - mountPath: /data
+              name: hostvolume
+      containers:
+        - name: minio
+          image: quay.io/minio/minio:RELEASE.2024-01-13T07-53-03Z
+          command:
+            - /bin/bash
+            - -c
+          args:
+            - minio server /data/minio --address :9000 --console-address :9090
+          volumeMounts:
+            - mountPath: /data
+              name: hostvolume
+status: {}
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    app: minio
+  name: minio-svc
+spec:
+  ports:
+    - name: webconsole
+      port: 9090
+      protocol: TCP
+      targetPort: 9090
+    - name: api
+      port: 9000
+      protocol: TCP
+      targetPort: 9000
+  selector:
+    app: minio
+status:
+  loadBalancer: {}
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/pom.xml b/hudi-examples/hudi-examples-k8s/pom.xml
new file mode 100644
index 00000000000..82870562f92
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/pom.xml
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hudi-examples</artifactId>
+        <groupId>org.apache.hudi</groupId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hudi-examples-k8s</artifactId>
+
+    <properties>
+        <main.basedir>${project.parent.basedir}</main.basedir>
+        <checkstyle.skip>true</checkstyle.skip>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${maven-shade-plugin.version}</version>
+                <executions>
+                    <!-- Run shade goal on package phase -->
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <includes>
+                                    <include>*:*</include>
+                                </includes>
+                                <excludes>
+                                    <exclude>org.apache.hbase.thirdparty:*</exclude>
+                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>org.apache.logging.log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <!-- Do not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.apache.hudi.examples.k8s.quickstart.HudiDataStreamWriter</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jacoco</groupId>
+                <artifactId>jacoco-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+            </plugin>
+        </plugins>
+
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+    </build>
+
+    <dependencies>
+        <!-- Hoodie -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-flink${flink.bundle.version}-bundle</artifactId>
+            <scope>compile</scope>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Flink -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>${flink.streaming.java.artifactId}</artifactId>
+            <scope>provided</scope>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>${flink.clients.artifactId}</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.esotericsoftware.kryo</groupId>
+                    <artifactId>kryo</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.esotericsoftware.minlog</groupId>
+                    <artifactId>minlog</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>${flink.table.runtime.artifactId}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Hadoop -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <scope>runtime</scope>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>runtime</scope>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <scope>runtime</scope>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>runtime</scope>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aws</artifactId>
+            <scope>runtime</scope>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-tests-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/HudiDataStreamWriter.java b/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/HudiDataStreamWriter.java
new file mode 100644
index 00000000000..e79db5337c5
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/HudiDataStreamWriter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.k8s.quickstart;
+
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.examples.k8s.quickstart.utils.DataGenerator;
+import org.apache.hudi.util.HoodiePipeline;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This Flink program demonstrates inserting, updating, and deleting records in a Hudi table using the DataStream API.
+ * The program emits four messages per batch for ten batches. Two of the messages carry a random UUID, acting as new
+ * insert records, while two reuse the same record keys, resulting in an update of those two records in each batch.
+ * In the first batch, four new records are inserted into a newly created Hudi table.
+ * In each subsequent batch, two new records are inserted, so the record count grows by two per batch.
+ * In the 11th batch, the delete operation is illustrated by emitting a record with the DELETE row kind; as a result,
+ * the record with the third key is deleted after this batch.
+ *
+ * After this job finishes, you should see a total of 21 records in the Hudi table.
+ */
+public class HudiDataStreamWriter {
+
+    public static final DataType ROW_DATA_TYPE = DataTypes.ROW(
+            DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
+            DataTypes.FIELD("uuid", DataTypes.VARCHAR(40)), // record key
+            DataTypes.FIELD("rider", DataTypes.VARCHAR(20)),
+            DataTypes.FIELD("driver", DataTypes.VARCHAR(20)),
+            DataTypes.FIELD("fare", DataTypes.DOUBLE()),
+            DataTypes.FIELD("city", DataTypes.VARCHAR(20))).notNull();
+
+    /**
+     * Main entry point. The Hudi table path is read from the TARGET_S3_PATH environment variable.
+     * The job can be submitted with the Flink CLI, e.g.:
+     * bin/flink run -c org.apache.hudi.examples.k8s.quickstart.HudiDataStreamWriter target/hudi-examples-k8s-<version>.jar
+     *
+     * @param args command line arguments (unused)
+     * @throws Exception if the Flink job fails
+     */
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Enable checkpointing
+        configureCheckpointing(env);
+
+        DataStreamSource<RowData> dataStream = env.addSource(new SampleDataSource());
+
+        final String targetS3Path = System.getenv("TARGET_S3_PATH");
+        HoodiePipeline.Builder builder = createHudiPipeline("hudi_table", createHudiOptions(targetS3Path));
+        builder.sink(dataStream, false);
+
+        env.execute("Api_Sink");
+    }
+
+    /**
+     * Configure Flink checkpointing settings.
+     *
+     * @param env The Flink StreamExecutionEnvironment.
+     */
+    private static void configureCheckpointing(StreamExecutionEnvironment env) {
+        env.enableCheckpointing(5000); // Checkpoint every 5 seconds
+        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
+        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        checkpointConfig.setMinPauseBetweenCheckpoints(10000); // Minimum time between checkpoints
+        checkpointConfig.setCheckpointTimeout(60000); // Checkpoint timeout in milliseconds
+    }
+
+    /**
+     * Create Hudi options for the data sink.
+     *
+     * @param basePath The base path for Hudi data.
+     * @return A Map containing Hudi options.
+     */
+    private static Map<String, String> createHudiOptions(String basePath) {
+        Map<String, String> options = new HashMap<>();
+        options.put(FlinkOptions.PATH.key(), basePath);
+        options.put(HoodieCommonConfig.HOODIE_FS_ATOMIC_CREATION_SUPPORT.key(), "s3a");
+        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
+        options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+        options.put(FlinkOptions.IGNORE_FAILED.key(), "true");
+        return options;
+    }
+
+    /**
+     * Create a HoodiePipeline.Builder with the specified target table and options.
+     *
+     * @param targetTable The name of the Hudi table.
+     * @param options     The Hudi options for the data sink.
+     * @return A HoodiePipeline.Builder.
+     */
+    private static HoodiePipeline.Builder createHudiPipeline(String targetTable, Map<String, String> options) {
+        return HoodiePipeline.builder(targetTable)
+                .column("ts TIMESTAMP(3)")
+                .column("uuid VARCHAR(40)")
+                .column("rider VARCHAR(20)")
+                .column("driver VARCHAR(20)")
+                .column("fare DOUBLE")
+                .column("city VARCHAR(20)")
+                .pk("uuid")
+                .partition("city")
+                .options(options);
+    }
+
+    /**
+     * Sample data source for generating RowData objects.
+     */
+    static class SampleDataSource implements SourceFunction<RowData> {
+        private volatile boolean isRunning = true;
+
+        @Override
+        public void run(SourceContext<RowData> ctx) throws Exception {
+            int batchNum = 0;
+            while (isRunning) {
+                batchNum++;
+                List<RowData> dataSetInsert = DataGenerator.generateRandomRowData(ROW_DATA_TYPE);
+                if (batchNum < 11) {
+                    // For the first 10 batches, insert 4 records: 2 with a random id (inserts) and 2 with a hardcoded UUID (updates).
+                    for (RowData row : dataSetInsert) {
+                        ctx.collect(row);
+                    }
+                } else {
+                    // For the 11th batch, emit only one record, with row kind DELETE.
+                    RowData rowToBeDeleted = dataSetInsert.get(2);
+                    rowToBeDeleted.setRowKind(RowKind.DELETE);
+                    ctx.collect(rowToBeDeleted);
+                    TimeUnit.MILLISECONDS.sleep(10000);
+                    // Stop the stream once deleted
+                    isRunning = false;
+                }
+                TimeUnit.MILLISECONDS.sleep(10000); // Simulate a delay
+            }
+        }
+
+        @Override
+        public void cancel() {
+            isRunning = false;
+        }
+    }
+}
diff --git a/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/utils/DataGenerator.java b/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/utils/DataGenerator.java
new file mode 100644
index 00000000000..97bbeb9bbe5
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/utils/DataGenerator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.k8s.quickstart.utils;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.data.writer.BinaryWriter;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Utility class for generating test data.
+ */
+public class DataGenerator {
+
+    public static List<RowData> generateRandomRowData(DataType dataType) {
+
+        // For every batch, this adds two new rows with a random UUID and updates the rows with UUIDs
+        // "334e26e9-8355-45cc-97c6-c31daf0df330" and "7fd3fd07-cf04-4a1d-9511-142736932983".
+        return Arrays.asList(
+                DataGenerator.createRowData(dataType, TimestampData.fromEpochMillis(System.currentTimeMillis()),
+                        StringData.fromString(UUID.randomUUID().toString()), StringData.fromString("rider-A"),
+                        StringData.fromString("driver-K"), 1.0 + Math.random() * 90, StringData.fromString("san_francisco")),
+                DataGenerator.createRowData(dataType, TimestampData.fromEpochMillis(System.currentTimeMillis()),
+                        StringData.fromString(UUID.randomUUID().toString()), StringData.fromString("rider-B"),
+                        StringData.fromString("driver-M"), 1.0 + Math.random() * 90, StringData.fromString("brazil")),
+                DataGenerator.createRowData(dataType, TimestampData.fromEpochMillis(System.currentTimeMillis()),
+                        StringData.fromString("334e26e9-8355-45cc-97c6-c31daf0df330"), StringData.fromString("rider-C"),
+                        StringData.fromString("driver-L"), 15.4, StringData.fromString("chennai")),
+                DataGenerator.createRowData(dataType, TimestampData.fromEpochMillis(System.currentTimeMillis()),
+                        StringData.fromString("7fd3fd07-cf04-4a1d-9511-142736932983"), StringData.fromString("rider-D"),
+                        StringData.fromString("driver-N"), 1.0 + Math.random() * 90, StringData.fromString("london"))
+        );
+    }
+
+    public static BinaryRowData createRowData(DataType dataType, Object... fields) {
+        RowType rowType = (RowType) dataType.getLogicalType();
+        LogicalType[] types = rowType.getFields().stream().map(RowType.RowField::getType)
+                .toArray(LogicalType[]::new);
+        BinaryRowData row = new BinaryRowData(fields.length);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        writer.reset();
+        for (int i = 0; i < fields.length; i++) {
+            Object field = fields[i];
+            if (field == null) {
+                writer.setNullAt(i);
+            } else {
+                BinaryWriter.write(writer, i, field, types[i], InternalSerializers.create(types[i]));
+            }
+        }
+        writer.complete();
+        return row;
+    }
+}
diff --git a/hudi-examples/pom.xml b/hudi-examples/pom.xml
index 6e890f5c34d..94f17f005fe 100644
--- a/hudi-examples/pom.xml
+++ b/hudi-examples/pom.xml
@@ -32,6 +32,7 @@
     <module>hudi-examples-spark</module>
     <module>hudi-examples-flink</module>
     <module>hudi-examples-java</module>
+    <module>hudi-examples-k8s</module>
   </modules>
 
 </project>

