Github user zentol commented on the pull request:

    https://github.com/apache/incubator-flink/pull/202#issuecomment-63110487
  
    ok, let's see...
    
    A) Data transmission
    
    Related classes: Sender, Receiver and Streamer
    Sender and Receiver are low-level classes that deal with de-/serialization and with reading from / writing to the memory-mapped file.
    The Streamer class is one level above them, being the class a function will primarily use.
    
    For example, a map function (implemented as a map partition) would use it like this:
    
    @Override
    public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws Exception {
        streamer.streamBufferWithoutGroups(values.iterator(), out);
    }
    
    As such, the Streamer essentially decides what happens at runtime.
    It communicates with the external process via UDP, exchanging signals for when to read/write buffers.
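    
    As a minimal sketch of that signaling, assuming a placeholder signal value and a made-up port (the actual protocol lives in the Streamer and differs):
    
    import java.net.DatagramPacket;
    import java.net.DatagramSocket;
    import java.net.InetAddress;
    
    // Hypothetical sketch: notify the external process that a buffer in the
    // memory-mapped file is ready, then block until it answers.
    // Signal value and port are placeholders, not the real ones.
    public class SignalSketch {
        public static void main(String[] args) throws Exception {
            try (DatagramSocket socket = new DatagramSocket()) {
                byte[] signal = new byte[]{1}; // placeholder "buffer ready" signal
                InetAddress local = InetAddress.getLoopbackAddress();
                socket.send(new DatagramPacket(signal, signal.length, local, 25000)); // made-up port
    
                byte[] reply = new byte[1];
                socket.receive(new DatagramPacket(reply, reply.length)); // wait for the reply signal
            }
        }
    }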
    
    B) Plan Binding
    
    Related classes: PlanBinder, OperationInfo
    
    The PlanBinder converts data sent from the external process into an actual Flink plan. The sent data has to follow a very specific scheme defined within the class.
    Let's say you have a plan with a single CSV source. In this case, for the sources you would do the following (a rough sketch of the resulting message sequence follows the list):
     1. send 1 as an integer, representing the total number of sources
     2. send "CSV" denoting the type of source
     3. send a unique ID integer
     4. send a tuple containing 
(filePath,lineDelimiter,fieldDelimiter,type1,...,typeX)
        the type arguments are exemplary objects, meaning that if you want to 
read Tuple2<Integer,Integer>, type1 and type2 would be integers
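    
    Here is the promised sketch of the message sequence for that single CSV source. The values and the List-of-Objects representation are purely illustrative; the actual wire encoding is defined by PlanBinder and its Python counterpart.
    
    import java.util.Arrays;
    import java.util.List;
    
    // Illustrative only: the ordering matches the steps above, the encoding does not.
    public class CsvSourceMessageSketch {
        public static void main(String[] args) {
            List<Object> messages = Arrays.asList(
                1,                             // 1. total number of sources
                "CSV",                         // 2. type of source
                42,                            // 3. unique ID (value is arbitrary)
                Arrays.asList(                 // 4. the argument tuple:
                    "/path/to/input.csv",      //    filePath
                    "\n",                      //    lineDelimiter
                    ",",                       //    fieldDelimiter
                    0, 0));                    //    exemplary objects -> read Tuple2<Integer, Integer>
            messages.forEach(System.out::println); // stand-in for actually sending them
        }
    }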
    
    These behaviours are completely defined for 
        parameters (DOP, local execution)
        data sources (CSV, TEXT, VALUE (fromElements()))
        data sinks (CSV, TEXT, PRINT)
        broadcast variables
    
    Operations are a bit different, but they follow the same pattern:
     1. send an integer representing the total number of operations
     2. send the operation identifier
     3. send all relevant arguments (this is partially implementer-defined)
    
    Between step 2 (exclusive) and step 3 (inclusive) the following methods are called:
    createOperationInfo(identifier) (abstract)
    createXOperation(operationInfo)
    applyXOperation(parameter1, ..., parameterX, operationInfo) (abstract, only for UDF operations)
    
    createOperationInfo is an abstract method whose purpose is to create an <? extends OperationInfo> object containing all information necessary to apply a (UDF or non-UDF) operation.
    For a Python map, for example, such an object would contain (sketched after the list):
     1. the ID of the set to apply the map on
     2. the serialized operator
     3. the type argument
     4. some additional info :>
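    
    A rough sketch of such an object (field names and types are made up; the actual OperationInfo subclass for Python looks different):
    
    // Hypothetical sketch of an OperationInfo subclass for a Python map;
    // field names and types are illustrative only.
    public class PythonMapOperationInfoSketch /* extends OperationInfo */ {
        int parentSetId;           // 1. ID of the set to apply the map on
        byte[] serializedOperator; // 2. the serialized Python operator
        Object typeArgument;       // 3. exemplary object describing the result type
        // 4. plus whatever additional info the operation needs
    }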
    
    This object is passed to createXOperation, which extracts certain parameters from the object based on the identifier (these would be things that you usually need for this operation; a join, for example, would extract the grouping keys among others) and then calls applyXOperation, which actually applies the operation.
    It was constructed this way so that the implementer has the freedom to implement operations however he wants, e.g. a map as a map partition. I was wondering whether I should remove createXOperation.
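    
    To illustrate the call chain, here is a simplified, hypothetical sketch; the real signatures and generics in PlanBinder differ.
    
    // Hypothetical sketch of the createXOperation -> applyXOperation pattern.
    abstract class PlanBinderSketch {
        static class MapInfoSketch { int parentSetId; byte[] serializedOperator; } // stand-in OperationInfo
    
        // abstract: turn the data sent for one operation into an info object
        abstract MapInfoSketch createOperationInfo(String identifier);
    
        // non-abstract: pull the parameters typically needed for this operation
        // out of the info object...
        void createMapOperation(MapInfoSketch info) {
            // ...and hand them to the implementer, who decides how to realize it
            applyMapOperation(info.parentSetId, info.serializedOperator, info);
        }
    
        // abstract, UDF operations only: actually apply the operation,
        // e.g. a map implemented as a map partition
        abstract void applyMapOperation(int parentSetId, byte[] operator, MapInfoSketch info);
    }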
    
    To plug in Python I had to do very little on the Java side:
     1. create functions (PythonMap, PythonCoGroup etc.)
     2. extend Streamer and implement setupProcess, which starts the Python process (see the sketch after this list)
     3. extend OperationInfo (roughly 160 lines; given 16 operations that's about 10 each)
     4. extend PlanBinder (the abstract methods only took 100 lines)
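    
    A sketch of step 2 (setupProcess is the real method name, everything else is simplified; the script name and arguments are placeholders):
    
    // Rough sketch of a Streamer subclass that starts the Python process.
    class PythonStreamerSketch /* extends Streamer */ {
    
        private Process process;
    
        // in the actual code this would override Streamer's setupProcess
        public void setupProcess() throws java.io.IOException {
            ProcessBuilder builder = new ProcessBuilder("python", "executor.py"); // script name is made up
            builder.redirectErrorStream(true); // merge stderr into stdout
            process = builder.start();
        }
    }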
    
    On the external side you mostly need code to 
     1. communicate with the stream (Connection.py)
     2. deserialize data following 3 different protocols (Iterator.py); protocol 2 is sketched below the list
      1. <type1><record1><type2><record2>...<typeX><recordX>
      2. <type><record1><record2>...<recordX>
      3. <type1><type2><record1T1><record2T2><record3T1><record4T2>
     3. serialize data following 2 different protocols (Collector.py)
      1. <type1><record1><type2><record2>...<typeX><recordX>
      2. <type><record1><record2>...<recordX>
     4. act as the PlanBinder counterpart, turning API calls into the intermediate format and sending it to Java
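    
    To make protocol 2 (<type><record1>...<recordX>) a bit more tangible, here is a hedged sketch of a possible writing side; the type constant and the int encoding are made up and do not match the actual Sender / Collector.py format.
    
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    
    // Illustrative only: one type marker up front, then the raw records.
    public class Protocol2Sketch {
        static final byte TYPE_INT = 1; // placeholder type marker
    
        public static byte[] writeIntRecords(int[] records) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(TYPE_INT);    // <type>, written once
            for (int record : records) {
                out.writeInt(record);   // <record1><record2>...<recordX>
            }
            return bytes.toByteArray();
        }
    }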
    
    hmm... I'll leave it at that for now. If I keep writing, too many in-depth implementation details will slip in ^^
    


