Yes, the second suggestion is more appropriate if either type A or type B is
common. The first suggestion is only good if both type A and type B are rare,
allowing the session windows to close regularly.


On Thu, Dec 22, 2016 at 9:17 AM, Ray Ruvinskiy <[email protected]> wrote:

> Interesting. Thanks, Lukasz!
>
> Suppose type A is common but type B is rare. Would your second suggestion
> be more appropriate in this case?
>
> Ray
>
> From: Lukasz Cwik <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Wednesday, December 21, 2016 at 8:14 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: One-to-many mapping between unbounded input source and
> pipelines with session windows
>
> Globally will not perform as well because you reduce the inherent level of
> parallelism.
>
> Let's tackle this one problem, and you can see whether you can apply the
> same principles to the other problems:
> - Any time we see a record (call this type A) with record[“id”] == 1 &&
> record[“field_6”] == “some_value” *not* followed by a record (call this
> type B) with record[“id”] == 2 && record[“field_7”] == “other_value” in the
> subsequent 10 minutes.
>
> One idea:
> If type A and type B records are rare, you can use session windows with a
> gap duration of 10 minutes. Whenever you see a record of type A or type B,
> you convert it to a KV<common key, record data>. You pass this through a
> GBK, which will produce a KV<common key, iterable<record data>> that is
> guaranteed to contain all the records of type A and type B that are within
> 10 minutes of each other. Then you scan through the iterable and output
> all records of type A that are not followed by a record of type B within
> 10 minutes. The reason they need to be rare is that you don't want the
> session to continue forever.
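The final scan step could be sketched as follows. This is plain Python, independent of the Beam API: `records` stands for the iterable produced by the GBK for one key, and the `timestamp`, `field_6`, and `field_7` field names are assumptions taken from the example predicates above.

```python
GAP_SECONDS = 10 * 60  # the 10-minute follow window, in epoch seconds


def is_type_a(r):
    # Type A: record["id"] == 1 and record["field_6"] == "some_value"
    return r["id"] == 1 and r.get("field_6") == "some_value"


def is_type_b(r):
    # Type B: record["id"] == 2 and record["field_7"] == "other_value"
    return r["id"] == 2 and r.get("field_7") == "other_value"


def unmatched_type_a(records):
    """Given the iterable from the GBK (each record carrying an epoch-second
    "timestamp"), return the type A records that have no type B record in
    the following 10 minutes."""
    records = sorted(records, key=lambda r: r["timestamp"])
    b_times = [r["timestamp"] for r in records if is_type_b(r)]
    return [
        r for r in records
        if is_type_a(r)
        and not any(r["timestamp"] < t <= r["timestamp"] + GAP_SECONDS
                    for t in b_times)
    ]
```

In Beam this logic would live inside the DoFn that consumes the GBK output; the session window guarantees that any matching type B record is present in the same iterable.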
>
> Another idea:
> Convert all type A and type B records to a KV<common key, record> using a
> sliding window of size 20 minutes that is output every 10 minutes. Pass
> these through a GBK and, as above, scan through the iterable and output
> all records of type A that are not followed by a record of type B within
> 10 minutes.
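For reference, here is a plain-Python illustration (not the Beam API) of how a timestamp is assigned to sliding windows of size 20 minutes and period 10 minutes, which is what makes this second idea work: every record falls in exactly two windows, so any pair of records within 10 minutes of each other shares at least one window.

```python
SIZE = 20 * 60    # window size: 20 minutes, in seconds
PERIOD = 10 * 60  # a new window starts every 10 minutes


def sliding_windows(ts):
    """Return the [start, end) intervals (epoch seconds) of every sliding
    window containing timestamp `ts`."""
    last_start = (ts // PERIOD) * PERIOD
    starts = range(last_start, last_start - SIZE, -PERIOD)
    return sorted((s, s + SIZE) for s in starts if s <= ts < s + SIZE)
```

The trade-off versus session windows is that each record is duplicated into two windows, but a window always closes after 20 minutes regardless of how common type A and type B records are.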
>
>
> On Wed, Dec 21, 2016 at 2:56 PM, Ray Ruvinskiy <[email protected]> wrote:
> The records have a property value in common, yes. For example,
> record[“record_kind”] == “foo” or record[“record_kind”] == “bar.” However,
> I’d be curious if the answer changes if I wanted to do this globally for
> the whole stream.
>
> Thanks!
>
> Ray
>
> From: Lukasz Cwik <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Wednesday, December 21, 2016 at 5:52 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: One-to-many mapping between unbounded input source and
> pipelines with session windows
>
> My first question was about how you know that two or more records are
> related, or whether this is global for the entire stream.
>
> The reason I was asking about whether you can map the qualifiers onto a
> fixed set of states is that I was wondering if there was a way to either
> use the State API (WIP https://issues.apache.org/jira/browse/BEAM-25) and
> timers API (WIP https://issues.apache.org/jira/browse/BEAM-27) to
> transition between a fixed number of states, or create composite session
> keys based upon the "id" plus some small set of qualifiers and do a GBK to
> do a join.
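The composite-session-key idea might look like the sketch below (plain Python; the `attribute_a` field and the 50 threshold are hypothetical, echoing the "exceeds threshold" example later in the thread). The point is to collapse a continuous attribute onto a small fixed set of states so it can become part of the grouping key.

```python
THRESHOLD = 50  # hypothetical cutoff for the qualifier


def qualifier_state(record):
    # Collapse a continuous attribute onto a small, fixed set of states.
    if record.get("attribute_a", 0) > THRESHOLD:
        return "exceeds_threshold"
    return "normal"


def session_key(record):
    """Composite session key: the record's "id" plus its qualifier state.
    Records keyed this way can be joined with a GBK under session windows."""
    return (record["id"], qualifier_state(record))
```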
>
> In this example, how do you know the two records are related to each other
> (do they share a common attribute, or can a common attribute be computed)?
> - Any time we see a record with record[“id”] == 1 && record[“field_6”] ==
> “some_value” *not* followed by a record with record[“id”] == 2 &&
> record[“field_7”] == “other_value” in the subsequent 10 minutes.
>
> On Wed, Dec 21, 2016 at 2:14 PM, Ray Ruvinskiy <[email protected]> wrote:
> I’m unsure about your first question. Are you asking whether there’s an
> attribute that all the records have in common?
>
> I think I’m looking for more flexibility than a fixed set of values but
> perhaps I’m overlooking something. To flesh out the example, let’s say the
> records are JSON documents, with fields. So, to express my examples again,
> I want to know:
> - Any time we see record_1[“type”] == “type1” && record_1[“field1”] ==
> “value1”, followed within no more than a minute by record_2[“type”] ==
> “type1” && record_2[“field2”].contains(“some_substring”), followed within
> no more than 5 minutes by record_3[“type”] == “type2” && record_3[“field3”]
> == “value3”
> - Any time we see N records where record[“id”] == 123 within 5 hours of
> each other, followed by another record with record[“id”] == 456 no more
> than an hour later than the group of N records
> - Any time we see a record with record[“id”] == 1 && record[“field_6”] ==
> “some_value” *not* followed by a record with record[“id”] == 2 &&
> record[“field_7”] == “other_value” in the subsequent 10 minutes.
>
> If data is late, *ideally* it’s taken into account, but we don’t need to
> account for data that is late by an arbitrary amount of time. We can say
> that if data is, for instance, less than an hour late, it should be taken
> into account, but if it’s more than an hour late we can ignore it.
>
> Thanks!
>
> Ray
>
> From: Lukasz Cwik <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Wednesday, December 21, 2016 at 4:47 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: One-to-many mapping between unbounded input source and
> pipelines with session windows
>
> Do the records have another attribute Z which joins them all together?
> Are the attributes A, B, C, X, Y, K, L from a fixed set of values like
> enums, or can they be mapped onto a small number of states (e.g. an
> attribute A > 50 can be mapped onto a state "exceeds threshold")?
> For your examples, what should occur when there is late data in your three
> scenarios?
>
>
> On Wed, Dec 21, 2016 at 9:05 AM, Ray Ruvinskiy <[email protected]> wrote:
> Hello,
>
> I am trying to figure out if Apache Beam is the right framework for my use
> case. I have an unbounded stream, and there are a number of questions I
> would like to ask regarding the records in the stream:
>
> - For example, one question may be trying to find a record with attribute
> A followed within no more than a minute by a record with attribute B
> followed within no more than 5 minutes by a record with attribute C.
> - Another question may be trying to find a sequence of at least N records
> with attribute X within 5 hours of each other, followed by a record with
> attribute Y no more than an hour later.
> - A third question would be seeing if there exists a record with attribute
> K *not* followed by a record with attribute L in the next 10 minutes.
>
> Every time I encounter the pattern of records I’m looking for, I would
> like to perform an action. If I understand the Beam model correctly, each
> question would correspond to a separate pipeline I would create, and it
> also sounds like I’m looking for session windows. However, I’m assuming I
> would need to tee the input source to all the separate pipelines? I have
> tried to look for documentation and/or examples on whether Apache Beam can
> be used to express such a setup and how to do it if so, but I haven’t been
> able to find anything concrete. Any help would be appreciated.
>
> Thanks!
>
> Ray
>