How to use ApiVersionRequest and ApiVersionResponse

2017-10-09 Thread Hari Krishna
Hi All

I have a requirement where, based on the Kafka version or the APIs supported
by the broker, we will dynamically load the best-suited Kafka client jar.
I came across ApiVersionRequest and ApiVersionResponse and even went through
some code in the NetworkClient class, but I was not able to find any explicit
APIs in the client that could determine whether it is compatible with the
Kafka broker it needs to communicate with. Any pointers regarding this would
be greatly appreciated.
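(For reference: if shell access to the Kafka tooling is an option, distributions
from 0.10.2 onwards ship a script that queries each broker for the API versions
it supports, e.g.

bin/kafka-broker-api-versions.sh --bootstrap-server broker:9092

and prints the supported version range of every API per broker; that output
could drive the choice of client jar. The Java client itself appears to use the
ApiVersions exchange only internally, in NetworkClient, without exposing it as
a public API.)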

Regards
Hari


Re: Consumer service that supports retry with exponential backoff

2017-10-09 Thread Stephen Powis
Pretty much the implementation that we're using, except using partitions
within a single topic because we have a low number of failures passing
through. Granted, the first partition will take the brunt of the load vs.
the higher-order partitions.

On Tue, Oct 10, 2017 at 7:30 AM, Steven Schlansker <
sschlans...@opentable.com> wrote:

>
> > On Oct 9, 2017, at 2:41 PM, John Walker  wrote:
> >
> > I have a pair of services.  One dispatches commands to the other for
> > processing.
> >
> > My consumer sometimes fails to execute commands as a result of transient
> > errors.  To deal with this, commands are retried after an exponentially
> > increasing delay up to a maximum of 4 times.  (Delays: 1hr, 2hr, 4hr,
> 8hr.)
> >  What's the standard way to set up something like this using Kafka?
> >
> > The only solution I've found so far is to setup 5 topics (main_topic,
> > delayed_1hr, delayed_2hr, delayed_4hr,delayed_8hr), and then have pollers
> > that poll each of these topics, enforce delays, and escalate messages
> from
> > one topic to another if errors occur.
>
> Our approach for a similar scheduling problem was to assign each command a
> unique key and a "valid after" date.  Any time you fail to execute a
> command,
> you update the "valid after" with your exponential backoff algorithm, and
> produce
> the updated value over the same key.  Old versions are removed by
> compaction.
>
> Biggest downside is now your queue is no longer ordered, but for our
> problem
> in practice the number of pending messages is relatively small so we simply
> scan all outstanding messages every delivery interval to see if any is
> eligible
> for a retry.
>
>


Re: Consumer service that supports retry with exponential backoff

2017-10-09 Thread Steven Schlansker

> On Oct 9, 2017, at 2:41 PM, John Walker  wrote:
> 
> I have a pair of services.  One dispatches commands to the other for
> processing.
> 
> My consumer sometimes fails to execute commands as a result of transient
> errors.  To deal with this, commands are retried after an exponentially
> increasing delay up to a maximum of 4 times.  (Delays: 1hr, 2hr, 4hr, 8hr.)
>  What's the standard way to set up something like this using Kafka?
> 
> The only solution I've found so far is to setup 5 topics (main_topic,
> delayed_1hr, delayed_2hr, delayed_4hr,delayed_8hr), and then have pollers
> that poll each of these topics, enforce delays, and escalate messages from
> one topic to another if errors occur.

Our approach for a similar scheduling problem was to assign each command a
unique key and a "valid after" date.  Any time you fail to execute a command,
you update the "valid after" with your exponential backoff algorithm, and 
produce
the updated value over the same key.  Old versions are removed by compaction.

Biggest downside is now your queue is no longer ordered, but for our problem
in practice the number of pending messages is relatively small so we simply
scan all outstanding messages every delivery interval to see if any is eligible
for a retry.
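
A minimal sketch of that re-produce step, assuming a compacted "commands" topic
and a value that carries the payload plus the "valid after" timestamp (the
topic, class and field names below are made up for illustration, not taken from
an actual implementation):

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RetryScheduler {

    // Re-publish a failed command under the same key with an updated
    // "valid after" timestamp, so log compaction keeps only the latest attempt.
    static void scheduleRetry(KafkaProducer<String, String> producer,
                              String commandId, String payload, int attempt) {
        long backoffMs = TimeUnit.HOURS.toMillis(1L << attempt); // 1h, 2h, 4h, 8h
        long validAfter = System.currentTimeMillis() + backoffMs;
        // Value encoding is up to you; here it is simply "validAfter:payload".
        producer.send(new ProducerRecord<>("commands", commandId, validAfter + ":" + payload));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            scheduleRetry(producer, "command-42", "do-something", 0);
        }
    }
}

The consumer side then periodically scans the outstanding (non-compacted)
records and only executes those whose "valid after" timestamp has passed.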



Fwd: Consumer service that supports retry with exponential backoff

2017-10-09 Thread John Walker
I have a pair of services.  One dispatches commands to the other for
processing.

My consumer sometimes fails to execute commands as a result of transient
errors.  To deal with this, commands are retried after an exponentially
increasing delay up to a maximum of 4 times.  (Delays: 1hr, 2hr, 4hr, 8hr.)
  What's the standard way to set up something like this using Kafka?

The only solution I've found so far is to set up 5 topics (main_topic,
delayed_1hr, delayed_2hr, delayed_4hr, delayed_8hr), and then have pollers
that poll each of these topics, enforce delays, and escalate messages from
one topic to another if errors occur.

Thanks in advance for any pointers you guys can give me,

-John


Re: Serve interactive queries from standby replicas

2017-10-09 Thread Stas Chizhov
Hi,

I have created a ticket: https://issues.apache.org/jira/browse/KAFKA-6031

Best regards,
Stas.

2017-10-06 23:39 GMT+02:00 Guozhang Wang :

> Hi Stas,
>
> Would you mind creating a JIRA for this functionality request so that we
> won't forget about it and drop it on the floor?
>
>
> Guozhang
>
> On Fri, Oct 6, 2017 at 1:10 PM, Stas Chizhov  wrote:
>
> > Thank you!
> >
> > I guess eventually consistent reads might be a reasonable trade-off if
> > you can get the ability to serve reads without downtime in some cases.
> >
> > By the way, are standby replicas just extra consumers/processors of the
> > input topics? Or is there some custom protocol for syncing the state?
> >
> >
> >
> > On Fri, 6 Oct 2017 at 20:03, Matthias J. Sax  wrote:
> >
> > > No, that is not possible.
> > >
> > > Note: standby replicas might "lag" behind the active store, and thus,
> > > you would get different results if querying standby replicas would be
> > > supported.
> > >
> > > We might add this functionality at some point though -- but there are
> no
> > > concrete plans atm. Contributions are always welcome of course :)
> > >
> > >
> > > -Matthias
> > >
> > > On 10/6/17 4:18 AM, Stas Chizhov wrote:
> > > > Hi
> > > >
> > > > Is there a way to serve read requests from standby replicas?
> > > > StreamsMetadata does not seem to provide standby endpoints as far as I
> > > > can see.
> > > >
> > > > Thank you,
> > > > Stas
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: java.lang.NoSuchMethodError: scala.collection.GenTraversableOnce.$init

2017-10-09 Thread Hari Krishna
Hi Amy

Could you check whether any other dependency defined in your pom is pulling in
a different version of the scala-library jar? Any chance you are running Kafka
from Eclipse? If so, note that once you add the Scala nature in Eclipse, it
picks up its own version of Scala on the build path, which can also cause this
kind of library version mismatch.
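
A quick way to spot such a conflict, assuming a Maven build, is the dependency
tree, for example:

mvn dependency:tree -Dincludes=org.scala-lang:scala-library

which lists every dependency path that pulls in scala-library, so a version
mismatch stands out immediately.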

Regards
Hari

On Tue, Oct 10, 2017 at 12:00 AM,  wrote:

> I'm trying to use a kafka_2.11-0.11.* version but it fails with
> java.lang.NoSuchMethodError: scala.collection.GenTraversableOnce.$init$.
> I don't have Scala installed on my Mac to cause any conflict. Has anyone
> seen this error?
>
> [2017-10-09 11:21:23,846] INFO binding to port 0.0.0.0/0.0.0.0:2181
> (org.apache.zookeeper.server.NIOServerCnxnFactory)
> [2017-10-09 11:21:24,054] FATAL  (kafka.Kafka$)
> java.lang.NoSuchMethodError: scala.collection.GenTraversableOnce.$init$(Lscala/collection/GenTraversableOnce;)V
> at kafka.message.MessageSet.<init>(MessageSet.scala:72)
> at kafka.message.ByteBufferMessageSet.<init>(ByteBufferMessageSet.scala:129)
> at kafka.message.MessageSet$.<init>(MessageSet.scala:32)
> at kafka.message.MessageSet$.<clinit>(MessageSet.scala)
> at kafka.server.Defaults$.<init>(KafkaConfig.scala:50)
> at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
> at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:673)
> at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
> at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
> at kafka.Kafka$.main(Kafka.scala:58)
> at kafka.Kafka.main(Kafka.scala)
>
>
>


java.lang.NoSuchMethodError: scala.collection.GenTraversableOnce.$init

2017-10-09 Thread amy . roh
I'm trying to use a kafka_2.11-0.11.* version but it fails with
java.lang.NoSuchMethodError: scala.collection.GenTraversableOnce.$init$.
I don't have Scala installed on my Mac to cause any conflict. Has anyone
seen this error?


[2017-10-09 11:21:23,846] INFO binding to port 0.0.0.0/0.0.0.0:2181
(org.apache.zookeeper.server.NIOServerCnxnFactory)

[2017-10-09 11:21:24,054] FATAL  (kafka.Kafka$)
java.lang.NoSuchMethodError:
scala.collection.GenTraversableOnce.$init$(Lscala/collection/GenTraversableOnce;)V

at kafka.message.MessageSet.<init>(MessageSet.scala:72)
at kafka.message.ByteBufferMessageSet.<init>(ByteBufferMessageSet.scala:129)
at kafka.message.MessageSet$.<init>(MessageSet.scala:32)
at kafka.message.MessageSet$.<clinit>(MessageSet.scala)
at kafka.server.Defaults$.<init>(KafkaConfig.scala:50)
at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:673)
at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
at kafka.Kafka$.main(Kafka.scala:58)
at kafka.Kafka.main(Kafka.scala)




Re: subscription

2017-10-09 Thread Matthias J. Sax
See http://kafka.apache.org/contact

On 10/9/17 8:27 AM, Emanuele Ianni wrote:
> subscription
> 



subscription

2017-10-09 Thread Emanuele Ianni
subscription

-- 
Best regards
Emanuele Ianni

The information in this e.mail and in any attachments is confidential and
may be privileged. If you are not the intended recipient, please destroy
this message and notify the sender immediately. You should not retain, copy
or use this e.mail for any purpose, nor disclose all or any part of its
contents to any other person according to the Italian Legislative Decree n.
196/2003.
This disclaimer should be not considered if the message is on a public
mailing list.


I ran into a problem: I want to use Kafka Connect with Oracle and HDFS and then use Hive. Message: org.apache.hadoop.hive.serde2.avro.AvroSerdeException: Invalid precision or scale for decimal type

2017-10-09 Thread han cheryl
This is my Oracle table:

CREATE TABLE
T_JR_CCQK
(
CC_ID INTEGER NOT NULL,
CC_USER_FK INTEGER,
CC_DEPT_FK INTEGER,
CC_YEAR INTEGER,
CC_MOUTH INTEGER,
CC_DAY INTEGER,
CC_STATES INTEGER,
CC_ZCD VARCHAR2(80),
CC_CCD VARCHAR2(80),
CONSTRAINT P_KEY_JR_CCQK PRIMARY KEY (CC_ID)
);


I use kafka-connect-jdbc. The topic data looks like this:
{"CC_ID":"\u0001`","CC_USER_FK":{"bytes":"\u001Ed"},"CC_DEPT‌​_FK":{"bytes":"\u000‌​3I"},"CC_YEAR":{"byt‌​es":"\u0007à"},"CC_M‌​OUTH":{"bytes":"\f"}‌​,"CC_DAY":{"bytes":"‌​\u0012"},"CC_STATES"‌​:{"bytes":"\u0002"},‌​"CC_ZCD":{"string":"‌​泰安市"},"CC_CCD":{"str‌​ing":"海口市"}}


Re: kafka broker losing offsets?

2017-10-09 Thread Dmitriy Vsekhvalnov
Hi tao,

we had unclean leader election enabled at the beginning, but then disabled
it and also reduced the 'max.poll.records' value. It helped a little bit.

But after today's testing there is a strong correlation between the lag spike
and which broker we crash. For the broker with the lowest ID (100):
  1. the lag is always at least 1-2 orders of magnitude higher
  2. we start getting

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.

  3. sometimes re-consumption from a random position

And when we crash the other brokers (101, 102), there is just a lag spike on
the order of ~10K; it settles down quite quickly, with no consumer exceptions.

Totally lost as to what to try next.
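
For reference, the two knobs that exception points at are plain consumer
settings; a minimal sketch (values are placeholders, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SlowProcessingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Fewer records per poll() means each iteration of the poll loop finishes sooner.
        props.put("max.poll.records", "50");
        // Upper bound on the time between poll() calls before the group
        // considers the consumer dead and rebalances its partitions.
        props.put("max.poll.interval.ms", "600000");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe() / poll() loop omitted
        }
    }
}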

On Sat, Oct 7, 2017 at 2:41 AM, tao xiao  wrote:

> Do you have unclean leader election turned on? If killing 100 is the only
> way to reproduce the problem, it is possible with unclean leader election
> turned on that leadership was transferred to an out-of-ISR follower which may
> not have the latest high watermark
> On Sat, Oct 7, 2017 at 3:51 AM Dmitriy Vsekhvalnov  >
> wrote:
>
> > About to verify the hypothesis on Monday, but it looks like that in the
> > latest tests. Need to double check.
> >
> > On Fri, Oct 6, 2017 at 11:25 PM, Stas Chizhov 
> wrote:
> >
> > > So no matter in what sequence you shut down brokers, it is only one that
> > > causes the major problem? That would indeed be a bit weird. Have you
> > > checked the offsets of your consumer - right after the offsets jump back,
> > > does it start from the beginning of the topic or does it go back to some
> > > random position? Have you checked if all offsets are actually being
> > > committed by the consumers?
> > >
> > > On Fri, 6 Oct 2017 at 20:59, Dmitriy Vsekhvalnov  wrote:
> > >
> > > > Yeah, probably we can dig around.
> > > >
> > > > One more observation: the most lag/re-consumption trouble happens when
> > > > we kill the broker with the lowest id (e.g. 100 from [100,101,102]).
> > > > When crashing other brokers there is nothing special happening - the lag
> > > > grows a little bit but nothing crazy (e.g. thousands, not millions).
> > > >
> > > > Does that sound suspicious?
> > > >
> > > > On Fri, Oct 6, 2017 at 9:23 PM, Stas Chizhov 
> > wrote:
> > > >
> > > > > Ted: when choosing earliest/latest you are saying: if it happens that
> > > > > there is no "valid" offset committed for a consumer (for whatever
> > > > > reason: bug/misconfiguration/no luck) it will be ok to start from the
> > > > > beginning or end of the topic. So if you are not ok with that you
> > > > > should choose none.
> > > > >
> > > > > Dmitriy: OK. Then it is spring-kafka that maintains this
> > > > > offset-per-partition state for you. It might also have that problem of
> > > > > leaving stale offsets lying around. After quickly looking through
> > > > > https://github.com/spring-projects/spring-kafka/blob/1945f29d5518e3c4a9950ba82135420dfb61e808/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
> > > > > it looks possible, since the offsets map is not cleared upon partition
> > > > > revocation, but that is just a hypothesis. I have no experience with
> > > > > spring-kafka. However, since you say your consumers were always active,
> > > > > I find this theory worth investigating.
> > > > >
> > > > >
> > > > > 2017-10-06 18:20 GMT+02:00 Vincent Dautremont <
> > > > > vincent.dautrem...@olamobile.com.invalid>:
> > > > >
> > > > > > Is there a way to read messages on a topic partition from a specific
> > > > > > node that we choose (and not from the topic partition leader)?
> > > > > > I would like to verify myself that each of the __consumer_offsets
> > > > > > partition replicas has the same consumer group offset written in it.
> > > > > >
> > > > > > On Fri, Oct 6, 2017 at 6:08 PM, Dmitriy Vsekhvalnov <
> > > > > > dvsekhval...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Stas:
> > > > > > >
> > > > > > > we rely on spring-kafka; it commits offsets "manually" for us after
> > > > > > > the event handler completes. So it's kind of automatic once there is
> > > > > > > a constant stream of events (no idle time, which is true for us).
> > > > > > > Though it's not what the pure kafka-client calls "automatic" (flush
> > > > > > > commits at fixed intervals).
> > > > > > >
> > > > > > > On Fri, Oct 6, 2017 at 7:04 PM, Stas Chizhov  wrote:
> > > > > > >
> > > > > > > > You don't have a

Re: How to reduce time for consumer to take over?

2017-10-09 Thread Stevo Slavić
Hello Christian,

Assuming that you are using the new Java kafka-clients with Kafka's consumer
group management, you'd have to tune both the consumers and the broker.

See
- Broker config https://kafka.apache.org/documentation/#brokerconfigs
-- group.initial.rebalance.delay.ms
-- group.max.session.timeout.ms
-- group.min.session.timeout.ms
- New consumer config
https://kafka.apache.org/documentation/#newconsumerconfigs
-- heartbeat.interval.ms
-- session.timeout.ms

Notice, like always, there's a tradeoff involved.
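
A minimal sketch of the consumer side of that tuning (values below are
illustrative only; session.timeout.ms must stay within the broker's
group.min/max.session.timeout.ms bounds, and heartbeat.interval.ms should be
well below the session timeout):

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FastFailoverConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // How quickly a crashed consumer is declared dead and its partitions
        // reassigned: lower means faster takeover but more spurious rebalances.
        props.put("session.timeout.ms", "6000");
        props.put("heartbeat.interval.ms", "2000");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe() / poll() loop omitted
        }
    }
}

On the broker side, group.initial.rebalance.delay.ms (3 seconds by default in
0.11) may also add to the startup delay of a freshly created group in a test.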

Kind regards,
Stevo Slavic.


On Mon, Oct 9, 2017 at 2:29 PM, Christian Schneider  wrote:

> I have the following case:
>
> 1. I have 1 consumer that starts processing a message, then crashes
> (Java-VM shuts down)
> 2. I start a second consumer that should process the message instead.
>
> It seems that it takes about 60 seconds for the second consumer to take
> over the processing.
> Can this be sped up? I use this in a test and would like to make that
> test faster.
>
> Christian
>
> --
> --
> Christian Schneider
> http://www.liquid-reality.de
>  46&URL=http%3a%2f%2fwww.liquid-reality.de>
>
> Computer Scientist
> http://www.adobe.com
>


How to reduce time for consumer to take over?

2017-10-09 Thread Christian Schneider
I have the following case:

1. I have 1 consumer that starts processing a message, then crashes
(Java-VM shuts down)
2. I start a second consumer that should process the message instead.

It seems that it takes about 60 seconds for the second consumer to take
over the processing.
Can this be sped up? I use this in a test and would like to make that
test faster.

Christian

-- 
-- 
Christian Schneider
http://www.liquid-reality.de


Computer Scientist
http://www.adobe.com


Re: NPE on ConsumerRecords$ConcatenatedIterable$1.makeNext() while iterating records

2017-10-09 Thread Manikumar
Hi,

Can you reproduce the error? Is it happening at the same offset every time?
Try to reproduce with the console-consumer tool.
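For example (bootstrap server and topic name are placeholders):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic your-topic --from-beginning

If the same records break there as well, that points away from your application
code.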

You can raise a JIRA issue here:
https://issues.apache.org/jira/projects/KAFKA

On Mon, Oct 9, 2017 at 3:00 PM, Michael Keinan 
wrote:

> Thank you for your response.
> No consumer interceptor involved
>
> Michael
>
>
>
>
> On Oct 8, 2017, at 7:04 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> Was there any consumer interceptor involved ?
>
> Cheers
>
> On Sun, Oct 8, 2017 at 6:29 AM, Michael Keinan <micha...@capitolis.com> wrote:
>
> Hi
> Using Kafka 0.10.2.0
> I get an NPE while iterating the records after polling them using the poll
> method.
> - Any idea where it comes from?
> - How can I open an issue with the Kafka team?
>
> Stacktrace:
> java.lang.NullPointerException
> at org.apache.kafka.clients.consumer.ConsumerRecords$
> ConcatenatedIterable$1.makeNext(ConsumerRecords.java:112)
> at org.apache.kafka.clients.consumer.ConsumerRecords$
> ConcatenatedIterable$1.makeNext(ConsumerRecords.java:101)
> at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(
> AbstractIterator.java:79)
> at org.apache.kafka.common.utils.AbstractIterator.hasNext(
> AbstractIterator.java:45)
> at com.capitolis.messagespersistency.KafkaListener.listen(
> KafkaListener.java:104)
>
> My code
>
>
>
> while (true) {
>     // Generics were stripped in the original mail; key type assumed String.
>     ConsumerRecords<String, String> records = consumer.poll(timeout);
>
>     // Error occurs on the following line:
>     for (ConsumerRecord<String, String> record : records) {
>         String value = record.value();
>     }
> }
>
>
>
>
>
>
> Michael Keinan
>
>
>
>
>
>
>
>
>


Re: NPE on ConsumerRecords$ConcatenatedIterable$1.makeNext() while iterating records

2017-10-09 Thread Michael Keinan
Thank you for your response.
No consumer interceptor involved

Michael




On Oct 8, 2017, at 7:04 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Was there any consumer interceptor involved ?

Cheers

On Sun, Oct 8, 2017 at 6:29 AM, Michael Keinan <micha...@capitolis.com> wrote:

Hi
Using Kafka 0.10.2.0
I get an NPE while iterating the records after polling them using the poll
method.
- Any idea where it comes from?
- How can I open an issue with the Kafka team?

Stacktrace:
java.lang.NullPointerException
at org.apache.kafka.clients.consumer.ConsumerRecords$
ConcatenatedIterable$1.makeNext(ConsumerRecords.java:112)
at org.apache.kafka.clients.consumer.ConsumerRecords$
ConcatenatedIterable$1.makeNext(ConsumerRecords.java:101)
at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(
AbstractIterator.java:79)
at org.apache.kafka.common.utils.AbstractIterator.hasNext(
AbstractIterator.java:45)
at com.capitolis.messagespersistency.KafkaListener.listen(
KafkaListener.java:104)

My code



while (true) {
    // Generics were stripped in the original mail; key type assumed String.
    ConsumerRecords<String, String> records = consumer.poll(timeout);

    // Error occurs on the following line:
    for (ConsumerRecord<String, String> record : records) {
        String value = record.value();
    }
}






Michael Keinan










Re: The idea of "composite key" to make log compaction more flexible - question / proposal

2017-10-09 Thread Michal Michalski
Hey Jay,

Thanks for the reply. Yes, this should do the job.

We were thinking about something that abstracts this logic away from the user
(e.g. the same way Cassandra handles its PK definitions in CQL, "hiding" the
row key and the optional clustering key behind the concept of a "primary key"),
but introducing such a design obviously has some pros/cons and non-trivial
implications, so if using the partitioner interface is the way to go in Kafka,
we'll use it :-)
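
A minimal sketch of that approach, assuming a String record key of the form
"K1:K2" where only the part before the colon should drive partition assignment
(the class name and key format are made up for illustration):

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Partitions on the "K1" prefix of a "K1:K2" composite key, so all entities of
// the same parent land in one partition while log compaction still sees the
// full key and keeps one record per (K1, K2) pair.
public class CompositeKeyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String k1 = ((String) key).split(":", 2)[0]; // assumes String keys "K1:K2"
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return Utils.toPositive(Utils.murmur2(k1.getBytes(StandardCharsets.UTF_8))) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would be wired in via the producer's partitioner.class setting, with the
full "K1:K2" string used as the record key so that compaction treats the pairs
as distinct.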

Thanks,
Michał


On 5 October 2017 at 15:22, Jay Kreps  wrote:

> I think you can do this now by using a custom partitioner, no?
>
> https://kafka.apache.org/0110/javadoc/org/apache/kafka/
> clients/producer/Partitioner.html
>
> -Jay
>
> On Mon, Oct 2, 2017 at 6:29 AM Michal Michalski <
> michal.michal...@zalando.ie>
> wrote:
>
> > Hi,
> >
> > TL;DR: I'd love to be able to make log compaction more "granular" than
> just
> > per-partition-key, so I was thinking about the concept of a "composite
> > key", where partitioning logic is using one part of the key, while
> > compaction uses the whole key - is this something desirable / doable /
> > worth a KIP?
> >
> > Longer story / use case:
> >
> > I'm currently a member of a team working on a project that's using a
> bunch
> > of applications to ingest data to the system (one "entity type" per app).
> > Once ingested by each application, since the entities are referring to
> each
> > other, they're all published to a single topic to ensure ordering for
> later
> > processing stages. Because of the nature of the data, for a given set of
> > entities related together, there's always a single "master" / "parent"
> > entity, whose ID we're using as the partition key; to give an example:
> > let's say you have "product" entity which can have things like "media",
> > "reviews", "stocks" etc. associated with it - product ID will be the
> > partition key for *all* these entities. However, with this approach we
> > simply cannot use log compaction because having e.g. "product", "media"
> and
> > "review" events, all with the same partition key "X", means that
> compaction
> > process will at some point delete all but one of them, causing a data
> loss
> > - only a single entity with key "X" will remain (and that's absolutely
> > correct - Kafka doesn't "understand" what does the message contain).
> >
> > We were thinking about introducing something we internally called
> > "composite key". The idea is to have a key that's not just a single
> String
> > K, but a pair of Strings: (K1, K2). For specifying the partition that the
> > message should be sent to, K1 would be used; however, for log compaction
> > purposes, the whole (K1, K2) would be used instead. This way, referring
> to
> > the example above, different entities "belonging" to the same "master
> > entity" (product), could be published to that topic with composite keys:
> > (productId, "product"), (productId, "media") and (productId, "review"),
> so
> > they all end up in single partition (specified by K1, which is always:
> > productId), but they won't get compacted together, because the K2 part is
> > different for them, making the whole "composite key" (K1, K2) different.
> Of
> > course K2 would be optional, so for someone who only needs the default
> > behaviour nothing would change.
> >
> > Since I'm not a Kafka developer and I don't know its internals that
> well, I
> > can't say if this idea is technically feasible or not, but I'd think it
> is
> > - I'd be more afraid of the complexity around backwards compatibility
> etc.
> > and potential performance implications of such change.
> >
> > I know that similar behaviour is achievable by using the producer API
> that
> > allows explicitly specifying the partition ID (and the key), but I think
> > it's a bit "clunky" (for each message, generate a key that this message
> > should normally be using [productId] and somehow "map" that key into a
> > partition X; then send that message to this partition X, *but* use the
> > "compaction" key instead [productId, entity type] as the message key) and
> > it's something that could be abstracted away from the user.
> >
> > Thoughts?
> >
> > Question to Kafka users: Is this something that anyone here would find
> > useful? Is anyone here dealing with similar problem?
> >
> > Question to Kafka maintainers: Is this something that you could
> potentially
> > consider a useful feature? Would it be worth a KIP? Is something like
> this
> > (technically) doable at all?
> >
> > --
> > Kind regards,
> > Michał Michalski
> >
>