[
https://issues.apache.org/jira/browse/KAFKA-1227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13880752#comment-13880752
]
Jay Kreps commented on KAFKA-1227:
----------------------------------
Hey Guozhang, thanks for the detailed stylistic questions. I think these are
important to discuss. Quick responses inline:
_1. How do we decide where to put Exception definitions? Currently we have an
errors folder in kafka.common and some folders also have their own exceptions._
That package was meant to hold explicitly API errors, i.e. those errors which
are defined in ErrorKeys.java with a registered error code and have a
bidirectional mapping to this code. These represent communication between the
client and server, so I wanted to put them in a special place. In general
exceptions should be kept in the package they most naturally fit with
(ConfigException goes with config, etc).
The most important code organization principle I had in mind was that each
package should be either public or private. Only public packages will be
javadoc'd. All classes in a public package are exposed to the user and are part
of the public interface. The idea is that we would be very careful with these
public packages. I considered actually differentiating these as something like
kafka.clients.producer.pub, but I thought that was a bit ugly--maybe there is
another way to differentiate or annotate classes so that
we are very explicit about public or private. Essentially any change to
interfaces in these packages is breaking all our users so we have to think very
carefully about API design and change. The rest of the classes (most of them
actually) are really just an implementation detail and we can change them at
will.
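For what it's worth, one way to make the distinction explicit would be a marker
annotation on public classes. The @Public annotation below is purely
hypothetical, just to illustrate the idea:

    // Hypothetical marker annotation for flagging public API classes;
    // not part of the patch, just one way to make the distinction explicit.
    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    @Documented
    @Retention(RetentionPolicy.SOURCE)
    @Target(ElementType.TYPE)
    public @interface Public { }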
Currently the public packages are just:
kafka.clients.producer
kafka.common
kafka.common.errors
One item I wanted to document and discuss as a separate thread was code
organization as I think these kinds of conventions only work if they are
documented and broadly understood.
2. Shall we merge the kafka.common.protocol and kafka.common.request folders,
since the request definitions are highly dependent on the protocol classes?
I would rather not. The kafka.common.network package defines a low-level
network framing based on size-delimited messages. This is fully generic--the
unit test tests an "echo server". It is not tied to any details of our protocol
and it is really important that people not leak details of our protocol into
it!!!! :-)
The protocol is just a bunch of message definitions and isn't tied to the
network transport or framing at all. It is just a way of laying out bytes.
The request package combines the protocol definition and network framing.
I am hoping to keep these things orthogonal.
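To make "size-delimited framing" concrete, here is a rough sketch of the idea,
with nothing protocol-specific in it (the class and method names are
illustrative, not the actual network code):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;

    // Illustrative size-delimited framing: every message on the wire is a
    // 4-byte big-endian length followed by that many bytes of payload.
    // The framing layer knows nothing about what the payload means.
    public class Framing {
        public static void writeFrame(SocketChannel channel, ByteBuffer payload) throws IOException {
            ByteBuffer size = ByteBuffer.allocate(4);
            size.putInt(payload.remaining());
            size.flip();
            while (size.hasRemaining())
                channel.write(size);
            while (payload.hasRemaining())
                channel.write(payload);
        }

        public static ByteBuffer readFrame(SocketChannel channel) throws IOException {
            ByteBuffer size = ByteBuffer.allocate(4);
            while (size.hasRemaining())
                if (channel.read(size) < 0)
                    throw new IOException("Connection closed mid-frame");
            size.flip();
            ByteBuffer payload = ByteBuffer.allocate(size.getInt());
            while (payload.hasRemaining())
                if (channel.read(payload) < 0)
                    throw new IOException("Connection closed mid-frame");
            payload.flip();
            return payload;
        }
    }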
Once we get our build fixed (ahem), I'd really like us to get checkstyle
integrated so we can enforce these kinds of package dependencies and keep some
kind of logical coherence. It has been a struggle otherwise.
3. Shall we put Node, Cluster, Partition, and TopicPartition in kafka.common
into one sub-folder, for example kafka.common.metadata?
I'm open to that. I liked having the current flat package for simplicity for
the user (fewer things to import). Basically I am trying to make the javadoc
for the producer as simple and flat as possible.
4. Shall we put the Serializer classes into the protocol folder?
The Serializer is the PUBLIC interface for users to serialize their messages.
It actually isn't related to the definition of our protocol.
5. Shall we move the kafka.clients.common.network sub-folder to kafka.common?
Yeah, I think that is actually how it is. I previously had it separate, and the
rationale was that many of the classes (e.g. the Selector) were really written
with the clients in mind. Theoretically the same Selector class could be the
basis for the socket server, but I didn't really think those use cases through.
1. Since nextNode uses a global round robin, we need to make sure no more than
one object accesses a single Cluster's nextNode.
That may just be a bad name. The goal of that method was load balancing not
iterating over the nodes. So actually the intention was to give a different
node to each thread in the multithreaded case.
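To make the intended use concrete, the load-balancing behavior is roughly the
following (a simplified sketch, not the actual Cluster code):

    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch of round-robin load balancing across nodes: each caller
    // (including each thread) gets the next node in rotation, so concurrent
    // callers are spread across the cluster rather than iterating over it.
    // Assumes a non-empty node list.
    public class RoundRobin<T> {
        private final List<T> nodes;
        private final AtomicInteger counter = new AtomicInteger(0);

        public RoundRobin(List<T> nodes) {
            this.nodes = nodes;
        }

        public T nextNode() {
            // Math.abs guards against the counter wrapping to a negative value.
            int i = Math.abs(counter.getAndIncrement() % nodes.size());
            return nodes.get(i);
        }
    }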
1. Shall we put config names such as ENCODING_CONFIG all in a single file?
I planned to do a discussion on config. The way it works is that configs are
defined by the ConfigDef. However we allow plug-in interfaces (Serializer,
Partitioner, etc). These may need configs too, but these are (generally
speaking) user classes. So we allow including user-defined configs. So
StringSerializer is essentially a user plug-in that seemed useful enough to
include in the main code. I think it makes more sense to document its configs
with the class rather than elsewhere.
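As a sketch of what "document its configs with the class" means, a plug-in
might carry its own config key like this (the configure() hook and the way the
key is read are illustrative, not the exact interfaces in the patch):

    import java.io.UnsupportedEncodingException;
    import java.util.Map;

    // Sketch of a user plug-in that documents its own config key with the
    // class rather than in a central config file.
    public class StringSerializer {
        /** The character encoding to use when converting strings to bytes. */
        public static final String ENCODING_CONFIG = "serializer.encoding";

        private String encoding = "UTF8";

        // Hypothetical hook through which user-defined configs are passed in.
        public void configure(Map<String, ?> configs) {
            Object enc = configs.get(ENCODING_CONFIG);
            if (enc instanceof String)
                this.encoding = (String) enc;
        }

        public byte[] serialize(String data) {
            try {
                return data.getBytes(encoding);
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException("Unsupported encoding " + encoding, e);
            }
        }
    }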
— kafka.common.AbstractIterator
1. makeNext is not supposed to leave the iterator in states other than DONE and
READY?
Yeah this is basically a transliteration of the same class in the main code
base which is a transliteration of the iterator in Google Collections.
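For reference, the pattern looks roughly like this: subclasses implement
makeNext() and must either return the next element or call allDone(); leaving
the iterator in any other state means makeNext() failed partway (a simplified
sketch):

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Simplified sketch of the AbstractIterator pattern from Google
    // Collections: makeNext() must end in READY (by returning an element)
    // or DONE (by calling allDone()).
    public abstract class SimpleAbstractIterator<T> implements Iterator<T> {
        private enum State { READY, NOT_READY, DONE }

        private State state = State.NOT_READY;
        private T next;

        // Return the next element, or call allDone() and return null.
        protected abstract T makeNext();

        protected T allDone() {
            state = State.DONE;
            return null;
        }

        @Override
        public boolean hasNext() {
            if (state == State.NOT_READY) {
                next = makeNext();
                if (state != State.DONE)
                    state = State.READY;
            }
            return state == State.READY;
        }

        @Override
        public T next() {
            if (!hasNext())
                throw new NoSuchElementException();
            state = State.NOT_READY;
            return next;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }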
1. kafka.common.protocol.Schema: Will a difference in field order make for
different schemas?
Yes our protocol is based on position not name.
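A tiny illustration of what positional means (the field names here are made
up):

    import java.nio.ByteBuffer;

    // Illustration of a positional protocol: fields are identified by their
    // position in the byte stream, not by name, so changing field order
    // changes the wire format.
    public class PositionalLayout {
        public static void main(String[] args) {
            ByteBuffer b = ByteBuffer.allocate(12);
            b.putInt(5);     // field 0: partition, int32
            b.putLong(42L);  // field 1: offset, int64
            b.flip();
            // The reader must consume fields in exactly the order written.
            System.out.println("partition=" + b.getInt() + " offset=" + b.getLong());
        }
    }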
1. kafka.common.protocol.ProtoUtil: parseMetadataResponse: after reading the
function I feel that using TopicInfo/PartitionInfo objects for parsing might be
preferable. We could put these objects in the Protocol.java file so any
protocol change would only require editing one file.
I'd like to have a discussion about this. I tried this approach. The problem is
that the mixing of protocol definition with business logic leads to leaking of
logic into the protocol and makes the protocol hard to read. There are several
other options. It would be good to discuss.
1. kafka.common.record.LogEntry: Maybe we can rename to OffsetRecord?
Hmm, I'm open to changing it but I'm not sure that's better. I try to avoid
names like TopicPartition which are just the concatenation of all the fields as
I feel the purpose of a name is to capture the concept the fields describe.
I.e. if we add a new field we shouldn't need to lengthen the name!
1. kafka.common.record.Record: Do we expect MIN_HEADER_SIZE and RECORD_OVERHEAD
to be different in the future? Currently their values are the same and the way
they are computed is also identical.
Good point, I'll look into this; these are just copied from the Scala code.
1. kafka.common.request.RequestHeader: Is it better to define "client_id"
strings as static fields in Protocol.java?
Unlikely. Currently you can access a field by the string name or the field
instance. The field instance is an array access and the name is a hash table
lookup to get the field followed by an array access. So the two reasonable
options are to have static variables for the Fields or to access with Strings.
Let's defer that to the discussion of handling request definitions.
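For clarity, the two access paths being compared look roughly like this (an
illustrative sketch, not the actual Struct/Field classes):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the trade-off: looking a field up by name costs a hash
    // lookup to find its index, while holding a field index (what a static
    // Field reference amounts to) makes access a plain array read.
    public class Struct {
        private final Map<String, Integer> indexByName = new HashMap<String, Integer>();
        private final Object[] values;

        public Struct(String[] fieldNames) {
            this.values = new Object[fieldNames.length];
            for (int i = 0; i < fieldNames.length; i++)
                indexByName.put(fieldNames[i], i);
        }

        // Hash table lookup to find the index, then an array access.
        public Object get(String name) {
            return values[indexByName.get(name)];
        }

        // Just an array access.
        public Object get(int fieldIndex) {
            return values[fieldIndex];
        }

        public void set(int fieldIndex, Object value) {
            values[fieldIndex] = value;
        }
    }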
2. kafka.clients.common.NetworkReceive: Does REQUEST/RESPONSE_HEADER also need
to be versioned?
Only if we want to change it. We didn't include a version number for these in
the protocol, so we can't add one now. Any header change today is non-backwards
compatible.
1. In the first constructor, why not also initialize the size buffer to
ByteBuffer.allocate(4)?
The point of the size buffer is to read the size to allocate and read the
message buffer, but that constructor takes an already allocated/read message
buffer. I was using that constructor for unit testing to fake responses that
weren't really being read.
2. Why does NetworkReceive not extend ByteBufferReceive?
Yes, this distressed me as well. Here is the issue: I want
ByteBufferSend/NetworkSend to work on an array of ByteBuffers to handle the
case where you already have serialized chunks of data (i.e. message sets). But
in the case of a receive we don't currently have a good way to concatenate
buffers, so reading into multiple buffers isn't useful. This is basically a
limitation of the ByteBuffer API.
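The asymmetry comes straight from the NIO API: a gathering write can send an
array of buffers in one call, but there is no comparably useful scatter on the
receive side. A sketch of the send half (illustrative, not the actual
ByteBufferSend):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.GatheringByteChannel;

    // A gathering write sends an array of buffers (e.g. a header plus
    // already-serialized message sets) without copying them into one buffer.
    // There is no comparably useful scatter on the receive side, since the
    // reader can't concatenate partially filled buffers into one view.
    public class MultiBufferSend {
        private final ByteBuffer[] buffers;

        public MultiBufferSend(ByteBuffer... buffers) {
            this.buffers = buffers;
        }

        public long writeTo(GatheringByteChannel channel) throws IOException {
            return channel.write(buffers);
        }

        public boolean completed() {
            for (ByteBuffer b : buffers)
                if (b.hasRemaining())
                    return false;
            return true;
        }
    }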
1. kafka.clients.common.Selector: "transmissions.send.remaining() <= 0", under
what condition can remaining() be < 0?
None, I think.
2. “if (trans != null) this.disconnected.add(trans.id); “, should it be trans
== null?
Well that would give a null pointer, no? What this is saying is "if we have an
id for this connection, record it as disconnected".
1. kafka.clients.producer.internals.BufferPool: In the freeUp() function,
should we use this.free.pollLast().capacity() instead of limit()?
Yeah that would be better, technically anything on the free list must have
capacity==limit but I think it is bad to depend on that.
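For anyone following along, the difference is that capacity() is fixed at
allocation while limit() moves as the buffer is used; a quick demonstration
(illustrative):

    import java.nio.ByteBuffer;

    // capacity() is fixed at allocation; limit() changes with flip()/limit().
    public class CapacityVsLimit {
        public static void main(String[] args) {
            ByteBuffer b = ByteBuffer.allocate(16);
            b.putInt(42);
            b.flip(); // limit is now 4, capacity is still 16
            System.out.println("capacity=" + b.capacity() + " limit=" + b.limit());
        }
    }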
2. What is the rationale for having just one poolable size?
Basically just to avoid implementing malloc. I guess the choice is either to
implement a generic BufferPool or one specific to the needs of the producer.
Since most of the time the producer will be doing allocations of that poolable
size it makes sense to keep those around. If you try to keep arbitrary sizes I
think things quickly get complex but since we will almost always be allocating
the same size it seemed simpler to just handle that case. I'm open to
generalizing it if it isn't too complex.
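In other words, something like the following, where only buffers of the single
poolable size are recycled and odd sizes are allocated fresh (a simplified
sketch that ignores the memory bound and blocking in the real BufferPool):

    import java.nio.ByteBuffer;
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Simplified sketch of a single-poolable-size pool: buffers of exactly
    // poolableSize are recycled on a free list; any other size is allocated
    // fresh and left to the garbage collector on deallocate.
    public class SimpleBufferPool {
        private final int poolableSize;
        private final Deque<ByteBuffer> free = new ArrayDeque<ByteBuffer>();

        public SimpleBufferPool(int poolableSize) {
            this.poolableSize = poolableSize;
        }

        public synchronized ByteBuffer allocate(int size) {
            if (size == poolableSize && !free.isEmpty()) {
                ByteBuffer buffer = free.pollFirst();
                buffer.clear();
                return buffer;
            }
            return ByteBuffer.allocate(size);
        }

        public synchronized void deallocate(ByteBuffer buffer) {
            if (buffer.capacity() == poolableSize)
                free.addLast(buffer);
            // Buffers of other sizes are simply dropped.
        }
    }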
— kafka.clients.producer.internals.Metadata
1. After configs are added, we need to remove the hard-coded default values. So
for all of these places we could leave a TODO mark for now.
Yup.
— kafka.clients.producer.internals.ProduceRequestResult
1. Its member fields are dependent on Protocol.java, so once we change the
protocol we would probably also need to change this file.
I don't believe this is dependent on the protocol, maybe you can elaborate?
1. kafka.clients.producer.internals.RecordAccumulator: Typo: "Get a list of
topic-partitions which are ready to be send."
Ack.
— kafka.clients.producer.internals.Sender
1. One corner case we may need to consider is the following: if a partition
becomes unavailable and the producer keeps sending data to that partition, then
later on this partition could exhaust the memory, leaving other partitions
unable to take more messages and blocked waiting.
If I understand the case you are describing you are saying that the producer
could use up the full buffer on a partition which is not available and the
producer will then block. This is correct and that is the intention. This
shouldn't block the sender, though, it will keep trying to send until the
partition becomes available again. I think this is what you want: you can
buffer for a while but eventually must either block or drop data if memory is
bounded.
2. In handling disconnection, the ProduceRequestResult will set the exception,
and if await() is called this exception will be thrown and the callback will
not be executed. Since this exception is already stored in the RecordSend, I
think a better way is to not throw the exception on await() but to let the
callback function handle it. That would make the application code cleaner,
since otherwise the application needs to try-catch the await() call.
I think the callback is always executed...if there is a case this doesn't
happen it is a bug.
I agree that
    if (result.hasError())
        // do something
is easier to read. The problem is that it is incumbent on you to check, and if
you don't it silently fails. This is the complaint people have about MongoDB.
The principle I am going on is that
producer.send(message).await()
should be exactly interchangeable with a blocking call. Anyhow I am sympathetic
to your point; let's move it into the public API discussion.
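Concretely, the two styles being compared look roughly like this (all names
are illustrative stand-ins, not the final API):

    // Sketch contrasting the two error-handling styles under discussion.
    public class ErrorStyles {
        interface RecordSend {
            void await() throws Exception;   // style 1: throws on failure
            boolean hasError();              // style 2: caller must check
        }

        static void styleOne(RecordSend send) {
            // Interchangeable with a blocking call: failure cannot pass silently.
            try {
                send.await();
            } catch (Exception e) {
                // handle the failed send
            }
        }

        static void styleTwo(RecordSend send) {
            // Easier to read, but it is on the caller to remember the check.
            if (send.hasError()) {
                // handle the failed send
            }
        }
    }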
3. In closing the producer, there is another corner case: the I/O thread can
keep trying to send the rest of the data and failing. Perhaps we could add
another option to drop whatever is in the buffer and let the application's
callback functions handle it.
I think what you are saying is that close() blocks until all data is sent. That
is the intention. Since send is async I think it is counterintuitive to
fail/drop in-progress calls as the user may not know their calls aren't
completed.
> Code dump of new producer
> -------------------------
>
> Key: KAFKA-1227
> URL: https://issues.apache.org/jira/browse/KAFKA-1227
> Project: Kafka
> Issue Type: New Feature
> Reporter: Jay Kreps
> Assignee: Jay Kreps
> Attachments: KAFKA-1227.patch
>
>
> The plan is to take a dump of the producer code "as is" and then do a series
> of post-commit reviews to get it into shape. This bug tracks just the code
> dump.