This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-playgrounds.git


The following commit(s) were added to refs/heads/master by this push:
     new ffd3496  [FLINK-13863] Update playgrounds to Flink 1.9.0
ffd3496 is described below

commit ffd349616ac91528090ed8dff0faf369666ce163
Author: Fabian Hueske <fhue...@apache.org>
AuthorDate: Mon Aug 26 16:18:07 2019 +0200

    [FLINK-13863] Update playgrounds to Flink 1.9.0
    
    * Update Operations Playground (example job, dockerfile, docker-compose.yaml)
    * Update README.md
---
 README.md                                          |  2 +-
 docker/ops-playground-image/Dockerfile             |  2 +-
 .../java/flink-playground-clickcountjob/pom.xml    |  4 ++--
 .../ops/clickcount/ClickEventCount.java            |  7 ++++---
 .../ops/clickcount/ClickEventGenerator.java        | 10 ++++-----
 .../records/ClickEventSerializationSchema.java     | 23 ++++++++++++++-------
 .../ClickEventStatisticsSerializationSchema.java   | 24 +++++++++++++++++-----
 operations-playground/docker-compose.yaml          |  8 ++++----
 8 files changed, 52 insertions(+), 28 deletions(-)

diff --git a/README.md b/README.md
index c9881a3..2beaf5d 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@ Currently, the following playgrounds are available:
 
 * The **Flink Operations Playground** in the (`operations-playground` folder) let's you explore and play with Flink's features to manage and operate stream processing jobs. You can witness how Flink recovers a job from a failure, upgrade and rescale a job, and query job metrics. The playground consists of a Flink cluster, a Kafka cluster and an example 
 Flink job. The playground is presented in detail in the
