Hi Vijay,

Flink does not provide fine-grained control over which slots or machines
keys are assigned to.
When you specify a key, Flink (i.e., its internal hash function) decides
where the data is processed. This works well for large key spaces, but can
be difficult if you have only a few keys.
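
To put a rough number on the few-keys case: assume, purely for
illustration, that each key lands on one of n parallel subtasks uniformly
and independently. Flink's real assignment goes through key groups, so this
is only an approximation. Under that assumption the chance that k cameras
all end up on different subtasks is n!/((n-k)! * n^k). With 4 cameras and 4
subtasks that is 24/256, a bit over 9%. The class and method names below
are made up for this sketch.

```java
public class DistinctSlotOdds {

    /**
     * Probability that `keys` keys all land on different subtasks,
     * ASSUMING each key is placed uniformly and independently
     * (an idealization, not Flink's exact key-group mapping).
     */
    static double allDistinct(int keys, int subtasks) {
        if (keys > subtasks) {
            return 0.0; // pigeonhole: at least two keys must share a subtask
        }
        double p = 1.0;
        for (int i = 0; i < keys; i++) {
            // the i-th key must avoid the i subtasks that are already taken
            p *= (double) (subtasks - i) / subtasks;
        }
        return p;
    }

    public static void main(String[] args) {
        int cameras = 4;
        for (int subtasks : new int[] {4, 8, 16, 64}) {
            System.out.println(cameras + " cameras on " + subtasks
                    + " subtasks, all separate with probability "
                    + allDistinct(cameras, subtasks));
        }
    }
}
```

Even with many more subtasks than cameras, the probability stays below 1,
so separation is never guaranteed by hashing alone.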

So, even if you keyBy(cam) and handle the parallelization of seq#
internally (which I would not recommend), it might still happen that the
data of two cameras is processed on the same slot.
The only way to change that would be to fiddle with the hash of your keys,
but this might give you a completely different distribution when scaling
out the application at a later point in time.
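
A minimal sketch of why the placement depends on the parallelism. It mimics
the two-step scheme Flink uses for keyed streams (hash the key into a key
group, then map ranges of key groups to parallel subtasks), but mix() is a
stand-in bit mixer and not Flink's actual hash function, and the max
parallelism of 128 is an assumed value. All names are hypothetical, and the
indexes it prints will not match what a real job does.

```java
import java.util.Arrays;
import java.util.List;

public class KeyPlacementSketch {

    /** Stand-in integer mixer (ASSUMPTION: not Flink's real hash). */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Step 1: key -> key group in [0, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    /** Step 2: key group -> subtask index in [0, parallelism). */
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        List<String> cams = Arrays.asList("cam1", "cam2", "cam3", "cam4");
        int maxParallelism = 128; // assumed value for this sketch
        for (int parallelism : new int[] {4, 8}) {
            for (String cam : cams) {
                System.out.println("parallelism=" + parallelism + " " + cam
                        + " -> subtask "
                        + subtaskFor(cam, maxParallelism, parallelism));
            }
        }
    }
}
```

Keys that were hand-picked to land on separate subtasks at one parallelism
have no reason to stay separate at another, because step 2 changes.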

Best, Fabian

2018-06-26 19:54 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:

> Hi Fabian,
> Thanks once again for your reply. I need to get the data from each
> cam/camera into 1 partition/slot and not move the gigantic video data
> around as much while I perform other operations on it. For example, I can
> get seq#1 and seq#2 for cam1 in the cam1 partition/slot and then run
> combine, split, parse, stitch etc. operations on them in multiple threads
> within the same cam1 partition.
>
> I have the CameraKey defined as cam,seq# and then keyBy(cam) to get it in
> 1 partition(eg: cam1). The idea is to then work within the cam1 partition
> with various seq#'s 1,2 etc on various threads within the same
> slot/partition of TaskManager.
>
> The data is stored in EFS keyed based on seq#/cam# folder structure.
>
> Our actual problem is managing network bandwidth as a resource in each
> partition. We want to make sure that the processing of 1 camera(split into
> multiple seq# tasks) is not running on the same node as the processing of
> another camera as in that case, the required network bandwidth for storing
> the output of the process running in the partition would exceed the network
> bandwidth of the hardware. Camera processing is expected to run on the same
> hardware as the video decode step which is an earlier sequential process in
> the same Dataflow pipeline.
>
> I guess I might have to use a ThreadPool within each Slot(cam partition)
> to work on each seq# ??
>
> TIA
>
> On Tue, Jun 26, 2018 at 1:06 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> keyBy() does not work hierarchically. Each keyBy() overrides the previous
>> partitioning.
>> You can keyBy(cam, seq#) which guarantees that all records with the same
>> (cam, seq#) are processed by the same parallel instance.
>> However, Flink does not give any guarantees about how the (cam, seq#)
>> partitions are distributed across slots (or even physical nodes).
>>
>> Btw. why is it important that all records of the same cam are processed
>> by the same physical node?
>>
>> Fabian
>>
>> 2018-06-25 21:36 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:
>>
>>> I see a .slotSharingGroup for SingleOutputStreamOperator
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.html#slotSharingGroup-java.lang.String->
>>>  which can put parallel instances of operations in same TM slot.
>>> I also see a CoLocationGroup but do not see a .coLocationGroup for
>>> SingleOutputStreamOperator to put a task on the same slot. It seems
>>> CoLocationGroup
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.html>
>>> is defined at the JobVertex level and has nothing to do with
>>> SingleOutputStreamOperator.
>>> A TaskManager has many slots. Each slot has many threads within it.
>>> I want to be able to put the cam1 partition (keyBy(cam)) in 1 slot and
>>> then use a keyBy(seq#) to run on many threads within that cam1 slot.
>>>
>>> Vijay
>>>
>>> On Mon, Jun 25, 2018 at 10:10 AM Vijay Balakrishnan <bvija...@gmail.com>
>>> wrote:
>>>
>>>> Thanks, Fabian.
>>>> Been reading your excellent book on Flink Streaming. Can't wait for more
>>>> chapters.
>>>> Attached a pic.
>>>>
>>>> [image: partition-by-cam-ts.jpg]
>>>>
>>>> I have records with seq# 1 and cam1 and cam2. I also have records with
>>>> varying seq#'s.
>>>> By partitioning on cam field first(keyBy(cam)), I can get cam1
>>>> partition on the same task manager instance/slot/vCore(???)
>>>> Can I then have seq# 1 and seq# 2 for cam1 partition run in different
>>>> slots/threads on the same Task Manager instance(aka cam1 partition) using
>>>> keyBy(seq#) & setParallelism() ? Can *forward* Strategy be used to
>>>> achieve this ?
>>>>
>>>> TIA
>>>>
>>>>
>>>> On Mon, Jun 25, 2018 at 1:03 AM Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Flink distributes task instances to slots and does not expose physical
>>>>> machines.
>>>>> Records are partitioned to task instances by hash partitioning. It is
>>>>> also not possible to guarantee that the records in two different operators
>>>>> are sent to the same slot.
>>>>> Sharing information by side-passing it (e.g., via a file on a machine
>>>>> or in a static object) is an anti-pattern and should be avoided.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2018-06-24 20:52 GMT+02:00 Vijay Balakrishnan <bvija...@gmail.com>:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Need to partition by cameraWithCube.getCam() 1st using
>>>>>> parallelCamTasks(passed in as args).
>>>>>>
>>>>>> Then within each partition, need to partition again by
>>>>>> cameraWithCube.getTs() but need to make sure each of the 2nd partition by
>>>>>> getTS() runs on the same physical node ?
>>>>>>
>>>>>> How do I achieve that ?
>>>>>>
>>>>>> DataStream<CameraWithCube> cameraWithCubeDataStream = env
>>>>>>             .addSource(new Source(....))
>>>>>>             .keyBy((cameraWithCube) -> cameraWithCube.getCam())
>>>>>>             .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
>>>>>>                 public void processElement(CameraWithCube cameraWithCube,
>>>>>>                         Context context, Collector<CameraWithCube> collector)
>>>>>>                         throws Exception {
>>>>>>                     //do nothing
>>>>>>                 }
>>>>>>             })
>>>>>>             //TODO: how to add camera# of the partition
>>>>>>             .slotSharingGroup("camSharingGroup")
>>>>>>             .setParallelism(parallelCamTasks)
>>>>>>             .keyBy((cameraWithCube) -> cameraWithCube.getTs())
>>>>>>             .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
>>>>>>                 public void processElement(CameraWithCube cameraWithCube,
>>>>>>                         Context context, Collector<CameraWithCube> collector)
>>>>>>                         throws Exception {
>>>>>>                     //TODO: process code
>>>>>>                 }
>>>>>>             })
>>>>>>             //TODO: how many parallel tasks within physical node
>>>>>>             .setParallelism(noOfSlotsInEachPhysicalNode)
>>>>>>             //TODO: in same physical node
>>>>>>             .slotSharingGroup("??");
>>>>>>
>>>>>> TIA
>>>>>>
>>>>>
>>>>>
>>
