[
https://issues.apache.org/jira/browse/FLUME-3439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17612989#comment-17612989
]
Eiichi Sato commented on FLUME-3439:
------------------------------------
Thanks for the response!
{quote}The Channel only waits until events in the putList have been consumed.
{quote}
No, the commit() on a Source-side transaction is blocked until all the events
in the transaction are taken from the channel by Sink-side transactions *and*
the Sink-side transactions are successfully committed. If the machine crashes
before the Source calls commit(), or while the Source is blocked in commit(),
the Source will re-deliver the events on restart. So, no events are lost.
In the simplest setup with a single Source (Thread 1) and a single Sink (Thread
2), the threads interact as follows for a single batch of events:
{noformat}
1. [Thread 2] The Sink begins a transaction B.
2. [Thread 2][Transaction B] The Sink calls take() and the thread goes to
sleep waiting for an event. The call doesn't return yet.
3. [Thread 1] The Source begins a transaction A.
4. [Thread 1][Transaction A] The Source calls put() 100 times to add 100
events to the Channel.
5. [Thread 1][Transaction A] The Source calls commit(), which (i) makes the
events visible to other transactions, (ii) notifies other threads waiting on
take(), and (iii) goes to sleep. The call doesn't return yet.
6. [Thread 2][Transaction B] ...resuming from 2. The Sink calls take() 79
more times, receiving 80 events in total from the Channel.
7. [Thread 2][Transaction B] The Sink calls commit().
8. [Thread 2] The Sink begins a transaction C.
9. [Thread 2][Transaction C] The Sink calls take() 20 times and receives 20
events from the Channel. The 21st call to take() immediately returns null
because no more events are available for now.
10. [Thread 2][Transaction C] The Sink calls commit(), which notifies
Thread 1 to resume, now that all the events from transaction A have been
consumed.
11. [Thread 1][Transaction A] ...resuming from 5. Now that all 100 events
have been consumed, the commit() call returns.{noformat}
Even though sinks and sources run on different threads, they act synchronously.
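The blocking commit() in steps 5 and 11 can be sketched as a plain Java monitor. This is an illustrative toy, not the actual SynchronousChannel code; all class and method names below are hypothetical:
{noformat}
// Minimal sketch of the handshake described above: a source-side commit()
// blocks until sink-side transactions have taken and committed every event
// that the source put. Names are illustrative, not the real implementation.
class SyncBatch {
    private final int total;   // events put by the source transaction
    private int consumed = 0;  // events committed so far by sink transactions

    SyncBatch(int total) { this.total = total; }

    // Called from a sink transaction's commit() with the number of events it took.
    synchronized void markConsumed(int n) {
        consumed += n;
        if (consumed >= total) notifyAll();  // wake the blocked source thread
    }

    // Called from the source transaction's commit(); blocks until all events
    // in this batch are consumed, mirroring steps 5 and 11.
    synchronized void awaitConsumed() throws InterruptedException {
        while (consumed < total) wait();
    }

    synchronized int remaining() { return total - consumed; }
}

public class Demo {
    public static void main(String[] args) throws Exception {
        SyncBatch batch = new SyncBatch(100);

        // Source thread: has put 100 events and now blocks in commit().
        Thread source = new Thread(() -> {
            try {
                batch.awaitConsumed();
                System.out.println("source commit() returned");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        source.start();

        // Sink thread: consumes in two transactions (80 + 20), as in the walkthrough.
        batch.markConsumed(80);  // transaction B commits 80 events
        batch.markConsumed(20);  // transaction C commits the remaining 20
        source.join(5000);       // source unblocks once remaining() hits 0
    }
}
{noformat}
A real channel would track one such monitor per open source transaction and also handle rollback; the sketch only shows the wait/notify handshake.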
{quote}The only difference I see between this and MemoryChannel is that the
MemoryChannel requires a limit on the number of events that can be queued
whereas this does not.
{quote}
Aside from the synchronous behavior described above, they are quite similar.
The reason I didn't add a {{capacity}} option that limits the number of events
that can be queued is that I expect the number of events that stay in memory
to be fairly low. Since a Source-side transaction is blocked until all of its
events are consumed, the maximum number of events residing in memory equals
the maximum number of simultaneous transactions multiplied by the batch size.
The number of simultaneous transactions doesn't exceed the number of threads
the Source(s) use, and that number is usually limited or configurable. For
example,
* TaildirSource only uses a single thread, which means the maximum number of
events that stay in memory is its {{{}batchSize{}}}.
* AvroSource by default doesn't limit the number of threads, but it accepts a
{{threads}} option to control the maximum number of worker threads it uses.
** [https://github.com/apache/flume/blob/rel/1.10.1/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java#L108-L109]
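To make the bound concrete, here is the arithmetic for the two examples above (the configuration figures are hypothetical, not defaults):
{noformat}
public class MemoryBoundExample {
    public static void main(String[] args) {
        // Worst-case in-memory events = simultaneous source transactions x batchSize.

        // TaildirSource is single-threaded, so at most one open transaction.
        int taildirBatchSize = 100;
        System.out.println("TaildirSource bound: " + 1 * taildirBatchSize);     // 100

        // AvroSource with an explicit 'threads' limit of 4 worker threads.
        int avroThreads = 4;
        int avroBatchSize = 100;
        System.out.println("AvroSource bound: " + avroThreads * avroBatchSize); // 400
    }
}
{noformat}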
While it is possible to impose a hard limit on the number of events by blocking
put() when the channel is full, I don't see much value in doing so.
> Introduce SynchronousChannel, a fast disk-less channel that doesn't lose
> events
> -------------------------------------------------------------------------------
>
> Key: FLUME-3439
> URL: https://issues.apache.org/jira/browse/FLUME-3439
> Project: Flume
> Issue Type: Improvement
> Components: Channel
> Reporter: Eiichi Sato
> Priority: Major
>
> Recently, I implemented
> [SynchronousChannel|https://github.com/eiiches/flume-synchronous-channel], in
> which every transaction that puts events waits for corresponding transactions
> that take the events to complete.
> * It's fast because it doesn't use disks.
> * It doesn't lose events because it doesn't actually store events. It has no
> capacity.
> The motivation behind this channel is that, when using a Taildir Source to
> collect logs and send them to a remote Flume instance, we typically use a
> File Channel or a Memory Channel. Memory Channel is fast but can lose
> events. File Channel is durable but slow. Using a File Channel also means
> writing the same contents to disk twice: first for the log file that Taildir
> Source is watching and second for the channel data. We don't need to buffer
> events in a channel because the events are already in the log file and
> Taildir Source can just read them at its own pace.
> Expected use cases are:
> * Taildir Source --> Synchronous Channel --> Avro Sink
> * Kinesis Source --> Synchronous Channel --> Avro Sink
> * Cloud Pub/Sub Source --> Synchronous Channel --> Avro Sink
> In all these cases, the channel doesn't need to buffer events because the
> source already works like a buffer.
> In [this
> benchmark|https://github.com/eiiches/flume-synchronous-channel/tree/main/docs/benchmark]
> that uses Taildir Source + Synchronous Channel, I observed an 84% increase
> in throughput and a 75-81% reduction in CPU usage compared to File Channel
> when the event body is 512 bytes.
>
> ----
>
> The code is around 220 LOC (excluding tests) and doesn't pull in any
> additional third-party dependencies.
> I can work on a PR, but before doing so, I'd like general feedback from the
> community. I'm wondering whether this channel is useful or generic enough to
> be included in Flume, or whether it should be kept in a separate repository.
> What do you think?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)