http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala deleted file mode 100644 index aaf0fcc..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/package.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark - -import io.pivotal.gemfire.spark.connector.internal.rdd.{ServerSplitsPartitioner, OnePartitionPartitioner} -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext - -import scala.reflect.ClassTag - -/** - * The root package of Gemfire connector for Apache Spark. - * Provides handy implicit conversions that add gemfire-specific - * methods to `SparkContext` and `RDD`. - */ -package object connector { - - /** constants */ - final val GemFireLocatorPropKey = "spark.gemfire.locators" - // partitioner related keys and values - final val PreferredPartitionerPropKey = "preferred.partitioner" - final val NumberPartitionsPerServerPropKey = "number.partitions.per.server" - final val OnePartitionPartitionerName = OnePartitionPartitioner.name - final val ServerSplitsPartitionerName = ServerSplitsPartitioner.name - - final val RDDSaveBatchSizePropKey = "rdd.save.batch.size" - final val RDDSaveBatchSizeDefault = 10000 - - /** implicits */ - - implicit def toSparkContextFunctions(sc: SparkContext): GemFireSparkContextFunctions = - new GemFireSparkContextFunctions(sc) - - implicit def toSQLContextFunctions(sqlContext: SQLContext): GemFireSQLContextFunctions = - new GemFireSQLContextFunctions(sqlContext) - - implicit def toGemfirePairRDDFunctions[K: ClassTag, V: ClassTag] - (self: RDD[(K, V)]): GemFirePairRDDFunctions[K, V] = new GemFirePairRDDFunctions(self) - - implicit def toGemfireRDDFunctions[T: ClassTag] - (self: RDD[T]): GemFireRDDFunctions[T] = new GemFireRDDFunctions(self) - - /** utility implicits */ - - /** convert Map[String, String] to java.util.Properties */ - implicit def map2Properties(map: Map[String,String]): java.util.Properties = - (new java.util.Properties /: map) {case (props, (k,v)) => props.put(k,v); props} - - /** internal util methods */ - - private[connector] def getRddPartitionsInfo(rdd: RDD[_], sep: String = "\n "): String = - rdd.partitions.zipWithIndex.map{case (p,i) => s"$i: $p loc=${rdd.preferredLocations(p)}"}.mkString(sep) - -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala deleted file mode 100644 index 6890ec0..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/GemFireDStreamFunctions.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector.streaming - -import io.pivotal.gemfire.spark.connector.GemFireConnectionConf -import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFirePairRDDWriter, GemFireRDDWriter} -import org.apache.spark.Logging -import org.apache.spark.api.java.function.PairFunction -import org.apache.spark.streaming.dstream.DStream - -/** - * Extra gemFire functions on DStream of non-pair elements through an implicit conversion. - * Import `io.pivotal.gemfire.spark.connector.streaming._` at the top of your program to - * use these functions. - */ -class GemFireDStreamFunctions[T](val dstream: DStream[T]) extends Serializable with Logging { - - /** - * Save the DStream of non-pair elements to GemFire key-value store. - * @param regionPath the full path of region that the DStream is stored - * @param func the function that converts elements of the DStream to key/value pairs - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param opConf the optional parameters for this operation - */ - def saveToGemfire[K, V]( - regionPath: String, - func: T => (K, V), - connConf: GemFireConnectionConf = defaultConnectionConf, - opConf: Map[String, String] = Map.empty): Unit = { - connConf.getConnection.validateRegion[K, V](regionPath) - val writer = new GemFireRDDWriter[T, K, V](regionPath, connConf, opConf) - logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""") - dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write(func) _)) - } - - /** this version of saveToGemfire is just for Java API */ - def saveToGemfire[K, V]( - regionPath: String, - func: PairFunction[T, K, V], - connConf: GemFireConnectionConf, - opConf: Map[String, String] ): Unit = { - saveToGemfire[K, V](regionPath, func.call _, connConf, opConf) - } - - private[connector] def defaultConnectionConf: GemFireConnectionConf = - GemFireConnectionConf(dstream.context.sparkContext.getConf) -} - - -/** - * Extra gemFire functions on DStream of (key, value) pairs through an implicit conversion. - * Import `io.pivotal.gemfire.spark.connector.streaming._` at the top of your program to - * use these functions. - */ -class GemFirePairDStreamFunctions[K, V](val dstream: DStream[(K,V)]) extends Serializable with Logging { - - /** - * Save the DStream of pairs to GemFire key-value store without any conversion - * @param regionPath the full path of region that the DStream is stored - * @param connConf the GemFireConnectionConf object that provides connection to GemFire cluster - * @param opConf the optional parameters for this operation - */ - def saveToGemfire( - regionPath: String, - connConf: GemFireConnectionConf = defaultConnectionConf, - opConf: Map[String, String] = Map.empty): Unit = { - connConf.getConnection.validateRegion[K, V](regionPath) - val writer = new GemFirePairRDDWriter[K, V](regionPath, connConf, opConf) - logInfo(s"""Save DStream region=$regionPath conn=${connConf.locators.mkString(",")}""") - dstream.foreachRDD(rdd => rdd.sparkContext.runJob(rdd, writer.write _)) - } - - private[connector] def defaultConnectionConf: GemFireConnectionConf = - GemFireConnectionConf(dstream.context.sparkContext.getConf) -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala b/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala deleted file mode 100644 index b475cbb..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/main/scala/io/pivotal/gemfire/spark/connector/streaming/package.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector - -import org.apache.spark.streaming.dstream.DStream - -/** - * Provides handy implicit conversions that add gemfire-specific methods to `DStream`. - */ -package object streaming { - - implicit def toGemFireDStreamFunctions[T](ds: DStream[T]): GemFireDStreamFunctions[T] = - new GemFireDStreamFunctions[T](ds) - - implicit def toGemFirePairDStreamFunctions[K, V](ds: DStream[(K, V)]): GemFirePairDStreamFunctions[K, V] = - new GemFirePairDStreamFunctions[K, V](ds) - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java b/geode-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java deleted file mode 100644 index 2236b4a..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/test/java/io/pivotal/gemfire/spark/connector/JavaAPITest.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector; - -import io.pivotal.gemfire.spark.connector.javaapi.*; -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.SQLContext; -//import org.apache.spark.sql.api.java.JavaSQLContext; -import org.apache.spark.streaming.api.java.JavaDStream; -import org.apache.spark.streaming.api.java.JavaPairDStream; -import org.apache.spark.streaming.dstream.DStream; -import org.junit.Test; -import org.scalatest.junit.JUnitSuite; -import scala.Function1; -import scala.Function2; -import scala.Tuple2; -import scala.Tuple3; -import scala.collection.mutable.LinkedList; -import scala.reflect.ClassTag; - -import static org.junit.Assert.*; -import static io.pivotal.gemfire.spark.connector.javaapi.GemFireJavaUtil.*; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.*; - -public class JavaAPITest extends JUnitSuite { - - @SuppressWarnings( "unchecked" ) - public Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> createCommonMocks() { - SparkContext mockSparkContext = mock(SparkContext.class); - GemFireConnectionConf mockConnConf = mock(GemFireConnectionConf.class); - GemFireConnection mockConnection = mock(GemFireConnection.class); - when(mockConnConf.getConnection()).thenReturn(mockConnection); - when(mockConnConf.locators()).thenReturn(new LinkedList()); - return new Tuple3<>(mockSparkContext, mockConnConf, mockConnection); - } - - @Test - public void testSparkContextFunction() throws Exception { - Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks(); - GemFireJavaSparkContextFunctions wrapper = javaFunctions(tuple3._1()); - assertTrue(tuple3._1() == wrapper.sc); - String regionPath = "testregion"; - JavaPairRDD<String, String> rdd = wrapper.gemfireRegion(regionPath, tuple3._2()); - verify(tuple3._3()).validateRegion(regionPath); - } - - @Test - public void testJavaSparkContextFunctions() throws Exception { - SparkContext mockSparkContext = mock(SparkContext.class); - JavaSparkContext mockJavaSparkContext = mock(JavaSparkContext.class); - when(mockJavaSparkContext.sc()).thenReturn(mockSparkContext); - GemFireJavaSparkContextFunctions wrapper = javaFunctions(mockJavaSparkContext); - assertTrue(mockSparkContext == wrapper.sc); - } - - @Test - @SuppressWarnings( "unchecked" ) - public void testJavaPairRDDFunctions() throws Exception { - JavaPairRDD<String, Integer> mockPairRDD = mock(JavaPairRDD.class); - RDD<Tuple2<String, Integer>> mockTuple2RDD = mock(RDD.class); - when(mockPairRDD.rdd()).thenReturn(mockTuple2RDD); - GemFireJavaPairRDDFunctions wrapper = javaFunctions(mockPairRDD); - assertTrue(mockTuple2RDD == wrapper.rddf.rdd()); - - Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks(); - when(mockTuple2RDD.sparkContext()).thenReturn(tuple3._1()); - String regionPath = "testregion"; - wrapper.saveToGemfire(regionPath, tuple3._2()); - verify(mockTuple2RDD, times(1)).sparkContext(); - verify(tuple3._1(), times(1)).runJob(eq(mockTuple2RDD), any(Function2.class), any(ClassTag.class)); - } - - @Test - @SuppressWarnings( "unchecked" ) - public void testJavaRDDFunctions() throws Exception { - JavaRDD<String> mockJavaRDD = mock(JavaRDD.class); - RDD<String> mockRDD = mock(RDD.class); - when(mockJavaRDD.rdd()).thenReturn(mockRDD); - GemFireJavaRDDFunctions wrapper = javaFunctions(mockJavaRDD); - assertTrue(mockRDD == wrapper.rddf.rdd()); - - Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks(); - when(mockRDD.sparkContext()).thenReturn(tuple3._1()); - PairFunction<String, String, Integer> mockPairFunc = mock(PairFunction.class); - String regionPath = "testregion"; - wrapper.saveToGemfire(regionPath, mockPairFunc, tuple3._2()); - verify(mockRDD, times(1)).sparkContext(); - verify(tuple3._1(), times(1)).runJob(eq(mockRDD), any(Function2.class), any(ClassTag.class)); - } - - @Test - @SuppressWarnings( "unchecked" ) - public void testJavaPairDStreamFunctions() throws Exception { - JavaPairDStream<String, String> mockJavaDStream = mock(JavaPairDStream.class); - DStream<Tuple2<String, String>> mockDStream = mock(DStream.class); - when(mockJavaDStream.dstream()).thenReturn(mockDStream); - GemFireJavaPairDStreamFunctions wrapper = javaFunctions(mockJavaDStream); - assertTrue(mockDStream == wrapper.dsf.dstream()); - - Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks(); - String regionPath = "testregion"; - wrapper.saveToGemfire(regionPath, tuple3._2()); - verify(tuple3._2()).getConnection(); - verify(tuple3._3()).validateRegion(regionPath); - verify(mockDStream).foreachRDD(any(Function1.class)); - } - - @Test - @SuppressWarnings( "unchecked" ) - public void testJavaPairDStreamFunctionsWithTuple2DStream() throws Exception { - JavaDStream<Tuple2<String, String>> mockJavaDStream = mock(JavaDStream.class); - DStream<Tuple2<String, String>> mockDStream = mock(DStream.class); - when(mockJavaDStream.dstream()).thenReturn(mockDStream); - GemFireJavaPairDStreamFunctions wrapper = javaFunctions(toJavaPairDStream(mockJavaDStream)); - assertTrue(mockDStream == wrapper.dsf.dstream()); - } - - @Test - @SuppressWarnings( "unchecked" ) - public void testJavaDStreamFunctions() throws Exception { - JavaDStream<String> mockJavaDStream = mock(JavaDStream.class); - DStream<String> mockDStream = mock(DStream.class); - when(mockJavaDStream.dstream()).thenReturn(mockDStream); - GemFireJavaDStreamFunctions wrapper = javaFunctions(mockJavaDStream); - assertTrue(mockDStream == wrapper.dsf.dstream()); - - Tuple3<SparkContext, GemFireConnectionConf, GemFireConnection> tuple3 = createCommonMocks(); - PairFunction<String, String, Integer> mockPairFunc = mock(PairFunction.class); - String regionPath = "testregion"; - wrapper.saveToGemfire(regionPath, mockPairFunc, tuple3._2()); - verify(tuple3._2()).getConnection(); - verify(tuple3._3()).validateRegion(regionPath); - verify(mockDStream).foreachRDD(any(Function1.class)); - } - - @Test - public void testSQLContextFunction() throws Exception { - SQLContext mockSQLContext = mock(SQLContext.class); - GemFireJavaSQLContextFunctions wrapper = javaFunctions(mockSQLContext); - assertTrue(wrapper.scf.getClass() == GemFireSQLContextFunctions.class); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala deleted file mode 100644 index 854fd8f..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/GemFireFunctionDeployerTest.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector - -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FunSuite, Matchers} -import org.apache.commons.httpclient.HttpClient -import java.io.File - - -class GemFireFunctionDeployerTest extends FunSuite with Matchers with MockitoSugar { - val mockHttpClient: HttpClient = mock[HttpClient] - - test("jmx url creation") { - val jmxHostAndPort = "localhost:7070" - val expectedUrlString = "http://" + jmxHostAndPort + "/gemfire/v1/deployed" - val gfd = new GemFireFunctionDeployer(mockHttpClient); - val urlString = gfd.constructURLString(jmxHostAndPort) - assert(urlString === expectedUrlString) - } - - test("missing jar file") { - val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist" - val gfd = new GemFireFunctionDeployer(mockHttpClient); - intercept[RuntimeException] { gfd.jarFileHandle(missingJarFileLocation)} - } - - test("deploy with missing jar") { - val missingJarFileLocation = "file:///somemissingjarfilethatdoesnot.exist" - val gfd = new GemFireFunctionDeployer(mockHttpClient); - intercept[RuntimeException] {(gfd.deploy("localhost:7070", missingJarFileLocation).contains("Deployed"))} - intercept[RuntimeException] {(gfd.deploy("localhost", 7070, missingJarFileLocation).contains("Deployed"))} - } - - test("successful mocked deploy") { - val gfd = new GemFireFunctionDeployer(mockHttpClient); - val jar = new File("README.md"); - assert(gfd.deploy("localhost:7070", jar).contains("Deployed")) - } - - - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala deleted file mode 100644 index 0ce9808..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/DefaultGemFireConnectionManagerTest.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector.internal - -import io.pivotal.gemfire.spark.connector.GemFireConnectionConf -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FunSuite, Matchers} - -class DefaultGemFireConnectionManagerTest extends FunSuite with Matchers with MockitoSugar { - - test("DefaultGemFireConnectionFactory get/closeConnection") { - // note: connConf 1-4 share the same set of locators - val connConf1 = new GemFireConnectionConf(Seq(("host1", 1234))) - val connConf2 = new GemFireConnectionConf(Seq(("host2", 5678))) - val connConf3 = new GemFireConnectionConf(Seq(("host1", 1234), ("host2", 5678))) - val connConf4 = new GemFireConnectionConf(Seq(("host2", 5678), ("host1", 1234))) - val connConf5 = new GemFireConnectionConf(Seq(("host5", 3333))) - - val props: Map[String, String] = Map.empty - val mockConnFactory: DefaultGemFireConnectionFactory = mock[DefaultGemFireConnectionFactory] - val mockConn1 = mock[DefaultGemFireConnection] - val mockConn2 = mock[DefaultGemFireConnection] - when(mockConnFactory.newConnection(connConf3.locators, props)).thenReturn(mockConn1) - when(mockConnFactory.newConnection(connConf5.locators, props)).thenReturn(mockConn2) - - assert(DefaultGemFireConnectionManager.getConnection(connConf3)(mockConnFactory) == mockConn1) - // note: following 3 lines do not trigger connFactory.newConnection(...) - assert(DefaultGemFireConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1) - assert(DefaultGemFireConnectionManager.getConnection(connConf2)(mockConnFactory) == mockConn1) - assert(DefaultGemFireConnectionManager.getConnection(connConf4)(mockConnFactory) == mockConn1) - assert(DefaultGemFireConnectionManager.getConnection(connConf5)(mockConnFactory) == mockConn2) - - // connFactory.newConnection(...) were invoked only twice - verify(mockConnFactory, times(1)).newConnection(connConf3.locators, props) - verify(mockConnFactory, times(1)).newConnection(connConf5.locators, props) - assert(DefaultGemFireConnectionManager.connections.size == 3) - - DefaultGemFireConnectionManager.closeConnection(connConf1) - assert(DefaultGemFireConnectionManager.connections.size == 1) - DefaultGemFireConnectionManager.closeConnection(connConf5) - assert(DefaultGemFireConnectionManager.connections.isEmpty) - } - - test("DefaultGemFireConnectionFactory newConnection(...) throws RuntimeException") { - val connConf1 = new GemFireConnectionConf(Seq(("host1", 1234))) - val props: Map[String, String] = Map.empty - val mockConnFactory: DefaultGemFireConnectionFactory = mock[DefaultGemFireConnectionFactory] - when(mockConnFactory.newConnection(connConf1.locators, props)).thenThrow(new RuntimeException()) - intercept[RuntimeException] { DefaultGemFireConnectionManager.getConnection(connConf1)(mockConnFactory) } - verify(mockConnFactory, times(1)).newConnection(connConf1.locators, props) - } - - test("DefaultGemFireConnectionFactory close() w/ non-exist connection") { - val props: Map[String, String] = Map.empty - val mockConnFactory: DefaultGemFireConnectionFactory = mock[DefaultGemFireConnectionFactory] - val connConf1 = new GemFireConnectionConf(Seq(("host1", 1234))) - val connConf2 = new GemFireConnectionConf(Seq(("host2", 5678))) - val mockConn1 = mock[DefaultGemFireConnection] - when(mockConnFactory.newConnection(connConf1.locators, props)).thenReturn(mockConn1) - assert(DefaultGemFireConnectionManager.getConnection(connConf1)(mockConnFactory) == mockConn1) - assert(DefaultGemFireConnectionManager.connections.size == 1) - // connection does not exists in the connection manager - DefaultGemFireConnectionManager.closeConnection(connConf2) - assert(DefaultGemFireConnectionManager.connections.size == 1) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala deleted file mode 100644 index ad2b94e..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/gemfirefunctions/StructStreamingResultSenderAndCollectorTest.scala +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector.internal.gemfirefunctions - -import com.gemstone.gemfire.DataSerializer -import com.gemstone.gemfire.cache.execute.{ResultCollector, ResultSender} -import com.gemstone.gemfire.cache.query.internal.types.{ObjectTypeImpl, StructTypeImpl} -import com.gemstone.gemfire.cache.query.types.ObjectType -import com.gemstone.gemfire.internal.{Version, ByteArrayDataInput, HeapDataOutputStream} -import com.gemstone.gemfire.internal.cache.{CachedDeserializable, CachedDeserializableFactory} -import org.scalatest.{BeforeAndAfter, FunSuite} -import scala.collection.JavaConversions._ -import scala.concurrent.{Await, ExecutionContext, Future} -import ExecutionContext.Implicits.global -import scala.concurrent.duration._ - -class StructStreamingResultSenderAndCollectorTest extends FunSuite with BeforeAndAfter { - - /** - * A test ResultSender that connects struct ResultSender and ResultCollector - * Note: this test ResultSender has to copy the data (byte array) since the - * StructStreamingResultSender will reuse the byte array. - */ - class LocalResultSender(collector: ResultCollector[Array[Byte], _], num: Int = 1) extends ResultSender[Object] { - - var finishedNum = 0 - - override def sendResult(result: Object): Unit = - collector.addResult(null, result.asInstanceOf[Array[Byte]].clone()) - - /** exception should be sent via lastResult() */ - override def sendException(throwable: Throwable): Unit = - throw new UnsupportedOperationException("sendException is not supported.") - - override def lastResult(result: Object): Unit = { - collector.addResult(null, result.asInstanceOf[Array[Byte]].clone()) - this.synchronized { - finishedNum += 1 - if (finishedNum == num) - collector.endResults() - } - } - } - - /** common variables */ - var collector: StructStreamingResultCollector = _ - var baseSender: LocalResultSender = _ - /** common types */ - val objType = new ObjectTypeImpl("java.lang.Object").asInstanceOf[ObjectType] - val TwoColType = new StructTypeImpl(Array("key", "value"), Array(objType, objType)) - val OneColType = new StructTypeImpl(Array("value"), Array(objType)) - - before { - collector = new StructStreamingResultCollector - baseSender = new LocalResultSender(collector, 1) - } - - test("transfer simple data") { - verifySimpleTransfer(sendDataType = true) - } - - test("transfer simple data with no type info") { - verifySimpleTransfer(sendDataType = false) - } - - def verifySimpleTransfer(sendDataType: Boolean): Unit = { - val iter = (0 to 9).map(i => Array(i.asInstanceOf[Object], (i.toString * 5).asInstanceOf[Object])).toIterator - val dataType = if (sendDataType) TwoColType else null - new StructStreamingResultSender(baseSender, dataType , iter).send() - // println("type: " + collector.getResultType.toString) - assert(TwoColType.equals(collector.getResultType)) - val iter2 = collector.getResult - (0 to 9).foreach { i => - assert(iter2.hasNext) - val o = iter2.next() - assert(o.size == 2) - assert(o(0).asInstanceOf[Int] == i) - assert(o(1).asInstanceOf[String] == i.toString * 5) - } - assert(! iter2.hasNext) - } - - - /** - * A test iterator that generate integer data - * @param start the 1st value - * @param n number of integers generated - * @param genExcp generate Exception if true. This is used to test exception handling. - */ - def intIterator(start: Int, n: Int, genExcp: Boolean): Iterator[Array[Object]] = { - new Iterator[Array[Object]] { - val max = if (genExcp) start + n else start + n - 1 - var index: Int = start - 1 - - override def hasNext: Boolean = if (index < max) true else false - - override def next(): Array[Object] = - if (index < (start + n - 1)) { - index += 1 - Array(index.asInstanceOf[Object]) - } else throw new RuntimeException("simulated error") - } - } - - test("transfer data with 0 row") { - new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 0, genExcp = false)).send() - // println("type: " + collector.getResultType.toString) - assert(collector.getResultType == null) - val iter = collector.getResult - assert(! iter.hasNext) - } - - test("transfer data with 10K rows") { - new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 10000, genExcp = false)).send() - // println("type: " + collector.getResultType.toString) - assert(OneColType.equals(collector.getResultType)) - val iter = collector.getResult - // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) - (1 to 10000).foreach { i => - assert(iter.hasNext) - val o = iter.next() - assert(o.size == 1) - assert(o(0).asInstanceOf[Int] == i) - } - assert(! iter.hasNext) - } - - test("transfer data with 10K rows with 2 sender") { - baseSender = new LocalResultSender(collector, 2) - val total = 300 - val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()} - val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = false), "sender2").send()} - Await.result(sender1, 1.seconds) - Await.result(sender2, 1.seconds) - - // println("type: " + collector.getResultType.toString) - assert(OneColType.equals(collector.getResultType)) - val iter = collector.getResult - // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) - val set = scala.collection.mutable.Set[Int]() - (1 to total).foreach { i => - assert(iter.hasNext) - val o = iter.next() - assert(o.size == 1) - assert(! set.contains(o(0).asInstanceOf[Int])) - set.add(o(0).asInstanceOf[Int]) - } - assert(! iter.hasNext) - } - - test("transfer data with 10K rows with 2 sender with error") { - baseSender = new LocalResultSender(collector, 2) - val total = 1000 - val sender1 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(1, total/2, genExcp = false), "sender1").send()} - val sender2 = Future { new StructStreamingResultSender(baseSender, OneColType, intIterator(total/2+1, total/2, genExcp = true), "sender2").send()} - Await.result(sender1, 1 seconds) - Await.result(sender2, 1 seconds) - - // println("type: " + collector.getResultType.toString) - assert(OneColType.equals(collector.getResultType)) - val iter = collector.getResult - // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) - val set = scala.collection.mutable.Set[Int]() - intercept[RuntimeException] { - (1 to total).foreach { i => - assert(iter.hasNext) - val o = iter.next() - assert(o.size == 1) - assert(! set.contains(o(0).asInstanceOf[Int])) - set.add(o(0).asInstanceOf[Int]) - } - } - // println(s"rows received: ${set.size}") - } - - test("transfer data with Exception") { - new StructStreamingResultSender(baseSender, OneColType, intIterator(1, 200, genExcp = true)).send() - // println("type: " + collector.getResultType.toString) - val iter = collector.getResult - intercept[RuntimeException] ( iter.foreach(_.mkString(",")) ) - } - - def stringPairIterator(n: Int, genExcp: Boolean): Iterator[Array[Object]] = - intIterator(1, n, genExcp).map(x => Array(s"key-${x(0)}", s"value-${x(0)}")) - - test("transfer string pair data with 200 rows") { - new StructStreamingResultSender(baseSender, TwoColType, stringPairIterator(1000, genExcp = false)).send() - // println("type: " + collector.getResultType.toString) - assert(TwoColType.equals(collector.getResultType)) - val iter = collector.getResult - // println(iter.toList.map(list => list.mkString(",")).mkString("; ")) - (1 to 1000).foreach { i => - assert(iter.hasNext) - val o = iter.next() - assert(o.size == 2) - assert(o(0) == s"key-$i") - assert(o(1) == s"value-$i") - } - assert(! iter.hasNext) - } - - /** - * Usage notes: There are 3 kinds of data to transfer: - * (1) object, (2) byte array of serialized object, and (3) byte array - * this test shows how to handle all of them. - */ - test("DataSerializer usage") { - val outBuf = new HeapDataOutputStream(1024, null) - val inBuf = new ByteArrayDataInput() - - // 1. a regular object - val hello = "Hello World!" * 30 - // serialize the data - DataSerializer.writeObject(hello, outBuf) - val bytesHello = outBuf.toByteArray.clone() - // de-serialize the data - inBuf.initialize(bytesHello, Version.CURRENT) - val hello2 = DataSerializer.readObject(inBuf).asInstanceOf[Object] - assert(hello == hello2) - - // 2. byte array of serialized object - // serialize: byte array from `CachedDeserializable` - val cd: CachedDeserializable = CachedDeserializableFactory.create(bytesHello) - outBuf.reset() - DataSerializer.writeByteArray(cd.getSerializedValue, outBuf) - // de-serialize the data in 2 steps - inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT) - val bytesHello2: Array[Byte] = DataSerializer.readByteArray(inBuf) - inBuf.initialize(bytesHello2, Version.CURRENT) - val hello3 = DataSerializer.readObject(inBuf).asInstanceOf[Object] - assert(hello == hello3) - - // 3. byte array - outBuf.reset() - DataSerializer.writeByteArray(bytesHello, outBuf) - inBuf.initialize(outBuf.toByteArray.clone(), Version.CURRENT) - val bytesHello3: Array[Byte] = DataSerializer.readByteArray(inBuf) - assert(bytesHello sameElements bytesHello3) - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala deleted file mode 100644 index e33e9e8..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/io/pivotal/gemfire/spark/connector/internal/oql/QueryParserTest.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.gemfire.spark.connector.internal.oql - -import org.scalatest.FunSuite - -class QueryParserTest extends FunSuite { - - test("select * from /r1") { - val r = QueryParser.parseOQL("select * from /r1").get - assert(r == "List(/r1)") - } - - test("select c2 from /r1") { - val r = QueryParser.parseOQL("select c2 from /r1").get - assert(r == "List(/r1)") - } - - test("select key, value from /r1.entries") { - val r = QueryParser.parseOQL("select key, value from /r1.entries").get - assert(r == "List(/r1.entries)") - } - - test("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2") { - val r = QueryParser.parseOQL("select c1, c2 from /r1 where col1 > 100 and col2 <= 120 or c3 = 2").get - assert(r == "List(/r1)") - } - - test("select * from /r1/r2 where c1 >= 200") { - val r = QueryParser.parseOQL("select * from /r1/r2 where c1 >= 200").get - assert(r == "List(/r1/r2)") - } - - test("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100") { - val r = QueryParser.parseOQL("import io.pivotal select c1, c2, c3 from /r1/r2, /r3/r4 where c1 <= 15 and c2 = 100").get - assert(r == "List(/r1/r2, /r3/r4)") - } - - test("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100") { - val r = QueryParser.parseOQL("SELECT distinct f1, f2 FROM /r1/r2 WHere f = 100").get - assert(r == "List(/r1/r2)") - } - - test("IMPORT io.pivotal.gemfire IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc") { - val r = QueryParser.parseOQL("IMPORT io.pivotal.gemfire IMPORT com.mypackage SELECT key,value FROM /root/sub.entries WHERE status = 'active' ORDER BY id desc").get - assert(r == "List(/root/sub.entries)") - } - - test("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status") { - val r = QueryParser.parseOQL("select distinct p.ID, p.status from /region p where p.ID > 5 order by p.status").get - assert(r == "List(/region)") - } - - test("SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID") { - val r = QueryParser.parseOQL("SELECT DISTINCT * FROM /QueryRegion1 r1, /QueryRegion2 r2 WHERE r1.ID = r2.ID").get - assert(r == "List(/QueryRegion1, /QueryRegion2)") - } - - test("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'") { - val r = QueryParser.parseOQL("SELECT id, \"type\", positions, status FROM /obj_obj_region WHERE status = 'active'").get - println("r.type=" + r.getClass.getName + " r=" + r) - assert(r == "List(/obj_obj_region)") - } - - test("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'") { - val r = QueryParser.parseOQL("SELECT r.id, r.\"type\", r.positions, r.status FROM /obj_obj_region r, r.positions.values f WHERE r.status = 'active' and f.secId = 'MSFT'").get - assert(r == "List(/obj_obj_region, r.positions.values)") - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala deleted file mode 100644 index 4032ee8..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/ConnectorImplicitsTest.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package unittest.io.pivotal.gemfire.spark.connector - -import io.pivotal.gemfire.spark.connector._ -import org.apache.spark.SparkContext -import org.apache.spark.sql.SQLContext -import org.scalatest.FunSuite -import org.scalatest.mock.MockitoSugar -import org.scalatest.Matchers - -class ConnectorImplicitsTest extends FunSuite with Matchers with MockitoSugar { - - test("implicit map2Properties") { - verifyProperties(Map.empty) - verifyProperties(Map("One" -> "1", "Two" -> "2", "Three" ->"3")) - } - - def verifyProperties(map: Map[String, String]): Unit = { - val props: java.util.Properties = map - assert(props.size() == map.size) - map.foreach(p => assert(props.getProperty(p._1) == p._2)) - } - - test("Test Implicit SparkContext Conversion") { - val mockSparkContext = mock[SparkContext] - val gfscf: GemFireSparkContextFunctions = mockSparkContext - assert(gfscf.isInstanceOf[GemFireSparkContextFunctions]) - } - - test("Test Implicit SQLContext Conversion") { - val mockSQLContext = mock[SQLContext] - val gfscf: GemFireSQLContextFunctions = mockSQLContext - assert(gfscf.isInstanceOf[GemFireSQLContextFunctions]) - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala deleted file mode 100644 index 0e06db4..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireConnectionConfTest.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package unittest.io.pivotal.gemfire.spark.connector - -import org.apache.spark.SparkConf -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, FunSuite} -import io.pivotal.gemfire.spark.connector._ - -class GemFireConnectionConfTest extends FunSuite with Matchers with MockitoSugar { - - test("apply(SparkConf) w/ GemFireLocator property and empty gemfireProps") { - val (host1, port1) = ("host1", 1234) - val (host2, port2) = ("host2", 5678) - val conf = new SparkConf().set(GemFireLocatorPropKey, s"$host1[$port1],$host2[$port2]") - val connConf = GemFireConnectionConf(conf) - assert(connConf.locators == Seq((host1, port1),(host2, port2))) - assert(connConf.gemfireProps.isEmpty) - } - - test("apply(SparkConf) w/ GemFireLocator property and gemfire properties") { - val (host1, port1) = ("host1", 1234) - val (host2, port2) = ("host2", 5678) - val (propK1, propV1) = ("ack-severe-alert-threshold", "1") - val (propK2, propV2) = ("ack-wait-threshold", "10") - val conf = new SparkConf().set(GemFireLocatorPropKey, s"$host1[$port1],$host2[$port2]") - .set(s"spark.gemfire.$propK1", propV1).set(s"spark.gemfire.$propK2", propV2) - val connConf = GemFireConnectionConf(conf) - assert(connConf.locators == Seq((host1, port1),(host2, port2))) - assert(connConf.gemfireProps == Map(propK1 -> propV1, propK2 -> propV2)) - } - - test("apply(SparkConf) w/o GemFireLocator property") { - intercept[RuntimeException] { GemFireConnectionConf(new SparkConf()) } - } - - test("apply(SparkConf) w/ invalid GemFireLocator property") { - val conf = new SparkConf().set(GemFireLocatorPropKey, "local^host:1234") - intercept[Exception] { GemFireConnectionConf(conf) } - } - - test("apply(locatorStr, gemfireProps) w/ valid locatorStr and non gemfireProps") { - val (host1, port1) = ("host1", 1234) - val connConf = GemFireConnectionConf(s"$host1:$port1") - assert(connConf.locators == Seq((host1, port1))) - assert(connConf.gemfireProps.isEmpty) - } - - test("apply(locatorStr, gemfireProps) w/ valid locatorStr and non-empty gemfireProps") { - val (host1, port1) = ("host1", 1234) - val (host2, port2) = ("host2", 5678) - val (propK1, propV1) = ("ack-severe-alert-threshold", "1") - val (propK2, propV2) = ("ack-wait-threshold", "10") - val props = Map(propK1 -> propV1, propK2 -> propV2) - val connConf = GemFireConnectionConf(s"$host1:$port1,$host2:$port2", props) - assert(connConf.locators == Seq((host1, port1),(host2, port2))) - assert(connConf.gemfireProps == props) - } - - test("apply(locatorStr, gemfireProps) w/ invalid locatorStr") { - intercept[Exception] { GemFireConnectionConf("local~host:4321") } - } - - test("constructor w/ empty (host,port) pairs") { - intercept[IllegalArgumentException] { new GemFireConnectionConf(Seq.empty) } - } - - test("getConnection() normal") { - implicit val mockFactory = mock[GemFireConnectionManager] - val mockConnection = mock[GemFireConnection] - when(mockFactory.getConnection(org.mockito.Matchers.any[GemFireConnectionConf])).thenReturn(mockConnection) - val connConf = GemFireConnectionConf("localhost:1234") - assert(connConf.getConnection == mockConnection) - verify(mockFactory).getConnection(connConf) - } - - test("getConnection() failure") { - implicit val mockFactory = mock[GemFireConnectionManager] - when(mockFactory.getConnection(org.mockito.Matchers.any[GemFireConnectionConf])).thenThrow(new RuntimeException) - val connConf = GemFireConnectionConf("localhost:1234") - intercept[RuntimeException] { connConf.getConnection } - verify(mockFactory).getConnection(connConf) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala deleted file mode 100644 index 4117596..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireDStreamFunctionsTest.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package unittest.io.pivotal.gemfire.spark.connector - -import com.gemstone.gemfire.cache.Region -import io.pivotal.gemfire.spark.connector.{GemFireConnection, GemFireConnectionConf} -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.dstream.DStream -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, FunSuite} -import org.mockito.Matchers.{eq => mockEq, any => mockAny} - -import scala.reflect.ClassTag - -class GemFireDStreamFunctionsTest extends FunSuite with Matchers with MockitoSugar { - - test("test GemFirePairDStreamFunctions Implicit") { - import io.pivotal.gemfire.spark.connector.streaming._ - val mockDStream = mock[DStream[(Int, String)]] - // the implicit make the following line valid - val pairDStream: GemFirePairDStreamFunctions[Int, String] = mockDStream - pairDStream shouldBe a[GemFirePairDStreamFunctions[_, _]] - } - - test("test GemFireDStreamFunctions Implicit") { - import io.pivotal.gemfire.spark.connector.streaming._ - val mockDStream = mock[DStream[String]] - // the implicit make the following line valid - val dstream: GemFireDStreamFunctions[String] = mockDStream - dstream shouldBe a[GemFireDStreamFunctions[_]] - } - - def createMocks[K, V](regionPath: String) - (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]) - : (String, GemFireConnectionConf, GemFireConnection, Region[K, V]) = { - val mockConnection = mock[GemFireConnection] - val mockConnConf = mock[GemFireConnectionConf] - val mockRegion = mock[Region[K, V]] - when(mockConnConf.getConnection).thenReturn(mockConnection) - when(mockConnConf.locators).thenReturn(Seq.empty) - (regionPath, mockConnConf, mockConnection, mockRegion) - } - - test("test GemFirePairDStreamFunctions.saveToGemfire()") { - import io.pivotal.gemfire.spark.connector.streaming._ - val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, String]("test") - val mockDStream = mock[DStream[(String, String)]] - mockDStream.saveToGemfire(regionPath, mockConnConf) - verify(mockConnConf).getConnection - verify(mockConnection).validateRegion[String, String](regionPath) - verify(mockDStream).foreachRDD(mockAny[(RDD[(String, String)]) => Unit]) - } - - test("test GemFireDStreamFunctions.saveToGemfire()") { - import io.pivotal.gemfire.spark.connector.streaming._ - val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, Int]("test") - val mockDStream = mock[DStream[String]] - mockDStream.saveToGemfire[String, Int](regionPath, (s: String) => (s, s.length), mockConnConf) - verify(mockConnConf).getConnection - verify(mockConnection).validateRegion[String, String](regionPath) - verify(mockDStream).foreachRDD(mockAny[(RDD[String]) => Unit]) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala deleted file mode 100644 index f2d49cb..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/GemFireRDDFunctionsTest.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package unittest.io.pivotal.gemfire.spark.connector - -import com.gemstone.gemfire.cache.Region -import io.pivotal.gemfire.spark.connector._ -import io.pivotal.gemfire.spark.connector.internal.rdd.{GemFireRDDWriter, GemFirePairRDDWriter} -import org.apache.spark.{TaskContext, SparkContext} -import org.apache.spark.rdd.RDD -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{FunSuite, Matchers} -import collection.JavaConversions._ -import scala.reflect.ClassTag -import org.mockito.Matchers.{eq => mockEq, any => mockAny} - -class GemFireRDDFunctionsTest extends FunSuite with Matchers with MockitoSugar { - - test("test PairRDDFunction Implicit") { - import io.pivotal.gemfire.spark.connector._ - val mockRDD = mock[RDD[(Int, String)]] - // the implicit make the following line valid - val pairRDD: GemFirePairRDDFunctions[Int, String] = mockRDD - pairRDD shouldBe a [GemFirePairRDDFunctions[_, _]] - } - - test("test RDDFunction Implicit") { - import io.pivotal.gemfire.spark.connector._ - val mockRDD = mock[RDD[String]] - // the implicit make the following line valid - val nonPairRDD: GemFireRDDFunctions[String] = mockRDD - nonPairRDD shouldBe a [GemFireRDDFunctions[_]] - } - - def createMocks[K, V](regionPath: String) - (implicit kt: ClassTag[K], vt: ClassTag[V], m: Manifest[Region[K, V]]): (String, GemFireConnectionConf, GemFireConnection, Region[K, V]) = { - val mockConnection = mock[GemFireConnection] - val mockConnConf = mock[GemFireConnectionConf] - val mockRegion = mock[Region[K, V]] - when(mockConnConf.getConnection).thenReturn(mockConnection) - when(mockConnection.getRegionProxy[K, V](regionPath)).thenReturn(mockRegion) - // mockRegion shouldEqual mockConn.getRegionProxy[K, V](regionPath) - (regionPath, mockConnConf, mockConnection, mockRegion) - } - - test("test GemFirePairRDDWriter") { - val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, String]("test") - val writer = new GemFirePairRDDWriter[String, String](regionPath, mockConnConf) - val data = List(("1", "one"), ("2", "two"), ("3", "three")) - writer.write(null, data.toIterator) - val expectedMap: Map[String, String] = data.toMap - verify(mockRegion).putAll(expectedMap) - } - - test("test GemFireNonPairRDDWriter") { - val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[Int, String]("test") - val writer = new GemFireRDDWriter[String, Int, String](regionPath, mockConnConf) - val data = List("a", "ab", "abc") - val f: String => (Int, String) = s => (s.length, s) - writer.write(f)(null, data.toIterator) - val expectedMap: Map[Int, String] = data.map(f).toMap - verify(mockRegion).putAll(expectedMap) - } - - test("test PairRDDFunctions.saveToGemfire") { - verifyPairRDDFunction(useOpConf = false) - } - - test("test PairRDDFunctions.saveToGemfire w/ opConf") { - verifyPairRDDFunction(useOpConf = true) - } - - def verifyPairRDDFunction(useOpConf: Boolean): Unit = { - import io.pivotal.gemfire.spark.connector._ - val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[String, String]("test") - val mockRDD = mock[RDD[(String, String)]] - val mockSparkContext = mock[SparkContext] - when(mockRDD.sparkContext).thenReturn(mockSparkContext) - val result = - if (useOpConf) - mockRDD.saveToGemfire(regionPath, mockConnConf, Map(RDDSaveBatchSizePropKey -> "5000")) - else - mockRDD.saveToGemfire(regionPath, mockConnConf) - verify(mockConnection, times(1)).validateRegion[String, String](regionPath) - result === Unit - verify(mockSparkContext, times(1)).runJob[(String, String), Unit]( - mockEq(mockRDD), mockAny[(TaskContext, Iterator[(String, String)]) => Unit])(mockAny(classOf[ClassTag[Unit]])) - - // Note: current implementation make following code not compilable - // so not negative test for this case - // val rdd: RDD[(K, V)] = ... - // rdd.saveToGemfire(regionPath, s => (s.length, s)) - } - - test("test RDDFunctions.saveToGemfire") { - verifyRDDFunction(useOpConf = false) - } - - test("test RDDFunctions.saveToGemfire w/ opConf") { - verifyRDDFunction(useOpConf = true) - } - - def verifyRDDFunction(useOpConf: Boolean): Unit = { - import io.pivotal.gemfire.spark.connector._ - val (regionPath, mockConnConf, mockConnection, mockRegion) = createMocks[Int, String]("test") - val mockRDD = mock[RDD[(String)]] - val mockSparkContext = mock[SparkContext] - when(mockRDD.sparkContext).thenReturn(mockSparkContext) - val result = - if (useOpConf) - mockRDD.saveToGemfire(regionPath, s => (s.length, s), mockConnConf, Map(RDDSaveBatchSizePropKey -> "5000")) - else - mockRDD.saveToGemfire(regionPath, s => (s.length, s), mockConnConf) - verify(mockConnection, times(1)).validateRegion[Int, String](regionPath) - result === Unit - verify(mockSparkContext, times(1)).runJob[String, Unit]( - mockEq(mockRDD), mockAny[(TaskContext, Iterator[String]) => Unit])(mockAny(classOf[ClassTag[Unit]])) - - // Note: current implementation make following code not compilable - // so not negative test for this case - // val rdd: RDD[T] = ... // T is not a (K, V) tuple - // rdd.saveToGemfire(regionPath) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala deleted file mode 100644 index bfb115a..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/LocatorHelperTest.scala +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package unittest.io.pivotal.gemfire.spark.connector - -import java.net.InetAddress - -import io.pivotal.gemfire.spark.connector.internal.LocatorHelper -import org.scalatest.FunSuite - -class LocatorHelperTest extends FunSuite { - - test("locatorStr2HostPortPair hostname w/o domain") { - val (host, port) = ("localhost", 10334) - assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get ==(host, port)) - assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get ==(host, port)) - } - - test("locatorStr2HostPortPair hostname w/ domain") { - val (host, port) = ("localhost", 10334) - assert(LocatorHelper.locatorStr2HostPortPair(s"$host:$port").get ==(host, port)) - assert(LocatorHelper.locatorStr2HostPortPair(s"$host[$port]").get ==(host, port)) - } - - test("locatorStr2HostPortPair w/ invalid host name") { - // empty or null locatorStr - assert(LocatorHelper.locatorStr2HostPortPair("").isFailure) - assert(LocatorHelper.locatorStr2HostPortPair(null).isFailure) - // host name has leading `.` - assert(LocatorHelper.locatorStr2HostPortPair(".localhost.1234").isFailure) - // host name has leading and/or tail white space - assert(LocatorHelper.locatorStr2HostPortPair(" localhost.1234").isFailure) - assert(LocatorHelper.locatorStr2HostPortPair("localhost .1234").isFailure) - assert(LocatorHelper.locatorStr2HostPortPair(" localhost .1234").isFailure) - // host name contain invalid characters - assert(LocatorHelper.locatorStr2HostPortPair("local%host.1234").isFailure) - assert(LocatorHelper.locatorStr2HostPortPair("localhost*.1234").isFailure) - assert(LocatorHelper.locatorStr2HostPortPair("^localhost.1234").isFailure) - } - - test("locatorStr2HostPortPair w/ valid port") { - val host = "192.168.0.1" - // port has 2, 3, 4, 5 digits - assert(LocatorHelper.locatorStr2HostPortPair(s"$host:20").get ==(host, 20)) - assert(LocatorHelper.locatorStr2HostPortPair(s"$host:300").get ==(host, 300)) - assert(LocatorHelper.locatorStr2HostPortPair(s"$host:4000").get ==(host, 4000)) - assert(LocatorHelper.locatorStr2HostPortPair(s"$host:50000").get ==(host, 50000)) - } - - test("locatorStr2HostPortPair w/ invalid port") { - // port number is less than 2 digits - assert(LocatorHelper.locatorStr2HostPortPair("locslhost.9").isFailure) - // port number is more than 5 digits - assert(LocatorHelper.locatorStr2HostPortPair("locslhost.100000").isFailure) - // port number is invalid - assert(LocatorHelper.locatorStr2HostPortPair("locslhost.1xx1").isFailure) - } - - test("parseLocatorsString with valid locator(s)") { - val (host1, port1) = ("localhost", 10334) - assert(LocatorHelper.parseLocatorsString(s"$host1:$port1") == Seq((host1, port1))) - val (host2, port2) = ("localhost2", 10335) - assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2") == Seq((host1, port1),(host2, port2))) - val (host3, port3) = ("localhost2", 10336) - assert(LocatorHelper.parseLocatorsString(s"$host1:$port1,$host2:$port2,$host3:$port3") == - Seq((host1, port1),(host2, port2),(host3, port3))) - } - - test("parseLocatorsString with invalid locator(s)") { - // empty and null locatorsStr - intercept[Exception] { LocatorHelper.parseLocatorsString("") } - intercept[Exception] { LocatorHelper.parseLocatorsString(null) } - // 1 bad locatorStr - intercept[Exception] { LocatorHelper.parseLocatorsString("local%host.1234") } - // 1 good locatorStr and 1 bad locatorStr - intercept[Exception] { LocatorHelper.parseLocatorsString("localhost:2345,local%host.1234") } - intercept[Exception] { LocatorHelper.parseLocatorsString("local^host:2345,localhost.1234") } - } - - test("pickPreferredGemFireServers: shared servers and one gf-server per host") { - val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004)) - val servers = Seq(srv1, srv2, srv3, srv4) - verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3)) - verifyPickPreferredGemFireServers(servers, "host2", "0", Seq(srv2, srv3, srv4)) - verifyPickPreferredGemFireServers(servers, "host3", "1", Seq(srv3, srv4, srv1)) - verifyPickPreferredGemFireServers(servers, "host4", "2", Seq(srv4, srv1, srv2)) - } - - test("pickPreferredGemFireServers: shared servers, one gf-server per host, un-sorted list") { - val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004)) - val servers = Seq(srv4, srv2, srv3, srv1) - verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3)) - verifyPickPreferredGemFireServers(servers, "host2", "0", Seq(srv2, srv3, srv4)) - verifyPickPreferredGemFireServers(servers, "host3", "1", Seq(srv3, srv4, srv1)) - verifyPickPreferredGemFireServers(servers, "host4", "2", Seq(srv4, srv1, srv2)) - } - - test("pickPreferredGemFireServers: shared servers and two gf-server per host") { - val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004)) - val servers = Seq(srv1, srv2, srv3, srv4) - verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3)) - verifyPickPreferredGemFireServers(servers, "host1", "0", Seq(srv2, srv1, srv3)) - verifyPickPreferredGemFireServers(servers, "host2", "1", Seq(srv3, srv4, srv1)) - verifyPickPreferredGemFireServers(servers, "host2", "2", Seq(srv4, srv3, srv1)) - } - - test("pickPreferredGemFireServers: shared servers, two gf-server per host, un-sorted server list") { - val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host1", 4002), ("host2", 4003), ("host2", 4004)) - val servers = Seq(srv1, srv4, srv3, srv2) - verifyPickPreferredGemFireServers(servers, "host1", "<driver>", Seq(srv1, srv2, srv3)) - verifyPickPreferredGemFireServers(servers, "host1", "0", Seq(srv2, srv1, srv3)) - verifyPickPreferredGemFireServers(servers, "host2", "1", Seq(srv3, srv4, srv1)) - verifyPickPreferredGemFireServers(servers, "host2", "2", Seq(srv4, srv3, srv1)) - } - - test("pickPreferredGemFireServers: no shared servers and one gf-server per host") { - val (srv1, srv2, srv3, srv4) = (("host1", 4001), ("host2", 4002), ("host3", 4003),("host4", 4004)) - val servers = Seq(srv1, srv2, srv3, srv4) - verifyPickPreferredGemFireServers(servers, "host5", "<driver>", Seq(srv1, srv2, srv3)) - verifyPickPreferredGemFireServers(servers, "host6", "0", Seq(srv2, srv3, srv4)) - verifyPickPreferredGemFireServers(servers, "host7", "1", Seq(srv3, srv4, srv1)) - verifyPickPreferredGemFireServers(servers, "host8", "2", Seq(srv4, srv1, srv2)) - } - - test("pickPreferredGemFireServers: no shared servers, one gf-server per host, and less gf-server") { - val (srv1, srv2) = (("host1", 4001), ("host2", 4002)) - val servers = Seq(srv1, srv2) - verifyPickPreferredGemFireServers(servers, "host5", "<driver>", Seq(srv1, srv2)) - verifyPickPreferredGemFireServers(servers, "host6", "0", Seq(srv2, srv1)) - verifyPickPreferredGemFireServers(servers, "host7", "1", Seq(srv1, srv2)) - verifyPickPreferredGemFireServers(servers, "host8", "2", Seq(srv2, srv1)) - - - println("host name: " + InetAddress.getLocalHost.getHostName) - println("canonical host name: " + InetAddress.getLocalHost.getCanonicalHostName) - println("canonical host name 2: " + InetAddress.getByName(InetAddress.getLocalHost.getHostName).getCanonicalHostName) - } - - test("pickPreferredGemFireServers: ad-hoc") { - val (srv4, srv5, srv6) = ( - ("w2-gst-pnq-04.gemstone.com", 40411), ("w2-gst-pnq-05.gemstone.com", 40411), ("w2-gst-pnq-06.gemstone.com", 40411)) - val servers = Seq(srv6, srv5, srv4) - verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-03.gemstone.com", "<driver>", Seq(srv4, srv5, srv6)) - verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-04.gemstone.com", "1", Seq(srv4, srv5, srv6)) - verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-05.gemstone.com", "0", Seq(srv5, srv6, srv4)) - verifyPickPreferredGemFireServers(servers, "w2-gst-pnq-06.gemstone.com", "2", Seq(srv6, srv4, srv5)) - } - - def verifyPickPreferredGemFireServers( - servers: Seq[(String, Int)], hostName: String, executorId: String, expectation: Seq[(String, Int)]): Unit = { - val result = LocatorHelper.pickPreferredGemFireServers(servers, hostName, executorId) - assert(result == expectation, s"pick servers for $hostName:$executorId") - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/760c6e22/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRDDPartitionerTest.scala ---------------------------------------------------------------------- diff --git a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRDDPartitionerTest.scala b/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRDDPartitionerTest.scala deleted file mode 100644 index f6a30c7..0000000 --- a/geode-spark-connector/gemfire-spark-connector/src/test/scala/unittest/io/pivotal/gemfire/spark/connector/rdd/GemFireRDDPartitionerTest.scala +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package unittest.io.pivotal.gemfire.spark.connector.rdd - -import com.gemstone.gemfire.distributed.internal.ServerLocation -import io.pivotal.gemfire.spark.connector.internal.RegionMetadata -import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartitioner._ -import io.pivotal.gemfire.spark.connector.GemFireConnection -import io.pivotal.gemfire.spark.connector.internal.rdd._ -import org.apache.spark.Partition -import org.mockito.Mockito._ -import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, FunSuite} - -import java.util.{HashSet => JHashSet, HashMap => JHashMap} - -import scala.collection.mutable - -class GemFireRDDPartitionerTest extends FunSuite with Matchers with MockitoSugar { - - val emptyServerBucketMap: JHashMap[ServerLocation, JHashSet[Integer]] = new JHashMap() - - def toJavaServerBucketMap(map: Map[(String, Int), Set[Int]]): JHashMap[ServerLocation, JHashSet[Integer]] = { - import scala.collection.JavaConversions._ - val tmp = map.map {case ((host, port), set) => (new ServerLocation(host, port), set.map(Integer.valueOf))} - (new JHashMap[ServerLocation, JHashSet[Integer]]() /: tmp) { case (acc, (s, jset)) => acc.put(s, new JHashSet(jset)); acc } - } - - val map: mutable.Map[(String, Int), mutable.Set[Int]] = mutable.Map( - ("s0",1) -> mutable.Set.empty, ("s1",2) -> mutable.Set(0), ("s2",3) -> mutable.Set(1, 2), ("s3",4) -> mutable.Set(3, 4, 5)) - - - // update this test whenever change default setting - test("default partitioned region partitioner") { - assert(GemFireRDDPartitioner.defaultPartitionedRegionPartitioner === ServerSplitsPartitioner) - } - - // update this test whenever change default setting - test("default replicated region partitioner") { - assert(GemFireRDDPartitioner.defaultReplicatedRegionPartitioner === OnePartitionPartitioner) - } - - test("GemFireRDDPartitioner.apply method") { - import io.pivotal.gemfire.spark.connector.internal.rdd.GemFireRDDPartitioner._ - for ((name, partitioner) <- partitioners) assert(GemFireRDDPartitioner(name) == partitioner) - assert(GemFireRDDPartitioner("dummy") == GemFireRDDPartitioner.defaultPartitionedRegionPartitioner) - assert(GemFireRDDPartitioner() == GemFireRDDPartitioner.defaultPartitionedRegionPartitioner) - } - - test("OnePartitionPartitioner") { - val mockConnection = mock[GemFireConnection] - val partitions = OnePartitionPartitioner.partitions[String, String](mockConnection, null, Map.empty) - verifySinglePartition(partitions) - } - - def verifySinglePartition(partitions: Array[Partition]): Unit = { - assert(1 == partitions.size) - assert(partitions(0).index === 0) - assert(partitions(0).isInstanceOf[GemFireRDDPartition]) - assert(partitions(0).asInstanceOf[GemFireRDDPartition].bucketSet.isEmpty) - } - - test("ServerSplitsPartitioner.doPartitions(): n=1 & no empty bucket") { - val map: List[(String, mutable.Set[Int])] = List( - "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4)) - val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 1) - verifyPartitions(partitions, List( - (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5), Seq("server2")))) - } - - test("ServerSplitsPartitioner.doPartitions(): n=1 & 1 empty bucket") { - val map: List[(String, mutable.Set[Int])] = List( - "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4)) - val partitions = ServerSplitsPartitioner.doPartitions(map, 7, 1) - verifyPartitions(partitions, List( - (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5), Seq("server2")))) - } - - test("ServerSplitsPartitioner.doPartitions(): n=1 & 2 empty bucket") { - val map: List[(String, mutable.Set[Int])] = List( - "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4)) - val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1) - verifyPartitions(partitions, List( - (Set(0, 1, 2, 3, 6), Seq("server1")), (Set(4, 5, 7), Seq("server2")))) - } - - test("ServerSplitsPartitioner.doPartitions(): n=1 & 5 empty bucket") { - val map: List[(String, mutable.Set[Int])] = List( - "server1" -> mutable.Set(3,2,1,0), "server2" -> mutable.Set(5,4)) - val partitions = ServerSplitsPartitioner.doPartitions(map, 11, 1) - verifyPartitions(partitions, List( - (Set(0, 1, 2, 3, 6, 7, 8), Seq("server1")), (Set(4, 5, 9, 10), Seq("server2")))) - } - - test("ServerSplitsPartitioner.doPartitions(): n=1, 4 empty-bucket, non-continuous IDs") { - val map: List[(String, mutable.Set[Int])] = List( - "server1" -> mutable.Set(1, 3), "server2" -> mutable.Set(5,7)) - val partitions = ServerSplitsPartitioner.doPartitions(map, 8, 1) - verifyPartitions(partitions, List( - (Set(0, 1, 2, 3), Seq("server1")), (Set(4, 5, 6, 7), Seq("server2")))) - } - - test("ServerSplitsPartitioner.doPartitions(): n=2, no empty buckets, 3 servers have 1, 2, and 3 buckets") { - val map: List[(String, mutable.Set[Int])] = List( - "s1" -> mutable.Set(0), "s2" -> mutable.Set(1, 2), "s3" -> mutable.Set(3, 4, 5)) - val partitions = ServerSplitsPartitioner.doPartitions(map, 6, 2) - // partitions.foreach(println) - verifyPartitions(partitions, List( - (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 4), Seq("s3")), (Set(5), Seq("s3")))) - } - - test("ServerSplitsPartitioner.doPartitions(): n=3, no empty buckets, 4 servers have 0, 2, 3, and 4 buckets") { - val map: List[(String, mutable.Set[Int])] = List( - "s0" -> mutable.Set.empty, "s1" -> mutable.Set(0, 1), "s2" -> mutable.Set(2, 3, 4), "s3" -> mutable.Set(5, 6, 7, 8)) - val partitions = ServerSplitsPartitioner.doPartitions(map, 9, 3) - // partitions.foreach(println) - verifyPartitions(partitions, List( - (Set(0), Seq("s1")), (Set(1), Seq("s1")), (Set(2), Seq("s2")), (Set(3), Seq("s2")), (Set(4), Seq("s2")), - (Set(5, 6), Seq("s3")), (Set(7, 8), Seq("s3")) )) - } - - test("ServerSplitsPartitioner.partitions(): metadata = None ") { - val regionPath = "test" - val mockConnection = mock[GemFireConnection] - intercept[RuntimeException] { ServerSplitsPartitioner.partitions[String, String](mockConnection, null, Map.empty) } - } - - test("ServerSplitsPartitioner.partitions(): replicated region ") { - val regionPath = "test" - val mockConnection = mock[GemFireConnection] - val md = new RegionMetadata(regionPath, false, 11, null) - when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md)) - val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map.empty) - verifySinglePartition(partitions) - } - - test("ServerSplitsPartitioner.partitions(): partitioned region w/o data ") { - val regionPath = "test" - val mockConnection = mock[GemFireConnection] - val md = new RegionMetadata(regionPath, true, 6, emptyServerBucketMap) - when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md)) - val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map.empty) - verifySinglePartition(partitions) - } - - test("ServerSplitsPartitioner.partitions(): partitioned region w/ some data ") { - import io.pivotal.gemfire.spark.connector.NumberPartitionsPerServerPropKey - val regionPath = "test" - val mockConnection = mock[GemFireConnection] - val map: Map[(String, Int), Set[Int]] = Map( - ("s0",1) -> Set.empty, ("s1",2) -> Set(0), ("s2",3) -> Set(1, 2), ("s3",4) -> Set(3, 4, 5)) - val md = new RegionMetadata(regionPath, true, 6, toJavaServerBucketMap(map)) - when(mockConnection.getRegionMetadata(regionPath)).thenReturn(Some(md)) - val partitions = ServerSplitsPartitioner.partitions[String, String](mockConnection, md, Map(NumberPartitionsPerServerPropKey->"2")) - // partitions.foreach(println) - verifyPartitions(partitions, List( - (Set(0), Seq("s1")), (Set(1), Seq("s2")), (Set(2), Seq("s2")), (Set(3, 4), Seq("s3")), (Set(5), Seq("s3")))) - } - - // Note: since the order of partitions is not pre-determined, we have to verify partition id - // and contents separately - def verifyPartitions(partitions: Array[Partition], expPartitions: List[(Set[Int], Seq[String])]): Unit = { - // 1. check size - assert(partitions.size == expPartitions.size) - // 2. check IDs are 0 to n-1 - (0 until partitions.size).toList.zip(partitions).foreach { case (id, p) => assert(id == p.index) } - - // 3. get all pairs of bucket set and its locations, and compare to the expected pairs - val list = partitions.map { e => - val p = e.asInstanceOf[GemFireRDDPartition] - (p.bucketSet, p.locations) - } - expPartitions.foreach(e => assert(list.contains(e))) - } - -}