And the error message:

Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)
	at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:124)
	at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:102)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
	at org.apache.beam.sdk.values.PCollectionList.apply(PCollectionList.java:182)
	at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:124)
	at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
	at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:106)
	at com.tryg.beam.kafka.poc.utils.Join.innerJoin3Way(Join.java:170)
	at com.tryg.beam.kafka.poc.impl.CustomerStreamPipelineGlobal.main(CustomerStreamPipelineGlobal.java:220)
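The two triggers named in the exception differ only in that one is wrapped in Repeatedly.forever(...). A minimal sketch of the fix that Kenn's advice later in the thread implies, using the poster's own type and variable names (Claim2, trigger1, parsedClaims is a placeholder for the parsed claim stream) — an assumed illustration, not code confirmed in this thread:

```java
// Sketch only -- assumes the poster's trigger1 and parsed input streams.
// All three join inputs get the *same* trigger expression. The posted
// pipeline wraps trigger1 in Repeatedly.forever(...) for customers and
// policies but not for claims, which is exactly the mismatch the
// exception above reports.
Trigger shared = Repeatedly.forever(
    AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardSeconds(10)));

PCollection<Claim2> claimInput = parsedClaims.apply(
    Window.<Claim2>into(new GlobalWindows())
        .triggering(shared)
        .accumulatingFiredPanes());
// ...and apply the same .triggering(shared) in the Window transforms
// for Customer3 and Policy2 as well.
```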
On Sat, Nov 4, 2017 at 6:36 PM, Artur Mrozowski <[email protected]> wrote:

> Sure, here is the url to my repo:
> https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java
>
> It is fairly simple and I do no grouping before. Just read from Kafka and
> then join. I followed your advice and the 3-way join works for the pipeline
> with fixed windows. Both classes use the same Join class. The logic is the
> same and it always works for the A+B join (CoGroupByKey). That's what makes
> me think it could be due to triggers. But I am of course not sure :)
>
> /Artur
>
> Pipeline pipeline = Pipeline.create(options);
>
> Trigger trigger1 =
>     AfterProcessingTime
>         .pastFirstElementInPane()
>         .plusDelayOf(Duration.standardSeconds(10));
>
> /** PARSE CUSTOMER */
> PCollection<Customer3> customerInput = pipeline
>     .apply(KafkaIO.<String, String>read()
>         .withTopics(ImmutableList.of(customerTopic))
>         .updateConsumerProperties(consumerProps)
>         .withBootstrapServers(bootstrapServers)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         .withoutMetadata())
>     .apply(Values.<String>create())
>     .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>     .apply(Window.<Customer3>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(trigger1))
>         .accumulatingFiredPanes());
>
> /** PARSE POLICY */
> PCollection<Policy2> policyInput = pipeline
>     .apply(KafkaIO.<String, String>read()
>         .withTopics(ImmutableList.of(policyTopic))
>         .updateConsumerProperties(consumerProps)
>         .withBootstrapServers(bootstrapServers)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         // .withWatermarkFn(new AddWatermarkFn())
>         // .withTimestampFn2(new AddCustomerTimestampFn())
>         .withoutMetadata())
>     .apply(Values.<String>create())
>     // .apply(ParseJsons.of(Customer.class));
>     .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>     .apply(Window.<Policy2>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(trigger1))
>         .accumulatingFiredPanes());
>
> /** PARSE CLAIM */
> PCollection<Claim2> claimInput = pipeline
>     .apply(KafkaIO.<String, String>read()
>         .withTopics(ImmutableList.of(claimTopic))
>         .updateConsumerProperties(consumerProps)
>         .withBootstrapServers(bootstrapServers)
>         .withKeyDeserializer(StringDeserializer.class)
>         .withValueDeserializer(StringDeserializer.class)
>         .withoutMetadata())
>     .apply(Values.<String>create())
>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>     .apply(Window.<Claim2>into(new GlobalWindows())
>         .triggering(trigger1)
>         .accumulatingFiredPanes());
>
> /** CUSTOMER */
> PCollection<KV<Integer,String>> all_customers = customerInput
>     .apply(new ExtractAndMapCustomerKey());
> /** POLICY */
> PCollection<KV<Integer,String>> all_policies = policyInput
>     .apply(new ExtractAndMapPolicyKey());
> /** CLAIM */
> PCollection<KV<Integer,String>> all_claims = claimInput
>     .apply(new ExtractAndMapClaimKey());
>
> /** JOIN */
> /** This join works if I comment out the subsequent join **/
> // PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies =
> //     Join.innerJoin(all_customers, all_policies);
>
> /** This causes an exception **/
> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies =
>     Join.innerJoin3Way(all_customers, all_policies, all_claims);
>
> /** This join will cause IllegalStateException **/
> // PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
> //     Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>
> /** This will also cause an exception when used with 3 collections **/
> // PCollectionList<KV<Integer,String>> collections =
> //     PCollectionList.of(all_customers).and(all_policies).and(all_claims);
> // PCollection<KV<Integer,String>> merged =
> //     collections.apply(Flatten.<KV<Integer,String>>pCollections());
>
> And the 3-way join:
>
> public
> static <K, V1, V2, V3> PCollection<KV<K, KV<V1, V2>>> innerJoin3Way(
>         final PCollection<KV<K, V1>> leftCollection,
>         final PCollection<KV<K, V2>> rightCollection,
>         final PCollection<KV<K, V3>> thirdCollection) {
>
>     final TupleTag<V1> v1Tuple = new TupleTag<>();
>     final TupleTag<V2> v2Tuple = new TupleTag<>();
>     final TupleTag<V3> v3Tuple = new TupleTag<>();
>
>     PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>         KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>             .and(v2Tuple, rightCollection)
>             .and(v3Tuple, thirdCollection)
>             .apply(CoGroupByKey.<K>create());
>
>     System.out.println(coGbkResultCollection);
>
>     return coGbkResultCollection.apply(ParDo.of(
>         new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
>
>             @ProcessElement
>             public void processElement(ProcessContext c) {
>                 KV<K, CoGbkResult> e = c.element();
>
>                 Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
>                 Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
>                 Iterable<V3> thirdValuesIterable = e.getValue().getAll(v3Tuple);
>
>                 for (V3 thirdValue : thirdValuesIterable) {
>                     // empty: the third collection's values are read but never output
>                 }
>
>                 for (V1 leftValue : leftValuesIterable) {
>                     for (V2 rightValue : rightValuesIterable) {
>                         c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
>                     }
>                 }
>             }
>         }))
>         .setCoder(KvCoder.of(
>             ((KvCoder) leftCollection.getCoder()).getKeyCoder(),
>             KvCoder.of(
>                 ((KvCoder) leftCollection.getCoder()).getValueCoder(),
>                 ((KvCoder) rightCollection.getCoder()).getValueCoder())));
> }

On Sat, Nov 4, 2017 at 3:43 PM, Ben Chambers <[email protected]> wrote:

>> Can you share the program using global windows and make sure the
>> exception is the same? Basically, this kind of problem has to do with
>> whether you have applied a grouping operation already. The triggering (and
>> sometimes windowing) before a grouping operation is different than after.
>> So, if you are having problems applying a GroupByKey somewhere and think
>> the windowing and triggering should be the same, you need to look before
>> that to see if one of the collections has been grouped but the others
>> haven't.
>>
>> On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski <[email protected]> wrote:
>>
>>> Hi Ben,
>>> thank you for your answer. I think I forgot to mention that the Join
>>> class already implements CoGroupByKey and comes from the SDK extensions.
>>> I have now modified it slightly to do a 3-way join (see below). It works
>>> for three PCollections with fixed windows and provides the output this
>>> time, even when using early and late triggers. That is great!
>>>
>>> But when I try to use it with global windows for all three collections
>>> I get the same exception.
>>>
>>> Is it not possible to make a 3-way join in a global window? The reason
>>> why I want to use a global window is that I want to have all the historic
>>> records available in these collections. Not sure how that could be
>>> achieved using fixed windows.
>>>
>>> /Artur
>>>
>>> public static <K, V1, V2, V3> PCollection<KV<K, KV<KV<V1, V2>, V3>>> innerJoin3Way(
>>>         final PCollection<KV<K, V1>> leftCollection,
>>>         final PCollection<KV<K, V2>> rightCollection,
>>>         final PCollection<KV<K, V3>> thirdCollection) {
>>>     final TupleTag<V1> v1Tuple = new TupleTag<>();
>>>     final TupleTag<V2> v2Tuple = new TupleTag<>();
>>>     final TupleTag<V3> v3Tuple = new TupleTag<>();
>>>
>>>     PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>>         KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>>             .and(v2Tuple, rightCollection)
>>>             .and(v3Tuple, thirdCollection)
>>>             .apply(CoGroupByKey.<K>create());
>>>
>>>     return coGbkResultCollection.apply(ParDo.of(
>>>         new DoFn<KV<K, CoGbkResult>, KV<K, KV<KV<V1, V2>, V3>>>() {
>>>
>>>             @ProcessElement
>>>             public void processElement(ProcessContext c)
>>>
>>> On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <[email protected]> wrote:
>>>
>>>> It looks like this is a problematic interaction between the 2-layer
>>>> join and the (unfortunately complicated) continuation triggering.
>>>> Specifically, the triggering of Join(A, B) has the continuation trigger,
>>>> so it isn't possible to join that with C.
>>>>
>>>> Instead of trying to do Join(Join(A, B), C), consider using a
>>>> CoGroupByKey. This will allow you to join all three input collections at
>>>> the same time, which should have two benefits. First, it will work since
>>>> you won't be trying to merge a continuation trigger with the original
>>>> trigger. Second, it should be more efficient, because you are performing
>>>> a single, three-way join instead of two, two-way joins.
>>>>
>>>> See
>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
>>>> for more information.
>>>>
>>>> -- Ben
>>>>
>>>> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <[email protected]> wrote:
>>>>
>>>>> Hi Kenneth and Aleksandr, and thank you for your prompt answer.
>>>>>
>>>>> So there are two scenarios that I've been trying out, but the operation
>>>>> is always the same: join setA+setB => setAB, and then join
>>>>> setAB+setC => setABC.
>>>>> In scenario 1 I define three PCollections with global windows and
>>>>> exactly the same triggers. Now I am able to join A+B, but as soon as I
>>>>> try to join AB+C I get the exception.
>>>>> Here is the code snippet where the window and triggers are the same
>>>>> for all three PCollections using global windows:
>>>>>
>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>
>>>>> Trigger trigger1 =
>>>>>     AfterProcessingTime
>>>>>         .pastFirstElementInPane()
>>>>>         .plusDelayOf(Duration.standardSeconds(10));
>>>>>
>>>>> /** PARSE CUSTOMER */
>>>>> PCollection<Customer3> customerInput = pipeline
>>>>>     .apply(KafkaIO.<String, String>read()
>>>>>         .withTopics(ImmutableList.of(customerTopic))
>>>>>         .updateConsumerProperties(consumerProps)
>>>>>         .withBootstrapServers(bootstrapServers)
>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>>         .withoutMetadata())
>>>>>     .apply(Values.<String>create())
>>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>>>     .apply(Window.<Customer3>into(new GlobalWindows())
>>>>>         .triggering(Repeatedly.forever(trigger1))
>>>>>         .accumulatingFiredPanes());
>>>>>
>>>>> /** PARSE POLICY */
>>>>> PCollection<Policy2> policyInput = pipeline
>>>>>     .apply(KafkaIO.<String, String>read()
>>>>>         .withTopics(ImmutableList.of(policyTopic))
>>>>>         .updateConsumerProperties(consumerProps)
>>>>>         .withBootstrapServers(bootstrapServers)
>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>>         // .withWatermarkFn(new AddWatermarkFn())
>>>>>         // .withTimestampFn2(new AddCustomerTimestampFn())
>>>>>         .withoutMetadata())
>>>>>     .apply(Values.<String>create())
>>>>>     // .apply(ParseJsons.of(Customer.class));
>>>>>     .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>>>     .apply(Window.<Policy2>into(new GlobalWindows())
>>>>>         .triggering(Repeatedly.forever(trigger1))
>>>>>         .accumulatingFiredPanes());
>>>>>
>>>>> /** PARSE CLAIM **/
>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>     .apply(KafkaIO.<String, String>read()
>>>>>         .withTopics(ImmutableList.of(claimTopic))
>>>>>         .updateConsumerProperties(consumerProps)
>>>>>         .withBootstrapServers(bootstrapServers)
>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>>         .withoutMetadata())
>>>>>     .apply(Values.<String>create())
>>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>     .apply(Window.<Claim2>into(new GlobalWindows())
>>>>>         .triggering(trigger1)
>>>>>         .accumulatingFiredPanes());
>>>>>
>>>>> /** CUSTOMER */
>>>>> PCollection<KV<Integer,String>> all_customers = customerInput
>>>>>     .apply(new ExtractAndMapCustomerKey());
>>>>> /** POLICY */
>>>>> PCollection<KV<Integer,String>> all_policies = policyInput
>>>>>     .apply(new ExtractAndMapPolicyKey());
>>>>> /** CLAIM */
>>>>> PCollection<KV<Integer,String>> all_claims = claimInput
>>>>>     .apply(new ExtractAndMapClaimKey());
>>>>>
>>>>> /** JOIN */
>>>>> /** This join works if I comment out the subsequent join **/
>>>>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies =
>>>>>     Join.innerJoin(all_customers, all_policies);
>>>>>
>>>>> /** this join will cause IllegalStateException **/
>>>>> PCollection<KV<Integer,KV<KV<String,String>,String>>>
>>>>>     joinedCustomersPoliciesAndClaims =
>>>>>     Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>
>>>>> The second scenario is using fixed windows. The logic is the same as
>>>>> above.
>>>>> Even in this case triggering is equal for all three collections, like
>>>>> this:
>>>>>
>>>>> .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>     .triggering(AfterWatermark.pastEndOfWindow())
>>>>>     .accumulatingFiredPanes()
>>>>>     .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>
>>>>> /** JOIN */
>>>>> /** NO ERROR this time **/
>>>>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies =
>>>>>     Join.innerJoin(all_customers, all_policies);
>>>>> PCollection<KV<Integer,KV<KV<String,String>,String>>>
>>>>>     joinedCustomersPoliciesAndClaims =
>>>>>     Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>
>>>>> This time I get no errors, but I am not able to print the results in
>>>>> the console.
>>>>>
>>>>> So I make another experiment and again define an equal trigger for all
>>>>> three collections. I can print the output to the console for the first
>>>>> two PCollections, but the second join again fails with
>>>>> IllegalStateException.
>>>>> Triggering definition for all three collections:
>>>>>
>>>>> /** PARSE CLAIM **/
>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>     .apply(KafkaIO.<String, String>read()
>>>>>         .withTopics(ImmutableList.of(claimTopic))
>>>>>         .updateConsumerProperties(consumerProps)
>>>>>         .withBootstrapServers(bootstrapServers)
>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>>         .withoutMetadata())
>>>>>     .apply(Values.<String>create())
>>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>     .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>>         .triggering(AfterWatermark.pastEndOfWindow()
>>>>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>                 // early update frequency
>>>>>                 .alignedTo(Duration.standardSeconds(10)))
>>>>>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>                 .alignedTo(Duration.standardSeconds(20))))
>>>>>         .withAllowedLateness(Duration.standardMinutes(5))
>>>>>         .accumulatingFiredPanes());
>>>>>
>>>>> In this case I've added early and late firings, which emit results
>>>>> from the pane, but it again throws an exception on the second join.
>>>>>
>>>>> I know it's a lot of information to take in, but basically if you have
>>>>> an example where you join three PCollections in global and in fixed
>>>>> windows with appropriate triggering, I'd be eternally grateful :)
>>>>> Or if you could explain how to do it, of course. Thanks in advance.
>>>>>
>>>>> Best Regards
>>>>> Artur
>>>>>
>>>>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <[email protected]> wrote:
>>>>>
>>>>>> Hi Artur,
>>>>>>
>>>>>> When you join the PCollections, they will be flattened and go through
>>>>>> a GroupByKey together.
>>>>>> Since the trigger governs when the GroupByKey can emit output, the
>>>>>> triggers have to be equal or the GroupByKey doesn't have a clear
>>>>>> guide as to when it should output. If you can make the triggering on
>>>>>> all the input collections equal, that will resolve this issue. If you
>>>>>> still need different triggering elsewhere, that is fine. You just
>>>>>> need to make them the same going in to the join.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <[email protected]> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> You can try putting the PCollection after the Flatten into the same
>>>>>>> global window, with the same triggers, as it was before flattening.
>>>>>>>
>>>>>>> Best regards
>>>>>>> Aleksandr Gortujev
>>>>>>>
>>>>>>> On Nov 3, 2017 at 11:04 AM, "Artur Mrozowski" <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>> I am in the second week of our PoC with Beam and I am really amazed
>>>>>>> by the capabilities of the framework and how well engineered it is.
>>>>>>>
>>>>>>> Amazed does not mean experienced, so please bear with me.
>>>>>>>
>>>>>>> What we are trying to achieve is to join several streams using
>>>>>>> windowing and triggers. And that is where I fear we hit the
>>>>>>> limitations of what can be done.
>>>>>>>
>>>>>>> In case A we run in global windows and we are able to combine two
>>>>>>> unbounded PCollections, but when I try to combine the result with a
>>>>>>> third collection I get the exception below. I tried many different
>>>>>>> trigger combinations, but can't make it work.
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.IllegalStateException: Inputs
>>>>>>> to Flatten had incompatible triggers:
>>>>>>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>>>>>> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>>>>> seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>>>>> seconds))
>>>>>>>
>>>>>>> In case B I use fixed windows. Again, I can successfully join two
>>>>>>> collections and print the output in the console. When I add the
>>>>>>> third, it runs without errors, but I am not able to materialize the
>>>>>>> results in the console. I am, however, able to print the results of
>>>>>>> a merge using Flatten, so the error above is no longer an issue
>>>>>>> there.
>>>>>>>
>>>>>>> Has anyone experience with joining three or more unbounded
>>>>>>> PCollections? What would be a successful windowing and triggering
>>>>>>> strategy for a global or fixed window, respectively?
>>>>>>>
>>>>>>> Below are code snippets from the fixed-windows case. Windows are
>>>>>>> defined in the same manner for all three collections: customer,
>>>>>>> claim and policy. The Join class I use comes from
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>>>>
>>>>>>> Would be really grateful if any of you would like to share your
>>>>>>> knowledge.
>>>>>>>
>>>>>>> Best Regards
>>>>>>> Artur
>>>>>>>
>>>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>>>     .apply(KafkaIO.<String, String>read()
>>>>>>>         .withTopics(ImmutableList.of(claimTopic))
>>>>>>>         .updateConsumerProperties(consumerProps)
>>>>>>>         .withBootstrapServers(bootstrapServers)
>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>         .withValueDeserializer(StringDeserializer.class)
>>>>>>>         .withoutMetadata())
>>>>>>>     .apply(Values.<String>create())
>>>>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>>     .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>>>         .triggering(AfterWatermark.pastEndOfWindow())
>>>>>>>         .accumulatingFiredPanes()
>>>>>>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>>>
>>>>>>> /** JOIN */
>>>>>>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies =
>>>>>>>     Join.innerJoin(all_customers, all_policies);
>>>>>>> PCollectionList<KV<Integer,String>> collections =
>>>>>>>     PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>>>>> PCollection<KV<Integer,KV<KV<String,String>,String>>>
>>>>>>>     joinedCustomersPoliciesAndClaims =
>>>>>>>     Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>>> // PCollectionList<KV<Integer,String>> collections =
>>>>>>> //     PCollectionList.of(all_customers).and(all_policies);
>>>>>>>
>>>>>>> PCollection<KV<Integer,String>> merged =
>>>>>>>     collections.apply(Flatten.<KV<Integer,String>>pCollections());
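The nested loops in the innerJoin3Way DoFn earlier in the thread can be modeled in plain Java, with no Beam dependency, to make the expected output concrete. This is an illustrative sketch (the class name and the string encoding of the nested KV are made up here, not Beam API): per key, a true 3-way inner join is the cross product of the three value lists, whereas the DoFn as posted iterates thirdValuesIterable but never outputs any V3.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ThreeWayJoinModel {

    // Per-key model of a corrected 3-way inner join: the cross product of
    // the three value lists. The string "((v1,v2),v3)" stands in for the
    // nested KV<KV<V1, V2>, V3> a corrected innerJoin3Way would output.
    // The posted DoFn, by contrast, emits only (v1, v2) pairs and drops v3.
    static <V1, V2, V3> List<String> crossProduct(
            List<V1> lefts, List<V2> rights, List<V3> thirds) {
        List<String> out = new ArrayList<>();
        for (V1 l : lefts) {
            for (V2 r : rights) {
                for (V3 t : thirds) {
                    out.add("((" + l + "," + r + ")," + t + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // One key's grouped values: one customer, two policies, one claim.
        List<String> joined = crossProduct(
            Arrays.asList("c1"), Arrays.asList("p1", "p2"), Arrays.asList("x1"));
        System.out.println(joined);  // [((c1,p1),x1), ((c1,p2),x1)]
    }
}
```

Note that if any of the three lists is empty for a key, the inner join yields nothing for that key; a customer with policies but no claims would disappear entirely from a 3-way inner join.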
