Oops, yep, AvroSource is implemented. Should append() and appendBatch() be
declared as private? It looks like the Avro framework works fine with methods
that are not publicly accessible.

On sources and sinks, I think I wasn't clear. I do think process() should be on 
sinks, what I was suggesting is that maybe that could be defined on the Sink 
interface instead of PollableSink. But if you think there could be sinks that 
don't need to do any processing in the future, I guess that doesn't apply.

I was actually thinking maybe getNext() : List<Event> could be defined on the
source interface. On second thought, though, defining it on a base class as
protected abstract might be even better. It's a little counterintuitive and I
definitely could be missing something here, but here's my reasoning. From what 
I understand, the role of sources is to get events from somewhere and then put 
them on a channel. The thing that needs to be defined by custom implementations 
seems to be the ability to get events from somewhere. To put it in more 
concrete terms, I'm thinking along the lines of

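import java.util.List;

abstract class BaseSource implements Runnable {
  private Channel channel;  // the channel this source writes to

  // The only method a concrete source has to implement.
  protected abstract List<Event> getNext();

  public void run() {
    for (Event event : getNext()) {
      channel.put(event);
    }
  }
}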

Obviously overly simplified, in reality this could handle channel 
configurations, transaction logic, life cycle, instrumentation and more. Any 
concrete source implementation then, only has to implement getNext() and 
everything should work. I think this makes implementations a little simpler. I 
see almost the same transaction logic defined in all the current sources and I 
see a counter being incremented on successful channel put()s in almost all the 
current sources; that's suggesting to me they could be shifted up to a base 
class. This might also enforce a strong contract on what source implementations
need to do, which could reduce unwanted code paths and make the project more
maintainable.
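
To make the base-class idea a bit more concrete, here is a slightly fuller sketch in which the transaction logic and the success counter live in the base class. Everything in it (Event, Channel, ListChannel, FixedSource, runOnce(), getEventsPut()) is a hypothetical stand-in for illustration and not Flume's actual API; it only shows the shape of the template-method approach.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for illustration only; not Flume's real types.
final class Event {
  final String body;
  Event(String body) { this.body = body; }
}

interface Channel {
  void begin();
  void put(Event e);
  void commit();
  void rollback();
}

// Base class that owns the transaction logic and the success counter.
abstract class BaseSource {
  private final Channel channel;
  private long eventsPut = 0;

  BaseSource(Channel channel) { this.channel = channel; }

  // The only method a concrete source has to implement.
  protected abstract List<Event> getNext();

  // One iteration: fetch a batch and put it on the channel in one transaction.
  public final boolean runOnce() {
    List<Event> batch = getNext();
    channel.begin();
    try {
      for (Event e : batch) {
        channel.put(e);
      }
      channel.commit();
      eventsPut += batch.size(); // counted only after a successful commit
      return true;
    } catch (RuntimeException ex) {
      channel.rollback();
      return false;
    }
  }

  public long getEventsPut() { return eventsPut; }
}

// In-memory channel, assumed only for this demo.
class ListChannel implements Channel {
  final List<Event> committed = new ArrayList<>();
  private final List<Event> pending = new ArrayList<>();
  public void begin() { pending.clear(); }
  public void put(Event e) { pending.add(e); }
  public void commit() { committed.addAll(pending); pending.clear(); }
  public void rollback() { pending.clear(); }
}

// A concrete source only says where its events come from.
class FixedSource extends BaseSource {
  FixedSource(Channel channel) { super(channel); }
  @Override protected List<Event> getNext() {
    return Arrays.asList(new Event("a"), new Event("b"));
  }
}
```

With this shape, a new source is just a getNext() implementation; whether that is enough for sources that must acknowledge their senders is a separate question.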

Cheers,
Shu
________________________________________
From: Eric Sammer [[email protected]]
Sent: Wednesday, December 07, 2011 12:33 PM
To: [email protected]
Cc: Basier Aziz; Robert Mahfoud; Robert Ragno
Subject: Re: early flume-ng feedback

Shu:

On Mon, Dec 5, 2011 at 4:30 PM, Shu Zhang <[email protected]> wrote:

> Hi all, I just took a first look at the alpha2 of flume-ng and thought I'd
> provide a little early high level feedback. First of all, thanks for doing
> this re-architecture, we (medio systems) have had problems with the OG and
> at first glance NG looks very promising and we're very excited to try it
> out.
>
> Ok feedback. Personally I would find it very helpful if
> Source/Sink/Channel were strongly defined. On the wiki, I see the line:
>
> "You still have sources and sinks and they still do the same thing. They
> are now connected by channels."
>
> I'm not finding that to be true. In OG, events are appended to Sinks via
> append(Event) and polled from Sources via next(). That is, some upstream
> component is responsible for getting events somehow and appending them to
> a Sink; some downstream component is responsible for getting events from a
> Source and processing them. For example, Driver is a special case, acting as
> the downstream component of a Source and an upstream component of a Sink;
> its processing is simply appending events to the downstream Sink.
>
> In NG, it seems like Sources map to upstream components of an OG Sink and
> Sinks map to downstream components of an OG Source. A channel maps to the
> combination of OG Sources and Sinks. That is, I see the high level modeling
> as:
> Sources - A component which puts events on a channel. (Implementation
> defines where those events come from).
> Sinks - A component which polls events from a channel and applies some
> processing on them. (Implementation defines processing)
> Channel - Transport between sources and sinks (Implementation defines
> durability, transport mechanism, etc.)
>
> Please let me know if I have the high level picture correct.
>
> It seems to me, most Source/Sink/Channel do follow the above definition.
> But the avro stuff seems to be a major divergence. I'm a little confused
> about AvroSource; it doesn't seem to do much. It appears to be just a
> vanilla Source that lets you manually pass in events, which it'll then
> pass straight through to a channel. I'm guessing that's a work in progress?
>

It's actually complete (assuming you're looking at the tip of the flume-728
branch). The secret sauce there is that the AvroSource instantiates the
Avro RPC Transceiver (in this context, NettyTransceiver) and creates a client
of interface AvroSourceProtocol. You'll see in that pair of lines (it's in
createConnection()) that we pass an instance of 'this' when we create the
server. AvroSource also implements AvroSourceProtocol. This tells Avro
that we are the handler for this server and we implement the
AvroSourceProtocol (which defines the methods to which we respond). When
clients connect to the server port, Avro verifies they're speaking
AvroSourceProtocol, handles deserialization, and invokes appendBatch() or
append() based on what the client called.

The AvroSink shows the other half of the equation where a client is created
and invokes the remote methods (or stubs of a proxy that do the work).
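
The "pass 'this' as the handler" pattern described above can be illustrated without Avro at all. The following is a toy sketch only: SourceProtocol, ToyServer, and ToySource are made-up names, there is no networking or serialization, and it is not how the Avro classes are actually wired. It just shows a source that implements the protocol interface and registers itself as the object the server dispatches to.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical protocol, standing in for the generated AvroSourceProtocol.
interface SourceProtocol {
  void append(String event);
  void appendBatch(List<String> events);
}

// Toy "server": keeps the handler it was given and dispatches calls by name.
// A real RPC server would also handle the network and deserialization.
class ToyServer {
  private final SourceProtocol handler;

  ToyServer(SourceProtocol handler) { this.handler = handler; }

  void dispatch(String method, List<String> args) {
    if (method.equals("append")) {
      handler.append(args.get(0));
    } else if (method.equals("appendBatch")) {
      handler.appendBatch(args);
    } else {
      throw new IllegalArgumentException("unknown method: " + method);
    }
  }
}

// The source both owns the server and handles the calls the server receives.
class ToySource implements SourceProtocol {
  final List<String> channel = new ArrayList<>(); // stand-in for a channel
  ToyServer server;

  void start() {
    server = new ToyServer(this); // 'this' is registered as the handler
  }

  @Override public void append(String event) { channel.add(event); }
  @Override public void appendBatch(List<String> events) { channel.addAll(events); }
}
```

A client-side call then ends up in append() or appendBatch() on the source, which is where the events are handed to the channel.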


> Avro transport seems to me best modeled as an AvroChannel, which can
> link a source and sink which in turn define where the events come from and
> what to do with them on the other side; that is, modeling avro transport as
> a channel seems like it might provide more flexibility for less
> configuration. Also, modeling avro transport as a channel seems like it
> would make it easier to configure for different reliability levels through
> composition with other channels. The way AvroSink is written can work, but
> I see advantages in modeling avro transport as a channel instead, any
> thoughts?
>

See my previous email about the intentions behind channel implementations
and their role in NG. In OG, there was no channel; the source (or sink)
contained a queue somewhere, by convention, that filled this role. The
problem with that model - and the reason the channel was promoted to a first
class citizen - is that any slowdown or blocking by the sink caused the source
to be exclusively held in a critical section where no other threads could
get at the queue. This prevented multithreaded work stealing sinks and
caused things like fan out to block on the slowest client. NG doesn't have
this problem.


>
> For the most part, I'm a big fan of the high level modeling in NG. One
> thing I want to bring up is the fact that channel has both put() and take()
> on it. I'm not seeing the case where the same component would want to both
> take() an event for processing and also put() an event on to the same
> channel, since that component has a good chance of being the one that ends up
> take()ing the event back. Because of that, I think it could be a good idea
> to separate channel into 2 interfaces. I can see channel-like
> implementations, for which it's more difficult to implement both put() and
> take() and I don't see the need for both to be in every implementation
> (though most will be, and that's ok). I guess what I'm thinking is along
> the lines of
> interface ChannelPoller { take() }
> interface ChannelSender { put() }
> public class FileChannel implements ChannelPoller,
> ChannelSender {...}
> public class SomeSource { ChannelSender _channel; ... }
> public class SomeSink { ChannelPoller _channel; ... }
>
> One final thing is, if I have the right idea on the high level modeling
> then it seems like a method like process() should be defined on the sinks
> interface and a method like List<Event> getNext() should be defined on the
> sources interface, thoughts?


I agree that getNext() : List<Event> is a *much* nicer interface than a
process() that assumes the implementation Does The Right Thing(tm). I spent
quite a bit of time thinking about how to do this *while maintaining
durability* and not getting extremely complicated; I didn't see a way.
Here's the short version:

The source runner would do:

1. while (true) {
2.   events = source.getNext();
3.
4.   for (event : events) {
5.     channel.put(event);
6.   }
7. }

The problem is that the source would have to have already told the sender
of the event that it was successfully received. In other words, if Flume
were to die while execution is at line 3, the application would have
continued on its way and data would be lost. Flume OG currently has this
problem.

One solution would be to put the transaction boundary in the source runner
that controls this while loop but then the source doesn't know if the
transaction failed to commit; what response does it send?

The other option is to force all sources to handle their own while loop and
control, but that's a ton of repeated code and drastically increases the
complexity of authoring a source. Still more options include doing fancier
async back channels to let the source know the transaction completed or
failed but this is also very complicated for source developers. None of
these things are appealing. So, while the interface is ugly, it's extremely
robust. Of course I'm interested in things I'm not seeing here.
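
As a rough illustration of why keeping the transaction inside the source helps, here is a minimal sketch in which the reply to the sender is decided only after the commit succeeds or fails. All names (Transaction, TxChannel, BoundedChannel, AckingSource, Status) are assumptions made for this example and are not Flume's real interfaces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interfaces for illustration; not Flume's actual API.
interface Transaction {
  void begin();
  void commit();
  void rollback();
}

interface TxChannel {
  Transaction getTransaction();
  void put(String event);
}

// In-memory channel with a fixed capacity, so a failure can be triggered.
class BoundedChannel implements TxChannel, Transaction {
  final List<String> committed = new ArrayList<>();
  private final List<String> pending = new ArrayList<>();
  private final int capacity;

  BoundedChannel(int capacity) { this.capacity = capacity; }

  public Transaction getTransaction() { return this; }
  public void begin() { pending.clear(); }
  public void put(String event) {
    if (committed.size() + pending.size() >= capacity) {
      throw new IllegalStateException("channel full");
    }
    pending.add(event);
  }
  public void commit() { committed.addAll(pending); pending.clear(); }
  public void rollback() { pending.clear(); }
}

// The source owns the transaction, so it knows what to answer the sender.
class AckingSource {
  enum Status { OK, FAILED }

  private final TxChannel channel;

  AckingSource(TxChannel channel) { this.channel = channel; }

  // Called when a sender delivers a batch; the return value is the reply.
  Status append(List<String> batch) {
    Transaction tx = channel.getTransaction();
    tx.begin();
    try {
      for (String event : batch) {
        channel.put(event);
      }
      tx.commit();
      return Status.OK;     // acknowledged only after the commit
    } catch (RuntimeException ex) {
      tx.rollback();
      return Status.FAILED; // sender can retry; nothing was half-written
    }
  }
}
```

In the getNext() design, by contrast, the source would already have answered the sender by the time the runner touches the channel, which is the window where events can be lost.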


What might a sink do if it doesn't have a process() defined?


You could have an event driven sink that reacts to the availability of data
in the channel. Such a sink requires a special runner that contains a
different while loop. I didn't necessarily think this was critical nor did
I have the impetus to implement it, but it would be an incremental
improvement of a polling sink runner. I didn't want to preclude such a
feature.


> Anyways thanks again for doing this work, I think it's very positive. I'll
> be talking to people internally about helping out, I think it could be good
> for all involved. I apologize if I've misunderstood anything or made any
> wrong assumptions. When we get around to testing it out, I'll get back to
> you guys on lower level issues.
>

This has been great Shu. No apologies necessary. I believe I speak for
everyone when I say feedback like this is greatly appreciated.
Contributions are even better!

Thanks!


> Cheers,
> Shu




--
Eric Sammer
twitter: esammer
data: www.cloudera.com
