Re: Kafka connect JDBC DB SCNs

2016-02-24 Thread Chris Riccomini
Given how messed up MySQL is, in particular, it seems to me that adding a
config that can do a look-back (either ID- or timestamp-based) would be
useful, and pretty straightforward to add. This would at least help prevent
data loss, at the expense of duplicate rows. If people *really* care about
not losing data, they can configure a second connector to do bulk loads on a
more periodic basis. Thoughts?
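
To make this concrete, here's a minimal sketch of what a timestamp-based
look-back could look like when building the incremental query. The table,
columns, connection string, and the lookbackMs/lastSeenModifyTime variables
are all made up for illustration; this is not the connector's actual query
logic.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class LookbackPollSketch {
    public static void main(String[] args) throws Exception {
        long lookbackMs = 60_000L;                 // a "NOW() - 1 minute" style window
        long lastSeenModifyTime = 1456300000000L;  // normally the connector's stored offset

        try (Connection conn = DriverManager.getConnection(
                 "jdbc:mysql://localhost/test", "user", "pass");
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT id, state, modify_time FROM funding_instructions " +
                 "WHERE modify_time > ? ORDER BY modify_time")) {
            // Subtract the look-back window so rows that committed late are still
            // picked up, at the cost of re-emitting recently seen rows (duplicates).
            stmt.setLong(1, lastSeenModifyTime - lookbackMs);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    System.out.println(rs.getLong("id") + " state=" + rs.getInt("state"));
                }
            }
        }
    }
}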

On Wed, Feb 24, 2016 at 12:41 AM, Chris Riccomini 
wrote:

> Hey all,
>
> Some more details on my question. My concern that even using IDs on
> immutable tables as a way to replicate data is unsafe seems to be confirmed,
> at least for InnoDB if not configured properly. This page:
>
> http://dev.mysql.com/doc/refman/5.7/en/innodb-auto-increment-handling.html
>
> describes how auto-increment IDs can get out of order. I have replicated
> this issue on my local machine (see example below the fold) with all three
> innodb_autoinc_lock_mode settings (0, 1, and 2). They all show the same results.
>
> As far as I can tell, at least with MySQL+InnoDB, there is no way to
> configure the JDBC connector such that it isn't at risk of dropping data on
> incremental DB polls (whether you use incrementing, timestamp, or
> timestamp+incrementing).
>
> I'm curious if anyone has any thoughts on this. Again, the link I shared
> in my earlier email suggests that this is a problem with Oracle as well. Are
> people seeing data loss with the JDBC driver on high-write-load tables? I
> would be surprised if they weren't.
>
> Several solutions covered in that PDF are of interest:
>
> 1. The look-back based approach (NOW() - 1 minute, or MAX(id) - 100). This
> will lead to significant duplication in the Kafka topic. Even with log
> compaction enabled, this will be annoying to users that are reading the
> feed in realtime.
> 2. Some kind of pre-query assignment. Essentially a field that's set to
> NULL on insert. Every update sets the field back to NULL. An async process
> periodically wakes up and sets the field to a monotonically increasing ID.
> In the PDF, they suggest SCN for Oracle. MySQL doesn't easily expose this,
> but I think UNIX_TIMESTAMP() would suffice as long as the updates don't
> happen too frequently.
>
> For (2), one idea would be to add the ability in the JDBC connector to add
> a pre-query, which could be executed just before the SELECT. In this mode,
> you could execute an UPDATE my_table SET scn = UNIX_TIMESTAMP() WHERE scn IS
> NULL. As far as I can tell, with MySQL, this would involve using a trigger
> to set the `scn` field back to NULL whenever an update on the row occurs.
>
> Cheers,
> Chris
>
> --
>
>  Start with an empty table
>
> mysql> select * from funding_instructions;
>
> Empty set (0.00 sec)
>
>  Create a transaction, and insert a row with state=2, but don't commit
> (this is done in another terminal).
>
> mysql> begin;
>
> Query OK, 0 rows affected (0.00 sec)
>
> mysql> insert into funding_instructions (state) VALUES (2);
>
> Query OK, 1 row affected, 4 warnings (0.00 sec)
>
>  Outside of the transaction in the main terminal, insert a second row
> with state=3
>
> mysql> insert into funding_instructions (state) VALUES (3);
>
> Query OK, 1 row affected, 4 warnings (0.00 sec)
>
>  Now you see only the second row that was inserted. Note the ID.
>
> mysql> select * from funding_instructions;
>
>
> +----------+-------+------+--------+----------+-------+----------+-------------+-------------+---------+------------+------+-----------------------------+
> | id       | state | type | amount | currency | batch | litle_id | modify_time | create_time | version | payment_id | mid  | funds_transfer_request_date |
> +----------+-------+------+--------+----------+-------+----------+-------------+-------------+---------+------------+------+-----------------------------+
> | 25607888 |     3 |    0 |      0 | XXX      | NULL  | NULL     |           0 |           0 |       0 |          0 | NULL | NULL                        |
> +----------+-------+------+--------+----------+-------+----------+-------------+-------------+---------+------------+------+-----------------------------+
>
> 1 row in set (0.00 sec)
>
>  Now commit the first transaction.
>
> mysql> commit;
>
> Query OK, 0 rows affected (0.00 sec)
>
>  Now look at your results. Note that the smaller ID now appears. This
> breaks strict ID-based replication, even if the table is append-only.
>
> mysql> select * from funding_instructions;
>
>

Re: Kafka connect JDBC DB SCNs

2016-02-24 Thread Chris Riccomini
Hey all,

Some more details on my question. My concern that even using IDs on
immutable tables as a way to replicate data is unsafe seems to be confirmed,
at least for InnoDB if not configured properly. This page:

http://dev.mysql.com/doc/refman/5.7/en/innodb-auto-increment-handling.html

describes how auto-increment IDs can get out of order. I have replicated
this issue on my local machine (see example below the fold) with all three
innodb_autoinc_lock_mode settings (0, 1, and 2). They all show the same results.

As far as I can tell, at least with MySQL+InnoDB, there is no way to
configure the JDBC connector such that it isn't at risk of dropping data on
incremental DB polls (whether you use incrementing, timestamp, or
timestamp+incrementing).

I'm curious if anyone has any thoughts on this. Again, the link I shared in
my earlier email suggests that this is a problem with Oracle as well. Are
people seeing data loss with the JDBC driver on high-write-load tables? I
would be surprised if they weren't.

Several solutions covered in that PDF are of interest:

1. The look-back based approach (NOW() - 1 minute, or MAX(id) - 100). This
will lead to significant duplication in the Kafka topic. Even with log
compaction enabled, this will be annoying to users that are reading the
feed in realtime.
2. Some kind of pre-query assignment. Essentially a field that's set to
NULL on insert. Every update sets the field back to NULL. An async process
periodically wakes up and sets the field to a monotonically increasing ID.
In the PDF, they suggest SCN for Oracle. MySQL doesn't easily expose this,
but I think UNIX_TIMESTAMP() would suffice as long as the updates don't
happen too frequently.

For (2), one idea would be to add the ability in the JDBC connector to add
a pre-query, which could be executed just before the SELECT. In this mode,
you could execute an UPDATE my_table SET scn = UNIX_TIMESTAMP() WHERE scn IS
NULL. As far as I can tell, with MySQL, this would involve using a trigger
to set the `scn` field back to NULL whenever an update on the row occurs.
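
A rough JDBC sketch of how that pre-query could be wired in. The connection
details, table name, and lastScn variable are made up for illustration; the
real connector would manage the offset itself, and the trigger that NULLs out
scn on update isn't shown.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class PreQuerySketch {
    public static void main(String[] args) throws Exception {
        long lastScn = 0L;  // normally the connector's stored offset
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:mysql://localhost/test", "user", "pass")) {
            // Pre-query: stamp every unassigned (NULL) row with a monotonically
            // increasing value before the incremental SELECT runs.
            try (PreparedStatement stamp = conn.prepareStatement(
                     "UPDATE my_table SET scn = UNIX_TIMESTAMP() WHERE scn IS NULL")) {
                stamp.executeUpdate();
            }
            // Main query: only rows stamped after the last committed offset.
            try (PreparedStatement select = conn.prepareStatement(
                     "SELECT id, scn FROM my_table WHERE scn > ? ORDER BY scn")) {
                select.setLong(1, lastScn);
                try (ResultSet rs = select.executeQuery()) {
                    while (rs.next()) {
                        System.out.println(rs.getLong("id") + " scn=" + rs.getLong("scn"));
                    }
                }
            }
        }
    }
}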

Cheers,
Chris

--

 Start with an empty table

mysql> select * from funding_instructions;

Empty set (0.00 sec)

 Create a transaction, and insert a row with state=2, but don't commit
(this is done in another terminal).

mysql> begin;

Query OK, 0 rows affected (0.00 sec)

mysql> insert into funding_instructions (state) VALUES (2);

Query OK, 1 row affected, 4 warnings (0.00 sec)

 Outside of the transaction in the main terminal, insert a second row
with state=3

mysql> insert into funding_instructions (state) VALUES (3);

Query OK, 1 row affected, 4 warnings (0.00 sec)

 Now you see only the second row that was inserted. Note the ID.

mysql> select * from funding_instructions;

+----------+-------+------+--------+----------+-------+----------+-------------+-------------+---------+------------+------+-----------------------------+
| id       | state | type | amount | currency | batch | litle_id | modify_time | create_time | version | payment_id | mid  | funds_transfer_request_date |
+----------+-------+------+--------+----------+-------+----------+-------------+-------------+---------+------------+------+-----------------------------+
| 25607888 |     3 |    0 |      0 | XXX      | NULL  | NULL     |           0 |           0 |       0 |          0 | NULL | NULL                        |
+----------+-------+------+--------+----------+-------+----------+-------------+-------------+---------+------------+------+-----------------------------+

1 row in set (0.00 sec)

 Now commit the first transaction.

mysql> commit;

Query OK, 0 rows affected (0.00 sec)

 Now look at your results. Note that the smaller ID now appears. This
breaks strict ID-based replication, even if the table is append-only: a
poller that had already advanced its offset past 25607888 would never go
back and pick up 25607887.

mysql> select * from funding_instructions;

+----------+-------+------+--------+----------+-------+----------+-------------+-------------+---------+------------+------+-----------------------------+
| id       | state | type | amount | currency | batch | litle_id | modify_time | create_time | version | payment_id | mid  | funds_transfer_request_date |
+----------+-------+------+--------+----------+-------+----------+-------------+-------------+---------+------------+------+-----------------------------+
| 25607887 |     2 |    0 |      0 | XXX      | NULL  | NULL     |           0 |           0 |       0 |          0 | NULL | NULL                        |
| 25607888 |     3 |    0 |      0 | XXX      | NULL  | NULL     |           0 |           0 |       0 |          0 | NULL | NULL                        |
+----------+-------+------+--------+----------+-------+----------+-------------+-------------+---------+------------+------+-----------------------------+

On Tue, Feb 23, 2016 at 4:26 PM, Chris Riccomini 
wrote:

> Hey all,
>
> I was reviewing the Kafka connect JDBC driver, and I had a question. Is it
> possible to use the JDBC driver with a look-back con

Kafka connect JDBC DB SCNs

2016-02-23 Thread Chris Riccomini
Hey all,

I was reviewing the Kafka connect JDBC driver, and I had a question. Is it
possible to use the JDBC driver with a look-back configured? The reason
that I ask is that there are some known issues with using a modified
timestamp:

Slide 14 here explains one with Oracle:

https://qconsf.com/sf2007/dl/QConSF2007/slides/public/JeanLucVaillant_LinkedIn.pdf?path=/QConSF2007/slides/public/JeanLucVaillant_LinkedIn.pdf

There is also some SCN-related discussion here:

https://github.com/linkedin/databus/wiki/Databus-for-MySQL

Though that is more specific to MySQL.

I am concerned that using insert IDs might not even be good enough
(assuming my tables were immutable, which they're not), since I believe
some DB storage engines might have the same issue. I think how InnoDB
assigns auto-increment primary key IDs relative to commit order is even
tunable via config (innodb_autoinc_lock_mode).

I would rather get some duplicates than lose data if at all possible. Can I
configure the JDBC driver to subtract some number from the offset to
prevent (or drastically reduce) lost data?

Cheers,
Chris


Re: KOYA vs. Samza?

2015-01-16 Thread Chris Riccomini
Hey Otis,

I think the key phrase in Samza's description is:

"Apache Samza is a distributed stream processing framework"

Kafka is not a stream processing framework. It's a message queue/broker
system. Stream processing frameworks (e.g. Storm, Spark Streaming, Samza,
etc) use message queueing/brokering systems to pass messages within the
stream processing framework.

Samza is more akin to Storm or Spark Streaming. KOYA is just putting Kafka
brokers in a YARN grid.

At least, that's my understanding.

Cheers,
Chris

On 1/16/15 4:05 PM, "Otis Gospodnetic"  wrote:

>Hm.  My understanding was that both are aimed at basically the same thing
>-
>Kafka on YARN.  From Samza site:
>"Apache Samza is a distributed stream processing framework. It uses Apache
>Kafka <http://kafka.apache.org/> for messaging, and Apache Hadoop YARN
><http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.h
>tml>
>to
>provide fault tolerance, processor isolation, security, and resource
>management."
>
>And KOYA:
>"KOYA is a YARN application that launches Kafka within YARN. It then
>manages the resource negotiation with Resource Manager, and ensures that
>Kafka operates in a YARN native way. For an external publisher or
>subscriber, KOYA would not look any different than Kafka since the same
>code is being run as a YARN application."
>
>It looks like they were never mentioned together until now :)
>
>Otis
>--
>Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>Solr & Elasticsearch Support * http://sematext.com/
>
>
>On Fri, Jan 16, 2015 at 3:34 PM, Chris Riccomini <
>criccom...@linkedin.com.invalid> wrote:
>
>> Hey Otis,
>>
>> I'm not terribly familiar with KOYA, but my understanding is that it's a
>> tool for deploying Kafka brokers to YARN, and administering them. I
>>don't
>> think that it has any stream processing functionality built into it. As
>> such, it seems to me that KOYA and Samza could be used together: you
>>could
>> use KOYA to deploy Kafka in YARN, and Samza to read/write messages from
>> the brokers that have been deployed.
>>
>> Samza provides containers that have consumers/producers in them, and
>>allow
>> you to plug in processing logic as new messages arrive. Samza's goal is
>>to
>> provide features that are useful when you're processing the messages,
>>such
>> as fault tolerance (restarting consumers/producers when they fail),
>> checkpointing (saving offsets), state management (if you're counting
>> messages, you want to make sure your count is accurate even if you
>>fail),
>> etc.
>>
>> Cheers,
>> Chris
>>
>> On 1/16/15 12:14 PM, "Otis Gospodnetic" 
>> wrote:
>>
>> >Hi,
>> >
>> >I was wondering if anyone can compare and contrast KOYA and Samza?
>> >
>> >Thanks,
>> >Otis
>> >--
>> >Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> >Solr & Elasticsearch Support * http://sematext.com/
>>
>>



Re: KOYA vs. Samza?

2015-01-16 Thread Chris Riccomini
Hey Otis,

I'm not terribly familiar with KOYA, but my understanding is that it's a
tool for deploying Kafka brokers to YARN, and administering them. I don't
think that it has any stream processing functionality built into it. As
such, it seems to me that KOYA and Samza could be used together: you could
use KOYA to deploy Kafka in YARN, and Samza to read/write messages from
the brokers that have been deployed.

Samza provides containers that have consumers/producers in them, and allow
you to plug in processing logic as new messages arrive. Samza's goal is to
provide features that are useful when you're processing the messages, such
as fault tolerance (restarting consumers/producers when they fail),
checkpointing (saving offsets), state management (if you're counting
messages, you want to make sure your count is accurate even if you fail),
etc.

Cheers,
Chris

On 1/16/15 12:14 PM, "Otis Gospodnetic"  wrote:

>Hi,
>
>I was wondering if anyone can compare and contrast KOYA and Samza?
>
>Thanks,
>Otis
>--
>Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>Solr & Elasticsearch Support * http://sematext.com/



Control messages in Kafka

2014-09-10 Thread Chris Riccomini
Hey Guys,

The current transactionality proposal 
(https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka)
 and implementation use control messages to handle transactions in Kafka. Kafka 
traditionally hasn't had control messages in its topics. Transactionality (as 
it's implemented) introduces this pattern, but appears to do so in a very 
specific fashion (control messages only for transactions).

It seems to me that a good approach would be to generalize the control
message model in Kafka to support not just transaction control messages, but
arbitrary control messages. On the producer side, arbitrary control messages
should be allowed to be sent, and on the consumer side, these control
messages should be dropped by default.
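
For illustration only, here is a hypothetical sketch of the kind of
application-level envelope this could enable. The Envelope class and its
methods are invented for this email; Kafka's message format today has no such
flag, and the real design would presumably live in the protocol rather than
in user code.

import java.util.ArrayList;
import java.util.List;

// Hypothetical: a "control" flag that ordinary consumers drop by default.
public final class Envelope {
    public final boolean control;  // e.g. a transaction commit/abort marker, a checkpoint, etc.
    public final byte[] payload;

    public Envelope(boolean control, byte[] payload) {
        this.control = control;
        this.payload = payload;
    }

    // Default consumer-side behavior: hand back only data messages.
    public static List<byte[]> dataOnly(List<Envelope> batch) {
        List<byte[]> out = new ArrayList<>();
        for (Envelope e : batch) {
            if (!e.control) {
                out.add(e.payload);
            }
        }
        return out;
    }
}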

Just like transactionality, this would let frameworks (e.g. Samza) and other 
app-specific implementations take advantage of in-topic control messages (as 
opposed to out of band control messages) without any impact on existing 
consumers.

Thoughts?

Cheers,
Chris


Re: New Consumer API discussion

2014-03-03 Thread Chris Riccomini
Hey Guys,

Also, for reference, we'll be looking to implement new Samza consumers
which have these APIs:

http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html

http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/checkpoint/CheckpointManager.html


Question (3) below is a result of Samza's SystemConsumers poll allowing
specific topic/partitions to be specified.

The split between consumer and checkpoint manager is the reason for
question (12) below.

Cheers,
Chris

On 3/3/14 10:19 AM, "Chris Riccomini"  wrote:

>Hey Guys,
>
>Sorry for the late follow up. Here are my questions/thoughts on the API:
>
>1. Why is the config String->Object instead of String->String?
>
>2. Are these Java docs correct?
>
>  KafkaConsumer(java.util.Map configs)
>  A consumer is instantiated by providing a set of key-value pairs as
>configuration and a ConsumerRebalanceCallback implementation
>
>There is no ConsumerRebalanceCallback parameter.
>
>3. Would like to have a method:
>
>  poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
>TopicPartition... topicAndPartitionsToPoll)
>
>I see I can effectively do this by just fiddling with subscribe and
>unsubscribe before each poll. Is this a low-overhead operation? Can I just
>unsubscribe from everything after each poll, then re-subscribe to a topic
>the next iteration. I would probably be doing this in a fairly tight loop.
>
>4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
>are use cases for decoupling "what to do when no offset exists" from "what
>to do when I'm out of range". I might want to start from smallest the
>first time I run, but fail if I ever get offset out of range.
>
>5. ENABLE_JMX could use Java docs, even though it's fairly
>self-explanatory.
>
>6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
>across all topic/partitions is useful. I believe it's per-topic/partition,
>right? That is, setting to 2 megs with two TopicAndPartitions would result
>in 4 megs worth of data coming in per fetch, right?
>
>7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
>Retry, or throw exception?
>
>8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
>fetch requests?
>
>9. What does SESSION_TIMEOUT_MS default to?
>
>10. Is this consumer thread-safe?
>
>11. How do you use a different offset management strategy? Your email
>implies that it's pluggable, but I don't see how. "The offset management
>strategy defaults to Kafka based offset management and the API provides a
>way for the user to use a customized offset store to manage the consumer's
>offsets."
>
>12. If I wish to decouple the consumer from the offset checkpointing, is
>it OK to use Joel's offset management stuff directly, rather than through
>the consumer's commit API?
>
>
>Cheers,
>Chris
>
>On 2/10/14 10:54 AM, "Neha Narkhede"  wrote:
>
>>As mentioned in previous emails, we are also working on a
>>re-implementation
>>of the consumer. I would like to use this email thread to discuss the
>>details of the public API. I would also like us to be picky about this
>>public api now so it is as good as possible and we don't need to break it
>>in the future.
>>
>>The best way to get a feel for the API is actually to take a look at the
>>javadoc<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc
>>/
>>doc/kafka/clients/consumer/KafkaConsumer.html>,
>>the hope is to get the api docs good enough so that it is
>>self-explanatory.
>>You can also take a look at the configs
>>here<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/do
>>c
>>/kafka/clients/consumer/ConsumerConfig.html>
>>
>>Some background info on implementation:
>>
>>At a high level the primary difference in this consumer is that it
>>removes
>>the distinction between the "high-level" and "low-level" consumer. The
>>new
>>consumer API is non blocking and instead of returning a blocking
>>iterator,
>>the consumer provides a poll() API that returns a list of records. We
>>think
>>this is better compared to the blocking iterators since it effectively
>>decouples the threading strategy used for processing messages from the
>>consumer. It is worth noting that the consumer is entirely single
>>threaded
>>and runs in the user thread. The advantage is that it can be easily
>>rewritten in less multi-threading-friendly languages. The

Re: New Consumer API discussion

2014-03-03 Thread Chris Riccomini
Hey Guys,

Sorry for the late follow up. Here are my questions/thoughts on the API:

1. Why is the config String->Object instead of String->String?

2. Are these Java docs correct?

  KafkaConsumer(java.util.Map configs)
  A consumer is instantiated by providing a set of key-value pairs as
configuration and a ConsumerRebalanceCallback implementation

There is no ConsumerRebalanceCallback parameter.

3. Would like to have a method:

  poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
TopicPartition... topicAndPartitionsToPoll)

I see I can effectively do this by just fiddling with subscribe and
unsubscribe before each poll (see the sketch after this list). Is this a
low-overhead operation? Can I just unsubscribe from everything after each
poll, then re-subscribe to a topic the next iteration? I would probably be
doing this in a fairly tight loop.

4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
are use cases for decoupling "what to do when no offset exists" from "what
to do when I'm out of range". I might want to start from smallest the
first time I run, but fail if I ever get offset out of range.

5. ENABLE_JMX could use Java docs, even though it's fairly
self-explanatory.

6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
across all topic/partitions is useful. I believe it's per-topic/partition,
right? That is, setting to 2 megs with two TopicAndPartitions would result
in 4 megs worth of data coming in per fetch, right?

7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
Retry, or throw exception?

8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
fetch requests?

9. What does SESSION_TIMEOUT_MS default to?

10. Is this consumer thread-safe?

11. How do you use a different offset management strategy? Your email
implies that it's pluggable, but I don't see how. "The offset management
strategy defaults to Kafka based offset management and the API provides a
way for the user to use a customized offset store to manage the consumer's
offsets."

12. If I wish to decouple the consumer from the offset checkpointing, is
it OK to use Joel's offset management stuff directly, rather than through
the consumer's commit API?
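
Here is roughly what I mean in question (3), written against a stand-in
interface rather than the real proposed API. The ProposedConsumer type and
its method names are mine, purely to illustrate the
subscribe/unsubscribe-per-poll workaround:

import java.util.Collection;
import java.util.List;

// Stand-in for the consumer API under discussion; not a real Kafka class.
interface ProposedConsumer<V> {
    void subscribe(Collection<String> topics);
    void unsubscribe(Collection<String> topics);
    List<V> poll(long timeoutMs);
}

class TargetedPollSketch {
    // The workaround from question (3): narrow the subscription before every poll.
    static <V> List<V> pollOnly(ProposedConsumer<V> consumer,
                                Collection<String> wanted,
                                Collection<String> everything,
                                long timeoutMs) {
        consumer.unsubscribe(everything);  // drop all current subscriptions
        consumer.subscribe(wanted);        // re-subscribe to just what we need right now
        return consumer.poll(timeoutMs);   // effectively a targeted poll
    }
}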


Cheers,
Chris

On 2/10/14 10:54 AM, "Neha Narkhede"  wrote:

>As mentioned in previous emails, we are also working on a
>re-implementation
>of the consumer. I would like to use this email thread to discuss the
>details of the public API. I would also like us to be picky about this
>public api now so it is as good as possible and we don't need to break it
>in the future.
>
>The best way to get a feel for the API is actually to take a look at the
>javadoc <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>,
>the hope is to get the api docs good enough so that it is
>self-explanatory.
>You can also take a look at the configs
>here <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>
>
>Some background info on implementation:
>
>At a high level the primary difference in this consumer is that it removes
>the distinction between the "high-level" and "low-level" consumer. The new
>consumer API is non blocking and instead of returning a blocking iterator,
>the consumer provides a poll() API that returns a list of records. We
>think
>this is better compared to the blocking iterators since it effectively
>decouples the threading strategy used for processing messages from the
>consumer. It is worth noting that the consumer is entirely single threaded
>and runs in the user thread. The advantage is that it can be easily
>rewritten in less multi-threading-friendly languages. The consumer batches
>data and multiplexes I/O over TCP connections to each of the brokers it
>communicates with, for high throughput. The consumer also allows long poll
>to reduce the end-to-end message latency for low throughput data.
>
>The consumer provides a group management facility that supports the
>concept
>of a group with multiple consumer instances (just like the current
>consumer). This is done through a custom heartbeat and group management
>protocol transparent to the user. At the same time, it allows users the
>option to subscribe to a fixed set of partitions and not use group
>management at all. The offset management strategy defaults to Kafka based
>offset management and the API provides a way for the user to use a
>customized offset store to manage the consumer's offsets.
>
>A key difference in this consumer also is the fact that it does not depend
>on zookeeper at all.
>
>More details about the new consumer design are
>here <https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design>
>
>Please take a look at the new
>API <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html> and
>give us any thoughts you may have.
>
>Thanks,
>Neha



Re: New Producer Public API

2014-01-29 Thread Chris Riccomini
Hey Guys,

My 2c.

1. RecordSend is a confusing name to me. Shouldn't it be
RecordSendResponse?
2. Random nit: it's annoying to have the Javadoc info for the constants on
http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html,
but the string constant values on
http://empathybox.com/kafka-javadoc/constant-values.html#kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG.
I find myself toggling between the two a lot. Not sure if this can be fixed
easily.
3. MAX_PARTITION_SIZE_CONFIG - this name is kind of confusing.
Specifically, the use of the term "partition". I thought it was related to
Kafka topic partitions, not grouping/batching.
4. METADATA_FETCH_TIMEOUT_CONFIG - what happens if this timeout is
exceeded? Do we get an exception on send()?
5. METADATA_REFRESH_MS_CONFIG - why is this useful? Has to do with acks=0,
right? Worth documenting, I think.
6. PARTITIONER_CLASS_CONFIG - link to partitioner interface in javadocs.
Also, missing a period.
7. KafkaProducer.html - send() documentation says "archive" when you mean
achieve, I think.
8. No javadoc for ProduceRequestResult.
9. In ProduceRequestResult, I understand baseOffset to be the first offset
of the set. Is it possible to get the last offset, as well? If I send
messages A, B, C, D, I'm most interested in D's offset (see the sketch after
this list).
10. In ProduceRequestResult, prefer Java-bean style (getError,
isCompleted).
11. At first glance, I like option 1A in your serialization list.
12. We should definitely not introduce a ZK dependency for bootstrapping
broker host/ports.
13. No strong preference on the Future discussion. I really^Int.Max hate
checked exceptions, but I also like standard interfaces. It's a wash in my
book.
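
To spell out what I'm after in (9): if only the base offset of the batch is
exposed, I'd presumably have to derive the last record's offset myself, along
these lines (made-up numbers, and this assumes offsets within a batch are
assigned contiguously):

public class LastOffsetSketch {
    public static void main(String[] args) {
        long baseOffset = 100L;  // first offset assigned to the batch (message A)
        int recordCount = 4;     // messages A, B, C, D
        long lastOffset = baseOffset + recordCount - 1;  // offset of D = 103
        System.out.println("last offset: " + lastOffset);
    }
}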


Cheers,
Chris

On 1/29/14 10:34 AM, "Neha Narkhede"  wrote:

>>> The challenge of directly exposing ProduceRequestResult is that the
>offset
>provided is just the base offset and there is no way to know for a
>particular message where it was in relation to that base offset because
>the
>batching is transparent and non-deterministic.
>
>That's a good point. I need to look into the code more closely to see if
>it
>is possible to expose
>something like Future<RequestResult> send(...) where RequestResult has the
>right metadata
>as well as helper APIs that the user would want. For example
>
>Future<RequestResult> messageResponse;
>try {
>  messageResponse = send(...)
>} catch(InterruptedException ie) {
>} catch(ExecutionException ee) {
>}
>
>if(messageResponse.hasError())
>  // handle error
>else {
>   String topic = messageResponse.topic();
>   int partition = messageResponse.partition();
>   long offset = messageResponse.offset();   // can this offset return the
>absolute offset instead of just the relative offset?
>   ...
>}
>
>I could've missed some reasons why we can't do the above. I just think
>that
>separating the future-like functionality of RecordSend
>from the actual response metadata could be useful while supporting Future
>at the same time.
>
>Thanks,
>Neha
>
>
>
>On Wed, Jan 29, 2014 at 10:23 AM, Tom Brown  wrote:
>
>> I strongly support the user of Future. In fact, the cancel method may
>>not
>> be useless. Since the producer is meant to be used by N threads, it
>>could
>> easily get overloaded such that a produce request could not be sent
>> immediately and had to be queued. In that case, cancelling should cause
>>it
>> to not actually get sent.
>>
>> --Tom
>>
>>
>> On Wed, Jan 29, 2014 at 11:06 AM, Jay Kreps  wrote:
>>
>> > Hey Neha,
>> >
>> > Error handling in RecordSend works as in Future you will get the
>> exception
>> > if there is one from any of the accessor methods or await().
>> >
>> > The purpose of hasError was that you can write things slightly more
>> simply
>> > (which some people expressed preference for):
>> >   if(send.hasError())
>> > // do something
>> >   long offset = send.offset();
>> >
>> > Instead of the more the slightly longer:
>> > try {
>> >long offset = send.offset();
>> > } catch (KafkaException e) {
>> >// do something
>> > }
>> >
>> >
>> > On Wed, Jan 29, 2014 at 10:01 AM, Neha Narkhede
>>> > >wrote:
>> >
>> > > Regarding the use of Futures -
>> > >
>> > > Agree that there are some downsides to using Futures but both
>> approaches
>> > > have some tradeoffs.
>> > >
>> > > - Standardization and usability
>> > > Future is a widely used and understood Java API and given that the
>> > > functionality that RecordSend hopes to provide is essentially that
>>of
>> > > Future, I think it makes sense to expose a widely understood public
>>API
>> > for
>> > > our clients. RecordSend, on the other hand, seems to provide some
>>APIs
>> > that
>> > > are very similar to that of Future, in addition to exposing a bunch
>>of
>> > APIs
>> > > that belong to ProduceRequestResult. As a user, I would've really
>> > preferred
>> > > to deal with ProduceRequestResult directly -
>> > > Future<ProduceRequestResult> send(...)
>> > >
>> > > - Error handling
>> > > RecordSend's error handling is quite unintuitive where the user has
>>to
>> > > remember 

Re: Client improvement discussion

2013-07-27 Thread Chris Riccomini
Hey Jay,

Reading over the wiki (and email thread). Here are some questions/comments:

"Make the producer fully async to to allow issuing sends to all brokers
simultaneously and having multiple in-flight requests simultaneously. This
will dramatically reduce the impact of latency on throughput (which is
important with replication)."

Can you say a bit more about this? You're only talking about the async
case, right? If I set a producer to sync, acks=-1, producer.send() will
still block as expected, right?

"Move to server-side offset management will allow us to scale this
facility which is currently a big scalability problem for high-commit rate
consumers due to zk non scalability."

Just confirming that the proposal still allows us to store a K/V map (or
metadata), in addition to just offsets, right? This was in the older
proposal that I saw, but I just wanted to make sure. The consumer APIs
don't seem to reflect this.

"""
SendResponse r = producer.send(new KafkaMessage(topic, key, message));
r.onCompletion(new Runnable() {System.out.println("All done")})
r.getOffset()
r.getError()
"""

To block (wait on the future to return) in the new API, I have to either
call getOffset or getError? Or is onCompletion blocking?
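
To make the question concrete, here are the two patterns I can imagine
against the sketch above. The SendResponse interface below is a stand-in I
wrote for this email (method names taken from the wiki snippet), and which of
these calls actually blocks is exactly what I'm asking:

// Stand-in for the proposed SendResponse; not the real API.
interface SendResponse {
    long getOffset();                      // does this block until the send completes?
    Exception getError();                  // or this?
    void onCompletion(Runnable callback);  // or is registering this the blocking call?
}

class BlockingQuestionSketch {
    // Pattern 1: block by asking for the offset (if getOffset() blocks).
    static long waitForOffset(SendResponse r) {
        return r.getOffset();
    }

    // Pattern 2: register a completion callback instead of blocking.
    static void waitViaCallback(SendResponse r) {
        r.onCompletion(new Runnable() {
            public void run() {
                System.out.println("send completed");
            }
        });
    }
}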

"List messages = consumer.poll(timeout);"

Does it make sense to allow polling only specific topics? If we move to an
epoll/selector model, would it be OK to just poll for certain topic
handles, and let the other handles sit there with their socket buffers
filling up? The use case that I'm thinking of is where you've registered
two topics, but only want to consume from one for a while. In the API
you've proposed, the consumer keeps feeding messages from the other topic,
which you then have to buffer (and potentially run out of memory). A
work-around, in this case, is to have one consumer per topic, but I'd
rather just let the OS-level socket buffer do the buffering, if that's
possible.

"The timeout the user specifies will be purely to ensure we have a
mechanism to give control back to the user even when no messages are
delivered. It is up to the user to ensure poll() is called again within
the heartbeat frequency set for the consumer group. Internally the timeout
on our select() may use a shorter timeout to ensure the heartbeat
frequency is met even when no messages are delivered."

I don't think I understand this. What's the heartbeat you're talking about
here? Is this the consumer membership group heartbeat?

"We will introduce a set of RPC apis for managing partition assignment on
the consumer side ... This set of APIs is orthogonal to the APIs for
producing and consuming data, it is responsible for group membership."

I'm a little confused. I think what you're saying is the API for managing
partition assignment is totally generic, but the consumer is going to use
it to manage its partition groups, right? I could use it for other
partition assignment, if I wanted to, though, correct? Is this going to
require instantiating a consumer to use, or will there be some other
partition group API/connection/thingy that I can use?

"Create a new group with the given name and the specified minimum
heartbeat frequency. Return the id/host/port of the server acting as the
controller for that group.
If the ephemeral flag is set the group will disappear when the last client
exits."

What happens if create_group X is called more than once? Can this be used
for leadership election (first call creates the group, and is notified
that it's the leader)? There are a lot of gaps to fill in there, but it
could be a pretty useful feature.

"on receiving acknowledgements from all consumers of the group membership
change the controller sends the a group_changed message to all the new
group members"

Suppose that an existing consumer owns partition 7 for a given topic. A
new consumer joins the group, and the partition assignments work out such
that the new consumer should own partition 7. As I understand it, this
means that the old consumer should stop consuming from partition 7 when
begin_group_change is sent, right? This means a) no one consumes partition
7 until consensus is gathered, and b) if any consumer dies before its
ack is sent, you have to wait up to one full heartbeat before partition 7
is consumed again, correct? What kind of heartbeat do we expect to be
"normal"? 10s? 60s?

Also, how are offsets handled during this transition? Should the old
consumer checkpoint its offset for the topic/partition that it's
relinquishing? Are duplicate messages going to be consumed during this
transition (i.e., a message is consumed by both the old and new consumer)?

Also^2, what exactly is specified in the begin_group_change notification?
If you don't handle any partition assignment inside Kafka, don't the
consumers all need to know who is in the group? The wiki lists no fields
for this. It seems like you'd need something like [consumer1, consumer2,
consumer3], or something, to do deterministic ring/hash-based partition
a

Re: 0.8.0-beta1 is now available in public maven

2013-07-15 Thread Chris Riccomini
Hey Joe,

Awesome. I've updated the Jira:

https://issues.apache.org/jira/browse/KAFKA-974

Cheers,
Chris

On 7/15/13 11:24 AM, "Joe Stein"  wrote:

>hmmm, interesting
>
>I think the issue is that I had to do multiple pushes to nexus to get this
>to work (unfortunately) and perhaps maven central did not overwrite like it
>did with apache, but rather appended and meshed the poms together. The POM
>in maven central looks like a combination of what I was doing in
>https://issues.apache.org/jira/browse/KAFKA-974
>
>Please create a JIRA; however, I suspect when we do another release (either
>0.8.0-beta2 or 0.8.0) this will be correct and 1:1 from the apache repo,
>because I won't be doing multiple pushes like I had to do here
>
>the weird parent block is required by apache when releasing
>http://www.apache.org/dev/publishing-maven-artifacts.html as I understand
>it and read the docs
>
>perhaps for 0.8.0-beta1 folks should just use the repo
>https://repository.apache.org/content/repositories/releases/ because
>it matches the POM from the 0.8 branch
>
>great to be working all this stuff out in beta1 so we can have a nice
>clean
>0.8 release =8^)
>
>
>
>On Mon, Jul 15, 2013 at 2:12 PM, Chris Riccomini
>wrote:
>
>> Hey Guys,
>>
>> Digging into this more. Here's a fun fact: Maven Central's POM does not
>> match Apache release's.
>>
>>
>> 
>>http://search.maven.org/remotecontent?filepath=org/apache/kafka/kafka_2.9
>>.2
>> /0.8.0-beta1/kafka_2.9.2-0.8.0-beta1.pom
>>
>>
>> 
>>https://repository.apache.org/content/groups/public/org/apache/kafka/kafk
>>a_
>> 2.9.2/0.8.0-beta1/kafka_2.9.2-0.8.0-beta1.pom
>>
>> Notice in Maven central that it has two  blocks. That
>> doesn't exist in Apache release's. Weird.
>>
>> On a hunch, I explicitly added the Apache release repository as the
>>first
>> repo that Gradle uses:
>>
>>   allprojects {
>> repositories {
>>   maven {
>> url 'https://repository.apache.org/content/groups/public'
>>   }
>>   mavenCentral()
>> }
>>   }
>>
>>
>> This worked! I still have to manually add the exclusions:
>>
>>   compile("org.apache.kafka:kafka_$scalaVersion:$kafkaVersion") {
>> exclude module: 'jms'
>> exclude module: 'jmxtools'
>> exclude module: 'jmxri'
>>   }
>>
>>
>> After that, I got a successful build. I still think the POM is pretty
>> broken, though. Here are the issues I see:
>>
>> 1. Maven central can't resolve it properly (POM is different from Apache
>> release). Have to use Apache release repo directly to get things to
>>work.
>> 2. Exclusions must be manually applied even though they exist in Kafka's
>> POM already. I think Maven can handle this automatically, if the POM is
>> done right.
>> 3. Weird parent block in Kafka POMs that points to org.apache.
>> 4. Would be nice to publish kafka-test jars as well.
>> 5. Would be nice to have SNAPSHOT releases off of trunk using a Hudson
>>job.
>>
>> Shall I open Jiras for this stuff?
>>
>> Cheers,
>> Chris
>>
>> On 7/15/13 10:55 AM, "Chris Riccomini"  wrote:
>>
>> >Hey Guys,
>> >
>> >The problem persists, even when using the explicit URL Joe provided.
>> >
>> >I've also constructed a dummy Maven project, and tested Kafka
>>dependencies
>> >there. That worked, but I had to explicit write the  block.
>> >
>> >I might have to sick our local Gradle experts on this one...
>> >
>> >Cheers,
>> >Chris
>> >
>> >On 7/15/13 10:45 AM, "chetan conikee"  wrote:
>> >
>> >>I stumbled on a few issues when I was attempting to create a Scala
>>based
>> >>Client (Producer/Consumer) project.
>> >>
>> >>Finally worked after revisions:
>> >>
>> >>
>> >>Here's the snippet of by build.sbt
>> >>
>> >>~~
>> >>
>> >>/* scala versions and options */
>> >>scalaVersion := "2.9.2"
>> >>
>> >>
>> >>libraryDependencies +=
>> >>"org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1" excludeAll(
>> >>ExclusionRule(organization = "com.sun.jdmk"),
>> >>ExclusionRule(organization = "com.sun.jmx"),
>> >>

Re: 0.8.0-beta1 is now available in public maven

2013-07-15 Thread Chris Riccomini
Hey Guys,

Digging into this more. Here's a fun fact: Maven Central's POM does not
match Apache release's.


http://search.maven.org/remotecontent?filepath=org/apache/kafka/kafka_2.9.2
/0.8.0-beta1/kafka_2.9.2-0.8.0-beta1.pom


https://repository.apache.org/content/groups/public/org/apache/kafka/kafka_
2.9.2/0.8.0-beta1/kafka_2.9.2-0.8.0-beta1.pom

Notice in Maven central that it has two  blocks. That
doesn't exist in Apache release's. Weird.

On a hunch, I explicitly added the Apache release repository as the first
repo that Gradle uses:

  allprojects {
repositories {
  maven {
url 'https://repository.apache.org/content/groups/public'
  }
  mavenCentral()
}
  }


This worked! I still have to manually add the exclusions:

  compile("org.apache.kafka:kafka_$scalaVersion:$kafkaVersion") {
exclude module: 'jms'
exclude module: 'jmxtools'
exclude module: 'jmxri'
  }


After that, I got a successful build. I still think the POM is pretty
broken, though. Here are the issues I see:

1. Maven central can't resolve it properly (POM is different from Apache
release). Have to use Apache release repo directly to get things to work.
2. Exclusions must be manually applied even though they exist in Kafka's
POM already. I think Maven can handle this automatically, if the POM is
done right.
3. Weird parent block in Kafka POMs that points to org.apache.
4. Would be nice to publish kafka-test jars as well.
5. Would be nice to have SNAPSHOT releases off of trunk using a Hudson job.

Shall I open Jiras for this stuff?

Cheers,
Chris

On 7/15/13 10:55 AM, "Chris Riccomini"  wrote:

>Hey Guys,
>
>The problem persists, even when using the explicit URL Joe provided.
>
>I've also constructed a dummy Maven project, and tested Kafka dependencies
>there. That worked, but I had to explicit write the  block.
>
>I might have to sick our local Gradle experts on this one...
>
>Cheers,
>Chris
>
>On 7/15/13 10:45 AM, "chetan conikee"  wrote:
>
>>I stumbled on a few issues when I was attempting to create a Scala based
>>Client (Producer/Consumer) project.
>>
>>Finally worked after revisions:
>>
>>
>>Here's the snippet of by build.sbt
>>
>>~~
>>
>>/* scala versions and options */
>>scalaVersion := "2.9.2"
>>
>>
>>libraryDependencies +=
>>"org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1" excludeAll(
>>ExclusionRule(organization = "com.sun.jdmk"),
>>ExclusionRule(organization = "com.sun.jmx"),
>>ExclusionRule(organization = "javax.jms")
>>  )
>>
>>
>>/* you may need these repos */
>>resolvers := Seq(
>>  "maven" at "http://mvnrepository.com";,
>>  "maven atrifactory" at "http://central.maven.org/maven2";,
>>  "sonatype releases" at "
>>https://oss.sonatype.org/content/repositories/releases/";,
>>  "sonatype snapshots" at "
>>https://oss.sonatype.org/content/repositories/snapshots/";,
>>  "nexus releases" at "
>>https://oss.sonatype.org/service/local/staging/deploy/maven2";,
>>  "Local Maven Repository" at "file://"+Path.userHome+"/.ivy2/repository"
>>)
>>
>>
>>
>>
>>On Mon, Jul 15, 2013 at 10:39 AM, Chris Riccomini
>>wrote:
>>
>>> Hey Joe,
>>>
>>> The error I see is:
>>>
>>> 10:35:30.431 [ERROR] [org.gradle.BuildExceptionReporter] FAILURE: Build
>>> failed with an exception.
>>> 10:35:30.432 [ERROR] [org.gradle.BuildExceptionReporter]
>>> 10:35:30.432 [ERROR] [org.gradle.BuildExceptionReporter] * What went
>>>wrong:
>>> 10:35:30.432 [ERROR] [org.gradle.BuildExceptionReporter] Could not
>>>resolve
>>> all dependencies for configuration ':samsa-kafka_2.9.2:compile'.
>>> 10:35:30.433 [ERROR] [org.gradle.BuildExceptionReporter] > Could not
>>> resolve org.apache.kafka:kafka_2.9.2:0.8.0-beta1.
>>> 10:35:30.433 [ERROR] [org.gradle.BuildExceptionReporter]   Required by:
>>> 10:35:30.433 [ERROR] [org.gradle.BuildExceptionReporter]
>>> samsa:samsa-kafka_2.9.2:0.7.0
>>> 10:35:30.434 [ERROR] [org.gradle.BuildExceptionReporter]> null name
>>> not allowed
>>>
>>> All I could find on the subject was this:
>>>
>>> 
>>>http://forums.gradle.org/gradle/topics/illegalargumentexception_null_nam
>>>e
>>>_n
>>> 
&

Re: 0.8.0-beta1 is now available in public maven

2013-07-15 Thread Chris Riccomini
Hey Guys,

The problem persists, even when using the explicit URL Joe provided.

I've also constructed a dummy Maven project, and tested Kafka dependencies
there. That worked, but I had to explicitly write the  block.

I might have to sick our local Gradle experts on this one...

Cheers,
Chris

On 7/15/13 10:45 AM, "chetan conikee"  wrote:

>I stumbled on a few issues when I was attempting to create a Scala based
>Client (Producer/Consumer) project.
>
>Finally worked after revisions:
>
>
>Here's the snippet of my build.sbt
>
>~~
>
>/* scala versions and options */
>scalaVersion := "2.9.2"
>
>
>libraryDependencies +=
>"org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1" excludeAll(
>ExclusionRule(organization = "com.sun.jdmk"),
>ExclusionRule(organization = "com.sun.jmx"),
>ExclusionRule(organization = "javax.jms")
>  )
>
>
>/* you may need these repos */
>resolvers := Seq(
>  "maven" at "http://mvnrepository.com";,
>  "maven atrifactory" at "http://central.maven.org/maven2";,
>  "sonatype releases" at "
>https://oss.sonatype.org/content/repositories/releases/";,
>  "sonatype snapshots" at "
>https://oss.sonatype.org/content/repositories/snapshots/";,
>  "nexus releases" at "
>https://oss.sonatype.org/service/local/staging/deploy/maven2";,
>  "Local Maven Repository" at "file://"+Path.userHome+"/.ivy2/repository"
>)
>
>
>
>
>On Mon, Jul 15, 2013 at 10:39 AM, Chris Riccomini
>wrote:
>
>> Hey Joe,
>>
>> The error I see is:
>>
>> 10:35:30.431 [ERROR] [org.gradle.BuildExceptionReporter] FAILURE: Build
>> failed with an exception.
>> 10:35:30.432 [ERROR] [org.gradle.BuildExceptionReporter]
>> 10:35:30.432 [ERROR] [org.gradle.BuildExceptionReporter] * What went
>>wrong:
>> 10:35:30.432 [ERROR] [org.gradle.BuildExceptionReporter] Could not
>>resolve
>> all dependencies for configuration ':samsa-kafka_2.9.2:compile'.
>> 10:35:30.433 [ERROR] [org.gradle.BuildExceptionReporter] > Could not
>> resolve org.apache.kafka:kafka_2.9.2:0.8.0-beta1.
>> 10:35:30.433 [ERROR] [org.gradle.BuildExceptionReporter]   Required by:
>> 10:35:30.433 [ERROR] [org.gradle.BuildExceptionReporter]
>> samsa:samsa-kafka_2.9.2:0.7.0
>> 10:35:30.434 [ERROR] [org.gradle.BuildExceptionReporter]> null name
>> not allowed
>>
>> All I could find on the subject was this:
>>
>> 
>>http://forums.gradle.org/gradle/topics/illegalargumentexception_null_name
>>_n
>> 
>>ot_allowed_resolving_org_iso_relax_verifier_jaxp_validation_isorelax_jaxp
>>_b
>> ridge_1_0-cavg1
>>
>>
>> As far as I can tell, SBT's "intransitive()" method disabled ALL
>> transitive dependencies:
>>
>> "By default, these declarations fetch all project dependencies,
>> transitively. In some instances, you may find that the dependencies
>>listed
>> for a project aren't necessary for it to build. Projects using the Felix
>> OSGI framework, for instance, only explicitly require its main jar to
>> compile and run. Avoid fetching artifact dependencies with either
>> intransitive() or notTransitive(), as in this example:"
>>
>>
>> -- https://github.com/harrah/xsbt/wiki/Library-Management
>>
>>
>> I tried doing the Gradle equivalent of SBT's intransitive:
>>
>> compile("org.apache.kafka:kafka_$scalaVersion:$kafkaVersion@jar") {
>>   transitive = false
>> }
>>
>> The build still fails with that error.
>>
>>
>> Cheers,
>>
>> Chris
>>
>> On 7/15/13 10:34 AM, "Joe Stein"  wrote:
>>
>> >What is/are the error(s) you are getting?
>> >
>> >"intransitive" in the SBT line is so that SBT does not fetch the
>>libraries
>> >what is set for exclusion in the POM
>> >
>> >an example of an error if you don't do use what should be excluded is
>>in
>> >https://issues.apache.org/jira/browse/KAFKA-974
>> >
>> >
>> >On Mon, Jul 15, 2013 at 1:30 PM, Chris Riccomini
>> >wrote:
>> >
>> >> Hey Joe,
>> >>
>> >> First off, thanks for doing this!
>> >>
>> >> I'm trying to use the publication with Gradle, and I'm running into
>> >> problems. It's giving me a funky exception. I did som

Re: 0.8.0-beta1 is now available in public maven

2013-07-15 Thread Chris Riccomini
Hey Joe,

I can give that a shot. I did have to add the apache release repository to
get kafka to resolve.

I'll get back to you in a few.

Cheers,
Chris

On 7/15/13 10:43 AM, "Joe Stein"  wrote:

>I wonder if Gradle does not use Maven Central as the default repository
>like SBT does maybe you have to explicitly set this (shot in the dark,
>don't know Gradle).
>
>Maven Central Repository for resolver = http://repo1.maven.org/maven2/
>
>
>On Mon, Jul 15, 2013 at 1:39 PM, Chris Riccomini
>wrote:
>
>> Hey Joe,
>>
>> The error I see is:
>>
>> 10:35:30.431 [ERROR] [org.gradle.BuildExceptionReporter] FAILURE: Build
>> failed with an exception.
>> 10:35:30.432 [ERROR] [org.gradle.BuildExceptionReporter]
>> 10:35:30.432 [ERROR] [org.gradle.BuildExceptionReporter] * What went
>>wrong:
>> 10:35:30.432 [ERROR] [org.gradle.BuildExceptionReporter] Could not
>>resolve
>> all dependencies for configuration ':samsa-kafka_2.9.2:compile'.
>> 10:35:30.433 [ERROR] [org.gradle.BuildExceptionReporter] > Could not
>> resolve org.apache.kafka:kafka_2.9.2:0.8.0-beta1.
>> 10:35:30.433 [ERROR] [org.gradle.BuildExceptionReporter]   Required by:
>> 10:35:30.433 [ERROR] [org.gradle.BuildExceptionReporter]
>> samsa:samsa-kafka_2.9.2:0.7.0
>> 10:35:30.434 [ERROR] [org.gradle.BuildExceptionReporter]> null name
>> not allowed
>>
>> All I could find on the subject was this:
>>
>> 
>>http://forums.gradle.org/gradle/topics/illegalargumentexception_null_name
>>_n
>> 
>>ot_allowed_resolving_org_iso_relax_verifier_jaxp_validation_isorelax_jaxp
>>_b
>> ridge_1_0-cavg1
>>
>>
>> As far as I can tell, SBT's "intransitive()" method disabled ALL
>> transitive dependencies:
>>
>> "By default, these declarations fetch all project dependencies,
>> transitively. In some instances, you may find that the dependencies
>>listed
>> for a project aren't necessary for it to build. Projects using the Felix
>> OSGI framework, for instance, only explicitly require its main jar to
>> compile and run. Avoid fetching artifact dependencies with either
>> intransitive() or notTransitive(), as in this example:"
>>
>>
>> -- https://github.com/harrah/xsbt/wiki/Library-Management
>>
>>
>> I tried doing the Gradle equivalent of SBT's intransitive:
>>
>> compile("org.apache.kafka:kafka_$scalaVersion:$kafkaVersion@jar") {
>>   transitive = false
>> }
>>
>> The build still fails with that error.
>>
>>
>> Cheers,
>>
>> Chris
>>
>> On 7/15/13 10:34 AM, "Joe Stein"  wrote:
>>
>> >What is/are the error(s) you are getting?
>> >
>> >"intransitive" in the SBT line is so that SBT does not fetch the
>>libraries
>> >what is set for exclusion in the POM
>> >
>> >an example of an error if you don't do use what should be excluded is
>>in
>> >https://issues.apache.org/jira/browse/KAFKA-974
>> >
>> >
>> >On Mon, Jul 15, 2013 at 1:30 PM, Chris Riccomini
>> >wrote:
>> >
>> >> Hey Joe,
>> >>
>> >> First off, thanks for doing this!
>> >>
>> >> I'm trying to use the publication with Gradle, and I'm running into
>> >> problems. It's giving me a funky exception. I did some digging, and
>> >> apparently the exception implies that Gradle has been given a 1.0
>>pom.
>> >>
>> >> I took a look at the Kafka pom:
>> >>
>> >>
>> >>
>> >>
>> 
>>https://repository.apache.org/content/groups/public/org/apache/kafka/kafk
>> >>a_
>> >> 2.9.2/0.8.0-beta1/kafka_2.9.2-0.8.0-beta1.pom
>> >>
>> >>
>> >> It is indeed interesting. It appears to be a 2.0 pom, but it has a
>> >>parent
>> >> pointing to org.apache (?!). I dug around, and I haven't seen any
>>other
>> >> POMs with this style. I am not Maven expert, but at this point, it
>>seems
>> >> like the POMs Maven Central are somewhat broken in that regard.
>> >>
>> >> Have you successfully used the POMs with anything other than SBT? I
>> >> noticed you're including "intransitive" in the SBT coordinates. Why
>>is
>> >> that? In general, we'd want Kafka's transitive dependencies to be
>>pulled
>> >> in, as well. Are you turning off transitivity

Re: 0.8.0-beta1 is now available in public maven

2013-07-15 Thread Chris Riccomini
Hey Joe,

The error I see is:

10:35:30.431 [ERROR] [org.gradle.BuildExceptionReporter] FAILURE: Build
failed with an exception.
10:35:30.432 [ERROR] [org.gradle.BuildExceptionReporter]
10:35:30.432 [ERROR] [org.gradle.BuildExceptionReporter] * What went wrong:
10:35:30.432 [ERROR] [org.gradle.BuildExceptionReporter] Could not resolve
all dependencies for configuration ':samsa-kafka_2.9.2:compile'.
10:35:30.433 [ERROR] [org.gradle.BuildExceptionReporter] > Could not
resolve org.apache.kafka:kafka_2.9.2:0.8.0-beta1.
10:35:30.433 [ERROR] [org.gradle.BuildExceptionReporter]   Required by:
10:35:30.433 [ERROR] [org.gradle.BuildExceptionReporter]
samsa:samsa-kafka_2.9.2:0.7.0
10:35:30.434 [ERROR] [org.gradle.BuildExceptionReporter]> null name
not allowed

All I could find on the subject was this:

http://forums.gradle.org/gradle/topics/illegalargumentexception_null_name_n
ot_allowed_resolving_org_iso_relax_verifier_jaxp_validation_isorelax_jaxp_b
ridge_1_0-cavg1


As far as I can tell, SBT's "intransitive()" method disabled ALL
transitive dependencies:

"By default, these declarations fetch all project dependencies,
transitively. In some instances, you may find that the dependencies listed
for a project aren't necessary for it to build. Projects using the Felix
OSGI framework, for instance, only explicitly require its main jar to
compile and run. Avoid fetching artifact dependencies with either
intransitive() or notTransitive(), as in this example:"


-- https://github.com/harrah/xsbt/wiki/Library-Management


I tried doing the Gradle equivalent of SBT's intransitive:

compile("org.apache.kafka:kafka_$scalaVersion:$kafkaVersion@jar") {
  transitive = false
}

The build still fails with that error.


Cheers,

Chris

On 7/15/13 10:34 AM, "Joe Stein"  wrote:

>What is/are the error(s) you are getting?
>
>"intransitive" in the SBT line is so that SBT does not fetch the libraries
>what is set for exclusion in the POM
>
>an example of an error if you don't do use what should be excluded is in
>https://issues.apache.org/jira/browse/KAFKA-974
>
>
>On Mon, Jul 15, 2013 at 1:30 PM, Chris Riccomini
>wrote:
>
>> Hey Joe,
>>
>> First off, thanks for doing this!
>>
>> I'm trying to use the publication with Gradle, and I'm running into
>> problems. It's giving me a funky exception. I did some digging, and
>> apparently the exception implies that Gradle has been given a 1.0 pom.
>>
>> I took a look at the Kafka pom:
>>
>>
>> 
>>https://repository.apache.org/content/groups/public/org/apache/kafka/kafk
>>a_
>> 2.9.2/0.8.0-beta1/kafka_2.9.2-0.8.0-beta1.pom
>>
>>
>> It is indeed interesting. It appears to be a 2.0 pom, but it has a
>>parent
>> pointing to org.apache (?!). I dug around, and I haven't seen any other
>> POMs with this style. I am not Maven expert, but at this point, it seems
>> like the POMs Maven Central are somewhat broken in that regard.
>>
>> Have you successfully used the POMs with anything other than SBT? I
>> noticed you're including "intransitive" in the SBT coordinates. Why is
>> that? In general, we'd want Kafka's transitive dependencies to be pulled
>> in, as well. Are you turning off transitivity because of an issue
>>similar
>> to the one I'm seeing?
>>
>> Thanks!
>> Chris
>>
>> On 7/14/13 6:32 PM, "Joe Stein"  wrote:
>>
>> >With SBT you can use 0.8.1-beta built with any of these four Scala
>> >versions
>> >in libraryDependencies now
>> >
>> >"org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1" intransitive()
>> >
>> >or
>> >
>> >"org.apache.kafka" % "kafka_2.9.1" % "0.8.0-beta1" intransitive()
>> >
>> >or
>> >
>> >"org.apache.kafka" % "kafka_2.8.2" % "0.8.0-beta1" intransitive()
>> >
>> >or
>> >
>> >"org.apache.kafka" % "kafka_2.8.0" % "0.8.0-beta1" intransitive()
>> >
>> >
>> >/*
>> >Joe Stein
>> >http://www.linkedin.com/in/charmalloc
>> >Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>> >*/
>>
>>
>
>
>-- 
>
>/*
>Joe Stein
>http://www.linkedin.com/in/charmalloc
>Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>*/



Re: 0.8.0-beta1 is now available in public maven

2013-07-15 Thread Chris Riccomini
Hey Joe,

First off, thanks for doing this!

I'm trying to use the publication with Gradle, and I'm running into
problems. It's giving me a funky exception. I did some digging, and
apparently the exception implies that Gradle has been given a 1.0 pom.

I took a look at the Kafka pom:


https://repository.apache.org/content/groups/public/org/apache/kafka/kafka_
2.9.2/0.8.0-beta1/kafka_2.9.2-0.8.0-beta1.pom


It is indeed interesting. It appears to be a 2.0 pom, but it has a parent
pointing to org.apache (?!). I dug around, and I haven't seen any other
POMs with this style. I am not a Maven expert, but at this point, it seems
like the POMs in Maven Central are somewhat broken in that regard.

Have you successfully used the POMs with anything other than SBT? I
noticed you're including "intransitive" in the SBT coordinates. Why is
that? In general, we'd want Kafka's transitive dependencies to be pulled
in, as well. Are you turning off transitivity because of an issue similar
to the one I'm seeing?

Thanks!
Chris

On 7/14/13 6:32 PM, "Joe Stein"  wrote:

>With SBT you can use 0.8.0-beta1 built with any of these four Scala
>versions
>in libraryDependencies now
>
>"org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1" intransitive()
>
>or
>
>"org.apache.kafka" % "kafka_2.9.1" % "0.8.0-beta1" intransitive()
>
>or
>
>"org.apache.kafka" % "kafka_2.8.2" % "0.8.0-beta1" intransitive()
>
>or
>
>"org.apache.kafka" % "kafka_2.8.0" % "0.8.0-beta1" intransitive()
>
>
>/*
>Joe Stein
>http://www.linkedin.com/in/charmalloc
>Twitter: @allthingshadoop 
>*/



Re: heterogenous kafka cluster?

2013-05-17 Thread Chris Riccomini
Hey guys,

I have no idea if this would be reasonable, but what about just running
two Kafka processes on the bigger box?

Cheers,
Chris

On 5/17/13 2:48 PM, "Jason Rosenberg"  wrote:

>Just resource allocation issues.  E.g. imagine having an existing kafka
>cluster with one machine spec, and getting access to a few more hosts to
>augment the cluster, which are newer and therefore have twice the disk
>storage.  I'd like to seamlessly add them into the cluster, without having
>to replace everything en masse.  Thus, it would be nice for the newer ones
>to take proportionally more load based on the relative storage available,
>etc.
>
>Jason
>
>
>On Fri, May 17, 2013 at 2:34 PM, Neha Narkhede
>wrote:
>
>> That does seem a little hacky. But I'm trying to understand the
>>requirement
>> behind having to deploy heterogeneous hardware. What are you trying to
>> achieve or optimize?
>>
>> Thanks,
>> Neha
>>
>>
>> On Fri, May 17, 2013 at 2:29 PM, Jason Rosenberg 
>>wrote:
>>
>> > Hi,
>> >
>> > I'm wondering if there's a good way to have a heterogenous kafka
>>cluster
>> > (specifically, if we have nodes with different sized disks).  So, we
>> might
>> > want a larger node to receive more messages than a smaller node, etc.
>> >
>> > I expect there's something we can do with using a partitioner that has
>> > specific knowledge about the hosts in the cluster, but this feels
>>messy,
>> to
>> > have this config on every producer client
>> >
>> > Thoughts?
>> >
>> > Jason
>> >
>>
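
One way to picture the partitioner idea from the quoted message is the rough,
hypothetical Scala sketch below. It is not Kafka's Partitioner trait, and the
per-partition weights would still have to be configured on every producer,
which is exactly the messiness Jason points out:

// Hypothetical weighted partition chooser: partition i receives a share of the
// key space proportional to weights(i), e.g. weight 2 for partitions living on
// brokers with twice the disk. Purely illustrative, not part of Kafka.
object WeightedPartitionChooser {
  def choose(key: Any, weights: Seq[Int]): Int = {
    require(weights.nonEmpty && weights.forall(_ > 0), "weights must be positive")
    val total = weights.sum
    val slot  = math.abs(key.hashCode % total)
    // walk the cumulative weights until the slot falls inside a partition's share
    weights.scanLeft(0)(_ + _).tail.indexWhere(slot < _)
  }
}

// e.g. WeightedPartitionChooser.choose("some-key", Seq(1, 1, 2)) routes roughly
// half of all keys to partition 2.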



Re: Non-blocking Kafka stream iterators

2013-01-22 Thread Chris Riccomini
Hey Guys,

One other potentially large benefit is to decouple broker dependencies
from consumer/producer dependencies. This makes upgrading the
consumer/producer and managing jar conflicts a lot less of a hassle.
Putting the consumer and producer in their own packages would hopefully
alleviate some of this. I'm not sure how much the broker is pulling in
that the consumer/producer aren't using, but it might be worth a look, if
there are a lot of jars that only the broker is using.

Cheers,
Chris

On 1/22/13 12:57 PM, "Evan Chan"  wrote:

>Hi Jay,
>
>Actually, it's mostly the ability to easily cross-build; also the ease of
>understanding the code (less code to grok) and implementing alternatives (I
>guess all of those fall under cleanliness).
>
>thanks,
>Evan
>
>
>On Tue, Jan 22, 2013 at 12:47 PM, Jay Kreps  wrote:
>
>> Hi Evan,
>>
>> Makes sense. Is your goal in separating the client shrinking the jar
>>size?
>> or just general cleanliness?
>>
>> -Jay
>>
>>
>> On Tue, Jan 22, 2013 at 10:53 AM, Evan Chan  wrote:
>>
>> > Jay,
>> >
>> > Comments inlined.
>> >
>> > On Tue, Jan 22, 2013 at 10:15 AM, Jay Kreps 
>>wrote:
>> >
>> > > Hey Evan,
>> > >
>> > > Great points, some comments:
>> > > - Not sure if I understand what you mean by separating consumer and
>> main
>> > > logic.
>> > >
>> >
>> > I just meant having a separate Scala/Java client jar, so it's more
>> > lightweight and easier to build independently kind of like the
>> > consumers for the other languages.
>> >
>> >
>> > > - Yes, cross-building, I think this is in progress now for kafka as
>>a
>> > whole
>> > > so it should be in either 0.8 or 0.8.1
>> > > - Yes, forgot to mention offset initialization, but that is
>>definitely
>> > > needed.
>> > >
>> > > For the hasNext functionality, even that is not very good since if
>>you
>> > have
>> > > two streams and want to take the next message from either you would
>> have
>> > to
>> > > busy wait calling hasNext on both in a loop.
>> > >
>> > > An alternative would be something like
>> > > val client = new ConsumerClient(topics, config)
>> > > client.select(timeout: Long): Iterator[MessageAndMetadata]
>> > >
>> > > This method would have no internal threading. It would
>>scatter-gather
>> > over
>> > > the topic/partitions assigned to this consumer (whether they are
>> > statically
>> > > or dynamically assigned would be specified in the config). The
>>select
>> > call
>> > > would internally just do an epoll/select on all the connections and
>> > return
>> > > the first message set it gets back or an empty list if it hits the
>> > timeout
>> > > and no one has responded.
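
A sketch of what a caller's loop over the proposed select() might look like.
ConsumerClient, MessageAndMetadata, and select are the hypothetical names from
the proposal above, not an existing Kafka API:

// Hypothetical interfaces mirroring the proposal; purely illustrative.
trait MessageAndMetadata { def topic: String; def payload: Array[Byte] }

trait ConsumerClient {
  // Scatter-gather over the assigned topic/partitions; returns whatever
  // arrived before the timeout, possibly an empty iterator.
  def select(timeoutMs: Long): Iterator[MessageAndMetadata]
}

def consumeLoop(client: ConsumerClient): Unit = {
  while (true) {
    val batch = client.select(100L)
    // An empty iterator just means nothing arrived in time, so the calling
    // thread is free to interleave other work instead of blocking.
    batch.foreach(m => println(s"${m.topic}: ${m.payload.length} bytes"))
  }
}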
>> > >
>> >
>> > Hm, I like that API actually.  It would definitely be more flexible.
>> >
>> >
>> > >
>> > > This api is less intuitive then the blocking iterator, but more
>> flexible
>> > > and enables a better, faster implementation. There would be no
>>threads
>> > > aside from the client's thread. It allows non-blocking or blocking
>> > > consumption. And it generalizes easily to consuming from many
>> > > topics/partitions simultaneously.
>> > >
>> > > We could implement an iterator like wrapper for this to ease the
>> > transition
>> > > that just used this api under the covers.
>> > >
>> > > Anyhow this is a ways out, and we haven't really had any proposals
>>or
>> > > discussions on it, but this is what I was thinking.
>> > >
>> > > -Jay
>> > >
>> > >
>> > >
>> > >
>> > > On Tue, Jan 22, 2013 at 9:37 AM, Evan Chan  wrote:
>> > >
>> > > > Jay,
>> > > >
>> > > > For the consumer:
>> > > > - Separation of the consumer logic from the main logic
>> > > > - Making it easier to build the consumer for different versions of
>> > Scala
>> > > > (say 2.10)
>> > > > - Make it easier to read from any offset you want, while being
>>able
>> to
>> > > keep
>> > > > partition management features
>> > > > - Better support for Akka and other non-blocking / event-based
>> > frameworks
>> > > > (instead of a timeout, implement true hasNext functionality, for
>> > example)
>> > > >
>> > > > thanks,
>> > > > Evan
>> > > >
>> > > >
>> > > > On Mon, Jan 21, 2013 at 9:27 AM, Jay Kreps 
>> > wrote:
>> > > >
>> > > > > It's worth mentioning that we are interested in exploring
>>potential
>> > > > > generalizations of the producer and consumer API, but as a
>> practical
>> > > > matter
>> > > > > most of the committers are working on getting a stable 0.8
>>release
>> > out
>> > > > the
>> > > > > door. So an improved consumer and producer api would be a 0.9
>> > feature.
>> > > > >
>> > > > > If you have a concrete thing you are trying to do now that is
>> awkward
>> > > it
>> > > > > would be great to hear about the use case.
>> > > > >
>> > > > > Possible goals of improving the apis and client impls would
>>include
>> > the
>> > > > > following:
>> > > > >
>> > > > > Producer:
>> > > > > 1. Include the offset in the information returned to the
>>producer
>> > > > > 2. Pipeline producer requests to improve throughput for
>>synchronous
>> > > > > 

Re: About kafka 0.8 producer zookeeper-based load balancing on per-request basis

2013-01-15 Thread Chris Riccomini
Hey Guys,

Correct me if I'm wrong, but I believe in 0.8, the producer uses a
metadata request to get topic/partition mappings from the broker. The
broker then interacts with ZK (rather than having the producer do it using
zk.connect).

In the event that the master for a topic/partition fails, a new master
broker will be elected for a given topic/partition pair. When the producer
tries to send to the old broker (which is either dead, or a slave now),
the broker will either not respond, or the response will contain an error
code. In either case, I think the producer will do a new metadata request
(to a broker) to get the latest topic/partition to broker mappings. In
this way, it avoids having to use ZooKeeper: it offloads all ZK work to
the broker.

Cheers,
Chris
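
A rough sketch of that control flow, with entirely made-up names rather than
the actual 0.8 producer internals: send to the cached leader, and on any
failure re-fetch metadata from a broker and retry.

// Illustrative only: all types and methods here are hypothetical, chosen to
// show the refresh-on-failure flow described above.
case class PartitionInfo(leaderHost: String, leaderPort: Int)

trait MetadataClient {
  // The broker answers from its own ZooKeeper view, so the producer never talks to ZK.
  def fetchMetadata(topic: String): Map[Int, PartitionInfo]
  def send(leader: PartitionInfo, topic: String, partition: Int, msg: Array[Byte]): Boolean
}

def sendWithRefresh(client: MetadataClient, topic: String, partition: Int,
                    msg: Array[Byte], maxRetries: Int = 3): Boolean = {
  var metadata = client.fetchMetadata(topic)
  var attempt = 0
  var sent = false
  while (!sent && attempt <= maxRetries) {
    sent = metadata.get(partition).exists(leader => client.send(leader, topic, partition, msg))
    if (!sent) {
      // Leader died or moved: ask a broker for fresh mappings and try again.
      metadata = client.fetchMetadata(topic)
      attempt += 1
    }
  }
  sent
}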

On 1/14/13 9:35 PM, "Jun Guo -X (jungu - CIIC at Cisco)" 
wrote:

>Hi,
>We know that in Kafka 0.8 the producer connects to the broker directly,
>without connecting to ZooKeeper. How, then, does it achieve ZooKeeper-based
>load balancing on a per-request basis?
>When a topic is created, its partitions are distributed across one or more
>brokers. When a message is sent, it is delivered to a certain partition
>according to its key. That is to say, a given message must be sent to a fixed
>partition on a fixed broker. How does the so-called load balancing work?
>
>Best Regards