Hi Artur,

When you join the PCollections, they will be flattened and go through a GroupByKey together. Since the trigger governs when the GroupByKey can emit output, the triggers have to be equal; otherwise the GroupByKey has no clear rule for when it should output. If you can make the triggering on all the input collections equal, that will resolve this issue. If you still need different triggering elsewhere, that is fine. You just need to make the triggers the same going into the join.
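A minimal sketch of that advice, assuming the global-window setup from case A and an illustrative trigger (fire repeatedly, 10 seconds of processing time after the first element in each pane). The class SharedWindowing and its methods are hypothetical names, not part of Beam or the join library. The idea is to define the windowing once and re-apply that same Window transform to every input of each join, including the output of the first join. The first trigger in the exception, Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()), appears to be the continuation trigger Beam derives for the output of a GroupByKey over processing-time-triggered input, which would explain why the first join's result no longer matches the raw third collection.

```java
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

/**
 * Hypothetical helper: one windowing strategy that every join input is
 * re-windowed into, so the Flatten inside the join sees identical triggers.
 */
public class SharedWindowing {

  // Assumed firing policy: 10 s of processing time after the first element, repeated forever.
  public static Trigger sharedTrigger() {
    return Repeatedly.forever(
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(10)));
  }

  // Global window plus the shared trigger. Allowed lateness and an accumulation
  // mode must be specified whenever a trigger is set on a Window transform.
  public static <T> Window<T> sharedWindow() {
    return Window.<T>into(new GlobalWindows())
        .triggering(sharedTrigger())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();
  }
}
```

With this in place, all_customers, all_policies and all_claims would each go through something like all_customers.apply("WindowCustomers", SharedWindowing.<KV<Integer, String>>sharedWindow()) before the first Join.innerJoin, and joinedCustomersAndPolicies would go through SharedWindowing.<KV<Integer, KV<String, String>>>sharedWindow() again before it is joined with all_claims. Re-windowing the intermediate result is the step that restores the original trigger after the first join's GroupByKey.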
Kenn

On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <[email protected]> wrote:

> Hello,
> You can try putting the PCollection, after the Flatten, into the same global window with the same triggers it had before flattening.
>
> Best regards
> Aleksandr Gortujev
>
>
> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <[email protected]> wrote:
>
> Hi,
> I am in the second week of our PoC with Beam, and I am really amazed by the capabilities of the framework and how well engineered it is.
>
> Amazed does not mean experienced, so please bear with me.
>
> What we are trying to achieve is to join several streams using windowing and triggers. And that is where I fear we have hit the limits of what can be done.
>
> In case A we run in global windows, and we are able to combine two unbounded PCollections, but when I try to combine the result with a third collection I get the exception below. I tried many different trigger combinations, but can't make it work.
>
> Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()), AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))
>
> In case B I use fixed windows. Again, I can successfully join two collections and print the output to the console. When I add the third, it runs without errors, but I am not able to materialize the results in the console. I am, however, able to print the results of merging the three with Flatten, so the error above is no longer an issue.
>
> Does anyone have experience with joining three or more unbounded PCollections? What would be a successful windowing and triggering strategy for global and fixed windows, respectively?
>
> Below are code snippets from the fixed-window case. Windows are defined in the same manner for all three collections: customer, claim and policy.
> The Join class I use comes from https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>
> I would be really grateful if any of you would like to share your knowledge.
>
> Best regards
> Artur
>
> PCollection<Claim2> claimInput = pipeline
>     .apply((KafkaIO.<String, String>read().withTopics(ImmutableList.of(claimTopic))
>             .updateConsumerProperties(consumerProps).withBootstrapServers(bootstrapServers)
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(StringDeserializer.class))
>         .withoutMetadata())
>     .apply(Values.<String>create())
>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>     .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>         .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>         .withAllowedLateness(Duration.standardSeconds(1)));
>
> /**JOIN**************/
> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>     Join.innerJoin(all_customers, all_policies);
> PCollectionList<KV<Integer, String>> collections =
>     PCollectionList.of(all_customers).and(all_policies).and(all_claims);
> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>     Join.innerJoin(joinedCustomersAndPolicies, all_claims);
> //PCollectionList<KV<Integer, String>> collections =
> //    PCollectionList.of(all_customers).and(all_policies);
>
> PCollection<KV<Integer, String>> merged =
>     collections.apply(Flatten.<KV<Integer, String>>pCollections());
