http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala new file mode 100644 index 0000000..798912c --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/DefaultGeodeConnectionManagerTest.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal + +import io.pivotal.geode.spark.connector.GeodeConnectionConf +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FunSuite, Matchers} + +class DefaultGeodeConnectionManagerTest extends FunSuite with Matchers with MockitoSugar { + + test("DefaultGeodeConnectionFactory get/closeConnection") { + // note: connConf 1-4 share the same set of locators + val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234))) + val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678))) + val connConf3 = new GeodeConnectionConf(Seq(("host1", 1234), ("host2", 5678))) + val connConf4 = new GeodeConnectionConf(Seq(("host2", 5678), ("host1", 1234))) + val connConf5 = new GeodeConnectionConf(Seq(("host5", 3333))) + + val props: Map[String, String] = Map.empty + val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory] + val mockConn1 = mock[DefaultGeodeConnection] + val mockConn2 = mock[DefaultGeodeConnection] + when(mockConnFactory.newConnection(connConf3.locators, props)).thenReturn(mockConn1) + when(mockConnFactory.newConnection(connConf5.locators, props)).thenReturn(mockConn2) + + assert(DefaultGeodeConnectionManager.getConnection(connConf3)(mockConnFactory) == mockConn1) + // note: following 3 lines do not trigger connFactory.newConnection(...) + assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1) + assert(DefaultGeodeConnectionManager.getConnection(connConf2)(mockConnFactory) == mockConn1) + assert(DefaultGeodeConnectionManager.getConnection(connConf4)(mockConnFactory) == mockConn1) + assert(DefaultGeodeConnectionManager.getConnection(connConf5)(mockConnFactory) == mockConn2) + + // connFactory.newConnection(...) 
was invoked only twice + verify(mockConnFactory, times(1)).newConnection(connConf3.locators, props) + verify(mockConnFactory, times(1)).newConnection(connConf5.locators, props) + assert(DefaultGeodeConnectionManager.connections.size == 3) + + DefaultGeodeConnectionManager.closeConnection(connConf1) + assert(DefaultGeodeConnectionManager.connections.size == 1) + DefaultGeodeConnectionManager.closeConnection(connConf5) + assert(DefaultGeodeConnectionManager.connections.isEmpty) + } + + test("DefaultGeodeConnectionFactory newConnection(...) throws RuntimeException") { + val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234))) + val props: Map[String, String] = Map.empty + val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory] + when(mockConnFactory.newConnection(connConf1.locators, props)).thenThrow(new RuntimeException()) + intercept[RuntimeException] { DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) } + verify(mockConnFactory, times(1)).newConnection(connConf1.locators, props) + } + + test("DefaultGeodeConnectionFactory close() w/ non-existent connection") { + val props: Map[String, String] = Map.empty + val mockConnFactory: DefaultGeodeConnectionFactory = mock[DefaultGeodeConnectionFactory] + val connConf1 = new GeodeConnectionConf(Seq(("host1", 1234))) + val connConf2 = new GeodeConnectionConf(Seq(("host2", 5678))) + val mockConn1 = mock[DefaultGeodeConnection] + when(mockConnFactory.newConnection(connConf1.locators, props)).thenReturn(mockConn1) + assert(DefaultGeodeConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1) + assert(DefaultGeodeConnectionManager.connections.size == 1) + // connection does not exist in the connection manager + DefaultGeodeConnectionManager.closeConnection(connConf2) + assert(DefaultGeodeConnectionManager.connections.size == 1) + } + +}
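The test above depends on one behavior worth calling out: DefaultGeodeConnectionManager hands back a single cached connection for every GeodeConnectionConf that names the same set of locators, regardless of locator order. The snippet below is a minimal, hypothetical sketch of that per-locator-set caching pattern; the name ConnCache and the use of a String in place of a real connection are illustrative only, and it does not reproduce the subset-sharing (connConf1/connConf2 reusing connConf3's connection) that the real manager shows above.

// Hypothetical sketch: cache one connection per normalized locator set.
// Locators are sorted so Seq(("host1",1234),("host2",5678)) and its permutation
// resolve to the same cache key; a String stands in for a real connection.
object ConnCache {
  private val cache = scala.collection.mutable.Map.empty[Seq[(String, Int)], String]

  // `create` stands in for the connection factory used in the tests above
  def getConnection(locators: Seq[(String, Int)])(create: Seq[(String, Int)] => String): String = {
    val key = locators.sorted
    cache.synchronized(cache.getOrElseUpdate(key, create(key)))
  }

  def closeConnection(locators: Seq[(String, Int)]): Unit =
    cache.synchronized(cache.remove(locators.sorted))
}

// usage mirroring the test: two permutations of the same locators share one connection
val c1 = ConnCache.getConnection(Seq(("host1", 1234), ("host2", 5678)))(l => s"conn-$l")
val c2 = ConnCache.getConnection(Seq(("host2", 5678), ("host1", 1234)))(l => s"conn-$l")
assert(c1 eq c2)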
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala new file mode 100644 index 0000000..f2303e7 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.geodefunctions + +import com.gemstone.gemfire.DataSerializer +import com.gemstone.gemfire.cache.execute.{ResultCollector, ResultSender} +import com.gemstone.gemfire.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl} +import com.gemstone.gemfire.cache.query.types.ObjectType +import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput, HeapDataOutputStream} +import com.gemstone.gemfire.internal.cache.{CachedDeserializable, CachedDeserializableFactory} +import org.scalatest.{BeforeAndAfter, FunSuite} +import scala.collection.JavaConversions._ +import scala.concurrent.{Await, ExecutionContext, Future} +import ExecutionContext.Implicits.global +import scala.concurrent.duration._ + +class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter { + + /** + * A test ResultSender that connects struct ResultSender and ResultCollector + * Note: this test ResultSender has to copy the data (byte array) since the + * StructStreamingResultSender will reuse the byte array. 
+ */ + class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] { + + var finishedNum = 0 + + override def sendResult(result: Object): Unit = + collector.addResult(null, result.asInstanceOf[Array[Byte]].clone()) + + /** exception should be sent via lastResult() */ + override def sendException(throwable: Throwable): Unit = + throw new UnsupportedOperationException("sendException is not supported.") + + override def lastResult(result: Object): Unit = { + collector.addResult(null, result.asInstanceOf[Array[Byte]].clone()) + this.synchronized { + finishedNum += 1 + if (finishedNum == num) + collector.endResults() + } + } + } + + /** common variables */ + var collector: StructStreamingResultCollector = _ + var baseSender: LocalResultSender = _ + /** common types */ + val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType] + val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, objType)) + val OneColType = new StructTypeImpl(Array("value"), Array(objType)) + + before { + collector = new StructStreamingResultCollector + baseSender = new LocalResultSender(collector, 1) + } + + test("transfer simple data") { + verifySimpleTransfer(sendDataType = true) + } + + test("transfer simple data with no type info") { + verifySimpleTransfer(sendDataType = false) + } + + def verifySimpleTransfer(sendDataType: Boolean): Unit = { + val iter = (0 to 9).map(i => Array(i.asInstanceOf[Object], (i.toString * 5).asInstanceOf[Object])).toIterator + val dataType = if (sendDataType) TwoColType else null + new StructStreamingResultSender(baseSender, dataType , iter).send() + // println("type: " + collector.getResultType.toString) + assert(TwoColType.equals(collector.getResultType)) + val iter2 = collector.getResult + (0 to 9).foreach { i => + assert(iter2.hasNext) + val o = iter2.next() + assert(o.size == 2) + assert(o(0).asInstanceOf[Int] == i) + assert(o(1).asInstanceOf[String] == i.toString * 5) + } + assert(! iter2.hasNext) + } + + + /** + * A test iterator that generate integer data + * @param start the 1st value + * @param n number of integers generated + * @param genExcp generate Exception if true. This is used to test exception handling. + */ + def intIterator(start: Int, n: Int, genExcp: Boolean): Iterator[Array[Object]] = { + new Iterator[Array[Object]] { + val max = if (genExcp) start + n else start + n - 1 + var index: Int = start - 1 + + override def hasNext: Boolean = if (index < max) true else false + + override def next(): Array[Object] = + if (index < (start + n - 1)) { + index += 1 + Array(index.asInstanceOf[Object]) + } else throw new RuntimeException("simulated error") + } + } + + test("transfer data with 0 row") { + new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 0, genExcp = false)).send() + // println("type: " + collector.getResultType.toString) + assert(collector.getResultType == null) + val iter = collector.getResult + assert(! iter.hasNext) + } + + test("transfer data with 10K rows") { + new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 10000, genExcp = false)).send() + // println("type: " + collector.getResultType.toString) + assert(OneColType.equals(collector.getResultType)) + val iter = collector.getResult + // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) + (1 to 10000).foreach { i => + assert(iter.hasNext) + val o = iter.next() + assert(o.size == 1) + assert(o(0).asInstanceOf[Int] == i) + } + assert(! 
iter.hasNext) + } + + test("transfer data with 10K rows with 2 sender") { + baseSender = new LocalResultSender(collector, 2) + val total = 300 + val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()} + val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = false), "sender2").send()} + Await.result(sender1, 1.seconds) + Await.result(sender2, 1.seconds) + + // println("type: " + collector.getResultType.toString) + assert(OneColType.equals(collector.getResultType)) + val iter = collector.getResult + // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) + val set = scala.collection.mutable.Set[Int]() + (1 to total).foreach { i => + assert(iter.hasNext) + val o = iter.next() + assert(o.size == 1) + assert(! set.contains(o(0).asInstanceOf[Int])) + set.add(o(0).asInstanceOf[Int]) + } + assert(! iter.hasNext) + } + + test("transfer data with 10K rows with 2 sender with error") { + baseSender = new LocalResultSender(collector, 2) + val total = 1000 + val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()} + val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = true), "sender2").send()} + Await.result(sender1, 1 seconds) + Await.result(sender2, 1 seconds) + + // println("type: " + collector.getResultType.toString) + assert(OneColType.equals(collector.getResultType)) + val iter = collector.getResult + // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) + val set = scala.collection.mutable.Set[Int]() + intercept[RuntimeException] { + (1 to total).foreach { i => + assert(iter.hasNext) + val o = iter.next() + assert(o.size == 1) + assert(! set.contains(o(0).asInstanceOf[Int])) + set.add(o(0).asInstanceOf[Int]) + } + } + // println(s"rows received: ${set.size}") + } + + test("transfer data with Exception") { + new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 200, genExcp = true)).send() + // println("type: " + collector.getResultType.toString) + val iter = collector.getResult + intercept[RuntimeException] ( iter.foreach(_.mkString(",")) ) + } + + def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] = + intIterator(1, n, genExcp).map(x => Array(s"key-${x(0)}", s"value-${x(0)}")) + + test("transfer string pair data with 200 rows") { + new StructStreamingResultSender(baseSender, TwoColType, stringPairIterator(1000, genExcp = false)).send() + // println("type: " + collector.getResultType.toString) + assert(TwoColType.equals(collector.getResultType)) + val iter = collector.getResult + // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) + (1 to 1000).foreach { i => + assert(iter.hasNext) + val o = iter.next() + assert(o.size == 2) + assert(o(0) == s"key-$i") + assert(o(1) == s"value-$i") + } + assert(! iter.hasNext) + } + + /** + * Usage notes: There are 3 kinds of data to transfer: + * (1) object, (2) byte array of serialized object, and (3) byte array + * this test shows how to handle all of them. + */ + test("DataSerializer usage") { + val outBuf = new HeapDataOutputStream(1024, null) + val inBuf = new ByteArrayDataInput() + + // 1. a regular object + val hello = "Hello World!" 
* 30 + // serialize the data + DataSerializer.writeObject(hello, outBuf) + val bytesHello = outBuf.toByteArray.clone() + // de-serialize the data + inBuf.initialize(bytesHello, Version.CURRENT) + val hello2 = DataSerializer.readObject(inBuf).asInstanceOf[Object] + assert(hello == hello2) + + // 2. byte array of serialized object + // serialize: byte array from `CachedDeserializable` + val cd: CachedDeserializable = CachedDeserializableFactory.create(bytesHello) + outBuf.reset() + DataSerializer.writeByteArray(cd.getSerializedValue, outBuf) + // de-serialize the data in 2 steps + inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT) + val bytesHello2: Array[Byte] = DataSerializer.readByteArray(inBuf) + inBuf.initialize(bytesHello2, Version.CURRENT) + val hello3 = DataSerializer.readObject(inBuf).asInstanceOf[Object] + assert(hello == hello3) + + // 3. byte array + outBuf.reset() + DataSerializer.writeByteArray(bytesHello, outBuf) + inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT) + val bytesHello3: Array[Byte] = DataSerializer.readByteArray(inBuf) + assert(bytesHello sameElements bytesHello3) + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala new file mode 100644 index 0000000..54394e8 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/scala/io/pivotal/geode/spark/connector/internal/oql/QueryParserTest.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.pivotal.geode.spark.connector.internal.oql + +import org.scalatest.FunSuite + +class QueryParserTest extends FunSuite { + + test("select * from /r1") { + val r = QueryParser.parseOQL("select * from /r1").get + assert(r == "List(/r1)") + } + + test("select c2 from /r1") { + val r = QueryParser.parseOQL("select c2 from /r1").get + assert(r == "List(/r1)") + } + + test("select key, value from /r1.entries") { + val r = QueryParser.parseOQL("select key, value from /r1.entries").get + assert(r == "List(/r1.entries)") + } + + test("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2") { + val r = QueryParser.parseOQL("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2").get + assert(r == "List(/r1)") + } + + test("select * from /r1/r2 where c1 >= 200") { + val r = QueryParser.parseOQL("select * from /r1/r2 where c1 >= 200").get + assert(r == "List(/r1/r2)") + } + + test("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100") { + val r = QueryParser.parseOQL("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100").get + assert(r == "List(/r1/r2, /r3/r4)") + } + + test("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100") { + val r = QueryParser.parseOQL("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100").get + assert(r == "List(/r1/r2)") + } + + test("IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc") { + val r = QueryParser.parseOQL("IMPORT io.pivotal.geode IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc").get + assert(r == "List(/root/sub.entries)") + } + + test("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status") { + val r = QueryParser.parseOQL("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status").get + assert(r == "List(/region)") + } + + test("SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID") { + val r = QueryParser.parseOQL("SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID").get + assert(r == "List(/QueryRegion1, /QueryRegion2)") + } + + test("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'") { + val r = QueryParser.parseOQL("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'").get + println("r.type=" + r.getClass.getName + " r=" + r) + assert(r == "List(/obj_obj_region)") + } + + test("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'") { + val r = QueryParser.parseOQL("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'").get + assert(r == "List(/obj_obj_region, r.positions.values)") + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala new file mode 100644 index 0000000..b0464cc --- /dev/null +++ 
b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/ConnectorImplicitsTest.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unittest.io.pivotal.geode.spark.connector + +import io.pivotal.geode.spark.connector._ +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext +import org.scalatest.FunSuite +import org.scalatest.mock.MockitoSugar +import org.scalatest.Matchers + +class ConnectorImplicitsTest extends FunSuite with Matchers with MockitoSugar { + + test("implicit map2Properties") { + verifyProperties(Map.empty) + verifyProperties(Map("One" -> "1", "Two" -> "2", "Three" ->"3")) + } + + def verifyProperties(map: Map[String, String]): Unit = { + val props: java.util.Properties = map + assert(props.size() == map.size) + map.foreach(p => assert(props.getProperty(p._1) == p._2)) + } + + test("Test Implicit SparkContext Conversion") { + val mockSparkContext = mock[SparkContext] + val gfscf: GeodeSparkContextFunctions = mockSparkContext + assert(gfscf.isInstanceOf[GeodeSparkContextFunctions]) + } + + test("Test Implicit SQLContext Conversion") { + val mockSQLContext = mock[SQLContext] + val gfscf: GeodeSQLContextFunctions = mockSQLContext + assert(gfscf.isInstanceOf[GeodeSQLContextFunctions]) + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala new file mode 100644 index 0000000..a3076f4 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeConnectionConfTest.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unittest.io.pivotal.geode.spark.connector + +import org.apache.spark.SparkConf +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, FunSuite} +import io.pivotal.geode.spark.connector._ + +class GeodeConnectionConfTest extends FunSuite with Matchers with MockitoSugar { + + test("apply(SparkConf) w/ GeodeLocator property and empty geodeProps") { + val (host1, port1) = ("host1", 1234) + val (host2, port2) = ("host2", 5678) + val conf = new SparkConf().set(GeodeLocatorPropKey, s"$host1[$port1],$host2[$port2]") + val connConf = GeodeConnectionConf(conf) + assert(connConf.locators == Seq((host1, port1),(host2, port2))) + assert(connConf.geodeProps.isEmpty) + } + + test("apply(SparkConf) w/ GeodeLocator property and geode properties") { + val (host1, port1) = ("host1", 1234) + val (host2, port2) = ("host2", 5678) + val (propK1, propV1) = ("ack-severe-alert-threshold", "1") + val (propK2, propV2) = ("ack-wait-threshold", "10") + val conf = new SparkConf().set(GeodeLocatorPropKey, s"$host1[$port1],$host2[$port2]") + .set(s"spark.geode.$propK1", propV1).set(s"spark.geode.$propK2", propV2) + val connConf = GeodeConnectionConf(conf) + assert(connConf.locators == Seq((host1, port1),(host2, port2))) + assert(connConf.geodeProps == Map(propK1 -> propV1, propK2 -> propV2)) + } + + test("apply(SparkConf) w/o GeodeLocator property") { + intercept[RuntimeException] { GeodeConnectionConf(new SparkConf()) } + } + + test("apply(SparkConf) w/ invalid GeodeLocator property") { + val conf = new SparkConf().set(GeodeLocatorPropKey, "local^host:1234") + intercept[Exception] { GeodeConnectionConf(conf) } + } + + test("apply(locatorStr, geodeProps) w/ valid locatorStr and non geodeProps") { + val (host1, port1) = ("host1", 1234) + val connConf = GeodeConnectionConf(s"$host1:$port1") + assert(connConf.locators == Seq((host1, port1))) + assert(connConf.geodeProps.isEmpty) + } + + test("apply(locatorStr, geodeProps) w/ valid locatorStr and non-empty geodeProps") { + val (host1, port1) = ("host1", 1234) + val (host2, port2) = ("host2", 5678) + val (propK1, propV1) = ("ack-severe-alert-threshold", "1") + val (propK2, propV2) = ("ack-wait-threshold", "10") + val props = Map(propK1 -> propV1, propK2 -> propV2) + val connConf = GeodeConnectionConf(s"$host1:$port1,$host2:$port2", props) + assert(connConf.locators == Seq((host1, port1),(host2, port2))) + assert(connConf.geodeProps == props) + } + + test("apply(locatorStr, geodeProps) w/ invalid locatorStr") { + intercept[Exception] { GeodeConnectionConf("local~host:4321") } + } + + test("constructor w/ empty (host,port) pairs") { + intercept[IllegalArgumentException] { new GeodeConnectionConf(Seq.empty) } + } + + test("getConnection() normal") { + implicit val mockFactory = mock[GeodeConnectionManager] + val mockConnection = mock[GeodeConnection] + when(mockFactory.getConnection(org.mockito.Matchers.any[GeodeConnectionConf])).thenReturn(mockConnection) + val connConf = GeodeConnectionConf("localhost:1234") + assert(connConf.getConnection == mockConnection) + verify(mockFactory).getConnection(connConf) + } + + test("getConnection() failure") { + implicit val mockFactory = mock[GeodeConnectionManager] + when(mockFactory.getConnection(org.mockito.Matchers.any[GeodeConnectionConf])).thenThrow(new RuntimeException) + val connConf = GeodeConnectionConf("localhost:1234") + intercept[RuntimeException] { 
connConf.getConnection } + verify(mockFactory).getConnection(connConf) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala new file mode 100644 index 0000000..bcba7e1 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeDStreamFunctionsTest.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unittest.io.pivotal.geode.spark.connector + +import com.gemstone.gemfire.cache.Region +import io.pivotal.geode.spark.connector.{GeodeConnection, GeodeConnectionConf} +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.dstream.DStream +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, FunSuite} +import org.mockito.Matchers.{eq => mockEq, any => mockAny} + +import scala.reflect.ClassTag + +class GeodeDStreamFunctionsTest extends FunSuite with Matchers with MockitoSugar { + + test("test GeodePairDStreamFunctions Implicit") { + import io.pivotal.geode.spark.connector.streaming._ + val mockDStream = mock[DStream[(Int, String)]] + // the implicit make the following line valid + val pairDStream: GeodePairDStreamFunctions[Int, String] = mockDStream + pairDStream shouldBe a[GeodePairDStreamFunctions[_, _]] + } + + test("test GeodeDStreamFunctions Implicit") { + import io.pivotal.geode.spark.connector.streaming._ + val mockDStream = mock[DStream[String]] + // the implicit make the following line valid + val dstream: GeodeDStreamFunctions[String] = mockDStream + dstream shouldBe a[GeodeDStreamFunctions[_]] + } + + def createMocks[K, V](regionPath: String) + (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]) + : (String, GeodeConnectionConf, GeodeConnection, Region[K, V]) = { + val mockConnection = mock[GeodeConnection] + val mockConnConf = mock[GeodeConnectionConf] + val mockRegion = mock[Region[K, V]] + when(mockConnConf.getConnection).thenReturn(mockConnection) + when(mockConnConf.locators).thenReturn(Seq.empty) + (regionPath, mockConnConf, mockConnection, mockRegion) + } + + test("test GeodePairDStreamFunctions.saveToGeode()") { + import io.pivotal.geode.spark.connector.streaming._ + val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, 
String]("test") + val mockDStream = mock[DStream[(String, String)]] + mockDStream.saveToGeode(regionPath, mockConnConf) + verify(mockConnConf).getConnection + verify(mockConnection).validateRegion[String, String](regionPath) + verify(mockDStream).foreachRDD(mockAny[(RDD[(String, String)]) => Unit]) + } + + test("test GeodeDStreamFunctions.saveToGeode()") { + import io.pivotal.geode.spark.connector.streaming._ + val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, Int]("test") + val mockDStream = mock[DStream[String]] + mockDStream.saveToGeode[String, Int](regionPath, (s: String) => (s, s.length), mockConnConf) + verify(mockConnConf).getConnection + verify(mockConnection).validateRegion[String, String](regionPath) + verify(mockDStream).foreachRDD(mockAny[(RDD[String]) => Unit]) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala new file mode 100644 index 0000000..96e5f26 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/GeodeRDDFunctionsTest.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package unittest.io.pivotal.geode.spark.connector + +import com.gemstone.gemfire.cache.Region +import io.pivotal.geode.spark.connector._ +import io.pivotal.geode.spark.connector.internal.rdd.{GeodeRDDWriter, GeodePairRDDWriter} +import org.apache.spark.{TaskContext, SparkContext} +import org.apache.spark.rdd.RDD +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{FunSuite, Matchers} +import collection.JavaConversions._ +import scala.reflect.ClassTag +import org.mockito.Matchers.{eq => mockEq, any => mockAny} + +class GeodeRDDFunctionsTest extends FunSuite with Matchers with MockitoSugar { + + test("test PairRDDFunction Implicit") { + import io.pivotal.geode.spark.connector._ + val mockRDD = mock[RDD[(Int, String)]] + // the implicit make the following line valid + val pairRDD: GeodePairRDDFunctions[Int, String] = mockRDD + pairRDD shouldBe a [GeodePairRDDFunctions[_, _]] + } + + test("test RDDFunction Implicit") { + import io.pivotal.geode.spark.connector._ + val mockRDD = mock[RDD[String]] + // the implicit make the following line valid + val nonPairRDD: GeodeRDDFunctions[String] = mockRDD + nonPairRDD shouldBe a [GeodeRDDFunctions[_]] + } + + def createMocks[K, V](regionPath: String) + (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]): (String, GeodeConnectionConf, GeodeConnection, Region[K, V]) = { + val mockConnection = mock[GeodeConnection] + val mockConnConf = mock[GeodeConnectionConf] + val mockRegion = mock[Region[K, V]] + when(mockConnConf.getConnection).thenReturn(mockConnection) + when(mockConnection.getRegionProxy[K, V](regionPath)).thenReturn(mockRegion) + // mockRegion shouldEqual mockConn.getRegionProxy[K, V](regionPath) + (regionPath, mockConnConf, mockConnection, mockRegion) + } + + test("test GeodePairRDDWriter") { + val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, String]("test") + val writer = new GeodePairRDDWriter[String, String](regionPath, mockConnConf) + val data = List(("1", "one"), ("2", "two"), ("3", "three")) + writer.write(null, data.toIterator) + val expectedMap: Map[String, String] = data.toMap + verify(mockRegion).putAll(expectedMap) + } + + test("test GeodeNonPairRDDWriter") { + val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[Int, String]("test") + val writer = new GeodeRDDWriter[String, Int, String](regionPath, mockConnConf) + val data = List("a", "ab", "abc") + val f: String => (Int, String) = s => (s.length, s) + writer.write(f)(null, data.toIterator) + val expectedMap: Map[Int, String] = data.map(f).toMap + verify(mockRegion).putAll(expectedMap) + } + + test("test PairRDDFunctions.saveToGeode") { + verifyPairRDDFunction(useOpConf = false) + } + + test("test PairRDDFunctions.saveToGeode w/ opConf") { + verifyPairRDDFunction(useOpConf = true) + } + + def verifyPairRDDFunction(useOpConf: Boolean): Unit = { + import io.pivotal.geode.spark.connector._ + val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, String]("test") + val mockRDD = mock[RDD[(String, String)]] + val mockSparkContext = mock[SparkContext] + when(mockRDD.sparkContext).thenReturn(mockSparkContext) + val result = + if (useOpConf) + mockRDD.saveToGeode(regionPath, mockConnConf, Map(RDDSaveBatchSizePropKey -> "5000")) + else + mockRDD.saveToGeode(regionPath, mockConnConf) + verify(mockConnection, times(1)).validateRegion[String, String](regionPath) + result === Unit + verify(mockSparkContext, times(1)).runJob[(String, String), Unit]( + 
mockEq(mockRDD), mockAny[(TaskContext, Iterator[(String, String)]) => Unit])(mockAny(classOf[ClassTag[Unit]])) + + // Note: current implementation make following code not compilable + // so not negative test for this case + // val rdd: RDD[(K, V)] = ... + // rdd.saveToGeode(regionPath, s => (s.length, s)) + } + + test("test RDDFunctions.saveToGeode") { + verifyRDDFunction(useOpConf = false) + } + + test("test RDDFunctions.saveToGeode w/ opConf") { + verifyRDDFunction(useOpConf = true) + } + + def verifyRDDFunction(useOpConf: Boolean): Unit = { + import io.pivotal.geode.spark.connector._ + val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[Int, String]("test") + val mockRDD = mock[RDD[(String)]] + val mockSparkContext = mock[SparkContext] + when(mockRDD.sparkContext).thenReturn(mockSparkContext) + val result = + if (useOpConf) + mockRDD.saveToGeode(regionPath, s => (s.length, s), mockConnConf, Map(RDDSaveBatchSizePropKey -> "5000")) + else + mockRDD.saveToGeode(regionPath, s => (s.length, s), mockConnConf) + verify(mockConnection, times(1)).validateRegion[Int, String](regionPath) + result === Unit + verify(mockSparkContext, times(1)).runJob[String, Unit]( + mockEq(mockRDD), mockAny[(TaskContext, Iterator[String]) => Unit])(mockAny(classOf[ClassTag[Unit]])) + + // Note: current implementation make following code not compilable + // so not negative test for this case + // val rdd: RDD[T] = ... // T is not a (K, V) tuple + // rdd.saveToGeode(regionPath) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/LocatorHelperTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/LocatorHelperTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/LocatorHelperTest.scala new file mode 100644 index 0000000..c775784 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/LocatorHelperTest.scala @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package unittest.io.pivotal.geode.spark.connector + +import java.net.InetAddress + +import io.pivotal.geode.spark.connector.internal.LocatorHelper +import org.scalatest.FunSuite + +class LocatorHelperTest extends FunSuite { + + test("locatorStr2HostPortPair hostname w/o domain") { + val (host, port) = ("localhost", 10334) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get ==(host, port)) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get ==(host, port)) + } + + test("locatorStr2HostPortPair hostname w/ domain") { + val (host, port) = ("localhost", 10334) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get ==(host, port)) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get ==(host, port)) + } + + test("locatorStr2HostPortPair w/ invalid host name") { + // empty or null locatorStr + assert(LocatorHelper.locatorStr2HostPortPair("").isFailure) + assert(LocatorHelper.locatorStr2HostPortPair(null).isFailure) + // host name has leading `.` + assert(LocatorHelper.locatorStr2HostPortPair(".localhost.1234").isFailure) + // host name has leading and/or tail white space + assert(LocatorHelper.locatorStr2HostPortPair(" localhost.1234").isFailure) + assert(LocatorHelper.locatorStr2HostPortPair("localhost .1234").isFailure) + assert(LocatorHelper.locatorStr2HostPortPair(" localhost .1234").isFailure) + // host name contain invalid characters + assert(LocatorHelper.locatorStr2HostPortPair("local%host.1234").isFailure) + assert(LocatorHelper.locatorStr2HostPortPair("localhost*.1234").isFailure) + assert(LocatorHelper.locatorStr2HostPortPair("^localhost.1234").isFailure) + } + + test("locatorStr2HostPortPair w/ valid port") { + val host = "192.168.0.1" + // port has 2, 3, 4, 5 digits + assert(LocatorHelper.locatorStr2HostPortPair(s"$host:20").get ==(host, 20)) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host:300").get ==(host, 300)) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host:4000").get ==(host, 4000)) + assert(LocatorHelper.locatorStr2HostPortPair(s"$host:50000").get ==(host, 50000)) + } + + test("locatorStr2HostPortPair w/ invalid port") { + // port number is less than 2 digits + assert(LocatorHelper.locatorStr2HostPortPair("locslhost.9").isFailure) + // port number is more than 5 digits + assert(LocatorHelper.locatorStr2HostPortPair("locslhost.100000").isFailure) + // port number is invalid + assert(LocatorHelper.locatorStr2HostPortPair("locslhost.1xx1").isFailure) + } + + test("parseLocatorsString with valid locator(s)") { + val (host1, port1) = ("localhost", 10334) + assert(LocatorHelper.parseLocatorsString(s"$host1:$port1") == Seq((host1, port1))) + val (host2, port2) = ("localhost2", 10335) + assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2") == Seq((host1, port1),(host2, port2))) + val (host3, port3) = ("localhost2", 10336) + assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2,$host3:$port3") == + Seq((host1, port1),(host2, port2),(host3, port3))) + } + + test("parseLocatorsString with invalid locator(s)") { + // empty and null locatorsStr + intercept[Exception] { LocatorHelper.parseLocatorsString("") } + intercept[Exception] { LocatorHelper.parseLocatorsString(null) } + // 1 bad locatorStr + intercept[Exception] { LocatorHelper.parseLocatorsString("local%host.1234") } + // 1 good locatorStr and 1 bad locatorStr + intercept[Exception] { LocatorHelper.parseLocatorsString("localhost:2345,local%host.1234") } + intercept[Exception] { 
LocatorHelper.parseLocatorsString("local^host:2345,localhost.1234") } + } + + test("pickPreferredGeodeServers: shared servers and one gf-server per host") { + val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004)) + val servers = Seq(srv1, srv2, srv3, srv4) + verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3)) + verifyPickPreferredGeodeServers(servers, "host2", "0", Seq(srv2, srv3, srv4)) + verifyPickPreferredGeodeServers(servers, "host3", "1", Seq(srv3, srv4, srv1)) + verifyPickPreferredGeodeServers(servers, "host4", "2", Seq(srv4, srv1, srv2)) + } + + test("pickPreferredGeodeServers: shared servers, one gf-server per host, un-sorted list") { + val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004)) + val servers = Seq(srv4, srv2, srv3, srv1) + verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3)) + verifyPickPreferredGeodeServers(servers, "host2", "0", Seq(srv2, srv3, srv4)) + verifyPickPreferredGeodeServers(servers, "host3", "1", Seq(srv3, srv4, srv1)) + verifyPickPreferredGeodeServers(servers, "host4", "2", Seq(srv4, srv1, srv2)) + } + + test("pickPreferredGeodeServers: shared servers and two gf-server per host") { + val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004)) + val servers = Seq(srv1, srv2, srv3, srv4) + verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3)) + verifyPickPreferredGeodeServers(servers, "host1", "0", Seq(srv2, srv1, srv3)) + verifyPickPreferredGeodeServers(servers, "host2", "1", Seq(srv3, srv4, srv1)) + verifyPickPreferredGeodeServers(servers, "host2", "2", Seq(srv4, srv3, srv1)) + } + + test("pickPreferredGeodeServers: shared servers, two gf-server per host, un-sorted server list") { + val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004)) + val servers = Seq(srv1, srv4, srv3, srv2) + verifyPickPreferredGeodeServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3)) + verifyPickPreferredGeodeServers(servers, "host1", "0", Seq(srv2, srv1, srv3)) + verifyPickPreferredGeodeServers(servers, "host2", "1", Seq(srv3, srv4, srv1)) + verifyPickPreferredGeodeServers(servers, "host2", "2", Seq(srv4, srv3, srv1)) + } + + test("pickPreferredGeodeServers: no shared servers and one gf-server per host") { + val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004)) + val servers = Seq(srv1, srv2, srv3, srv4) + verifyPickPreferredGeodeServers(servers, "host5", "<driver>", Seq(srv1, srv2, srv3)) + verifyPickPreferredGeodeServers(servers, "host6", "0", Seq(srv2, srv3, srv4)) + verifyPickPreferredGeodeServers(servers, "host7", "1", Seq(srv3, srv4, srv1)) + verifyPickPreferredGeodeServers(servers, "host8", "2", Seq(srv4, srv1, srv2)) + } + + test("pickPreferredGeodeServers: no shared servers, one gf-server per host, and less gf-server") { + val (srv1, srv2) = (("host1", 4001), ("host2", 4002)) + val servers = Seq(srv1, srv2) + verifyPickPreferredGeodeServers(servers, "host5", "<driver>", Seq(srv1, srv2)) + verifyPickPreferredGeodeServers(servers, "host6", "0", Seq(srv2, srv1)) + verifyPickPreferredGeodeServers(servers, "host7", "1", Seq(srv1, srv2)) + verifyPickPreferredGeodeServers(servers, "host8", "2", Seq(srv2, srv1)) + + + println("host name: " + InetAddress.getLocalHost.getHostName) + println("canonical host name: " + InetAddress.getLocalHost.getCanonicalHostName) + 
println("canonical host name 2: " + InetAddress.getByName(InetAddress.getLocalHost.getHostName).getCanonicalHostName) + } + + test("pickPreferredGeodeServers: ad-hoc") { + val (srv4, srv5, srv6) = ( + ("w2-gst-pnq-04.gemstone.com", 40411), ("w2-gst-pnq-05.gemstone.com", 40411), ("w2-gst-pnq-06.gemstone.com", 40411)) + val servers = Seq(srv6, srv5, srv4) + verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-03.gemstone.com", "<driver>", Seq(srv4, srv5, srv6)) + verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-04.gemstone.com", "1", Seq(srv4, srv5, srv6)) + verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-05.gemstone.com", "0", Seq(srv5, srv6, srv4)) + verifyPickPreferredGeodeServers(servers, "w2-gst-pnq-06.gemstone.com", "2", Seq(srv6, srv4, srv5)) + } + + def verifyPickPreferredGeodeServers( + servers: Seq[(String, Int)], hostName: String, executorId: String, expectation: Seq[(String, Int)]): Unit = { + val result = LocatorHelper.pickPreferredGeodeServers(servers, hostName, executorId) + assert(result == expectation, s"pick servers for $hostName:$executorId") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala new file mode 100644 index 0000000..f53c178 --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRDDPartitionerTest.scala @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package unittest.io.pivotal.geode.spark.connector.rdd + +import com.gemstone.gemfire.distributed.internal.ServerLocation +import io.pivotal.geode.spark.connector.internal.RegionMetadata +import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner._ +import io.pivotal.geode.spark.connector.GeodeConnection +import io.pivotal.geode.spark.connector.internal.rdd._ +import org.apache.spark.Partition +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, FunSuite} + +import java.util.{HashSet => JHashSet, HashMap => JHashMap} + +import scala.collection.mutable + +class GeodeRDDPartitionerTest extends FunSuite with Matchers with MockitoSugar { + + val emptyServerBucketMap: JHashMap[ServerLocation, JHashSet[Integer]] = new JHashMap() + + def toJavaServerBucketMap(map: Map[(String, Int), Set[Int]]): JHashMap[ServerLocation, JHashSet[Integer]] = { + import scala.collection.JavaConversions._ + val tmp = map.map {case ((host, port), set) => (new ServerLocation(host, port), set.map(Integer.valueOf))} + (new JHashMap[ServerLocation, JHashSet[Integer]]() /: tmp) { case (acc, (s, jset)) => acc.put(s, new JHashSet(jset)); acc } + } + + val map: mutable.Map[(String, Int), mutable.Set[Int]] = mutable.Map( + ("s0",1) -> mutable.Set.empty, ("s1",2) -> mutable.Set(0), ("s2",3) -> mutable.Set(1, 2), ("s3",4) -> mutable.Set(3, 4, 5)) + + + // update this test whenever change default setting + test("default partitioned region partitioner") { + assert(GeodeRDDPartitioner.defaultPartitionedRegionPartitioner === ServerSplitsPartitioner) + } + + // update this test whenever change default setting + test("default replicated region partitioner") { + assert(GeodeRDDPartitioner.defaultReplicatedRegionPartitioner === OnePartitionPartitioner) + } + + test("GeodeRDDPartitioner.apply method") { + import io.pivotal.geode.spark.connector.internal.rdd.GeodeRDDPartitioner._ + for ((name, partitioner) <- partitioners) assert(GeodeRDDPartitioner(name) == partitioner) + assert(GeodeRDDPartitioner("dummy") == GeodeRDDPartitioner.defaultPartitionedRegionPartitioner) + assert(GeodeRDDPartitioner() == GeodeRDDPartitioner.defaultPartitionedRegionPartitioner) + } + + test("OnePartitionPartitioner") { + val mockConnection = mock[GeodeConnection] + val partitions = OnePartitionPartitioner.partitions[String, String](mockConnection, null, Map.empty) + verifySinglePartition(partitions) + } + + def verifySinglePartition(partitions: Array[Partition]): Unit = { + assert(1 == partitions.size) + assert(partitions(0).index === 0) + assert(partitions(0).isInstanceOf[GeodeRDDPartition]) + assert(partitions(0).asInstanceOf[GeodeRDDPartition].bucketSet.isEmpty) + } + + test("ServerSplitsPartitioner.doPartitions(): n=1 & no empty bucket") { + val map: List[(String, mutable.Set[Int])] = List( + "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 1) + verifyPartitions(partitions, List( + (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5), Seq("server2")))) + } + + test("ServerSplitsPartitioner.doPartitions(): n=1 & 1 empty bucket") { + val map: List[(String, mutable.Set[Int])] = List( + "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 7, 1) + verifyPartitions(partitions, List( + (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5), Seq("server2")))) + } + + test("ServerSplitsPartitioner.doPartitions(): n=1 & 2 empty bucket") { + val map: 
List[(String, mutable.Set[Int])] = List( + "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1) + verifyPartitions(partitions, List( + (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5, 7), Seq("server2")))) + } + + test("ServerSplitsPartitioner.doPartitions(): n=1 & 5 empty bucket") { + val map: List[(String, mutable.Set[Int])] = List( + "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 11, 1) + verifyPartitions(partitions, List( + (Set(0, 1, 2, 3, 6, 7, 8), Seq("server1")), (Set(4, 5, 9, 10), Seq("server2")))) + } + + test("ServerSplitsPartitioner.doPartitions(): n=1, 4 empty-bucket, non-continuous IDs") { + val map: List[(String, mutable.Set[Int])] = List( + "server1" -> mutable.Set(1, 3), "server2" -> mutable.Set(5,7)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1) + verifyPartitions(partitions, List( + (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5, 6, 7), Seq("server2")))) + } + + test("ServerSplitsPartitioner.doPartitions(): n=2, no empty buckets, 3 servers have 1, 2, and 3 buckets") { + val map: List[(String, mutable.Set[Int])] = List( + "s1" -> mutable.Set(0), "s2" -> mutable.Set(1, 2), "s3" -> mutable.Set(3, 4, 5)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 2) + // partitions.foreach(println) + verifyPartitions(partitions, List( + (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 4), Seq("s3")), (Set(5), Seq("s3")))) + } + + test("ServerSplitsPartitioner.doPartitions(): n=3, no empty buckets, 4 servers have 0, 2, 3, and 4 buckets") { + val map: List[(String, mutable.Set[Int])] = List( + "s0" -> mutable.Set.empty, "s1" -> mutable.Set(0, 1), "s2" -> mutable.Set(2, 3, 4), "s3" -> mutable.Set(5, 6, 7, 8)) + val partitions = ServerSplitsPartitioner.doPartitions(map, 9, 3) + // partitions.foreach(println) + verifyPartitions(partitions, List( + (Set(0), Seq("s1")), (Set(1), Seq("s1")), (Set(2), Seq("s2")), (Set(3), Seq("s2")), (Set(4), Seq("s2")), + (Set(5, 6), Seq("s3")), (Set(7, 8), Seq("s3")) )) + } + + test("ServerSplitsPartitioner.partitions(): metadata = None ") { + val regionPath = "test" + val mockConnection = mock[GeodeConnection] + intercept[RuntimeException] { ServerSplitsPartitioner.partitions[String, String](mockConnection, null, Map.empty) } + } + + test("ServerSplitsPartitioner.partitions(): replicated region ") { + val regionPath = "test" + val mockConnection = mock[GeodeConnection] + val md = new RegionMetadata(regionPath, false, 11, null) + when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md)) + val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map.empty) + verifySinglePartition(partitions) + } + + test("ServerSplitsPartitioner.partitions(): partitioned region w/o data ") { + val regionPath = "test" + val mockConnection = mock[GeodeConnection] + val md = new RegionMetadata(regionPath, true, 6, emptyServerBucketMap) + when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md)) + val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map.empty) + verifySinglePartition(partitions) + } + + test("ServerSplitsPartitioner.partitions(): partitioned region w/ some data ") { + import io.pivotal.geode.spark.connector.NumberPartitionsPerServerPropKey + val regionPath = "test" + val mockConnection = mock[GeodeConnection] + val map: Map[(String, Int), Set[Int]] = Map( + 
("s0",1) -> Set.empty, ("s1",2) -> Set(0), ("s2",3) -> Set(1, 2), ("s3",4) -> Set(3, 4, 5)) + val md = new RegionMetadata(regionPath, true, 6, toJavaServerBucketMap(map)) + when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md)) + val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map(NumberPartitionsPerServerPropKey->"2")) + // partitions.foreach(println) + verifyPartitions(partitions, List( + (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 4), Seq("s3")), (Set(5), Seq("s3")))) + } + + // Note: since the order of partitions is not pre-determined, we have to verify partition id + // and contents separately + def verifyPartitions(partitions: Array[Partition], expPartitions: List[(Set[Int], Seq[String])]): Unit = { + // 1. check size + assert(partitions.size == expPartitions.size) + // 2. check IDs are 0 to n-1 + (0 until partitions.size).toList.zip(partitions).foreach { case (id, p) => assert(id == p.index) } + + // 3. get all pairs of bucket set and its locations, and compare to the expected pairs + val list = partitions.map { e => + val p = e.asInstanceOf[GeodeRDDPartition] + (p.bucketSet, p.locations) + } + expPartitions.foreach(e => assert(list.contains(e))) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRegionRDDTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRegionRDDTest.scala b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRegionRDDTest.scala new file mode 100644 index 0000000..046ceac --- /dev/null +++ b/geode-spark-connector/geode-spark-connector/src/test/scala/unittest/io/pivotal/geode/spark/connector/rdd/GeodeRegionRDDTest.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package unittest.io.pivotal.geode.spark.connector.rdd + +import com.gemstone.gemfire.cache.Region +import io.pivotal.geode.spark.connector.internal.RegionMetadata +import io.pivotal.geode.spark.connector.internal.rdd.{GeodeRDDPartition, GeodeRegionRDD} +import io.pivotal.geode.spark.connector.{GeodeConnectionConf, GeodeConnection} +import org.apache.spark.{TaskContext, Partition, SparkContext} +import org.mockito.Mockito._ +import org.mockito.Matchers.{eq => mockEq, any => mockAny} +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, FunSuite} + +import scala.reflect.ClassTag + +class GeodeRegionRDDTest extends FunSuite with Matchers with MockitoSugar { + + /** create common mocks, not all mocks are used by all tests */ + def createMocks[K, V](regionPath: String)(implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]) + : (String, Region[K,V], GeodeConnectionConf, GeodeConnection) = { + val mockConnection = mock[GeodeConnection] + val mockRegion = mock[Region[K, V]] + val mockConnConf = mock[GeodeConnectionConf] + when(mockConnConf.getConnection).thenReturn(mockConnection) + when(mockConnection.getRegionProxy[K, V](regionPath)).thenReturn(mockRegion) + when(mockConnConf.locators).thenReturn(Seq.empty) + (regionPath, mockRegion, mockConnConf, mockConnection) + } + + test("create GeodeRDD with non-existing region") { + val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") + when(mockConnConf.getConnection).thenReturn(mockConnection) + when(mockConnection.validateRegion[String,String](regionPath)).thenThrow(new RuntimeException) + val mockSparkContext = mock[SparkContext] + intercept[RuntimeException] { GeodeRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf) } + verify(mockConnConf).getConnection + verify(mockConnection).validateRegion[String, String](regionPath) + } + + test("getPartitions with non-existing region") { + // region exists when RDD is created, but get removed before getPartitions() is invoked + val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") + when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(None) + val mockSparkContext = mock[SparkContext] + intercept[RuntimeException] { GeodeRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf).getPartitions } + } + + test("getPartitions with replicated region and not preferred env") { + val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") + implicit val mockConnConf2 = mockConnConf + val mockSparkContext = mock[SparkContext] + when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, null))) + val partitions = GeodeRegionRDD(mockSparkContext, regionPath, mockConnConf).partitions + verifySinglePartition(partitions) + } + + def verifySinglePartition(partitions: Array[Partition]): Unit = { + assert(1 == partitions.size) + assert(partitions(0).index === 0) + assert(partitions(0).isInstanceOf[GeodeRDDPartition]) + assert(partitions(0).asInstanceOf[GeodeRDDPartition].bucketSet.isEmpty) + } + + test("getPartitions with replicated region and preferred OnePartitionPartitioner") { + // since it's replicated region, so OnePartitionPartitioner will be used, i.e., override preferred partitioner + import io.pivotal.geode.spark.connector.{PreferredPartitionerPropKey, OnePartitionPartitionerName} + val (regionPath, mockRegion, mockConnConf, mockConnection) 
= createMocks[String, String]("test") + when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, false, 0, null))) + implicit val mockConnConf2 = mockConnConf + val mockSparkContext = mock[SparkContext] + val env = Map(PreferredPartitionerPropKey -> OnePartitionPartitionerName) + val partitions = GeodeRegionRDD(mockSparkContext, regionPath, mockConnConf, env).partitions + verifySinglePartition(partitions) + } + + test("getPartitions with partitioned region and not preferred env") { + val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") + implicit val mockConnConf2 = mockConnConf + val mockSparkContext = mock[SparkContext] + when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, null))) + val partitions = GeodeRegionRDD(mockSparkContext, regionPath, mockConnConf).partitions + verifySinglePartition(partitions) + } + + test("GeodeRDD.compute() method") { + val (regionPath, mockRegion, mockConnConf, mockConnection) = createMocks[String, String]("test") + implicit val mockConnConf2 = mockConnConf + val mockIter = mock[Iterator[(String, String)]] + val partition = GeodeRDDPartition(0, Set.empty) + when(mockConnection.getRegionMetadata[String, String](regionPath)).thenReturn(Some(new RegionMetadata(regionPath, true, 2, null))) + when(mockConnection.getRegionData[String, String](regionPath, None, partition)).thenReturn(mockIter) + val mockSparkContext = mock[SparkContext] + val rdd = GeodeRegionRDD[String, String](mockSparkContext, regionPath, mockConnConf) + val partitions = rdd.partitions + assert(1 == partitions.size) + val mockTaskContext = mock[TaskContext] + rdd.compute(partitions(0), mockTaskContext) + verify(mockConnection).getRegionData[String, String](mockEq(regionPath), mockEq(None), mockEq(partition)) + // verify(mockConnection).getRegionData[String, String](regionPath, Set.empty.asInstanceOf[Set[Int]], "geodeRDD 0.0") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/Emp.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/Emp.java b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/Emp.java new file mode 100644 index 0000000..03e15a0 --- /dev/null +++ b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/Emp.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package demo; + +import java.io.Serializable; + +/** + * This is a demo class used in doc/?.md + */ +public class Emp implements Serializable { + + private int id; + + private String lname; + + private String fname; + + private int age; + + private String loc; + + public Emp(int id, String lname, String fname, int age, String loc) { + this.id = id; + this.lname = lname; + this.fname = fname; + this.age = age; + this.loc = loc; + } + + public int getId() { + return id; + } + + public String getLname() { + return lname; + } + + public String getFname() { + return fname; + } + + public int getAge() { + return age; + } + + public String getLoc() { + return loc; + } + + @Override + public String toString() { + return "Emp(" + id + ", " + lname + ", " + fname + ", " + age + ", " + loc + ")"; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Emp emp = (Emp) o; + + if (age != emp.age) return false; + if (id != emp.id) return false; + if (fname != null ? !fname.equals(emp.fname) : emp.fname != null) return false; + if (lname != null ? !lname.equals(emp.lname) : emp.lname != null) return false; + if (loc != null ? !loc.equals(emp.loc) : emp.loc != null) return false; + + return true; + } + + @Override + public int hashCode() { + int result = id; + result = 31 * result + (lname != null ? lname.hashCode() : 0); + result = 31 * result + (fname != null ? fname.hashCode() : 0); + result = 31 * result + age; + result = 31 * result + (loc != null ? loc.hashCode() : 0); + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java new file mode 100644 index 0000000..adcf072 --- /dev/null +++ b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/OQLJavaDemo.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package demo; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.SQLContext; +import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*; + + +/** + * This Spark application demonstrates how to get region data from Geode using Geode + * OQL Java API. The result is a Spark DataFrame. 
+ * <p> + * In order to run it, you will need to start a Geode cluster, and run demo PairRDDSaveJavaDemo + * first to create some data in the region. + * <p> + * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar + * should be generated under geode-spark-demos/basic-demos/target/scala-2.10/. + * Then run the following command to start a Spark job: + * <pre> + * <path to spark>/bin/spark-submit --master=local[2] --class demo.OQLJavaDemo \ + * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port> + * </pre> + */ +public class OQLJavaDemo { + + public static void main(String[] argv) { + + if (argv.length != 1) { + System.err.printf("Usage: OQLJavaDemo <locators>\n"); + return; + } + + SparkConf conf = new SparkConf().setAppName("OQLJavaDemo"); + conf.set(GeodeLocatorPropKey, argv[0]); // "192.168.1.47[10335]" + JavaSparkContext sc = new JavaSparkContext(conf); + SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); + DataFrame df = javaFunctions(sqlContext).geodeOQL("select * from /str_str_region"); + System.out.println("======= DataFrame =======\n"); + df.show(); + sc.stop(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java new file mode 100644 index 0000000..52d2a99 --- /dev/null +++ b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/PairRDDSaveJavaDemo.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package demo; + +import io.pivotal.geode.spark.connector.GeodeConnectionConf; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import scala.Tuple2; +import java.util.*; + +import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*; + +/** + * This Spark application demonstrates how to save a RDD to Geode using Geode Spark + * Connector with Java. + * <p/> + * In order to run it, you will need to start Geode cluster, and create the following region + * with GFSH: + * <pre> + * gfsh> create region --name=str_str_region --type=REPLICATE \ + * --key-constraint=java.lang.String --value-constraint=java.lang.String + * </pre> + * + * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar + * should be generated under geode-spark-demos/basic-demos/target/scala-2.10/. 
+ * Then run the following command to start a Spark job: + * <pre> + * <path to spark>/bin/spark-submit --master=local[2] --class demo.PairRDDSaveJavaDemo \ + * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port> + * </pre> + * + * Verify the data was saved to Geode with GFSH: + * <pre>gfsh> query --query="select * from /str_str_region.entrySet" </pre> + */ +public class PairRDDSaveJavaDemo { + + public static void main(String[] argv) { + + if (argv.length != 1) { + System.err.printf("Usage: PairRDDSaveJavaDemo <locators>\n"); + return; + } + + SparkConf conf = new SparkConf().setAppName("PairRDDSaveJavaDemo"); + conf.set(GeodeLocatorPropKey, argv[0]); + JavaSparkContext sc = new JavaSparkContext(conf); + GeodeConnectionConf connConf = GeodeConnectionConf.apply(conf); + + List<Tuple2<String, String>> data = new ArrayList<>(); + data.add(new Tuple2<>("7", "seven")); + data.add(new Tuple2<>("8", "eight")); + data.add(new Tuple2<>("9", "nine")); + + List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>(); + data2.add(new Tuple2<>("11", "eleven")); + data2.add(new Tuple2<>("12", "twelve")); + data2.add(new Tuple2<>("13", "thirteen")); + + // method 1: generate JavaPairRDD directly + JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(data); + javaFunctions(rdd1).saveToGeode("str_str_region", connConf); + + // method 2: convert JavaRDD<Tuple2<K,V>> to JavaPairRDD<K, V> + JavaRDD<Tuple2<String, String>> rdd2 = sc.parallelize(data2); + javaFunctions(toJavaPairRDD(rdd2)).saveToGeode("str_str_region", connConf); + + sc.stop(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java new file mode 100644 index 0000000..1125de5 --- /dev/null +++ b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RDDSaveJavaDemo.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package demo; + +import io.pivotal.geode.spark.connector.GeodeConnectionConf; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.PairFunction; +import scala.Tuple2; + +import java.util.ArrayList; +import java.util.List; + +import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*; + +/** + * This Spark application demonstrates how to save a RDD to Geode using Geode Spark + * Connector with Java. 
+ * <p/> + * In order to run it, you will need to start Geode cluster, and create the following region + * with GFSH: + * <pre> + * gfsh> create region --name=str_int_region --type=REPLICATE \ + * --key-constraint=java.lang.String --value-constraint=java.lang.Integer + * </pre> + * + * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar + * should be generated under geode-spark-demos/basic-demos/target/scala-2.10/. + * Then run the following command to start a Spark job: + * <pre> + * <path to spark>/bin/spark-submit --master=local[2] --class demo.RDDSaveJavaDemo \ + * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port> + * </pre> + * + * Verify the data was saved to Geode with GFSH: + * <pre>gfsh> query --query="select * from /str_int_region.entrySet" </pre> + */ +public class RDDSaveJavaDemo { + + public static void main(String[] argv) { + + if (argv.length != 1) { + System.err.printf("Usage: RDDSaveJavaDemo <locators>\n"); + return; + } + + SparkConf conf = new SparkConf().setAppName("RDDSaveJavaDemo"); + conf.set(GeodeLocatorPropKey, argv[0]); + JavaSparkContext sc = new JavaSparkContext(conf); + + List<String> data = new ArrayList<String>(); + data.add("abcdefg"); + data.add("abcdefgh"); + data.add("abcdefghi"); + JavaRDD<String> rdd = sc.parallelize(data); + + GeodeConnectionConf connConf = GeodeConnectionConf.apply(conf); + + PairFunction<String, String, Integer> func = new PairFunction<String, String, Integer>() { + @Override public Tuple2<String, Integer> call(String s) throws Exception { + return new Tuple2<String, Integer>(s, s.length()); + } + }; + + javaFunctions(rdd).saveToGeode("str_int_region", func, connConf); + + sc.stop(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java new file mode 100644 index 0000000..1ce8ceb --- /dev/null +++ b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/java/demo/RegionToRDDJavaDemo.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package demo; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import static io.pivotal.geode.spark.connector.javaapi.GeodeJavaUtil.*; + +/** + * This Spark application demonstrates how to expose a region in Geode as a RDD using Geode + * Spark Connector with Java. 
+ * <p> + * In order to run it, you will need to start Geode cluster, and run demo PairRDDSaveJavaDemo + * first to create some data in the region. + * <p> + * Once you compile and package the demo, the jar file basic-demos_2.10-0.5.0.jar + * should be generated under geode-spark-demos/basic-demos/target/scala-2.10/. + * Then run the following command to start a Spark job: + * <pre> + * <path to spark>/bin/spark-submit --master=local[2] --class demo.RegionToRDDJavaDemo \ + * <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port> + * </pre> + */ +public class RegionToRDDJavaDemo { + + public static void main(String[] argv) { + + if (argv.length != 1) { + System.err.printf("Usage: RegionToRDDJavaDemo <locators>\n"); + return; + } + + SparkConf conf = new SparkConf().setAppName("RegionToRDDJavaDemo"); + conf.set(GeodeLocatorPropKey, argv[0]); + JavaSparkContext sc = new JavaSparkContext(conf); + + JavaPairRDD<String, String> rdd = javaFunctions(sc).geodeRegion("str_str_region"); + System.out.println("=== geodeRegion =======\n" + rdd.collect() + "\n========================="); + + sc.stop(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ff914bd9/geode-spark-connector/geode-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala new file mode 100644 index 0000000..810b380 --- /dev/null +++ b/geode-spark-connector/geode-spark-demos/basic-demos/src/main/scala/demo/NetworkWordCount.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package demo + +import org.apache.spark.SparkConf +import org.apache.spark.streaming.{Seconds, StreamingContext} +import io.pivotal.geode.spark.connector.GeodeLocatorPropKey +import io.pivotal.geode.spark.connector.streaming._ + +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. 
+ * <p> + * In order to run it, you will need to start a Geode cluster and create the following region + * with GFSH: + * <pre> + * gfsh> create region --name=str_int_region --type=REPLICATE \ + * --key-constraint=java.lang.String --value-constraint=java.lang.Integer + * </pre> + * + * <p>To run this on your local machine, you need to first run a netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ bin/spark-submit --master=local[2] --class demo.NetworkWordCount <path to>/basic-demos_2.10-0.5.0.jar localhost 9999 locatorHost:port` + * + * <p>Check the result saved to Geode with GFSH: + * <pre>gfsh> query --query="select * from /str_int_region.entrySet" </pre> + */ +object NetworkWordCount { + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: NetworkWordCount <hostname> <port> <geode locator>") + System.exit(1) + } + + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + val currentCount = values.foldLeft(0)(_ + _) + val previousCount = state.getOrElse(0) + Some(currentCount + previousCount) + } + + // Create the context with a 1 second batch size + val sparkConf = new SparkConf().setAppName("NetworkWordCount").set(GeodeLocatorPropKey, args(2)) + val ssc = new StreamingContext(sparkConf, Seconds(1)) + ssc.checkpoint(".") + + // Create a socket stream on target ip:port and count the + // words in the input stream of '\n'-delimited text (e.g. generated by 'nc') + // Note: a storage level without replication is used only because this runs locally; + // replication is necessary in a distributed scenario for fault tolerance. + val lines = ssc.socketTextStream(args(0), args(1).toInt) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + val runningCounts = wordCounts.updateStateByKey[Int](updateFunc) + // runningCounts.print() + runningCounts.saveToGeode("str_int_region") + ssc.start() + ssc.awaitTermination() + } + +}
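The Java demos above reach the connector through the javaFunctions(...) wrappers; a Scala application can use the connector's implicits directly. The following is a minimal sketch, not part of the commit: it assumes the package object io.pivotal.geode.spark.connector._ provides implicit saveToGeode (on pair RDDs) and geodeRegion (on SparkContext) methods mirroring the Java API shown above, and that saveToGeode can fall back to a connection conf derived from the SparkConf, as the streaming saveToGeode call in NetworkWordCount does. The class name ScalaPairRDDDemoSketch is hypothetical.

package demo

import org.apache.spark.{SparkConf, SparkContext}
// assumed: the package object exposes GeodeLocatorPropKey plus the saveToGeode/geodeRegion implicits
import io.pivotal.geode.spark.connector._

/** Hypothetical Scala counterpart to PairRDDSaveJavaDemo + RegionToRDDJavaDemo (sketch only). */
object ScalaPairRDDDemoSketch {

  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      System.err.println("Usage: ScalaPairRDDDemoSketch <locators>")
      return
    }

    val conf = new SparkConf().setAppName("ScalaPairRDDDemoSketch").set(GeodeLocatorPropKey, args(0))
    val sc = new SparkContext(conf)

    // save a pair RDD into the str_str_region created for the Java demos
    val data = Seq(("1", "one"), ("2", "two"), ("3", "three"))
    sc.parallelize(data).saveToGeode("str_str_region")          // assumed implicit on RDD[(K, V)]

    // expose the same region as an RDD and print its contents
    val rdd = sc.geodeRegion[String, String]("str_str_region")  // assumed implicit on SparkContext
    println("=== geodeRegion =======\n" + rdd.collect().mkString(", ") + "\n=========================")

    sc.stop()
  }
}

Submission would mirror the Java demos, e.g. <path to spark>/bin/spark-submit --master=local[2] --class demo.ScalaPairRDDDemoSketch <path to>/basic-demos_2.10-0.5.0.jar <locator host>:<port>.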