[1/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API

2016-06-29 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a54852350 -> 3134f116a


http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
--
diff --git 
a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
new file mode 100644
index 000..aba45f5
--- /dev/null
+++ 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.collection.JavaConverters;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaConsumerStrategySuite implements Serializable {
+
+  @Test
+  public void testConsumerStrategyConstructors() {
+final String topic1 = "topic1";
+final Collection topics = Arrays.asList(topic1);
+final scala.collection.Iterable sTopics =
+  JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
+final TopicPartition tp1 = new TopicPartition(topic1, 0);
+final TopicPartition tp2 = new TopicPartition(topic1, 1);
+final Collection parts = Arrays.asList(tp1, tp2);
+final scala.collection.Iterable sParts =
+  JavaConverters.collectionAsScalaIterableConverter(parts).asScala();
+final Map kafkaParams = new HashMap();
+kafkaParams.put("bootstrap.servers", "not used");
+final scala.collection.Map sKafkaParams =
+  JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
+final Map offsets = new HashMap<>();
+offsets.put(tp1, 23L);
+final scala.collection.Map sOffsets =
+  JavaConverters.mapAsScalaMapConverter(offsets).asScala();
+
+// make sure constructors can be called from java
+final ConsumerStrategy sub0 =
+  Subscribe.apply(topics, kafkaParams, offsets);
+final ConsumerStrategy sub1 =
+  Subscribe.apply(sTopics, sKafkaParams, sOffsets);
+final ConsumerStrategy sub2 =
+  Subscribe.apply(sTopics, sKafkaParams);
+final ConsumerStrategy sub3 =
+  Subscribe.create(topics, kafkaParams, offsets);
+final ConsumerStrategy sub4 =
+  Subscribe.create(topics, kafkaParams);
+
+Assert.assertEquals(
+  sub1.executorKafkaParams().get("bootstrap.servers"),
+  sub3.executorKafkaParams().get("bootstrap.servers"));
+
+final ConsumerStrategy asn0 =
+  Assign.apply(parts, kafkaParams, offsets);
+final ConsumerStrategy asn1 =
+  Assign.apply(sParts, sKafkaParams, sOffsets);
+final ConsumerStrategy asn2 =
+  Assign.apply(sParts, sKafkaParams);
+final ConsumerStrategy asn3 =
+  Assign.create(parts, kafkaParams, offsets);
+final ConsumerStrategy asn4 =
+  Assign.create(parts, kafkaParams);
+
+Assert.assertEquals(
+  asn1.executorKafkaParams().get("bootstrap.servers"),
+  asn3.executorKafkaParams().get("bootstrap.servers"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
--
diff --git 
a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
new file mode 100644
index 000..e57ede7
--- /dev/null
+++ 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you 

[1/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API

2016-06-29 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master bde1d6a61 -> dedbceec1


http://git-wip-us.apache.org/repos/asf/spark/blob/dedbceec/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
--
diff --git 
a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
new file mode 100644
index 000..aba45f5
--- /dev/null
+++ 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.collection.JavaConverters;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaConsumerStrategySuite implements Serializable {
+
+  @Test
+  public void testConsumerStrategyConstructors() {
+final String topic1 = "topic1";
+final Collection topics = Arrays.asList(topic1);
+final scala.collection.Iterable sTopics =
+  JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
+final TopicPartition tp1 = new TopicPartition(topic1, 0);
+final TopicPartition tp2 = new TopicPartition(topic1, 1);
+final Collection parts = Arrays.asList(tp1, tp2);
+final scala.collection.Iterable sParts =
+  JavaConverters.collectionAsScalaIterableConverter(parts).asScala();
+final Map kafkaParams = new HashMap();
+kafkaParams.put("bootstrap.servers", "not used");
+final scala.collection.Map sKafkaParams =
+  JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
+final Map offsets = new HashMap<>();
+offsets.put(tp1, 23L);
+final scala.collection.Map sOffsets =
+  JavaConverters.mapAsScalaMapConverter(offsets).asScala();
+
+// make sure constructors can be called from java
+final ConsumerStrategy sub0 =
+  Subscribe.apply(topics, kafkaParams, offsets);
+final ConsumerStrategy sub1 =
+  Subscribe.apply(sTopics, sKafkaParams, sOffsets);
+final ConsumerStrategy sub2 =
+  Subscribe.apply(sTopics, sKafkaParams);
+final ConsumerStrategy sub3 =
+  Subscribe.create(topics, kafkaParams, offsets);
+final ConsumerStrategy sub4 =
+  Subscribe.create(topics, kafkaParams);
+
+Assert.assertEquals(
+  sub1.executorKafkaParams().get("bootstrap.servers"),
+  sub3.executorKafkaParams().get("bootstrap.servers"));
+
+final ConsumerStrategy asn0 =
+  Assign.apply(parts, kafkaParams, offsets);
+final ConsumerStrategy asn1 =
+  Assign.apply(sParts, sKafkaParams, sOffsets);
+final ConsumerStrategy asn2 =
+  Assign.apply(sParts, sKafkaParams);
+final ConsumerStrategy asn3 =
+  Assign.create(parts, kafkaParams, offsets);
+final ConsumerStrategy asn4 =
+  Assign.create(parts, kafkaParams);
+
+Assert.assertEquals(
+  asn1.executorKafkaParams().get("bootstrap.servers"),
+  asn3.executorKafkaParams().get("bootstrap.servers"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/dedbceec/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
--
diff --git 
a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
new file mode 100644
index 000..e57ede7
--- /dev/null
+++ 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may