We have a scenario where events from three Kafka topics sharing the same
keys need to be merged. One topic carries the master events; most events in
the other two topics arrive within 10 minutes of the master event. I wrote
the pseudo code below. I'd love to hear your thoughts on whether I am on
the right track.
// Scenario
// (1) Merge events from Kafka topic1, topic2 and topic3 sharing the same keys
// (2) Events in topic1 are master events
// (3) One master event may have an associated event in topic2 and/or topic3
//     sharing the same key
// (4) Most events in topic2 and topic3 will arrive within 10 minutes of the
//     master event arrival
//
// Pseudo code
// Use a 1-minute window of events from topic1 to left-outer-join with the
// next 10 minutes of events from topic2 and topic3
// Parse the raw event into a (key, value) pair; the key is the first
// comma-separated field
def parse(v: String): (String, String) = {
  (v.split(",")(0), v)
}
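For a record shaped like "key,rest-of-payload" (the exact field layout is my assumption; only the key-in-first-field part comes from the code above), parse behaves like this:

```scala
// Hypothetical sample record; the key is the first comma-separated field
parse("key42,fieldA,fieldB")
// => ("key42", "key42,fieldA,fieldB"), i.e. the key plus the full original line
```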
// Create context with 1 minute batch interval
val sparkConf = new SparkConf().setAppName("MergeLogs")
val ssc = new StreamingContext(sparkConf, Minutes(1))
ssc.checkpoint(checkpointDirectory)
// Create direct Kafka streams with brokers and topics
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topic1"))
stream1.checkpoint(Minutes(5))
val pairStream1 = stream1.map(_._2).map(parse)

val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topic2"))
stream2.checkpoint(Minutes(5))
val pairStream2 = stream2.map(_._2).map(parse)

val stream3 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topic3"))
stream3.checkpoint(Minutes(5))
val pairStream3 = stream3.map(_._2).map(parse)
// Last 1 minute of master events from topic1
val windowedStream1 = pairStream1.window(Minutes(1))
// Last 10 minutes of topic2 and topic3, sliding every minute
val windowedStream2 = pairStream2.window(Minutes(10), Minutes(1))
val windowedStream3 = pairStream3.window(Minutes(10), Minutes(1))
// Left-outer-join topic1 with topic2 and topic3
val joinedStream =
  windowedStream1.leftOuterJoin(windowedStream2).leftOuterJoin(windowedStream3)
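One thing worth spelling out: chaining two leftOuterJoins nests the Options, so joinedStream has type DStream[(String, ((String, Option[String]), Option[String]))]. A small map can flatten that into a friendlier shape (the empty-string defaults are just an illustrative choice):

```scala
// Flatten the nested join result; a missing topic2/topic3 event becomes ""
val merged = joinedStream.map { case (key, ((master, e2Opt), e3Opt)) =>
  (key, (master, e2Opt.getOrElse(""), e3Opt.getOrElse("")))
}
```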
// Dump merged events; create the connection on the workers, once per
// partition, so the (typically non-serializable) connection object never
// has to be shipped from the driver
joinedStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val connection = createNewConnection() // executed at the worker
    partition.foreach(record => connection.send(record))
    connection.close()
  }
}
// Start the computation
// (For driver fault tolerance, the stream setup above would instead live in
// a factory function passed to
// StreamingContext.getOrCreate(checkpointDirectory, factory), rather than
// redefining ssc here)
ssc.start()
ssc.awaitTermination()
thx
Daniel