Sure, here is the URL to my repo:
https://github.com/afuyo/beamStuff/blob/master/src/main/java/com/tryg/beam/kafka/poc/impl/CustomerStreamPipelineGlobal.java


It is fairly simple, and I do no grouping beforehand: I just read from Kafka and
then join. I followed your advice, and the 3-way join works for the pipeline
with fixed windows. Both classes use the same join class. The logic is the
same, and the A+B join (CoGroupByKey) always works. That's what makes me
think it could be due to triggers. But of course I am not sure :)

/Artur

Pipeline pipeline = Pipeline.create(options);

Trigger trigger1 =
        AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10));

/** PARSE CUSTOMER */
PCollection<Customer3> customerInput = pipeline
        .apply(KafkaIO.<String, String>read()
                .withTopics(ImmutableList.of(customerTopic))
                .updateConsumerProperties(consumerProps)
                .withBootstrapServers(bootstrapServers)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        .apply(Values.<String>create())
        .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
        .apply(Window.<Customer3>into(new GlobalWindows())
                .triggering(Repeatedly.forever(trigger1))
                .accumulatingFiredPanes());

/** PARSE POLICY */
PCollection<Policy2> policyInput = pipeline
        .apply(KafkaIO.<String, String>read()
                .withTopics(ImmutableList.of(policyTopic))
                .updateConsumerProperties(consumerProps)
                .withBootstrapServers(bootstrapServers)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // .withWatermarkFn(new AddWatermarkFn())
                // .withTimestampFn2(new AddCustomerTimestampFn())
                .withoutMetadata())
        .apply(Values.<String>create())
        // .apply(ParseJsons.of(Customer.class));
        .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
        .apply(Window.<Policy2>into(new GlobalWindows())
                .triggering(Repeatedly.forever(trigger1))
                .accumulatingFiredPanes());

/** PARSE CLAIM */
PCollection<Claim2> claimInput = pipeline
        .apply(KafkaIO.<String, String>read()
                .withTopics(ImmutableList.of(claimTopic))
                .updateConsumerProperties(consumerProps)
                .withBootstrapServers(bootstrapServers)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        .apply(Values.<String>create())
        .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
        // note: unlike customer and policy above, this trigger is NOT wrapped
        // in Repeatedly.forever(...), so the three windowing strategies differ
        .apply(Window.<Claim2>into(new GlobalWindows())
                .triggering(trigger1)
                .accumulatingFiredPanes());

/** CUSTOMER */
PCollection<KV<Integer, String>> all_customers = customerInput
        .apply(new ExtractAndMapCustomerKey());
/** POLICY */
PCollection<KV<Integer, String>> all_policies = policyInput
        .apply(new ExtractAndMapPolicyKey());
/** CLAIM */
PCollection<KV<Integer, String>> all_claims = claimInput
        .apply(new ExtractAndMapClaimKey());

/** JOIN */
/** This join works if I comment out the subsequent join. */
// PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
//         Join.innerJoin(all_customers, all_policies);

/** This causes an exception. */
PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
        Join.innerJoin3Way(all_customers, all_policies, all_claims);

/** This join will cause an IllegalStateException. */
// PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
//         Join.innerJoin(joinedCustomersAndPolicies, all_claims);

/** This will also cause an exception when used with three collections. */
// PCollectionList<KV<Integer, String>> collections =
//         PCollectionList.of(all_customers).and(all_policies).and(all_claims);
// PCollection<KV<Integer, String>> merged =
//         collections.apply(Flatten.<KV<Integer, String>>pCollections());
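
For what it's worth, one way to rule the triggers out would be to factor the windowing into a single helper and apply it to all three streams, so the strategies are guaranteed to be identical going into the join. A rough sketch (the helper name is hypothetical):

// sketch: one shared windowing strategy for every input of the join, so the
// CoGroupByKey sees identical triggers; note that in the pipeline above the
// claim stream is missing the Repeatedly.forever(...) wrapper that the
// customer and policy streams have
private static <T> Window<T> globallyTriggered(Trigger trigger) {
    return Window.<T>into(new GlobalWindows())
            .triggering(Repeatedly.forever(trigger))
            .accumulatingFiredPanes();
}

// used in place of each hand-written window block, e.g. for the customer stream:
// .apply(CustomerStreamPipelineGlobal.<Customer3>globallyTriggered(trigger1))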

And the 3-way join:

public static <K, V1, V2, V3> PCollection<KV<K, KV<V1, V2>>> innerJoin3Way(
        final PCollection<KV<K, V1>> leftCollection,
        final PCollection<KV<K, V2>> rightCollection,
        final PCollection<KV<K, V3>> thirdCollection) {

    final TupleTag<V1> v1Tuple = new TupleTag<>();
    final TupleTag<V2> v2Tuple = new TupleTag<>();
    final TupleTag<V3> v3Tuple = new TupleTag<>();

    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
            KeyedPCollectionTuple.of(v1Tuple, leftCollection)
                    .and(v2Tuple, rightCollection)
                    .and(v3Tuple, thirdCollection)
                    .apply(CoGroupByKey.<K>create());

    System.out.println(coGbkResultCollection);

    return coGbkResultCollection.apply(ParDo.of(
            new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {

                @ProcessElement
                public void processElement(ProcessContext c) {
                    KV<K, CoGbkResult> e = c.element();

                    Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
                    Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
                    Iterable<V3> thirdValuesIterable = e.getValue().getAll(v3Tuple);

                    // note: the third collection's values are iterated here but
                    // never emitted, so V3 does not appear in the output
                    for (V3 thirdValue : thirdValuesIterable) {
                    }

                    for (V1 leftValue : leftValuesIterable) {
                        for (V2 rightValue : rightValuesIterable) {
                            c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
                        }
                    }
                }
            }))
            .setCoder(KvCoder.of(
                    ((KvCoder) leftCollection.getCoder()).getKeyCoder(),
                    KvCoder.of(
                            ((KvCoder) leftCollection.getCoder()).getValueCoder(),
                            ((KvCoder) rightCollection.getCoder()).getValueCoder())));
}
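
For reference, here is a sketch of a variant that also emits the third value, matching the KV<K, KV<KV<V1, V2>, V3>> signature quoted further down in the thread. The method name and the coder wiring are my assumptions, not tested code:

public static <K, V1, V2, V3> PCollection<KV<K, KV<KV<V1, V2>, V3>>> innerJoin3WayFull(
        final PCollection<KV<K, V1>> leftCollection,
        final PCollection<KV<K, V2>> rightCollection,
        final PCollection<KV<K, V3>> thirdCollection) {

    final TupleTag<V1> v1Tuple = new TupleTag<>();
    final TupleTag<V2> v2Tuple = new TupleTag<>();
    final TupleTag<V3> v3Tuple = new TupleTag<>();

    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
            KeyedPCollectionTuple.of(v1Tuple, leftCollection)
                    .and(v2Tuple, rightCollection)
                    .and(v3Tuple, thirdCollection)
                    .apply(CoGroupByKey.<K>create());

    return coGbkResultCollection.apply(ParDo.of(
            new DoFn<KV<K, CoGbkResult>, KV<K, KV<KV<V1, V2>, V3>>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    KV<K, CoGbkResult> e = c.element();
                    // emit the full cross product of the three value iterables per key
                    for (V1 leftValue : e.getValue().getAll(v1Tuple)) {
                        for (V2 rightValue : e.getValue().getAll(v2Tuple)) {
                            for (V3 thirdValue : e.getValue().getAll(v3Tuple)) {
                                c.output(KV.of(e.getKey(),
                                        KV.of(KV.of(leftValue, rightValue), thirdValue)));
                            }
                        }
                    }
                }
            }))
            // assumption: the output coder can be rebuilt from the input KvCoders,
            // mirroring what the two-way join does
            .setCoder(KvCoder.of(
                    ((KvCoder) leftCollection.getCoder()).getKeyCoder(),
                    KvCoder.of(
                            KvCoder.of(
                                    ((KvCoder) leftCollection.getCoder()).getValueCoder(),
                                    ((KvCoder) rightCollection.getCoder()).getValueCoder()),
                            ((KvCoder) thirdCollection.getCoder()).getValueCoder())));
}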


On Sat, Nov 4, 2017 at 3:43 PM, Ben Chambers <[email protected]> wrote:

> Can you share the program using global windows and make sure the exception
> is the same? Basically, this kind of problem has to do with whether you
> have applied a grouping operation already. The triggering (and sometimes
> windowing) before a grouping operation is different from after. So, if you
> are having problems applying a GroupByKey somewhere and think the windowing
> and triggering should be the same, you need to look before that point to see
> if one of the collections has been grouped while the others haven't.
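>
> For example, something like this (a rough sketch, not tested) would reset the
> windowing strategy on the already-grouped collection so it matches the
> ungrouped side before the next join:
>
> // sketch: Window.configure() keeps the existing WindowFn but replaces the
> // continuation trigger that the first join left behind
> joinedCustomersAndPolicies
>         .apply(Window.<KV<Integer, KV<String, String>>>configure()
>                 .triggering(Repeatedly.forever(
>                         AfterProcessingTime.pastFirstElementInPane()
>                                 .plusDelayOf(Duration.standardSeconds(10))))
>                 .accumulatingFiredPanes());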
>
> On Sat, Nov 4, 2017, 12:25 AM Artur Mrozowski <[email protected]> wrote:
>
>> Hi Ben,
>> thank you for your answer. I think I forgot to mention that the join
>> class already implements CoGroupByKey and comes from the SDK extensions. I
>> have now modified it slightly to do a 3-way join (see below). It works for 3
>> PCollections with fixed windows and provides the output this time, even
>> when using early and late triggers. That is great!
>>
>> But when I try to use it with global windows for all three collections, I
>> get the same exception.
>>
>> Is it not possible to make a 3-way join in a global window? The reason I
>> want to use the global window is that I want to have all the historic records
>> available in these collections. I am not sure how that could be achieved using
>> fixed windows.
>>
>> /Artur
>>
>> public static <K, V1, V2, V3> PCollection<KV<K, KV<KV<V1, V2>, V3>>> innerJoin3Way(
>>         final PCollection<KV<K, V1>> leftCollection,
>>         final PCollection<KV<K, V2>> rightCollection,
>>         final PCollection<KV<K, V3>> thirdCollection)
>> {
>>     final TupleTag<V1> v1Tuple = new TupleTag<>();
>>     final TupleTag<V2> v2Tuple = new TupleTag<>();
>>     final TupleTag<V3> v3Tuple = new TupleTag<>();
>>
>>     PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
>>             KeyedPCollectionTuple.of(v1Tuple, leftCollection)
>>                     .and(v2Tuple, rightCollection)
>>                     .and(v3Tuple,thirdCollection)
>>                     .apply(CoGroupByKey.<K>create());
>>
>>
>>     return coGbkResultCollection.apply(ParDo.of(
>>             new DoFn<KV<K, CoGbkResult>,KV<K,KV<KV<V1,V2>,V3>> >() {
>>
>>                 @ProcessElement
>>                 public void processElement(ProcessContext c)
>>
>>
>> On Fri, Nov 3, 2017 at 9:47 PM, Ben Chambers <[email protected]>
>> wrote:
>>
>>> It looks like this is a problematic interaction between the 2-layer join
>>> and the (unfortunately complicated) continuation triggering. Specifically,
>>> the triggering of Join(A, B) has the continuation trigger, so it isn't
>>> possible to join that with C.
>>>
>>> Instead of trying to do Join(Join(A, B), C), consider using a
>>> CoGroupByKey. This will allow you to join all three input collections at
>>> the same time, which should have two benefits. First, it will work since
>>> you won't be trying to merge a continuation trigger with the original
>>> trigger. Second, it should be more efficient, because you are performing a
>>> single, three-way join instead of two, two-way joins.
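>>>
>>> A minimal skeleton of the three-way CoGroupByKey (tag and variable names
>>> here are placeholders):
>>>
>>> final TupleTag<String> customerTag = new TupleTag<>();
>>> final TupleTag<String> policyTag = new TupleTag<>();
>>> final TupleTag<String> claimTag = new TupleTag<>();
>>>
>>> PCollection<KV<Integer, CoGbkResult>> grouped =
>>>         KeyedPCollectionTuple.of(customerTag, all_customers)
>>>                 .and(policyTag, all_policies)
>>>                 .and(claimTag, all_claims)
>>>                 .apply(CoGroupByKey.<Integer>create());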
>>>
>>> See
>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/transforms/join/CoGroupByKey.html
>>> for more information.
>>>
>>> -- Ben
>>>
>>> On Fri, Nov 3, 2017 at 2:32 PM Artur Mrozowski <[email protected]> wrote:
>>>
>>>> Hi Kenneth and Aleksandr and thank you for your prompt answer.
>>>>
>>>> So there are two scenarios that I've been trying out, but the operation is
>>>> always the same: join setA+setB => setAB, and then join setAB+setC => setABC.
>>>> In scenario 1 I define three PCollections with global windows and
>>>> exactly the same triggers. Now I am able to join A+B, but as soon as I try
>>>> to join AB+C I get the exception.
>>>> Here is the code snippet, where the window and triggers are the same
>>>> for all three PCollections using global windows:
>>>>
>>>> Pipeline pipeline = Pipeline.create(options);
>>>>
>>>> Trigger trigger1 =
>>>>         AfterProcessingTime
>>>>                 .pastFirstElementInPane()
>>>>                 .plusDelayOf(Duration.standardSeconds(10));
>>>>
>>>> /** PARSE CUSTOMER */
>>>> PCollection<Customer3> customerInput = pipeline
>>>>         .apply(KafkaIO.<String, String>read()
>>>>                 .withTopics(ImmutableList.of(customerTopic))
>>>>                 .updateConsumerProperties(consumerProps)
>>>>                 .withBootstrapServers(bootstrapServers)
>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>                 .withValueDeserializer(StringDeserializer.class)
>>>>                 .withoutMetadata())
>>>>         .apply(Values.<String>create())
>>>>         .apply("ParseJsonEventFn2", ParDo.of(new ParseCustomer()))
>>>>         .apply(Window.<Customer3>into(new GlobalWindows())
>>>>                 .triggering(Repeatedly.forever(trigger1))
>>>>                 .accumulatingFiredPanes());
>>>>
>>>> /** PARSE POLICY */
>>>> PCollection<Policy2> policyInput = pipeline
>>>>         .apply(KafkaIO.<String, String>read()
>>>>                 .withTopics(ImmutableList.of(policyTopic))
>>>>                 .updateConsumerProperties(consumerProps)
>>>>                 .withBootstrapServers(bootstrapServers)
>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>                 .withValueDeserializer(StringDeserializer.class)
>>>>                 // .withWatermarkFn(new AddWatermarkFn())
>>>>                 // .withTimestampFn2(new AddCustomerTimestampFn())
>>>>                 .withoutMetadata())
>>>>         .apply(Values.<String>create())
>>>>         // .apply(ParseJsons.of(Customer.class));
>>>>         .apply("ParsePolicy", ParDo.of(new ParsePolicy()))
>>>>         .apply(Window.<Policy2>into(new GlobalWindows())
>>>>                 .triggering(Repeatedly.forever(trigger1))
>>>>                 .accumulatingFiredPanes());
>>>>
>>>> /** PARSE CLAIM */
>>>> PCollection<Claim2> claimInput = pipeline
>>>>         .apply(KafkaIO.<String, String>read()
>>>>                 .withTopics(ImmutableList.of(claimTopic))
>>>>                 .updateConsumerProperties(consumerProps)
>>>>                 .withBootstrapServers(bootstrapServers)
>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>                 .withValueDeserializer(StringDeserializer.class)
>>>>                 .withoutMetadata())
>>>>         .apply(Values.<String>create())
>>>>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>         .apply(Window.<Claim2>into(new GlobalWindows())
>>>>                 .triggering(trigger1)
>>>>                 .accumulatingFiredPanes());
>>>>
>>>> /** CUSTOMER */
>>>> PCollection<KV<Integer, String>> all_customers = customerInput
>>>>         .apply(new ExtractAndMapCustomerKey());
>>>> /** POLICY */
>>>> PCollection<KV<Integer, String>> all_policies = policyInput
>>>>         .apply(new ExtractAndMapPolicyKey());
>>>> /** CLAIM */
>>>> PCollection<KV<Integer, String>> all_claims = claimInput
>>>>         .apply(new ExtractAndMapClaimKey());
>>>>
>>>> /** JOIN */
>>>> /** This join works if I comment out the subsequent join. */
>>>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>         Join.innerJoin(all_customers, all_policies);
>>>>
>>>> /** This join will cause an IllegalStateException. */
>>>> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>
>>>>
>>>>
>>>> The second scenario uses fixed windows. The logic is the same as
>>>> above. Even in this case, triggering is equal for all three collections,
>>>> like this:
>>>>
>>>> .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>         .triggering(AfterWatermark.pastEndOfWindow())
>>>>         .accumulatingFiredPanes()
>>>>         .withAllowedLateness(Duration.standardSeconds(1)));
>>>>
>>>> /** JOIN */
>>>> /** NO ERROR this time */
>>>>
>>>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>         Join.innerJoin(all_customers, all_policies);
>>>> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>
>>>>
>>>> This time I get no errors, but I am not able to print the results to the
>>>> console.
>>>>
>>>> So I ran another experiment and again defined an equal trigger for all
>>>> three collections. I can print the output to the console for the first two
>>>> PCollections, but the second join again fails with an IllegalStateException.
>>>> The triggering definition for all three collections:
>>>>
>>>> /** PARSE CLAIM */
>>>>
>>>> PCollection<Claim2> claimInput = pipeline
>>>>         .apply(KafkaIO.<String, String>read()
>>>>                 .withTopics(ImmutableList.of(claimTopic))
>>>>                 .updateConsumerProperties(consumerProps)
>>>>                 .withBootstrapServers(bootstrapServers)
>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>                 .withValueDeserializer(StringDeserializer.class)
>>>>                 .withoutMetadata())
>>>>         .apply(Values.<String>create())
>>>>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>         .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(60)))
>>>>                 .triggering(AfterWatermark.pastEndOfWindow()
>>>>                         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>                                 // early update frequency
>>>>                                 .alignedTo(Duration.standardSeconds(10)))
>>>>                         .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>>>>                                 .alignedTo(Duration.standardSeconds(20))))
>>>>                 .withAllowedLateness(Duration.standardMinutes(5))
>>>>                 .accumulatingFiredPanes());
>>>>
>>>>
>>>>
>>>> In this case I've added early and late firings, which emit results
>>>> from the pane, but the second join again throws the exception.
>>>>
>>>> I know it's a lot of information to take in, but basically, if you have
>>>> an example where you join three PCollections in global and in fixed windows
>>>> with appropriate triggering, I'd be eternally grateful :)
>>>> Or if you could explain how to do it, of course. Thanks in advance.
>>>>
>>>> Best Regards
>>>> Artur
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Nov 3, 2017 at 5:57 PM, Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> Hi Artur,
>>>>>
>>>>> When you join the PCollections, they will be flattened and go through
>>>>> a GroupByKey together. Since the trigger governs when the GroupByKey can
>>>>> emit output, the triggers have to be equal, or the GroupByKey doesn't have
>>>>> a clear guide as to when it should output. If you can make the triggering
>>>>> on all the input collections equal, that will resolve this issue. If you
>>>>> still need different triggering elsewhere, that is fine. You just need to
>>>>> make them the same going into the join.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Nov 3, 2017 at 9:48 AM, Aleksandr <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>> You can try putting the PCollection after the flatten into the same
>>>>>> global window, with the same triggers it had before the flattening.
>>>>>>
>>>>>> Best regards
>>>>>> Aleksandr Gortujev
>>>>>>
>>>>>>
>>>>>> On 3 Nov 2017 at 11:04 AM, "Artur Mrozowski" <[email protected]> wrote:
>>>>>>
>>>>>> Hi,
>>>>>> I am in the second week of our PoC with Beam, and I am really amazed by
>>>>>> the capabilities of the framework and how well engineered it is.
>>>>>>
>>>>>> Amazed does not mean experienced so please bear with me.
>>>>>>
>>>>>> What we are trying to achieve is to join several streams using windowing
>>>>>> and triggers. And that is where I fear we have hit the limits of what can
>>>>>> be done.
>>>>>>
>>>>>> In case A we run in global windows, and we are able to combine two
>>>>>> unbounded PCollections, but when I try to combine the results with a third
>>>>>> collection I get the exception below. I have tried many different trigger
>>>>>> combinations, but I can't make it work.
>>>>>>
>>>>>> Exception in thread "main" java.lang.IllegalStateException:
>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
>>>>>> AfterEach.inOrder(
>>>>>>         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds),
>>>>>>         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))
>>>>>>
>>>>>> In case B I use fixed windows. Again, I can successfully join two
>>>>>> collections and print the output to the console. When I add the third, it
>>>>>> runs without errors, but I am not able to materialize the results in the
>>>>>> console. I am, however, able to print the results of the merge using
>>>>>> Flatten, so the error above is no longer an issue.
>>>>>>
>>>>>> Has anyone experience with joining three or more unbounded
>>>>>> PCollections? What would be successful windowing, triggering strategy for
>>>>>> global or fixed window respectively?
>>>>>>
>>>>>> Below are code snippets from the fixed-windows case. Windows are defined
>>>>>> in the same manner for all three collections: customer, claim and policy.
>>>>>> The Join class I use comes from
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
>>>>>>
>>>>>>
>>>>>> I would be really grateful if any of you would like to share your
>>>>>> knowledge.
>>>>>>
>>>>>> Best Regards
>>>>>> Artur
>>>>>>
>>>>>> PCollection<Claim2> claimInput = pipeline
>>>>>>         .apply(KafkaIO.<String, String>read()
>>>>>>                 .withTopics(ImmutableList.of(claimTopic))
>>>>>>                 .updateConsumerProperties(consumerProps)
>>>>>>                 .withBootstrapServers(bootstrapServers)
>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>                 .withValueDeserializer(StringDeserializer.class)
>>>>>>                 .withoutMetadata())
>>>>>>         .apply(Values.<String>create())
>>>>>>         .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
>>>>>>         .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
>>>>>>                 .triggering(AfterWatermark.pastEndOfWindow())
>>>>>>                 .accumulatingFiredPanes()
>>>>>>                 .withAllowedLateness(Duration.standardSeconds(1)));
>>>>>>
>>>>>> /** JOIN */
>>>>>> PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
>>>>>>         Join.innerJoin(all_customers, all_policies);
>>>>>> PCollectionList<KV<Integer, String>> collections =
>>>>>>         PCollectionList.of(all_customers).and(all_policies).and(all_claims);
>>>>>> PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
>>>>>>         Join.innerJoin(joinedCustomersAndPolicies, all_claims);
>>>>>> // PCollectionList<KV<Integer, String>> collections =
>>>>>> //         PCollectionList.of(all_customers).and(all_policies);
>>>>>>
>>>>>> PCollection<KV<Integer, String>> merged =
>>>>>>         collections.apply(Flatten.<KV<Integer, String>>pCollections());
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>
