Hi, I am in the second week of our PoC with Beam, and I am really amazed by the capabilities of the framework and how well engineered it is.
Amazed does not mean experienced, though, so please bear with me. What we are trying to achieve is to join several unbounded streams using windowing and triggers, and that is where I fear we have hit the limits of what can be done.

In case A we use global windows. We are able to join two unbounded PCollections, but when I try to join the result with a third collection I get the exception below. I have tried many different trigger combinations but cannot make it work.

Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane()), AfterEach.inOrder(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(10 seconds))

In case B we use fixed windows. Again, I can successfully join two collections and print the output to the console. When I add the third, the pipeline runs without errors, but no results appear in the console. I am, however, able to print the result of merging all three collections with Flatten, so the error above is no longer an issue there.

Has anyone experience joining three or more unbounded PCollections? What windowing and triggering strategy would work for global windows and for fixed windows, respectively?

Below are code snippets from the fixed-window case. Windows are defined the same way for all three collections (customer, claim and policy). The Join class I use comes from https://github.com/apache/beam/blob/master/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java

I would be really grateful if any of you could share your knowledge.
Best regards,
Artur

PCollection<Claim2> claimInput = pipeline
    .apply(KafkaIO.<String, String>read()
        .withTopics(ImmutableList.of(claimTopic))
        .updateConsumerProperties(consumerProps)
        .withBootstrapServers(bootstrapServers)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata())
    .apply(Values.<String>create())
    .apply("ParseJsonEventFn2", ParDo.of(new ParseClaim()))
    // 100-second fixed windows, fired when the watermark passes the end of the window
    .apply(Window.<Claim2>into(FixedWindows.of(Duration.standardSeconds(100)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardSeconds(1)));

/** JOIN **************/
// First join: customers with policies, keyed by the Integer id
PCollection<KV<Integer, KV<String, String>>> joinedCustomersAndPolicies =
    Join.innerJoin(all_customers, all_policies);

// Flattening all three collections does produce printable output
PCollectionList<KV<Integer, String>> collections =
    PCollectionList.of(all_customers).and(all_policies).and(all_claims);

// Second join: the customer/policy result with claims (this produces no output)
PCollection<KV<Integer, KV<KV<String, String>, String>>> joinedCustomersPoliciesAndClaims =
    Join.innerJoin(joinedCustomersAndPolicies, all_claims);

//PCollectionList<KV<Integer,String>> collections = PCollectionList.of(all_customers).and(all_policies);
PCollection<KV<Integer, String>> merged =
    collections.apply(Flatten.<KV<Integer, String>>pCollections());
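To make the result we are after concrete, here is a minimal plain-Java model (no Beam dependency) of what we expect the fixed-window case to produce. Every element is assigned to the 100-second window containing its event timestamp, and a key produces output only when it appears in all three inputs within the same window. The Event record and the windowStart/group/join helpers are hypothetical names for illustration only. The model deliberately ignores triggers, watermarks and allowed lateness, so it sketches the intended output, not how Beam executes the join.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java model (no Beam) of an inner join of three keyed streams within
 * fixed event-time windows. Event and all method names are hypothetical;
 * triggers, watermarks and lateness are intentionally not modelled.
 */
public class ThreeWayWindowJoin {

    /** A keyed element with an event-time timestamp in milliseconds. */
    record Event(int key, String value, long timestampMillis) {}

    /** Start of the fixed window containing ts, for windows of sizeMillis aligned at epoch 0. */
    static long windowStart(long ts, long sizeMillis) {
        return ts - Math.floorMod(ts, sizeMillis);
    }

    /** Groups events by (window start, key), keeping values in input order. */
    static Map<Long, Map<Integer, List<String>>> group(List<Event> events, long sizeMillis) {
        Map<Long, Map<Integer, List<String>>> byWindow = new TreeMap<>();
        for (Event e : events) {
            byWindow.computeIfAbsent(windowStart(e.timestampMillis(), sizeMillis), w -> new TreeMap<>())
                    .computeIfAbsent(e.key(), k -> new ArrayList<>())
                    .add(e.value());
        }
        return byWindow;
    }

    /**
     * Inner join: emits "windowStart/key: customer|policy|claim" for every combination
     * of values whose key is present in all three inputs within the same window.
     */
    static List<String> join(List<Event> customers, List<Event> policies,
                             List<Event> claims, long sizeMillis) {
        Map<Long, Map<Integer, List<String>>> c = group(customers, sizeMillis);
        Map<Long, Map<Integer, List<String>>> p = group(policies, sizeMillis);
        Map<Long, Map<Integer, List<String>>> cl = group(claims, sizeMillis);
        List<String> out = new ArrayList<>();
        for (var windowEntry : c.entrySet()) {
            long w = windowEntry.getKey();
            // Only policies and claims from the same window can match
            Map<Integer, List<String>> pw = p.getOrDefault(w, Map.of());
            Map<Integer, List<String>> clw = cl.getOrDefault(w, Map.of());
            for (var keyEntry : windowEntry.getValue().entrySet()) {
                int k = keyEntry.getKey();
                for (String cv : keyEntry.getValue())
                    for (String pv : pw.getOrDefault(k, List.of()))
                        for (String clv : clw.getOrDefault(k, List.of()))
                            out.add(w + "/" + k + ": " + cv + "|" + pv + "|" + clv);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long size = 100_000; // 100-second windows, as in FixedWindows.of(Duration.standardSeconds(100))
        List<Event> customers = List.of(new Event(1, "Anna", 5_000), new Event(2, "Bob", 7_000));
        List<Event> policies = List.of(new Event(1, "P-1", 20_000), new Event(2, "P-2", 150_000));
        List<Event> claims = List.of(new Event(1, "C-1", 99_000));
        // prints: 0/1: Anna|P-1|C-1
        join(customers, policies, claims, size).forEach(System.out::println);
    }
}
```

With this sample data only key 1 is emitted, for the window starting at 0; key 2 is dropped because its policy falls into the next window. That is what we would expect from chaining two Join.innerJoin calls over identically windowed inputs, once each window actually fires.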
