Repository: spark
Updated Branches:
  refs/heads/master e838a25bd -> 000df2f0d


[SPARK-7895] [STREAMING] [EXAMPLES] Move Kafka examples from scala-2.10/src to src

Since `spark-streaming-kafka` is now published for both Scala 2.10 and 2.11, we
can move `KafkaWordCount` and `DirectKafkaWordCount` from
`examples/scala-2.10/src/` to `examples/src/` so that they also appear in
`spark-examples-***-jar` for Scala 2.11.
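
For a downstream build, the effect is that the Kafka examples now compile
against whichever spark-streaming-kafka artifact matches the build's Scala
binary version. A minimal sbt sketch of such a dependency (illustrative only,
not part of this commit; the version numbers are assumptions):

    scalaVersion := "2.11.6"

    // `%%` appends the Scala binary version to the artifact name, so this
    // resolves to spark-streaming-kafka_2.11 here (or _2.10 under Scala 2.10.x).
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0"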

Author: zsxwing <zsxw...@gmail.com>

Closes #6436 from zsxwing/SPARK-7895 and squashes the following commits:

c6052f1 [zsxwing] Update examples/pom.xml
0bcfa87 [zsxwing] Fix the sleep time
b9d1256 [zsxwing] Move Kafka examples from scala-2.10/src to src


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/000df2f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/000df2f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/000df2f0

Branch: refs/heads/master
Commit: 000df2f0d6af068bb188e81bbb207f0c2f43bf16
Parents: e838a25
Author: zsxwing <zsxw...@gmail.com>
Authored: Thu May 28 09:04:12 2015 -0700
Committer: Patrick Wendell <patr...@databricks.com>
Committed: Thu May 28 09:04:12 2015 -0700

----------------------------------------------------------------------
 examples/pom.xml                                |  44 +-------
 .../streaming/JavaDirectKafkaWordCount.java     | 113 -------------------
 .../examples/streaming/JavaKafkaWordCount.java  | 113 -------------------
 .../streaming/DirectKafkaWordCount.scala        |  72 ------------
 .../examples/streaming/KafkaWordCount.scala     | 103 -----------------
 .../streaming/JavaDirectKafkaWordCount.java     | 113 +++++++++++++++++++
 .../examples/streaming/JavaKafkaWordCount.java  | 113 +++++++++++++++++++
 .../streaming/DirectKafkaWordCount.scala        |  72 ++++++++++++
 .../examples/streaming/KafkaWordCount.scala     | 103 +++++++++++++++++
 9 files changed, 406 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/000df2f0/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 5b04b4f..e4efee7 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -98,6 +98,11 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-testing-util</artifactId>
       <version>${hbase.version}</version>
@@ -392,45 +397,6 @@
         </dependency>
       </dependencies>
     </profile>
-    <profile>
-      <!-- We add a source directory specific to Scala 2.10 since Kafka
-           only works with it -->
-      <id>scala-2.10</id>
-      <activation>
-        <property><name>!scala-2.11</name></property>
-      </activation>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.spark</groupId>
-          <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
-          <version>${project.version}</version>
-        </dependency>
-      </dependencies>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>build-helper-maven-plugin</artifactId>
-            <executions>
-              <execution>
-                <id>add-scala-sources</id>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>add-source</goal>
-                </goals>
-                <configuration>
-                  <sources>
-                    <source>src/main/scala</source>
-                    <source>scala-2.10/src/main/scala</source>
-                    <source>scala-2.10/src/main/java</source>
-                  </sources>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
 
     <!-- Profiles that disable inclusion of certain dependencies. -->
     <profile>

http://git-wip-us.apache.org/repos/asf/spark/blob/000df2f0/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
deleted file mode 100644
index bab9f24..0000000
--- a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Arrays;
-import java.util.regex.Pattern;
-
-import scala.Tuple2;
-
-import com.google.common.collect.Lists;
-import kafka.serializer.StringDecoder;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.*;
-import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-import org.apache.spark.streaming.Durations;
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- * Usage: JavaDirectKafkaWordCount <brokers> <topics>
- *   <brokers> is a list of one or more Kafka brokers
- *   <topics> is a list of one or more kafka topics to consume from
- *
- * Example:
- *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
- */
-
-public final class JavaDirectKafkaWordCount {
-  private static final Pattern SPACE = Pattern.compile(" ");
-
-  public static void main(String[] args) {
-    if (args.length < 2) {
-      System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" +
-          "  <brokers> is a list of one or more Kafka brokers\n" +
-          "  <topics> is a list of one or more kafka topics to consume 
from\n\n");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    String brokers = args[0];
-    String topics = args[1];
-
-    // Create context with 2 second batch interval
-    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
-    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
-
-    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
-    HashMap<String, String> kafkaParams = new HashMap<String, String>();
-    kafkaParams.put("metadata.broker.list", brokers);
-
-    // Create direct kafka stream with brokers and topics
-    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
-        jssc,
-        String.class,
-        String.class,
-        StringDecoder.class,
-        StringDecoder.class,
-        kafkaParams,
-        topicsSet
-    );
-
-    // Get the lines, split them into words, count the words and print
-    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
-      @Override
-      public String call(Tuple2<String, String> tuple2) {
-        return tuple2._2();
-      }
-    });
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<String, Integer>(s, 1);
-        }
-      }).reduceByKey(
-        new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
-    wordCounts.print();
-
-    // Start the computation
-    jssc.start();
-    jssc.awaitTermination();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/000df2f0/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
deleted file mode 100644
index 16ae9a3..0000000
--- a/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.regex.Pattern;
-
-
-import scala.Tuple2;
-
-import com.google.common.collect.Lists;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.examples.streaming.StreamingExamples;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- *
- * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
- *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
- *   <group> is the name of kafka consumer group
- *   <topics> is a list of one or more kafka topics to consume from
- *   <numThreads> is the number of threads the kafka consumer should use
- *
- * To run this example:
- *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
- *    zoo03 my-consumer-group topic1,topic2 1`
- */
-
-public final class JavaKafkaWordCount {
-  private static final Pattern SPACE = Pattern.compile(" ");
-
-  private JavaKafkaWordCount() {
-  }
-
-  public static void main(String[] args) {
-    if (args.length < 4) {
-      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> 
<topics> <numThreads>");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
-    // Create the context with a 2 second batch interval
-    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
-
-    int numThreads = Integer.parseInt(args[3]);
-    Map<String, Integer> topicMap = new HashMap<String, Integer>();
-    String[] topics = args[2].split(",");
-    for (String topic: topics) {
-      topicMap.put(topic, numThreads);
-    }
-
-    JavaPairReceiverInputDStream<String, String> messages =
-            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
-
-    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
-      @Override
-      public String call(Tuple2<String, String> tuple2) {
-        return tuple2._2();
-      }
-    });
-
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
-      }
-    });
-
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override
-        public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<String, Integer>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
-
-    wordCounts.print();
-    jssc.start();
-    jssc.awaitTermination();
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/000df2f0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
deleted file mode 100644
index 11a8cf0..0000000
--- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import kafka.serializer.StringDecoder
-
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.kafka._
-import org.apache.spark.SparkConf
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- * Usage: DirectKafkaWordCount <brokers> <topics>
- *   <brokers> is a list of one or more Kafka brokers
- *   <topics> is a list of one or more kafka topics to consume from
- *
- * Example:
- *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
- *    topic1,topic2
- */
-object DirectKafkaWordCount {
-  def main(args: Array[String]) {
-    if (args.length < 2) {
-      System.err.println(s"""
-        |Usage: DirectKafkaWordCount <brokers> <topics>
-        |  <brokers> is a list of one or more Kafka brokers
-        |  <topics> is a list of one or more kafka topics to consume from
-        |
-        """.stripMargin)
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    val Array(brokers, topics) = args
-
-    // Create context with 2 second batch interval
-    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
-    val ssc = new StreamingContext(sparkConf, Seconds(2))
-
-    // Create direct kafka stream with brokers and topics
-    val topicsSet = topics.split(",").toSet
-    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
-    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
-      ssc, kafkaParams, topicsSet)
-
-    // Get the lines, split them into words, count the words and print
-    val lines = messages.map(_._2)
-    val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
-    wordCounts.print()
-
-    // Start the computation
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/000df2f0/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
deleted file mode 100644
index f407367..0000000
--- a/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.examples.streaming
-
-import java.util.HashMap
-
-import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
-
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.kafka._
-import org.apache.spark.SparkConf
-
-/**
- * Consumes messages from one or more topics in Kafka and does wordcount.
- * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
- *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
- *   <group> is the name of kafka consumer group
- *   <topics> is a list of one or more kafka topics to consume from
- *   <numThreads> is the number of threads the kafka consumer should use
- *
- * Example:
- *    `$ bin/run-example \
- *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
- *      my-consumer-group topic1,topic2 1`
- */
-object KafkaWordCount {
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> 
<numThreads>")
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    val Array(zkQuorum, group, topics, numThreads) = args
-    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
-    val ssc = new StreamingContext(sparkConf, Seconds(2))
-    ssc.checkpoint("checkpoint")
-
-    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
-    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
-    val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1L))
-      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
-    wordCounts.print()
-
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-
-// Produces messages of random digit "words" (0-9) at the configured rate.
-object KafkaWordCountProducer {
-
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> 
<topic> " +
-        "<messagesPerSec> <wordsPerMessage>")
-      System.exit(1)
-    }
-
-    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
-
-    // Kafka producer (broker) connection properties
-    val props = new HashMap[String, Object]()
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-      "org.apache.kafka.common.serialization.StringSerializer")
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-      "org.apache.kafka.common.serialization.StringSerializer")
-
-    val producer = new KafkaProducer[String, String](props)
-
-    // Send some messages
-    while (true) {
-      (1 to messagesPerSec.toInt).foreach { messageNum =>
-        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
-          .mkString(" ")
-
-        val message = new ProducerRecord[String, String](topic, null, str)
-        producer.send(message)
-      }
-
-      Thread.sleep(100)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/000df2f0/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
new file mode 100644
index 0000000..bab9f24
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import scala.Tuple2;
+
+import com.google.common.collect.Lists;
+import kafka.serializer.StringDecoder;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+import org.apache.spark.streaming.Durations;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: JavaDirectKafkaWordCount <brokers> <topics>
+ *   <brokers> is a list of one or more Kafka brokers
+ *   <topics> is a list of one or more kafka topics to consume from
+ *
+ * Example:
+ *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
+ */
+
+public final class JavaDirectKafkaWordCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  public static void main(String[] args) {
+    if (args.length < 2) {
+      System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" +
+          "  <brokers> is a list of one or more Kafka brokers\n" +
+          "  <topics> is a list of one or more kafka topics to consume 
from\n\n");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    String brokers = args[0];
+    String topics = args[1];
+
+    // Create context with 2 second batch interval
+    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
+    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
+
+    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
+    HashMap<String, String> kafkaParams = new HashMap<String, String>();
+    kafkaParams.put("metadata.broker.list", brokers);
+
+    // Create direct kafka stream with brokers and topics
+    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
+        jssc,
+        String.class,
+        String.class,
+        StringDecoder.class,
+        StringDecoder.class,
+        kafkaParams,
+        topicsSet
+    );
+
+    // Get the lines, split them into words, count the words and print
+    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+      @Override
+      public String call(Tuple2<String, String> tuple2) {
+        return tuple2._2();
+      }
+    });
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override
+        public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(
+        new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+    wordCounts.print();
+
+    // Start the computation
+    jssc.start();
+    jssc.awaitTermination();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/000df2f0/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
new file mode 100644
index 0000000..16ae9a3
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.regex.Pattern;
+
+
+import scala.Tuple2;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ *
+ * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
+ *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ *   <group> is the name of kafka consumer group
+ *   <topics> is a list of one or more kafka topics to consume from
+ *   <numThreads> is the number of threads the kafka consumer should use
+ *
+ * To run this example:
+ *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
+ *    zoo03 my-consumer-group topic1,topic2 1`
+ */
+
+public final class JavaKafkaWordCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  private JavaKafkaWordCount() {
+  }
+
+  public static void main(String[] args) {
+    if (args.length < 4) {
+      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> 
<topics> <numThreads>");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
+    // Create the context with a 2 second batch interval
+    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
+
+    int numThreads = Integer.parseInt(args[3]);
+    Map<String, Integer> topicMap = new HashMap<String, Integer>();
+    String[] topics = args[2].split(",");
+    for (String topic: topics) {
+      topicMap.put(topic, numThreads);
+    }
+
+    JavaPairReceiverInputDStream<String, String> messages =
+            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
+
+    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+      @Override
+      public String call(Tuple2<String, String> tuple2) {
+        return tuple2._2();
+      }
+    });
+
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override
+        public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+
+    wordCounts.print();
+    jssc.start();
+    jssc.awaitTermination();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/000df2f0/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
new file mode 100644
index 0000000..11a8cf0
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import kafka.serializer.StringDecoder
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: DirectKafkaWordCount <brokers> <topics>
+ *   <brokers> is a list of one or more Kafka brokers
+ *   <topics> is a list of one or more kafka topics to consume from
+ *
+ * Example:
+ *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
+ *    topic1,topic2
+ */
+object DirectKafkaWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println(s"""
+        |Usage: DirectKafkaWordCount <brokers> <topics>
+        |  <brokers> is a list of one or more Kafka brokers
+        |  <topics> is a list of one or more kafka topics to consume from
+        |
+        """.stripMargin)
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Array(brokers, topics) = args
+
+    // Create context with 2 second batch interval
+    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
+    val ssc = new StreamingContext(sparkConf, Seconds(2))
+
+    // Create direct kafka stream with brokers and topics
+    val topicsSet = topics.split(",").toSet
+    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
+    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, topicsSet)
+
+    // Get the lines, split them into words, count the words and print
+    val lines = messages.map(_._2)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
+    wordCounts.print()
+
+    // Start the computation
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
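
Aside: because the direct stream used above maps each RDD partition 1:1 to a
Kafka offset range, callers can read back exactly which offsets a batch
covered. A sketch (not part of this commit) using the HasOffsetRanges trait
from spark-streaming-kafka, applied to the `messages` stream created above:

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    messages.foreachRDD { rdd =>
      // The cast exposes the offset metadata carried by the direct stream's RDDs.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"${r.topic} ${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
      }
    }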

http://git-wip-us.apache.org/repos/asf/spark/blob/000df2f0/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
new file mode 100644
index 0000000..9ae1b04
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.util.HashMap
+
+import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
+
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.kafka._
+import org.apache.spark.SparkConf
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
+ *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ *   <group> is the name of kafka consumer group
+ *   <topics> is a list of one or more kafka topics to consume from
+ *   <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
+ *      my-consumer-group topic1,topic2 1`
+ */
+object KafkaWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> 
<numThreads>")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    val Array(zkQuorum, group, topics, numThreads) = args
+    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
+    val ssc = new StreamingContext(sparkConf, Seconds(2))
+    ssc.checkpoint("checkpoint")
+
+    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
+    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1L))
+      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
+    wordCounts.print()
+
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+// Produces messages of random digit "words" (0-9) at the configured rate.
+object KafkaWordCountProducer {
+
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> 
<topic> " +
+        "<messagesPerSec> <wordsPerMessage>")
+      System.exit(1)
+    }
+
+    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
+
+    // Kafka producer (broker) connection properties
+    val props = new HashMap[String, Object]()
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+      "org.apache.kafka.common.serialization.StringSerializer")
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+      "org.apache.kafka.common.serialization.StringSerializer")
+
+    val producer = new KafkaProducer[String, String](props)
+
+    // Send some messages
+    while (true) {
+      (1 to messagesPerSec.toInt).foreach { messageNum =>
+        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
+          .mkString(" ")
+
+        val message = new ProducerRecord[String, String](topic, null, str)
+        producer.send(message)
+      }
+
+      Thread.sleep(1000)
+    }
+  }
+
+}
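
A note on the consumer side: `reduceByKeyAndWindow(_ + _, _ - _, Minutes(10),
Seconds(2), 2)` above maintains each word's 10-minute count incrementally,
which is why the example calls `ssc.checkpoint(...)`. Rather than re-summing
the whole window every batch, it adds the batch entering the window and
subtracts the batch leaving it. A plain-Scala sketch of the per-key update,
with hypothetical counts:

    val previousWindowCount = 42L  // running count for one word
    val enteringBatchCount = 5L    // occurrences in the newest batch
    val leavingBatchCount = 3L     // occurrences in the batch that aged out
    // reduce (_ + _) covers the entering batch; inverse (_ - _) the leaving one.
    val newWindowCount = previousWindowCount + enteringBatchCount - leavingBatchCount
    assert(newWindowCount == 44L)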