-["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.8/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation.
+["Getting Started" guide](https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/docker-playgrounds/flink-operations-playground.html) of Flink's documentation.
 
 * The interactive SQL playground is still under development and will be added shortly.
 
diff --git a/docker/ops-playground-image/Dockerfile b/docker/ops-playground-image/Dockerfile
index 8b64428..59b40a0 100644
--- a/docker/ops-playground-image/Dockerfile
+++ b/docker/ops-playground-image/Dockerfile
@@ -32,7 +32,7 @@ RUN mvn clean install
 # Build Operations Playground Image
 ###############################################################################
 
-FROM flink:1.8.1-scala_2.11
+FROM flink:1.9.0-scala_2.11
 
 WORKDIR /opt/flink/bin
 
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
index f1f9b89..3d17fcd 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/pom.xml
@@ -22,7 +22,7 @@ under the License.
 
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-playground-clickcountjob</artifactId>
-       <version>1-FLINK-1.8_2.11</version>
+       <version>1-FLINK-1.9_2.11</version>
 
        <name>flink-playground-clickcountjob</name>
        <packaging>jar</packaging>
@@ -44,7 +44,7 @@ under the License.
 
     <properties>
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-               <flink.version>1.8.1</flink.version>
+               <flink.version>1.9.0</flink.version>
                <java.version>1.8</java.version>
                <scala.binary.version>2.11</scala.binary.version>
                <maven.compiler.source>${java.version}</maven.compiler.source>
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
index 9f609e9..0316bc6 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventCount.java
@@ -86,10 +86,11 @@ public class ClickEventCount {
                        .aggregate(new CountingAggregator(),
                                new ClickEventStatisticsCollector())
                        .name("ClickEvent Counter")
-                       .addSink(new FlinkKafkaProducer<ClickEventStatistics>(
+                       .addSink(new FlinkKafkaProducer<>(
                                outputTopic,
-                               new ClickEventStatisticsSerializationSchema(),
-                               kafkaProps))
+                               new ClickEventStatisticsSerializationSchema(outputTopic),
+                               kafkaProps,
+                               FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))
                        .name("ClickEventStatistics Sink");
 
                env.execute("Click Event Count");
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java
index 6a5c394..a789d83 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/ClickEventGenerator.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.playgrounds.ops.clickcount;
 
-import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.java.utils.ParameterTool;
-
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEvent;
 import org.apache.flink.playgrounds.ops.clickcount.records.ClickEventSerializationSchema;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -67,12 +66,13 @@ public class ClickEventGenerator {
                KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProps);
 
                ClickIterator clickIterator = new ClickIterator();
-               SerializationSchema<ClickEvent> clickSerializer = new ClickEventSerializationSchema();
 
                while (true) {
 
-                       byte[] message = clickSerializer.serialize(clickIterator.next());
-                       ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, message);
+                       ProducerRecord<byte[], byte[]> record = new ClickEventSerializationSchema(topic).serialize(
+                                       clickIterator.next(),
+                                       null);
+
                        producer.send(record);
 
                        Thread.sleep(DELAY);
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java
index fab05d1..eb64a87 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventSerializationSchema.java
@@ -17,28 +17,37 @@
 
 package org.apache.flink.playgrounds.ops.clickcount.records;
 
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 
-import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
 /**
- * A Kafka {@link SerializationSchema} to serialize {@link ClickEvent}s as JSON.
+ * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEvent}s as JSON.
  *
  */
-public class ClickEventSerializationSchema implements SerializationSchema<ClickEvent> {
+public class ClickEventSerializationSchema implements KafkaSerializationSchema<ClickEvent> {
 
        private static final ObjectMapper objectMapper = new ObjectMapper();
+       private String topic;
+
+       public ClickEventSerializationSchema(){
+       }
 
-       public ClickEventSerializationSchema() {
-               super();
+       public ClickEventSerializationSchema(String topic) {
+               this.topic = topic;
        }
 
        @Override
-       public byte[] serialize(ClickEvent message) {
+       public ProducerRecord<byte[], byte[]> serialize(
+                       final ClickEvent message, @Nullable final Long timestamp) {
                try {
                        //if topic is null, default topic will be used
-                       return objectMapper.writeValueAsBytes(message);
+                       return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
                } catch (JsonProcessingException e) {
                        throw new IllegalArgumentException("Could not serialize record: " + message, e);
                }
diff --git a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
index 40a0dbd..b24807e 100644
--- a/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
+++ b/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventStatisticsSerializationSchema.java
@@ -18,23 +18,37 @@
 package org.apache.flink.playgrounds.ops.clickcount.records;
 
 
-import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
 /**
- * A Kafka {@link SerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
+ * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
  *
  */
-public class ClickEventStatisticsSerializationSchema implements SerializationSchema<ClickEventStatistics> {
+public class ClickEventStatisticsSerializationSchema implements KafkaSerializationSchema<ClickEventStatistics> {
 
        private static final ObjectMapper objectMapper = new ObjectMapper();
+       private String topic;
+
+       public ClickEventStatisticsSerializationSchema(){
+       }
+
+       public ClickEventStatisticsSerializationSchema(String topic) {
+               this.topic = topic;
+       }
 
        @Override
-       public byte[] serialize(ClickEventStatistics message) {
+       public ProducerRecord<byte[], byte[]> serialize(
+                       final ClickEventStatistics message, @Nullable final Long timestamp) {
                try {
                        //if topic is null, default topic will be used
-                       return objectMapper.writeValueAsBytes(message);
+                       return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
                } catch (JsonProcessingException e) {
                        throw new IllegalArgumentException("Could not serialize record: " + message, e);
                }
diff --git a/operations-playground/docker-compose.yaml b/operations-playground/docker-compose.yaml
index d498070..9ed71c5 100644
--- a/operations-playground/docker-compose.yaml
+++ b/operations-playground/docker-compose.yaml
@@ -20,7 +20,7 @@ version: "2.1"
 services:
   client:
     build: ../docker/ops-playground-image
-    image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11
+    image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
     command: "flink run -d -p 2 /opt/ClickCountJob.jar --bootstrap.servers kafka:9092 --checkpointing --event-time"
     depends_on:
       - jobmanager
@@ -30,12 +30,12 @@ services:
     environment:
       - JOB_MANAGER_RPC_ADDRESS=jobmanager
   clickevent-generator:
-    image: apache/flink-ops-playground:1-FLINK-1.8-scala_2.11
+    image: apache/flink-ops-playground:1-FLINK-1.9-scala_2.11
     command: "java -classpath /opt/ClickCountJob.jar:/opt/flink/lib/* org.apache.flink.playgrounds.ops.clickcount.ClickEventGenerator --bootstrap.servers kafka:9092 --topic input"
     depends_on:
       - kafka
   jobmanager:
-    image: flink:1.8-scala_2.11
+    image: flink:1.9-scala_2.11
     command: "jobmanager.sh start-foreground"
     ports:
       - 8081:8081
@@ -46,7 +46,7 @@ services:
     environment:
       - JOB_MANAGER_RPC_ADDRESS=jobmanager
   taskmanager:
-    image: flink:1.8-scala_2.11
+    image: flink:1.9-scala_2.11
     depends_on:
       - jobmanager
     command: "taskmanager.sh start-foreground"

Reply via email to