Hi Stephan,

thanks for the input :).

I have some further questions on how the data is segmented before/when it is moved over the network:

1. What does the number of ResultSubpartition instances in the ResultPartition correspond to? Is one assigned to each consuming task? If so, how can I find for each ResultSubpartition the corresponding Task, Slot or similar? If not how is decided which piece of the data is routed to which consuming task?

2. What defines the number of Buffer instances per ResultSubpartition? Does one Buffer correspond to exactly one serialized Record? Is a Record the single output of an operator, are there multiple records per operator, or does it differ depending on the operator?

3. Or are the Buffers defined in a completely different manner? In that case, could you give me a pointer to understand how Buffer instances are used?

It would be great if you could help me answer those. Even a partial answer would be great ;).

Best regards,
Niklas


On 08.07.2015 13:54, Stephan Ewen wrote:
Hi!

Here are a few pointers:

   - The data transfer is the responsibility of the receiver. The sender
cannot know ahead of time where data is sent

   - On the receiver side, you should be able to count the received bytes in
the RemoteInputChannel or LocalInputChannel.

   - The JobManager is notified of the final state of a task when the task
is completed (successful or unsuccessful) and un-registers. See
"org.apache.flink.runtime.taskmanager.Task#notifyFinalState()".

Let us know if you have more questions.

Greetings,
Stephan


On Mon, Jul 6, 2015 at 8:36 PM, Niklas Semmler <nik...@inet.tu-berlin.de>
wrote:

Hello Flink Community,

I am working on a network scheduler and am currently reading Flink's code
to figure out how the data exchange works. It would be great if you could
help me with some of my issues and questions.

Basically I want to extract from flink the time when a data transmission
between two machines starts (1), their connection details (2), how much
data is involved (3) and when it ends (4).

So far I have understood that the scheduling of tasks is done via the
scheduleOrUpdateConsumers JobManagerMessage. In the function of the same
name in the class Execution I have been able to extract the IP/Port pair of
both the producer and the consumer(s) use.

Furthermore I understand that in the context of a "blocking" data
transmission Flink will first create a ResultPartition and store all the
data in the form of Buffers before starting the transmission. So in
principle I should be able to figure out what amount of data Flink will
communicate by looking at the respective
ResultSubpartition.totalNumberOfBytes, right?
However, in the process I would need to map each ResultSubpartition to a
slot or deployed task, so that I can associate this amount of data with
connection details of the sender and the receiver. Any hints on how to do
that?

Now from what I see the same is not possible in a "pipelined" context,
correct? Can anything be said about the data to be communicated?

Finally, I was unable to locate in the code and in the logs where a Task's
state is changing from RUNNING to FINISHED. Could you give me a pointer?

It would be great if you could share your insights on the problems above
;).

Best regards,
Niklas

--
PhD Student / Research Assistant
INET, TU Berlin
Room 4.029
Marchstr 23
10587 Berlin
Tel: +49 30 314 78752



--
PhD Student / Research Assistant
INET, TU Berlin
Room 4.029
Marchstr 23
10587 Berlin
Tel: +49 30 314 78752

Reply via email to