Hi,
I am in the second week of our PoC with Beam, and I am really amazed by the
capabilities of the framework and how well engineered it is.

Amazed does not mean experienced, so please bear with me.

What we are trying to achieve is to join several streams using windowing and
triggers, and that is where I fear we have hit the limits of what can be
done.

In case A we use global windows. We are able to combine two unbounded
PCollections, but when I try to combine the result with a third collection I
get the exception below. I have tried many different trigger combinations,
but cannot make it work.

Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers:
Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()),
AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))

In case B I use fixed windows. Again, I can successfully join two
collections and print the output to the console. When I add the third, the
pipeline runs without errors, but I am not able to get any results printed to
the console. I am, however, able to print the result of merging the
collections with Flatten, so the error above is no longer an issue.
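In the fixed-window case, records from the three streams can only be joined when they share a key and their event timestamps fall into the same window. As a quick sanity check on the windowing, here is a plain-Java sketch (not Beam code; `FixedWindowSketch` and its methods are hypothetical names) of how fixed windows with zero offset bucket a timestamp, assuming timestamps in epoch milliseconds and the 100-second window size used in the snippets further down.

```java
/**
 * Hypothetical plain-Java model (not Beam code) of how fixed windows
 * with zero offset bucket event timestamps. For illustration only.
 */
public class FixedWindowSketch {

    /** A half-open window [start, end) in epoch milliseconds. */
    record FixedWindow(long start, long end) {}

    /** Returns the fixed window of the given size that contains the timestamp. */
    static FixedWindow assign(long timestampMillis, long sizeMillis) {
        // Math.floorMod keeps the remainder non-negative, so negative timestamps work too
        long start = timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
        return new FixedWindow(start, start + sizeMillis);
    }

    /** Two records can only meet in a join if they share a key AND land in the same window. */
    static boolean sameWindow(long t1, long t2, long sizeMillis) {
        return assign(t1, sizeMillis).equals(assign(t2, sizeMillis));
    }

    public static void main(String[] args) {
        long size = 100_000L; // same length as Duration.standardSeconds(100)
        System.out.println(assign(250_000L, size));
        System.out.println(sameWindow(210_000L, 299_999L, size)); // same window
        System.out.println(sameWindow(199_999L, 200_000L, size)); // adjacent windows
    }
}
```

Note that a customer, policy and claim for the same key whose timestamps straddle a window boundary will never produce a joined row, even if they arrive only milliseconds apart.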

Does anyone have experience joining three or more unbounded PCollections?
What would be a successful windowing and triggering strategy for global and
fixed windows, respectively?

Below are code snippets from the fixed-window case. Windows are defined in the
same manner for all three collections: customer, claim and policy. The Join
class I use comes from
https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java


I would be really grateful if any of you could share your knowledge.

Best regards,
Artur

PCollection<Claim2> claimInput = pipeline
        // Read String key/value records from the claim topic, dropping Kafka metadata
        .apply(KafkaIO.<String, String>read()
                .withTopics(ImmutableList.of(claimTopic))
                .updateConsumerProperties(consumerProps)
                .withBootstrapServers(bootstrapServers)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        // Keep only the JSON payload and parse it into Claim2 objects
        .apply(Values.<String>create())
        .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
        // 100-second fixed windows, fired when the watermark passes the end of each window
        .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.standardSeconds(1)));

/** JOIN **************/
// Join customers with policies on the Integer key
PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
        Join.innerJoin(all_customers, all_policies);

// Merge (not join) all three collections, used with Flatten below
PCollectionList<KV<Integer, String>> collections =
        PCollectionList.of(all_customers).and(all_policies).and(all_claims);

// Join the result with claims: the value becomes KV<KV<customer, policy>, claim>
PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
        Join.innerJoin(joinedCustomersAndPolicies, all_claims);

// PCollectionList<KV<Integer, String>> collections =
//         PCollectionList.of(all_customers).and(all_policies);

PCollection<KV<Integer, String>> merged =
        collections.apply(Flatten.<KV<Integer, String>>pCollections());
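`Join.innerJoin` groups both inputs by key (and, in Beam, per window) and emits one `KV<key, KV<left, right>>` for every pair of matching values. The sketch below is a hypothetical plain-Java model, not Beam code, of what the two chained joins above compute inside a single window: the `Map<Integer, List<String>>` inputs stand in for the contents of `all_customers`, `all_policies` and `all_claims` in that window, and the `Pair` record stands in for `KV`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical plain-Java model (not Beam code) of two chained inner joins
 * evaluated inside ONE window. Pair stands in for Beam's KV.
 */
public class ThreeWayJoinSketch {

    record Pair<A, B>(A left, B right) {}

    /** For every key present on both sides, emit every (left, right) combination. */
    static <K, V1, V2> Map<K, List<Pair<V1, V2>>> innerJoin(
            Map<K, List<V1>> left, Map<K, List<V2>> right) {
        Map<K, List<Pair<V1, V2>>> out = new LinkedHashMap<>();
        for (Map.Entry<K, List<V1>> entry : left.entrySet()) {
            List<V2> matches = right.get(entry.getKey());
            if (matches == null) {
                continue; // key absent on the right: an inner join emits nothing
            }
            List<Pair<V1, V2>> pairs = new ArrayList<>();
            for (V1 l : entry.getValue()) {
                for (V2 r : matches) {
                    pairs.add(new Pair<>(l, r));
                }
            }
            if (!pairs.isEmpty()) {
                out.put(entry.getKey(), pairs);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> customers = Map.of(1, List.of("alice"), 2, List.of("bob"));
        Map<Integer, List<String>> policies = Map.of(1, List.of("p-1"), 2, List.of("p-2"));
        Map<Integer, List<String>> claims = Map.of(1, List.of("c-1", "c-2")); // no claim for key 2

        // Mirrors KV<Integer, KV<String, String>>
        Map<Integer, List<Pair<String, String>>> customersAndPolicies =
                innerJoin(customers, policies);
        // Mirrors KV<Integer, KV<KV<String, String>, String>>
        Map<Integer, List<Pair<Pair<String, String>, String>>> all =
                innerJoin(customersAndPolicies, claims);

        // Only key 1 survives, with one row per claim
        System.out.println(all);
    }
}
```

A key that is missing from any one of the three inputs in a given window (key 2 has no claim here) produces no output at all, so this is one thing worth ruling out when the three-way join prints nothing.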
