This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git
commit 626ca07bf71f0b1340df25677ae1ec0969719a9c Author: Fabian Hueske <fhue...@apache.org> AuthorDate: Mon Aug 12 18:48:23 2019 +0200 [FLINK-12749] Add Dockerfile for Operations Playground image * Rename Cluster Playground to Operations Playground * Add Java code for Click Count demo job * Add Dockerfile to build the custom image for the operations playground * Add README files --- .gitignore | 3 + README.md | 21 +- .../ops-playground-image/Dockerfile | 32 ++- docker/ops-playground-image/README.md | 5 + .../java/flink-playground-clickcountjob/LICENSE | 201 ++++++++++++++++++ .../java/flink-playground-clickcountjob/pom.xml | 230 +++++++++++++++++++++ .../ops/clickcount/ClickEventCount.java | 116 +++++++++++ .../ops/clickcount/ClickEventGenerator.java | 122 +++++++++++ .../functions/ClickEventStatisticsCollector.java | 47 +++++ .../clickcount/functions/CountingAggregator.java | 47 +++++ .../ops/clickcount/records/ClickEvent.java | 85 ++++++++ .../records/ClickEventDeserializationSchema.java | 51 +++++ .../records/ClickEventSerializationSchema.java | 46 +++++ .../clickcount/records/ClickEventStatistics.java | 116 +++++++++++ .../ClickEventStatisticsSerializationSchema.java | 42 ++++ .../src/main/resources/log4j.properties | 15 +- howto-update-playgrounds.md | 42 ++++ operations-playground/README.md | 50 +++++ .../conf/flink-conf.yaml | 0 .../conf/log4j-cli.properties | 0 .../conf/log4j-console.properties | 0 .../docker-compose.yaml | 13 +- 22 files changed, 1247 insertions(+), 37 deletions(-) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d4e4d76 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*/.idea +*/target +*/dependency-reduced-pom.xml diff --git a/README.md b/README.md index 8fe2903..c9881a3 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,23 @@ # Apache Flink Playgrounds -Apache Flink is an open source stream processing framework with powerful stream- and batch- -processing capabilities. 
+This repository provides playgrounds to quickly and easily explore [Apache Flink](https://flink.apache.org)'s features. -Learn more about Flink at [http://flink.apache.org/](http://flink.apache.org/) +The playgrounds are based on [docker-compose](https://docs.docker.com/compose/) environments. +Each subfolder of this repository contains the docker-compose setup of a playground, except for the `./docker` folder which contains code and configuration to build custom Docker images for the playgrounds. -## Playgrounds +## Available Playgrounds -This repository contains the configuration files for two Apache Flink playgrounds. +Currently, the following playgrounds are available: -* The [Flink Cluster Playground](../master/flink-cluster-playground) consists of a Flink Session Cluster, a Kafka Cluster and a simple -Flink Job. It is explained in detail as part of -[Apache Flink's "Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/docker-playgrounds/flink-cluster-playground.html). +* The **Flink Operations Playground** (in the `operations-playground` folder) lets you explore and play with Flink's features to manage and operate stream processing jobs. You can witness how Flink recovers a job from a failure, upgrade and rescale a job, and query job metrics. The playground consists of a Flink cluster, a Kafka cluster and an example +Flink job. The playground is presented in detail in the +["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.8/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation. * The interactive SQL playground is still under development and will be added shortly. ## About -Apache Flink is an open source project of The Apache Software Foundation (ASF). \ No newline at end of file +Apache Flink is an open source project of The Apache Software Foundation (ASF). 
+ +Flink is a distributed data processing framework with powerful stream and batch processing capabilities. +Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/) diff --git a/flink-cluster-playground/conf/flink-conf.yaml b/docker/ops-playground-image/Dockerfile similarity index 52% copy from flink-cluster-playground/conf/flink-conf.yaml copy to docker/ops-playground-image/Dockerfile index 5c8d0e6..8b64428 100644 --- a/flink-cluster-playground/conf/flink-conf.yaml +++ b/docker/ops-playground-image/Dockerfile @@ -1,4 +1,4 @@ -################################################################################ +############################################################################### # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -14,17 +14,27 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-################################################################################ +############################################################################### -jobmanager.rpc.address: jobmanager -blob.server.port: 6124 -query.server.port: 6125 +############################################################################### +# Build Click Count Job +############################################################################### -taskmanager.numberOfTaskSlots: 2 +FROM maven:3.6-jdk-8-slim AS builder -state.backend: filesystem -state.checkpoints.dir: file:///tmp/flink-checkpoints-directory -state.savepoints.dir: file:///tmp/flink-savepoints-directory +# Get Click Count job and compile it +COPY ./java/flink-playground-clickcountjob /opt/flink-playground-clickcountjob +WORKDIR /opt/flink-playground-clickcountjob +RUN mvn clean install -heartbeat.interval: 1000 -heartbeat.timeout: 5000 + +############################################################################### +# Build Operations Playground Image +############################################################################### + +FROM flink:1.8.1-scala_2.11 + +WORKDIR /opt/flink/bin + +# Copy Click Count Job +COPY --from=builder /opt/flink-playground-clickcountjob/target/flink-playground-clickcountjob-*.jar /opt/ClickCountJob.jar diff --git a/docker/ops-playground-image/README.md b/docker/ops-playground-image/README.md new file mode 100644 index 0000000..a22382d --- /dev/null +++ b/docker/ops-playground-image/README.md @@ -0,0 +1,5 @@ +# Flink Operations Playground Image + +The image defined by the `Dockerfile` in this folder is required by the Flink operations playground. + +The image is based on the official Flink image and adds a demo Flink job (Click Event Count) and a corresponding data generator. The code of the application is located in the `./java/flink-playground-clickcountjob` folder. 
\ No newline at end of file diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/LICENSE b/docker/ops-playground-image/java/flink-playground-clickcountjob/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml new file mode 100644 index 0000000..f1f9b89 --- /dev/null +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml @@ -0,0 +1,230 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.flink</groupId> + <artifactId>flink-playground-clickcountjob</artifactId> + <version>1-FLINK-1.8_2.11</version> + + <name>flink-playground-clickcountjob</name> + <packaging>jar</packaging> + <url>https://flink.apache.org</url> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url> + <distribution>repo</distribution> + </license> + </licenses> + + <scm> + <url>https://github.com/apache/flink-playgrounds</url> + <connection>g...@github.com:apache/flink-playgrounds.git</connection> + <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/flink-playgrounds.git</developerConnection> + </scm> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <flink.version>1.8.1</flink.version> + <java.version>1.8</java.version> + <scala.binary.version>2.11</scala.binary.version> + <maven.compiler.source>${java.version}</maven.compiler.source> + <maven.compiler.target>${java.version}</maven.compiler.target> + </properties> + + <dependencies> + <!-- Apache Flink dependencies --> + <!-- These dependencies are provided, because they should not be packaged into the JAR file. 
--> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <!-- Connector dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + </dependency> + + <!-- Logging dependencies --> + <!-- These dependencies are excluded from the application JAR by default. --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.7</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + <scope>runtime</scope> + </dependency> + </dependencies> + + <build> + <plugins> + + <!-- Java Compiler --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.1</version> + <configuration> + <source>${java.version}</source> + <target>${java.version}</target> + </configuration> + </plugin> + + <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --> + <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. 
--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <!-- Run shade goal on package phase --> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <artifactSet> + <excludes> + <exclude>org.apache.flink:force-shading</exclude> + <exclude>com.google.code.findbugs:jsr305</exclude> + <exclude>org.slf4j:*</exclude> + <exclude>log4j:*</exclude> + </excludes> + </artifactSet> + <filters> + <filter> + <!-- Do not copy the signatures in the META-INF folder. + Otherwise, this might cause SecurityExceptions when using the JAR. --> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.playgrounds.ops.clickcount.ClickEventCount</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + + <pluginManagement> + <plugins> + + <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. 
--> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <versionRange>[3.0.0,)</versionRange> + <goals> + <goal>shade</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore/> + </action> + </pluginExecution> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <versionRange>[3.1,)</versionRange> + <goals> + <goal>testCompile</goal> + <goal>compile</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore/> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + </plugins> + </pluginManagement> + </build> + + <!-- This profile helps to make things run out of the box in IntelliJ --> + <!-- It adds Flink's core classes to the runtime class path. 
--> + <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' --> + <profiles> + <profile> + <id>add-dependencies-for-IDEA</id> + + <activation> + <property> + <name>idea.version</name> + </property> + </activation> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${flink.version}</version> + <scope>compile</scope> + </dependency> + </dependencies> + </profile> + </profiles> + + +</project> diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java new file mode 100644 index 0000000..9f609e9 --- /dev/null +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.playgrounds.ops.clickcount; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.playgrounds.ops.clickcount.functions.ClickEventStatisticsCollector; +import org.apache.flink.playgrounds.ops.clickcount.functions.CountingAggregator; +import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent; +import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventDeserializationSchema; +import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics; +import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatisticsSerializationSchema; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; + +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +/** + * A simple streaming job reading {@link ClickEvent}s from Kafka, counting events per 15 seconds and + * writing the resulting {@link ClickEventStatistics} back to Kafka. + * + * <p> It can be run with or without checkpointing and with event time or processing time semantics. 
+ * </p> + * + * <p>The Job can be configured via the command line:</p> + * * "--checkpointing": enables checkpointing + * * "--event-time": set the StreamTimeCharacteristic to EventTime + * * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from + * * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to + * * "--bootstrap.servers": comma-separated list of Kafka brokers + * + */ +public class ClickEventCount { + + public static final String CHECKPOINTING_OPTION = "checkpointing"; + public static final String EVENT_TIME_OPTION = "event-time"; + + public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS); + + public static void main(String[] args) throws Exception { + final ParameterTool params = ParameterTool.fromArgs(args); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + configureEnvironment(params, env); + + String inputTopic = params.get("input-topic", "input"); + String outputTopic = params.get("output-topic", "output"); + String brokers = params.get("bootstrap.servers", "localhost:9092"); + Properties kafkaProps = new Properties(); + kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count"); + + env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps)) + .name("ClickEvent Source") + .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) { + @Override + public long extractTimestamp(final ClickEvent element) { + return element.getTimestamp().getTime(); + } + }) + .keyBy(ClickEvent::getPage) + .timeWindow(WINDOW_SIZE) + .aggregate(new CountingAggregator(), + new ClickEventStatisticsCollector()) + .name("ClickEvent Counter") + .addSink(new FlinkKafkaProducer<ClickEventStatistics>( + outputTopic, + new 
ClickEventStatisticsSerializationSchema(), + kafkaProps)) + .name("ClickEventStatistics Sink"); + + env.execute("Click Event Count"); + } + + private static void configureEnvironment( + final ParameterTool params, + final StreamExecutionEnvironment env) { + + boolean checkpointingEnabled = params.has(CHECKPOINTING_OPTION); + boolean eventTimeSemantics = params.has(EVENT_TIME_OPTION); + + if (checkpointingEnabled) { + env.enableCheckpointing(1000); + } + + if (eventTimeSemantics) { + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + } + + //disabling Operator chaining to make it easier to follow the Job in the WebUI + env.disableOperatorChaining(); + } +} diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java new file mode 100644 index 0000000..6a5c394 --- /dev/null +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.playgrounds.ops.clickcount; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.java.utils.ParameterTool; + +import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent; +import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventSerializationSchema; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; + +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.playgrounds.ops.clickcount.ClickEventCount.WINDOW_SIZE; + +/** + * A generator which pushes {@link ClickEvent}s into a Kafka Topic configured via `--topic` and + * `--bootstrap.servers`. + * + * <p> The generator creates the same number of {@link ClickEvent}s for all pages. The delay between + * events is chosen such that processing time and event time roughly align. The generator always + * creates the same sequence of events. 
</p> + * + */ +public class ClickEventGenerator { + + public static final int EVENTS_PER_WINDOW = 1000; + + private static final List<String> pages = Arrays.asList("/help", "/index", "/shop", "/jobs", "/about", "/news"); + + //this calculation is only accurate as long as pages.size() * EVENTS_PER_WINDOW divides the + //window size + public static final long DELAY = WINDOW_SIZE.toMilliseconds() / pages.size() / EVENTS_PER_WINDOW; + + public static void main(String[] args) throws Exception { + + final ParameterTool params = ParameterTool.fromArgs(args); + + String topic = params.get("topic", "input"); + + Properties kafkaProps = createKafkaProperties(params); + + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProps); + + ClickIterator clickIterator = new ClickIterator(); + SerializationSchema<ClickEvent> clickSerializer = new ClickEventSerializationSchema(); + + while (true) { + + byte[] message = clickSerializer.serialize(clickIterator.next()); + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, message); + producer.send(record); + + Thread.sleep(DELAY); + } + } + + private static Properties createKafkaProperties(final ParameterTool params) { + String brokers = params.get("bootstrap.servers", "localhost:9092"); + Properties kafkaProps = new Properties(); + kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); + return kafkaProps; + } + + static class ClickIterator { + + private Map<String, Long> nextTimestampPerKey; + private int nextPageIndex; + + ClickIterator() { + nextTimestampPerKey = new HashMap<>(); + nextPageIndex = 0; + } + + ClickEvent next() { + String page = nextPage(); + return new ClickEvent(nextTimestamp(page), page); + } + + private Date nextTimestamp(String page) { + long nextTimestamp = 
nextTimestampPerKey.getOrDefault(page, 0L); + nextTimestampPerKey.put(page, nextTimestamp + WINDOW_SIZE.toMilliseconds() / EVENTS_PER_WINDOW); + return new Date(nextTimestamp); + } + + private String nextPage() { + String nextPage = pages.get(nextPageIndex); + if (nextPageIndex == pages.size() - 1) { + nextPageIndex = 0; + } else { + nextPageIndex++; + } + return nextPage; + } + } +} diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/ClickEventStatisticsCollector.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/ClickEventStatisticsCollector.java new file mode 100644 index 0000000..cb6af36 --- /dev/null +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/ClickEventStatisticsCollector.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.playgrounds.ops.clickcount.functions; + +import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent; +import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventStatistics; +import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.Collector; + +import java.util.Date; + +/** + * A simple {@link ProcessWindowFunction}, which wraps a count of {@link ClickEvent}s into an + * instance of {@link ClickEventStatistics}. + * + **/ +public class ClickEventStatisticsCollector + extends ProcessWindowFunction<Long, ClickEventStatistics, String, TimeWindow> { + + @Override + public void process( + final String page, + final Context context, + final Iterable<Long> elements, + final Collector<ClickEventStatistics> out) throws Exception { + + Long count = elements.iterator().next(); + + out.collect(new ClickEventStatistics(new Date(context.window().getStart()), new Date(context.window().getEnd()), page, count)); + } +} diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/CountingAggregator.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/CountingAggregator.java new file mode 100644 index 0000000..5ebc381 --- /dev/null +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/functions/CountingAggregator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.playgrounds.ops.clickcount.functions; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent; + +/** + * An {@link AggregateFunction} which simply counts {@link ClickEvent}s. + * + */ +public class CountingAggregator implements AggregateFunction<ClickEvent, Long, Long> { + @Override + public Long createAccumulator() { + return 0L; + } + + @Override + public Long add(final ClickEvent value, final Long accumulator) { + return accumulator + 1; + } + + @Override + public Long getResult(final Long accumulator) { + return accumulator; + } + + @Override + public Long merge(final Long a, final Long b) { + return a + b; + } +} diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEvent.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEvent.java new file mode 100644 index 0000000..3c87a1f --- /dev/null +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEvent.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.playgrounds.ops.clickcount.records; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat; + +import java.util.Date; +import java.util.Objects; + +/** + * A simple event recording a click on a {@link ClickEvent#page} at time {@link ClickEvent#timestamp}. 
+ * + */ +public class ClickEvent { + + //using java.util.Date for better readability + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS") + private Date timestamp; + private String page; + + public ClickEvent() { + } + + public ClickEvent(final Date timestamp, final String page) { + this.timestamp = timestamp; + this.page = page; + } + + public Date getTimestamp() { + return timestamp; + } + + public void setTimestamp(final Date timestamp) { + this.timestamp = timestamp; + } + + public String getPage() { + return page; + } + + public void setPage(final String page) { + this.page = page; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ClickEvent that = (ClickEvent) o; + return Objects.equals(timestamp, that.timestamp) && Objects.equals(page, that.page); + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, page); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ClickEvent{"); + sb.append("timestamp=").append(timestamp); + sb.append(", page='").append(page).append('\''); + sb.append('}'); + return sb.toString(); + } +} diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java new file mode 100644 index 0000000..bb47df5 --- /dev/null +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.playgrounds.ops.clickcount.records; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; + +/** + * A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON. 
+ * + */ +public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> { + + private static final long serialVersionUID = 1L; + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public ClickEvent deserialize(byte[] message) throws IOException { + return objectMapper.readValue(message, ClickEvent.class); + } + + @Override + public boolean isEndOfStream(ClickEvent nextElement) { + return false; + } + + @Override + public TypeInformation<ClickEvent> getProducedType() { + return TypeInformation.of(ClickEvent.class); + } +} diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java new file mode 100644 index 0000000..fab05d1 --- /dev/null +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.playgrounds.ops.clickcount.records; + + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +/** + * A Kafka {@link SerializationSchema} to serialize {@link ClickEvent}s as JSON. + * + */ +public class ClickEventSerializationSchema implements SerializationSchema<ClickEvent> { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + public ClickEventSerializationSchema() { + super(); + } + + @Override + public byte[] serialize(ClickEvent message) { + try { + //if topic is null, default topic will be used + return objectMapper.writeValueAsBytes(message); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Could not serialize record: " + message, e); + } + } +} diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatistics.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatistics.java new file mode 100644 index 0000000..de2d754 --- /dev/null +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatistics.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.playgrounds.ops.clickcount.records; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat; + +import java.util.Date; +import java.util.Objects; + +/** + * A small wrapper class for windowed page counts. + * + */ +public class ClickEventStatistics { + + //using java.util.Date for better readability + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS") + private Date windowStart; + //using java.util.Date for better readability + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS") + private Date windowEnd; + private String page; + private long count; + + public ClickEventStatistics() { + } + + public ClickEventStatistics( + final Date windowStart, + final Date windowEnd, + final String page, + final long count) { + this.windowStart = windowStart; + this.windowEnd = windowEnd; + this.page = page; + this.count = count; + } + + public Date getWindowStart() { + return windowStart; + } + + public void setWindowStart(final Date windowStart) { + this.windowStart = windowStart; + } + + public Date getWindowEnd() { + return windowEnd; + } + + public void setWindowEnd(final Date windowEnd) { + this.windowEnd = windowEnd; + } + + public String getPage() { + return page; + } + + public void setPage(final String page) { + this.page = page; + } + + public long getCount() { + return count; + } + + public void setCount(final long count) { + this.count = count; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return 
true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ClickEventStatistics that = (ClickEventStatistics) o; + return count == that.count && + Objects.equals(windowStart, that.windowStart) && + Objects.equals(windowEnd, that.windowEnd) && + Objects.equals(page, that.page); + } + + @Override + public int hashCode() { + return Objects.hash(windowStart, windowEnd, page, count); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ClickEventStatistics{"); + sb.append("windowStart=").append(windowStart); + sb.append(", windowEnd=").append(windowEnd); + sb.append(", page='").append(page).append('\''); + sb.append(", count=").append(count); + sb.append('}'); + return sb.toString(); + } +} diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java new file mode 100644 index 0000000..40a0dbd --- /dev/null +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.playgrounds.ops.clickcount.records; + + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +/** + * A Kafka {@link SerializationSchema} to serialize {@link ClickEventStatistics}s as JSON. + * + */ +public class ClickEventStatisticsSerializationSchema implements SerializationSchema<ClickEventStatistics> { + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public byte[] serialize(ClickEventStatistics message) { + try { + //if topic is null, default topic will be used + return objectMapper.writeValueAsBytes(message); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Could not serialize record: " + message, e); + } + } +} diff --git a/flink-cluster-playground/conf/flink-conf.yaml b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/resources/log4j.properties similarity index 75% copy from flink-cluster-playground/conf/flink-conf.yaml copy to docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/resources/log4j.properties index 5c8d0e6..da32ea0 100644 --- a/flink-cluster-playground/conf/flink-conf.yaml +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/resources/log4j.properties @@ -16,15 +16,8 @@ # limitations under the License. 
################################################################################ -jobmanager.rpc.address: jobmanager -blob.server.port: 6124 -query.server.port: 6125 +log4j.rootLogger=INFO, console -taskmanager.numberOfTaskSlots: 2 - -state.backend: filesystem -state.checkpoints.dir: file:///tmp/flink-checkpoints-directory -state.savepoints.dir: file:///tmp/flink-savepoints-directory - -heartbeat.interval: 1000 -heartbeat.timeout: 5000 +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/howto-update-playgrounds.md b/howto-update-playgrounds.md new file mode 100644 index 0000000..2885b62 --- /dev/null +++ b/howto-update-playgrounds.md @@ -0,0 +1,42 @@ + +# Versioning + +When updating the playgrounds we have to deal with three versions that need to be adjusted. + +Externally defined versions: + +* *Flink version*: Version of Apache Flink +* *Flink Docker image version*: Version of the official Flink Docker image on [Docker Hub](https://hub.docker.com/_/flink) + +Internally defined version: + +* *Playground Version*: This version is used for the Java artifacts and Docker images. It follows the scheme of `<Playground-Version>-Flink-<Minor-Flink-Version>`. For example `2-Flink-1.9` denotes Version 2 of the playground for Flink 1.9.x. + +# Updating the playgrounds + +## Update playgrounds due to a new minor (or major) Flink release + +First of all, check that a Flink Docker image was published on [Docker Hub](https://hub.docker.com/_/flink) for the new Flink version. + +Update all playgrounds as follows: + +1. All `pom.xml`: + * Update the versions of all Flink dependencies + * Update the Maven artifact version to the new playground version (`1-Flink-1.9` if 1.9.0 is the new Flink release). + * Check that all Maven projects build. +2. 
All `Dockerfile`: + * Update version of the base image to the new Flink Docker image version. +3. `docker-compose.yaml`: + * Update the version of the Flink containers to the new Flink Docker image version. + * Update the version of the custom Docker images to the new playground version. + +## Update a playground due to a bug in a custom Docker image + +Whenever you need to update a Docker image, please increment the playground version in the following places. + +* the artifact version of all Maven projects in all `pom.xml` files. +* the tag (name and version) of all custom images in all `docker-compose.yaml` files. + +## Update a playground without updating a custom Docker image + +Just update the `docker-compose.yaml` file. diff --git a/operations-playground/README.md b/operations-playground/README.md new file mode 100644 index 0000000..5287be7 --- /dev/null +++ b/operations-playground/README.md @@ -0,0 +1,50 @@ +# Flink Operations Playground + +The Flink operations playground lets you explore and play with [Apache Flink](https://flink.apache.org)'s features to manage and operate stream processing jobs, including + +* Observing automatic failure recovery of an application +* Upgrading and rescaling an application +* Querying the runtime metrics of an application + +It is based on a [docker-compose](https://docs.docker.com/compose/) environment and super easy to set up. + +## Setup + +The operations playground requires a custom Docker image in addition to public images for Flink, Kafka, and ZooKeeper. + +The `docker-compose.yaml` file of the operations playground is located in the `operations-playground` directory. 
Assuming you are at the root directory of the [`flink-playgrounds`](https://github.com/apache/flink-playgrounds) repository, change to the `operations-playground` folder by running + +```bash +cd operations-playground +``` + +### Building the custom Docker image + +Build the Docker image by running + +```bash +docker-compose build +``` + +### Starting the Playground + +Once you have built the Docker image, run the following command to start the playground + +```bash +docker-compose up -d +``` + +You can check whether the playground was successfully started by accessing the WebUI of the Flink cluster at [http://localhost:8081](http://localhost:8081). + +### Stopping the Playground + +To stop the playground, run the following command + +```bash +docker-compose down +``` + +## Further instructions + +The playground setup and more detailed instructions are presented in the +["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.8/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation. 
diff --git a/flink-cluster-playground/conf/flink-conf.yaml b/operations-playground/conf/flink-conf.yaml similarity index 100% rename from flink-cluster-playground/conf/flink-conf.yaml rename to operations-playground/conf/flink-conf.yaml diff --git a/flink-cluster-playground/conf/log4j-cli.properties b/operations-playground/conf/log4j-cli.properties similarity index 100% rename from flink-cluster-playground/conf/log4j-cli.properties rename to operations-playground/conf/log4j-cli.properties diff --git a/flink-cluster-playground/conf/log4j-console.properties b/operations-playground/conf/log4j-console.properties similarity index 100% rename from flink-cluster-playground/conf/log4j-console.properties rename to operations-playground/conf/log4j-console.properties diff --git a/flink-cluster-playground/docker-compose.yaml b/operations-playground/docker-compose.yaml similarity index 81% rename from flink-cluster-playground/docker-compose.yaml rename to operations-playground/docker-compose.yaml index 7842762..d498070 100644 --- a/flink-cluster-playground/docker-compose.yaml +++ b/operations-playground/docker-compose.yaml @@ -19,8 +19,9 @@ version: "2.1" services: client: - image: flink:1.9-scala_2.11 - command: "flink run -d -p 2 /opt/flink/examples/streaming/ClickEventCount.jar --bootstrap.servers kafka:9092 --checkpointing --event-time" + build: ../docker/ops-playground-image + image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11 + command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time" depends_on: - jobmanager - kafka @@ -29,12 +30,12 @@ services: environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager clickevent-generator: - image: flink:1.9-scala_2.11 - command: "java -classpath /opt/flink/examples/streaming/ClickEventCount.jar:/opt/flink/lib/* org.apache.flink.streaming.examples.windowing.clickeventcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input" + image: 
apache/flink-ops-playground:1-FLINK-1.8-scala_2.11 + command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input" depends_on: - kafka jobmanager: - image: flink:1.9-scala_2.11 + image: flink:1.8-scala_2.11 command: "jobmanager.sh start-foreground" ports: - 8081:8081 @@ -45,7 +46,7 @@ services: environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager: - image: flink:1.9-scala_2.11 + image: flink:1.8-scala_2.11 depends_on: - jobmanager command: "taskmanager.sh start-foreground"