Repository: incubator-beam
Updated Branches:
  refs/heads/master 4fd9d74df -> 661a4a893


add Kafka IO examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63bce07d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63bce07d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63bce07d

Branch: refs/heads/master
Commit: 63bce07d8c6cc5e610ad24e915e2585fef582567
Parents: aead96f
Author: Maximilian Michels <m...@apache.org>
Authored: Thu Apr 28 12:02:05 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Fri Apr 29 17:58:00 2016 +0200

----------------------------------------------------------------------
 .../examples/streaming/KafkaIOExamples.java     | 337 +++++++++++++++++++
 1 file changed, 337 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/63bce07d/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
new file mode 100644
index 0000000..af6bb35
--- /dev/null
+++ 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.examples.streaming;
+
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Properties;
+
+/**
+ * Recipes/Examples that demonstrate how to read/write data from/to Kafka.
+ */
+public class KafkaIOExamples {
+
+
+  private static final String KAFKA_TOPIC = "input";  // Default Kafka topic for the String examples
+  private static final String KAFKA_AVRO_TOPIC = "output";  // Default Kafka topic for the Avro examples
+  private static final String KAFKA_BROKER = "localhost:9092";  // Default Kafka broker to contact
+  private static final String GROUP_ID = "myGroup";  // Default value for the Kafka "group.id" property
+  private static final String ZOOKEEPER = "localhost:2181";  // Default ZooKeeper to connect to for Kafka
+
+  /**
+   * Read/Write String data to Kafka
+   */
+  public static class KafkaString {
+
+    /**
+     * Example pipeline that consumes String records from the configured
+     * Kafka topic and prints every record to stdout.
+     */
+    public static class ReadStringFromKafka {
+
+      public static void main(String[] args) {
+        Pipeline pipeline = initializePipeline(args);
+        KafkaOptions kafkaOptions = getOptions(pipeline);
+
+        String topic = kafkaOptions.getKafkaTopic();
+        SimpleStringSchema stringSchema = new SimpleStringSchema();
+        Properties kafkaProps = getKafkaProps(kafkaOptions);
+
+        // Flink's Kafka consumer, wrapped below so that it can act as a Beam source.
+        FlinkKafkaConsumer08<String> consumer =
+            new FlinkKafkaConsumer08<>(topic, stringSchema, kafkaProps);
+
+        pipeline
+            .apply(Read.from(UnboundedFlinkSource.of(consumer)))
+            .apply(ParDo.of(new PrintFn<>()));
+
+        pipeline.run();
+      }
+    }
+
+    /**
+     * Example pipeline that publishes a fixed list of words to the
+     * configured Kafka topic.
+     */
+    public static class WriteStringToKafka {
+
+      public static void main(String[] args) {
+        Pipeline pipeline = initializePipeline(args);
+        KafkaOptions kafkaOptions = getOptions(pipeline);
+
+        // Fixed in-memory input that is pushed to Kafka.
+        PCollection<String> words =
+            pipeline.apply(Create.of("These", "are", "some", "words"));
+
+        String topic = kafkaOptions.getKafkaTopic();
+        SimpleStringSchema stringSchema = new SimpleStringSchema();
+        Properties kafkaProps = getKafkaProps(kafkaOptions);
+
+        // Flink's Kafka producer, wrapped below so that it can act as a Beam sink.
+        FlinkKafkaProducer08<String> producer =
+            new FlinkKafkaProducer08<>(topic, stringSchema, kafkaProps);
+
+        words.apply(Write.to(UnboundedFlinkSink.of(producer)));
+
+        pipeline.run();
+      }
+    }
+  }
+
+  /**
+   * Read/Write Avro data to Kafka
+   */
+  public static class KafkaAvro {
+
+    /**
+     * Example pipeline that consumes Avro-encoded {@link MyType} records
+     * from the configured Avro topic and prints every record to stdout.
+     */
+    public static class ReadAvroFromKafka {
+
+      public static void main(String[] args) {
+        Pipeline pipeline = initializePipeline(args);
+        KafkaOptions kafkaOptions = getOptions(pipeline);
+
+        String avroTopic = kafkaOptions.getKafkaAvroTopic();
+        AvroSerializationDeserializationSchema<MyType> avroSchema =
+            new AvroSerializationDeserializationSchema<>(MyType.class);
+        Properties kafkaProps = getKafkaProps(kafkaOptions);
+
+        // Flink's Kafka consumer, decoding every message with the Avro schema above.
+        FlinkKafkaConsumer08<MyType> consumer =
+            new FlinkKafkaConsumer08<>(avroTopic, avroSchema, kafkaProps);
+
+        pipeline
+            .apply(Read.from(UnboundedFlinkSource.of(consumer)))
+            .apply(ParDo.of(new PrintFn<>()));
+
+        pipeline.run();
+      }
+    }
+
+    /**
+     * Example pipeline that publishes a fixed set of {@link MyType} records,
+     * Avro-encoded, to the configured Avro topic.
+     */
+    public static class WriteAvroToKafka {
+
+      public static void main(String[] args) {
+        Pipeline pipeline = initializePipeline(args);
+        KafkaOptions kafkaOptions = getOptions(pipeline);
+
+        // Fixed in-memory input that is pushed to Kafka.
+        PCollection<MyType> records = pipeline.apply(Create.of(
+            new MyType("word", 1L),
+            new MyType("another", 2L),
+            new MyType("yet another", 3L)));
+
+        String avroTopic = kafkaOptions.getKafkaAvroTopic();
+        AvroSerializationDeserializationSchema<MyType> avroSchema =
+            new AvroSerializationDeserializationSchema<>(MyType.class);
+        Properties kafkaProps = getKafkaProps(kafkaOptions);
+
+        // Flink's Kafka producer, encoding every record with the Avro schema above.
+        FlinkKafkaProducer08<MyType> producer =
+            new FlinkKafkaProducer08<>(avroTopic, avroSchema, kafkaProps);
+
+        records.apply(Write.to(UnboundedFlinkSink.of(producer)));
+
+        pipeline.run();
+      }
+    }
+
+    /**
+     * Serialization/Deserialization schema for Avro types. The same class
+     * serves as Flink {@link SerializationSchema} and
+     * {@link DeserializationSchema}; the bytes are written and parsed by an
+     * {@link AvroCoder} created for the given class.
+     *
+     * @param <T> the type that is encoded to / decoded from Avro
+     */
+    static class AvroSerializationDeserializationSchema<T>
+        implements SerializationSchema<T>, DeserializationSchema<T> {
+
+      private final Class<T> avroType;
+      private final AvroCoder<T> coder;
+
+      // Scratch buffer reused across serialize() calls. It is transient, so it
+      // may be null on an instance restored via Java serialization; it is then
+      // re-created lazily in reusableBuffer().
+      private transient ByteArrayOutputStream out;
+
+      AvroSerializationDeserializationSchema(Class<T> clazz) {
+        this.out = new ByteArrayOutputStream();
+        this.coder = AvroCoder.of(clazz);
+        this.avroType = clazz;
+      }
+
+      /**
+       * Encodes a single element with the Avro coder.
+       *
+       * @param element the element to encode
+       * @return a fresh byte array holding the encoded element
+       * @throws RuntimeException if the coder reports an {@link IOException}
+       */
+      @Override
+      public byte[] serialize(T element) {
+        ByteArrayOutputStream buffer = reusableBuffer();
+        buffer.reset();
+        try {
+          coder.encode(element, buffer, Coder.Context.NESTED);
+          return buffer.toByteArray();
+        } catch (IOException e) {
+          throw new RuntimeException("Avro encoding failed.", e);
+        }
+      }
+
+      /**
+       * Decodes a single element from the raw Kafka message bytes.
+       *
+       * @param message the bytes of one message
+       * @return the decoded element
+       * @throws IOException if the coder cannot decode the message
+       */
+      @Override
+      public T deserialize(byte[] message) throws IOException {
+        ByteArrayInputStream in = new ByteArrayInputStream(message);
+        return coder.decode(in, Coder.Context.NESTED);
+      }
+
+      /** Never signals end of stream: always returns {@code false}. */
+      @Override
+      public boolean isEndOfStream(T nextElement) {
+        return false;
+      }
+
+      /** Returns the Flink type information extracted from the Avro class. */
+      @Override
+      public TypeInformation<T> getProducedType() {
+        return TypeExtractor.getForClass(avroType);
+      }
+
+      /** Returns the scratch buffer, creating it first if it is missing. */
+      private ByteArrayOutputStream reusableBuffer() {
+        if (out == null) {
+          out = new ByteArrayOutputStream();
+        }
+        return out;
+      }
+    }
+
+    /**
+     * Custom type for Avro serialization: a word together with a count.
+     */
+    static class MyType implements Serializable {
+
+      String word;
+      long count;
+
+      // No-arg constructor; presumably required so that the Avro coder can
+      // instantiate the type reflectively -- TODO confirm.
+      public MyType() {}
+
+      MyType(String word, long count) {
+        this.word = word;
+        this.count = count;
+      }
+
+      /** Renders the record as {@code MyType{word='<word>', count=<count>}}. */
+      @Override
+      public String toString() {
+        StringBuilder text = new StringBuilder("MyType{");
+        text.append("word='").append(word).append('\'');
+        text.append(", count=").append(count);
+        text.append('}');
+        return text.toString();
+      }
+    }
+  }
+
+  // -------------- Utilities --------------
+
+  /**
+   * Custom options for the Pipeline. Every option has a default taken from
+   * the constants of the enclosing class, so the examples run against a
+   * local Kafka/ZooKeeper setup without any command line arguments.
+   *
+   * <p>The topic options are shared by the read and the write examples, so
+   * their descriptions cover both directions.
+   */
+  public interface KafkaOptions extends FlinkPipelineOptions {
+    @Description("The Kafka topic to read String data from or write it to")
+    @Default.String(KAFKA_TOPIC)
+    String getKafkaTopic();
+
+    void setKafkaTopic(String value);
+
+    @Description("The Kafka topic to read Avro data from or write it to")
+    @Default.String(KAFKA_AVRO_TOPIC)
+    String getKafkaAvroTopic();
+
+    void setKafkaAvroTopic(String value);
+
+    @Description("The Kafka broker to connect to")
+    @Default.String(KAFKA_BROKER)
+    String getBroker();
+
+    void setBroker(String value);
+
+    @Description("The Zookeeper server to connect to")
+    @Default.String(ZOOKEEPER)
+    String getZookeeper();
+
+    void setZookeeper(String value);
+
+    @Description("The Kafka group id (passed as the \"group.id\" property)")
+    @Default.String(GROUP_ID)
+    String getGroup();
+
+    void setGroup(String value);
+  }
+
+  /**
+   * Creates the pipeline shared by all examples: options are parsed from the
+   * command line, then forced to streaming mode on the Flink runner with
+   * fixed checkpointing and retry settings.
+   *
+   * @param args The command line args
+   * @return the pipeline
+   */
+  private static Pipeline initializePipeline(String[] args) {
+    KafkaOptions kafkaOptions = PipelineOptionsFactory
+        .fromArgs(args)
+        .as(KafkaOptions.class);
+
+    // Execution mode and runner.
+    kafkaOptions.setStreaming(true);
+    kafkaOptions.setRunner(FlinkPipelineRunner.class);
+
+    // Checkpointing and retry behaviour.
+    kafkaOptions.setCheckpointingInterval(1000L);
+    kafkaOptions.setNumberOfExecutionRetries(5);
+    kafkaOptions.setExecutionRetryDelay(3000L);
+
+    Pipeline pipeline = Pipeline.create(kafkaOptions);
+    return pipeline;
+  }
+
+  /**
+   * Returns the options of the given pipeline, viewed as {@link KafkaOptions}.
+   *
+   * @param p the pipeline
+   * @return KafkaOptions
+   */
+  private static KafkaOptions getOptions(Pipeline p) {
+    KafkaOptions kafkaView = p.getOptions().as(KafkaOptions.class);
+    return kafkaView;
+  }
+
+  /**
+   * Builds the Kafka client properties (ZooKeeper address, broker address
+   * and group id) from the pipeline options.
+   *
+   * @param options KafkaOptions
+   * @return Kafka props
+   */
+  private static Properties getKafkaProps(KafkaOptions options) {
+    final Properties kafkaProps = new Properties();
+
+    final String zookeeper = options.getZookeeper();
+    kafkaProps.setProperty("zookeeper.connect", zookeeper);
+
+    final String broker = options.getBroker();
+    kafkaProps.setProperty("bootstrap.servers", broker);
+
+    final String group = options.getGroup();
+    kafkaProps.setProperty("group.id", group);
+
+    return kafkaProps;
+  }
+
+  /**
+   * Prints the string form of every input element to stdout. Elements are
+   * only printed; this function does not emit any output.
+   *
+   * @param <T> type of the input
+   */
+  private static class PrintFn<T> extends DoFn<T, T> {
+
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      T element = c.element();
+      String text = element.toString();
+      System.out.println(text);
+    }
+  }
+
+}

Reply via email to