> So as long as the parallelism of my kafka source and sink operators is 1, all 
> the subsequent operators (multiple filters to create multiple streams, and 
> then individual CEP and Process operators per stream) will be executed in the 
> same task slot? 

Yes, unless you specify a different slot sharing group for the subsequent 
operators. 

> Regarding approach D, I'm not sure how this is different from the current 
> approach I had provided the code for above, and will it solve this problem of 
> different data streams not getting distributed across slots?

The difference is huge. Without keyBy you cannot have multiple instances 
(parallelism > 1) of the source and filtering operators (unless you create a 
separate Kafka partition for each device, which in your case would solve a lot 
of problems, by the way). The solution you showed earlier will simply not 
scale beyond one machine. You could distribute your business logic among as 
many machines as you want, but there would always be a potential bottleneck in 
the single source/filtering operators. With keyBy you could have multiple 
source operators, and keyBy would ensure that events from the same device are 
always processed by one task/machine.
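
To illustrate the routing guarantee described above, here is a minimal 
plain-Java sketch. It does not use Flink; the class KeyedRoutingSketch and its 
methods are hypothetical names, and the hash-modulo assignment is a 
simplification (Flink's keyBy uses its own hashing scheme internally). It only 
shows that a deterministic function of the device ID sends every event of one 
device to the same parallel subtask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyedRoutingSketch {

    // Hypothetical helper: maps a device ID to one of `parallelism` subtasks.
    // Assumption: plain hashCode() modulo parallelism stands in for Flink's
    // real key assignment, which is more involved.
    public static int subtaskFor(String deviceId, int parallelism) {
        return Math.floorMod(deviceId.hashCode(), parallelism);
    }

    // Groups events (represented only by their device ID) by the subtask
    // that would receive them.
    public static Map<Integer, List<String>> route(List<String> deviceIds, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String id : deviceIds) {
            bySubtask
                .computeIfAbsent(subtaskFor(id, parallelism), k -> new ArrayList<>())
                .add(id);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList(
            "device-1", "device-2", "device-1", "device-3", "device-2");
        // Every occurrence of a given device ID ends up under the same subtask.
        route(events, 4).forEach((subtask, ids) ->
            System.out.println("subtask " + subtask + " <- " + ids));
    }
}
```

Because the assignment depends only on the key, the sources can run with any 
parallelism and per-device ordering is still handled by a single subtask.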

Piotrek

> On 21 Nov 2017, at 07:39, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
> 
> Thanks for your time in helping me here.
> 
> So as long as the parallelism of my kafka source and sink operators is 1, all 
> the subsequent operators (multiple filters to create multiple streams, and 
> then individual CEP and Process operators per stream) will be executed in the 
> same task slot? 
> 
> I cannot take approach F as the entire business logic revolves around event 
> timing.
> 
> Regarding approach D, I'm not sure how this is different from the current 
> approach I had provided the code for above, and will it solve this problem of 
> different data streams not getting distributed across slots?
> 
> Thanks again,
> Shailesh
> 
> On Fri, Nov 17, 2017 at 3:01 PM, Piotr Nowojski <pi...@data-artisans.com> 
> wrote:
> Sorry for not responding but I was away.
> 
> Regarding 1.
> 
> One source operator followed by multiple tasks with parallelism 1 (as 
> visible on your screenshot) that share a resource group will collapse into 
> one task slot - only one TaskManager will execute all of your job.
> 
> 
> Because all of your events are written into one Kafka topic, the previously 
> proposed solutions A) (multiple jobs) and B) (one job with multiple sources) 
> cannot work. In that case what you have to do is either:
> 
> D) set parallelism as you wish in the environment, read from Kafka, keyBy 
> device type, split the stream by filtering by device type (or using side 
> outputs), perform your logic
> 
> This will create TOTAL_DEVICES data streams after keyBy on each machine, and 
> the filtering will cost you (its cost is linear in TOTAL_DEVICES), but it 
> should be the easiest solution.
> 
> E) set parallelism as you wish, read from Kafka, keyBy device type, write 
> custom operators with custom logic handling watermarks using KeyedState
> 
> However, I would strongly suggest reconsidering
> 
> F) ignore the whole issue of assigning different watermarks per device 
> stream, and just assign the minimum across all of the devices. It would be 
> the easiest to implement.
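
A rough plain-Java sketch of what approach F amounts to is shown here. It is 
not Flink code; the class MinWatermarkSketch and its methods are hypothetical, 
and in a real job this logic would live inside whatever watermark assigner the 
source uses. The assumption is that the shared watermark is the minimum, over 
all devices seen so far, of each device's highest event timestamp.

```java
import java.util.HashMap;
import java.util.Map;

public class MinWatermarkSketch {

    // Highest event timestamp (in milliseconds) seen so far for each device.
    private final Map<String, Long> maxTimestampPerDevice = new HashMap<>();

    // Records an event; out-of-order events never move a device backwards.
    public void onEvent(String deviceId, long eventTimestampMillis) {
        maxTimestampPerDevice.merge(deviceId, eventTimestampMillis, Long::max);
    }

    // One watermark for the whole stream: the slowest device decides.
    // Returns Long.MIN_VALUE while no device has been seen yet.
    public long currentWatermark() {
        if (maxTimestampPerDevice.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long ts : maxTimestampPerDevice.values()) {
            min = Math.min(min, ts);
        }
        return min;
    }

    public static void main(String[] args) {
        MinWatermarkSketch tracker = new MinWatermarkSketch();
        tracker.onEvent("device-a", 100L);
        tracker.onEvent("device-b", 40L);
        System.out.println("watermark = " + tracker.currentWatermark());
    }
}
```

One consequence of this design is that a device which stops sending events 
holds the shared watermark back for everyone, so some idle-device handling 
would likely be needed in practice.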
> 
> Piotrek
> 
> > On 17 Nov 2017, at 09:22, Nico Kruber <n...@data-artisans.com> wrote:
> >
> > regarding 3.
> > a) The taskmanager logs are missing, are there any?
> > b) Also, the JobManager logs say you have 4 slots available in total - is
> > this enough for your 5 devices scenario?
> > c) The JobManager log, however, does not really reveal what it is currently
> > doing, can you set the log level to DEBUG to see more?
> > d) Also, do you still observe CPU load during the 15min as an indication
> > that it is actually doing something?
> > e) During this 15min period where apparently nothing happens, can you
> > provide the output of "jstack <jobmanager_pid>" (with the PID of your
> > JobManager)?
> > f) You may further be able to debug into what is happening by running this
> > in your IDE in debug mode and pause the execution when you suspect it to
> > hang.
> >
> >
> > Nico
> >
> > On Tuesday, 14 November 2017 14:27:36 CET Piotr Nowojski wrote:
> >> 3. Nico, can you take a look at this one? Isn’t this a blob server issue?
> >>
> >> Piotrek
> >>
> >>> On 14 Nov 2017, at 11:35, Shailesh Jain <shailesh.j...@stellapps.com>
> >>> wrote:
> >>>
> >>> 3. Have attached the logs and exception raised (15min - configured akka
> >>> timeout) after submitting the job.
> >>>
> >>> Thanks,
> >>> Shailesh
> >>>
> >>>
> >>> On Tue, Nov 14, 2017 at 2:46 PM, Piotr Nowojski <pi...@data-artisans.com>
> >>> wrote:
> >>> Hi,
> >>>
> >>> 3. Can you show the logs from job manager and task manager?
> >>>
> >>>> On 14 Nov 2017, at 07:26, Shailesh Jain <shailesh.j...@stellapps.com>
> >>>> wrote:
> >>>>
> >>>> Hi Piotrek,
> >>>>
> >>>> I tried out option 'a' mentioned above, but instead of separate jobs, I'm
> >>>> creating separate streams per device. Following is the test deployment
> >>>> configuration as a local cluster (8GB ram, 2.5 GHz i5, ubuntu machine):
> >>>>
> >>>> akka.client.timeout 15 min
> >>>> jobmanager.heap.mb 1024
> >>>> jobmanager.rpc.address localhost
> >>>> jobmanager.rpc.port 6123
> >>>> jobmanager.web.port 8081
> >>>> metrics.reporter.jmx.class org.apache.flink.metrics.jmx.JMXReporter
> >>>> metrics.reporter.jmx.port 8789
> >>>> metrics.reporters jmx
> >>>> parallelism.default 1
> >>>> taskmanager.heap.mb 1024
> >>>> taskmanager.memory.preallocate false
> >>>> taskmanager.numberOfTaskSlots 4
> >>>>
> >>>> The number of Operators per device stream is 4 (one sink function, 3 CEP
> >>>> operators).
> >>>>
> >>>> Observations (and questions):
> >>>>
> >>>> 3. Job deployment hangs (never switches to RUNNING) when the number of
> >>>> devices is greater than 5. Increasing the akka client timeout does not
> >>>> help. Would deploying a separate job per device, instead of separate
> >>>> streams, help here?
> >>>>
> >>>> Thanks,
> >>>> Shailesh
> 
> 
