Re: Passing records between two jobs

2018-06-20 Thread Fabian Hueske
Hi Avihai,

Rafi pointed out the two common approaches to deal with this situation. Let
me expand a bit on those.

1) Transactional producing into queues: There are two ways to
accomplish exactly-once producing into queues: a) using a system with
transactional support, such as Kafka 0.11, or b) maintaining a
write-ahead buffer in Flink state. In both cases, messages are only
committed to the sink system when a checkpoint completes, i.e., this
method can add significant latency depending on the checkpointing
interval (which in turn depends on the state size, among other things).
A minimal sketch of the Kafka variant follows below.
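
To illustrate the Kafka variant of 1), here is a minimal, untested
sketch against the Flink 1.5 / Kafka 0.11 connector. Broker address,
topic name, and the source are placeholders:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class TransactionalSinkSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    // Exactly-once producing is tied to checkpoints: records become
    // visible to read_committed consumers only after a checkpoint completes.
    env.enableCheckpointing(60_000);

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
    // Should cover the maximal expected checkpoint duration; the broker's
    // transaction.max.timeout.ms must be at least this value.
    props.setProperty("transaction.timeout.ms", "900000");

    env.socketTextStream("localhost", 9999) // placeholder source
        .addSink(new FlinkKafkaProducer011<>(
            "intermediate-topic", // placeholder topic
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));

    env.execute("job 1 (transactional sink sketch)");
  }
}

Jobs 2 and 3 must then read with isolation.level=read_committed,
otherwise they will also see records from aborted transactions.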

2) Filtering based on a logical sequence number: This approach requires
deterministic sequence number assignment and deterministic routing of
records, i.e., a record should always go into the same partition of the
queue to ensure that it will always be fetched by the same source
instance of the consuming job (job 2 or 3 in your case). Deterministic
assignment and routing are tricky when records are shuffled: Flink
operators emit records in the order in which they were received, but
the order becomes non-deterministic when they receive records from
different inputs (typically after a keyBy). A sketch of such a
de-duplicating filter follows below.
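
To illustrate 2), here is a minimal, untested sketch of such a filter,
assuming a hypothetical Event type with getKey() and a per-key,
monotonically increasing getSequenceNumber():

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

public class DedupFilter extends RichFilterFunction<Event> {

  // Highest sequence number seen so far for the current key.
  private transient ValueState<Long> lastSeenSeq;

  @Override
  public void open(Configuration parameters) {
    lastSeenSeq = getRuntimeContext().getState(
        new ValueStateDescriptor<>("lastSeenSeq", Long.class));
  }

  @Override
  public boolean filter(Event event) throws Exception {
    Long last = lastSeenSeq.value();
    if (last != null && event.getSequenceNumber() <= last) {
      return false; // already seen before the restart -> drop duplicate
    }
    lastSeenSeq.update(event.getSequenceNumber());
    return true;
  }
}

You would key the stream by the same key that determines the queue
partition and apply the filter right after the source, e.g.
stream.keyBy(e -> e.getKey()).filter(new DedupFilter()). Note that this
keeps a Long per key in state indefinitely; in practice you would clean
up old entries, e.g. with timers in a ProcessFunction.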

Best,
Fabian


2018-06-20 9:30 GMT+02:00 Rafi Aroch:

> Hi Avihai,
>
>> The problem is that every message queuing sink only provides an
>> at-least-once guarantee.
>>
>
> From what I see, a messaging queue that can guarantee exactly-once is
> Kafka 0.11, using the Kafka transactional messaging feature.
> See here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
>
> Another approach could be to de-dup events on the consuming job's side.
> See here: https://github.com/jgrier/FilteringExample
>
> Hope this helps,
>
> Rafi
>
>
> On Mon, Jun 18, 2018 at 6:46 PM Avihai Berkovitz <
> avihai.berkov...@microsoft.com> wrote:
>
>> Hello,
>>
>>
>>
>> We are planning a system that will be comprised of 3 different jobs:
>>
>>1. Getting a stream of events, adding some metadata to the events,
>>and outputting them to a temporary message queue.
>>2. Performing some calculations on the events we got from job 1, as
>>required for product A.
>>    3. Performing a different set of calculations on the events from job
>>1, for product B.
>>
>>
>>
>> All 3 jobs will be developed by different teams, so we don’t want to
>> create one massive job that does everything.
>>
>> The problem is that every message queuing sink only provides an
>> at-least-once guarantee. If job 1 crashes and recovers, we will get the
>> same events in the queue and jobs 2 and 3 will process events twice. This
>> is obviously a problem, and I guess we are not the first to stumble upon it.
>>
>>
>>
>> Has anyone else had this issue? It seems to me like a fundamental problem
>> of passing data between jobs, so hopefully there are known solutions and
>> best practices. It would be great if you could share any solution.
>>
>>
>>
>> Thanks,
>>
>> Avihai
>>
>>
>>
>


Re: Passing records between two jobs

2018-06-20 Thread Rafi Aroch
Hi Avihai,

> The problem is that every message queuing sink only provides an
> at-least-once guarantee.
>

From what I see, a messaging queue that can guarantee exactly-once is
Kafka 0.11, using the Kafka transactional messaging feature.
See here:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
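
For completeness, a minimal, untested sketch of the consuming side;
broker address, group id, and topic are placeholders. The important bit
is isolation.level, without which the consumer also reads records of
aborted transactions:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class ReadCommittedSourceSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
    props.setProperty("group.id", "job-2");                   // placeholder
    // Only read records of committed transactions.
    props.setProperty("isolation.level", "read_committed");

    env.addSource(new FlinkKafkaConsumer011<>(
            "intermediate-topic", new SimpleStringSchema(), props))
        .print();

    env.execute("job 2 (read_committed source sketch)");
  }
}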

Another approach could be to de-dup events on the consuming job's side. See
here: https://github.com/jgrier/FilteringExample

Hope this helps,

Rafi


On Mon, Jun 18, 2018 at 6:46 PM Avihai Berkovitz <
avihai.berkov...@microsoft.com> wrote:

> Hello,
>
>
>
> We are planning a system that will be comprised of 3 different jobs:
>
>1. Getting a stream of events, adding some metadata to the events, and
>outputting them to a temporary message queue.
>2. Performing some calculations on the events we got from job 1, as
>required for product A.
>    3. Performing a different set of calculations on the events from job
>1, for product B.
>
>
>
> All 3 jobs will be developed by different teams, so we don’t want to
> create one massive job that does everything.
>
> The problem is that every message queuing sink only provides an
> at-least-once guarantee. If job 1 crashes and recovers, we will get the same events in
> the queue and jobs 2 and 3 will process events twice. This is obviously a
> problem, and I guess we are not the first to stumble upon it.
>
>
>
> Has anyone else had this issue? It seems to me like a fundamental problem
> of passing data between jobs, so hopefully there are known solutions and
> best practices. It would be great if you could share any solution.
>
>
>
> Thanks,
>
> Avihai
>
>
>


Passing records between two jobs

2018-06-18 Thread Avihai Berkovitz
Hello,

We are planning a system that will be comprised of 3 different jobs:

  1.  Getting a stream of events, adding some metadata to the events, and 
outputting them to a temporary message queue.
  2.  Performing some calculations on the events we got from job 1, as required 
for product A.
  3.  Performing a different set of calculations on the events from job 1, for 
product B.

All 3 jobs will be developed by different teams, so we don't want to create one 
massive job that does everything.
The problem is that every message queuing sink only provides an at-least-once 
guarantee. If job 1 crashes and recovers, we will get the same events in the 
queue and jobs 2 and 3 will process events twice. This is obviously a problem, 
and I guess we are not the first to stumble upon it.

Has anyone else had this issue? It seems to me like a fundamental problem of 
passing data between jobs, so hopefully there are known solutions and best 
practices. It would be great if you could share any solution.

Thanks,
Avihai