Hey Wayang team,
I have added the KafkaTopicSink, including its mappings, and the code
already compiles, which is good for a Monday morning session ;-)
Here is my question:
In the example code below, which I took from my WordCount example, I need
a FORMATTER_UDF and a UDF_LOAD_PROFILE_ESTIMATOR in order to
call the new writeKafkaTopic(...) function.
Could somebody please point me to a place in the code or
docs where I can read how to instantiate those objects?
Many thanks,
and have a great start to the week.
Cheers,
Mirko
Object FORMATTER_UDF = null;
Object UDF_LOAD_PROFILE_ESTIMATOR = null;
// Get a plan builder.
WayangContext wayangContext = new WayangContext(new Configuration())
.withPlugin(Java.basicPlugin());
// .withPlugin(Spark.basicPlugin());
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
.withJobName(String.format("WordCount (%s)", topicName))
.withUdfJarOf(KafkaTopicWordCount.class);
// Start building the WayangPlan.
Collection<Tuple2<String, Integer>> wordcounts = planBuilder
// Read the lines from the Kafka topic.
.readKafkaTopic(topicName).withName("Load data from topic")
// Split each line by non-word characters.
.flatMap(line -> Arrays.asList(line.split("\\W+")))
.withSelectivity(10, 100, 0.9)
.withName("Split words")
// Filter empty tokens.
.filter(token -> !token.isEmpty())
.withSelectivity(0.99, 0.99, 0.99)
.withName("Filter empty words")
// Attach counter to each word.
.map(word -> new Tuple2<>(word.toLowerCase(), 1))
.withName("To lower case, add counter")
// Sum up counters for every word.
.reduceByKey(
Tuple2::getField0,
(t1, t2) -> new Tuple2<>(t1.getField0(),
t1.getField1() + t2.getField1())
)
.withCardinalityEstimator(new DefaultCardinalityEstimator(0.9,
1, false, in -> Math.round(0.01 * in[0])))
.withName("Add counters")
// Execute the plan and write the results to a Kafka topic (instead of collecting them).
//.collect();
.writeKafkaTopic("test", FORMATTER_UDF, "job_test_1",
UDF_LOAD_PROFILE_ESTIMATOR );
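
To make the question more concrete, here is a minimal sketch of what I would expect FORMATTER_UDF to look like. It assumes the formatter is simply a serializable function from one result element to the String that is written to the topic. The SerializableFunction interface and the Pair class below are local stand-ins so that the snippet runs on its own; they are not Wayang's own types, and the tab-separated output format is only an example. I have left out UDF_LOAD_PROFILE_ESTIMATOR because I do not know how it is meant to be constructed.

```java
import java.io.Serializable;
import java.util.function.Function;

public class FormatterSketch {

    /** Local stand-in for a serializable one-argument UDF type (assumption, not Wayang's interface). */
    public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}

    /** Local stand-in for Tuple2, reduced to the two getters used in the plan above. */
    public static final class Pair<A, B> implements Serializable {
        private final A field0;
        private final B field1;

        public Pair(A field0, B field1) {
            this.field0 = field0;
            this.field1 = field1;
        }

        public A getField0() { return field0; }
        public B getField1() { return field1; }
    }

    /** Hypothetical formatter: turns one (word, count) pair into a tab-separated line. */
    public static final SerializableFunction<Pair<String, Integer>, String> FORMATTER_UDF =
            pair -> pair.getField0() + "\t" + pair.getField1();

    public static void main(String[] args) {
        // Format a single sample record the way it would be handed to the sink.
        System.out.println(FORMATTER_UDF.apply(new Pair<>("kafka", 3)));
    }
}
```

If the sink really expects something of this shape, the lambda could be passed directly as the second argument of writeKafkaTopic(...), but that is exactly the part I would like to have confirmed.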
--
Dr. rer. nat. Mirko Kämpf
Müchelner Str. 23
06259 Frankleben