This is an automated email from the ASF dual-hosted git repository.

glauesppen pushed a commit to branch feature/update-docs
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit c85dad20950f9587ecbf4ea7bd8559be77e2e73d
Author: Glaucia Esppenchutz <[email protected]>
AuthorDate: Mon Jun 19 13:48:18 2023 +0100

    Organizing README.md and create a doc for contribution
---
 CONTRIBUTING.md           |  25 +++++
 README.md                 | 236 ++++------------------------------------------
 guides/wayang-examples.md | 232 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 275 insertions(+), 218 deletions(-)

diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 00000000..df8b204a
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,25 @@
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+
+## Contributing
+As a contributor, you can help shape the future of the project by providing feedback, joining our mailing lists, reporting bugs, requesting features, and participating in discussions. As you become more involved, you can also help with development by providing patches for bug fixes or features and helping to improve our documentation.
+
+If you show sustained commitment to the project, you may be invited to become a committer. This brings with it the privilege of write access to the project repository and resources.
+
+To learn more about how to get involved with the Apache Wayang project, please visit our “Get Involved” [page](https://wayang.apache.org/community/) and read the [Apache code of conduct](https://www.apache.org/foundation/policies/conduct.html). We look forward to your contributions!
diff --git a/README.md b/README.md
index afb2c1a4..23f34dea 100644
--- a/README.md
+++ b/README.md
@@ -10,7 +10,22 @@
 
 [](https://twitter.com/intent/tweet?text=Apache%20Wayang%20enables%20cross%20platform%20data%20processing,%20star%20it%20via:%20&url=https://github.com/apache/incubator-wayang&via=apachewayang&hashtags=dataprocessing,bigdata,analytics,hybridcloud,developers)
 [](https://www.reddit.com/r/ApacheWayang/)
-
+## Table of contents
+  * [Description](#description)
+  * [Quick Guide for Running Wayang](#quick-guide-for-running-wayang)
+  * [Quick Guide for Developing with Wayang](#quick-guide-for-developing-with-wayang)
+  * [Installing Wayang](#installing-wayang)
+    + [Requirements at Runtime](#requirements-at-runtime)
+    + [Validating the installation](#validating-the-installation)
+  * [Getting Started](#getting-started)
+    + [Prerequisites](#prerequisites)
+    + [Building](#building)
+  * [Running the tests](#running-the-tests)
+  * [Example Applications](#example-applications)
+  * [Built With](#built-with)
+  * [Contributing](#contributing)
+  * [Authors](#authors)
+  * [License](#license)
 
 ## Description
 
@@ -158,218 +173,7 @@ In the incubator-wayang root folder run:
 ```
 
 ## Example Applications
-### WordCount
-
-The "Hello World!" of data processing systems is the wordcount.
-
-#### Java scala-like API
-```java
-import org.apache.wayang.api.JavaPlanBuilder;
-import org.apache.wayang.basic.data.Tuple2;
-import org.apache.wayang.core.api.Configuration;
-import org.apache.wayang.core.api.WayangContext;
-import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
-import org.apache.wayang.java.Java;
-import org.apache.wayang.spark.Spark;
-import java.util.Collection;
-import java.util.Arrays;
-
-public class WordcountJava {
-
-    public static void main(String[] args){
-
-        // Settings
-        String inputUrl = "file:/tmp.txt";
-
-        // Get a plan builder.
-        WayangContext wayangContext = new WayangContext(new Configuration())
-                .withPlugin(Java.basicPlugin())
-                .withPlugin(Spark.basicPlugin());
-        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
-                .withJobName(String.format("WordCount (%s)", inputUrl))
-                .withUdfJarOf(WordcountJava.class);
-
-        // Start building the WayangPlan.
-        Collection<Tuple2<String, Integer>> wordcounts = planBuilder
-                // Read the text file.
-                .readTextFile(inputUrl).withName("Load file")
-
-                // Split each line by non-word characters.
-                .flatMap(line -> Arrays.asList(line.split("\\W+")))
-                .withSelectivity(10, 100, 0.9)
-                .withName("Split words")
-
-                // Filter empty tokens.
-                .filter(token -> !token.isEmpty())
-                .withSelectivity(0.99, 0.99, 0.99)
-                .withName("Filter empty words")
-
-                // Attach counter to each word.
-                .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")
-
-                // Sum up counters for every word.
-                .reduceByKey(
-                        Tuple2::getField0,
-                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
-                )
-                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
-                .withName("Add counters")
-
-                // Execute the plan and collect the results.
-                .collect();
-
-        System.out.println(wordcounts);
-    }
-}
-```
-
-#### Scala API
-
-```scala
-import org.apache.wayang.api._
-import org.apache.wayang.core.api.{Configuration, WayangContext}
-import org.apache.wayang.java.Java
-import org.apache.wayang.spark.Spark
-
-object WordcountScala {
-  def main(args: Array[String]) {
-
-    // Settings
-    val inputUrl = "file:/tmp.txt"
-
-    // Get a plan builder.
-    val wayangContext = new WayangContext(new Configuration)
-      .withPlugin(Java.basicPlugin)
-      .withPlugin(Spark.basicPlugin)
-    val planBuilder = new PlanBuilder(wayangContext)
-      .withJobName(s"WordCount ($inputUrl)")
-      .withUdfJarsOf(this.getClass)
-
-    val wordcounts = planBuilder
-      // Read the text file.
-      .readTextFile(inputUrl).withName("Load file")
-
-      // Split each line by non-word characters.
-      .flatMap(_.split("\\W+"), selectivity = 10).withName("Split words")
-
-      // Filter empty tokens.
-      .filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")
-
-      // Attach counter to each word.
-      .map(word => (word.toLowerCase, 1)).withName("To lower case, add counter")
-
-      // Sum up counters for every word.
-      .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters")
-      .withCardinalityEstimator((in: Long) => math.round(in * 0.01))
-
-      // Execute the plan and collect the results.
-      .collect()
-
-    println(wordcounts)
-  }
-}
-```
-
-### k-means
-
-Wayang is also capable of iterative processing, which is, e.g., very important for machine learning algorithms, such as k-means.
-
-#### Scala API
-
-```scala
-import org.apache.wayang.api._
-import org.apache.wayang.core.api.{Configuration, WayangContext}
-import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction
-import org.apache.wayang.core.function.ExecutionContext
-import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators
-import org.apache.wayang.java.Java
-import org.apache.wayang.spark.Spark
-
-import scala.util.Random
-import scala.collection.JavaConversions._
-
-object kmeans {
-  def main(args: Array[String]) {
-
-    // Settings
-    val inputUrl = "file:/kmeans.txt"
-    val k = 5
-    val iterations = 100
-    val configuration = new Configuration
-
-    // Get a plan builder.
-    val wayangContext = new WayangContext(new Configuration)
-      .withPlugin(Java.basicPlugin)
-      .withPlugin(Spark.basicPlugin)
-    val planBuilder = new PlanBuilder(wayangContext)
-      .withJobName(s"k-means ($inputUrl, k=$k, $iterations iterations)")
-      .withUdfJarsOf(this.getClass)
-
-    case class Point(x: Double, y: Double)
-    case class TaggedPoint(x: Double, y: Double, cluster: Int)
-    case class TaggedPointCounter(x: Double, y: Double, cluster: Int, count: Long) {
-      def add_points(that: TaggedPointCounter) = TaggedPointCounter(this.x + that.x, this.y + that.y, this.cluster, this.count + that.count)
-      def average = TaggedPointCounter(x / count, y / count, cluster, 0)
-    }
-
-    // Read and parse the input file(s).
-    val points = planBuilder
-      .readTextFile(inputUrl).withName("Read file")
-      .map { line =>
-        val fields = line.split(",")
-        Point(fields(0).toDouble, fields(1).toDouble)
-      }.withName("Create points")
-
-
-    // Create initial centroids.
-    val random = new Random
-    val initialCentroids = planBuilder
-      .loadCollection(for (i <- 1 to k) yield TaggedPointCounter(random.nextGaussian(), random.nextGaussian(), i, 0)).withName("Load random centroids")
-
-    // Declare UDF to select centroid for each data point.
-    class SelectNearestCentroid extends ExtendedSerializableFunction[Point, TaggedPointCounter] {
-
-      /** Keeps the broadcasted centroids. */
-      var centroids: Iterable[TaggedPointCounter] = _
-
-      override def open(executionCtx: ExecutionContext) = {
-        centroids = executionCtx.getBroadcast[TaggedPointCounter]("centroids")
-      }
-
-      override def apply(point: Point): TaggedPointCounter = {
-        var minDistance = Double.PositiveInfinity
-        var nearestCentroidId = -1
-        for (centroid <- centroids) {
-          val distance = Math.pow(Math.pow(point.x - centroid.x, 2) + Math.pow(point.y - centroid.y, 2), 0.5)
-          if (distance < minDistance) {
-            minDistance = distance
-            nearestCentroidId = centroid.cluster
-          }
-        }
-        new TaggedPointCounter(point.x, point.y, nearestCentroidId, 1)
-      }
-    }
-
-    // Do the k-means loop.
-    val finalCentroids = initialCentroids.repeat(iterations, { currentCentroids =>
-      points
-        .mapJava(new SelectNearestCentroid,
-          udfLoad = LoadProfileEstimators.createFromSpecification(
-            "my.udf.costfunction.key", configuration
-          ))
-        .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
-        .reduceByKey(_.cluster, _.add_points(_)).withName("Add up points")
-        .withCardinalityEstimator(k)
-        .map(_.average).withName("Average points")
-    }).withName("Loop")
-
-      // Collect the results.
-      .collect()
-
-    println(finalCentroids)
-  }
-}
-```
+You can find examples of how to start using Wayang [here](guides/wayang-examples.md).
 
 ## Built With
 
@@ -378,11 +182,7 @@ object kmeans {
 * [Maven](https://maven.apache.org/)
 
 ## Contributing
-As a contributor, you can help shape the future of the project by providing feedback, joining our mailing lists, reporting bugs, requesting features, and participating in discussions. As you become more involved, you can also help with development by providing patches for bug fixes or features and helping to improve our documentation.
-
-If you show sustained commitment to the project, you may be invited to become a committer. This brings with it the privilege of write access to the project repository and resources.
-
-To learn more about how to get involved with the Apache Wayang project, please visit our “Get Involved” [page](https://wayang.apache.org/community/) and read the [Apache code of conduct](https://www.apache.org/foundation/policies/conduct.html). We look forward to your contributions!
+For more information about how to contribute to Apache Wayang, see the [CONTRIBUTING.md](CONTRIBUTING.md) guide.
 
 ## Authors
 The list of [contributors](https://github.com/apache/incubator-wayang/graphs/contributors).
diff --git a/guides/wayang-examples.md b/guides/wayang-examples.md
new file mode 100644
index 00000000..e67602f6
--- /dev/null
+++ b/guides/wayang-examples.md
@@ -0,0 +1,232 @@
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+This page contains examples that you can execute with Wayang.
+
+## WordCount
+
+The "Hello World!" of data processing systems is the wordcount.
+
+### Java Scala-like API
+```java
+import org.apache.wayang.api.JavaPlanBuilder;
+import org.apache.wayang.basic.data.Tuple2;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
+import org.apache.wayang.java.Java;
+import org.apache.wayang.spark.Spark;
+import java.util.Collection;
+import java.util.Arrays;
+
+public class WordcountJava {
+
+    public static void main(String[] args){
+
+        // Settings
+        String inputUrl = "file:/tmp.txt";
+
+        // Get a plan builder.
+        WayangContext wayangContext = new WayangContext(new Configuration())
+                .withPlugin(Java.basicPlugin())
+                .withPlugin(Spark.basicPlugin());
+        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
+                .withJobName(String.format("WordCount (%s)", inputUrl))
+                .withUdfJarOf(WordcountJava.class);
+
+        // Start building the WayangPlan.
+        Collection<Tuple2<String, Integer>> wordcounts = planBuilder
+                // Read the text file.
+                .readTextFile(inputUrl).withName("Load file")
+
+                // Split each line by non-word characters.
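+                // The withSelectivity(min, max, confidence) call just below is a
+                // hint for Wayang's optimizer; here it reads as an estimated 10 to
+                // 100 output tokens per input line with 0.9 confidence (assumed
+                // semantics; the values are estimates, not hard limits).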
+                .flatMap(line -> Arrays.asList(line.split("\\W+")))
+                .withSelectivity(10, 100, 0.9)
+                .withName("Split words")
+
+                // Filter empty tokens.
+                .filter(token -> !token.isEmpty())
+                .withSelectivity(0.99, 0.99, 0.99)
+                .withName("Filter empty words")
+
+                // Attach counter to each word.
+                .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")
+
+                // Sum up counters for every word.
+                .reduceByKey(
+                        Tuple2::getField0,
+                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
+                )
+                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
+                .withName("Add counters")
+
+                // Execute the plan and collect the results.
+                .collect();
+
+        System.out.println(wordcounts);
+    }
+}
+```
+
+### Scala API
+
+```scala
+import org.apache.wayang.api._
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+
+object WordcountScala {
+  def main(args: Array[String]) {
+
+    // Settings
+    val inputUrl = "file:/tmp.txt"
+
+    // Get a plan builder.
+    val wayangContext = new WayangContext(new Configuration)
+      .withPlugin(Java.basicPlugin)
+      .withPlugin(Spark.basicPlugin)
+    val planBuilder = new PlanBuilder(wayangContext)
+      .withJobName(s"WordCount ($inputUrl)")
+      .withUdfJarsOf(this.getClass)
+
+    val wordcounts = planBuilder
+      // Read the text file.
+      .readTextFile(inputUrl).withName("Load file")
+
+      // Split each line by non-word characters.
+      .flatMap(_.split("\\W+"), selectivity = 10).withName("Split words")
+
+      // Filter empty tokens.
+      .filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")
+
+      // Attach counter to each word.
+      .map(word => (word.toLowerCase, 1)).withName("To lower case, add counter")
+
+      // Sum up counters for every word.
+      .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Add counters")
+      .withCardinalityEstimator((in: Long) => math.round(in * 0.01))
+
+      // Execute the plan and collect the results.
+      .collect()
+
+    println(wordcounts)
+  }
+}
+```
+
+## k-means
+
+Wayang also supports iterative processing, which is important, for example, for machine learning algorithms such as k-means.
+
+### Scala API
+
+```scala
+import org.apache.wayang.api._
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction
+import org.apache.wayang.core.function.ExecutionContext
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators
+import org.apache.wayang.java.Java
+import org.apache.wayang.spark.Spark
+
+import scala.util.Random
+import scala.collection.JavaConversions._
+
+object kmeans {
+  def main(args: Array[String]) {
+
+    // Settings
+    val inputUrl = "file:/kmeans.txt"
+    val k = 5
+    val iterations = 100
+    val configuration = new Configuration
+
+    // Get a plan builder.
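+    // The two withPlugin calls below register both the Java and the Spark
+    // backend, leaving the choice of execution platform for each operator to
+    // Wayang's cross-platform optimizer.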
+    val wayangContext = new WayangContext(new Configuration)
+      .withPlugin(Java.basicPlugin)
+      .withPlugin(Spark.basicPlugin)
+    val planBuilder = new PlanBuilder(wayangContext)
+      .withJobName(s"k-means ($inputUrl, k=$k, $iterations iterations)")
+      .withUdfJarsOf(this.getClass)
+
+    case class Point(x: Double, y: Double)
+    case class TaggedPoint(x: Double, y: Double, cluster: Int)
+    case class TaggedPointCounter(x: Double, y: Double, cluster: Int, count: Long) {
+      def add_points(that: TaggedPointCounter) = TaggedPointCounter(this.x + that.x, this.y + that.y, this.cluster, this.count + that.count)
+      def average = TaggedPointCounter(x / count, y / count, cluster, 0)
+    }
+
+    // Read and parse the input file(s).
+    val points = planBuilder
+      .readTextFile(inputUrl).withName("Read file")
+      .map { line =>
+        val fields = line.split(",")
+        Point(fields(0).toDouble, fields(1).toDouble)
+      }.withName("Create points")
+
+    // Create initial centroids.
+    val random = new Random
+    val initialCentroids = planBuilder
+      .loadCollection(for (i <- 1 to k) yield TaggedPointCounter(random.nextGaussian(), random.nextGaussian(), i, 0)).withName("Load random centroids")
+
+    // Declare UDF to select centroid for each data point.
+    class SelectNearestCentroid extends ExtendedSerializableFunction[Point, TaggedPointCounter] {
+
+      /** Keeps the broadcasted centroids. */
+      var centroids: Iterable[TaggedPointCounter] = _
+
+      override def open(executionCtx: ExecutionContext) = {
+        centroids = executionCtx.getBroadcast[TaggedPointCounter]("centroids")
+      }
+
+      override def apply(point: Point): TaggedPointCounter = {
+        var minDistance = Double.PositiveInfinity
+        var nearestCentroidId = -1
+        for (centroid <- centroids) {
+          val distance = Math.pow(Math.pow(point.x - centroid.x, 2) + Math.pow(point.y - centroid.y, 2), 0.5)
+          if (distance < minDistance) {
+            minDistance = distance
+            nearestCentroidId = centroid.cluster
+          }
+        }
+        new TaggedPointCounter(point.x, point.y, nearestCentroidId, 1)
+      }
+    }
+
+    // Do the k-means loop.
+    val finalCentroids = initialCentroids.repeat(iterations, { currentCentroids =>
+      points
+        .mapJava(new SelectNearestCentroid,
+          udfLoad = LoadProfileEstimators.createFromSpecification(
+            "my.udf.costfunction.key", configuration
+          ))
+        .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
+        .reduceByKey(_.cluster, _.add_points(_)).withName("Add up points")
+        .withCardinalityEstimator(k)
+        .map(_.average).withName("Average points")
+    }).withName("Loop")
+      // Collect the results.
+      .collect()
+
+    println(finalCentroids)
+  }
+}
+```
\ No newline at end of file
