This is an automated email from the ASF dual-hosted git repository.

paulk pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/groovy-website.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 81ba492  update in preparation for Wayang 1.0.0
81ba492 is described below

commit 81ba492a08ddea2333007179e46e353c9bf9a5d6
Author: Paul King <[email protected]>
AuthorDate: Mon Feb 17 14:36:03 2025 +1000

    update in preparation for Wayang 1.0.0
---
 .../site/blog/using-groovy-with-apache-wayang.adoc | 241 +++++++++++++--------
 1 file changed, 153 insertions(+), 88 deletions(-)

diff --git a/site/src/site/blog/using-groovy-with-apache-wayang.adoc b/site/src/site/blog/using-groovy-with-apache-wayang.adoc
index 6b6884f..98e1f2d 100644
--- a/site/src/site/blog/using-groovy-with-apache-wayang.adoc
+++ b/site/src/site/blog/using-groovy-with-apache-wayang.adoc
@@ -1,9 +1,10 @@
 = Using Groovy with Apache Wayang and Apache Spark
-Paul King
+Paul King <paulk-asert|PMC_Member>
 :source-highlighter: pygments
 :pygments-style: emacs
 :icons: font
 :revdate: 2022-06-19T13:01:07+00:00
+:updated: 2025-02-15T14:30:00+00:00
 :keywords: centroids, data science, groovy, kmeans, records, apache spark, apache wayang
 :description: This post looks at using Apache Wayang and Apache Spark with Apache Groovy to cluster various Whiskies.
 
@@ -56,32 +57,33 @@ We'll start with defining a Point record:
 
 [source,groovy]
 ----
-record Point(double[] pts) implements Serializable {
-    static Point fromLine(String line) { new Point(line.split(',')[2..-1]*.toDouble() as double[]) }
-}
+record Point(double[] pts) implements Serializable { }
 ----
 
-We've made it `Serializable` (more on that later) and included
-a `fromLine` factory method to help us make points from a CSV
-file. We'll do that ourselves rather than rely on other libraries
-which could assist. It's not a 2D or 3D point for us but 12D
+We've made it `Serializable` (more on that later).
+It's not a 2D or 3D point for us but 12D
 corresponding to the 12 criteria. We just use a `double` array,
 so any dimension would be supported but the 12 comes from the
 number of columns in our data file.
 
-We'll define a related `TaggedPointCounter` record. It's like
+We'll define a related `PointGrouping` record. It's like
 `Point` but tracks an `int` cluster id and `long` count used
 when clustering the points:
 
 [source,groovy]
 ----
-record TaggedPointCounter(double[] pts, int cluster, long count) implements Serializable {
-    TaggedPointCounter plus(TaggedPointCounter that) {
-        new TaggedPointCounter((0..<pts.size()).collect{ pts[it] + that.pts[it] } as double[], cluster, count + that.count)
+record PointGrouping(double[] pts, int cluster, long count) implements Serializable {
+    PointGrouping(List<Double> pts, int cluster, long count) {
+        this(pts as double[], cluster, count)
+    }
+
+    PointGrouping plus(PointGrouping that) {
+        var newPts = pts.indices.collect{ pts[it] + that.pts[it] }
+        new PointGrouping(newPts, cluster, count + that.count)
     }
 
-    TaggedPointCounter average() {
-        new TaggedPointCounter(pts.collect{ double d -> d/count } as double[], cluster, 0)
+    PointGrouping average() {
+        new PointGrouping(pts.collect{ double d -> d/count }, cluster, 1)
     }
 }
 ----
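The `plus`/`average` pair implements a classic sum-and-count average: `plus` adds vectors element-wise and accumulates counts, while `average` divides through by the count. A plain-Java sketch of the same semantics (illustrative only, without the Wayang serialization concerns):

```java
// Hypothetical plain-Java analogue of the PointGrouping record above:
// pts holds the running per-dimension sums, count the number of points summed.
record PointGrouping(double[] pts, int cluster, long count) {
    PointGrouping plus(PointGrouping that) {
        double[] sum = new double[pts.length];
        for (int i = 0; i < pts.length; i++) sum[i] = pts[i] + that.pts[i];
        return new PointGrouping(sum, cluster, count + that.count);
    }

    PointGrouping average() {
        // divide the accumulated sums by the count to get the group's centroid
        double[] avg = new double[pts.length];
        for (int i = 0; i < pts.length; i++) avg[i] = pts[i] / count;
        return new PointGrouping(avg, cluster, 1);
    }
}
```

Combining two single-point groupings and averaging yields the midpoint of the two points.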
@@ -99,24 +101,26 @@ class to capture this part of the algorithm:
 
 [source,groovy]
 ----
-class SelectNearestCentroid implements ExtendedSerializableFunction<Point, TaggedPointCounter> {
-    Iterable<TaggedPointCounter> centroids
+class SelectNearestCentroid implements ExtendedSerializableFunction<Point, PointGrouping> {
+    Iterable<PointGrouping> centroids
 
     void open(ExecutionContext context) {
-        centroids = context.getBroadcast("centroids")
+        centroids = context.getBroadcast('centroids')
     }
 
-    TaggedPointCounter apply(Point p) {
-        def minDistance = Double.POSITIVE_INFINITY
-        def nearestCentroidId = -1
+    PointGrouping apply(Point p) {
+        var minDistance = Double.POSITIVE_INFINITY
+        var nearestCentroidId = -1
         for (c in centroids) {
-            def distance = sqrt((0..<p.pts.size()).collect{ p.pts[it] - c.pts[it] }.sum{ it ** 2 } as double)
+            var distance = sqrt(p.pts.indices
+                .collect{ p.pts[it] - c.pts[it] }
+                .sum{ it ** 2 } as double)
             if (distance < minDistance) {
                 minDistance = distance
                 nearestCentroidId = c.cluster
             }
         }
-        new TaggedPointCounter(p.pts, nearestCentroidId, 1)
+        new PointGrouping(p.pts, nearestCentroidId, 1)
     }
 }
 ----
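The calculation inside `apply` is plain Euclidean distance followed by an argmin over the centroids. A standalone Java sketch of the same nearest-centroid search (illustrative names, no Wayang types involved):

```java
import java.util.List;

class NearestCentroid {
    // Euclidean distance between two equal-length vectors
    static double distance(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    // index of the centroid closest to p (argmin over distances)
    static int nearest(double[] p, List<double[]> centroids) {
        double min = Double.POSITIVE_INFINITY;
        int best = -1;
        for (int c = 0; c < centroids.size(); c++) {
            double d = distance(p, centroids.get(c));
            if (d < min) { min = d; best = c; }
        }
        return best;
    }
}
```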
@@ -126,25 +130,15 @@ _UDF_, a User-Defined Function. It represents some chunk of
 functionality where an optimization decision can be made about
 where to run the operation.
 
-Once we get to using Spark, the classes in the map/reduce part
-of our algorithm will need to be serializable. Method closures
-in dynamic Groovy aren't serializable. We have a few options to
-avoid using them. I'll show one approach here which is to use
-some helper classes in places where we might typically use method
-references. Here are the helper classes:
+To make our pipeline definitions a little shorter,
+we'll define some useful operators in a `PipelineOps` helper class:
 
 [source,groovy]
 ----
-class Cluster implements SerializableFunction<TaggedPointCounter, Integer> {
-    Integer apply(TaggedPointCounter tpc) { tpc.cluster() }
-}
-
-class Average implements SerializableFunction<TaggedPointCounter, TaggedPointCounter> {
-    TaggedPointCounter apply(TaggedPointCounter tpc) { tpc.average() }
-}
-
-class Plus implements SerializableBinaryOperator<TaggedPointCounter> {
-    TaggedPointCounter apply(TaggedPointCounter tpc1, TaggedPointCounter tpc2) { tpc1.plus(tpc2) }
+class PipelineOps {
+    public static SerializableFunction<PointGrouping, Integer> cluster = tpc -> tpc.cluster
+    public static SerializableFunction<PointGrouping, PointGrouping> average = tpc -> tpc.average()
+    public static SerializableBinaryOperator<PointGrouping> plus = (tpc1, tpc2) -> tpc1 + tpc2
 }
 ----
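The reason these operators use serializable functional types rather than ordinary method closures is that Spark must ship UDFs to worker JVMs, and plain lambdas are not serializable by default. A plain-Java illustration of the idea (the `SerFunction` name is invented for this sketch, not a Wayang type): a lambda becomes serializable when its target interface extends `Serializable`.

```java
import java.io.Serializable;
import java.util.function.Function;

class SerializableLambdaDemo {
    // A lambda is serializable only when its target type is Serializable.
    // This mirrors why Wayang-style APIs provide SerializableFunction types.
    interface SerFunction<T, R> extends Function<T, R>, Serializable { }

    // Non-capturing lambdas like this can be serialized and shipped to workers.
    static SerFunction<Integer, Integer> doubler = x -> x * 2;
}
```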
 
@@ -153,42 +147,43 @@ Now we are ready for our KMeans script:
 [source,groovy]
 ----
 int k = 5
-int iterations = 20
+int iterations = 10
 
 // read in data from our file
-def url = WhiskeyWayang.classLoader.getResource('whiskey.csv').file
-def pointsData = new File(url).readLines()[1..-1].collect{ Point.fromLine(it) }
-def dims = pointsData[0].pts().size()
+var url = WhiskeyWayang.classLoader.getResource('whiskey.csv').file
+def rows = new File(url).readLines()[1..-1]*.split(',')
+var distilleries = rows*.getAt(1)
+var pointsData = rows.collect{ new Point(it[2..-1] as double[]) }
+var dims = pointsData[0].pts.size()
 
 // create some random points as initial centroids
-def r = new Random()
-def initPts = (1..k).collect { (0..<dims).collect { r.nextGaussian() + 2 } as double[] }
+var r = new Random()
+var randomPoint = { (0..<dims).collect { r.nextGaussian() + 2 } as double[] }
+var initPts = (1..k).collect(randomPoint)
 
-// create planbuilder with Java and Spark enabled
-def configuration = new Configuration()
-def context = new WayangContext(configuration)
+var context = new WayangContext()
     .withPlugin(Java.basicPlugin())
     .withPlugin(Spark.basicPlugin())
-def planBuilder = new JavaPlanBuilder(context, "KMeans ($url, k=$k, iterations=$iterations)")
+var planBuilder = new JavaPlanBuilder(context, "KMeans ($url, k=$k, iterations=$iterations)")
 
-def points = planBuilder
+var points = planBuilder
     .loadCollection(pointsData).withName('Load points')
 
-def initialCentroids = planBuilder
-    .loadCollection((0..<k).collect{ idx -> new TaggedPointCounter(initPts[idx], idx, 0) })
-    .withName("Load random centroids")
+var initialCentroids = planBuilder
+    .loadCollection((0..<k).collect{ idx -> new PointGrouping(initPts[idx], idx, 0) })
+    .withName('Load random centroids')
 
-def finalCentroids = initialCentroids
-    .repeat(iterations, currentCentroids ->
-        points.map(new SelectNearestCentroid())
-            .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
-            .reduceByKey(new Cluster(), new Plus()).withName("Add up points")
-            .map(new Average()).withName("Average points")
-            .withOutputClass(TaggedPointCounter)).withName("Loop").collect()
+var finalCentroids = initialCentroids.repeat(iterations, currentCentroids ->
+    points.map(new SelectNearestCentroid())
+        .withBroadcast(currentCentroids, 'centroids').withName('Find nearest centroid')
+        .reduceByKey(cluster, plus).withName('Aggregate points')
+        .map(average).withName('Average points')
+        .withOutputClass(PointGrouping)
+).withName('Loop').collect()
 
 println 'Centroids:'
 finalCentroids.each { c ->
-    println "Cluster$c.cluster: ${c.pts.collect{ sprintf('%.3f', it) }.join(', ')}"
+    println "Cluster $c.cluster: ${c.pts.collect('%.2f'::formatted).join(', ')}"
 }
 ----
 
@@ -205,6 +200,20 @@ at each iteration, all the points to their closest current
 centroid and then calculating the new centroids given those
 assignments. Finally, we output the results.
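The assign-then-average loop described above can also be sketched independently of Wayang in plain Java (a minimal illustration with hypothetical names, not the post's actual pipeline):

```java
class KMeansSketch {
    // One KMeans iteration: assign each point to its nearest centroid,
    // then recompute each centroid as the mean of its assigned points.
    static double[][] iterate(double[][] points, double[][] centroids) {
        int k = centroids.length, dims = centroids[0].length;
        double[][] sums = new double[k][dims];
        long[] counts = new long[k];
        for (double[] p : points) {
            int best = 0;
            double min = Double.POSITIVE_INFINITY;
            for (int c = 0; c < k; c++) {
                double d = 0;
                for (int i = 0; i < dims; i++) {
                    double diff = p[i] - centroids[c][i];
                    d += diff * diff;
                }
                if (d < min) { min = d; best = c; }
            }
            counts[best]++;
            for (int i = 0; i < dims; i++) sums[best][i] += p[i];
        }
        double[][] next = new double[k][];
        for (int c = 0; c < k; c++) {
            if (counts[c] == 0) { next[c] = centroids[c]; continue; } // keep empty clusters
            next[c] = new double[dims];
            for (int i = 0; i < dims; i++) next[c][i] = sums[c][i] / counts[c];
        }
        return next;
    }
}
```

Repeating `iterate` for a fixed number of iterations (or until the centroids stop moving) gives the whole algorithm; Wayang's contribution is deciding where each step runs.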
 
+Optionally, we might want to print out the distilleries allocated to each cluster.
+The code looks like this:
+
+[source,groovy]
+----
+var allocator = new SelectNearestCentroid(centroids: finalCentroids)
+var allocations = pointsData.withIndex()
+    .collect{ pt, idx -> [allocator.apply(pt).cluster, distilleries[idx]] }
+    .groupBy{ cluster, ds -> "Cluster $cluster" }
+    .collectValues{ v -> v.collect{ it[1] } }
+    .sort{ it.key }
+allocations.each{ c, ds -> println "$c (${ds.size()} members): ${ds.join(', ')}" }
+----
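The `groupBy`/`collectValues` chain above is ordinary map-building. For readers less familiar with Groovy's collection methods, an equivalent sketch in plain Java (hypothetical names, toy data):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class AllocationDemo {
    // Group distillery names by the cluster id each was assigned to,
    // analogous to the Groovy groupBy/collectValues/sort chain.
    static Map<Integer, List<String>> byCluster(int[] clusters, List<String> names) {
        Map<Integer, List<String>> result = new TreeMap<>(); // sorted by cluster id
        for (int i = 0; i < clusters.length; i++) {
            result.computeIfAbsent(clusters[i], k -> new ArrayList<>()).add(names.get(i));
        }
        return result;
    }
}
```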
+
 == Running with the Java streams-backed platform
 
 As we mentioned earlier, Wayang selects which platform(s) will
@@ -223,10 +232,10 @@ the script is run, but here is one output:
 ----
 > Task :WhiskeyWayang:run
 Centroids:
-Cluster0: 2.548, 2.419, 1.613, 0.194, 0.097, 1.871, 1.742, 1.774, 1.677, 1.935, 1.806, 1.613
-Cluster2: 1.464, 2.679, 1.179, 0.321, 0.071, 0.786, 1.429, 0.429, 0.964, 1.643, 1.929, 2.179
-Cluster3: 3.250, 1.500, 3.250, 3.000, 0.500, 0.250, 1.625, 0.375, 1.375, 1.375, 1.250, 0.250
-Cluster4: 1.684, 1.842, 1.211, 0.421, 0.053, 1.316, 0.632, 0.737, 1.895, 2.000, 1.842, 1.737
+Cluster0: 2.55, 2.42, 1.61, 0.19, 0.10, 1.87, 1.74, 1.77, 1.68, 1.93, 1.81, 1.61
+Cluster2: 1.46, 2.68, 1.18, 0.32, 0.07, 0.79, 1.43, 0.43, 0.96, 1.64, 1.93, 2.18
+Cluster3: 3.25, 1.50, 3.25, 3.00, 0.50, 0.25, 1.62, 0.37, 1.37, 1.37, 1.25, 0.25
+Cluster4: 1.68, 1.84, 1.21, 0.42, 0.05, 1.32, 0.63, 0.74, 1.89, 2.00, 1.84, 1.74
 ...
 ----
 
@@ -252,11 +261,9 @@ change in our code:
 [source,groovy,highlight=4]
 ----
 ...
-def configuration = new Configuration()
-def context = new WayangContext(configuration)
-//    .withPlugin(Java.basicPlugin())                <1>
+var context = new WayangContext()
+//    .withPlugin(Java.basicPlugin())     <1>
     .withPlugin(Spark.basicPlugin())
-def planBuilder = new JavaPlanBuilder(context, "KMeans ($url, k=$k, iterations=$iterations)")
 ...
 ----
 <1> Disabled
@@ -266,20 +273,71 @@ this (a solution similar to before but with 1000+ extra lines of
 Spark and Wayang log information - truncated for presentation purposes):
 
 ----
-[main] INFO org.apache.spark.SparkContext - Running Spark version 3.3.0
+[main] INFO org.apache.spark.SparkContext - Running Spark version 3.5.4
[main] INFO org.apache.spark.util.Utils - Successfully started service 'sparkDriver' on port 62081.
 ...
 Centroids:
-Cluster4: 1.414, 2.448, 0.966, 0.138, 0.034, 0.862, 1.000, 0.483, 1.345, 1.690, 2.103, 2.138
-Cluster0: 2.773, 2.455, 1.455, 0.000, 0.000, 1.909, 1.682, 1.955, 2.091, 2.045, 2.136, 1.818
-Cluster1: 1.762, 2.286, 1.571, 0.619, 0.143, 1.714, 1.333, 0.905, 1.190, 1.952, 1.095, 1.524
-Cluster2: 3.250, 1.500, 3.250, 3.000, 0.500, 0.250, 1.625, 0.375, 1.375, 1.375, 1.250, 0.250
-Cluster3: 2.167, 2.000, 2.167, 1.000, 0.333, 0.333, 2.000, 0.833, 0.833, 1.500, 2.333, 1.667
+Cluster 4: 1.63, 2.26, 1.68, 0.63, 0.16, 1.47, 1.42, 0.89, 1.16, 1.95, 0.89, 1.58
+Cluster 0: 2.76, 2.44, 1.44, 0.04, 0.00, 1.88, 1.68, 1.92, 1.92, 2.04, 2.16, 1.72
+Cluster 1: 3.11, 1.44, 3.11, 2.89, 0.56, 0.22, 1.56, 0.44, 1.44, 1.44, 1.33, 0.44
+Cluster 2: 1.52, 2.42, 1.09, 0.24, 0.06, 0.91, 1.09, 0.45, 1.30, 1.64, 2.18, 2.09
 ...
[shutdown-hook-0] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext
[shutdown-hook-0] INFO org.apache.spark.util.ShutdownHookManager - Shutdown hook called
 ----
 
+== Using ML4all
+
+In recent versions of Wayang, a new abstraction called ML4all has been introduced.
+It frees users from the burden of machine learning algorithm selection and low-level implementation details.
+Many readers will be familiar with how systems supporting _MapReduce_ split functionality into `map`, `filter` or `shuffle`, and `reduce` steps.
+ML4all abstracts machine learning algorithm functionality into 7 operators:
+`Transform`, `Stage`, `Compute`, `Update`, `Sample`, `Converge`, and `Loop`.
+
+Wayang comes bundled with implementations for many of these operators, but
+you can write your own like we have here for the Transform operator:
+
+[source,groovy]
+----
+class TransformCSV extends Transform<double[], String> {
+    double[] transform(String input) {
+        input.split(',')[2..-1] as double[]
+    }
+}
+----
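The `TransformCSV` operator above simply drops the first two CSV columns (id and distillery name) and parses the remaining feature columns as doubles. A plain-Java equivalent (the `CsvTransform` name is hypothetical) might look like:

```java
import java.util.Arrays;

class CsvTransform {
    // Skip the first two columns (id and distillery name) and
    // parse the remaining feature columns as doubles.
    static double[] transform(String line) {
        String[] cols = line.split(",");
        return Arrays.stream(cols, 2, cols.length)
                     .mapToDouble(Double::parseDouble)
                     .toArray();
    }
}
```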
+
+With this operator defined, we can now write our ML4all plan:
+
+[source,groovy]
+----
+var dims = 12
+var context = new WayangContext()
+    .withPlugin(Spark.basicPlugin())
+    .withPlugin(Java.basicPlugin())
+
+var plan = new ML4allPlan(
+    transformOp: new TransformCSV(),
+    localStage: new KMeansStageWithRandoms(k: k, dimension: dims),
+    computeOp: new KMeansCompute(),
+    updateOp: new KMeansUpdate(),
+    loopOp: new KMeansConvergeOrMaxIterationsLoop(accuracy, maxIterations)
+)
+
+var model = plan.execute('file:' + url, context)
+model.getByKey("centers").eachWithIndex { center, idx ->
+    var pts = center.collect('%.2f'::formatted).join(', ')
+    println "Cluster$idx: $pts"
+}
+----
+
+When run we get this output:
+
+----
+Cluster0: 1.57, 2.32, 1.32, 0.45, 0.09, 1.08, 1.19, 0.60, 1.26, 1.74, 1.72, 1.85
+Cluster1: 3.43, 1.57, 3.43, 3.14, 0.57, 0.14, 1.71, 0.43, 1.29, 1.43, 1.29, 0.14
+Cluster2: 2.73, 2.42, 1.46, 0.04, 0.04, 1.88, 1.69, 1.88, 1.92, 2.04, 2.12, 1.81
+----
+
 == Discussion
 
 A goal of Apache Wayang is to allow developers to write
@@ -288,23 +346,18 @@ the abstractions aren't perfect. As an example, if I know I
 am only using the streams-backed platform, I don't need to worry
 about making any of my classes serializable (which is a Spark
 requirement). In our example, we could have omitted the
-`implements Serializable` part of the `TaggedPointCounter` record,
-and we could have used a method reference
-`TaggedPointCounter::average` instead of our `Average`
-helper class. This isn't meant to be a criticism of Wayang,
+`implements Serializable` part of the `PointGrouping` record,
+and several of our pipeline operators could have been reduced to simple closures.
+This isn't meant to be a criticism of Wayang,
 after all if you want to write cross-platform UDFs, you might
 expect to have to follow some rules. Instead, it is meant to
 just indicate that abstractions often have leaks around the edges.
 Sometimes those leaks can be beneficially used, other times they
 are traps waiting for unknowing developers.
 
-To summarise, if using the Java streams-backed platform, you can
-run the application on JDK17 (which uses native records) as well
-as JDK11 and JDK8 (where Groovy provides emulated records).
-Also, we could make numerous simplifications if we desired.
-When using the Spark processing platform, the potential
-simplifications aren't applicable, and we can run on JDK8 and
-JDK11 (Spark isn't yet supported on JDK17).
+We ran this example using JDK17, but on earlier
+JDK versions, Groovy will use emulated records
+instead of native records without changing the source code.
 
 == Conclusion
 
@@ -329,6 +382,18 @@ in achieving this goal.
 == More Information
 
 * Repo containing the source code: https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/WhiskeyWayang[WhiskeyWayang]
-* Repo containing similar examples using a variety of libraries including Apache Commons CSV, Weka, Smile, Tribuo and others: https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/Whiskey[Whiskey]
-* A similar example using Apache Spark directly but with a built-in parallelized KMeans from the `spark-mllib` library rather than a hand-crafted algorithm: https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/WhiskeySpark[WhiskeySpark]
-* A similar example using Apache Ignite directly but with a built-in clustered KMeans from the `ignite-ml` library rather than a hand-crafted algorithm: https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/WhiskeyIgnite[WhiskeyIgnite]
+* Repo containing solutions to this problem using a variety of non-distributed libraries including Apache Commons CSV, Weka, Smile, Tribuo and others:
+https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/Whiskey[Whiskey]
+* A similar example using https://spark.apache.org/[Apache Spark] directly but with a built-in parallelized KMeans from the `spark-mllib` library:
+https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/WhiskeySpark[WhiskeySpark]
+* A similar example using https://ignite.apache.org/[Apache Ignite] with the built-in clustered KMeans from the `ignite-ml` library:
+https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/WhiskeyIgnite[WhiskeyIgnite]
+* A similar example using https://flink.apache.org/[Apache Flink] with KMeans from the Flink ML (`flink-ml-uber`) library:
+https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/WhiskeyFlink[WhiskeyFlink]
+
+.Update history
+****
+*19/Jun/2022*: Initial version. +
+*15/Feb/2025*: Updated for Apache Wayang 1.0.0.
+****
+
