Hey Jamie,

OK with #1. I guess #2 is just not possible.

I got it about #3. I just checked the code for the tumbling window assigner and noticed that a custom trigger only overrides the assigner's default trigger, not the way it assigns windows. It makes sense now.

Regarding #4, after doing some more tests I think it's more complex than I first thought. I'll probably create another thread explaining that specific question in more detail.

Thanks,
Matt

On Wed, Dec 14, 2016 at 2:52 PM, Jamie Grier <[email protected]> wrote:

> For #1 there are a couple of ways to do this. The easiest is probably
> stream1.connect(stream2).map(...) where the CoMapFunction maps the two
> input types to a common type that you can then process uniformly.
>
> For #3: There must always be a WindowAssigner specified. There are some
> convenient ways to do this in the API, such as timeWindow() or
> window(TumblingProcessingTimeWindows.of(...)), but you must always do
> this whether you provide your own trigger implementation or not. The way
> to use window(...) with a custom trigger is just
> stream.keyBy(...).window(...).trigger(...).apply(...) or something
> similar. Not sure if I answered your question though.
>
> For #4: If I understand you correctly, that is exactly what
> countWindow(10, 1) does already. For example, if your input data were a
> sequence of integers starting with 0, the output would be:
>
> (0)
> (0, 1)
> (0, 1, 2)
> (0, 1, 2, 3)
> (0, 1, 2, 3, 4)
> (0, 1, 2, 3, 4, 5)
> (0, 1, 2, 3, 4, 5, 6)
> (0, 1, 2, 3, 4, 5, 6, 7)
> (0, 1, 2, 3, 4, 5, 6, 7, 8)
> (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
> (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
> (2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
> (3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
> (4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
> ...
> etc.
>
> -Jamie
>
>
> On Wed, Dec 14, 2016 at 9:17 AM, Matt <[email protected]> wrote:
>
>> Hello people,
>>
>> I've written down some quick questions for which I couldn't find much or
>> anything in the documentation. I hope you can answer some of them!
>>
>> *# Multiple consumers*
>>
>> *1.* Is it possible to .union() streams of different classes? It is
>> useful to create a consumer that counts elements on different topics, for
>> example, using a key such as the class name of the element and a tumbling
>> window of, let's say, 5 minutes.
>>
>> *2.* In case #1 is not possible, I need to launch multiple consumers to
>> achieve the same effect. However, I'm getting a "Factory already
>> initialized" error if I run environment.execute() for two consumers on
>> different threads. How do you .execute() more than one consumer in the
>> same application?
>>
>> *# Custom triggers*
>>
>> *3.* If a custom .trigger() overwrites the trigger of the WindowAssigner
>> used previously, why do we have to specify a WindowAssigner (such as
>> TumblingProcessingTimeWindows) in order to be able to specify a custom
>> trigger? Shouldn't it be possible to send a trigger to .window()?
>>
>> *4.* I need a stream with a CountWindow (size 10, slide 1, let's say)
>> that may take more than 10 hours to fill for the first time, but in the
>> meantime I want to process whatever elements have already been generated.
>> I guess the way to do this is to create a custom trigger that fires on
>> every new element, with up to 10 elements at a time. The result would be
>> windows of sizes: 1 element, then 2, 3, ..., 9, 10, 10, 10, .... Is there
>> a way to achieve this with predefined triggers, or is a custom trigger
>> the only way to go here?
>>
>> Best regards,
>> Matt
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> [email protected]
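
For reference, the size-10 / slide-1 behaviour discussed under #4 can be sketched without Flink. This is a minimal plain-Java simulation and not Flink's implementation: the class name SlidingCountWindowSketch and the method simulate are hypothetical, and it assumes a single key and the integer input 0, 1, 2, ... from the example in the thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Plain-Java simulation (no Flink dependency) of a count window with a
 * fixed maximum size and a slide of 1. Names are illustrative only.
 */
class SlidingCountWindowSketch {

    /**
     * Feeds the integers 0..count-1 through a buffer that keeps at most
     * `size` elements and records a snapshot after every new element.
     */
    static List<List<Integer>> simulate(int count, int size) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<List<Integer>> fired = new ArrayList<>();
        for (int element = 0; element < count; element++) {
            buffer.addLast(element);
            // Evict the oldest element once the buffer exceeds the window size.
            if (buffer.size() > size) {
                buffer.removeFirst();
            }
            // Slide of 1: emit the current contents on every incoming element.
            fired.add(new ArrayList<>(buffer));
        }
        return fired;
    }

    public static void main(String[] args) {
        // Prints one line per firing for the first 14 input elements.
        for (List<Integer> window : simulate(14, 10)) {
            System.out.println(window);
        }
    }
}
```

The first nine firings contain fewer than 10 elements, and every later firing contains exactly the 10 most recent ones, which is the 1, 2, 3, ..., 9, 10, 10, 10 pattern asked about in #4.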
