Hello,
You are using Repeatedly.forever for policyInput and for customerInput, but
for claimInput your trigger is without Repeatedly.forever, so the three
windowing strategies do not match.
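
If you wrap trigger1 in Repeatedly.forever for claimInput as well, all three
inputs to the join should carry the same trigger. A rough sketch based on your
snippet (untested):

    .apply(Window.<Claim2>into(new GlobalWindows())
            .triggering(Repeatedly.forever(trigger1))
            .accumulatingFiredPanes())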


Best regards
Aleksandr Gortujev


On Sat, Nov 4, 2017 at 8:39 PM, Artur Mrozowski <[email protected]> wrote:

> And the error message:
>
> Exception in thread "main" java.lang.IllegalStateException: Inputs to
> Flatten had incompatible triggers:
> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)),
> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds)
> at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:124)
> at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:102)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
> at org.apache.beam.sdk.values.PCollectionList.apply(PCollectionList.java:182)
> at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:124)
> at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
> at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:106)
> at com.tryg.beam.kafka.poc.utils.Join.innerJoin3Way(Join.java:170)
> at com.tryg.beam.kafka.poc.impl.CustomerStreamPipelineGlobal.main(CustomerStreamPipelineGlobal.java:220)
>
>
> On Sat, Nov 4, 2017 at 6:36 PM, Artur Mrozowski <[email protected]> wrote:
>
>> Sure, here is the URL to my repo:
>> https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java
>>
>>
>> It is fairly simple and I do no grouping before: just read from Kafka and
>> then join. I followed your advice and the 3-way join works for the pipeline
>> with fixed windows. Both classes use the same join class. The logic is the
>> same and it always works for the A+B join (CoGroupByKey). That's what makes
>> me think it could be due to triggers. But I am of course not sure :)
>>
>> /Artur
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>>   Trigger trigger1 =
>>           AfterProcessingTime
>>                   .pastFirstElementInPane()
>>                   .plusDelayOf(Duration.standardSeconds(10));
>>
>>   /**PARSE CUSTOMER*/
>>   PCollection<Customer3> customerInput = pipeline
>>           .apply(KafkaIO.<String, String>read()
>>                   .withTopics(ImmutableList.of(customerTopic))
>>                   .updateConsumerProperties(consumerProps)
>>                   .withBootstrapServers(bootstrapServers)
>>                   .withKeyDeserializer(StringDeserializer.class)
>>                   .withValueDeserializer(StringDeserializer.class)
>>                   .withoutMetadata())
>>           .apply(Values.<String>create())
>>           .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>           .apply(Window.<Customer3>into(new GlobalWindows())
>>                   .triggering(Repeatedly.forever(trigger1))
>>                   .accumulatingFiredPanes());
>>
>>   /**PARSE POLICY*/
>>   PCollection<Policy2> policyInput = pipeline
>>           .apply(KafkaIO.<String, String>read()
>>                   .withTopics(ImmutableList.of(policyTopic))
>>                   .updateConsumerProperties(consumerProps)
>>                   .withBootstrapServers(bootstrapServers)
>>                   .withKeyDeserializer(StringDeserializer.class)
>>                   .withValueDeserializer(StringDeserializer.class)
>>                   // .withWatermarkFn(new AddWatermarkFn())
>>                   // .withTimestampFn2(new AddCustomerTimestampFn())
>>                   .withoutMetadata())
>>           .apply(Values.<String>create())
>>           // .apply(ParseJsons.of(Customer.class));
>>           .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>           .apply(Window.<Policy2>into(new GlobalWindows())
>>                   .triggering(Repeatedly.forever(trigger1))
>>                   .accumulatingFiredPanes());
>>
>>   /**PARSE CLAIM**/
>>   PCollection<Claim2> claimInput = pipeline
>>           .apply(KafkaIO.<String, String>read()
>>                   .withTopics(ImmutableList.of(claimTopic))
>>                   .updateConsumerProperties(consumerProps)
>>                   .withBootstrapServers(bootstrapServers)
>>                   .withKeyDeserializer(StringDeserializer.class)
>>                   .withValueDeserializer(StringDeserializer.class)
>>                   .withoutMetadata())
>>           .apply(Values.<String>create())
>>           .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>           .apply(Window.<Claim2>into(new GlobalWindows())
>>                   .triggering(trigger1) // note: no Repeatedly.forever here
>>                   .accumulatingFiredPanes());
>>
>>   /**CUSTOMER  ********/
>>   PCollection<KV<Integer,String>> all_customers = customerInput
>>           .apply(new ExtractAndMapCustomerKey())
>>           ;
>>   /***POLICY********/
>>   PCollection<KV<Integer,String>> all_policies = policyInput
>>           .apply(new ExtractAndMapPolicyKey())
>>           ;
>>   /***CLAIM*******/
>>   PCollection<KV<Integer,String>> all_claims = claimInput
>>           .apply(new ExtractAndMapClaimKey())
>>           ;
>>   /**JOIN**************/
>>   /**This join works if I comment out the subsequent join**/
>>   // PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies =
>>   //         Join.innerJoin(all_customers, all_policies);
>>
>>   /**This causes an exception**/
>>   PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies =
>>           Join.innerJoin3Way(all_customers, all_policies, all_claims);
>>
>>   /**this join will cause IllegalStateException **/
>>   // PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>   //         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>
>>   /**This will also cause an exception when used with 3 collections**/
>>   // PCollectionList<KV<Integer,String>> collections =
>>   //         PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>   // PCollection<KV<Integer,String>> merged =
>>   //         collections.apply(Flatten.<KV<Integer,String>>pCollections());
>>
>> And the 3-way join:
>>
>> public static <K, V1, V2, V3> PCollection<KV<K, KV<V1, V2>>> innerJoin3Way(
>>            final PCollection<KV<K, V1>> leftCollection,
>>            final PCollection<KV<K, V2>> rightCollection,
>>            final PCollection<KV<K, V3>> thirdCollection) {
>>
>>        final TupleTag<V1> v1Tuple = new TupleTag<>();
>>        final TupleTag<V2> v2Tuple = new TupleTag<>();
>>        final TupleTag<V3> v3Tuple = new TupleTag<>();
>>
>>        PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>                KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>                        .and(v2Tuple, rightCollection)
>>                        .and(v3Tuple, thirdCollection)
>>                        .apply(CoGroupByKey.<K>create());
>>
>>        System.out.println(coGbkResultCollection);
>>
>>        return coGbkResultCollection.apply(ParDo.of(
>>                new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
>>
>>                    @ProcessElement
>>                    public void processElement(ProcessContext c) {
>>                        KV<K, CoGbkResult> e = c.element();
>>
>>                        Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
>>                        Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
>>                        Iterable<V3> thirdValuesIterable = e.getValue().getAll(v3Tuple);
>>
>>                        // third input is iterated here but its values are not emitted
>>                        for (V3 thirdValue : thirdValuesIterable) {
>>                        }
>>
>>                        for (V1 leftValue : leftValuesIterable) {
>>                            for (V2 rightValue : rightValuesIterable) {
>>                                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
>>                            }
>>                        }
>>                    }
>>                }))
>>                .setCoder(KvCoder.of(
>>                        ((KvCoder) leftCollection.getCoder()).getKeyCoder(),
>>                        KvCoder.of(
>>                                ((KvCoder) leftCollection.getCoder()).getValueCoder(),
>>                                ((KvCoder) rightCollection.getCoder()).getValueCoder())));
>> }
>>
>>
>> On Sat, Nov 4, 2017 at 3:43 PM, Ben Chambers <[email protected]>
>> wrote:
>>
>>> Can you share the program using global windows and make sure the
>>> exception is the same? Basically, this kind of problem has to do with
>>> whether you have already applied a grouping operation. The triggering (and
>>> sometimes the windowing) before a grouping operation is different from what
>>> it is after. So, if you are having problems applying a GroupByKey somewhere
>>> and think the windowing and triggering should be the same, look upstream to
>>> see whether one of the collections has been grouped but the others haven't.
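>>>
>>> For example, if one collection has already been through a join or
>>> GroupByKey, re-applying the original windowing to it before the next
>>> grouping should make the triggers line up again. A rough sketch using the
>>> names from your code (untested):
>>>
>>> PCollection<KV<Integer, KV<String, String>>> rewindowed =
>>>         joinedCustomersAndPolicies.apply(
>>>                 Window.<KV<Integer, KV<String, String>>>into(new GlobalWindows())
>>>                         .triggering(Repeatedly.forever(trigger1))
>>>                         .accumulatingFiredPanes());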
>>>
>>> On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski <[email protected]> wrote:
>>>
>>>> Hej Ben,
>>>> thank you for your answer. I think I forgot to mention that the join class
>>>> already implements CoGroupByKey and comes from the SDK extensions. I have
>>>> now modified it slightly to do a 3-way join (see below). It works for 3
>>>> PCollections with fixed windows and produces output this time, even when
>>>> using early and late triggers. That is great!
>>>>
>>>> But when I try to use it with global windows for all three collections I
>>>> get the same exception.
>>>>
>>>> Is it not possible to make a 3-way join in a global window? The reason I
>>>> want to use the global window is that I want to have all the historic
>>>> records available in these collections. I am not sure how that could be
>>>> achieved using fixed windows.
>>>>
>>>> /Artur
>>>>
>>>> public static <K, V1, V2, V3> PCollection<KV<K, KV<KV<V1, V2>, V3>>> innerJoin3Way(
>>>>         final PCollection<KV<K, V1>> leftCollection,
>>>>         final PCollection<KV<K, V2>> rightCollection,
>>>>         final PCollection<KV<K, V3>> thirdCollection) {
>>>>
>>>>     final TupleTag<V1> v1Tuple = new TupleTag<>();
>>>>     final TupleTag<V2> v2Tuple = new TupleTag<>();
>>>>     final TupleTag<V3> v3Tuple = new TupleTag<>();
>>>>
>>>>     PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>>>             KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>>>                     .and(v2Tuple, rightCollection)
>>>>                     .and(v3Tuple, thirdCollection)
>>>>                     .apply(CoGroupByKey.<K>create());
>>>>
>>>>     return coGbkResultCollection.apply(ParDo.of(
>>>>             new DoFn<KV<K, CoGbkResult>, KV<K, KV<KV<V1, V2>, V3>>>() {
>>>>
>>>>                 @ProcessElement
>>>>                 public void processElement(ProcessContext c)
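>>>>                 // (the message is truncated here; a sketch of how the body
>>>>                 // presumably continues, mirroring the version quoted above
>>>>                 // but emitting one output per V1/V2/V3 combination)
>>>>                 {
>>>>                     KV<K, CoGbkResult> e = c.element();
>>>>                     for (V1 v1 : e.getValue().getAll(v1Tuple)) {
>>>>                         for (V2 v2 : e.getValue().getAll(v2Tuple)) {
>>>>                             for (V3 v3 : e.getValue().getAll(v3Tuple)) {
>>>>                                 c.output(KV.of(e.getKey(), KV.of(KV.of(v1, v2), v3)));
>>>>                             }
>>>>                         }
>>>>                     }
>>>>                 }
>>>>             }));
>>>> }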
>>>>
>>>>
>>>> On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <[email protected]>
>>>> wrote:
>>>>
>>>>> It looks like this is a problematic interaction between the 2-layer
>>>>> join and the (unfortunately complicated) continuation triggering.
>>>>> Specifically, the triggering of Join(A, B) has the continuation trigger, 
>>>>> so
>>>>> it isn't possible to join that with C.
>>>>>
>>>>> Instead of trying to do Join(Join(A, B), C), consider using a
>>>>> CoGroupByKey. This will allow you to join all three input collections at
>>>>> the same time, which should have two benefits. First, it will work since
>>>>> you won't be trying to merge a continuation trigger with the original
>>>>> trigger. Second, it should be more efficient, because you are performing a
>>>>> single, three-way join instead of two, two-way joins.
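>>>>>
>>>>> The shape would be roughly the following (a sketch; the tag names are
>>>>> illustrative, and all_customers/all_policies/all_claims are the keyed
>>>>> collections from your pipeline):
>>>>>
>>>>> final TupleTag<String> customerTag = new TupleTag<>();
>>>>> final TupleTag<String> policyTag = new TupleTag<>();
>>>>> final TupleTag<String> claimTag = new TupleTag<>();
>>>>>
>>>>> PCollection<KV<Integer, CoGbkResult>> joined =
>>>>>         KeyedPCollectionTuple.of(customerTag, all_customers)
>>>>>                 .and(policyTag, all_policies)
>>>>>                 .and(claimTag, all_claims)
>>>>>                 .apply(CoGroupByKey.<Integer>create());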
>>>>>
>>>>>
>>>>> See https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html for more information.
>>>>>
>>>>> -- Ben
>>>>>
>>>>> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Kenneth and Aleksandr, and thank you for your prompt answers.
>>>>>>
>>>>>> There are two scenarios that I've been trying out, but the operation is
>>>>>> always the same: join setA + setB => setAB, and then join setAB + setC => setABC.
>>>>>> In scenario 1 I define three PCollections with global windows and
>>>>>> exactly the same triggers. I am able to join A+B, but as soon as I try to
>>>>>> join AB+C I get the exception. Here is the code snippet, where the window
>>>>>> and triggers are the same for all three PCollections using global windows:
>>>>>>
>>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>>
>>>>>>  Trigger trigger1 =
>>>>>>          AfterProcessingTime
>>>>>>                  .pastFirstElementInPane()
>>>>>>                  .plusDelayOf(Duration.standardSeconds(10));
>>>>>>
>>>>>>  /**PARSE CUSTOMER*/
>>>>>>  PCollection<Customer3> customerInput = pipeline
>>>>>>          .apply(KafkaIO.<String, String>read()
>>>>>>                  .withTopics(ImmutableList.of(customerTopic))
>>>>>>                  .updateConsumerProperties(consumerProps)
>>>>>>                  .withBootstrapServers(bootstrapServers)
>>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>>                  .withValueDeserializer(StringDeserializer.class)
>>>>>>                  .withoutMetadata())
>>>>>>          .apply(Values.<String>create())
>>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>>>>          .apply(Window.<Customer3>into(new GlobalWindows())
>>>>>>                  .triggering(Repeatedly.forever(trigger1))
>>>>>>                  .accumulatingFiredPanes());
>>>>>>
>>>>>>  /**PARSE POLICY*/
>>>>>>  PCollection<Policy2> policyInput = pipeline
>>>>>>          .apply(KafkaIO.<String, String>read()
>>>>>>                  .withTopics(ImmutableList.of(policyTopic))
>>>>>>                  .updateConsumerProperties(consumerProps)
>>>>>>                  .withBootstrapServers(bootstrapServers)
>>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>>                  .withValueDeserializer(StringDeserializer.class)
>>>>>>                  // .withWatermarkFn(new AddWatermarkFn())
>>>>>>                  // .withTimestampFn2(new AddCustomerTimestampFn())
>>>>>>                  .withoutMetadata())
>>>>>>          .apply(Values.<String>create())
>>>>>>          // .apply(ParseJsons.of(Customer.class));
>>>>>>          .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>>>>          .apply(Window.<Policy2>into(new GlobalWindows())
>>>>>>                  .triggering(Repeatedly.forever(trigger1))
>>>>>>                  .accumulatingFiredPanes());
>>>>>>
>>>>>> /**PARSE CLAIM**/
>>>>>>  PCollection<Claim2> claimInput = pipeline
>>>>>>          .apply(KafkaIO.<String, String>read()
>>>>>>                  .withTopics(ImmutableList.of(claimTopic))
>>>>>>                  .updateConsumerProperties(consumerProps)
>>>>>>                  .withBootstrapServers(bootstrapServers)
>>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>>                  .withValueDeserializer(StringDeserializer.class)
>>>>>>                  .withoutMetadata())
>>>>>>          .apply(Values.<String>create())
>>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>          .apply(Window.<Claim2>into(new GlobalWindows())
>>>>>>                  .triggering(trigger1)
>>>>>>                  .accumulatingFiredPanes());
>>>>>>
>>>>>>  /**CUSTOMER  ********/
>>>>>>  PCollection<KV<Integer,String>> all_customers = customerInput
>>>>>>          .apply(new ExtractAndMapCustomerKey())
>>>>>>          ;
>>>>>>  /***POLICY********/
>>>>>>  PCollection<KV<Integer,String>> all_policies = policyInput
>>>>>>          .apply(new ExtractAndMapPolicyKey())
>>>>>>          ;
>>>>>>  /***CLAIM*******/
>>>>>>  PCollection<KV<Integer,String>> all_claims = claimInput
>>>>>>          .apply(new ExtractAndMapClaimKey())
>>>>>>          ;
>>>>>>  /**JOIN**************/
>>>>>>  /**This join works if I comment out the subsequent join**/
>>>>>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies =
>>>>>>          Join.innerJoin(all_customers, all_policies);
>>>>>>
>>>>>>  /**this join will cause IllegalStateException **/
>>>>>>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>>>>          Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>>
>>>>>>
>>>>>>
>>>>>> The second scenario uses fixed windows. The logic is the same as above,
>>>>>> and even in this case the triggering is equal for all three collections:
>>>>>>
>>>>>> .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>>         .triggering(AfterWatermark.pastEndOfWindow())
>>>>>>         .accumulatingFiredPanes()
>>>>>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>>
>>>>>> /**JOIN**************/
>>>>>> /**NO ERROR this time **/
>>>>>>
>>>>>> PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies =
>>>>>>         Join.innerJoin(all_customers, all_policies);
>>>>>> PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>>>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>>
>>>>>>
>>>>>> This time I get no errors, but I am not able to print the results to the
>>>>>> console.
>>>>>>
>>>>>> So I ran another experiment and again defined an equal trigger for all
>>>>>> three collections. I can print the output to the console for the first two
>>>>>> PCollections, but the second join again fails with IllegalStateException.
>>>>>> The triggering definition for all three collections:
>>>>>>
>>>>>> /**PARSE CLAIM**/
>>>>>>
>>>>>>  PCollection<Claim2> claimInput = pipeline
>>>>>>          .apply(KafkaIO.<String, String>read()
>>>>>>                  .withTopics(ImmutableList.of(claimTopic))
>>>>>>                  .updateConsumerProperties(consumerProps)
>>>>>>                  .withBootstrapServers(bootstrapServers)
>>>>>>                  .withKeyDeserializer(StringDeserializer.class)
>>>>>>                  .withValueDeserializer(StringDeserializer.class)
>>>>>>                  .withoutMetadata())
>>>>>>          .apply(Values.<String>create())
>>>>>>          .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>          .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>>>                  .triggering(AfterWatermark.pastEndOfWindow()
>>>>>>                          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>>                                  // early update frequency
>>>>>>                                  .alignedTo(Duration.standardSeconds(10)))
>>>>>>                          .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>>>                                  .alignedTo(Duration.standardSeconds(20))))
>>>>>>                  .withAllowedLateness(Duration.standardMinutes(5))
>>>>>>                  .accumulatingFiredPanes());
>>>>>>
>>>>>>
>>>>>>
>>>>>> In this case I've added early and late firings, which emit results from
>>>>>> the pane, but the second join again throws the exception.
>>>>>>
>>>>>> I know it's a lot of information to take in, but basically, if you have an
>>>>>> example where you join three PCollections in global and in fixed windows
>>>>>> with appropriate triggering, I'd be eternally grateful :) Or if you could
>>>>>> explain how to do it, of course. Thanks in advance.
>>>>>>
>>>>>> Best Regards
>>>>>> Artur
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Artur,
>>>>>>>
>>>>>>> When you join the PCollections, they will be flattened and go through a
>>>>>>> GroupByKey together. Since the trigger governs when the GroupByKey can
>>>>>>> emit output, the triggers have to be equal; otherwise the GroupByKey
>>>>>>> doesn't have a clear guide as to when it should output. If you can make
>>>>>>> the triggering on all the input collections equal, that will resolve this
>>>>>>> issue. If you still need different triggering elsewhere, that is fine.
>>>>>>> You just need to make them the same going into the join.
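>>>>>>>
>>>>>>> For example, applying the same windowing and triggering to each input
>>>>>>> before the join (a sketch; ElementT stands for each collection's element
>>>>>>> type, and the exact trigger is up to you, as long as it is identical on
>>>>>>> every input):
>>>>>>>
>>>>>>> Window.<ElementT>into(new GlobalWindows())
>>>>>>>         .triggering(Repeatedly.forever(
>>>>>>>                 AfterProcessingTime.pastFirstElementInPane()
>>>>>>>                         .plusDelayOf(Duration.standardSeconds(10))))
>>>>>>>         .accumulatingFiredPanes()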
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>> You can try putting the PCollection that comes out of the flatten into
>>>>>>>> the same global window, with the same triggers it had before flattening.
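>>>>>>>>
>>>>>>>> Something like this, reusing the names from your snippet (untested sketch):
>>>>>>>>
>>>>>>>> merged.apply(Window.<KV<Integer, String>>into(new GlobalWindows())
>>>>>>>>         .triggering(Repeatedly.forever(
>>>>>>>>                 AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>                         .plusDelayOf(Duration.standardSeconds(10))))
>>>>>>>>         .accumulatingFiredPanes());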
>>>>>>>>
>>>>>>>> Best regards
>>>>>>>> Aleksandr Gortujev
>>>>>>>>
>>>>>>>>
>>>>>>>> On Nov 3, 2017 at 11:04 AM, "Artur Mrozowski" <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I am in the second week of our PoC with Beam and I am really amazed by
>>>>>>>> the capabilities of the framework and how well engineered it is.
>>>>>>>>
>>>>>>>> Amazed does not mean experienced, so please bear with me.
>>>>>>>>
>>>>>>>> What we are trying to achieve is to join several streams using windowing
>>>>>>>> and triggers. And that is where I fear we have hit the limits of what
>>>>>>>> can be done.
>>>>>>>>
>>>>>>>> In case A we run in global windows and we are able to combine two
>>>>>>>> unbounded PCollections, but when I try to combine the result with a
>>>>>>>> third collection I get the exception below. I tried many different
>>>>>>>> trigger combinations, but I can't make it work.
>>>>>>>>
>>>>>>>> Exception in thread "main" java.lang.IllegalStateException: Inputs
>>>>>>>> to Flatten had incompatible triggers:
>>>>>>>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>>>>>>> AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds),
>>>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))
>>>>>>>>
>>>>>>>> In case B I use fixed windows. Again, I can successfully join two
>>>>>>>> collections and print the output to the console. When I add the third it
>>>>>>>> runs without errors, but I am not able to materialize the results in the
>>>>>>>> console. I am, however, able to print the results of the merge using
>>>>>>>> Flatten, so the error above is no longer an issue.
>>>>>>>>
>>>>>>>> Has anyone experience with joining three or more unbounded
>>>>>>>> PCollections? What would be a successful windowing and triggering
>>>>>>>> strategy for a global or fixed window, respectively?
>>>>>>>>
>>>>>>>> Below are code snippets from the fixed-windows case. Windows are defined
>>>>>>>> in the same manner for all three collections: customer, claim and
>>>>>>>> policy. The Join class I use comes from
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>>>>>
>>>>>>>>
>>>>>>>> I would be really grateful if any of you would like to share your
>>>>>>>> knowledge.
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>> Artur
>>>>>>>>
>>>>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>>>>         .apply(KafkaIO.<String, String>read()
>>>>>>>>                 .withTopics(ImmutableList.of(claimTopic))
>>>>>>>>                 .updateConsumerProperties(consumerProps)
>>>>>>>>                 .withBootstrapServers(bootstrapServers)
>>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>                 .withValueDeserializer(StringDeserializer.class)
>>>>>>>>                 .withoutMetadata())
>>>>>>>>         .apply(Values.<String>create())
>>>>>>>>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>>>         .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>>>>                 .triggering(AfterWatermark.pastEndOfWindow())
>>>>>>>>                 .accumulatingFiredPanes()
>>>>>>>>                 .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>>>>
>>>>>>>>  /**JOIN**************/
>>>>>>>>  PCollection<KV<Integer,KV<String,String>>> joinedCustomersAndPolicies =
>>>>>>>>          Join.innerJoin(all_customers, all_policies);
>>>>>>>>  PCollectionList<KV<Integer,String>> collections =
>>>>>>>>          PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>>>>>>  PCollection<KV<Integer,KV<KV<String,String>,String>>> joinedCustomersPoliciesAndClaims =
>>>>>>>>          Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>>>>  // PCollectionList<KV<Integer,String>> collections =
>>>>>>>>  //         PCollectionList.of(all_customers).and(all_policies);
>>>>>>>>
>>>>>>>> PCollection<KV<Integer,String>> merged =
>>>>>>>>         collections.apply(Flatten.<KV<Integer,String>>pCollections());
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>
>
