This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch WAYANG-28 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit a272d73203333bd4b141e923e525bc6df6c635e1 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Tue May 18 00:50:30 2021 -0400 [WAYANG-28] seed version of the Hackit API --- .../apache/wayang/api/dataquanta/DataQuanta.scala | 2 +- .../wayang-hackit/wayang-hackit-api/pom.xml | 5 + .../plugin/hackit/api/DataQuantaHackit.scala | 226 +++++++++++++-------- .../apache/wayang/plugin/hackit/api/Hackit.scala | 78 +++++++ .../plugin/hackit/api/ApiExtensionTest.scala | 42 +++- 5 files changed, 260 insertions(+), 93 deletions(-) diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala index 664079b..4f9eb43 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala @@ -49,7 +49,7 @@ import scala.reflect._ * @param ev$1 the data type of the elements in this instance * @param planBuilder keeps track of the [[WayangPlan]] being build */ -abstract class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: Int = 0)(implicit val planBuilder: PlanBuilder) { +abstract class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, var outputIndex: Int = 0)(implicit val planBuilder: PlanBuilder) { Validate.isTrue(operator.getNumOutputs > outputIndex) diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml index ae95199..85a37b7 100644 --- a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml +++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml @@ -50,6 +50,11 @@ <dependencies> <dependency> <groupId>org.apache.wayang</groupId> + <artifactId>wayang-hackit-core</artifactId> + <version>0.6.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.wayang</groupId> <artifactId>wayang-api-scala-java_2.11</artifactId> <version>0.6.0-SNAPSHOT</version> </dependency> diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala index 3f76c7b..f1505e0 100644 --- a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala +++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala @@ -29,10 +29,17 @@ import org.apache.wayang.core.function.{FlatMapDescriptor, MapPartitionsDescript import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator import org.apache.wayang.core.plan.wayangplan.{ElementaryOperator, Operator, OutputSlot, WayangPlan} +import org.apache.wayang.plugin.hackit.core.tagger.HackitTagger +import org.apache.wayang.plugin.hackit.core.tagger.wrapper.FunctionWrapperHackit +import org.apache.wayang.plugin.hackit.core.tags.HackitTag +import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple -import java.lang + + +import java.{lang, util} import java.lang.{Iterable => JavaIterable} import java.util.function.IntUnaryOperator +import scala.language.implicitConversions import scala.reflect.ClassTag /** @@ -42,10 +49,35 @@ import scala.reflect.ClassTag * @param ev$1 the data type of the elements in this instance * @param planBuilder keeps track of the [[WayangPlan]] being build */ -class DataQuantaHackit[Out: ClassTag] +class DataQuantaHackit[Out : ClassTag] (override val operator: ElementaryOperator, outputIndex: Int = 0) (implicit override val planBuilder: PlanBuilder) - extends DataQuanta[Out](operator, outputIndex) { + extends DataQuanta[Out](operator, outputIndex) +{ + + var tagger : HackitTagger = null; + //var sniffer : HackitSniffer = null; + + /** + * add a [[HackitTag]] on the [[HackitTagger]] to enable the process + * + * @param tag [[HackitTag]] to be added + * @return the self instance + */ + //TODO add the version of collection + def addTag(tag: HackitTag): DataQuantaHackit[Out] = { +// tagger.addPreTag(tag) + println("here") + this + } + + def underHackit[Key: ClassTag](): DataQuantaHackit[HackitTuple[Key, Out]] = { +// HackItRDD<K, T> ktHackItRDD = new HackItRDD<K, T>( +// rdd.map((Function<T, HackItTuple<K, T>>) HackItTuple::new) +// ); + +null + } /** * Feed this instance into a [[MapOperator]]. @@ -54,16 +86,21 @@ class DataQuantaHackit[Out: ClassTag] * @param udfLoad optional [[LoadProfileEstimator]] for the `udf` * @return a new instance representing the [[MapOperator]]'s output */ - override def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut], + override def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut], udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = { - val lala = new SerializableFunction[Out, NewOut] { - override def apply(t: Out): NewOut = { + + val wrapper_internal = new FunctionWrapperHackit[Object, Out, NewOut](udf) + this.tagger = wrapper_internal + + val wrapper = new SerializableFunction[HackitTuple[Object, Out], HackitTuple[Object, NewOut]](){ + override def apply(t: HackitTuple[Object, Out]): HackitTuple[Object, NewOut] = { println(t) - udf.apply(t) + wrapper_internal.apply(t) } } + val mapOperator = new MapOperator(new TransformationDescriptor( - lala, basicDataUnitType[Out], basicDataUnitType[NewOut], udfLoad + wrapper, basicDataUnitType[HackitTuple[Object, Out]], basicDataUnitType[HackitTuple[Object, NewOut]], udfLoad )) this.connectTo(mapOperator, 0) DataQuantaHackit.wrap[NewOut](mapOperator) @@ -114,11 +151,12 @@ class DataQuantaHackit[Out: ClassTag] sqlUdf: String = null, selectivity: ProbabilisticDoubleInterval = null, udfLoad: LoadProfileEstimator = null): DataQuantaHackit[Out] = { - val filterOperator = new FilterOperator(new PredicateDescriptor( - udf, this.output.getType.getDataUnitType.toBasicDataUnitType, selectivity, udfLoad - ).withSqlImplementation(sqlUdf)) - this.connectTo(filterOperator, 0) - DataQuantaHackit.wrap[Out](filterOperator) +// val filterOperator = new FilterOperator(new PredicateDescriptor( +// udf, this.output.getType.getDataUnitType.toBasicDataUnitType, selectivity, udfLoad +// ).withSqlImplementation(sqlUdf)) +// this.connectTo(filterOperator, 0) +// DataQuantaHackit.wrap[Out](filterOperator) + null } /** @@ -132,11 +170,12 @@ class DataQuantaHackit[Out: ClassTag] override def flatMapJava[NewOut: ClassTag](udf: SerializableFunction[Out, JavaIterable[NewOut]], selectivity: ProbabilisticDoubleInterval = null, udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = { - val flatMapOperator = new FlatMapOperator(new FlatMapDescriptor( - udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad - )) - this.connectTo(flatMapOperator, 0) - DataQuantaHackit.wrap[NewOut](flatMapOperator) +// val flatMapOperator = new FlatMapOperator(new FlatMapDescriptor( +// udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad +// )) +// this.connectTo(flatMapOperator, 0) +// DataQuantaHackit.wrap[NewOut](flatMapOperator) + null } @@ -199,12 +238,13 @@ class DataQuantaHackit[Out: ClassTag] udf: SerializableBinaryOperator[Out], udfLoad: LoadProfileEstimator = null) : DataQuantaHackit[Out] = { - val reduceByOperator = new ReduceByOperator( - new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]), - new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad) - ) - this.connectTo(reduceByOperator, 0) - DataQuantaHackit.wrap[Out](reduceByOperator) +// val reduceByOperator = new ReduceByOperator( +// new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]), +// new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad) +// ) +// this.connectTo(reduceByOperator, 0) +// DataQuantaHackit.wrap[Out](reduceByOperator) + null } /** @@ -216,13 +256,14 @@ class DataQuantaHackit[Out: ClassTag] */ override def groupByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key], keyUdfLoad: LoadProfileEstimator = null): DataQuantaHackit[java.lang.Iterable[Out]] = { - val groupByOperator = new MaterializedGroupByOperator( - new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key], keyUdfLoad), - dataSetType[Out], - groupedDataSetType[Out] - ) - this.connectTo(groupByOperator, 0) - DataQuantaHackit.wrap[java.lang.Iterable[Out]](groupByOperator) +// val groupByOperator = new MaterializedGroupByOperator( +// new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key], keyUdfLoad), +// dataSetType[Out], +// groupedDataSetType[Out] +// ) +// this.connectTo(groupByOperator, 0) +// DataQuantaHackit.wrap[java.lang.Iterable[Out]](groupByOperator) + null } /** @@ -234,11 +275,12 @@ class DataQuantaHackit[Out: ClassTag] */ override def reduceJava(udf: SerializableBinaryOperator[Out], udfLoad: LoadProfileEstimator = null): DataQuantaHackit[Out] = { - val globalReduceOperator = new GlobalReduceOperator( - new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad) - ) - this.connectTo(globalReduceOperator, 0) - DataQuantaHackit.wrap[Out](globalReduceOperator) +// val globalReduceOperator = new GlobalReduceOperator( +// new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad) +// ) +// this.connectTo(globalReduceOperator, 0) +// DataQuantaHackit.wrap[Out](globalReduceOperator) + null } /** @@ -250,14 +292,15 @@ class DataQuantaHackit[Out: ClassTag] * @return a new instance representing the [[JoinOperator]]'s output */ override def joinJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuantaHackit[WayangTuple2[Out, ThatOut]] = { - require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") - val joinOperator = new JoinOperator( - new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]), - new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key]) - ) - this.connectTo(joinOperator, 0) - that.connectTo(joinOperator, 1) - DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](joinOperator) +// require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") +// val joinOperator = new JoinOperator( +// new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]), +// new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key]) +// ) +// this.connectTo(joinOperator, 0) +// that.connectTo(joinOperator, 1) +// DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](joinOperator) + null } /** @@ -269,14 +312,15 @@ class DataQuantaHackit[Out: ClassTag] * @return a new instance representing the [[CoGroupOperator]]'s output */ override def coGroupJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuantaHackit[WayangTuple2[JavaIterable[Out], JavaIterable[ThatOut]]] = { - require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") - val coGroupOperator = new CoGroupOperator( - new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]), - new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key]) - ) - this.connectTo(coGroupOperator, 0) - that.connectTo(coGroupOperator, 1) - DataQuantaHackit.wrap[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]](coGroupOperator) +// require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") +// val coGroupOperator = new CoGroupOperator( +// new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]), +// new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key]) +// ) +// this.connectTo(coGroupOperator, 0) +// that.connectTo(coGroupOperator, 1) +// DataQuantaHackit.wrap[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]](coGroupOperator) + null } @@ -286,13 +330,12 @@ class DataQuantaHackit[Out: ClassTag] * @param keyUdf UDF to extract key from data quanta in this instance * @return a new instance representing the [[SortOperator]]'s output */ - override def sortJava[Key: ClassTag] - (keyUdf: SerializableFunction[Out, Key]) - : DataQuantaHackit[Out] = { - val sortOperator = new SortOperator(new TransformationDescriptor( - keyUdf, basicDataUnitType[Out], basicDataUnitType[Key])) - this.connectTo(sortOperator, 0) - DataQuantaHackit.wrap[Out](sortOperator) + override def sortJava[Key: ClassTag] (keyUdf: SerializableFunction[Out, Key]) : DataQuantaHackit[Out] = { +// val sortOperator = new SortOperator(new TransformationDescriptor( +// keyUdf, basicDataUnitType[Out], basicDataUnitType[Key])) +// this.connectTo(sortOperator, 0) +// DataQuantaHackit.wrap[Out](sortOperator) + null } @@ -302,9 +345,10 @@ class DataQuantaHackit[Out: ClassTag] * @return a new instance representing the [[GlobalMaterializedGroupOperator]]'s output */ override def group(): DataQuantaHackit[JavaIterable[Out]] = { - val groupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], groupedDataSetType[Out]) - this.connectTo(groupOperator, 0) - DataQuantaHackit.wrap[JavaIterable[Out]](groupOperator) +// val groupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], groupedDataSetType[Out]) +// this.connectTo(groupOperator, 0) +// DataQuantaHackit.wrap[JavaIterable[Out]](groupOperator) + null } /** @@ -314,11 +358,12 @@ class DataQuantaHackit[Out: ClassTag] * @return a new instance representing the [[UnionAllOperator]]'s output */ override def union(that: DataQuanta[Out]): DataQuantaHackit[Out] = { - require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") - val unionAllOperator = new UnionAllOperator(dataSetType[Out]) - this.connectTo(unionAllOperator, 0) - that.connectTo(unionAllOperator, 1) - DataQuantaHackit.wrap[Out](unionAllOperator) +// require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") +// val unionAllOperator = new UnionAllOperator(dataSetType[Out]) +// this.connectTo(unionAllOperator, 0) +// that.connectTo(unionAllOperator, 1) +// DataQuantaHackit.wrap[Out](unionAllOperator) + null } /** @@ -328,11 +373,12 @@ class DataQuantaHackit[Out: ClassTag] * @return a new instance representing the [[IntersectOperator]]'s output */ override def intersect(that: DataQuanta[Out]): DataQuantaHackit[Out] = { - require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") - val intersectOperator = new IntersectOperator(dataSetType[Out]) - this.connectTo(intersectOperator, 0) - that.connectTo(intersectOperator, 1) - DataQuantaHackit.wrap[Out](intersectOperator) +// require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") +// val intersectOperator = new IntersectOperator(dataSetType[Out]) +// this.connectTo(intersectOperator, 0) +// that.connectTo(intersectOperator, 1) +// DataQuantaHackit.wrap[Out](intersectOperator) + null } /** @@ -342,11 +388,12 @@ class DataQuantaHackit[Out: ClassTag] * @return a new instance representing the [[CartesianOperator]]'s output */ override def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]): DataQuantaHackit[WayangTuple2[Out, ThatOut]] = { - require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") - val cartesianOperator = new CartesianOperator(dataSetType[Out], dataSetType[ThatOut]) - this.connectTo(cartesianOperator, 0) - that.connectTo(cartesianOperator, 1) - DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](cartesianOperator) +// require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.") +// val cartesianOperator = new CartesianOperator(dataSetType[Out], dataSetType[ThatOut]) +// this.connectTo(cartesianOperator, 0) +// that.connectTo(cartesianOperator, 1) +// DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](cartesianOperator) + null } /** @@ -355,9 +402,10 @@ class DataQuantaHackit[Out: ClassTag] * @return a new instance representing the [[ZipWithIdOperator]]'s output */ override def zipWithId: DataQuantaHackit[WayangTuple2[lang.Long, Out]] = { - val zipWithIdOperator = new ZipWithIdOperator(dataSetType[Out]) - this.connectTo(zipWithIdOperator, 0) - DataQuantaHackit.wrap[WayangTuple2[lang.Long, Out]](zipWithIdOperator) +// val zipWithIdOperator = new ZipWithIdOperator(dataSetType[Out]) +// this.connectTo(zipWithIdOperator, 0) +// DataQuantaHackit.wrap[WayangTuple2[lang.Long, Out]](zipWithIdOperator) + null } /** @@ -366,9 +414,10 @@ class DataQuantaHackit[Out: ClassTag] * @return a new instance representing the [[DistinctOperator]]'s output */ override def distinct: DataQuantaHackit[Out] = { - val distinctOperator = new DistinctOperator(dataSetType[Out]) - this.connectTo(distinctOperator, 0) - DataQuantaHackit.wrap[Out](distinctOperator) +// val distinctOperator = new DistinctOperator(dataSetType[Out]) +// this.connectTo(distinctOperator, 0) +// DataQuantaHackit.wrap[Out](distinctOperator) + null } /** @@ -377,9 +426,10 @@ class DataQuantaHackit[Out: ClassTag] * @return a new instance representing the [[CountOperator]]'s output */ override def count: DataQuantaHackit[lang.Long] = { - val countOperator = new CountOperator(dataSetType[Out]) - this.connectTo(countOperator, 0) - DataQuantaHackit.wrap[lang.Long](countOperator) +// val countOperator = new CountOperator(dataSetType[Out]) +// this.connectTo(countOperator, 0) +// DataQuantaHackit.wrap[lang.Long](countOperator) + null } } @@ -392,4 +442,8 @@ object DataQuantaHackit extends DataQuantaCreator{ def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuantaHackit[_] = new DataQuantaHackit(output.getOwner.asInstanceOf[ElementaryOperator], output.getIndex)(ClassTag(output.getType.getDataUnitType.getTypeClass), planBuilder) + implicit def wrapDataQuanta[T:ClassTag](dataQuanta: DataQuanta[T]): DataQuantaHackit[T] = { + new DataQuantaHackit[T](dataQuanta.operator, dataQuanta.outputIndex)(ClassTag(dataQuanta.output.getType.getDataUnitType.getTypeClass), dataQuanta.planBuilder); + } + } diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/Hackit.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/Hackit.scala new file mode 100644 index 0000000..b9499fb --- /dev/null +++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/Hackit.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.plugin.hackit.api + +import org.apache.wayang.api.PlanBuilder +import org.apache.wayang.api.dataquanta.DataQuanta +import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator +import org.apache.wayang.plugin.hackit.core.tagger.HackitTagger +import org.apache.wayang.plugin.hackit.core.tagger.wrapper.FunctionWrapperHackit +import org.apache.wayang.plugin.hackit.core.tags.HackitTag +import org.apache.wayang.plugin.hackit.core.tuple.HackitTuple +import org.apache.wayang.api.toSerializableFunction + +import scala.language.implicitConversions +import scala.reflect.{ClassTag, classTag} + +class Hackit[Key: ClassTag, Type:ClassTag](implicit var planBuilder: PlanBuilder) { + + var dataQuanta: DataQuanta[HackitTuple[Key, Type]] = null + + var tagger : HackitTagger = null; + + def addTag(hackitTag: HackitTag) = { + println("you are adding a tag") + this + } + + def underHackit(dataQuanta: DataQuanta[Type]): Hackit[Key, Type] = { + val hackit = new Hackit[Key, Type]() + hackit.dataQuanta = dataQuanta.map[HackitTuple[Key, Type]]( + element => { + new HackitTuple[Key, Type](element) + } + ) + hackit + } + + def toDataQuanta(): DataQuanta[Type] = { + this.dataQuanta.map(tuple => tuple.getValue) + } + + def map[TypeOut: ClassTag](udf: Type => TypeOut, udfLoad: LoadProfileEstimator = null): Hackit[Key, TypeOut] = { + val hackit = new Hackit[Key, TypeOut]() + val wrapper = new FunctionWrapperHackit[Key, Type, TypeOut](toSerializableFunction(udf)) + hackit.dataQuanta = dataQuanta.map[HackitTuple[Key, TypeOut]](element => wrapper.apply(element), udfLoad) + hackit + } + +} + + +object Hackit { + + //TODO: replace the object with a parameter + implicit def underHackit[T: ClassTag](dataQuanta: DataQuanta[T]): Hackit[java.lang.Object, T] = { + return new Hackit[java.lang.Object, T]()(classTag[java.lang.Object], ClassTag(dataQuanta.output.getType.getDataUnitType.getTypeClass), dataQuanta.planBuilder) + .underHackit(dataQuanta) + } + +} diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala index 59a0fd9..42867f3 100644 --- a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala +++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala @@ -19,12 +19,17 @@ package org.apache.wayang.plugin.hackit.api -import org.apache.wayang.api.ApiTest import org.apache.wayang.api.dataquanta.DataQuantaFactory -import org.junit.{BeforeClass, Test} +import org.apache.wayang.api.createPlanBuilder +import org.apache.wayang.plugin.hackit.api.Hackit.underHackit +import org.apache.wayang.core.api.WayangContext +import org.apache.wayang.java.Java +import org.apache.wayang.spark.Spark +import org.junit.{Assert, BeforeClass, Test} import org.junit.jupiter.api.{BeforeAll, BeforeEach} -class ApiExtensionTest extends ApiTest { + +class ApiExtensionTest { @BeforeEach def setUp() ={ @@ -32,10 +37,35 @@ class ApiExtensionTest extends ApiTest { } @Test - override def testReadMapCollect(): Unit = { - DataQuantaFactory.setTemplate(DataQuantaHackit); + def testReadMapCollectHackit(): Unit = { + + // Set up WayangContext. + val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin) + + // Generate some test data. + val inputValues = (for (i <- 1 to 10) yield i).toArray + + // Build and execute a Wayang plan. + var outputValues = wayang + .loadCollection(inputValues).withName("Load input values") + .addTag(null) + .map(a => a + 2)//.withName("Add 2") + .dataQuanta + .collect() + + print(outputValues) + + var lolo = wayang + .loadCollection(inputValues).withName("Load input values") + .addTag(null) + .map(a => a + 2)//.withName("Add 2") + .toDataQuanta() + .collect() - super.testReadMapCollect() + print(lolo) +// // Check the outcome. +// val expectedOutputValues = inputValues.map(_ + 2) +// Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray) } }
