Hey friends,

We're trying to make some batched computations run against an OLAP DB
closer to "realtime". One of our more complex computations is a trigger
when event A occurs but not event B within a given time period. Our
experience with Spark is limited, but since Spark 2.3.0 just introduced
Stream-Stream Joins
<https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html>
and
our data is already in Kafka, we thought we'd try it out.
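
Concretely, the plan is to express "A but not B within the window" as a left
outer join between the two filtered streams and treat rows where the B side is
null as the trigger. A rough sketch of that shape (*addedStream* and
*purchasedStream* are stand-ins for the filtered substreams built below):

```
// Sketch only: addedStream / purchasedStream stand in for the filtered
// Kafka substreams constructed later in this mail. With watermarks on
// both sides, the left outer join emits a null right side once the
// watermark passes, i.e. "event A with no matching event B in time".
val triggers = addedStream.as('a)
  .join(
    purchasedStream.as('b),
    expr("a.id = b.id AND a.ts < b.ts AND b.ts < a.ts + interval 5 seconds"),
    joinType = "left")
  .where($"b.id".isNull)
```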

That said, in our exploration, we've been running into an issue where Spark
optimizes our SQL query so that the Kafka *watermark* is applied *after* the
*filter*. This means the watermark won't move forward unless there's data
within the filtered results, and thus the trigger for "event B did not
occur" won't fire until another "event B" arrives, which can be problematic
depending on how granular the filter is.

See the quick isolated example I set up in *spark-shell* below.

```
scala> :paste
// Entering paste mode (ctrl-D to finish)

val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<host>:<port>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()

import org.apache.spark.sql.types._
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("message", StructType(Seq(
    StructField("event", StringType),
    StructField("timestamp", TimestampType)
  )))
))

val parsed = kafka
  .select(from_json($"value".cast(StringType), schema) as 'data)
  .select($"data.*", $"data.message.timestamp" as 'ts)
  .withWatermark("ts", "10 seconds")

// Exiting paste mode, now interpreting.
scala> parsed.filter("message.event = 'Item Added'").as('a).join(
         parsed.filter("message.event = 'Item Purchased'").as('b),
         expr("a.id = b.id AND a.ts < b.ts AND b.ts < a.ts + interval 5 seconds"),
         joinType = "left"
       ).explain()
== Physical Plan ==
StreamingSymmetricHashJoin [id#24], [id#37], LeftOuter, condition = [
leftOnly = null, rightOnly = null, both = ((ts#23-T10000ms <
ts#39-T10000ms) && (ts#39-T10000ms < ts#23-T10000ms + interval 5 seconds)),
full = ((ts#23-T10000ms < ts#39-T10000ms) && (ts#39-T10000ms <
ts#23-T10000ms + interval 5 seconds)) ], state info [ checkpoint =
<unknown>, runId = 52d0e4a5-150c-4136-8542-c2c5e4bb59c2, opId = 0, ver = 0,
numPartitions = 4], 0, state cleanup [ left value predicate:
(ts#23-T10000ms <= -5000000), right value predicate: (ts#39-T10000ms <= 0) ]
:- Exchange hashpartitioning(id#24, 4)
:  +- EventTimeWatermark ts#23: timestamp, interval 10 seconds
:     +- Project [jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).id AS id#24,
jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message AS message#25,
jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message.timestamp AS ts#23]
:        +- Filter (jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message.event = Item Added)
:           +- StreamingRelation kafka, [key#7, value#8, topic#9,
partition#10, offset#11L, timestamp#12, timestampType#13]
+- Exchange hashpartitioning(id#37, 4)
   +- *(1) Filter isnotnull(ts#39-T10000ms)
      +- EventTimeWatermark ts#39: timestamp, interval 10 seconds
         +- Project [jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).id AS id#37,
jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message AS message#38,
jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message.timestamp AS ts#39]
            +- Filter ((jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message.event = Item Purchased) &&
isnotnull(jsontostructs(StructField(id,StringType,true),
StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).id))
               +- StreamingRelation kafka, [key#7, value#8, topic#9,
partition#10, offset#11L, timestamp#12, timestampType#13]
```

(The environment-specific values appear as the *<host>:<port>* and *<topic>*
placeholders, in case you'd like to reproduce locally.)
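
(For what it's worth, here's roughly how we've been watching the watermark
stall while testing. *joined* is hypothetical shorthand for the join above
assigned to a val, and the console sink is just for illustration.)

```
// Start the query with a console sink and poll its progress.
// "joined" stands in for the left outer join from the example above.
// In our testing, the "watermark" entry in eventTime only advances
// when rows survive the filters.
val query = joined.writeStream
  .format("console")
  .outputMode("append")
  .start()

// After a few batches have run:
Option(query.lastProgress).foreach(p => println(p.eventTime.get("watermark")))
```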

As you can see in the plan above, Spark's optimizer has decided to make two
separate *filtered* streams, each with its own watermark, and then join them
together. However, we'd like the plan to look more like

= -> Kafka stream
= => Watermark
= = => JOIN between
= = = => Kafka substream using filter A
= = = => Kafka substream using filter B

or at least the following

=> JOIN between A and B
= = => A) Kafka substream
= = = => Filter A
= = => B) Kafka substream
= = = => Filter B
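
In code, that second shape would presumably be written by watermarking each
filtered branch instead of the shared parent; something like this sketch
(we haven't verified the optimizer leaves it alone, and we suspect the
watermark-advancement problem is the same, since each watermark still only
sees filtered rows):

```
// Build the parsed stream *without* a watermark, then filter each
// branch and attach the watermark afterwards, so filter-then-watermark
// is explicit in the query rather than an optimizer decision.
val base = kafka
  .select(from_json($"value".cast(StringType), schema) as 'data)
  .select($"data.*", $"data.message.timestamp" as 'ts)

val a = base.filter("message.event = 'Item Added'").withWatermark("ts", "10 seconds")
val b = base.filter("message.event = 'Item Purchased'").withWatermark("ts", "10 seconds")

a.as('a).join(
  b.as('b),
  expr("a.id = b.id AND a.ts < b.ts AND b.ts < a.ts + interval 5 seconds"),
  joinType = "left"
).explain()
```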

The behavior of pushing the filter below the watermark (i.e., applying the
watermark *after* the filter) happens with or without the JOIN, as seen below.

```
val parsed = kafka
  .select(from_json($"value".cast(StringType), schema) as 'data)
  .select($"data.*", $"data.message.timestamp" as 'ts)
  .withWatermark("ts", "10 seconds")

parsed.filter("message.event = 'Item Added'").explain()

// Exiting paste mode, now interpreting.

== Physical Plan ==
EventTimeWatermark ts#23: timestamp, interval 10 seconds
+- Project
[jsontostructs(StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message AS message#24,
jsontostructs(StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message.timestamp AS ts#23]
   +- Filter
(jsontostructs(StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true), cast(value#8 as string),
Some(Etc/UTC), true).message.event = Item Added)
      +- StreamingRelation kafka, [key#7, value#8, topic#9, partition#10,
offset#11L, timestamp#12, timestampType#13]
kafka: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5
more fields]
import org.apache.spark.sql.types._
schema: org.apache.spark.sql.types.StructType =
StructType(StructField(message,StructType(StructField(event,StringType,true),
StructField(timestamp,TimestampType,true)),true))
parsed: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [message:
struct<event: string, timestamp: timestamp>, ts: timestamp]
```
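
If it helps anyone reproduce without Kafka, we believe the same pushdown
shows up with the built-in *rate* source (sketch; we've only confirmed the
Kafka variant ourselves):

```
// The rate source emits (timestamp, value) rows. The filter below
// doesn't reference the watermark column, so we'd expect the optimizer
// to push it below EventTimeWatermark here as well.
val rate = spark.readStream.format("rate").option("rowsPerSecond", "1").load()

rate.withWatermark("timestamp", "10 seconds")
  .filter("value % 2 = 0")
  .explain()
```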

That said, I think the example with the JOIN helps paint a picture of *why*
it's important for the filter to be applied after the watermark. Anyway, is
there another way to express this query accurately? Thanks in advance.
-- 

Best regards,

Tejas Manohar
