This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-28
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 4cdd676f519952aba107e80532e1bba220ea1ddf
Author: Bertty Contreras-Rojas <[email protected]>
AuthorDate: Fri May 14 08:55:46 2021 -0400

    [WAYANG-28] create the base of DataQuantaHackit and extend the tests
---
 .../wayang-hackit/wayang-hackit-api/pom.xml        |  30 +
 .../plugin/hackit/api/DataQuantaHackit.scala       | 395 ++++++++++++
 .../java/org/apache/wayang/api/JavaApiTest.java    | 711 ---------------------
 .../test/scala/org/apache/wayang/api/ApiTest.scala | 575 -----------------
 .../plugin/hackit/api/ApiExtensionTest.scala       |  41 ++
 .../wayang-hackit-api/src/test/wayang.properties   |  18 +
 6 files changed, 484 insertions(+), 1286 deletions(-)

diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
index 548f9e6..ae95199 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
@@ -30,6 +30,11 @@

     <artifactId>wayang-hackit-api</artifactId>

+    <properties>
+        <java-module-name>org.apache.wayang.plugin.hackit.api</java-module-name>
+        <spark.version>2.4.0</spark.version>
+    </properties>
+
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -50,6 +55,31 @@
         </dependency>
         <dependency>
             <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-api-scala-java_2.11</artifactId>
+            <version>0.6.0-SNAPSHOT</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-spark_${scala.mayor.version}</artifactId>
+            <version>0.6.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.mayor.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-sqlite3</artifactId>
+            <version>0.6.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
             <artifactId>wayang-java</artifactId>
             <version>0.6.0-SNAPSHOT</version>
             <scope>test</scope>
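The test-scoped additions (the wayang-api-scala-java test-jar, wayang-spark, spark-core, and wayang-sqlite3) match what the removed test suites below exercised. As a rough sketch of what they enable, adapted from the removed ApiTest, a context can register several execution platforms and let Wayang choose among them:

    import org.apache.wayang.core.api.WayangContext
    import org.apache.wayang.java.Java
    import org.apache.wayang.spark.Spark

    // Register both platforms; the optimizer is free to place operators on either.
    val wayang = new WayangContext()
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)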
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala
new file mode 100644
index 0000000..3f76c7b
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.plugin.hackit.api
+
+import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaCreator, KeyedDataQuanta}
+import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
+import org.apache.wayang.basic.function.ProjectionDescriptor
+import org.apache.wayang.basic.operators.{CartesianOperator, CoGroupOperator, CountOperator, DistinctOperator, FilterOperator, FlatMapOperator, GlobalMaterializedGroupOperator, GlobalReduceOperator, IntersectOperator, JoinOperator, MapOperator, MapPartitionsOperator, MaterializedGroupByOperator, ReduceByOperator, SampleOperator, SortOperator, UnionAllOperator, ZipWithIdOperator}
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.function.{FlatMapDescriptor, MapPartitionsDescriptor, PredicateDescriptor, ReduceDescriptor, TransformationDescriptor}
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
+import org.apache.wayang.core.plan.wayangplan.{ElementaryOperator, Operator, OutputSlot, WayangPlan}
+
+import java.lang
+import java.lang.{Iterable => JavaIterable}
+import java.util.function.IntUnaryOperator
+import scala.reflect.ClassTag
+
+/**
+ * Represents an intermediate result/data flow edge in a [[WayangPlan]].
+ *
+ * @param operator    a unary [[Operator]] that produces this instance
+ * @param ev$1        the data type of the elements in this instance
+ * @param planBuilder keeps track of the [[WayangPlan]] being built
+ */
+class DataQuantaHackit[Out: ClassTag]
+  (override val operator: ElementaryOperator, outputIndex: Int = 0)
+  (implicit override val planBuilder: PlanBuilder)
+  extends DataQuanta[Out](operator, outputIndex) {
+
+  /**
+   * Feed this instance into a [[MapOperator]].
+   *
+   * @param udf     a Java 8 lambda expression as UDF for the [[MapOperator]]
+   * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[MapOperator]]'s output
+   */
+  override def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut],
+                                         udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = {
+    // Wrap the user's UDF so that every data quantum is printed before the
+    // original function runs; this is Hackit's interception point.
+    val tracedUdf = new SerializableFunction[Out, NewOut] {
+      override def apply(t: Out): NewOut = {
+        println(t)
+        udf.apply(t)
+      }
+    }
+    val mapOperator = new MapOperator(new TransformationDescriptor(
+      tracedUdf, basicDataUnitType[Out], basicDataUnitType[NewOut], udfLoad
+    ))
+    this.connectTo(mapOperator, 0)
+    DataQuantaHackit.wrap[NewOut](mapOperator)
+  }
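The wrapper in mapJava above is the heart of this commit: each data quantum is observed (here just printed) before the user's UDF runs. A minimal standalone sketch of that interception pattern, with illustrative names that are not part of the commit:

    import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction

    // Wrap any UDF so each input element is logged before it is transformed.
    def traced[In, Out](udf: SerializableFunction[In, Out]): SerializableFunction[In, Out] =
      new SerializableFunction[In, Out] {
        override def apply(t: In): Out = {
          println(t) // observation hook; a fuller Hackit tap would emit tagged tuples
          udf.apply(t)
        }
      }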
+
+  /**
+   * Feed this instance into a [[MapPartitionsOperator]].
+   *
+   * @param udf         a Java 8 lambda expression as UDF for the [[MapPartitionsOperator]]
+   * @param selectivity selectivity of the UDF
+   * @param udfLoad     optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[MapPartitionsOperator]]'s output
+   */
+  override def mapPartitionsJava[NewOut: ClassTag](udf: SerializableFunction[JavaIterable[Out], JavaIterable[NewOut]],
+                                                   selectivity: ProbabilisticDoubleInterval = null,
+                                                   udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = {
+    val mapOperator = new MapPartitionsOperator(
+      new MapPartitionsDescriptor(udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad)
+    )
+    this.connectTo(mapOperator, 0)
+    DataQuantaHackit.wrap[NewOut](mapOperator)
+  }
+
+  /**
+   * Feed this instance into a [[MapOperator]] with a [[ProjectionDescriptor]].
+   *
+   * @param fieldNames names of the fields to be projected
+   * @return a new instance representing the [[MapOperator]]'s output
+   */
+  override def project[NewOut: ClassTag](fieldNames: Seq[String]): DataQuantaHackit[NewOut] = {
+    val projectionOperator = new MapOperator(
+      new ProjectionDescriptor(basicDataUnitType[Out], basicDataUnitType[NewOut], fieldNames: _*)
+    )
+    this.connectTo(projectionOperator, 0)
+    DataQuantaHackit.wrap[NewOut](projectionOperator)
+  }
+
+  /**
+   * Feed this instance into a [[FilterOperator]].
+   *
+   * @param udf         UDF for the [[FilterOperator]]
+   * @param sqlUdf      UDF as SQL `WHERE` clause
+   * @param selectivity selectivity of the UDF
+   * @param udfLoad     optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[FilterOperator]]'s output
+   */
+  override def filterJava(udf: SerializablePredicate[Out],
+                          sqlUdf: String = null,
+                          selectivity: ProbabilisticDoubleInterval = null,
+                          udfLoad: LoadProfileEstimator = null): DataQuantaHackit[Out] = {
+    val filterOperator = new FilterOperator(new PredicateDescriptor(
+      udf, this.output.getType.getDataUnitType.toBasicDataUnitType, selectivity, udfLoad
+    ).withSqlImplementation(sqlUdf))
+    this.connectTo(filterOperator, 0)
+    DataQuantaHackit.wrap[Out](filterOperator)
+  }
+
+  /**
+   * Feed this instance into a [[FlatMapOperator]].
+   *
+   * @param udf         a Java 8 lambda expression as UDF for the [[FlatMapOperator]]
+   * @param selectivity selectivity of the UDF
+   * @param udfLoad     optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[FlatMapOperator]]'s output
+   */
+  override def flatMapJava[NewOut: ClassTag](udf: SerializableFunction[Out, JavaIterable[NewOut]],
+                                             selectivity: ProbabilisticDoubleInterval = null,
+                                             udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = {
+    val flatMapOperator = new FlatMapOperator(new FlatMapDescriptor(
+      udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad
+    ))
+    this.connectTo(flatMapOperator, 0)
+    DataQuantaHackit.wrap[NewOut](flatMapOperator)
+  }
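For usage context, the removed ApiTest below drove these same operators through the Scala API; its word-count test is a compact example of flatMap, map, and reduceByKey chained together:

    val wordCounts = wayang
      .loadCollection(Seq("Big data is big.", "Is data big data?"))
      .flatMap(_.split("\\s+"))
      .map(_.replaceAll("\\W+", "").toLowerCase)
      .map((_, 1))
      .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2))
      .collect()
    // yields ("big", 3), ("is", 2), ("data", 3)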
+
+  /**
+   * Feed this instance into a [[SampleOperator]].
+   *
+   * @param sampleSizeFunction absolute size of the sample as a function of the current iteration number
+   * @param datasetSize        optional size of the dataset to be sampled
+   * @param seed               optional seed for the random sample
+   * @param sampleMethod       the [[SampleOperator.Methods]] to use for sampling
+   * @return a new instance representing the [[SampleOperator]]'s output
+   */
+  override def sampleDynamicJava(sampleSizeFunction: IntUnaryOperator,
+                                 datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE,
+                                 seed: Option[Long] = None,
+                                 sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuantaHackit[Out] = {
+    if (seed.isEmpty) {
+      val sampleOperator = new SampleOperator(
+        sampleSizeFunction,
+        dataSetType[Out],
+        sampleMethod
+      )
+      sampleOperator.setDatasetSize(datasetSize)
+      this.connectTo(sampleOperator, 0)
+      DataQuantaHackit.wrap[Out](sampleOperator)
+    }
+    else {
+      val sampleOperator = new SampleOperator(
+        sampleSizeFunction,
+        dataSetType[Out],
+        sampleMethod,
+        seed.get
+      )
+      sampleOperator.setDatasetSize(datasetSize)
+      this.connectTo(sampleOperator, 0)
+      DataQuantaHackit.wrap[Out](sampleOperator)
+    }
+  }
+
+  /**
+   * Assigns this instance a key extractor, which enables some key-based operations.
+   *
+   * @see KeyedDataQuanta
+   * @param keyExtractor extracts the key from the [[DataQuanta]]
+   * @return the [[KeyedDataQuanta]]
+   */
+  //TODO validate this implementation
+  override def keyByJava[Key: ClassTag](keyExtractor: SerializableFunction[Out, Key]): KeyedDataQuanta[Out, Key] = {
+    new KeyedDataQuanta[Out, Key](this, keyExtractor)
+  }
+
+  /**
+   * Feed this instance into a [[ReduceByOperator]].
+   *
+   * @param keyUdf  UDF to extract the grouping key from the data quanta
+   * @param udf     aggregation UDF for the [[ReduceByOperator]]
+   * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[ReduceByOperator]]'s output
+   */
+  override def reduceByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
+                                              udf: SerializableBinaryOperator[Out],
+                                              udfLoad: LoadProfileEstimator = null)
+  : DataQuantaHackit[Out] = {
+    val reduceByOperator = new ReduceByOperator(
+      new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
+      new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
+    )
+    this.connectTo(reduceByOperator, 0)
+    DataQuantaHackit.wrap[Out](reduceByOperator)
+  }
+
+  /**
+   * Feed this instance into a [[MaterializedGroupByOperator]].
+   *
+   * @param keyUdf     UDF to extract the grouping key from the data quanta
+   * @param keyUdfLoad optional [[LoadProfileEstimator]] for the `keyUdf`
+   * @return a new instance representing the [[MaterializedGroupByOperator]]'s output
+   */
+  override def groupByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
+                                             keyUdfLoad: LoadProfileEstimator = null): DataQuantaHackit[java.lang.Iterable[Out]] = {
+    val groupByOperator = new MaterializedGroupByOperator(
+      new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key], keyUdfLoad),
+      dataSetType[Out],
+      groupedDataSetType[Out]
+    )
+    this.connectTo(groupByOperator, 0)
+    DataQuantaHackit.wrap[java.lang.Iterable[Out]](groupByOperator)
+  }
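The Option[Long] seed above merely selects between the two SampleOperator constructors; behavior is otherwise identical. Sampling usage, as in the removed ApiTest:

    val inputValues = for (i <- 0 until 100) yield i
    val sample = wayang
      .loadCollection(inputValues)
      .sample(10) // fixed-size random sample
      .collect()
    // sample.size == 10, with distinct elements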
+
+  /**
+   * Feed this instance into a [[GlobalReduceOperator]].
+   *
+   * @param udf     aggregation UDF for the [[GlobalReduceOperator]]
+   * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[GlobalReduceOperator]]'s output
+   */
+  override def reduceJava(udf: SerializableBinaryOperator[Out],
+                          udfLoad: LoadProfileEstimator = null): DataQuantaHackit[Out] = {
+    val globalReduceOperator = new GlobalReduceOperator(
+      new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
+    )
+    this.connectTo(globalReduceOperator, 0)
+    DataQuantaHackit.wrap[Out](globalReduceOperator)
+  }
+
+  /**
+   * Feeds this and a further instance into a [[JoinOperator]].
+   *
+   * @param thisKeyUdf UDF to extract keys from data quanta in this instance
+   * @param that       the other instance
+   * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance
+   * @return a new instance representing the [[JoinOperator]]'s output
+   */
+  override def joinJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key],
+                                                          that: DataQuanta[ThatOut],
+                                                          thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuantaHackit[WayangTuple2[Out, ThatOut]] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val joinOperator = new JoinOperator(
+      new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
+      new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
+    )
+    this.connectTo(joinOperator, 0)
+    that.connectTo(joinOperator, 1)
+    DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](joinOperator)
+  }
+
+  /**
+   * Feeds this and a further instance into a [[CoGroupOperator]].
+   *
+   * @param thisKeyUdf UDF to extract keys from data quanta in this instance
+   * @param that       the other instance
+   * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance
+   * @return a new instance representing the [[CoGroupOperator]]'s output
+   */
+  override def coGroupJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key],
+                                                             that: DataQuanta[ThatOut],
+                                                             thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuantaHackit[WayangTuple2[JavaIterable[Out], JavaIterable[ThatOut]]] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val coGroupOperator = new CoGroupOperator(
+      new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
+      new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
+    )
+    this.connectTo(coGroupOperator, 0)
+    that.connectTo(coGroupOperator, 1)
+    DataQuantaHackit.wrap[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]](coGroupOperator)
+  }
+
+  /**
+   * Feeds this instance into a [[SortOperator]].
+   *
+   * @param keyUdf UDF to extract the sort key from data quanta in this instance
+   * @return a new instance representing the [[SortOperator]]'s output
+   */
+  override def sortJava[Key: ClassTag]
+  (keyUdf: SerializableFunction[Out, Key])
+  : DataQuantaHackit[Out] = {
+    val sortOperator = new SortOperator(new TransformationDescriptor(
+      keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]))
+    this.connectTo(sortOperator, 0)
+    DataQuantaHackit.wrap[Out](sortOperator)
+  }
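Both two-input operators above follow the same wiring: connect this instance to input 0 and the other instance to input 1, then wrap the operator. The removed ApiTest exercised the Scala counterpart of joinJava like this:

    val builder = new PlanBuilder(wayang)
    val drinks = builder.loadCollection(Seq(("Water", 0), ("Tonic", 5), ("Juice", 10)))
    val labels = builder.loadCollection(Seq(("Apple juice", "Juice"), ("Tap water", "Water")))
    val result = drinks
      .join[(String, String), String](_._1, labels, _._2)
      .map(joinTuple => (joinTuple.field1._1, joinTuple.field0._2))
      .collect()
    // e.g. ("Apple juice", 10), ("Tap water", 0)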
+
+  /**
+   * Feed this instance into a [[GlobalMaterializedGroupOperator]].
+   *
+   * @return a new instance representing the [[GlobalMaterializedGroupOperator]]'s output
+   */
+  override def group(): DataQuantaHackit[JavaIterable[Out]] = {
+    val groupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], groupedDataSetType[Out])
+    this.connectTo(groupOperator, 0)
+    DataQuantaHackit.wrap[JavaIterable[Out]](groupOperator)
+  }
+
+  /**
+   * Feed this instance and a further instance into a [[UnionAllOperator]].
+   *
+   * @param that the other instance to union with
+   * @return a new instance representing the [[UnionAllOperator]]'s output
+   */
+  override def union(that: DataQuanta[Out]): DataQuantaHackit[Out] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val unionAllOperator = new UnionAllOperator(dataSetType[Out])
+    this.connectTo(unionAllOperator, 0)
+    that.connectTo(unionAllOperator, 1)
+    DataQuantaHackit.wrap[Out](unionAllOperator)
+  }
+
+  /**
+   * Feed this instance and a further instance into an [[IntersectOperator]].
+   *
+   * @param that the other instance to intersect with
+   * @return a new instance representing the [[IntersectOperator]]'s output
+   */
+  override def intersect(that: DataQuanta[Out]): DataQuantaHackit[Out] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val intersectOperator = new IntersectOperator(dataSetType[Out])
+    this.connectTo(intersectOperator, 0)
+    that.connectTo(intersectOperator, 1)
+    DataQuantaHackit.wrap[Out](intersectOperator)
+  }
+
+  /**
+   * Feeds this and a further instance into a [[CartesianOperator]].
+   *
+   * @param that the other instance
+   * @return a new instance representing the [[CartesianOperator]]'s output
+   */
+  override def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]): DataQuantaHackit[WayangTuple2[Out, ThatOut]] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val cartesianOperator = new CartesianOperator(dataSetType[Out], dataSetType[ThatOut])
+    this.connectTo(cartesianOperator, 0)
+    that.connectTo(cartesianOperator, 1)
+    DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](cartesianOperator)
+  }
+
+  /**
+   * Feeds this instance into a [[ZipWithIdOperator]].
+   *
+   * @return a new instance representing the [[ZipWithIdOperator]]'s output
+   */
+  override def zipWithId: DataQuantaHackit[WayangTuple2[lang.Long, Out]] = {
+    val zipWithIdOperator = new ZipWithIdOperator(dataSetType[Out])
+    this.connectTo(zipWithIdOperator, 0)
+    DataQuantaHackit.wrap[WayangTuple2[lang.Long, Out]](zipWithIdOperator)
+  }
+
+  /**
+   * Feeds this instance into a [[DistinctOperator]].
+   *
+   * @return a new instance representing the [[DistinctOperator]]'s output
+   */
+  override def distinct: DataQuantaHackit[Out] = {
+    val distinctOperator = new DistinctOperator(dataSetType[Out])
+    this.connectTo(distinctOperator, 0)
+    DataQuantaHackit.wrap[Out](distinctOperator)
+  }
+
+  /**
+   * Feeds this instance into a [[CountOperator]].
+   *
+   * @return a new instance representing the [[CountOperator]]'s output
+   */
+  override def count: DataQuantaHackit[lang.Long] = {
+    val countOperator = new CountOperator(dataSetType[Out])
+    this.connectTo(countOperator, 0)
+    DataQuantaHackit.wrap[lang.Long](countOperator)
+  }
+}
+
+object DataQuantaHackit extends DataQuantaCreator {
+
+  def wrap[T: ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuantaHackit[T] = {
+    new DataQuantaHackit[T](operator, outputIndex)
+  }
+
+  def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuantaHackit[_] =
+    new DataQuantaHackit(output.getOwner.asInstanceOf[ElementaryOperator], output.getIndex)(ClassTag(output.getType.getDataUnitType.getTypeClass), planBuilder)
+
+}
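Every operator method above funnels its result through the companion object's wrap, so a chain started from a DataQuantaHackit stays within the Hackit hierarchy. A hypothetical driver (not part of this commit) would only need an implicit PlanBuilder in scope:

    import org.apache.wayang.api.PlanBuilder
    import org.apache.wayang.core.api.WayangContext
    import org.apache.wayang.java.Java

    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
    implicit val planBuilder: PlanBuilder = new PlanBuilder(wayang)
    // Given some ElementaryOperator `source` that produces Strings:
    //   val quanta = DataQuantaHackit.wrap[String](source)
    //   quanta.distinct.count // each step returns a DataQuantaHackit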
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java
deleted file mode 100644
index aaf1f16..0000000
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java
+++ /dev/null
@@ -1,711 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.api;
-
-import org.apache.wayang.core.api.WayangContext;
-import org.apache.wayang.core.util.WayangCollections;
-import org.apache.wayang.java.Java;
-//import org.apache.wayang.spark.Spark;
-//import org.apache.wayang.sqlite3.Sqlite3;
-//import org.apache.wayang.sqlite3.operators.Sqlite3TableSource;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Test suite for the Java API.
- */
-public class JavaApiTest {
-
-//    private Configuration sqlite3Configuration;
-//
-//    @Before
-//    public void setUp() throws SQLException, IOException {
-//        // Generate test data.
-// this.sqlite3Configuration = new Configuration(); -// File sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db"); -// sqlite3dbFile.deleteOnExit(); -// this.sqlite3Configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath()); -// try (Connection connection = Sqlite3.platform().createDatabaseDescriptor(this.sqlite3Configuration).createJdbcConnection()) { -// Statement statement = connection.createStatement(); -// statement.addBatch("DROP TABLE IF EXISTS customer;"); -// statement.addBatch("CREATE TABLE customer (name TEXT, age INT);"); -// statement.addBatch("INSERT INTO customer VALUES ('John', 20)"); -// statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)"); -// statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)"); -// statement.executeBatch(); -// } -// } - - @Test - public void testMapReduce() { - WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); - JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext); - - List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4); - Collection<Integer> outputCollection = javaPlanBuilder - .loadCollection(inputCollection).withName("load numbers") - .map(i -> i * i).withName("square") - .reduce((a, b) -> a + b).withName("sum") - .collect(); - - Assert.assertEquals(WayangCollections.asSet(1 + 4 + 9 + 16), WayangCollections.asSet(outputCollection)); - } - -// @Test -// public void testMapReduceBy() { -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext); -// -// List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4); -// Collection<Integer> outputCollection = javaPlanBuilder -// .loadCollection(inputCollection).withName("load numbers") -// .map(i -> i * i).withName("square") -// .reduceByKey(i -> i & 1, (a, b) -> a + b).withName("sum") -// .collect(); -// -// Assert.assertEquals(WayangCollections.asSet(4 + 16, 1 + 9), WayangCollections.asSet(outputCollection)); -// } -// -// @Test -// public void testBroadcast2() { -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext); -// -// List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4); -// List<Integer> offsetCollection = Collections.singletonList(-2); -// -// LoadCollectionDataQuantaBuilder<Integer> offsetDataQuanta = javaPlanBuilder -// .loadCollection(offsetCollection) -// .withName("load offset"); -// -// Collection<Integer> outputCollection = javaPlanBuilder -// .loadCollection(inputCollection).withName("load numbers") -// .map(new AddOffset("offset")).withName("add offset").withBroadcast(offsetDataQuanta, "offset") -// .collect(); -// -// Assert.assertEquals(WayangCollections.asSet(-2, -1, 0, 1, 2), WayangCollections.asSet(outputCollection)); -// } -// -// @Test -// public void testCustomOperatorShortCut() { -// // Set up WayangContext. -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// -// final List<Integer> inputValues = WayangArrays.asList(0, 1, 2, 3); -// -// // Build and execute a Wayang plan. 
-// final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext) -// .loadCollection(inputValues).withName("Load input values") -// .<Integer>customOperator(new JavaMapOperator<>( -// DataSetType.createDefault(Integer.class), -// DataSetType.createDefault(Integer.class), -// new TransformationDescriptor<>( -// i -> i + 2, -// Integer.class, Integer.class -// ) -// )).withName("Add 2") -// .collect(); -// -// // Check the outcome. -// final List<Integer> expectedOutputValues = WayangArrays.asList(2, 3, 4, 5); -// Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues)); -// } -// -// @Test -// public void testWordCount() { -// // Set up WayangContext. -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// -// final List<String> inputValues = Arrays.asList("Big data is big.", "Is data big data?"); -// -// // Build and execute a Wayang plan. -// final Collection<Tuple2<String, Integer>> outputValues = new JavaPlanBuilder(wayangContext) -// .loadCollection(inputValues).withName("Load input values") -// .flatMap(line -> Arrays.asList(line.split("\\s+"))).withName("Split words") -// .map(token -> token.replaceAll("\\W+", "").toLowerCase()).withName("To lower case") -// .map(word -> new Tuple2<>(word, 1)).withName("Attach counter") -// .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.field0, t1.field1 + t2.field1)).withName("Sum counters") -// .collect(); -// -// // Check the outcome. -// final Set<Tuple2<String, Integer>> expectedOutputValues = WayangCollections.asSet( -// new Tuple2<>("big", 3), -// new Tuple2<>("is", 2), -// new Tuple2<>("data", 3) -// ); -// Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues)); -// } -// -// @Test -// public void testWordCountOnSparkAndJava() { -// // Set up WayangContext. -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin()); -// -// final List<String> inputValues = Arrays.asList("Big data is big.", "Is data big data?"); -// -// // Build and execute a Wayang plan. -// final Collection<Tuple2<String, Integer>> outputValues = new JavaPlanBuilder(wayangContext) -// .loadCollection(inputValues).withName("Load input values") -// .flatMap(line -> Arrays.asList(line.split("\\s+"))).withName("Split words") -// .map(token -> token.replaceAll("\\W+", "").toLowerCase()).withName("To lower case") -// .map(word -> new Tuple2<>(word, 1)).withName("Attach counter") -// .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.field0, t1.field1 + t2.field1)).withName("Sum counters") -// .collect(); -// -// // Check the outcome. -// final Set<Tuple2<String, Integer>> expectedOutputValues = WayangCollections.asSet( -// new Tuple2<>("big", 3), -// new Tuple2<>("is", 2), -// new Tuple2<>("data", 3) -// ); -// Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues)); -// } -// -// @Test -// public void testSample() { -// // Set up WayangContext. -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin()); -// -// // Create some input values. -// final List<Integer> inputValues = WayangArrays.asList(WayangArrays.range(100)); -// -// // Build and execute a Wayang plan. 
-// final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext) -// .loadCollection(inputValues).withName("Load input values") -// .sample(10).withName("Sample") -// .collect(); -// -// // Check the outcome. -// Assert.assertEquals(10, outputValues.size()); -// Assert.assertEquals(10, WayangCollections.asSet(outputValues).size()); -// -// } -// -// @Test -// public void testDoWhile() { -// // Set up WayangContext. -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// -// // Generate test data. -// final List<Integer> inputValues = WayangArrays.asList(1, 2); -// -// // Build and execute a word count WayangPlan. -// -// final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext) -// .loadCollection(inputValues).withName("Load input values") -// .doWhile( -// values -> values.stream().mapToInt(i -> i).sum() > 100, -// start -> { -// final GlobalReduceDataQuantaBuilder<Integer> sum = -// start.reduce((a, b) -> a + b).withName("sum"); -// return new Tuple<>( -// start.union(sum).withName("Old+new"), -// sum -// ); -// } -// ).withConditionClass(Integer.class).withName("While <= 100") -// .collect(); -// -// Set<Integer> expectedValues = WayangCollections.asSet(1, 2, 3, 6, 12, 24, 48, 96, 192); -// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); -// } -// -// private static class AddOffset implements FunctionDescriptor.ExtendedSerializableFunction<Integer, Integer> { -// -// private final String broadcastName; -// -// private int offset; -// -// public AddOffset(String broadcastName) { -// this.broadcastName = broadcastName; -// } -// -// @Override -// public void open(ExecutionContext ctx) { -// this.offset = WayangCollections.getSingle(ctx.<Integer>getBroadcast(this.broadcastName)); -// } -// -// @Override -// public Integer apply(Integer input) { -// return input + this.offset; -// } -// } -// -// @Test -// public void testRepeat() { -// // Set up WayangContext. -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// -// // Generate test data. -// final List<Integer> inputValues = WayangArrays.asList(1, 2); -// -// // Build and execute a word count WayangPlan. -// -// final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext) -// .loadCollection(inputValues).withName("Load input values") -// .repeat(3, start -> start -// .reduce((a, b) -> a * b).withName("Multiply") -// .flatMap(v -> Arrays.asList(v, v + 1)).withName("Duplicate").withOutputClass(Integer.class) -// ).withName("Repeat 3x") -// .collect(); -// -// Set<Integer> expectedValues = WayangCollections.asSet(42, 43); -// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); -// } -// -// private static class SelectWords implements PredicateDescriptor.ExtendedSerializablePredicate<String> { -// -// private final String broadcastName; -// -// private Collection<Character> selectors; -// -// public SelectWords(String broadcastName) { -// this.broadcastName = broadcastName; -// } -// -// @Override -// public void open(ExecutionContext ctx) { -// this.selectors = ctx.getBroadcast(this.broadcastName); -// } -// -// @Override -// public boolean test(String word) { -// return this.selectors.stream().anyMatch(c -> word.indexOf(c) >= 0); -// } -// } -// -// @Test -// public void testBroadcast() { -// // Set up WayangContext. 
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); -// -// // Generate test data. -// final List<String> inputValues = Arrays.asList("Hello", "World", "Hi", "Mars"); -// final List<Character> selectors = Arrays.asList('o', 'l'); -// -// // Execute the job. -// final DataQuantaBuilder<?, Character> selectorsDataSet = builder.loadCollection(selectors).withName("Load selectors"); -// final Collection<String> outputValues = builder -// .loadCollection(inputValues).withName("Load input values") -// .filter(new SelectWords("selectors")).withName("Filter words") -// .withBroadcast(selectorsDataSet, "selectors") -// .collect(); -// -// // Verify the outcome. -// Set<String> expectedValues = WayangCollections.asSet("Hello", "World"); -// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); -// } -// -// @Test -// public void testGroupBy() { -// // Set up WayangContext. -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); -// -// // Generate test data. -// final List<Integer> inputValues = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10); -// -// // Execute the job. -// final Collection<Double> outputValues = builder -// .loadCollection(inputValues).withName("Load input values") -// .groupByKey(i -> i % 2).withName("group odd and even") -// .map(group -> { -// List<Integer> sortedGroup = StreamSupport.stream(group.spliterator(), false) -// .sorted() -// .collect(Collectors.toList()); -// int sizeDivTwo = sortedGroup.size() / 2; -// return sortedGroup.size() % 2 == 0 ? -// (sortedGroup.get(sizeDivTwo - 1) + sortedGroup.get(sizeDivTwo)) / 2d : -// (double) sortedGroup.get(sizeDivTwo); -// }) -// .collect(); -// -// // Verify the outcome. -// Set<Double> expectedValues = WayangCollections.asSet(5d, 6d); -// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); -// } -// -// @Test -// public void testJoin() { -// // Set up WayangContext. -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); -// -// // Generate test data. -// final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList( -// new Tuple2<>("Water", 0), -// new Tuple2<>("Tonic", 5), -// new Tuple2<>("Juice", 10) -// ); -// final List<Tuple2<String, String>> inputValues2 = Arrays.asList( -// new Tuple2<>("Apple juice", "Juice"), -// new Tuple2<>("Tap water", "Water"), -// new Tuple2<>("Orange juice", "Juice") -// ); -// -// // Execute the job. -// final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1); -// final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2); -// final Collection<Tuple2<String, Integer>> outputValues = dataQuanta1 -// .join(Tuple2::getField0, dataQuanta2, Tuple2::getField1) -// .map(joinTuple -> new Tuple2<>(joinTuple.getField1().getField0(), joinTuple.getField0().getField1())) -// .collect(); -// -// // Verify the outcome. -// Set<Tuple2<String, Integer>> expectedValues = WayangCollections.asSet( -// new Tuple2<>("Apple juice", 10), -// new Tuple2<>("Orange juice", 10), -// new Tuple2<>("Tap water", 0) -// ); -// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); -// } -// -// @Test -// public void testJoinAndAssemble() { -// // Set up WayangContext. 
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); -// -// // Generate test data. -// final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList( -// new Tuple2<>("Water", 0), -// new Tuple2<>("Tonic", 5), -// new Tuple2<>("Juice", 10) -// ); -// final List<Tuple2<String, String>> inputValues2 = Arrays.asList( -// new Tuple2<>("Apple juice", "Juice"), -// new Tuple2<>("Tap water", "Water"), -// new Tuple2<>("Orange juice", "Juice") -// ); -// -// // Execute the job. -// final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1); -// final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2); -// final Collection<Tuple2<String, Integer>> outputValues = dataQuanta1.keyBy(Tuple2::getField0) -// .join(dataQuanta2.keyBy(Tuple2::getField1)) -// .assemble((val1, val2) -> new Tuple2<>(val2.getField0(), val1.getField1())) -// .collect(); -// -// // Verify the outcome. -// Set<Tuple2<String, Integer>> expectedValues = WayangCollections.asSet( -// new Tuple2<>("Apple juice", 10), -// new Tuple2<>("Orange juice", 10), -// new Tuple2<>("Tap water", 0) -// ); -// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); -// } -// -// @Test -// public void testCoGroup() { -// // Set up WayangContext. -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); -// -// // Generate test data. -// final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList( -// new Tuple2<>("Water", 0), -// new Tuple2<>("Cola", 5), -// new Tuple2<>("Juice", 10) -// ); -// final List<Tuple2<String, String>> inputValues2 = Arrays.asList( -// new Tuple2<>("Apple juice", "Juice"), -// new Tuple2<>("Tap water", "Water"), -// new Tuple2<>("Orange juice", "Juice") -// ); -// -// // Execute the job. -// final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1); -// final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2); -// final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> outputValues = dataQuanta1 -// .coGroup(Tuple2::getField0, dataQuanta2, Tuple2::getField1) -// .map(joinTuple -> new Tuple2<>( -// WayangCollections.asSet(joinTuple.getField0()), -// WayangCollections.asSet(joinTuple.getField1()) -// )) -// .collect(); -// -// // Verify the outcome. -// Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expectedValues = WayangCollections.asSet( -// new Tuple2<>( -// WayangCollections.asSet(new Tuple2<>("Water", 0)), -// WayangCollections.asSet(new Tuple2<>("Tap water", "Water")) -// ), -// new Tuple2<>( -// WayangCollections.asSet(new Tuple2<>("Cola", 5)), -// WayangCollections.asSet() -// ), new Tuple2<>( -// WayangCollections.asSet(new Tuple2<>("Juice", 10)), -// WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice")) -// ) -// ); -// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); -// } -// -// @Test -// public void testCoGroupViaKeyBy() { -// // Set up WayangContext. 
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); -// -// // Generate test data. -// final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList( -// new Tuple2<>("Water", 0), -// new Tuple2<>("Cola", 5), -// new Tuple2<>("Juice", 10) -// ); -// final List<Tuple2<String, String>> inputValues2 = Arrays.asList( -// new Tuple2<>("Apple juice", "Juice"), -// new Tuple2<>("Tap water", "Water"), -// new Tuple2<>("Orange juice", "Juice") -// ); -// -// // Execute the job. -// final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1); -// final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2); -// final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> outputValues = -// dataQuanta1.keyBy(Tuple2::getField0) -// .coGroup(dataQuanta2.keyBy(Tuple2::getField1)) -// .map(joinTuple -> new Tuple2<>( -// WayangCollections.asSet(joinTuple.getField0()), -// WayangCollections.asSet(joinTuple.getField1()) -// )) -// .collect(); -// -// // Verify the outcome. -// Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expectedValues = WayangCollections.asSet( -// new Tuple2<>( -// WayangCollections.asSet(new Tuple2<>("Water", 0)), -// WayangCollections.asSet(new Tuple2<>("Tap water", "Water")) -// ), -// new Tuple2<>( -// WayangCollections.asSet(new Tuple2<>("Cola", 5)), -// WayangCollections.asSet() -// ), new Tuple2<>( -// WayangCollections.asSet(new Tuple2<>("Juice", 10)), -// WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice")) -// ) -// ); -// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); -// } -// -// @Test -// public void testIntersect() { -// // Set up WayangContext. -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); -// -// // Generate test data. -// final List<Integer> inputValues1 = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10); -// final List<Integer> inputValues2 = Arrays.asList(0, 2, 3, 3, 4, 5, 7, 8, 9, 11); -// -// // Execute the job. -// final LoadCollectionDataQuantaBuilder<Integer> dataQuanta1 = builder.loadCollection(inputValues1); -// final LoadCollectionDataQuantaBuilder<Integer> dataQuanta2 = builder.loadCollection(inputValues2); -// final Collection<Integer> outputValues = dataQuanta1.intersect(dataQuanta2).collect(); -// -// // Verify the outcome. -// Set<Integer> expectedValues = WayangCollections.asSet(2, 3, 4, 5, 7, 8, 9); -// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues)); -// } -// -// @Test -// public void testSort() { -// // Set up WayangContext. -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); -// -// // Generate test data. -// final List<Integer> inputValues1 = Arrays.asList(3, 4, 5, 2, 1); -// -// // Execute the job. -// final LoadCollectionDataQuantaBuilder<Integer> dataQuanta1 = builder.loadCollection(inputValues1); -// final Collection<Integer> outputValues = dataQuanta1.sort(r -> r).collect(); -// -// // Verify the outcome. 
-// List<Integer> expectedValues = Arrays.asList(1, 2, 3, 4, 5); -// Assert.assertEquals(expectedValues, WayangCollections.asList(outputValues)); -// } -// -// -// @Test -// public void testPageRank() { -// // Set up WayangContext. -// WayangContext wayangContext = new WayangContext() -// .with(Java.basicPlugin()) -// .with(Java.graphPlugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); -// -// // Create a test graph. -// Collection<Tuple2<Long, Long>> edges = Arrays.asList( -// new Tuple2<>(0L, 1L), -// new Tuple2<>(0L, 2L), -// new Tuple2<>(0L, 3L), -// new Tuple2<>(1L, 0L), -// new Tuple2<>(2L, 1L), -// new Tuple2<>(3L, 2L), -// new Tuple2<>(3L, 1L) -// ); -// -// // Execute the job. -// Collection<Tuple2<Long, Float>> pageRanks = builder.loadCollection(edges).asEdges() -// .pageRank(20) -// .collect(); -// List<Tuple2<Long, Float>> sortedPageRanks = new ArrayList<>(pageRanks); -// sortedPageRanks.sort((pr1, pr2) -> Float.compare(pr2.field1, pr1.field1)); -// -// System.out.println(sortedPageRanks); -// Assert.assertEquals(1L, sortedPageRanks.get(0).field0.longValue()); -// Assert.assertEquals(0L, sortedPageRanks.get(1).field0.longValue()); -// Assert.assertEquals(2L, sortedPageRanks.get(2).field0.longValue()); -// Assert.assertEquals(3L, sortedPageRanks.get(3).field0.longValue()); -// } -// -// @Test -// public void testMapPartitions() { -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); -// -// // Generate test data. -// List<Integer> inputValues = WayangArrays.asList(0, 1, 2, 3, 4, 6, 8); -// -// // Execute the job. -// Collection<Tuple2<String, Integer>> outputValues = builder.loadCollection(inputValues) -// .mapPartitions(partition -> { -// int numEvens = 0, numOdds = 0; -// for (Integer value : partition) { -// if ((value & 1) == 0) numEvens++; -// else numOdds++; -// } -// return Arrays.asList( -// new Tuple2<>("odd", numOdds), -// new Tuple2<>("even", numEvens) -// ); -// }) -// .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())) -// .collect(); -// -// // Check the output. -// Set<Tuple2<String, Integer>> expectedOutput = WayangCollections.asSet( -// new Tuple2<>("even", 5), new Tuple2<>("odd", 2) -// ); -// Assert.assertEquals(expectedOutput, WayangCollections.asSet(outputValues)); -// } -// -// @Test -// public void testZipWithId() { -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); -// -// // Generate test data. -// List<Integer> inputValues = new ArrayList<>(42 * 100); -// for (int i = 0; i < 100; i++) { -// for (int j = 0; j < 42; j++) { -// inputValues.add(i); -// } -// } -// -// // Execute the job. -// Collection<Tuple2<Integer, Integer>> outputValues = builder.loadCollection(inputValues) -// .zipWithId() -// .groupByKey(Tuple2::getField1) -// .map(group -> { -// int distinctIds = (int) StreamSupport.stream(group.spliterator(), false) -// .map(Tuple2::getField0) -// .distinct() -// .count(); -// return new Tuple2<>(distinctIds, 1); -// }) -// .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())) -// .collect(); -// -// // Check the output. 
-// Set<Tuple2<Integer, Integer>> expectedOutput = Collections.singleton(new Tuple2<>(42, 100)); -// Assert.assertEquals(expectedOutput, WayangCollections.asSet(outputValues)); -// } -// -// @Test -// public void testWriteTextFile() throws IOException, URISyntaxException { -// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext); -// -// // Generate test data. -// List<Double> inputValues = Arrays.asList(0d, 1 / 3d, 2 / 3d, 1d, 4 / 3d, 5 / 3d); -// -// // Execute the job. -// File tempDir = LocalFileSystem.findTempDir(); -// String targetUrl = LocalFileSystem.toURL(new File(tempDir, "testWriteTextFile.txt")); -// -// builder -// .loadCollection(inputValues) -// .writeTextFile(targetUrl, d -> String.format("%.2f", d), "testWriteTextFile()"); -// -// // Check the output. -// Set<String> actualLines = Files.lines(Paths.get(new URI(targetUrl))).collect(Collectors.toSet()); -// Set<String> expectedLines = inputValues.stream().map(d -> String.format("%.2f", d)).collect(Collectors.toSet()); -// Assert.assertEquals(expectedLines, actualLines); -// } -// -// @Test -// public void testSqlOnJava() throws IOException, SQLException { -// // Execute job. -// final WayangContext wayangCtx = new WayangContext(this.sqlite3Configuration) -// .with(Java.basicPlugin()) -// .with(Sqlite3.plugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangCtx, "testSqlOnJava()"); -// final Collection<String> outputValues = builder -// .readTable(new Sqlite3TableSource("customer", "name", "age")) -// .filter(r -> (Integer) r.getField(1) >= 18).withSqlUdf("age >= 18").withTargetPlatform(Java.platform()) -// .asRecords().projectRecords(new String[]{"name"}) -// .map(record -> (String) record.getField(0)) -// .collect(); -// -// // Test the outcome. -// Assert.assertEquals( -// WayangCollections.asSet("John", "Evelyn"), -// WayangCollections.asSet(outputValues) -// ); -// } -// -// @Test -// public void testSqlOnSqlite3() throws IOException, SQLException { -// // Execute job. -// final WayangContext wayangCtx = new WayangContext(this.sqlite3Configuration) -// .with(Java.basicPlugin()) -// .with(Sqlite3.plugin()); -// JavaPlanBuilder builder = new JavaPlanBuilder(wayangCtx, "testSqlOnSqlite3()"); -// final Collection<String> outputValues = builder -// .readTable(new Sqlite3TableSource("customer", "name", "age")) -// .filter(r -> (Integer) r.getField(1) >= 18).withSqlUdf("age >= 18") -// .asRecords().projectRecords(new String[]{"name"}).withTargetPlatform(Sqlite3.platform()) -// .map(record -> (String) record.getField(0)) -// .collect(); -// -// // Test the outcome. -// Assert.assertEquals( -// WayangCollections.asSet("John", "Evelyn"), -// WayangCollections.asSet(outputValues) -// ); -// } - -} diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala deleted file mode 100644 index ed1a7ac..0000000 --- a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala +++ /dev/null @@ -1,575 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.api - -import org.apache.wayang.basic.WayangBasics -import org.apache.wayang.core.api.{Configuration, WayangContext} -import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializablePredicate -import org.apache.wayang.core.function.{ExecutionContext, TransformationDescriptor} -import org.apache.wayang.core.util.fs.LocalFileSystem -import org.apache.wayang.java.Java -import org.junit.{Assert, Test} - -import java.io.File -import java.net.URI -import java.nio.file.{Files, Paths} -import java.sql.{Connection, Statement} -import java.util.function.Consumer - -/** - * Tests the Wayang API. - */ -class ApiTest { - - @Test - def testReadMapCollect(): Unit = { - // Set up WayangContext. - val wayangContext = new WayangContext() - .withPlugin(Java.basicPlugin) -// .withPlugin(Spark.basicPlugin) - // Generate some test data. - val inputValues = (for (i <- 1 to 10) yield i).toArray - - // Build and execute a Wayang plan. - val outputValues = wayangContext - .loadCollection(inputValues).withName("Load input values") - .map(_ + 2).withName("Add 2") - .collect() - - // Check the outcome. - val expectedOutputValues = inputValues.map(_ + 2) - Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray) - } - -// @Test -// def testCustomOperator(): Unit = { -// // Set up WayangContext. -// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin) -// -// // Generate some test data. -// val inputValues = (for (i <- 1 to 10) yield i).toArray -// -// // Build and execute a Wayang plan. -// val inputDataSet = wayang.loadCollection(inputValues).withName("Load input values") -// -// // Add the custom operator. -// val IndexedSeq(addedValues) = wayang.customOperator(new JavaMapOperator( -// dataSetType[Int], -// dataSetType[Int], -// new TransformationDescriptor( -// toSerializableFunction[Int, Int](_ + 2), -// basicDataUnitType[Int], basicDataUnitType[Int] -// ) -// ), inputDataSet) -// addedValues.withName("Add 2") -// -// // Collect the result. -// val outputValues = addedValues.asInstanceOf[DataQuanta[Int]].collect() -// -// // Check the outcome. -// val expectedOutputValues = inputValues.map(_ + 2) -// Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray) -// } -// -// @Test -// def testCustomOperatorShortCut(): Unit = { -// // Set up WayangContext. -// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin) -// -// // Generate some test data. -// val inputValues = (for (i <- 1 to 10) yield i).toArray -// -// // Build and execute a Wayang plan. 
-// val outputValues = wayang -// .loadCollection(inputValues).withName("Load input values") -// .customOperator[Int](new JavaMapOperator( -// dataSetType[Int], -// dataSetType[Int], -// new TransformationDescriptor( -// toSerializableFunction[Int, Int](_ + 2), -// basicDataUnitType[Int], basicDataUnitType[Int] -// ) -// )).withName("Add 2") -// .collect() -// -// // Check the outcome. -// val expectedOutputValues = inputValues.map(_ + 2) -// Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray) -// } -// -// @Test -// def testWordCount(): Unit = { -// // Set up WayangContext. -// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin) -// -// // Generate some test data. -// val inputValues = Array("Big data is big.", "Is data big data?") -// -// // Build and execute a word count WayangPlan. -// val wordCounts = wayang -// .loadCollection(inputValues).withName("Load input values") -// .flatMap(_.split("\\s+")).withName("Split words") -// .map(_.replaceAll("\\W+", "").toLowerCase).withName("To lowercase") -// .map((_, 1)).withName("Attach counter") -// .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)).withName("Sum counters") -// .collect().toSet -// -// val expectedWordCounts = Set(("big", 3), ("is", 2), ("data", 3)) -// -// Assert.assertEquals(expectedWordCounts, wordCounts) -// } -// -// @Test -// def testWordCountOnSparkAndJava(): Unit = { -// // Set up WayangContext. -// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin) -// -// // Generate some test data. -// val inputValues = Array("Big data is big.", "Is data big data?") -// -// // Build and execute a word count WayangPlan. -// val wordCounts = wayang -// .loadCollection(inputValues).withName("Load input values").withTargetPlatforms(Java.platform) -// .flatMap(_.split("\\s+")).withName("Split words").withTargetPlatforms(Java.platform) -// .map(_.replaceAll("\\W+", "").toLowerCase).withName("To lowercase").withTargetPlatforms(Spark.platform) -// .map((_, 1)).withName("Attach counter").withTargetPlatforms(Spark.platform) -// .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)).withName("Sum counters").withTargetPlatforms(Spark.platform) -// .collect().toSet -// -// val expectedWordCounts = Set(("big", 3), ("is", 2), ("data", 3)) -// -// Assert.assertEquals(expectedWordCounts, wordCounts) -// } -// -// @Test -// def testSample(): Unit = { -// // Set up WayangContext. -// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin) -// -// // Generate some test data. -// val inputValues = for (i <- 0 until 100) yield i -// -// // Build and execute the WayangPlan. -// val sample = wayang -// .loadCollection(inputValues) -// .sample(10) -// .collect() -// -// // Check the result. -// Assert.assertEquals(10, sample.size) -// Assert.assertEquals(10, sample.toSet.size) -// } -// -// @Test -// def testDoWhile(): Unit = { -// // Set up WayangContext. -// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin) -// -// // Generate some test data. -// val inputValues = Array(1, 2) -// -// // Build and execute a word count WayangPlan. 
-// -// val values = wayang -// .loadCollection(inputValues).withName("Load input values") -// .doWhile[Int](vals => vals.max > 100, { -// start => -// val sum = start.reduce(_ + _).withName("Sum") -// (start.union(sum).withName("Old+new"), sum) -// }).withName("While <= 100") -// .collect().toSet -// -// val expectedValues = Set(1, 2, 3, 6, 12, 24, 48, 96, 192) -// Assert.assertEquals(expectedValues, values) -// } -// -// @Test -// def testRepeat(): Unit = { -// // Set up WayangContext. -// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin) -// -// // Generate some test data. -// val inputValues = Array(1, 2) -// -// // Build and execute a word count WayangPlan. -// -// val values = wayang -// .loadCollection(inputValues).withName("Load input values").withName(inputValues.mkString(",")) -// .repeat(3, -// _.reduce(_ * _).withName("Multiply") -// .flatMap(v => Seq(v, v + 1)).withName("Duplicate") -// ).withName("Repeat 3x") -// .collect().toSet -// -// // initial: 1,2 -> 1st: 2,3 -> 2nd: 6,7 => 3rd: 42,43 -// val expectedValues = Set(42, 43) -// Assert.assertEquals(expectedValues, values) -// } -// -// @Test -// def testBroadcast() = { -// // Set up WayangContext. -// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin) -// val builder = new PlanBuilder(wayang) -// -// // Generate some test data. -// val inputStrings = Array("Hello", "World", "Hi", "Mars") -// val selectors = Array('o', 'l') -// -// val selectorsDataSet = builder.loadCollection(selectors).withName("Load selectors") -// -// // Build and execute a word count WayangPlan. -// val values = builder -// .loadCollection(inputStrings).withName("Load input values") -// .filterJava(new ExtendedSerializablePredicate[String] { -// -// var selectors: Iterable[Char] = _ -// -// override def open(ctx: ExecutionContext): Unit = { -// import scala.collection.JavaConversions._ -// selectors = collectionAsScalaIterable(ctx.getBroadcast[Char]("selectors")) -// } -// -// override def test(t: String): Boolean = selectors.forall(selector => t.contains(selector)) -// -// }).withName("Filter words") -// .withBroadcast(selectorsDataSet, "selectors") -// .collect().toSet -// -// val expectedValues = Set("Hello", "World") -// Assert.assertEquals(expectedValues, values) -// } -// -// @Test -// def testGroupBy() = { -// // Set up WayangContext. -// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin) -// -// val inputValues = Array(1, 2, 3, 4, 5, 7, 8, 9, 10) -// -// val result = wayang -// .loadCollection(inputValues) -// .groupByKey(_ % 2).withName("group odd and even") -// .map { -// group => -// import scala.collection.JavaConversions._ -// val buffer = group.toBuffer -// buffer.sortBy(identity) -// if (buffer.size % 2 == 0) (buffer(buffer.size / 2 - 1) + buffer(buffer.size / 2)) / 2 -// else buffer(buffer.size / 2) -// }.withName("median") -// .collect() -// -// val expectedValues = Set(5, 6) -// Assert.assertEquals(expectedValues, result.toSet) -// } -// -// @Test -// def testGroup() = { -// // Set up WayangContext. 
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
-//
-//    val result = wayang
-//      .loadCollection(inputValues)
-//      .group()
-//      .map {
-//        group =>
-//          import scala.collection.JavaConversions._
-//          val buffer = group.toBuffer
-//          buffer.sortBy(int => int)
-//          if (buffer.size % 2 == 0) (buffer(buffer.size / 2) + buffer(buffer.size / 2 + 1)) / 2
-//          else buffer(buffer.size / 2)
-//      }
-//      .collect()
-//
-//    val expectedValues = Set(5)
-//    Assert.assertEquals(expectedValues, result.toSet)
-//  }
-//
-//  @Test
-//  def testJoin() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues1 = Array(("Water", 0), ("Tonic", 5), ("Juice", 10))
-//    val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
-//
-//    val builder = new PlanBuilder(wayang)
-//    val dataQuanta1 = builder.loadCollection(inputValues1)
-//    val dataQuanta2 = builder.loadCollection(inputValues2)
-//    val result = dataQuanta1
-//      .join[(String, String), String](_._1, dataQuanta2, _._2)
-//      .map(joinTuple => (joinTuple.field1._1, joinTuple.field0._2))
-//      .collect()
-//
-//    val expectedValues = Set(("Apple juice", 10), ("Tap water", 0), ("Orange juice", 10))
-//    Assert.assertEquals(expectedValues, result.toSet)
-//  }
-//
-//  @Test
-//  def testJoinAndAssemble() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues1 = Array(("Water", 0), ("Tonic", 5), ("Juice", 10))
-//    val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
-//
-//    val builder = new PlanBuilder(wayang)
-//    val dataQuanta1 = builder.loadCollection(inputValues1)
-//    val dataQuanta2 = builder.loadCollection(inputValues2)
-//    val result = dataQuanta1.keyBy(_._1).join(dataQuanta2.keyBy(_._2))
-//      .assemble((dq1, dq2) => (dq2._1, dq1._2))
-//      .collect()
-//
-//    val expectedValues = Set(("Apple juice", 10), ("Tap water", 0), ("Orange juice", 10))
-//    Assert.assertEquals(expectedValues, result.toSet)
-//  }
-//
-//
-//  @Test
-//  def testCoGroup() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues1 = Array(("Water", 0), ("Cola", 5), ("Juice", 10))
-//    val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
-//
-//    val builder = new PlanBuilder(wayang)
-//    val dataQuanta1 = builder.loadCollection(inputValues1)
-//    val dataQuanta2 = builder.loadCollection(inputValues2)
-//    val result = dataQuanta1
-//      .coGroup[(String, String), String](_._1, dataQuanta2, _._2)
-//      .collect()
-//
-//    import scala.collection.JavaConversions._
-//    val actualValues = result.map(coGroup => (coGroup.field0.toSet, coGroup.field1.toSet)).toSet
-//    val expectedValues = Set(
-//      (Set(("Water", 0)), Set(("Tap water", "Water"))),
-//      (Set(("Cola", 5)), Set()),
-//      (Set(("Juice", 10)), Set(("Apple juice", "Juice"), ("Orange juice", "Juice")))
-//    )
-//    Assert.assertEquals(expectedValues, actualValues)
-//  }
-//
-//  @Test
-//  def testIntersect() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues1 = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
-//    val inputValues2 = Array(0, 2, 3, 3, 4, 5, 7, 8, 9, 11)
-//
-//    val builder = new PlanBuilder(wayang)
-//    val dataQuanta1 = builder.loadCollection(inputValues1)
-//    val dataQuanta2 = builder.loadCollection(inputValues2)
-//    val result = dataQuanta1
-//      .intersect(dataQuanta2)
-//      .collect()
-//
-//    val expectedValues = Set(2, 3, 4, 5, 7, 8, 9)
-//    Assert.assertEquals(expectedValues, result.toSet)
-//  }
-//
-//
-//  @Test
-//  def testSort() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues1 = Array(3, 4, 5, 2, 1)
-//
-//    val builder = new PlanBuilder(wayang)
-//    val dataQuanta1 = builder.loadCollection(inputValues1)
-//    val result = dataQuanta1
-//      .sort(r => r)
-//      .collect()
-//
-//    val expectedValues = Array(1, 2, 3, 4, 5)
-//    Assert.assertArrayEquals(expectedValues, result.toArray)
-//  }
-//
-//
-//  @Test
-//  def testPageRank() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext()
-//      .withPlugin(Java.graphPlugin)
-//      .withPlugin(WayangBasics.graphPlugin)
-//      .withPlugin(Java.basicPlugin)
-//    import org.apache.wayang.api.graph._
-//
-//    val edges = Seq((0, 1), (0, 2), (0, 3), (1, 0), (2, 1), (3, 2), (3, 1)).map(t => Edge(t._1, t._2))
-//
-//    val pageRanks = wayang
-//      .loadCollection(edges).withName("Load edges")
-//      .pageRank(20).withName("PageRank")
-//      .collect()
-//      .map(t => t.field0.longValue -> t.field1)
-//      .toMap
-//
-//    print(pageRanks)
-//    // Let's not check absolute numbers but only the relative ordering.
-//    Assert.assertTrue(pageRanks(1) > pageRanks(0))
-//    Assert.assertTrue(pageRanks(0) > pageRanks(2))
-//    Assert.assertTrue(pageRanks(2) > pageRanks(3))
-//  }
-//
-//  @Test
-//  def testMapPartitions() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext()
-//      .withPlugin(Java.basicPlugin())
-//      .withPlugin(Spark.basicPlugin)
-//
-//    val typeCounts = wayang
-//      .loadCollection(Seq(0, 1, 2, 3, 4, 6, 8))
-//      .mapPartitions { ints =>
-//        var (numOdds, numEvens) = (0, 0)
-//        ints.foreach(i => if ((i & 1) == 0) numEvens += 1 else numOdds += 1)
-//        Seq(("odd", numOdds), ("even", numEvens))
-//      }
-//      .reduceByKey(_._1, { case ((kind1, count1), (kind2, count2)) => (kind1, count1 + count2) })
-//      .collect()
-//
-//    Assert.assertEquals(Set(("odd", 2), ("even", 5)), typeCounts.toSet)
-//  }
-//
-//  @Test
-//  def testZipWithId() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues = for (i <- 0 until 100; j <- 0 until 42) yield i
-//
-//    val result = wayang
-//      .loadCollection(inputValues)
-//      .zipWithId
-//      .groupByKey(_.field1)
-//      .map { group =>
-//        import scala.collection.JavaConversions._
-//        (group.map(_.field0).toSet.size, 1)
-//      }
-//      .reduceByKey(_._1, (t1, t2) => (t1._1, t1._2 + t2._2))
-//      .collect()
-//
-//    val expectedValues = Set((42, 100))
-//    Assert.assertEquals(expectedValues, result.toSet)
-//  }
-//
-//  @Test
-//  def testWriteTextFile() = {
-//    val tempDir = LocalFileSystem.findTempDir
-//    val targetUrl = LocalFileSystem.toURL(new File(tempDir, "testWriteTextFile.txt"))
-//
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
-//
-//    val inputValues = for (i <- 0 to 5) yield i * 0.333333333333
-//
-//    val result = wayang
-//      .loadCollection(inputValues)
-//      .writeTextFile(targetUrl, formatterUdf = d => f"${d % .2f}")
-//
-//    val lines = scala.collection.mutable.Set[String]()
-//    Files.lines(Paths.get(new URI(targetUrl))).forEach(new Consumer[String] {
-//      override def accept(line: String): Unit = lines += line
-//    })
-//
-//    val expectedLines = inputValues.map(v => f"${v % .2f}").toSet
-//    Assert.assertEquals(expectedLines, lines)
-//  }
-//
-//  @Test
-//  def testSqlOnJava() = {
-//    // Initialize some test data.
-//    val configuration = new Configuration
-//    val sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db")
-//    sqlite3dbFile.deleteOnExit()
-//    configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath)
-//
-//    try {
-//      val connection: Connection = Sqlite3.platform.createDatabaseDescriptor(configuration).createJdbcConnection
-//      try {
-//        val statement: Statement = connection.createStatement
-//        statement.addBatch("DROP TABLE IF EXISTS customer;")
-//        statement.addBatch("CREATE TABLE customer (name TEXT, age INT);")
-//        statement.addBatch("INSERT INTO customer VALUES ('John', 20)")
-//        statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)")
-//        statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)")
-//        statement.executeBatch()
-//      } finally {
-//        if (connection != null) connection.close()
-//      }
-//    }
-//
-//    // Set up WayangContext.
-//    val wayang = new WayangContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Sqlite3.plugin)
-//
-//    val result = wayang
-//      .readTable(new Sqlite3TableSource("customer", "name", "age"))
-//      .filter(r => r.getField(1).asInstanceOf[Integer] >= 18, sqlUdf = "age >= 18").withTargetPlatforms(Java.platform)
-//      .projectRecords(Seq("name"))
-//      .map(_.getField(0).asInstanceOf[String])
-//      .collect()
-//      .toSet
-//
-//    val expectedValues = Set("John", "Evelyn")
-//    Assert.assertEquals(expectedValues, result)
-//  }
-//
-//  @Test
-//  def testSqlOnSqlite3() = {
-//    // Initialize some test data.
-//    val configuration = new Configuration
-//    val sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db")
-//    sqlite3dbFile.deleteOnExit()
-//    configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath)
-//
-//    try {
-//      val connection: Connection = Sqlite3.platform.createDatabaseDescriptor(configuration).createJdbcConnection
-//      try {
-//        val statement: Statement = connection.createStatement
-//        statement.addBatch("DROP TABLE IF EXISTS customer;")
-//        statement.addBatch("CREATE TABLE customer (name TEXT, age INT);")
-//        statement.addBatch("INSERT INTO customer VALUES ('John', 20)")
-//        statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)")
-//        statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)")
-//        statement.executeBatch
-//      } finally {
-//        if (connection != null) connection.close()
-//      }
-//    }
-//
-//    // Set up WayangContext.
-//    val wayang = new WayangContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Sqlite3.plugin)
-//
-//    val result = wayang
-//      .readTable(new Sqlite3TableSource("customer", "name", "age"))
-//      .filter(r => r.getField(1).asInstanceOf[Integer] >= 18, sqlUdf = "age >= 18")
-//      .projectRecords(Seq("name")).withTargetPlatforms(Sqlite3.platform)
-//      .map(_.getField(0).asInstanceOf[String])
-//      .collect()
-//      .toSet
-//
-//    val expectedValues = Set("John", "Evelyn")
-//    Assert.assertEquals(expectedValues, result)
-//  }
-}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala
new file mode 100644
index 0000000..59a0fd9
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.plugin.hackit.api
+
+import org.apache.wayang.api.ApiTest
+import org.apache.wayang.api.dataquanta.DataQuantaFactory
+import org.junit.{Before, Test}
+
+class ApiExtensionTest extends ApiTest {
+
+  // ApiTest runs under JUnit 4, so the set-up hook must be JUnit 4's @Before.
+  @Before
+  def setUp(): Unit = {
+    DataQuantaFactory.setTemplate(DataQuantaHackit)
+  }
+
+  @Test
+  override def testReadMapCollect(): Unit = {
+    DataQuantaFactory.setTemplate(DataQuantaHackit)
+
+    super.testReadMapCollect()
+  }
+
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/wayang.properties b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/wayang.properties
new file mode 100644
index 0000000..b296279
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/wayang.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+spark.driver.host = localhost
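For reviewers who want to try the branch: below is a minimal, illustrative sketch of how a driver program would opt into the Hackit DataQuanta implementation. It uses only pieces visible in this commit (DataQuantaFactory.setTemplate and DataQuantaHackit) plus the PlanBuilder word-count pipeline exercised by the deleted ApiTest; the HackitWordCount object and the import paths for WayangContext/PlanBuilder are assumptions against the 0.6.0-SNAPSHOT layout, not shipped code.

    import org.apache.wayang.api.PlanBuilder
    import org.apache.wayang.api.dataquanta.DataQuantaFactory
    import org.apache.wayang.core.api.WayangContext
    import org.apache.wayang.java.Java
    import org.apache.wayang.plugin.hackit.api.DataQuantaHackit

    object HackitWordCount {
      def main(args: Array[String]): Unit = {
        // Route all subsequent DataQuanta creation through the Hackit template,
        // as ApiExtensionTest does before running the inherited tests.
        DataQuantaFactory.setTemplate(DataQuantaHackit)

        val wayang = new WayangContext().withPlugin(Java.basicPlugin)
        val wordCounts = new PlanBuilder(wayang)
          .loadCollection(Seq("Big data is big.", "Is data big data?"))
          .flatMap(_.split("\\s+"))
          .map(_.replaceAll("\\W+", "").toLowerCase)
          .map((_, 1))
          .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2))
          .collect()
        println(wordCounts)
      }
    }

Note that the factory template appears to be process-global state, which is presumably why ApiExtensionTest sets it both in the set-up hook and again inside the overridden test: the inherited ApiTest cases then produce DataQuantaHackit instances regardless of test execution order.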
