This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch WAYANG-28 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 99a666566d7a2882e0ce1333d7c86e5ac9ccabe7 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Fri May 14 08:54:56 2021 -0400 [WAYANG-31] small correction to be extensible --- wayang-api/wayang-api-scala-java/pom.xml | 25 ++++++++++++++-- .../apache/wayang/api/dataquanta/DataQuanta.scala | 2 +- .../wayang/api/dataquanta/DataQuantaCreator.scala | 33 ++++++++++++++++++++++ .../wayang/api/dataquanta/DataQuantaDefault.scala | 6 ++-- .../wayang/api/dataquanta/DataQuantaFactory.scala | 21 +++++++++++++- 5 files changed, 80 insertions(+), 7 deletions(-) diff --git a/wayang-api/wayang-api-scala-java/pom.xml b/wayang-api/wayang-api-scala-java/pom.xml index 1470c30..d385dae 100644 --- a/wayang-api/wayang-api-scala-java/pom.xml +++ b/wayang-api/wayang-api-scala-java/pom.xml @@ -91,7 +91,12 @@ <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> - <scope>provided</scope> + <version>${scala.version}</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> @@ -103,6 +108,11 @@ <artifactId>hadoop-hdfs</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> + <version>0.9.12</version> + </dependency> </dependencies> <build> @@ -114,8 +124,19 @@ <useSystemClassLoader>true</useSystemClassLoader> </configuration> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.2</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> - </project> diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala index 6e464e0..664079b 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala @@ -122,7 +122,7 @@ abstract class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outpu * @param operator the [[Operator]] to connect to * @param inputIndex the input index of the [[Operator]]s [[InputSlot]] */ - private[api] def connectTo(operator: Operator, inputIndex: Int): Unit = + def connectTo(operator: Operator, inputIndex: Int): Unit = this.operator.connectTo(outputIndex, operator, inputIndex) diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaCreator.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaCreator.scala new file mode 100644 index 0000000..6388cc9 --- /dev/null +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaCreator.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.wayang.api.dataquanta + +import org.apache.wayang.api.PlanBuilder +import org.apache.wayang.core.plan.wayangplan.{ElementaryOperator, OutputSlot} + +import scala.reflect.ClassTag + +abstract class DataQuantaCreator { + + def wrap[T:ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuanta[T] + + def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuanta[_] + +} diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala index 1f79ae7..aa9c7d0 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaDefault.scala @@ -19,7 +19,7 @@ package org.apache.wayang.api.dataquanta -import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType, toConsumer, toSerializableBinaryOperator, toSerializableFlatteningFunction, toSerializableFunction, toSerializablePartitionFunction, toSerializablePredicate} +import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType} import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2} import org.apache.wayang.basic.function.ProjectionDescriptor import org.apache.wayang.basic.operators._ @@ -31,7 +31,7 @@ import org.apache.wayang.core.plan.wayangplan._ import java.lang import java.lang.{Iterable => JavaIterable} -import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction} +import java.util.function.{IntUnaryOperator} import scala.reflect._ /** @@ -396,7 +396,7 @@ class DataQuantaDefault[Out: ClassTag] } } -object DataQuantaDefault { +object DataQuantaDefault extends DataQuantaCreator { def wrap[T:ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuantaDefault[T] = { new DataQuantaDefault[T](operator, outputIndex) diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala index aad0e42..ef8fe0d 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaFactory.scala @@ -21,7 +21,11 @@ package org.apache.wayang.api.dataquanta import org.apache.wayang.api.PlanBuilder import org.apache.wayang.core.plan.wayangplan.ElementaryOperator +import org.reflections.Reflections +import org.reflections.scanners.{FieldAnnotationsScanner, MethodParameterScanner, ResourcesScanner, SubTypesScanner} +import org.reflections.util.{ClasspathHelper, ConfigurationBuilder} +import scala.collection.immutable.List import scala.reflect.ClassTag /** @@ -32,6 +36,21 @@ import scala.reflect.ClassTag object DataQuantaFactory { /** + * template is the instance of [[DataQuantaCreator]] that will be use in the creation of [[DataQuanta]] instance + */ + var template: DataQuantaCreator = DataQuantaDefault + + /** + * set the [[DataQuantaCreator]] + * + * @param dataQuantaCreator it will be use as creator when the [[DataQuantaFactory.build()]] is called + */ + def setTemplate(dataQuantaCreator: DataQuantaCreator) = { + this.template = dataQuantaCreator + + } + + /** * Given the configuration loaded the [[DataQuantaFactory.build()]] the right extender, if not configuration is * provided the [[DataQuantaFactory]] will create a [[DataQuantaDefault]] instance * @@ -43,7 +62,7 @@ object DataQuantaFactory { */ def build[T:ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuanta[T] = { //TODO validate if the correct way - DataQuantaDefault.wrap[T](operator, outputIndex) + this.template.wrap[T](operator, outputIndex) } }
