This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 36a9113aca2  [HUDI-7929] Create k8s example for flink hudi integration (#11570)
36a9113aca2 is described below

commit 36a9113aca280aed206dcfbdd317ccfb6b6a6382
Author: Peter Huang <huangzhenqiu0...@gmail.com>
AuthorDate: Sun Jul 7 21:16:34 2024 -0700

    [HUDI-7929] Create k8s example for flink hudi integration (#11570)
---
 hudi-examples/hudi-examples-k8s/Dockerfile         |  31 ++++
 hudi-examples/hudi-examples-k8s/README.md          | 113 ++++++++++++
 .../hudi-examples-k8s/config/hadoop/core-site.xml  |  62 +++++++
 .../config/k8s/flink-deployment.yml                |  59 ++++++
 .../config/k8s/minio-standalone.yaml               |  83 +++++++++
 hudi-examples/hudi-examples-k8s/pom.xml            | 200 +++++++++++++++++++++
 .../k8s/quickstart/HudiDataStreamWriter.java       | 170 ++++++++++++++++++
 .../k8s/quickstart/utils/DataGenerator.java        |  79 ++++++++
 hudi-examples/pom.xml                              |   1 +
 9 files changed, 798 insertions(+)

diff --git a/hudi-examples/hudi-examples-k8s/Dockerfile b/hudi-examples/hudi-examples-k8s/Dockerfile
new file mode 100644
index 00000000000..a3a7e7889f8
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/Dockerfile
@@ -0,0 +1,31 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+FROM flink:1.18
+
+RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
+RUN mv -v /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop
+RUN chown -R flink:flink /opt/flink/plugins/flink-s3-fs-hadoop
+
+ENV FLINK_HADOOP_CONF /etc/hadoop/conf
+RUN mkdir -p $FLINK_HADOOP_CONF
+COPY config/hadoop/core-site.xml $FLINK_HADOOP_CONF
+ENV HADOOP_CLASSPATH=$FLINK_HADOOP_CONF
+WORKDIR /opt/hudi/examples
+COPY target/hudi-examples-k8s-*-SNAPSHOT.jar streaming/hudi-examples-k8s.jar
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/README.md b/hudi-examples/hudi-examples-k8s/README.md
new file mode 100644
index 00000000000..96371b6d3eb
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/README.md
@@ -0,0 +1,113 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# Quick Start
+
+## Prerequisites
+
+Install the Helm chart for the Flink Kubernetes operator. See [Flink on Kubernetes](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/) for more details.
+
+Make sure your local Docker daemon has been assigned at least 8 cores and 8 GB of memory.
+
+## Setting up a Docker registry
+
+- Point the Docker CLI at the Docker daemon running in minikube:
+  ```sh
+  eval $(minikube -p minikube docker-env)
+  ```
+
+- Start a local Docker registry in minikube's Docker (the `docker` command now talks to minikube's Docker daemon):
+  ```sh
+  docker run -d -p 5001:5000 --restart=always --name registry registry:2
+  ```
+  Host port `5001` is mapped to the registry's port `5000`; you can choose any available host port.
+
+## Building the Hudi Flink Docker Image
+
+We need to mount a local path on the host for all containers in minikube to work with:
+
+```shell
+mkdir -p /tmp/minikubedata
+minikube mount /tmp/minikubedata:/data &
+```
+
+We need a custom Docker image that extends the Flink base image and adds the Hudi Flink example jar to it.
+Build the module first so that `target/hudi-examples-k8s-*-SNAPSHOT.jar` exists, then build the image:
+```shell
+docker build -t localhost:5001/hudi/hudi-flink .
+```
+
+This builds the image inside minikube's Docker daemon and tags it for the local registry.
+You can verify this by running
+```shell
+docker images
+```
+
+This should show the image `localhost:5001/hudi/hudi-flink` in the list of images:
+```
+REPOSITORY                       TAG      IMAGE ID       CREATED          SIZE
+localhost:5001/hudi/hudi-flink   latest   87b936181d74   32 minutes ago   1.08GB
+```
+
+## Start the MinIO server
+
+Create a MinIO server in Kubernetes:
+
+```shell
+kubectl apply -f config/k8s/minio-standalone.yaml
+```
+
+This creates a pod in Kubernetes that runs the MinIO server.
+You can verify this by running
+```shell
+kubectl port-forward svc/minio-svc 9090
+```
+and then opening the MinIO console in your browser at `http://localhost:9090`.
+
+### Prepare the testing bucket
+
+Log into the console with the default credentials `minioadmin` / `minioadmin`.
+
+From the UI, manually create a bucket named `test`.
+
+### Destroy or replace the MinIO server
+
+```shell
+kubectl delete deployment minio-deploy
+kubectl replace -f config/k8s/minio-standalone.yaml
+```
+
+## Run the Flink Job
+
+We can now submit the Flink job to the Flink cluster running in Kubernetes:
+```shell
+kubectl apply -f config/k8s/flink-deployment.yml
+```
+
+This creates the JobManager and TaskManager pods that run the Flink job.
+You can verify this by running
+```shell
+kubectl port-forward svc/basic-example-rest 8081
+```
+and then opening the Flink UI in your browser at `http://localhost:8081`.
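+
+Optionally, you can check that the job is writing Hudi data and metadata files into the bucket. The snippet below is a minimal sketch; it assumes the AWS CLI is installed on your machine (any S3-compatible client pointed at the forwarded MinIO endpoint works the same way) and uses the `s3a://test/hudi` target path configured through `TARGET_S3_PATH` in `config/k8s/flink-deployment.yml`.
+
+```shell
+# Forward the MinIO S3 API port (9000) to localhost
+kubectl port-forward svc/minio-svc 9000 &
+
+# MinIO's default credentials; a region must be set for the CLI to sign
+# requests, even though MinIO ignores its value
+export AWS_ACCESS_KEY_ID=minioadmin
+export AWS_SECRET_ACCESS_KEY=minioadmin
+export AWS_DEFAULT_REGION=us-east-1
+
+# List the files written under the Hudi table path, including the
+# .hoodie metadata directory
+aws s3 ls --recursive --endpoint-url http://localhost:9000 s3://test/hudi/
+```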
+
+### Destroy or replace the Flink app
+
+```shell
+kubectl delete flinkdeployment basic-example
+kubectl replace -f config/k8s/flink-deployment.yml
+```
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/config/hadoop/core-site.xml b/hudi-examples/hudi-examples-k8s/config/hadoop/core-site.xml
new file mode 100644
index 00000000000..90e3ef5bb18
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/config/hadoop/core-site.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+
+  <property>
+    <name>fs.s3a.impl</name>
+    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+  </property>
+
+  <!-- Comma-separated list of local directories used to buffer
+       large results prior to transmitting them to S3. -->
+  <property>
+    <name>fs.s3a.buffer.dir</name>
+    <value>/tmp</value>
+  </property>
+
+  <property>
+    <name>fs.s3a.endpoint</name>
+    <value>http://minio-svc:9000</value>
+  </property>
+
+  <property>
+    <name>fs.s3a.path.style.access</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.s3a.access.key</name>
+    <description>AWS access key ID.
+      Omit for IAM role-based or provider-based authentication.</description>
+    <value>minioadmin</value>
+  </property>
+
+  <property>
+    <name>fs.s3a.secret.key</name>
+    <description>AWS secret key.
+      Omit for IAM role-based or provider-based authentication.</description>
+    <value>minioadmin</value>
+  </property>
+
+  <property>
+    <name>fs.s3a.aws.credentials.provider</name>
+    <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
+  </property>
+
+</configuration>
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/config/k8s/flink-deployment.yml b/hudi-examples/hudi-examples-k8s/config/k8s/flink-deployment.yml
new file mode 100644
index 00000000000..733d2d04d7c
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/config/k8s/flink-deployment.yml
@@ -0,0 +1,59 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: basic-example
+spec:
+  image: localhost:5001/hudi/hudi-flink:latest
+  flinkVersion: v1_18
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "1"
+    execution.checkpointing.checkpoints-after-tasks-finish.enabled: "true"
+    state.backend.type: filesystem
+    state.savepoints.dir: s3a://test/savepoints/
+    state.checkpoints.dir: s3a://test/checkpoints/
+    s3.access-key: minioadmin
+    s3.secret-key: minioadmin
+    s3.endpoint: http://minio-svc:9000
+    s3.path.style.access: "true"
+    s3a.access-key: minioadmin
+    s3a.secret-key: minioadmin
+    s3a.endpoint: http://minio-svc:9000
+    s3a.path.style.access: "true"
+  serviceAccount: flink
+  jobManager:
+    resource:
+      memory: "1g"
+      cpu: 1
+  podTemplate:
+    spec:
+      containers:
+        - name: flink-main-container
+          env:
+            - name: TARGET_S3_PATH
+              value: 's3a://test/hudi'
+  taskManager:
+    resource:
+      memory: "1g"
+      cpu: 1
+  job:
+    jarURI: local:///opt/hudi/examples/streaming/hudi-examples-k8s.jar
+    parallelism: 2
+    upgradeMode: stateless
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/config/k8s/minio-standalone.yaml b/hudi-examples/hudi-examples-k8s/config/k8s/minio-standalone.yaml
new file mode 100644
index 00000000000..d04ec46f307
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/config/k8s/minio-standalone.yaml
@@ -0,0 +1,83 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  creationTimestamp: null
+  labels:
+    app: minio
+  name: minio-deploy
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: minio
+  strategy: {}
+  template:
+    metadata:
+      creationTimestamp: null
+      labels:
+        app: minio
+    spec:
+      volumes:
+        - name: hostvolume
+          hostPath:
+            path: /tmp/minikubedata
+            type: DirectoryOrCreate
+      initContainers:
+        - name: prepare
+          image: busybox:1.28
+          command: ['sh', '-c', 'mkdir -p /data/minio/ && chown 9999 /data/minio/']
+          volumeMounts:
+            - mountPath: /data
+              name: hostvolume
+      containers:
+        - name: minio
+          image: quay.io/minio/minio:RELEASE.2024-01-13T07-53-03Z
+          command:
+            - /bin/bash
+            - -c
+          args:
+            - minio server /data/minio --address :9000 --console-address :9090
+          volumeMounts:
+            - mountPath: /data
+              name: hostvolume
+status: {}
+
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    app: minio
+  name: minio-svc
+spec:
+  ports:
+    - name: webconsole
+      port: 9090
+      protocol: TCP
+      targetPort: 9090
+    - name: api
+      port: 9000
+      protocol: TCP
+      targetPort: 9000
+  selector:
+    app: minio
+status:
+  loadBalancer: {}
\ No newline at end of file
diff --git a/hudi-examples/hudi-examples-k8s/pom.xml b/hudi-examples/hudi-examples-k8s/pom.xml
new file mode 100644
index 00000000000..82870562f92
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/pom.xml
@@ -0,0 +1,200 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>hudi-examples</artifactId> + <groupId>org.apache.hudi</groupId> + <version>1.0.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>hudi-examples-k8s</artifactId> + + <properties> + <main.basedir>${project.parent.basedir}</main.basedir> + <checkstyle.skip>true</checkstyle.skip> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${maven-shade-plugin.version}</version> + <executions> + <!-- Run shade goal on package phase --> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <includes> + <include>*:*</include> + </includes> + <excludes> + <exclude>org.apache.hbase.thirdparty:*</exclude> + <exclude>org.apache.flink:flink-shaded-force-shading</exclude> + <exclude>com.google.code.findbugs:jsr305</exclude> + <exclude>org.slf4j:*</exclude> + <exclude>org.apache.logging.log4j:*</exclude> + </excludes> + </artifactSet> + <filters> + <filter> + <!-- Do not copy the signatures in the META-INF folder. + Otherwise, this might cause SecurityExceptions when using the JAR. --> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> +\ </excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.hudi.examples.k8s.quickstart.HudiDataStreamWriter</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + </plugin> + </plugins> + + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + </build> + + <dependencies> + <!-- Hoodie --> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-flink${flink.bundle.version}-bundle</artifactId> + <scope>compile</scope> + <version>${project.version}</version> + </dependency> + + <!-- Flink --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>${flink.streaming.java.artifactId}</artifactId> + <scope>provided</scope> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>${flink.clients.artifactId}</artifactId> + <exclusions> + <exclusion> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + </exclusion> + <exclusion> + <groupId>com.esotericsoftware.minlog</groupId> + <artifactId>minlog</artifactId> + </exclusion> + </exclusions> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + 
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>${flink.table.runtime.artifactId}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Hadoop -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <scope>runtime</scope>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>runtime</scope>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <scope>runtime</scope>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>runtime</scope>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aws</artifactId>
+            <scope>runtime</scope>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-tests-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/HudiDataStreamWriter.java b/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/HudiDataStreamWriter.java
new file mode 100644
index 00000000000..e79db5337c5
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/HudiDataStreamWriter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.k8s.quickstart;
+
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.examples.k8s.quickstart.utils.DataGenerator;
+import org.apache.hudi.util.HoodiePipeline;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This Flink program demonstrates inserting, updating, and deleting records in a Hudi table using the DataStream API.
+ * Each of the first ten batches emits four messages: two generate a random UUID, acting as new insert records, while
+ * two reuse fixed record keys, resulting in an update of those two records in each batch.
+ * In the first batch, four new records are inserted into a newly created Hudi table.
+ * After each subsequent batch, two new records are inserted, so the record count grows by two with each batch.
+ * In the 11th batch, we illustrate the delete operation by publishing a record with the DELETE row kind. As a result,
+ * we observe the deletion of the record with the third key after this batch.
+ *
+ * After this job finishes, you should see a total of 21 records in the Hudi table.
+ */
+public class HudiDataStreamWriter {
+
+  public static final DataType ROW_DATA_TYPE = DataTypes.ROW(
+      DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), // precombine field
+      DataTypes.FIELD("uuid", DataTypes.VARCHAR(40)), // record key
+      DataTypes.FIELD("rider", DataTypes.VARCHAR(20)),
+      DataTypes.FIELD("driver", DataTypes.VARCHAR(20)),
+      DataTypes.FIELD("fare", DataTypes.DOUBLE()),
+      DataTypes.FIELD("city", DataTypes.VARCHAR(20))).notNull();
+
+  /**
+   * Main entry point. The Hudi base path is read from the TARGET_S3_PATH environment variable and
+   * the table name is fixed to "hudi_table"; command line arguments are not used.
+   * It can be run with the Flink CLI, e.g.:
+   * bin/flink run -c org.apache.hudi.examples.k8s.quickstart.HudiDataStreamWriter hudi-examples-k8s.jar
+   *
+   * @param args Command line arguments (unused).
+   * @throws Exception If the Flink job fails.
+   */
+  public static void main(String[] args) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    // Enable checkpointing
+    configureCheckpointing(env);
+
+    DataStreamSource<RowData> dataStream = env.addSource(new SampleDataSource());
+
+    final String targetS3Path = System.getenv("TARGET_S3_PATH");
+    HoodiePipeline.Builder builder = createHudiPipeline("hudi_table", createHudiOptions(targetS3Path));
+    builder.sink(dataStream, false);
+
+    env.execute("Api_Sink");
+  }
+
+  /**
+   * Configure Flink checkpointing settings.
+   *
+   * @param env The Flink StreamExecutionEnvironment.
+   */
+  private static void configureCheckpointing(StreamExecutionEnvironment env) {
+    env.enableCheckpointing(5000); // Checkpoint every 5 seconds
+    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
+    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+    checkpointConfig.setMinPauseBetweenCheckpoints(10000); // Minimum time between checkpoints
+    checkpointConfig.setCheckpointTimeout(60000); // Checkpoint timeout in milliseconds
+  }
+
+  /**
+   * Create Hudi options for the data sink.
+   *
+   * @param basePath The base path for Hudi data.
+   * @return A Map containing Hudi options.
+   */
+  private static Map<String, String> createHudiOptions(String basePath) {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), basePath);
+    options.put(HoodieCommonConfig.HOODIE_FS_ATOMIC_CREATION_SUPPORT.key(), "s3a");
+    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
+    options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+    options.put(FlinkOptions.IGNORE_FAILED.key(), "true");
+    return options;
+  }
+
+  /**
+   * Create a HoodiePipeline.Builder for the specified target table and options.
+   *
+   * @param targetTable The name of the Hudi table.
+   * @param options     The Hudi options for the data sink.
+   * @return A HoodiePipeline.Builder.
+   */
+  private static HoodiePipeline.Builder createHudiPipeline(String targetTable, Map<String, String> options) {
+    return HoodiePipeline.builder(targetTable)
+        .column("ts TIMESTAMP(3)")
+        .column("uuid VARCHAR(40)")
+        .column("rider VARCHAR(20)")
+        .column("driver VARCHAR(20)")
+        .column("fare DOUBLE")
+        .column("city VARCHAR(20)")
+        .pk("uuid")
+        .partition("city")
+        .options(options);
+  }
+
+  /**
+   * Sample data source for generating RowData objects.
+   */
+  static class SampleDataSource implements SourceFunction<RowData> {
+    private volatile boolean isRunning = true;
+
+    @Override
+    public void run(SourceContext<RowData> ctx) throws Exception {
+      int batchNum = 0;
+      while (isRunning) {
+        batchNum++;
+        List<RowData> dataSetInsert = DataGenerator.generateRandomRowData(ROW_DATA_TYPE);
+        if (batchNum < 11) {
+          // For the first 10 batches, insert 4 records: 2 with random ids (INSERT) and 2 with hardcoded UUIDs (UPDATE)
+          for (RowData row : dataSetInsert) {
+            ctx.collect(row);
+          }
+        } else {
+          // For the 11th batch, emit only one record with the DELETE row kind
+          RowData rowToBeDeleted = dataSetInsert.get(2);
+          rowToBeDeleted.setRowKind(RowKind.DELETE);
+          ctx.collect(rowToBeDeleted);
+          TimeUnit.MILLISECONDS.sleep(10000);
+          // Stop the stream once the delete has been sent
+          isRunning = false;
+        }
+        TimeUnit.MILLISECONDS.sleep(10000); // Simulate a delay between batches
+      }
+    }
+
+    @Override
+    public void cancel() {
+      isRunning = false;
+    }
+  }
+}
diff --git a/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/utils/DataGenerator.java b/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/utils/DataGenerator.java
new file mode 100644
index 00000000000..97bbeb9bbe5
--- /dev/null
+++ b/hudi-examples/hudi-examples-k8s/src/main/java/org/apache/hudi/examples/k8s/quickstart/utils/DataGenerator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.examples.k8s.quickstart.utils;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.data.writer.BinaryWriter;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Util class for testing data generation.
+ */
+public class DataGenerator {
+
+  public static List<RowData> generateRandomRowData(DataType dataType) {
+    // For every batch, this adds two new rows with RANDOM uuids and updates the rows with uuids
+    // "334e26e9-8355-45cc-97c6-c31daf0df330" and "7fd3fd07-cf04-4a1d-9511-142736932983"
+    return Arrays.asList(
+        DataGenerator.createRowData(dataType, TimestampData.fromEpochMillis(System.currentTimeMillis()),
+            StringData.fromString(UUID.randomUUID().toString()), StringData.fromString("rider-A"),
+            StringData.fromString("driver-K"), 1.0 + Math.random() * 90, StringData.fromString("san_francisco")),
+        DataGenerator.createRowData(dataType, TimestampData.fromEpochMillis(System.currentTimeMillis()),
+            StringData.fromString(UUID.randomUUID().toString()), StringData.fromString("rider-B"),
+            StringData.fromString("driver-M"), 1.0 + Math.random() * 90, StringData.fromString("brazil")),
+        DataGenerator.createRowData(dataType, TimestampData.fromEpochMillis(System.currentTimeMillis()),
+            StringData.fromString("334e26e9-8355-45cc-97c6-c31daf0df330"), StringData.fromString("rider-C"),
+            StringData.fromString("driver-L"), 15.4, StringData.fromString("chennai")),
+        DataGenerator.createRowData(dataType, TimestampData.fromEpochMillis(System.currentTimeMillis()),
+            StringData.fromString("7fd3fd07-cf04-4a1d-9511-142736932983"), StringData.fromString("rider-D"),
+            StringData.fromString("driver-N"), 1.0 + Math.random() * 90, StringData.fromString("london"))
+    );
+  }
+
+  public static BinaryRowData createRowData(DataType dataType, Object... fields) {
+    RowType rowType = (RowType) dataType.getLogicalType();
+    LogicalType[] types = rowType.getFields().stream().map(RowType.RowField::getType)
+        .toArray(LogicalType[]::new);
+    BinaryRowData row = new BinaryRowData(fields.length);
+    BinaryRowWriter writer = new BinaryRowWriter(row);
+    writer.reset();
+    for (int i = 0; i < fields.length; i++) {
+      Object field = fields[i];
+      if (field == null) {
+        writer.setNullAt(i);
+      } else {
+        BinaryWriter.write(writer, i, field, types[i], InternalSerializers.create(types[i]));
+      }
+    }
+    writer.complete();
+    return row;
+  }
+}
diff --git a/hudi-examples/pom.xml b/hudi-examples/pom.xml
index 6e890f5c34d..94f17f005fe 100644
--- a/hudi-examples/pom.xml
+++ b/hudi-examples/pom.xml
@@ -32,6 +32,7 @@
     <module>hudi-examples-spark</module>
     <module>hudi-examples-flink</module>
     <module>hudi-examples-java</module>
+    <module>hudi-examples-k8s</module>
   </modules>
 </project>