Repository: spark Updated Branches: refs/heads/master ebd363aec -> 47af7c1eb
[SPARK-8389] [STREAMING] [KAFKA] Example of getting offset ranges out of the existing java direct stream api Author: cody koeninger <c...@koeninger.org> Closes #6846 from koeninger/SPARK-8389 and squashes the following commits: 3f3c57a [cody koeninger] [Streaming][Kafka][SPARK-8389] Example of getting offset ranges out of the existing java direct stream api Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47af7c1e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47af7c1e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47af7c1e Branch: refs/heads/master Commit: 47af7c1ebfdbd7637f626ab07bf2bda6534f37ea Parents: ebd363a Author: cody koeninger <c...@koeninger.org> Authored: Fri Jun 19 14:51:19 2015 +0200 Committer: Sean Owen <so...@cloudera.com> Committed: Fri Jun 19 14:51:19 2015 +0200 ---------------------------------------------------------------------- .../streaming/kafka/JavaDirectKafkaStreamSuite.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/47af7c1e/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java index c0669fb..3913b71 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java @@ -32,6 +32,7 @@ import org.junit.Test; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaPairRDD; import 
org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; @@ -65,8 +66,8 @@ public class JavaDirectKafkaStreamSuite implements Serializable { @Test public void testKafkaStream() throws InterruptedException { - String topic1 = "topic1"; - String topic2 = "topic2"; + final String topic1 = "topic1"; + final String topic2 = "topic2"; String[] topic1data = createTopicAndSendData(topic1); String[] topic2data = createTopicAndSendData(topic2); @@ -87,6 +88,16 @@ public class JavaDirectKafkaStreamSuite implements Serializable { StringDecoder.class, kafkaParams, topicToSet(topic1) + ).transformToPair( + // Make sure you can get offset ranges from the rdd + new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() { + @Override + public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception { + OffsetRange[] offsets = ((HasOffsetRanges)rdd.rdd()).offsetRanges(); + Assert.assertEquals(offsets[0].topic(), topic1); + return rdd; + } + } ).map( new Function<Tuple2<String, String>, String>() { @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org