And the error message:

Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)
    at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:124)
    at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:102)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
    at org.apache.beam.sdk.values.PCollectionList.apply(PCollectionList.java:182)
    at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:124)
    at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
    at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:106)
    at com.tryg.beam.kafka.poc.utils.Join.innerJoin3Way(Join.java:170)
    at com.tryg.beam.kafka.poc.impl.CustomerStreamPipelineGlobal.main(CustomerStreamPipelineGlobal.java:220)
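
Looking at the message, the two triggers it lists differ only in the Repeatedly.forever(...) wrapper, and in the pipeline quoted below the customer and policy windows use Repeatedly.forever(trigger1) while the claim window uses trigger1 directly. That may be what Flatten is rejecting. Here is a toy model in plain Java, assuming a simple structural comparison. The class and method names are made up for illustration; this is not Beam's API or its actual compatibility check:

```java
import java.util.List;
import java.util.Objects;

/** Toy model only: NOT Beam's Trigger classes, just enough structure to show the mismatch. */
final class TriggerShapeCheck {

    /** Hypothetical stand-in for a trigger: a name plus an optional wrapped trigger. */
    static final class Shape {
        final String name;
        final Shape inner; // null for a leaf trigger

        Shape(String name, Shape inner) {
            this.name = name;
            this.inner = inner;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Shape)) {
                return false;
            }
            Shape other = (Shape) o;
            return name.equals(other.name) && Objects.equals(inner, other.inner);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, inner);
        }
    }

    /** Models AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds). */
    static Shape afterProcessingTimeDelay10s() {
        return new Shape("AfterProcessingTime+10s", null);
    }

    /** Models Repeatedly.forever(inner). */
    static Shape repeatedlyForever(Shape inner) {
        return new Shape("Repeatedly.forever", inner);
    }

    /** True when every input has the same trigger shape as the first input. */
    static boolean allSame(List<Shape> inputs) {
        for (Shape s : inputs) {
            if (!s.equals(inputs.get(0))) {
                return false;
            }
        }
        return true;
    }
}
```

In this model, two wrapped triggers plus one bare trigger do not match, while three wrapped triggers do. If that is the cause, using Repeatedly.forever(trigger1) for the claim window as well should make the three inputs match.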


On Sat, Nov 4, 2017 at 6:36 PM, Artur Mrozowski <[email protected]> wrote:

> Sure, here is the URL to my repo:
> https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java
>
> It is fairly simple and I do no grouping before. I just read from Kafka and
> then join. I followed your advice and the 3-way join works for the pipeline
> with fixed windows. Both classes use the same join class. The logic is the
> same and it always works for the A+B join (CoGroupByKey). That's what makes me
> think it could be due to triggers. But I am of course not sure :)
>
> /Artur
>
> Pipeline pipeline = Pipeline.create(options);
>
>   Trigger trigger1 =
>           AfterProcessingTime
>                   .pastFirstElementInPane()
>                   .plusDelayOf(Duration.standardSeconds(10));
>
>   /**PARSE CUSTOMER*/
>   PCollection<Customer3> customerInput = pipeline
>           .apply((KafkaIO.<String, String>read()
>                   .withTopics(ImmutableList.of(customerTopic))
>                   .updateConsumerProperties(consumerProps)
>                   .withBootstrapServers(bootstrapServers)
>                   .withKeyDeserializer(StringDeserializer.class)
>                   .withValueDeserializer(StringDeserializer.class))
>                   .withoutMetadata())
>           .apply(Values.<String>create())
>           .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>           .apply(Window.<Customer3>into(new GlobalWindows())
>                   .triggering(Repeatedly.forever(trigger1))
>                   .accumulatingFiredPanes());
>
>   /**PARSE POLICY*/
>   PCollection<Policy2> policyInput = pipeline
>           .apply((KafkaIO.<String, String>read()
>                   .withTopics(ImmutableList.of(policyTopic))
>                   .updateConsumerProperties(consumerProps)
>                   .withBootstrapServers(bootstrapServers)
>                   .withKeyDeserializer(StringDeserializer.class)
>                   .withValueDeserializer(StringDeserializer.class))
>                   // .withWatermarkFn(new AddWatermarkFn())
>                   //.withTimestampFn2(new AddCustomerTimestampFn())
>                   .withoutMetadata())
>           .apply(Values.<String>create())
>           //.apply(ParseJsons.of(Customer.class));
>           .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>           .apply(Window.<Policy2>into(new GlobalWindows())
>                   .triggering(Repeatedly.forever(trigger1))
>                   .accumulatingFiredPanes());
>
>   /**PARSE CLAIM**/
>   PCollection<Claim2> claimInput = pipeline
>           .apply((KafkaIO.<String, String>read()
>                   .withTopics(ImmutableList.of(claimTopic))
>                   .updateConsumerProperties(consumerProps)
>                   .withBootstrapServers(bootstrapServers)
>                   .withKeyDeserializer(StringDeserializer.class)
>                   .withValueDeserializer(StringDeserializer.class))
>                   .withoutMetadata())
>           .apply(Values.<String>create())
>           .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>           .apply(Window.<Claim2>into(new GlobalWindows())
>                   .triggering(trigger1)
>                   .accumulatingFiredPanes());
>
>   /**CUSTOMER  ********/
>   PCollection<KV<Integer, String>> all_customers = customerInput
>           .apply(new ExtractAndMapCustomerKey());
>   /***POLICY********/
>   PCollection<KV<Integer, String>> all_policies = policyInput
>           .apply(new ExtractAndMapPolicyKey());
>   /***CLAIM*******/
>   PCollection<KV<Integer, String>> all_claims = claimInput
>           .apply(new ExtractAndMapClaimKey());
>
>   /**JOIN**************/
>   /**This join works if I comment out the subsequent join**/
>   // PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>   //         Join.innerJoin(all_customers, all_policies);
>
>   /**This causes an exception**/
>   PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>           Join.innerJoin3Way(all_customers, all_policies, all_claims);
>
>   /**this join will cause IllegalStateException **/
>   // PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>   //         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>
>   /**This will also cause an exception when used with 3 collections**/
>   // PCollectionList<KV<Integer, String>> collections =
>   //         PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>   // PCollection<KV<Integer, String>> merged =
>   //         collections.apply(Flatten.<KV<Integer, String>>pCollections());
>
> And the 3-way join:
>
> public static <K, V1, V2, V3> PCollection<KV<K, KV<V1, V2>>> innerJoin3Way(
>         final PCollection<KV<K, V1>> leftCollection,
>         final PCollection<KV<K, V2>> rightCollection,
>         final PCollection<KV<K, V3>> thirdCollection) {
>
>     final TupleTag<V1> v1Tuple = new TupleTag<>();
>     final TupleTag<V2> v2Tuple = new TupleTag<>();
>     final TupleTag<V3> v3Tuple = new TupleTag<>();
>
>     PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>             KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>                     .and(v2Tuple, rightCollection)
>                     .and(v3Tuple, thirdCollection)
>                     .apply(CoGroupByKey.<K>create());
>
>     System.out.println(coGbkResultCollection);
>
>     return coGbkResultCollection.apply(ParDo.of(
>             new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
>
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                     KV<K, CoGbkResult> e = c.element();
>
>                     Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
>                     Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
>                     Iterable<V3> thirdValuesIterable = e.getValue().getAll(v3Tuple);
>
>                     // The third collection is read but not yet used in the output.
>                     for (V3 thirdValue : thirdValuesIterable) {
>                     }
>
>                     for (V1 leftValue : leftValuesIterable) {
>                         for (V2 rightValue : rightValuesIterable) {
>                             c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
>                         }
>                     }
>                 }
>             }))
>             .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
>                     KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(),
>                             ((KvCoder) rightCollection.getCoder()).getValueCoder())));
> }
>
>
> On Sat, Nov 4, 2017 at 3:43 PM, Ben Chambers <[email protected]> wrote:
>
>> Can you share the program using global windows and make sure the
>> exception is the same? Basically, this kind of problem has to do with
>> whether you have applied a grouping operation already. The triggering (and
>> sometimes windowing) before a grouping operation is different than after.
>> So, if you are having problems applying a groupbykey somewhere and think
>> the windowing and triggering should be the same, you need to look before
>> that to see if one of the collections has been grouped but the others
>> haven't.
>>
>> On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski <[email protected]> wrote:
>>
>>> Hi Ben,
>>> thank you for your answer. I think I forgot to mention that the join
>>> class already implements CoGroupByKey and comes from the SDK extensions. I
>>> have now modified it slightly to do a 3-way join (see below). It works for 3
>>> PCollections with fixed windows and provides the output this time, even
>>> when using early and late triggers. That is great!
>>>
>>> But when I try to use it with global windows for all three collections
>>> I get the same exception.
>>>
>>> Is it not possible to do a 3-way join in the global window? The reason why
>>> I want to use the global window is that I want to have all the historic
>>> records available in these collections. I am not sure how that could be
>>> achieved using fixed windows.
>>>
>>> /Artur
>>>
>>> public static <K, V1, V2, V3> PCollection<KV<K,KV<KV<V1,V2>,V3>>> 
>>> innerJoin3Way(
>>>         final PCollection<KV<K, V1>> leftCollection,
>>>         final PCollection<KV<K, V2>> rightCollection
>>>         ,final PCollection<KV<K, V3>> thirdCollection)
>>> {
>>>     final TupleTag<V1> v1Tuple = new TupleTag<>();
>>>     final TupleTag<V2> v2Tuple = new TupleTag<>();
>>>     final TupleTag<V3> v3Tuple = new TupleTag<>();
>>>
>>>     PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>>             KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>>                     .and(v2Tuple, rightCollection)
>>>                     .and(v3Tuple,thirdCollection)
>>>                     .apply(CoGroupByKey.<K>create());
>>>
>>>
>>>     return coGbkResultCollection.apply(ParDo.of(
>>>             new DoFn<KV<K, CoGbkResult>,KV<K,KV<KV<V1,V2>,V3>> >() {
>>>
>>>                 @ProcessElement
>>>                 public void processElement(ProcessContext c)
>>>
>>>
>>> On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <[email protected]>
>>> wrote:
>>>
>>>> It looks like this is a problematic interaction between the 2-layer
>>>> join and the (unfortunately complicated) continuation triggering.
>>>> Specifically, the triggering of Join(A, B) has the continuation trigger, so
>>>> it isn't possible to join that with C.
>>>>
>>>> Instead of trying to do Join(Join(A, B), C), consider using a
>>>> CoGroupByKey. This will allow you to join all three input collections at
>>>> the same time, which should have two benefits. First, it will work since
>>>> you won't be trying to merge a continuation trigger with the original
>>>> trigger. Second, it should be more efficient, because you are performing a
>>>> single, three-way join instead of two, two-way joins.
>>>>
>>>> See
>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
>>>> for more information.
>>>>
>>>> -- Ben
>>>>
>>>> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Kenneth and Aleksandr, and thank you for your prompt answers.
>>>>>
>>>>> So there are two scenarios that I've been trying out, but the operation is
>>>>> always the same: join setA+setB => setAB, and then join setAB+setC => setABC.
>>>>> In scenario 1 I define three PCollections with global windows and
>>>>> exactly the same triggers. Now I am able to join A+B, but as soon as I try
>>>>> to join AB+C I get the exception.
>>>>> Here is the code snippet where the window and triggers are the same
>>>>> for all three PCollections using global windows:
>>>>>
>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>
>>>>>  Trigger trigger1 =
>>>>>          AfterProcessingTime
>>>>>                  .pastFirstElementInPane()
>>>>>                  .plusDelayOf(Duration.standardSeconds(10));
>>>>>
>>>>>  /**PARSE CUSTOMER*/
>>>>>  PCollection<Customer3> customerInput = pipeline
>>>>>          .apply((KafkaIO.<String, String>read()
>>>>>                  .withTopics(ImmutableList.of(customerTopic))
>>>>>                  .updateConsumerProperties(consumerProps)
>>>>>                  .withBootstrapServers(bootstrapServers)
>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>>                  .withoutMetadata())
>>>>>          .apply(Values.<String>create())
>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>>>          .apply(Window.<Customer3>into(new GlobalWindows())
>>>>>                  .triggering(Repeatedly.forever(trigger1))
>>>>>                  .accumulatingFiredPanes());
>>>>>
>>>>>  /**PARSE POLICY*/
>>>>>  PCollection<Policy2> policyInput = pipeline
>>>>>          .apply((KafkaIO.<String, String>read()
>>>>>                  .withTopics(ImmutableList.of(policyTopic))
>>>>>                  .updateConsumerProperties(consumerProps)
>>>>>                  .withBootstrapServers(bootstrapServers)
>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>>                  // .withWatermarkFn(new AddWatermarkFn())
>>>>>                  //.withTimestampFn2(new AddCustomerTimestampFn())
>>>>>                  .withoutMetadata())
>>>>>          .apply(Values.<String>create())
>>>>>          //.apply(ParseJsons.of(Customer.class));
>>>>>          .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>>>          .apply(Window.<Policy2>into(new GlobalWindows())
>>>>>                  .triggering(Repeatedly.forever(trigger1))
>>>>>                  .accumulatingFiredPanes());
>>>>>
>>>>>  /**PARSE CLAIM**/
>>>>>  PCollection<Claim2> claimInput = pipeline
>>>>>          .apply((KafkaIO.<String, String>read()
>>>>>                  .withTopics(ImmutableList.of(claimTopic))
>>>>>                  .updateConsumerProperties(consumerProps)
>>>>>                  .withBootstrapServers(bootstrapServers)
>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>>                  .withoutMetadata())
>>>>>          .apply(Values.<String>create())
>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>          .apply(Window.<Claim2>into(new GlobalWindows())
>>>>>                  .triggering(trigger1)
>>>>>                  .accumulatingFiredPanes());
>>>>>
>>>>>  /**CUSTOMER  ********/
>>>>>  PCollection<KV<Integer, String>> all_customers = customerInput
>>>>>          .apply(new ExtractAndMapCustomerKey());
>>>>>  /***POLICY********/
>>>>>  PCollection<KV<Integer, String>> all_policies = policyInput
>>>>>          .apply(new ExtractAndMapPolicyKey());
>>>>>  /***CLAIM*******/
>>>>>  PCollection<KV<Integer, String>> all_claims = claimInput
>>>>>          .apply(new ExtractAndMapClaimKey());
>>>>>
>>>>>  /**JOIN**************/
>>>>>  /**This join works if I comment out the subsequent join**/
>>>>>  PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>>          Join.innerJoin(all_customers, all_policies);
>>>>>
>>>>>  /**this join will cause IllegalStateException **/
>>>>>  PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>>>          Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>
>>>>>
>>>>>
>>>>> The second scenario is using fixed windows. The logic is the same as
>>>>> above. Even in this case triggering is equal for all three collections,
>>>>> like this:
>>>>>
>>>>> .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>         .triggering(AfterWatermark.pastEndOfWindow())
>>>>>         .accumulatingFiredPanes()
>>>>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>
>>>>> /**JOIN**************/
>>>>> /**NO ERROR this time **/
>>>>>
>>>>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>>         Join.innerJoin(all_customers, all_policies);
>>>>> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>
>>>>>
>>>>> This time I get no errors, but I am not able to print the results in
>>>>> the console.
>>>>>
>>>>> So I ran another experiment and again defined equal triggers for all
>>>>> three collections. I can print the output to the console for the first two
>>>>> PCollections, but the second join again fails with IllegalStateException.
>>>>> Triggering definition for all three collections:
>>>>>
>>>>> /**PARSE CLAIM**/
>>>>>
>>>>>  PCollection<Claim2> claimInput = pipeline
>>>>>          .apply((KafkaIO.<String, String>read()
>>>>>                  .withTopics(ImmutableList.of(claimTopic))
>>>>>                  .updateConsumerProperties(consumerProps)
>>>>>                  .withBootstrapServers(bootstrapServers)
>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>                  .withValueDeserializer(StringDeserializer.class))
>>>>>                  .withoutMetadata())
>>>>>          .apply(Values.<String>create())
>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>          .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>>                  .triggering(AfterWatermark.pastEndOfWindow()
>>>>>                          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>                                  // early update frequency
>>>>>                                  .alignedTo(Duration.standardSeconds(10)))
>>>>>                          .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>                                  .alignedTo(Duration.standardSeconds(20))))
>>>>>                  .withAllowedLateness(Duration.standardMinutes(5))
>>>>>                  .accumulatingFiredPanes());
>>>>>
>>>>>
>>>>>
>>>>> In this case I've added early and late firings, which emit results
>>>>> from the pane, but the second join again throws an exception.
>>>>>
>>>>> I know it's a lot of information to take in, but basically if you have
>>>>> an example where you join three PCollections in global and in fixed
>>>>> windows with appropriate triggering, I'd be eternally grateful :)
>>>>> Or if you could explain how to do it, of course. Thanks in advance.
>>>>>
>>>>> Best Regards
>>>>> Artur
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Artur,
>>>>>>
>>>>>> When you join the PCollections, they will be flattened and go through
>>>>>> a GroupByKey together. Since the trigger governs when the GroupByKey can
>>>>>> emit output, the triggers have to be equal or the GroupByKey doesn't have
>>>>>> a clear guide as to when it should output. If you can make the triggering
>>>>>> on all the input collections equal, that will resolve this issue. If you
>>>>>> still need different triggering elsewhere, that is fine. You just need to
>>>>>> make them the same going into the join.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> You can try putting the PCollection, after flattening, into the same
>>>>>>> global window with the same triggers as it had before flattening.
>>>>>>>
>>>>>>> Best regards
>>>>>>> Aleksandr Gortujev
>>>>>>>
>>>>>>>
>>>>>>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>> I am in the second week of our PoC with Beam and I am really amazed by
>>>>>>> the capabilities of the framework and how well engineered it is.
>>>>>>>
>>>>>>> Amazed does not mean experienced, so please bear with me.
>>>>>>>
>>>>>>> What we are trying to achieve is to join several streams using windowing
>>>>>>> and triggers. And that is where I fear we have hit the limits of what
>>>>>>> can be done.
>>>>>>>
>>>>>>> In case A we run in global windows and we are able to combine two
>>>>>>> unbounded PCollections, but when I try to combine the result with a third
>>>>>>> collection I get the exception below. I tried many different trigger
>>>>>>> combinations, but can't make it work.
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()), AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))
>>>>>>>
>>>>>>> In case B I use fixed windows. Again, I can successfully join two
>>>>>>> collections and print the output in the console. When I add the third it
>>>>>>> runs without errors, but I am not able to materialize results in the
>>>>>>> console. I am, however, able to print the results of a merge using
>>>>>>> Flatten, so the error above is no longer an issue.
>>>>>>>
>>>>>>> Does anyone have experience with joining three or more unbounded
>>>>>>> PCollections? What would be a successful windowing and triggering strategy
>>>>>>> for global or fixed windows respectively?
>>>>>>>
>>>>>>> Below are code snippets from the fixed windows case. Windows are defined in
>>>>>>> the same manner for all three collections: customer, claim and policy. The
>>>>>>> Join class I use comes from
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>>>>
>>>>>>>
>>>>>>> I would be really grateful if any of you would like to share your
>>>>>>> knowledge.
>>>>>>>
>>>>>>> Best regards
>>>>>>> Artur
>>>>>>>
>>>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>>>         .apply((KafkaIO.<String, String>read()
>>>>>>>                 .withTopics(ImmutableList.of(claimTopic))
>>>>>>>                 .updateConsumerProperties(consumerProps)
>>>>>>>                 .withBootstrapServers(bootstrapServers)
>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>>                 .withoutMetadata())
>>>>>>>         .apply(Values.<String>create())
>>>>>>>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>>         .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>>>                 .triggering(AfterWatermark.pastEndOfWindow())
>>>>>>>                 .accumulatingFiredPanes()
>>>>>>>                 .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>>>
>>>>>>> /**JOIN**************/
>>>>>>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>>>>         Join.innerJoin(all_customers, all_policies);
>>>>>>> PCollectionList<KV<Integer, String>> collections =
>>>>>>>         PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>>>>> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>>>>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>>> // PCollectionList<KV<Integer, String>> collections =
>>>>>>> //         PCollectionList.of(all_customers).and(all_policies);
>>>>>>>
>>>>>>> PCollection<KV<Integer, String>> merged =
>>>>>>>         collections.apply(Flatten.<KV<Integer, String>>pCollections());
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>
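
For reference, the cross product that a three-way inner join emits per key can be sketched in plain Java without Beam (the innerJoin3Way quoted above still has an empty loop over the third collection). The class name, the Row type, and the map-based inputs are hypothetical stand-ins for the per-key iterables from CoGbkResult. They are not part of the join library:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of a three-way inner join; hypothetical names, no Beam involved. */
final class ThreeWayInnerJoinSketch {

    /** One joined row: a key plus one value from each of the three inputs. */
    static final class Row<K, A, B, C> {
        final K key;
        final A left;
        final B right;
        final C third;

        Row(K key, A left, B right, C third) {
            this.key = key;
            this.left = left;
            this.right = right;
            this.third = third;
        }
    }

    /**
     * Emits one row per (left, right, third) combination for every key.
     * A key that is missing from any input produces no rows (inner join).
     */
    static <K, A, B, C> List<Row<K, A, B, C>> join(
            Map<K, List<A>> left, Map<K, List<B>> right, Map<K, List<C>> third) {
        List<Row<K, A, B, C>> out = new ArrayList<>();
        for (Map.Entry<K, List<A>> entry : left.entrySet()) {
            K key = entry.getKey();
            List<B> rights = right.getOrDefault(key, Collections.<B>emptyList());
            List<C> thirds = third.getOrDefault(key, Collections.<C>emptyList());
            for (A a : entry.getValue()) {
                for (B b : rights) {
                    for (C c : thirds) {
                        out.add(new Row<>(key, a, b, c));
                    }
                }
            }
        }
        return out;
    }
}
```

In the DoFn, the same three nested loops over getAll(v1Tuple), getAll(v2Tuple) and getAll(v3Tuple) would produce the KV<K, KV<KV<V1, V2>, V3>> output type from the earlier version of the method, provided the coder is adjusted to match.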
