I thought you wanted to join the non-matched elements with another
(third) stream.

--> s1.union(s2).keyBy().window().apply(/* outer join */)
      .keyBy().connect(s3.keyBy()).coFlatMap(/* backup join */)

If you want to match the non-matched stream with itself, a FlatMapFunction
is the right choice.

--> s1.union(s2).keyBy().window().apply(/* outer join */)
      .keyBy().flatMap(/* backup join */)

The backup join puts all non-matched elements into state and waits for
another non-matched element with the same key to perform the join.
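The buffering logic of that backup join can be sketched in plain Java (a hypothetical BackupJoinSketch class; in a real Flink job the HashMap would be keyed ValueState inside the FlatMapFunction or CoFlatMapFunction, maintained by the configured state backend):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the backup join: buffer an unmatched element per key until a
// partner with the same key arrives, then emit the joined pair and clear
// the buffered entry. In Flink, `pending` corresponds to keyed state and
// `process` to the flatMap() body.
class BackupJoinSketch {
    private final Map<String, String> pending = new HashMap<>();

    /** Returns the joined result, or null if the element has to wait. */
    String process(String key, String value) {
        String partner = pending.remove(key);
        if (partner != null) {
            return partner + "|" + value; // partner found: join, state cleared
        }
        pending.put(key, value);          // no partner yet: buffer and wait
        return null;
    }
}
```

Note that the state entry is removed as soon as a match is emitted, which is the "clean up for that key" step discussed further down in this thread.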

Best, Fabian



2016-09-01 19:55 GMT+02:00 vinay patil <vinay18.pa...@gmail.com>:

> Yes, that's what I am looking for.
>
> But why use a CoFlatMapFunction? I have already got the
> matchingAndNonMatching stream by doing the union of the two streams and
> having the logic in the apply method for performing the outer join.
>
> I am thinking of applying the same key on matchingAndNonMatching and a
> flatMap to take care of the rest of the logic.
>
> Or are you suggesting using a CoFlatMapFunction after the outer-join
> operation (I mean after doing the window and getting
> the matchingAndNonMatching stream)?
>
> Regards,
> Vinay Patil
>
> On Thu, Sep 1, 2016 at 11:38 AM, Fabian Hueske-2 [via Apache Flink User
> Mailing List archive.] <[hidden email]> wrote:
>
>> Thanks for the explanation. I think I understood your use case.
>>
>> Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a keyed
>> stream (keyed by join key).
>> One input would be the unmatched outer join records, the other input
>> would serve the events you want to match them with.
>> Retrieving elements from RocksDB will be local and should be fast.
>>
>> You should be confident, though, that all unmatched records will be picked
>> up at some point (RocksDB persists to disk, so you won't run out of memory,
>> but the snapshot size will increase).
>> The future state expiry feature will avoid such situations.
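As a side note, switching to the RocksDB backend Fabian refers to is a small change at job setup. A sketch, with a placeholder checkpoint URI and interval (the exact constructor signature varies by Flink version):

```java
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
// Working state lives in local RocksDB instances on the TaskManagers;
// checkpoints copy it to the remote URI, so reads stay local and fast.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
env.enableCheckpointing(60_000); // snapshot every 60 s
```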
>>
>> Best, Fabian
>>
>> 2016-09-01 18:29 GMT+02:00 vinay patil <[hidden email]>:
>>
>>> Hi Fabian,
>>>
>>> I had already used the CoGroup function earlier but was getting some
>>> issues while dealing with watermarks (for one use case I was not getting
>>> the correct result), so I have used the union operator for performing the
>>> outer join (a WindowFunction on a keyed stream). This approach is working
>>> correctly and giving me the correct results.
>>>
>>> As discussed, I want to maintain the non-matching records in some store,
>>> which is why I was thinking of using RocksDB here: I will maintain
>>> user-defined state after the outer-join window operator and query it from
>>> Flink to check whether the value for a particular key is present. If it
>>> is, I can match the records and send them downstream.
>>>
>>> The final goal is to have zero non-matching records, so this is the
>>> backup plan to handle edge-case scenarios.
>>>
>>> I have already integrated code to write to Cassandra using the Flink
>>> connector, but I think this will be a better option than querying an
>>> external store: since RocksDB stores the data on the local TaskManager
>>> disk, retrieval will be faster than with Cassandra, right?
>>>
>>> What do you think ?
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Sep 1, 2016 at 10:19 AM, Fabian Hueske-2 [via Apache Flink User
>>> Mailing List archive.] <[hidden email]> wrote:
>>>
>>>> Hi Vinay,
>>>>
>>>> can you give a bit more detail about how you plan to implement the outer
>>>> join? Using a WindowFunction or a CoFlatMapFunction on a KeyedStream?
>>>>
>>>> An alternative could be to use a CoGroup operator which collects from
>>>> two inputs all elements that share a common key (the join key) and are in
>>>> the same window. The interface of the function provides two iterators over
>>>> the elements of both inputs and can be used to implement outer join
>>>> functionality. The benefit of working with a CoGroupFunction is that you do
>>>> not have to take care of state handling at all.
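The per-key outer-join semantics such a CoGroupFunction would implement can be sketched in plain Java (a hypothetical OuterJoinSketch; the two lists stand in for the two iterables that coGroup() receives for one key and window):

```java
import java.util.ArrayList;
import java.util.List;

// Full outer join of the two inputs CoGroup collects for one key and
// window: matched pairs are joined, elements without a partner are
// emitted with a null placeholder instead of being dropped.
class OuterJoinSketch {
    static List<String> coGroup(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        if (left.isEmpty()) {
            for (String r : right) out.add("null|" + r);     // right outer
        } else if (right.isEmpty()) {
            for (String l : left) out.add(l + "|null");      // left outer
        } else {
            for (String l : left)
                for (String r : right) out.add(l + "|" + r); // inner matches
        }
        return out;
    }
}
```

The "|null" records are exactly the non-matched elements this thread proposes to feed into a backup join downstream.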
>>>>
>>>> In case you go for a custom implementation, you will need to work with
>>>> operator state. However, you do not need to interact with RocksDB
>>>> directly; Flink takes care of that for you.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-09-01 16:13 GMT+02:00 vinay patil <[hidden email]>:
>>>>
>>>>> Hi Fabian/Stephan,
>>>>>
>>>>> Waiting for your suggestion
>>>>>
>>>>> Regards,
>>>>> Vinay Patil
>>>>>
>>>>> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <[hidden email]> wrote:
>>>>>
>>>>>> Hi Fabian/Stephan,
>>>>>>
>>>>>> This makes things clear.
>>>>>>
>>>>>> This is the use case I have:
>>>>>> I am performing an outer join operation on the two streams (in a
>>>>>> window), after which I get matchingAndNonMatchingStream. Now I want to
>>>>>> make sure that the matching rate is high (matching cannot happen if
>>>>>> one of the sources is not emitting elements for a certain time), so to
>>>>>> tackle this situation I was thinking of using RocksDB as a state
>>>>>> backend, where I will insert the unmatched records (the key will be
>>>>>> the same as used for the window and the value will be a DTO). Before
>>>>>> inserting, I will check if the key is already present in RocksDB; if
>>>>>> yes, I will take the data from it, send it downstream, and ensure I
>>>>>> perform the clear operation for that key.
>>>>>> (Also, the data to store should be encrypted; the encryption part can
>>>>>> be handled.)
>>>>>>
>>>>>> So instead of using Cassandra, can I do this using RocksDB as the
>>>>>> state backend, since the state is not gone after checkpointing?
>>>>>>
>>>>>> P.S. I have kept the watermark behind by 1500 seconds just to be safe
>>>>>> in handling late elements, but to tackle edge-case scenarios like the
>>>>>> one mentioned above we have a backup plan of using Cassandra as an
>>>>>> external store, since we are dealing with financially critical data.
>>>>>>
>>>>>> Regards,
>>>>>> Vinay Patil
>>>>>>
>>>>>> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske <[hidden email]> wrote:
>>>>>>
>>>>>>> Hi Vinay,
>>>>>>>
>>>>>>> if you use user-defined state, you have to manually clear it.
>>>>>>> Otherwise, it will stay in the state backend (heap or RocksDB) until
>>>>>>> the
>>>>>>> job goes down (planned or due to an OOM error).
>>>>>>>
>>>>>>> This is especially important to keep in mind when using keyed state.
>>>>>>> If you have an unbounded, evolving key space, you will likely run
>>>>>>> out of memory: the job will constantly add state for each new key but
>>>>>>> won't be able to clean up the state for "expired" keys.
>>>>>>>
>>>>>>> You could implement a clean-up mechanism if you implement a custom
>>>>>>> stream operator. However, this is a very low-level interface and
>>>>>>> requires a solid understanding of internals like timestamps,
>>>>>>> watermarks, and the checkpointing mechanism.
>>>>>>>
>>>>>>> The community is currently working on a state expiry feature (state
>>>>>>> will be
>>>>>>> discarded if not requested or updated for x minutes).
>>>>>>>
>>>>>>> Regarding the second question: Does state remain local after
>>>>>>> checkpointing?
>>>>>>> Yes, the local state is only copied to the remote FS (HDFS, S3, ...)
>>>>>>> but
>>>>>>> remains in the operator. So the state is not gone after a checkpoint
>>>>>>> is
>>>>>>> completed.
>>>>>>>
>>>>>>> Hope this helps,
>>>>>>> Fabian
>>>>>>>
>>>>>>> 2016-08-31 18:17 GMT+02:00 Vinay Patil <[hidden email]>:
>>>>>>>
>>>>>>> > Hi Stephan,
>>>>>>> >
>>>>>>> > Just wanted to jump into this discussion regarding state.
>>>>>>> >
>>>>>>> > So do you mean that if we maintain user-defined state (for
>>>>>>> > non-window operators) and do not clear it explicitly, the data for
>>>>>>> > that key remains in RocksDB?
>>>>>>> >
>>>>>>> > What happens in the case of a checkpoint? I read in the
>>>>>>> > documentation that after a checkpoint the RocksDB data is pushed to
>>>>>>> > the desired location (HDFS, S3, or another FS), so for user-defined
>>>>>>> > state, does the data still remain in RocksDB after a checkpoint?
>>>>>>> >
>>>>>>> > Correct me if I have misunderstood this concept.
>>>>>>> >
>>>>>>> > For one of our use cases we were going for this, but since I read
>>>>>>> > the above part in the documentation, we are going for Cassandra now
>>>>>>> > (to store records and query them for a special case).
>>>>>>> >
>>>>>>> >
>>>>>>> > Regards,
>>>>>>> > Vinay Patil
>>>>>>> >
>>>>>>> > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <[hidden email]> wrote:
>>>>>>> >
>>>>>>> > > In streaming, memory is mainly needed for state (key/value
>>>>>>> > > state). The exact representation depends on the chosen
>>>>>>> > > StateBackend.
>>>>>>> > >
>>>>>>> > > State is explicitly released: for windows, state is cleaned up
>>>>>>> > > automatically (firing / expiry); for user-defined state, keys
>>>>>>> > > have to be explicitly cleared (via the clear() method) or will,
>>>>>>> > > in the future, have the option to expire.
>>>>>>> > >
>>>>>>> > > The heavy workhorse for streaming state is currently RocksDB,
>>>>>>> > > which internally uses native (off-heap) memory to keep the data.
>>>>>>> > >
>>>>>>> > > Does that help?
>>>>>>> > >
>>>>>>> > > Stephan
>>>>>>> > >
>>>>>>> > >
>>>>>>> > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <[hidden email]
>>>>>>> <http:///user/SendEmail.jtp?type=node&node=8829&i=4>>
>>>>>>> > > wrote:
>>>>>>> > >
>>>>>>> > > > As per the docs, in batch mode dynamic memory allocation is
>>>>>>> > > > avoided by storing the messages being processed in ByteBuffers
>>>>>>> > > > via Unsafe methods.
>>>>>>> > > >
>>>>>>> > > > I couldn't find any docs describing memory management in
>>>>>>> > > > streaming mode. So...
>>>>>>> > > >
>>>>>>> > > > - Am wondering if this is also the case with streaming?
>>>>>>> > > >
>>>>>>> > > > - If so, how does Flink detect that an object is no longer
>>>>>>> > > > being used and can be reclaimed for reuse once again?
>>>>>>> > > >
>>>>>>> > > > -roshan
>>>>>>> > > >
>>>>>>> > >
>>>>>>> >
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>>
>>
>
>
>
