Sure, here is the URL to my repo:
https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java
It is fairly simple and I do no grouping beforehand: I just read from Kafka and
then join. I followed your advice and the 3-way join works for the pipeline
with fixed windows. Both classes use the same join class. The logic is the
same and it always works for the A+B join (CoGroupByKey). That's what makes me
think it could be due to triggers, but I am of course not sure :)
/Artur
Pipeline pipeline = Pipeline.create(options);

Trigger trigger1 =
    AfterProcessingTime
        .pastFirstElementInPane()
        .plusDelayOf(Duration.standardSeconds(10));

/** PARSE CUSTOMER */
PCollection<Customer3> customerInput = pipeline
    .apply(KafkaIO.<String, String>read()
        .withTopics(ImmutableList.of(customerTopic))
        .updateConsumerProperties(consumerProps)
        .withBootstrapServers(bootstrapServers)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata())
    .apply(Values.<String>create())
    .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
    .apply(Window.<Customer3>into(new GlobalWindows())
        .triggering(Repeatedly.forever(trigger1))
        .accumulatingFiredPanes());
/** PARSE POLICY */
PCollection<Policy2> policyInput = pipeline
    .apply(KafkaIO.<String, String>read()
        .withTopics(ImmutableList.of(policyTopic))
        .updateConsumerProperties(consumerProps)
        .withBootstrapServers(bootstrapServers)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // .withWatermarkFn(new AddWatermarkFn())
        // .withTimestampFn2(new AddCustomerTimestampFn())
        .withoutMetadata())
    .apply(Values.<String>create())
    // .apply(ParseJsons.of(Customer.class));
    .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
    .apply(Window.<Policy2>into(new GlobalWindows())
        .triggering(Repeatedly.forever(trigger1))
        .accumulatingFiredPanes());
/** PARSE CLAIM */
PCollection<Claim2> claimInput = pipeline
    .apply(KafkaIO.<String, String>read()
        .withTopics(ImmutableList.of(claimTopic))
        .updateConsumerProperties(consumerProps)
        .withBootstrapServers(bootstrapServers)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata())
    .apply(Values.<String>create())
    .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
    .apply(Window.<Claim2>into(new GlobalWindows())
        // note: unlike the other two inputs, this trigger is not wrapped
        // in Repeatedly.forever(...)
        .triggering(trigger1)
        .accumulatingFiredPanes());
/** CUSTOMER */
PCollection<KV<Integer, String>> all_customers = customerInput
    .apply(new ExtractAndMapCustomerKey());

/** POLICY */
PCollection<KV<Integer, String>> all_policies = policyInput
    .apply(new ExtractAndMapPolicyKey());

/** CLAIM */
PCollection<KV<Integer, String>> all_claims = claimInput
    .apply(new ExtractAndMapClaimKey());
/** JOIN */
/** This join works if I comment out the subsequent join **/
// PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
//     Join.innerJoin(all_customers, all_policies);

/** This causes an exception **/
PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
    Join.innerJoin3Way(all_customers, all_policies, all_claims);

/** This join will cause an IllegalStateException **/
// PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
//     Join.innerJoin(joinedCustomersAndPolicies, all_claims);

/** This will also cause an exception when used with 3 collections **/
// PCollectionList<KV<Integer, String>> collections =
//     PCollectionList.of(all_customers).and(all_policies).and(all_claims);
// PCollection<KV<Integer, String>> merged =
//     collections.apply(Flatten.<KV<Integer, String>>pCollections());
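
By the way, if I go back to the two-step Join(Join(A, B), C) approach, one
thing I still plan to try, based on Kenn's and Aleksandr's suggestions below,
is to re-apply the same global window and trigger to the intermediate join
output before the second join, so that its (continuation) trigger matches
all_claims again. Something like this untested sketch (the variable names are
just for illustration):

// Untested sketch: re-window the A+B join output with the same global
// window/trigger as the inputs before joining it with all_claims.
PCollection<KV<Integer, KV<String, String>>> customersAndPolicies =
    Join.innerJoin(all_customers, all_policies)
        .apply(Window.<KV<Integer, KV<String, String>>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(trigger1))
            .accumulatingFiredPanes());

PCollection<KV<Integer, KV<KV<String, String>, String>>> customersPoliciesAndClaims =
    Join.innerJoin(customersAndPolicies, all_claims);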
And here is the 3-way join:
public static <K, V1, V2, V3> PCollection<KV<K, KV<V1, V2>>> innerJoin3Way(
    final PCollection<KV<K, V1>> leftCollection,
    final PCollection<KV<K, V2>> rightCollection,
    final PCollection<KV<K, V3>> thirdCollection) {

  final TupleTag<V1> v1Tuple = new TupleTag<>();
  final TupleTag<V2> v2Tuple = new TupleTag<>();
  final TupleTag<V3> v3Tuple = new TupleTag<>();

  PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
          .and(v2Tuple, rightCollection)
          .and(v3Tuple, thirdCollection)
          .apply(CoGroupByKey.<K>create());

  System.out.println(coGbkResultCollection);

  return coGbkResultCollection.apply(ParDo.of(
      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          KV<K, CoGbkResult> e = c.element();
          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
          Iterable<V3> thirdValuesIterable = e.getValue().getAll(v3Tuple);

          // The third collection's values are iterated here but not yet
          // used in the output.
          for (V3 thirdValue : thirdValuesIterable) {
          }

          for (V1 leftValue : leftValuesIterable) {
            for (V2 rightValue : rightValuesIterable) {
              c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
            }
          }
        }
      }))
      .setCoder(KvCoder.of(
          ((KvCoder) leftCollection.getCoder()).getKeyCoder(),
          KvCoder.of(
              ((KvCoder) leftCollection.getCoder()).getValueCoder(),
              ((KvCoder) rightCollection.getCoder()).getValueCoder())));
}
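
For completeness, if I also want the claim values in the output (matching the
KV<K, KV<KV<V1, V2>, V3>> signature of the version quoted further down), I
assume only the output loop and the coder need to change, roughly like this
untested sketch:

// Untested sketch: emit all three values...
for (V1 leftValue : leftValuesIterable) {
  for (V2 rightValue : rightValuesIterable) {
    for (V3 thirdValue : thirdValuesIterable) {
      c.output(KV.of(e.getKey(),
          KV.of(KV.of(leftValue, rightValue), thirdValue)));
    }
  }
}

// ...and build a matching nested coder from all three input coders.
.setCoder(KvCoder.of(
    ((KvCoder) leftCollection.getCoder()).getKeyCoder(),
    KvCoder.of(
        KvCoder.of(
            ((KvCoder) leftCollection.getCoder()).getValueCoder(),
            ((KvCoder) rightCollection.getCoder()).getValueCoder()),
        ((KvCoder) thirdCollection.getCoder()).getValueCoder())));
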
On Sat, Nov 4, 2017 at 3:43 PM, Ben Chambers <[email protected]> wrote:
> Can you share the program using global windows and make sure the exception
> is the same? Basically, this kind of problem has to do with whether you
> have applied a grouping operation already. The triggering (and sometimes
> windowing) before a grouping operation is different than after. So, if you
> are having problems applying a groupbykey somewhere and think the windowing
> and triggering should be the same, you need to look before that to see if
> one of the collections has been grouped but the others haven't.
>
> On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski <[email protected]> wrote:
>
>> Hi Ben,
>> thank you for your answer. I think I forgot to mention that the join
>> class already implements CoGroupByKey and comes from the SDK extensions. I
>> have now modified it slightly to do a 3-way join (see below). It works for
>> three PCollections with fixed windows and provides output this time, even
>> when using early and late triggers. That is great!
>>
>> But when I try to use it with global windows for all three collections I
>> get the same exception.
>>
>> Is it not possible to do a 3-way join in a global window? The reason I
>> want to use a global window is that I want to have all the historic records
>> available in these collections. I am not sure how that could be achieved
>> using fixed windows.
>>
>> /Artur
>>
>> public static <K, V1, V2, V3> PCollection<KV<K, KV<KV<V1, V2>, V3>>> innerJoin3Way(
>>     final PCollection<KV<K, V1>> leftCollection,
>>     final PCollection<KV<K, V2>> rightCollection,
>>     final PCollection<KV<K, V3>> thirdCollection) {
>>
>>   final TupleTag<V1> v1Tuple = new TupleTag<>();
>>   final TupleTag<V2> v2Tuple = new TupleTag<>();
>>   final TupleTag<V3> v3Tuple = new TupleTag<>();
>>
>>   PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>       KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>           .and(v2Tuple, rightCollection)
>>           .and(v3Tuple, thirdCollection)
>>           .apply(CoGroupByKey.<K>create());
>>
>>   return coGbkResultCollection.apply(ParDo.of(
>>       new DoFn<KV<K, CoGbkResult>, KV<K, KV<KV<V1, V2>, V3>>>() {
>>
>>         @ProcessElement
>>         public void processElement(ProcessContext c)
>>
>> On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <[email protected]>
>> wrote:
>>
>>> It looks like this is a problematic interaction between the 2-layer join
>>> and the (unfortunately complicated) continuation triggering. Specifically,
>>> the triggering of Join(A, B) has the continuation trigger, so it isn't
>>> possible to join that with C.
>>>
>>> Instead of trying to do Join(Join(A, B), C), consider using a
>>> CoGroupByKey. This will allow you to join all three input collections at
>>> the same time, which should have two benefits. First, it will work since
>>> you won't be trying to merge a continuation trigger with the original
>>> trigger. Second, it should be more efficient, because you are performing a
>>> single, three-way join instead of two, two-way joins.
>>>
>>> See
>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
>>> for more information.
>>>
>>> -- Ben
>>>
>>> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <[email protected]> wrote:
>>>
>>>> Hi Kenneth and Aleksandr and thank you for your prompt answer.
>>>>
>>>> So there are two scenarios that I've been trying out, but the operation is
>>>> always the same: join setA + setB => setAB, and then join setAB + setC => setABC.
>>>> In scenario 1 I define three PCollections with global windows and
>>>> exactly the same triggers. I am able to join A+B, but as soon as I try to
>>>> join AB+C I get the exception. Here is the code snippet where the window
>>>> and triggers are the same for all three PCollections using global windows:
>>>>
>>>> Pipeline pipeline = Pipeline.create(options);
>>>>
>>>> Trigger trigger1 =
>>>> AfterProcessingTime
>>>> .pastFirstElementInPane()
>>>> .plusDelayOf(Duration.standardSeconds(10))
>>>> ;
>>>> /** PARSE CUSTOMER */
>>>> PCollection<Customer3> customerInput = pipeline
>>>>     .apply(KafkaIO.<String, String>read()
>>>>         .withTopics(ImmutableList.of(customerTopic))
>>>>         .updateConsumerProperties(consumerProps)
>>>>         .withBootstrapServers(bootstrapServers)
>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>         .withoutMetadata())
>>>>     .apply(Values.<String>create())
>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>>     .apply(Window.<Customer3>into(new GlobalWindows())
>>>>         .triggering(Repeatedly.forever(trigger1))
>>>>         .accumulatingFiredPanes());
>>>> /** PARSE POLICY */
>>>> PCollection<Policy2> policyInput = pipeline
>>>>     .apply(KafkaIO.<String, String>read()
>>>>         .withTopics(ImmutableList.of(policyTopic))
>>>>         .updateConsumerProperties(consumerProps)
>>>>         .withBootstrapServers(bootstrapServers)
>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>         // .withWatermarkFn(new AddWatermarkFn())
>>>>         // .withTimestampFn2(new AddCustomerTimestampFn())
>>>>         .withoutMetadata())
>>>>     .apply(Values.<String>create())
>>>>     // .apply(ParseJsons.of(Customer.class));
>>>>     .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>>     .apply(Window.<Policy2>into(new GlobalWindows())
>>>>         .triggering(Repeatedly.forever(trigger1))
>>>>         .accumulatingFiredPanes());
>>>> /** PARSE CLAIM */
>>>> PCollection<Claim2> claimInput = pipeline
>>>>     .apply(KafkaIO.<String, String>read()
>>>>         .withTopics(ImmutableList.of(claimTopic))
>>>>         .updateConsumerProperties(consumerProps)
>>>>         .withBootstrapServers(bootstrapServers)
>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>         .withoutMetadata())
>>>>     .apply(Values.<String>create())
>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>     .apply(Window.<Claim2>into(new GlobalWindows())
>>>>         .triggering(trigger1)
>>>>         .accumulatingFiredPanes());
>>>>
>>>> /**CUSTOMER ********/
>>>> PCollection<KV<Integer,String>> all_customers = customerInput
>>>> .apply(new ExtractAndMapCustomerKey())
>>>> ;
>>>> /***POLICY********/
>>>> PCollection<KV<Integer,String>> all_policies = policyInput
>>>> .apply(new ExtractAndMapPolicyKey())
>>>> ;
>>>> /***CLAIM*******/
>>>> PCollection<KV<Integer,String>> all_claims = claimInput
>>>> .apply(new ExtractAndMapClaimKey())
>>>> ;
>>>> /** JOIN */
>>>> /** This join works if I comment out the subsequent join **/
>>>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>     Join.innerJoin(all_customers, all_policies);
>>>>
>>>> /** This join will cause an IllegalStateException **/
>>>> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>>     Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>
>>>>
>>>>
>>>> The second scenario uses fixed windows. The logic is the same as
>>>> above, and even in this case the triggering is equal for all three
>>>> collections, like this:
>>>>
>>>> .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>     .triggering(AfterWatermark.pastEndOfWindow())
>>>>     .accumulatingFiredPanes()
>>>>     .withAllowedLateness(Duration.standardSeconds(1)));
>>>>
>>>> /** JOIN */
>>>> /** NO ERROR this time **/
>>>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>     Join.innerJoin(all_customers, all_policies);
>>>> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>>     Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>
>>>>
>>>> This time I get no errors, but I am not able to print the results to
>>>> the console.
>>>>
>>>> So I ran another experiment and again defined an equal trigger for all
>>>> three collections. I can print the output to the console for the first two
>>>> PCollections, but the second join again fails with an IllegalStateException.
>>>> The triggering definition for all three collections:
>>>>
>>>> /** PARSE CLAIM */
>>>> PCollection<Claim2> claimInput = pipeline
>>>>     .apply(KafkaIO.<String, String>read()
>>>>         .withTopics(ImmutableList.of(claimTopic))
>>>>         .updateConsumerProperties(consumerProps)
>>>>         .withBootstrapServers(bootstrapServers)
>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>         .withoutMetadata())
>>>>     .apply(Values.<String>create())
>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>     .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>         .triggering(AfterWatermark.pastEndOfWindow()
>>>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>                 // early update frequency
>>>>                 .alignedTo(Duration.standardSeconds(10)))
>>>>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>                 .alignedTo(Duration.standardSeconds(20))))
>>>>         .withAllowedLateness(Duration.standardMinutes(5))
>>>>         .accumulatingFiredPanes());
>>>>
>>>>
>>>>
>>>> In this case I've added early and late firings, which emit results
>>>> from the pane, but the second join again throws an exception.
>>>>
>>>> I know it's a lot of information to take in, but basically if you have
>>>> an example where you join three PCollections in global and in fixed windows
>>>> with appropriate triggering, I'd be eternally grateful :)
>>>> Or if you could explain how to do it, of course. Thanks in advance.
>>>>
>>>> Best Regards
>>>> Artur
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> Hi Artur,
>>>>>
>>>>> When you join the PCollections, they will be flattened and go through
>>>>> a GroupByKey together. Since the trigger governs when the GroupByKey can
>>>>> emit output, the triggers have to be equal or the GroupByKey doesn't have
>>>>> a clear guide as to when it should output. If you can make the triggering
>>>>> on all the input collections equal, that will resolve this issue. If you
>>>>> still need different triggering elsewhere, that is fine. You just need to
>>>>> make them the same going into the join.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>> You can try putting the PCollection after the flatten into the same
>>>>>> global window, with the same triggers it had before flattening.
>>>>>>
>>>>>> Best regards
>>>>>> Aleksandr Gortujev
>>>>>>
>>>>>>
>>>>>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>> Hi,
>>>>>> I am in the second week of our PoC with Beam and I am really amazed by
>>>>>> the capabilities of the framework and how well engineered it is.
>>>>>>
>>>>>> Amazed does not mean experienced, so please bear with me.
>>>>>>
>>>>>> What we are trying to achieve is to join several streams using windowing
>>>>>> and triggers, and that is where I fear we hit the limits of what can
>>>>>> be done.
>>>>>>
>>>>>> In case A we run in global windows and we are able to combine two
>>>>>> unbounded PCollections, but when I try to combine the results with a third
>>>>>> collection I get the exception below. I tried many different trigger
>>>>>> combinations, but can't make it work.
>>>>>>
>>>>>> Exception in thread "main" java.lang.IllegalStateException:
>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>>>>> AfterEach.inOrder(
>>>>>>     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds),
>>>>>>     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))
>>>>>>
>>>>>> In case B I use fixed windows. Again, I can successfully join two
>>>>>> collections and print the output to the console. When I add the third it
>>>>>> runs without errors, but I am not able to materialize the results in the
>>>>>> console. I am, however, able to print the results of a merge using Flatten,
>>>>>> so the error above is no longer an issue.
>>>>>>
>>>>>> Does anyone have experience with joining three or more unbounded
>>>>>> PCollections? What would be a successful windowing and triggering strategy
>>>>>> for global or fixed windows, respectively?
>>>>>>
>>>>>> Below are code snippets from the fixed-windows case. Windows are defined in
>>>>>> the same manner for all three collections: customer, claim and policy.
>>>>>> The Join class I use comes from
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>>>
>>>>>>
>>>>>> I would be really grateful if any of you would like to share your
>>>>>> knowledge.
>>>>>>
>>>>>> Best Regards
>>>>>> Artur
>>>>>>
>>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>>     .apply(KafkaIO.<String, String>read()
>>>>>>         .withTopics(ImmutableList.of(claimTopic))
>>>>>>         .updateConsumerProperties(consumerProps)
>>>>>>         .withBootstrapServers(bootstrapServers)
>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>>>         .withoutMetadata())
>>>>>>     .apply(Values.<String>create())
>>>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>     .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>>         .triggering(AfterWatermark.pastEndOfWindow())
>>>>>>         .accumulatingFiredPanes()
>>>>>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>>
>>>>>> /** JOIN */
>>>>>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>>>     Join.innerJoin(all_customers, all_policies);
>>>>>> PCollectionList<KV<Integer, String>> collections =
>>>>>>     PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>>>> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>>>>     Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>> // PCollectionList<KV<Integer, String>> collections =
>>>>>> //     PCollectionList.of(all_customers).and(all_policies);
>>>>>>
>>>>>> PCollection<KV<Integer, String>> merged =
>>>>>>     collections.apply(Flatten.<KV<Integer, String>>pCollections());
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>