This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git
The following commit(s) were added to refs/heads/master by this push: new ffd3496 [FLINK-13863] Update playgrounds to Flink 1.9.0 ffd3496 is described below commit ffd349616ac91528090ed8dff0faf369666ce163 Author: Fabian Hueske <fhue...@apache.org> AuthorDate: Mon Aug 26 16:18:07 2019 +0200 [FLINK-13863] Update playgrounds to Flink 1.9.0 * Update Operations Playground (example job, dockerfile, docker-compose.yaml) * Update README.md --- README.md | 2 +- docker/ops-playground-image/Dockerfile | 2 +- .../java/flink-playground-clickcountjob/pom.xml | 4 ++-- .../ops/clickcount/ClickEventCount.java | 7 ++++--- .../ops/clickcount/ClickEventGenerator.java | 10 ++++----- .../records/ClickEventSerializationSchema.java | 23 ++++++++++++++------- .../ClickEventStatisticsSerializationSchema.java | 24 +++++++++++++++++----- operations-playground/docker-compose.yaml | 8 ++++---- 8 files changed, 52 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index c9881a3..2beaf5d 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Currently, the following playgrounds are available: * The **Flink Operations Playground** in the (`operations-playground` folder) let's you explore and play with Flink's features to manage and operate stream processing jobs. You can witness how Flink recovers a job from a failure, upgrade and rescale a job, and query job metrics. The playground consists of a Flink cluster, a Kafka cluster and an example Flink job. The playground is presented in detail in the -["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.8/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation. +["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation. * The interactive SQL playground is still under development and will be added shortly. diff --git a/docker/ops-playground-image/Dockerfile b/docker/ops-playground-image/Dockerfile index 8b64428..59b40a0 100644 --- a/docker/ops-playground-image/Dockerfile +++ b/docker/ops-playground-image/Dockerfile @@ -32,7 +32,7 @@ RUN mvn clean install # Build Operations Playground Image ############################################################################### -FROM flink:1.8.1-scala_2.11 +FROM flink:1.9.0-scala_2.11 WORKDIR /opt/flink/bin diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml index f1f9b89..3d17fcd 100644 --- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml @@ -22,7 +22,7 @@ under the License. <groupId>org.apache.flink</groupId> <artifactId>flink-playground-clickcountjob</artifactId> - <version>1-FLINK-1.8_2.11</version> + <version>1-FLINK-1.9_2.11</version> <name>flink-playground-clickcountjob</name> <packaging>jar</packaging> @@ -44,7 +44,7 @@ under the License. <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <flink.version>1.8.1</flink.version> + <flink.version>1.9.0</flink.version> <java.version>1.8</java.version> <scala.binary.version>2.11</scala.binary.version> <maven.compiler.source>${java.version}</maven.compiler.source> diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java index 9f609e9..0316bc6 100644 --- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java @@ -86,10 +86,11 @@ public class ClickEventCount { .aggregate(new CountingAggregator(), new ClickEventStatisticsCollector()) .name("ClickEvent Counter") - .addSink(new FlinkKafkaProducer<ClickEventStatistics>( + .addSink(new FlinkKafkaProducer<>( outputTopic, - new ClickEventStatisticsSerializationSchema(), - kafkaProps)) + new ClickEventStatisticsSerializationSchema(outputTopic), + kafkaProps, + FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)) .name("ClickEventStatistics Sink"); env.execute("Click Event Count"); diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java index 6a5c394..a789d83 100644 --- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java @@ -18,11 +18,10 @@ package org.apache.flink.playgrounds.ops.clickcount; -import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.java.utils.ParameterTool; - import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent; import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventSerializationSchema; + import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -67,12 +66,13 @@ public class ClickEventGenerator { KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProps); ClickIterator clickIterator = new ClickIterator(); - SerializationSchema<ClickEvent> clickSerializer = new ClickEventSerializationSchema(); while (true) { - byte[] message = clickSerializer.serialize(clickIterator.next()); - ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, message); + ProducerRecord<byte[], byte[]> record = new ClickEventSerializationSchema(topic).serialize( + clickIterator.next(), + null); + producer.send(record); Thread.sleep(DELAY); diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java index fab05d1..eb64a87 100644 --- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java @@ -17,28 +17,37 @@ package org.apache.flink.playgrounds.ops.clickcount.records; +import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; -import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.producer.ProducerRecord; + +import javax.annotation.Nullable; + /** - * A Kafka {@link SerializationSchema} to serialize {@link ClickEvent}s as JSON. + * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEvent}s as JSON. * */ -public class ClickEventSerializationSchema implements SerializationSchema<ClickEvent> { +public class ClickEventSerializationSchema implements KafkaSerializationSchema<ClickEvent> { private static final ObjectMapper objectMapper = new ObjectMapper(); + private String topic; + + public ClickEventSerializationSchema(){ + } - public ClickEventSerializationSchema() { - super(); + public ClickEventSerializationSchema(String topic) { + this.topic = topic; } @Override - public byte[] serialize(ClickEvent message) { + public ProducerRecord<byte[], byte[]> serialize( + final ClickEvent message, @Nullable final Long timestamp) { try { //if topic is null, default topic will be used - return objectMapper.writeValueAsBytes(message); + return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message)); } catch (JsonProcessingException e) { throw new IllegalArgumentException("Could not serialize record: " + message, e); } diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java index 40a0dbd..b24807e 100644 --- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java +++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java @@ -18,23 +18,37 @@ package org.apache.flink.playgrounds.ops.clickcount.records; -import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; + import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.producer.ProducerRecord; + +import javax.annotation.Nullable; + /** - * A Kafka {@link SerializationSchema} to serialize {@link ClickEventStatistics}s as JSON. + * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEventStatistics}s as JSON. * */ -public class ClickEventStatisticsSerializationSchema implements SerializationSchema<ClickEventStatistics> { +public class ClickEventStatisticsSerializationSchema implements KafkaSerializationSchema<ClickEventStatistics> { private static final ObjectMapper objectMapper = new ObjectMapper(); + private String topic; + + public ClickEventStatisticsSerializationSchema(){ + } + + public ClickEventStatisticsSerializationSchema(String topic) { + this.topic = topic; + } @Override - public byte[] serialize(ClickEventStatistics message) { + public ProducerRecord<byte[], byte[]> serialize( + final ClickEventStatistics message, @Nullable final Long timestamp) { try { //if topic is null, default topic will be used - return objectMapper.writeValueAsBytes(message); + return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message)); } catch (JsonProcessingException e) { throw new IllegalArgumentException("Could not serialize record: " + message, e); } diff --git a/operations-playground/docker-compose.yaml b/operations-playground/docker-compose.yaml index d498070..9ed71c5 100644 --- a/operations-playground/docker-compose.yaml +++ b/operations-playground/docker-compose.yaml @@ -20,7 +20,7 @@ version: "2.1" services: client: build: ../docker/ops-playground-image - image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11 + image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11 command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time" depends_on: - jobmanager @@ -30,12 +30,12 @@ services: environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager clickevent-generator: - image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11 + image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11 command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input" depends_on: - kafka jobmanager: - image: flink:1.8-scala_2.11 + image: flink:1.9-scala_2.11 command: "jobmanager.sh start-foreground" ports: - 8081:8081 @@ -46,7 +46,7 @@ services: environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager: - image: flink:1.8-scala_2.11 + image: flink:1.9-scala_2.11 depends_on: - jobmanager command: "taskmanager.sh start-foreground"