Hello,
You are using Repeatedly.forever for policyInput and for customerInput, but the trigger for claimInput is not wrapped in Repeatedly.forever. Those are exactly the two triggers that the error message reports as incompatible.
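A minimal sketch of that fix, assuming Beam 2.x with a runner (for example the direct runner) on the classpath. The class name TriggerFixSketch, the helper windowed and the Create.of inputs are made up for illustration and only stand in for your Kafka sources. The point is that all three inputs get the same Repeatedly.forever(...) trigger before the Flatten that CoGroupByKey applies internally:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.joda.time.Duration;

public class TriggerFixSketch {

  // Hypothetical helper: puts an input into the global window with the given trigger.
  static PCollection<KV<Integer, String>> windowed(
      String name, PCollection<KV<Integer, String>> input, Trigger trigger) {
    return input.apply(
        name,
        Window.<KV<Integer, String>>into(new GlobalWindows())
            .triggering(trigger)
            .accumulatingFiredPanes());
  }

  // Builds (but does not run) a pipeline that flattens three identically triggered inputs.
  static boolean flattenBuilds() {
    Pipeline pipeline = Pipeline.create();

    // Wrap the trigger once, so every input uses the very same repeated trigger.
    Trigger repeated =
        Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10)));

    // Stand-ins for the parsed Kafka streams (assumption: small in-memory inputs).
    PCollection<KV<Integer, String>> customers =
        windowed("WindowCustomers",
            pipeline.apply("CreateCustomers", Create.of(KV.of(1, "customer"))), repeated);
    PCollection<KV<Integer, String>> policies =
        windowed("WindowPolicies",
            pipeline.apply("CreatePolicies", Create.of(KV.of(1, "policy"))), repeated);
    PCollection<KV<Integer, String>> claims =
        windowed("WindowClaims",
            pipeline.apply("CreateClaims", Create.of(KV.of(1, "claim"))), repeated);

    // The trigger compatibility check happens here, while the pipeline is constructed.
    PCollection<KV<Integer, String>> merged =
        PCollectionList.of(customers).and(policies).and(claims)
            .apply(Flatten.<KV<Integer, String>>pCollections());
    return merged != null;
  }

  public static void main(String[] args) {
    System.out.println("Flatten applied: " + flattenBuilds());
  }
}
```

In your pipeline the equivalent change would be to use triggering(Repeatedly.forever(trigger1)) in the claimInput window as well, or to define trigger1 itself as the Repeatedly.forever(...) trigger and pass it unchanged to all three windows.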
Best regards
Aleksandr Gortujev

Sat, 4 Nov 2017 at 20:39, Artur Mrozowski <[email protected]>:

> And the error message:
>
> Exception in thread "main" java.lang.IllegalStateException: Inputs to
> Flatten had incompatible triggers:
> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
> seconds)), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
> seconds)
>   at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:124)
>   at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:102)
>   at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
>   at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
>   at org.apache.beam.sdk.values.PCollectionList.apply(PCollectionList.java:182)
>   at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:124)
>   at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
>   at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
>   at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
>   at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:106)
>   at com.tryg.beam.kafka.poc.utils.Join.innerJoin3Way(Join.java:170)
>   at com.tryg.beam.kafka.poc.impl.CustomerStreamPipelineGlobal.main(CustomerStreamPipelineGlobal.java:220)
>
>
> On Sat, Nov 4, 2017 at 6:36 PM, Artur Mrozowski <[email protected]> wrote:
>
>> Sure, here is the url to my repo
>> https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java
>>
>>
>> It is fairly simple and I do no grouping before. Just read from Kafka and
>> then join. I followed your advice and the 3 way join works for the pipeline
>> with fixed windows. Both classes use the same join class. The logic is the
>> same and it always works for the A+B join (CoGroupByKey). That's what makes me
>> think it could be due to triggers.
>> But I am of course not sure :)
>>
>> /Artur
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>> Trigger trigger1 =
>>     AfterProcessingTime
>>         .pastFirstElementInPane()
>>         .plusDelayOf(Duration.standardSeconds(10));
>>
>> /**PARSE CUSTOMER*/
>> PCollection<Customer3> customerInput = pipeline
>>     .apply((KafkaIO.<String, String>read()
>>             .withTopics(ImmutableList.of(customerTopic))
>>             .updateConsumerProperties(consumerProps)
>>             .withBootstrapServers(bootstrapServers)
>>             .withKeyDeserializer(StringDeserializer.class)
>>             .withValueDeserializer(StringDeserializer.class))
>>         .withoutMetadata())
>>     .apply(Values.<String>create())
>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>     .apply(Window.<Customer3>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(trigger1))
>>         .accumulatingFiredPanes());
>>
>> /**PARSE POLICY*/
>> PCollection<Policy2> policyInput = pipeline
>>     .apply((KafkaIO.<String, String>read()
>>             .withTopics(ImmutableList.of(policyTopic))
>>             .updateConsumerProperties(consumerProps)
>>             .withBootstrapServers(bootstrapServers)
>>             .withKeyDeserializer(StringDeserializer.class)
>>             .withValueDeserializer(StringDeserializer.class))
>>         // .withWatermarkFn(new AddWatermarkFn())
>>         //.withTimestampFn2(new AddCustomerTimestampFn())
>>         .withoutMetadata())
>>     .apply(Values.<String>create())
>>     //.apply(ParseJsons.of(Customer.class));
>>     .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>     .apply(Window.<Policy2>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(trigger1))
>>         .accumulatingFiredPanes());
>>
>> /**PARSE CLAIM**/
>> PCollection<Claim2> claimInput = pipeline
>>     .apply((KafkaIO.<String, String>read()
>>             .withTopics(ImmutableList.of(claimTopic))
>>             .updateConsumerProperties(consumerProps)
>>             .withBootstrapServers(bootstrapServers)
>>             .withKeyDeserializer(StringDeserializer.class)
>>             .withValueDeserializer(StringDeserializer.class))
>>         .withoutMetadata())
>>     .apply(Values.<String>create())
>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>     .apply(Window.<Claim2>into(new GlobalWindows())
>>         .triggering(trigger1)
>>         .accumulatingFiredPanes());
>>
>> /**CUSTOMER ********/
>> PCollection<KV<Integer,String>> all_customers = customerInput
>>     .apply(new ExtractAndMapCustomerKey());
>> /***POLICY********/
>> PCollection<KV<Integer,String>> all_policies = policyInput
>>     .apply(new ExtractAndMapPolicyKey());
>> /***CLAIM*******/
>> PCollection<KV<Integer,String>> all_claims = claimInput
>>     .apply(new ExtractAndMapClaimKey());
>>
>> /**JOIN**************/
>> /**This join works if I comment out the subsequent join**/
>> // PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies=
>> //     Join.innerJoin(all_customers,all_policies);
>>
>> /**This causes an exception**/
>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies=
>>     Join.innerJoin3Way(all_customers,all_policies,all_claims);
>>
>> /**this join will cause IllegalStateException **/
>> // PCollection<KV<Integer,KV<KV<String,String>,String>>>
>> //     joinedCustomersPoliciesAndClaims =
>> //     Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>
>> /**This will also cause an exception when used with 3 collections**/
>> //PCollectionList<KV<Integer,String>> collections =
>> //    PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>> //PCollection<KV<Integer,String>> merged=
>> //    collections.apply(Flatten.<KV<Integer,String>>pCollections());
>>
>> And the 3 way join
>>
>> public static <K, V1, V2, V3> PCollection<KV<K, KV<V1, V2>>> innerJoin3Way(
>>     final PCollection<KV<K, V1>> leftCollection,
>>     final PCollection<KV<K, V2>> rightCollection,
>>     final PCollection<KV<K, V3>> thirdCollection)
>> {
>>
>>   final TupleTag<V1> v1Tuple = new TupleTag<>();
>>   final TupleTag<V2> v2Tuple = new TupleTag<>();
>>   final TupleTag<V3> v3Tuple = new TupleTag<>();
>>
>>   PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>       KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>           .and(v2Tuple, rightCollection)
>>           .and(v3Tuple,thirdCollection)
>>           .apply(CoGroupByKey.<K>create());
>>
>>   System.out.println(coGbkResultCollection);
>>
>>   return coGbkResultCollection.apply(ParDo.of(
>>       new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
>>
>>         @ProcessElement
>>         public void processElement(ProcessContext c) {
>>           KV<K, CoGbkResult> e = c.element();
>>
>>           Iterable<V1> leftValuesIterable =
>>               e.getValue().getAll(v1Tuple);
>>           Iterable<V2> rightValuesIterable =
>>               e.getValue().getAll(v2Tuple);
>>           Iterable<V3> thirdValuesIterable =
>>               e.getValue().getAll(v3Tuple);
>>
>>           for(V3 thirdValue : thirdValuesIterable)
>>           {
>>
>>           }
>>
>>           for (V1 leftValue : leftValuesIterable) {
>>             for (V2 rightValue : rightValuesIterable) {
>>               c.output(KV.of(e.getKey(), KV.of(leftValue,
>>                   rightValue)));
>>             }
>>
>>           }
>>         }
>>       }))
>>       .setCoder(KvCoder.of(((KvCoder)
>>           leftCollection.getCoder()).getKeyCoder(),
>>           KvCoder.of(((KvCoder)
>>               leftCollection.getCoder()).getValueCoder(),
>>               ((KvCoder)
>>                   rightCollection.getCoder()).getValueCoder())));
>>
>>
>> On Sat, Nov 4, 2017 at 3:43 PM, Ben Chambers <[email protected]>
>> wrote:
>>
>>> Can you share the program using global windows and make sure the
>>> exception is the same? Basically, this kind of problem has to do with
>>> whether you have applied a grouping operation already. The triggering (and
>>> sometimes windowing) before a grouping operation is different than after.
>>> So, if you are having problems applying a groupbykey somewhere and think
>>> the windowing and triggering should be the same, you need to look before
>>> that to see if one of the collections has been grouped but the others
>>> haven't.
>>>
>>> On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski <[email protected]> wrote:
>>>
>>>> Hi Ben,
>>>> thank you for your answer. I think I forgot to mention that the join
>>>> class already implements CoGroupByKey and comes from sdk extensions. I
>>>> have now modified it slightly to do a 3 way join (see below).
>>>> It works for 3 PCollections with fixed windows and provides the output
>>>> this time, even when using early and late triggers. That is great!
>>>>
>>>> But when I try to use it with global windows for all three collections
>>>> I get the same exception.
>>>>
>>>> Is it not possible to make a 3 way join in a global window? The reason
>>>> why I want to use a global window is that I want to have all the historic
>>>> records available in these collections. Not sure how that could be achieved
>>>> using fixed windows.
>>>>
>>>> /Artur
>>>>
>>>> public static <K, V1, V2, V3> PCollection<KV<K,KV<KV<V1,V2>,V3>>>
>>>> innerJoin3Way(
>>>>     final PCollection<KV<K, V1>> leftCollection,
>>>>     final PCollection<KV<K, V2>> rightCollection,
>>>>     final PCollection<KV<K, V3>> thirdCollection)
>>>> {
>>>>   final TupleTag<V1> v1Tuple = new TupleTag<>();
>>>>   final TupleTag<V2> v2Tuple = new TupleTag<>();
>>>>   final TupleTag<V3> v3Tuple = new TupleTag<>();
>>>>
>>>>   PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>>>       KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>>>           .and(v2Tuple, rightCollection)
>>>>           .and(v3Tuple,thirdCollection)
>>>>           .apply(CoGroupByKey.<K>create());
>>>>
>>>>
>>>>   return coGbkResultCollection.apply(ParDo.of(
>>>>       new DoFn<KV<K, CoGbkResult>,KV<K,KV<KV<V1,V2>,V3>> >() {
>>>>
>>>>         @ProcessElement
>>>>         public void processElement(ProcessContext c)
>>>>
>>>>
>>>> On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <[email protected]>
>>>> wrote:
>>>>
>>>>> It looks like this is a problematic interaction between the 2-layer
>>>>> join and the (unfortunately complicated) continuation triggering.
>>>>> Specifically, the triggering of Join(A, B) has the continuation trigger,
>>>>> so it isn't possible to join that with C.
>>>>>
>>>>> Instead of trying to do Join(Join(A, B), C), consider using a
>>>>> CoGroupByKey. This will allow you to join all three input collections at
>>>>> the same time, which should have two benefits.
>>>>> First, it will work since you won't be trying to merge a continuation
>>>>> trigger with the original trigger. Second, it should be more efficient,
>>>>> because you are performing a single, three-way join instead of two,
>>>>> two-way joins.
>>>>>
>>>>>
>>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
>>>>> for more information.
>>>>>
>>>>> -- Ben
>>>>>
>>>>> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Kenneth and Aleksandr and thank you for your prompt answer.
>>>>>>
>>>>>> So there are two scenarios that I've been trying out but the operation is
>>>>>> always the same, join setA+setB =>setAB, and then join
>>>>>> setAB+setC=>setABC.
>>>>>> In scenario 1 I define three PCollections with global windows and
>>>>>> exactly the same triggers. Now I am able to join A+B but as soon as I try
>>>>>> to join AB+C I get the exception.
>>>>>> Here is the code snippet where the window and triggers are the same
>>>>>> for all three PCollections using global windows:
>>>>>>
>>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>>
>>>>>> Trigger trigger1 =
>>>>>>     AfterProcessingTime
>>>>>>         .pastFirstElementInPane()
>>>>>>         .plusDelayOf(Duration.standardSeconds(10));
>>>>>>
>>>>>> /**PARSE CUSTOMER*/
>>>>>> PCollection<Customer3> customerInput = pipeline
>>>>>>     .apply((KafkaIO.<String, String>read()
>>>>>>             .withTopics(ImmutableList.of(customerTopic))
>>>>>>             .updateConsumerProperties(consumerProps)
>>>>>>             .withBootstrapServers(bootstrapServers)
>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>             .withValueDeserializer(StringDeserializer.class))
>>>>>>         .withoutMetadata())
>>>>>>     .apply(Values.<String>create())
>>>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>>>>     .apply(Window.<Customer3>into(new GlobalWindows())
>>>>>>         .triggering(Repeatedly.forever(trigger1))
>>>>>>         .accumulatingFiredPanes())
>>>>>> ;
>>>>>> /**PARSE POLICY*/
>>>>>> PCollection<Policy2> policyInput = pipeline
>>>>>>     .apply((KafkaIO.<String, String>read()
>>>>>>             .withTopics(ImmutableList.of(policyTopic))
>>>>>>             .updateConsumerProperties(consumerProps)
>>>>>>             .withBootstrapServers(bootstrapServers)
>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>             .withValueDeserializer(StringDeserializer.class))
>>>>>>         // .withWatermarkFn(new AddWatermarkFn())
>>>>>>         //.withTimestampFn2(new AddCustomerTimestampFn())
>>>>>>         .withoutMetadata())
>>>>>>     .apply(Values.<String>create())
>>>>>>     //.apply(ParseJsons.of(Customer.class));
>>>>>>     .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>>>>     .apply(Window.<Policy2>into(new GlobalWindows())
>>>>>>         .triggering(Repeatedly.forever(trigger1))
>>>>>>         .accumulatingFiredPanes());
>>>>>>
>>>>>> /**PARSE CLAIM**/
>>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>>     .apply((KafkaIO.<String, String>read()
>>>>>>             .withTopics(ImmutableList.of(claimTopic))
>>>>>>             .updateConsumerProperties(consumerProps)
>>>>>>             .withBootstrapServers(bootstrapServers)
>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>             .withValueDeserializer(StringDeserializer.class))
>>>>>>         .withoutMetadata())
>>>>>>     .apply(Values.<String>create())
>>>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>     .apply(Window.<Claim2>into(new GlobalWindows())
>>>>>>         .triggering(trigger1)
>>>>>>         .accumulatingFiredPanes());
>>>>>>
>>>>>> /**CUSTOMER ********/
>>>>>> PCollection<KV<Integer,String>> all_customers = customerInput
>>>>>>     .apply(new ExtractAndMapCustomerKey());
>>>>>> /***POLICY********/
>>>>>> PCollection<KV<Integer,String>> all_policies = policyInput
>>>>>>     .apply(new ExtractAndMapPolicyKey());
>>>>>> /***CLAIM*******/
>>>>>> PCollection<KV<Integer,String>> all_claims = claimInput
>>>>>>     .apply(new ExtractAndMapClaimKey());
>>>>>>
>>>>>> /**JOIN**************/
>>>>>> /**This join works if I comment out the subsequent
>>>>>> join**/
>>>>>>
>>>>>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies=
>>>>>>     Join.innerJoin(all_customers,all_policies);
>>>>>>
>>>>>> /**this join will cause IllegalStateException **/
>>>>>>
>>>>>> PCollection<KV<Integer,KV<KV<String,String>,String>>>
>>>>>>     joinedCustomersPoliciesAndClaims =
>>>>>>     Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>>>>
>>>>>>
>>>>>>
>>>>>> The second scenario is using fixed windows. The logic is the same as
>>>>>> above. Even in this case triggering is equal for all three collections,
>>>>>> like this:
>>>>>>
>>>>>> .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>>     .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>>>>>>     .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>>
>>>>>> /**JOIN**************/
>>>>>> /**NO ERROR this time **/
>>>>>>
>>>>>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies=
>>>>>>     Join.innerJoin(all_customers,all_policies);
>>>>>> PCollection<KV<Integer,KV<KV<String,String>,String>>>
>>>>>>     joinedCustomersPoliciesAndClaims =
>>>>>>     Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>>>>
>>>>>>
>>>>>> This time I get no errors but I am not able to print the results in
>>>>>> the console.
>>>>>>
>>>>>> So I ran another experiment and again defined an equal trigger for all
>>>>>> three collections. I can print the output to the console for the first
>>>>>> two PCollections but the second join again fails with
>>>>>> IllegalStateException.
>>>>>> Triggering definition for all three collections:
>>>>>>
>>>>>> /**PARSE CLAIM**/
>>>>>>
>>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>>     .apply((KafkaIO.<String, String>read()
>>>>>>             .withTopics(ImmutableList.of(claimTopic))
>>>>>>             .updateConsumerProperties(consumerProps)
>>>>>>             .withBootstrapServers(bootstrapServers)
>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>             .withValueDeserializer(StringDeserializer.class))
>>>>>>         .withoutMetadata())
>>>>>>     .apply(Values.<String>create())
>>>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>     .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>>>         .triggering(AfterWatermark.pastEndOfWindow()
>>>>>>             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>>                 //early update frequency
>>>>>>                 .alignedTo(Duration.standardSeconds(10)))
>>>>>>             .withLateFirings(AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardSeconds(20))))
>>>>>>         .withAllowedLateness(Duration.standardMinutes(5))
>>>>>>         .accumulatingFiredPanes());
>>>>>>
>>>>>>
>>>>>>
>>>>>> In this case I've added early and late firings, which emit results
>>>>>> from the pane, but it again throws an exception on the second join.
>>>>>>
>>>>>> I know it's a lot of information to take in but basically if you have
>>>>>> an example where you join three PCollections in global and in fixed
>>>>>> windows with appropriate triggering, I'd be eternally grateful :)
>>>>>> Or if you could explain how to do it, of course. Thanks in advance.
>>>>>>
>>>>>> Best Regards
>>>>>> Artur
>>>>>>
>>>>>>
>>>>>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Artur,
>>>>>>>
>>>>>>> When you join the PCollections, they will be flattened and go
>>>>>>> through a GroupByKey together.
>>>>>>> Since the trigger governs when the GroupByKey can emit output, the
>>>>>>> triggers have to be equal or the GroupByKey doesn't have a clear guide
>>>>>>> as to when it should output. If you can make the triggering on all the
>>>>>>> input collections equal, that will resolve this issue. If you still
>>>>>>> need different triggering elsewhere, that is fine. You just need to
>>>>>>> make them the same going in to the join.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>> You can try putting the PCollection after the flatten into the same
>>>>>>>> global window, with the same triggers as before flattening.
>>>>>>>>
>>>>>>>> Best regards
>>>>>>>> Aleksandr Gortujev
>>>>>>>>
>>>>>>>>
>>>>>>>> On 3 Nov 2017 11:04 AM, "Artur Mrozowski" <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I am in the second week of our PoC with Beam and I am really amazed by
>>>>>>>> the capabilities of the framework and how well engineered it is.
>>>>>>>>
>>>>>>>> Amazed does not mean experienced so please bear with me.
>>>>>>>>
>>>>>>>> What we try to achieve is to join several streams using windowing
>>>>>>>> and triggers. And that is where I fear we hit the limitations for
>>>>>>>> what can be done.
>>>>>>>>
>>>>>>>> In case A we run in global windows and we are able to combine two
>>>>>>>> unbounded PCollections but when I try to combine the results with a
>>>>>>>> third collection I get the exception below. I tried many different
>>>>>>>> trigger combinations, but can't make it work.
>>>>>>>>
>>>>>>>> Exception in thread "main" java.lang.IllegalStateException: Inputs
>>>>>>>> to Flatten had incompatible triggers:
>>>>>>>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>>>>>>> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>>>>>> seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10
>>>>>>>> seconds))
>>>>>>>>
>>>>>>>> In case B I use fixed windows. Again, I can successfully join two
>>>>>>>> collections and print output in the console. When I add the third it
>>>>>>>> runs without errors, but I am not able to materialize results in the
>>>>>>>> console. However, I am able to print the results of the merge using
>>>>>>>> Flatten, so the error above is no longer an issue.
>>>>>>>>
>>>>>>>> Does anyone have experience with joining three or more unbounded
>>>>>>>> PCollections? What would be a successful windowing and triggering
>>>>>>>> strategy for global or fixed windows respectively?
>>>>>>>>
>>>>>>>> Below are code snippets from the fixed windows case. Windows are
>>>>>>>> defined in the same manner for all three collections, customer, claim
>>>>>>>> and policy. The Join class I use comes from
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>>>>>
>>>>>>>>
>>>>>>>> I would be really grateful if any of you would like to share your
>>>>>>>> knowledge.
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>> Artur
>>>>>>>>
>>>>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>>>>     .apply((KafkaIO.<String, String>read()
>>>>>>>>             .withTopics(ImmutableList.of(claimTopic))
>>>>>>>>             .updateConsumerProperties(consumerProps)
>>>>>>>>             .withBootstrapServers(bootstrapServers)
>>>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>             .withValueDeserializer(StringDeserializer.class))
>>>>>>>>         .withoutMetadata())
>>>>>>>>     .apply(Values.<String>create())
>>>>>>>>     .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>>>     .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>>>>         .triggering(AfterWatermark.pastEndOfWindow()).accumulatingFiredPanes()
>>>>>>>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>>>>
>>>>>>>> /**JOIN**************/
>>>>>>>> PCollection<KV<Integer,KV<String,String>>>
>>>>>>>>     joinedCustomersAndPolicies= Join.innerJoin(all_customers,all_policies);
>>>>>>>> PCollectionList<KV<Integer,String>> collections =
>>>>>>>>     PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>>>>>> PCollection<KV<Integer,KV<KV<String,String>,String>>>
>>>>>>>>     joinedCustomersPoliciesAndClaims =
>>>>>>>>     Join.innerJoin(joinedCustomersAndPolicies,all_claims);
>>>>>>>> //PCollectionList<KV<Integer,String>> collections =
>>>>>>>> //    PCollectionList.of(all_customers).and(all_policies);
>>>>>>>>
>>>>>>>> PCollection<KV<Integer,String>> merged=
>>>>>>>>     collections.apply(Flatten.<KV<Integer,String>>pCollections());
