Hello everybody,
I found a solution myself; here it is, in case somebody is interested:
Idea
The trick is to combine the two streams using the beam.Flatten operation
and to use a *Stateful DoFn* to compute the number of pageviews before each
request. Each stream contains json dictionaries. I wrapped them in
{'request' : request} and {'pageview' : pageview} surrounding blocks, so that I
can keep the different events apart in the *Stateful DoFn*. Along the way, I
also computed things like the first pageview timestamp and the seconds since
the first pageview. The streams have to use the session_id as a key, so that
the *Stateful DoFn* receives all the events of one session only.
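For illustration, here is a minimal, hypothetical sketch (plain Python,
outside of Beam) of the element shape both streams are mapped to before the
Flatten; the session_id values and fields are made up:

```python
# Hypothetical sketch of the keyed, type-tagged element shape.
# Each event dict is keyed by its session_id and tagged with the name of
# its originating stream, so the stateful DoFn can tell requests and
# pageviews apart after the two streams are flattened together.

def tag_event(event_type, event):
    """Key an event by session_id and tag it with its stream name."""
    return (event['session_id'], (event_type, event))

pageview = {'session_id': 'abc123', 'url': '/home'}
request = {'session_id': 'abc123', 'endpoint': '/api/search'}

print(tag_event('pageview', pageview))
# ('abc123', ('pageview', {'session_id': 'abc123', 'url': '/home'}))
print(tag_event('request', request))
# ('abc123', ('request', {'session_id': 'abc123', 'endpoint': '/api/search'}))
```

Because both tagged streams share the session_id key, all events of one
session are routed to the same state cell of the stateful DoFn.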
Code
First of all, this is the pipeline code:
# Beam pipeline that extends requests by the number of pageviews before the
# request in that session
with beam.Pipeline(options=options) as p:
    # The stream of requests
    requests = (
        'Read from PubSub subscription' >>
            beam.io.ReadFromPubSub(subscription=request_sub)
        | 'Extract JSON' >> beam.ParDo(ExtractJSON())
        | 'Add Timestamp' >> beam.ParDo(AssignTimestampFn())
        | 'Use Session ID as stream key' >> beam.Map(
            lambda request: (request['session_id'], request))
        | 'Add type of event' >> beam.Map(lambda r: (r[0], ('request', r[1])))
    )

    # The stream of pageviews
    pageviews = (
        'Read from PubSub subscription' >>
            beam.io.ReadFromPubSub(subscription=pageview_sub)
        | 'Extract JSON' >> beam.ParDo(ExtractJSON())
        | 'Add Timestamp' >> beam.ParDo(AssignTimestampFn())
        | 'Use Session ID as stream key' >> beam.Map(
            lambda pageview: (pageview['session_id'], pageview))
        | 'Add type of event' >> beam.Map(lambda p: (p[0], ('pageview', p[1])))
    )

    # Combine the streams and apply the stateful DoFn
    combined = (
        (
            p | ('Prepare requests stream' >> requests),
            p | ('Prepare pageviews stream' >> pageviews)
        )
        | 'Combine event streams' >> beam.Flatten()
        | 'Global Window' >> beam.WindowInto(
            windowfn=window.GlobalWindows(),
            trigger=trigger.AfterCount(1),
            accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | 'Stateful DoFn' >> beam.ParDo(CountPageviewsStateful())
        | 'Compute processing delay' >> beam.ParDo(LogTimeDelay())
        | 'Format for BigQuery output' >> beam.ParDo(FormatForOutputDoFn())
    )

    # Write to BigQuery.
    combined | 'Write' >> beam.io.WriteToBigQuery(
        requests_extended_table,
        schema=REQUESTS_EXTENDED_TABLE_SCHEMA,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
The interesting part is the combination of the two streams using
beam.Flatten and the application of the stateful DoFn CountPageviewsStateful().
Here's the code of the custom DoFns used:
# This DoFn just loads a json message
class ExtractJSON(beam.DoFn):
    def process(self, element):
        import json

        yield json.loads(element)


# This DoFn adds the event timestamp of a message into its json element
# for further processing
class AssignTimestampFn(beam.DoFn):
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        import datetime

        timestamped_element = element
        timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
        timestamp = timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
        timestamped_element['timestamp_utc'] = timestamp_utc
        timestamped_element['timestamp'] = timestamp
        yield timestamped_element
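As a side note, the timestamp handling in AssignTimestampFn boils down to
this conversion (a standalone sketch with a made-up epoch value; Beam's
TimestampParam yields the event time in seconds since the unix epoch):

```python
import datetime

# Convert a unix epoch timestamp in seconds (the float value that
# beam.DoFn.TimestampParam provides) into the two representations that
# AssignTimestampFn stores on each element: a datetime object and a
# formatted string.
epoch_seconds = 1610110000.0  # example value, not taken from the real streams
timestamp_utc = datetime.datetime.utcfromtimestamp(epoch_seconds)
timestamp = timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
print(timestamp)  # 2021-01-08 12:46:40
```

The datetime object (timestamp_utc) is what the stateful DoFn later turns
back into integer epoch seconds via .timestamp().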
# This class is a stateful DoFn
# Input elements should be of form (session_id, (event_type, event)),
# where events can be requests or pageviews
# It computes, on a per-session basis, the number of pageviews and the
# first pageview timestamp in its internal state
# Whenever a request comes in, it appends the internal state to the request
# and emits an extended request
# Whenever a pageview comes in, the internal state is updated but nothing
# is emitted
class CountPageviewsStateful(beam.DoFn):
    # The internal states
    NUM_PAGEVIEWS = userstate.CombiningValueStateSpec('num_pageviews',
                                                      combine_fn=sum)
    FIRST_PAGEVIEW = userstate.ReadModifyWriteStateSpec(
        'first_pageview', coder=beam.coders.VarIntCoder())

    def process(self,
                element,
                num_pageviews_state=beam.DoFn.StateParam(NUM_PAGEVIEWS),
                first_pageview_state=beam.DoFn.StateParam(FIRST_PAGEVIEW)):
        import datetime

        # Extract the element
        session_id = element[0]
        event_type, event = element[1]

        # Depending on the event type, different actions are taken
        if event_type == 'request':
            # This is a request
            request = event
            # First, the first pageview timestamp is read and the seconds
            # since the first pageview are calculated
            first_pageview = first_pageview_state.read()
            if first_pageview is not None:
                seconds_since_first_pageview = (
                    int(request['timestamp_utc'].timestamp()) - first_pageview)
                first_pageview_timestamp_utc = (
                    datetime.datetime.utcfromtimestamp(float(first_pageview)))
                first_pageview_timestamp = (
                    first_pageview_timestamp_utc.strftime("%Y-%m-%d %H:%M:%S"))
            else:
                seconds_since_first_pageview = -1
                first_pageview_timestamp = None
            # The calculated data is appended to the request
            request['num_pageviews'] = num_pageviews_state.read()
            request['first_pageview_timestamp'] = first_pageview_timestamp
            request['seconds_since_first_pageview'] = seconds_since_first_pageview
            # The pageview counter is reset
            num_pageviews_state.clear()
            # The extended request is emitted
            yield (session_id, request)
        elif event_type == 'pageview':
            # This is a pageview
            pageview = event
            # Update the first pageview state
            first_pageview = first_pageview_state.read()
            pageview_ts = int(pageview['timestamp_utc'].timestamp())
            if first_pageview is None or first_pageview > pageview_ts:
                first_pageview_state.write(pageview_ts)
            # Increase the number of pageviews
            num_pageviews_state.add(1)
            # Nothing is emitted, pageviews are not processed any further
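To illustrate the per-session state machine, here is a hypothetical
plain-Python simulation of the counting logic (it does not use the Beam
state API; state lives in local variables instead of CombiningValueStateSpec
and ReadModifyWriteStateSpec):

```python
# Hypothetical simulation of the stateful logic for one session's events:
# pageviews update the counter and the first-pageview timestamp, while a
# request drains the counter and carries the accumulated state with it.

def simulate_session(events):
    """events: list of (event_type, epoch_seconds), in processing order.
    Returns the extended requests that would be emitted."""
    num_pageviews = 0
    first_pageview = None
    emitted = []
    for event_type, ts in events:
        if event_type == 'pageview':
            # Keep the earliest pageview timestamp seen so far
            if first_pageview is None or ts < first_pageview:
                first_pageview = ts
            num_pageviews += 1
        elif event_type == 'request':
            seconds_since = ts - first_pageview if first_pageview is not None else -1
            emitted.append({'num_pageviews': num_pageviews,
                            'seconds_since_first_pageview': seconds_since})
            # The counter is reset per request, like in the DoFn; the
            # first-pageview timestamp persists
            num_pageviews = 0
    return emitted

out = simulate_session([('pageview', 100), ('pageview', 130), ('request', 160)])
print(out)  # [{'num_pageviews': 2, 'seconds_since_first_pageview': 60}]
```

Note that, as in the DoFn above, a request arriving before any pageview is
emitted with num_pageviews = 0 and seconds_since_first_pageview = -1.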
# This DoFn logs the delay between the event time and the processing time
class LogTimeDelay(beam.DoFn):
    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        import datetime
        import logging

        timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
        seconds_delay = (datetime.datetime.utcnow() - timestamp_utc).total_seconds()
        logging.warning('Delayed by %s seconds', seconds_delay)
        yield element
This seems to work and gives me an average delay of about 1-2 seconds on
the direct runner. On Cloud Dataflow the average delay is about 0.5-1
seconds. So all in all, this seems to solve the problem definition.
Further considerations
There are some open questions, though:
- I am using global windows, which means internal state will be kept
forever, as far as I can tell. Maybe session windows are the correct way
to go: when there are no pageviews/requests for x seconds, the window is
closed and the internal state is freed.
- The processing delay is a little bit high, but maybe I need to tweak the
Pub/Sub part a little bit.
- I do not know how much overhead or memory consumption this solution
adds over standard Beam methods. I also didn't test high workload and
parallelisation.
Any input is welcome!
Kind regards,
Hendrik Gruß
Am Fr., 8. Jan. 2021 um 14:56 Uhr schrieb Gruß, Hendrik <[email protected]>:
> Hi everybody,
>
> I would like to hear your thoughts on which technique would be used in
> Apache Beam for the following problem:
>
> *Problem definition*:
>
> I have two streams of data, one with pageviews of users, and another with
> requests of the users. They share the key session_id which describes the
> users session, but each have other additional data.
>
> The goal is to append the number of pageviews in a session to the requests
> of that session. That means I want to have a stream of data that has every
> request together with the number of pageviews before the request. It
> suffices to have the pageviews of, let's say, the last 5 minutes, and it is
> not important to have all the pageviews if there is late data. There
> should only be low latency on receiving the requests.
>
> What would be the appropriate technique? Side inputs? CoGroupByKey? Here
> are my first attempts:
> https://stackoverflow.com/questions/65625961/windowed-joins-in-apache-beam
> Kind regards,
> Hendrik Gruß
>
> ----
> Hendrik Gruß
> Data Engineer
> Diginet Gmbh & Co. KG
>
--
Hendrik Gruß
Data Engineer