This is an automated email from the ASF dual-hosted git repository.
paulk pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/groovy-website.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 81ba492 update in preparation for Wayang 1.0.0
81ba492 is described below
commit 81ba492a08ddea2333007179e46e353c9bf9a5d6
Author: Paul King <[email protected]>
AuthorDate: Mon Feb 17 14:36:03 2025 +1000
update in preparation for Wayang 1.0.0
---
.../site/blog/using-groovy-with-apache-wayang.adoc | 241 +++++++++++++--------
1 file changed, 153 insertions(+), 88 deletions(-)
diff --git a/site/src/site/blog/using-groovy-with-apache-wayang.adoc
b/site/src/site/blog/using-groovy-with-apache-wayang.adoc
index 6b6884f..98e1f2d 100644
--- a/site/src/site/blog/using-groovy-with-apache-wayang.adoc
+++ b/site/src/site/blog/using-groovy-with-apache-wayang.adoc
@@ -1,9 +1,10 @@
= Using Groovy with Apache Wayang and Apache Spark
-Paul King
+Paul King <paulk-asert|PMC_Member>
:source-highlighter: pygments
:pygments-style: emacs
:icons: font
:revdate: 2022-06-19T13:01:07+00:00
+:updated: 2025-02-15T14:30:00+00:00
:keywords: centroids, data science, groovy, kmeans, records, apache spark, apache wayang
:description: This post looks at using Apache Wayang and Apache Spark with Apache Groovy to cluster various Whiskies.
@@ -56,32 +57,33 @@ We'll start with defining a Point record:
[source,groovy]
----
-record Point(double[] pts) implements Serializable {
- static Point fromLine(String line) { new Point(line.split(',')[2..-1]*.toDouble() as double[]) }
-}
+record Point(double[] pts) implements Serializable { }
----
-We've made it `Serializable` (more on that later) and included
-a `fromLine` factory method to help us make points from a CSV
-file. We'll do that ourselves rather than rely on other libraries
-which could assist. It's not a 2D or 3D point for us but 12D
+We've made it `Serializable` (more on that later).
+It's not a 2D or 3D point for us but 12D
corresponding to the 12 criteria. We just use a `double` array,
so any dimension would be supported but the 12 comes from the
number of columns in our data file.
-We'll define a related `TaggedPointCounter` record. It's like
+We'll define a related `PointGrouping` record. It's like
`Point` but tracks an `int` cluster id and `long` count used
when clustering the points:
[source,groovy]
----
-record TaggedPointCounter(double[] pts, int cluster, long count) implements Serializable {
- TaggedPointCounter plus(TaggedPointCounter that) {
- new TaggedPointCounter((0..<pts.size()).collect{ pts[it] + that.pts[it] } as double[], cluster, count + that.count)
+record PointGrouping(double[] pts, int cluster, long count) implements Serializable {
+ PointGrouping(List<Double> pts, int cluster, long count) {
+ this(pts as double[], cluster, count)
+ }
+
+ PointGrouping plus(PointGrouping that) {
+ var newPts = pts.indices.collect{ pts[it] + that.pts[it] }
+ new PointGrouping(newPts, cluster, count + that.count)
}
- TaggedPointCounter average() {
- new TaggedPointCounter(pts.collect{ double d -> d/count } as double[], cluster, 0)
+ PointGrouping average() {
+ new PointGrouping(pts.collect{ double d -> d/count }, cluster, 1)
}
}
----
@@ -99,24 +101,26 @@ class to capture this part of the algorithm:
[source,groovy]
----
-class SelectNearestCentroid implements ExtendedSerializableFunction<Point, TaggedPointCounter> {
- Iterable<TaggedPointCounter> centroids
+class SelectNearestCentroid implements ExtendedSerializableFunction<Point, PointGrouping> {
+ Iterable<PointGrouping> centroids
void open(ExecutionContext context) {
- centroids = context.getBroadcast("centroids")
+ centroids = context.getBroadcast('centroids')
}
- TaggedPointCounter apply(Point p) {
- def minDistance = Double.POSITIVE_INFINITY
- def nearestCentroidId = -1
+ PointGrouping apply(Point p) {
+ var minDistance = Double.POSITIVE_INFINITY
+ var nearestCentroidId = -1
for (c in centroids) {
- def distance = sqrt((0..<p.pts.size()).collect{ p.pts[it] - c.pts[it] }.sum{ it ** 2 } as double)
+ var distance = sqrt(p.pts.indices
+ .collect{ p.pts[it] - c.pts[it] }
+ .sum{ it ** 2 } as double)
if (distance < minDistance) {
minDistance = distance
nearestCentroidId = c.cluster
}
}
- new TaggedPointCounter(p.pts, nearestCentroidId, 1)
+ new PointGrouping(p.pts, nearestCentroidId, 1)
}
}
----
@@ -126,25 +130,15 @@ _UDF_, a User-Defined Function. It represents some chunk of
functionality where an optimization decision can be made about
where to run the operation.
-Once we get to using Spark, the classes in the map/reduce part
-of our algorithm will need to be serializable. Method closures
-in dynamic Groovy aren't serializable. We have a few options to
-avoid using them. I'll show one approach here which is to use
-some helper classes in places where we might typically use method
-references. Here are the helper classes:
+To make our pipeline definitions a little shorter,
+we'll define some useful operators in a `PipelineOps` helper class:
[source,groovy]
----
-class Cluster implements SerializableFunction<TaggedPointCounter, Integer> {
- Integer apply(TaggedPointCounter tpc) { tpc.cluster() }
-}
-
-class Average implements SerializableFunction<TaggedPointCounter, TaggedPointCounter> {
- TaggedPointCounter apply(TaggedPointCounter tpc) { tpc.average() }
-}
-
-class Plus implements SerializableBinaryOperator<TaggedPointCounter> {
- TaggedPointCounter apply(TaggedPointCounter tpc1, TaggedPointCounter tpc2) { tpc1.plus(tpc2) }
+class PipelineOps {
+ public static SerializableFunction<PointGrouping, Integer> cluster = tpc -> tpc.cluster
+ public static SerializableFunction<PointGrouping, PointGrouping> average = tpc -> tpc.average()
+ public static SerializableBinaryOperator<PointGrouping> plus = (tpc1, tpc2) -> tpc1 + tpc2
}
----
@@ -153,42 +147,43 @@ Now we are ready for our KMeans script:
[source,groovy]
----
int k = 5
-int iterations = 20
+int iterations = 10
// read in data from our file
-def url = WhiskeyWayang.classLoader.getResource('whiskey.csv').file
-def pointsData = new File(url).readLines()[1..-1].collect{ Point.fromLine(it) }
-def dims = pointsData[0].pts().size()
+var url = WhiskeyWayang.classLoader.getResource('whiskey.csv').file
+def rows = new File(url).readLines()[1..-1]*.split(',')
+var distilleries = rows*.getAt(1)
+var pointsData = rows.collect{ new Point(it[2..-1] as double[]) }
+var dims = pointsData[0].pts.size()
// create some random points as initial centroids
-def r = new Random()
-def initPts = (1..k).collect { (0..<dims).collect { r.nextGaussian() + 2 } as double[] }
+var r = new Random()
+var randomPoint = { (0..<dims).collect { r.nextGaussian() + 2 } as double[] }
+var initPts = (1..k).collect(randomPoint)
-// create planbuilder with Java and Spark enabled
-def configuration = new Configuration()
-def context = new WayangContext(configuration)
+var context = new WayangContext()
.withPlugin(Java.basicPlugin())
.withPlugin(Spark.basicPlugin())
-def planBuilder = new JavaPlanBuilder(context, "KMeans ($url, k=$k, iterations=$iterations)")
+var planBuilder = new JavaPlanBuilder(context, "KMeans ($url, k=$k, iterations=$iterations)")
-def points = planBuilder
+var points = planBuilder
.loadCollection(pointsData).withName('Load points')
-def initialCentroids = planBuilder
- .loadCollection((0..<k).collect{ idx -> new TaggedPointCounter(initPts[idx], idx, 0) })
- .withName("Load random centroids")
+var initialCentroids = planBuilder
+ .loadCollection((0..<k).collect{ idx -> new PointGrouping(initPts[idx], idx, 0) })
+ .withName('Load random centroids')
-def finalCentroids = initialCentroids
- .repeat(iterations, currentCentroids ->
- points.map(new SelectNearestCentroid())
- .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
- .reduceByKey(new Cluster(), new Plus()).withName("Add up points")
- .map(new Average()).withName("Average points")
- .withOutputClass(TaggedPointCounter)).withName("Loop").collect()
+var finalCentroids = initialCentroids.repeat(iterations, currentCentroids ->
+ points.map(new SelectNearestCentroid())
+ .withBroadcast(currentCentroids, 'centroids').withName('Find nearest centroid')
+ .reduceByKey(cluster, plus).withName('Aggregate points')
+ .map(average).withName('Average points')
+ .withOutputClass(PointGrouping)
+).withName('Loop').collect()
println 'Centroids:'
finalCentroids.each { c ->
- println "Cluster$c.cluster: ${c.pts.collect{ sprintf('%.3f', it) }.join(',
')}"
+ println "Cluster $c.cluster: ${c.pts.collect('%.2f'::formatted).join(',
')}"
}
----
@@ -205,6 +200,20 @@ at each iteration, all the points to their closest current
centroid and then calculating the new centroids given those
assignments. Finally, we output the results.
+Optionally, we might want to print out the distilleries allocated to each cluster.
+The code looks like this:
+
+[source,groovy]
+----
+var allocator = new SelectNearestCentroid(centroids: finalCentroids)
+var allocations = pointsData.withIndex()
+ .collect{ pt, idx -> [allocator.apply(pt).cluster, distilleries[idx]] }
+ .groupBy{ cluster, ds -> "Cluster $cluster" }
+ .collectValues{ v -> v.collect{ it[1] } }
+ .sort{ it.key }
+allocations.each{ c, ds -> println "$c (${ds.size()} members): ${ds.join(', ')}" }
+----
+
== Running with the Java streams-backed platform
As we mentioned earlier, Wayang selects which platform(s) will
@@ -223,10 +232,10 @@ the script is run, but here is one output:
----
> Task :WhiskeyWayang:run
Centroids:
-Cluster0: 2.548, 2.419, 1.613, 0.194, 0.097, 1.871, 1.742, 1.774, 1.677, 1.935, 1.806, 1.613
-Cluster2: 1.464, 2.679, 1.179, 0.321, 0.071, 0.786, 1.429, 0.429, 0.964, 1.643, 1.929, 2.179
-Cluster3: 3.250, 1.500, 3.250, 3.000, 0.500, 0.250, 1.625, 0.375, 1.375, 1.375, 1.250, 0.250
-Cluster4: 1.684, 1.842, 1.211, 0.421, 0.053, 1.316, 0.632, 0.737, 1.895, 2.000, 1.842, 1.737
+Cluster0: 2.55, 2.42, 1.61, 0.19, 0.10, 1.87, 1.74, 1.77, 1.68, 1.93, 1.81, 1.61
+Cluster2: 1.46, 2.68, 1.18, 0.32, 0.07, 0.79, 1.43, 0.43, 0.96, 1.64, 1.93, 2.18
+Cluster3: 3.25, 1.50, 3.25, 3.00, 0.50, 0.25, 1.62, 0.37, 1.37, 1.37, 1.25, 0.25
+Cluster4: 1.68, 1.84, 1.21, 0.42, 0.05, 1.32, 0.63, 0.74, 1.89, 2.00, 1.84, 1.74
...
----
@@ -252,11 +261,9 @@ change in our code:
[source,groovy,highlight=4]
----
...
-def configuration = new Configuration()
-def context = new WayangContext(configuration)
-// .withPlugin(Java.basicPlugin()) <1>
+var context = new WayangContext()
+// .withPlugin(Java.basicPlugin()) <1>
.withPlugin(Spark.basicPlugin())
-def planBuilder = new JavaPlanBuilder(context, "KMeans ($url, k=$k, iterations=$iterations)")
...
----
<1> Disabled
@@ -266,20 +273,71 @@ this (a solution similar to before but with 1000+ extra lines of
Spark and Wayang log information - truncated for presentation purposes):
----
-[main] INFO org.apache.spark.SparkContext - Running Spark version 3.3.0
+[main] INFO org.apache.spark.SparkContext - Running Spark version 3.5.4
[main] INFO org.apache.spark.util.Utils - Successfully started service 'sparkDriver' on port 62081.
...
Centroids:
-Cluster4: 1.414, 2.448, 0.966, 0.138, 0.034, 0.862, 1.000, 0.483, 1.345, 1.690, 2.103, 2.138
-Cluster0: 2.773, 2.455, 1.455, 0.000, 0.000, 1.909, 1.682, 1.955, 2.091, 2.045, 2.136, 1.818
-Cluster1: 1.762, 2.286, 1.571, 0.619, 0.143, 1.714, 1.333, 0.905, 1.190, 1.952, 1.095, 1.524
-Cluster2: 3.250, 1.500, 3.250, 3.000, 0.500, 0.250, 1.625, 0.375, 1.375, 1.375, 1.250, 0.250
-Cluster3: 2.167, 2.000, 2.167, 1.000, 0.333, 0.333, 2.000, 0.833, 0.833, 1.500, 2.333, 1.667
+Cluster 4: 1.63, 2.26, 1.68, 0.63, 0.16, 1.47, 1.42, 0.89, 1.16, 1.95, 0.89, 1.58
+Cluster 0: 2.76, 2.44, 1.44, 0.04, 0.00, 1.88, 1.68, 1.92, 1.92, 2.04, 2.16, 1.72
+Cluster 1: 3.11, 1.44, 3.11, 2.89, 0.56, 0.22, 1.56, 0.44, 1.44, 1.44, 1.33, 0.44
+Cluster 2: 1.52, 2.42, 1.09, 0.24, 0.06, 0.91, 1.09, 0.45, 1.30, 1.64, 2.18, 2.09
...
[shutdown-hook-0] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext
[shutdown-hook-0] INFO org.apache.spark.util.ShutdownHookManager - Shutdown hook called
----
+== Using ML4all
+
+In recent versions of Wayang, a new abstraction called ML4all has been introduced.
+It frees users from the burden of machine learning algorithm selection and low-level implementation details. Many readers will be familiar with how systems supporting
+_MapReduce_ split functionality into `map`, `filter` or `shuffle`, and `reduce` steps.
+ML4all abstracts machine learning algorithm functionality into 7 operators:
+`Transform`, `Stage`, `Compute`, `Update`, `Sample`, `Converge`, and `Loop`.
+
+Wayang comes bundled with implementations for many of these operators, but
+you can write your own like we have here for the Transform operator:
+
+[source,groovy]
+----
+class TransformCSV extends Transform<double[], String> {
+ double[] transform(String input) {
+ input.split(',')[2..-1] as double[]
+ }
+}
+----
+
+With this operator defined, we can now write our ML4all script:
+
+[source,groovy]
+----
+var dims = 12
+var context = new WayangContext()
+ .withPlugin(Spark.basicPlugin())
+ .withPlugin(Java.basicPlugin())
+
+var plan = new ML4allPlan(
+ transformOp: new TransformCSV(),
+ localStage: new KMeansStageWithRandoms(k: k, dimension: dims),
+ computeOp: new KMeansCompute(),
+ updateOp: new KMeansUpdate(),
+ loopOp: new KMeansConvergeOrMaxIterationsLoop(accuracy, maxIterations)
+)
+
+var model = plan.execute('file:' + url, context)
+model.getByKey("centers").eachWithIndex { center, idx ->
+ var pts = center.collect('%.2f'::formatted).join(', ')
+ println "Cluster$idx: $pts"
+}
+----
+
+When run, we get this output:
+
+----
+Cluster0: 1.57, 2.32, 1.32, 0.45, 0.09, 1.08, 1.19, 0.60, 1.26, 1.74, 1.72, 1.85
+Cluster1: 3.43, 1.57, 3.43, 3.14, 0.57, 0.14, 1.71, 0.43, 1.29, 1.43, 1.29, 0.14
+Cluster2: 2.73, 2.42, 1.46, 0.04, 0.04, 1.88, 1.69, 1.88, 1.92, 2.04, 2.12, 1.81
+----
+
== Discussion
A goal of Apache Wayang is to allow developers to write
@@ -288,23 +346,18 @@ the abstractions aren't perfect. As an example, if I know I
am only using the streams-backed platform, I don't need to worry
about making any of my classes serializable (which is a Spark
requirement). In our example, we could have omitted the
-`implements Serializable` part of the `TaggedPointCounter` record,
-and we could have used a method reference
-`TaggedPointCounter::average` instead of our `Average`
-helper class. This isn't meant to be a criticism of Wayang,
+`implements Serializable` part of the `PointGrouping` record,
+and several of our pipeline operators may have reduced to simple closures.
+This isn't meant to be a criticism of Wayang,
after all if you want to write cross-platform UDFs, you might
expect to have to follow some rules. Instead, it is meant to
just indicate that abstractions often have leaks around the edges.
Sometimes those leaks can be beneficially used, other times they
are traps waiting for unknowing developers.
-To summarise, if using the Java streams-backed platform, you can
-run the application on JDK17 (which uses native records) as well
-as JDK11 and JDK8 (where Groovy provides emulated records).
-Also, we could make numerous simplifications if we desired.
-When using the Spark processing platform, the potential
-simplifications aren't applicable, and we can run on JDK8 and
-JDK11 (Spark isn't yet supported on JDK17).
+We ran this example using JDK17, but on earlier
+JDK versions, Groovy will use emulated records
+instead of native records without changing the source code.
== Conclusion
@@ -329,6 +382,18 @@ in achieving this goal.
== More Information
* Repo containing the source code:
https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/WhiskeyWayang[WhiskeyWayang]
-* Repo containing similar examples using a variety of libraries including Apache Commons CSV, Weka, Smile, Tribuo and others: https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/Whiskey[Whiskey]
-* A similar example using Apache Spark directly but with a built-in parallelized KMeans from the `spark-mllib` library rather than a hand-crafted algorithm: https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/WhiskeySpark[WhiskeySpark]
-* A similar example using Apache Ignite directly but with a built-in clustered KMeans from the `ignite-ml` library rather than a hand-crafted algorithm: https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/WhiskeyIgnite[WhiskeyIgnite]
+* Repo containing solutions to this problem using a variety of non-distributed libraries including Apache Commons CSV, Weka, Smile, Tribuo and others:
+https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/Whiskey[Whiskey]
+* A similar example using https://spark.apache.org/[Apache Spark] directly but with a built-in parallelized KMeans from the `spark-mllib` library:
+https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/WhiskeySpark[WhiskeySpark]
+* A similar example using https://ignite.apache.org/[Apache Ignite] with the built-in clustered KMeans from the `ignite-ml` library:
+https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/WhiskeyIgnite[WhiskeyIgnite]
+* A similar example using https://flink.apache.org/[Apache Flink] with KMeans from the Flink ML (`flink-ml-uber`) library:
+https://github.com/paulk-asert/groovy-data-science/tree/master/subprojects/WhiskeyFlink[WhiskeyFlink]
+
+.Update history
+****
+*19/Jun/2022*: Initial version. +
+*15/Feb/2025*: Updated for Apache Wayang 1.0.0.
+****
+