Re: Increasing the throughput of Kafka Publisher

2015-03-04 Thread Roger Hoover
Seeing around 5k msgs/s.  The messages are small (average 42 bytes after
snappy compression)
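
(Back-of-the-envelope, assuming the two figures simply multiply: 5,000 msgs/s x 42 bytes is roughly 210 KB/s of compressed payload.)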

On Wed, Mar 4, 2015 at 11:34 PM, Vineet Mishra 
wrote:

> Hi Roger,
>
> I have already enabled snappy; the throughput I mentioned is with snappy
> already on.
>
> Could you mention what throughput you are reaching?
>
> Thanks!
>
> On Thu, Mar 5, 2015 at 12:56 PM, Roger Hoover 
> wrote:
>
> > Hi Vineet,
> >
> > Try enabling compression.  That improves throughput 3-4x usually for me.
> > Also, you can use async mode if you're willing to trade some chance of
> > dropping messages for more throughput.
> >
> > kafka {
> >
> >   codec => 'json'
> >
> >   broker_list => "localhost:9092"
> >
> >   topic_id => "blah"
> >
> >   compression_codec => "snappy"
> >
> >   request_required_acks => 1
> >
> >   producer_type => "async"
> >
> > }
> >
> > On Wed, Mar 4, 2015 at 11:03 PM, Vineet Mishra 
> > wrote:
> >
> > > Hi,
> > >
> > > I have a Logstash Forwarder publishing events to Kafka, but the rate at
> > > which events are published to Kafka is very slow.
> > >
> > > From some references I have seen the Kafka publish throughput reaching
> > > 50-60Mbs per second, but in my case I am hardly getting a few Kbs of
> > > throughput.
> > >
> > > Looking for expert intervention on the same.
> > >
> > > Thanks,
> > >
> >
>


Re: Database Replication Question

2015-03-04 Thread James Cheng

> On Mar 3, 2015, at 4:18 PM, Guozhang Wang  wrote:
> 
> In addition to Jay's recommendation, you also need to take some special
> care in error handling of the producer in order to preserve ordering, since
> the producer uses batching and async sending. That is, if you have already
> sent messages 1,2,3,4,5 to the producer but are later notified that message
> 3 failed to send, you need to avoid continuing to send messages 4,5 before
> 3 gets fixed or dropped.
> 

Guozhang, how would we do this? Would this require sending each message 
individually and waiting for acknowledgment of each message?

Send 1
Wait for ack
Send 2
Wait for ack
etc

If I try to send 1,2,3,4,5 in a batch, is it possible that the broker could 
receive 1,2 and 4,5, and that only 3 would fail? Or is it always a contiguous 
chunk, and then the first failure would cause the rest of the batch to abort?
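
For illustration, a minimal sketch of the send-and-wait pattern being discussed, using the 0.8.2 java producer (topic and key names here are made up): calling get() on each returned Future blocks until that record is acked, so a failure on record N is seen before N+1 is ever handed to the producer.

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrderedSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        List<String> changes = Arrays.asList("1", "2", "3", "4", "5");
        for (String change : changes) {
            try {
                // get() blocks until the broker acks (or the send fails), so a
                // failure on record N stops the loop before N+1 is sent.
                producer.send(new ProducerRecord<String, String>("db-changes", "row-key", change)).get();
            } catch (Exception e) {
                // stop here and repair/retry instead of sending later records
                break;
            }
        }
        producer.close();
    }
}

This trades throughput for ordering; batching several records and waiting on all of their futures at a safe point is the usual middle ground.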

-James

> Guozhang
> 
> On Tue, Mar 3, 2015 at 3:45 PM, Xiao  wrote:
> 
>> Hey Josh,
>> 
>> Transactions can be applied in parallel in the consumer side based on
>> transaction dependency checking.
>> 
>> http://www.google.com.ar/patents/US20080163222
>> 
>> This patent documents how it works. It is easy to understand; however, you
>> also need to consider the hash collision issues. This has been implemented
>> in IBM Q Replication since 2001.
>> 
>> Thanks,
>> 
>> Xiao Li
>> 
>> 
>> On Mar 3, 2015, at 3:36 PM, Jay Kreps  wrote:
>> 
>>> Hey Josh,
>>> 
>>> As you say, ordering is per partition. Technically it is generally
>> possible
>>> to publish all changes to a database to a single partition--generally the
>>> kafka partition should be high throughput enough to keep up. However
>> there
>>> are a couple of downsides to this:
>>> 1. Consumer parallelism is limited to one. If you want a total order to
>> the
>>> consumption of messages you need to have just 1 process, but often you
>>> would want to parallelize.
>>> 2. Often what people want is not a full stream of all changes in all
>> tables
>>> in a database but rather the changes to a particular table.
>>> 
>>> To some extent the best way to do this depends on what you will do with
>> the
>>> data. However if you intend to have lots
>>> 
>>> I have seen pretty much every variation on this in the wild, and here is
>>> what I would recommend:
>>> 1. Have a single publisher process that publishes events into Kafka
>>> 2. If possible use the database log to get these changes (e.g. mysql
>>> binlog, Oracle xstreams, golden gate, etc). This will be more complete
>> and
>>> more efficient than polling for changes, though that can work too.
>>> 3. Publish each table to its own topic.
>>> 4. Partition each topic by the primary key of the table.
>>> 5. Include in each message the database's transaction id, scn, or other
>>> identifier that gives the total order within the record stream. Since
>> there
>>> is a single publisher this id will be monotonic within each partition.
>>> 
>>> This seems to be the best set of tradeoffs for most use cases:
>>> - You can have parallel consumers up to the number of partitions you
>> chose
>>> that still get messages in order per ID'd entity.
>>> - You can subscribe to just one table if you like, or to multiple tables.
>>> - Consumers who need a total order over all updates can do a "merge"
>> across
>>> the partitions to reassemble the fully ordered set of changes across all
>>> tables/partitions.
>>> 
>>> One thing to note is that the requirement of having a single consumer
>>> process/thread to get the total order isn't really so much a Kafka
>>> restriction as it just is a restriction about the world, since if you had
>>> multiple threads even if you delivered messages to them in order their
>>> processing might happen out of order (just due to the random timing of the
>>> processing).
>>> 
>>> -Jay
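
As an editorial illustration of points 3-5 above (one topic per table, keyed by primary key, scn carried in the message), here is a minimal sketch with the 0.8.2 java producer; the topic, key, and JSON layout are made-up examples, not a prescribed format.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TableChangePublisherSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // one topic per table ("db.users"), keyed by primary key so every change
        // to a row lands in the same partition, with the scn embedded in the value
        String primaryKey = "user-42";
        long scn = 100123L;
        String value = "{\"scn\":" + scn + ",\"op\":\"UPDATE\",\"email\":\"new@example.com\"}";
        producer.send(new ProducerRecord<String, String>("db.users", primaryKey, value));
        producer.close();
    }
}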
>>> 
>>> 
>>> 
>>> On Tue, Mar 3, 2015 at 3:15 PM, Josh Rader  wrote:
>>> 
 Hi Kafka Experts,
 
 
 
 We have a use case around RDBMS replication where we are investigating
 Kafka.  In this case ordering is very important.  Our understanding is
 ordering is only preserved within a single partition.  This makes sense
>> as
 a single thread will consume these messages, but our question is can we
 somehow parallelize this for better performance?   Is there maybe some
 partition key strategy trick to have your cake and eat it too in terms
>> of
 keeping ordering, but also able to parallelize the processing?
 
 
 
 I am sorry if this has already been asked, but we tried to search
>> through
 the archives and couldn’t find this response.
 
 
 
 Thanks,
 
 Josh
 
>> 
>> 
> 
> 
> --
> -- Guozhang



Re: Increasing the throughput of Kafka Publisher

2015-03-04 Thread Vineet Mishra
Hi Roger,

I have already enabled snappy; the throughput I mentioned is with snappy
already on.

Could you mention what throughput you are reaching?

Thanks!

On Thu, Mar 5, 2015 at 12:56 PM, Roger Hoover 
wrote:

> Hi Vineet,
>
> Try enabling compression.  That improves throughput 3-4x usually for me.
> Also, you can use async mode if you're willing to trade some chance of
> dropping messages for more throughput.
>
> kafka {
>
>   codec => 'json'
>
>   broker_list => "localhost:9092"
>
>   topic_id => "blah"
>
>   compression_codec => "snappy"
>
>   request_required_acks => 1
>
>   producer_type => "async"
>
> }
>
> On Wed, Mar 4, 2015 at 11:03 PM, Vineet Mishra 
> wrote:
>
> > Hi,
> >
> > I have a Logstash Forwarder publishing events to Kafka, but the rate at
> > which events are published to Kafka is very slow.
> >
> > From some references I have seen the Kafka publish throughput reaching
> > 50-60Mbs per second, but in my case I am hardly getting a few Kbs of
> > throughput.
> >
> > Looking for expert intervention on the same.
> >
> > Thanks,
> >
>


Re: Increasing the throughput of Kafka Publisher

2015-03-04 Thread Roger Hoover
Hi Vineet,

Try enabling compression.  That improves throughput 3-4x usually for me.
Also, you can use async mode if you're willing to trade some chance of
dropping messages for more throughput.

kafka {

  codec => 'json'

  broker_list => "localhost:9092"

  topic_id => "blah"

  compression_codec => "snappy"

  request_required_acks => 1

  producer_type => "async"

}
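
For reference, a rough sketch of how those logstash options map onto the 0.8 (old scala) producer's own settings, using the java wrapper; the broker address, topic, and batch values are placeholders, not recommendations.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AsyncSnappyProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("compression.codec", "snappy");        // same as compression_codec above
        props.put("request.required.acks", "1");         // same as request_required_acks
        props.put("producer.type", "async");             // same as producer_type
        props.put("batch.num.messages", "1000");         // bigger batches help small messages
        props.put("queue.buffering.max.ms", "50");       // how long async mode buffers

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("blah", "{\"example\":\"payload\"}"));
        producer.close();
    }
}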

On Wed, Mar 4, 2015 at 11:03 PM, Vineet Mishra 
wrote:

> Hi,
>
> I have a Logstash Forwarder publishing events to Kafka, but the rate at
> which events are published to Kafka is very slow.
>
> From some references I have seen the Kafka publish throughput reaching
> 50-60Mbs per second, but in my case I am hardly getting a few Kbs of
> throughput.
>
> Looking for expert intervention on the same.
>
> Thanks,
>


Mirror maker end to end latency metric

2015-03-04 Thread tao xiao
Hi team,

Is there a built-in metric that can measure the end to end latency in MM?

-- 
Regards,
Tao


Increasing the throughput of Kafka Publisher

2015-03-04 Thread Vineet Mishra
Hi,

I have a Logstash Forwarder publishing events to Kafka, but the rate at which
events are published to Kafka is very slow.

From some references I have seen the Kafka publish throughput reaching
50-60Mbs per second, but in my case I am hardly getting a few Kbs of
throughput.

Looking for expert intervention on the same.

Thanks,


Re: Explicit control over flushing the messages

2015-03-04 Thread Ponmani Rayar
Thanks a lot Jeff for redirecting me to the right place. :-)
Is there any tentative date for when we can get an official release with this
patch?
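
For anyone else following along, a hedged sketch of the interim workaround until KAFKA-1865 ships: with the 0.8.2 java producer you can block on the Futures returned by send(), which guarantees those records have been sent before you move on (an explicit flush() call is what the JIRA adds later).

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class FlushWorkaroundSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        List<Future<RecordMetadata>> pending = new ArrayList<Future<RecordMetadata>>();
        for (int i = 0; i < 100; i++) {
            pending.add(producer.send(new ProducerRecord<String, String>("my-topic", "msg-" + i)));
        }
        // "flush": wait for every outstanding send to complete before continuing
        for (Future<RecordMetadata> f : pending) {
            f.get();
        }
        producer.close();
    }
}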


On 4 March 2015 at 19:42, Jeff Holoman  wrote:

> Take a look here:
>
> https://issues.apache.org/jira/browse/KAFKA-1865
>
>
>
> On Wed, Mar 4, 2015 at 4:28 AM, Ponmani Rayar  wrote:
>
> > Hi Group,
> >
> >   I have started using Kafka 0.8.2 with the new producer API.
> > Just wanted to know if we can have explicit control over flushing the
> > message batch to the Kafka cluster.
> >
> > Configuring batch.size will flush the messages when the batch.size is
> > reached for a partition.
> > But is there any API wherein I can have explicit control over flushing the
> > messages, something like
> >
> >  *kafkaProducer.flush(); *
> >
> > Please let me know if there is any API or specific configuration to do
> > this.
> >
> > Thanks in advance,
> > Ponmani
> >
>
>
>
> --
> Jeff Holoman
> Systems Engineer
>


please subscribe

2015-03-04 Thread Michael Minar
thank you


Re: Trying to get kafka data to Hadoop

2015-03-04 Thread Neha Narkhede
Thanks Jagat for the callout!

Confluent Platform 1.0  includes Camus and we
were happy to address any questions in our community mailing list
.



On Wed, Mar 4, 2015 at 8:41 PM, max square  wrote:

> Thunder,
>
> thanks for your reply. The hadoop job is now correctly configured (the
> client was not getting the correct jars), however I am getting Avro
> formatting exceptions due to the format the schema-repo server follows. I
> think I will do something similar and create our own branch that uses the
> schema repo. Any gotchas you can advise on?
>
> Thanks!
>
> Max
>
> On Wed, Mar 4, 2015 at 9:24 PM, Thunder Stumpges 
> wrote:
>
> > What branch of camus are you using? We have our own fork that we updated
> > the camus dependency from the avro snapshot of the REST Schema Repository
> > to the new "official" one you mention in github.com/schema-repo. I was
> > not aware of a branch on the main linked-in camus repo that has this.
> >
> > That being said, we are doing essentially this same thing however we are
> > using a single shaded uber-jar. I believe the maven project builds this
> > automatically, doesn't it?
> >
> > I'll take a look at the details of how we are invoking this on our site
> > and get back to you.
> >
> > Cheers,
> > Thunder
> >
> >
> > -Original Message-
> > From: max square [max2subscr...@gmail.com]
> > Received: Wednesday, 04 Mar 2015, 5:38PM
> > To: users@kafka.apache.org [users@kafka.apache.org]
> > Subject: Trying to get kafka data to Hadoop
> >
> > Hi all,
> >
> > I have browsed through different conversations around Camus, and bring this
> > up as kind of a Kafka question. I know it is not the most orthodox, but if
> > someone has some thoughts I'd appreciate it.
> >
> > That said, I am trying to set up Camus, using a 3 node Kafka cluster
> > 0.8.2.1, using a project that is trying to build Avro Schema-Repo
> > . All of the Avro schemas
> for
> > the topics are published correctly. I am building Camus and using:
> >
> > hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar
> com.linkedin.camus.etl.
> > kafka.CamusJob -libjars $CAMUS_LIBJARS  -D mapreduce.job.user.classpath.
> > first=true -P config.properties
> >
> > As the command to start the job, where I have set up an environment
> > variable that holds all the libjars that the mvn package command
> generates.
> >
> > I have also set the following properties to configure the job:
> > camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.
> > LatestSchemaKafkaAvroMessageDecoder
> >
> >
> kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.
> > AvroRestSchemaRegistry
> >
> > etl.schema.registry.url=http://10.0.14.25:2876/schema-repo/
> >
> > When I execute the job I get an Exception indicating the
> > AvroRestSchemaRegistry class can't be found (I've double checked it's
> part
> > of the libjars). I wanted to ask if this is the correct way to set up
> this
> > integration, and if anyone has pointers on why the job is not finding the
> > class AvroRestSchemaRegistry
> >
> > Thanks in advance for the help!
> >
> > Max
> >
> > Follows the complete stack trace:
> >
> > [CamusJob] - failed to create decoder
> >
> > com.linkedin.camus.coders.MessageDecoderException:
> > com.linkedin.camus.coders
> > .MessageDecoderException:java.lang.ClassNotFoundException: com.linkedin.
> > camus.schemaregistry.AvroRestSchemaRegistry
> >at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.
> > createMessageDecoder(MessageDecoderFactory.java:29)
> >
> >   at
> > com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder
> > (EtlInputFormat.java:391)
> >
> >at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getSplits(
> > EtlInputFormat.java:256)
> >
> >at
> org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:
> > 1107)
> >
> >at
> > org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1124
> > )
> >
> >  at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:178)
> >
> >at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:1023)
> >
> >at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
> >
> >at java.security.AccessController.doPrivileged(Native Method)
> >
> >at javax.security.auth.Subject.doAs(Subject.java:415)
> >
> >at org.apache.hadoop.security.UserGroupInformation.doAs(
> > UserGroupInformation.java:1642)
> >
> >at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.
> > java:976)
> >
> >at org.apache.hadoop.mapreduce.Job.submit(Job.java:582)
> >
> >at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:335)
> >
> >at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:563)
> >
> >at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
> >
> >at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
> >
> >at com.li

Re: If you run Kafka in AWS or Docker, how do you persist data?

2015-03-04 Thread Colin
Hello,

We use Docker for Kafka on VMs with both NAS and local disk.  We mount the
volumes externally.  We haven't had many problems at all, and a restart has
cleared any issue.  We are on 0.8.1.

We have also started to deploy to AWS.

--
Colin 
+1 612 859 6129
Skype colin.p.clark

> On Mar 4, 2015, at 10:46 PM, Otis Gospodnetic  
> wrote:
> 
> Hi,
> 
>> On Fri, Feb 27, 2015 at 1:36 AM, James Cheng  wrote:
>> 
>> Hi,
>> 
>> I know that Netflix might be talking about "Kafka on AWS" at the March
>> meetup, but I wanted to bring up the topic anyway.
>> 
>> I'm sure that some people are running Kafka in AWS.
> 
> 
> I'd say most, not some :)
> 
> 
>> Is anyone running Kafka within docker in production? How does that work?
> 
> Not us.  When I was at DevOps Days in NYC last year, everyone was talking
> about Docker, but only about 2.5 people in the room actually really used it.
> 
> For both of these, how do you persist data? If on AWS, do you use EBS? Do
>> you use ephemeral storage and then rely on replication? And if using
>> docker, do you persist data outside the docker container and on the host
>> machine?
> 
> We've used both EBS and local disks in AWS.  We don't have Kafka
> replication, as far as I know.
> 
> And related, how do you deal with broker failure? Do you simply replace it,
>> and repopulate a new broker via replication? Or do you bring back up the
>> broker with the persisted files?
> 
> We monitor all Kafka pieces - producers, consumer, and brokers with SPM.
> We have alerts and anomaly detection enabled for various Kafka metrics
> (yeah, consumer lag being one of them).
> Broker failures have been very rare (we've used 0.7.2, 0.8.1.x, and are now
> on 0.8.2).  When they happened a restart was typically enough. I can recall
> one instance where segment recovery took a long time (minutes, maybe more
> than an hour), but this was >6 months ago.
> 
> 
>> Trying to learn about what people are doing, beyond "on premises and
>> dedicated hardware".
> 
> In my world almost everyone I talk to is in AWS.
> 
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/


Re: [kafka-clients] Re: [VOTE] 0.8.2.1 Candidate 2

2015-03-04 Thread Neha Narkhede
+1. Verified quick start, unit tests.

On Tue, Mar 3, 2015 at 12:09 PM, Joe Stein  wrote:

> Ok, lets fix the transient test failure on trunk agreed not a blocker.
>
> +1 quick start passed, verified artifacts, updates in scala
> https://github.com/stealthly/scala-kafka/tree/0.8.2.1 and go
> https://github.com/stealthly/go_kafka_client/tree/0.8.2.1 look good
>
> ~ Joe Stein
> - - - - - - - - - - - - - - - - -
>
>   http://www.stealth.ly
> - - - - - - - - - - - - - - - - -
>
> On Tue, Mar 3, 2015 at 12:30 PM, Jun Rao  wrote:
>
> > Hi, Joe,
> >
> > Yes, that unit test does have transient failures from time to time. The
> > issue seems to be with the unit test itself and not the actual code. So,
> > this is not a blocker for 0.8.2.1 release. I think we can just fix it in
> > trunk.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Mar 3, 2015 at 9:08 AM, Joe Stein  wrote:
> >
> >> Jun, I have most everything looks good except I keep getting test
> >> failures from wget
> >>
> https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/kafka-0.8.2.1-src.tgz
> >> && tar -xvf kafka-0.8.2.1-src.tgz && cd kafka-0.8.2.1-src && gradle &&
> >> ./gradlew test
> >>
> >> kafka.api.ProducerFailureHandlingTest >
> >> testNotEnoughReplicasAfterBrokerShutdown FAILED
> >> org.scalatest.junit.JUnitTestFailedError: Expected
> >> NotEnoughReplicasException when producing to topic with fewer brokers
> than
> >> min.insync.replicas
> >> at
> >>
> org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101)
> >> at
> >>
> org.scalatest.junit.JUnit3Suite.newAssertionFailedException(JUnit3Suite.scala:149)
> >> at org.scalatest.Assertions$class.fail(Assertions.scala:711)
> >> at org.scalatest.junit.JUnit3Suite.fail(JUnit3Suite.scala:149)
> >> at
> >>
> kafka.api.ProducerFailureHandlingTest.testNotEnoughReplicasAfterBrokerShutdown(ProducerFailureHandlingTest.scala:355)
> >>
> >> This happens to me all the time on a few different machines.
> >>
> >> ~ Joe Stein
> >> - - - - - - - - - - - - - - - - -
> >>
> >>   http://www.stealth.ly
> >> - - - - - - - - - - - - - - - - -
> >>
> >> On Mon, Mar 2, 2015 at 7:36 PM, Jun Rao  wrote:
> >>
> >>> +1 from me. Verified quickstart and unit tests.
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>> On Thu, Feb 26, 2015 at 2:59 PM, Jun Rao  wrote:
> >>>
>  This is the second candidate for release of Apache Kafka 0.8.2.1. This
>  fixes 4 critical issue in 0.8.2.0.
> 
>  Release Notes for the 0.8.2.1 release
> 
> 
> https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/RELEASE_NOTES.html
> 
>  *** Please download, test and vote by Monday, Mar 2, 3pm PT
> 
>  Kafka's KEYS file containing PGP keys we use to sign the release:
>  http://kafka.apache.org/KEYS in addition to the md5, sha1
>  and sha2 (SHA256) checksum.
> 
>  * Release artifacts to be voted upon (source and binary):
>  https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/
> 
>  * Maven artifacts to be voted upon prior to release:
>  https://repository.apache.org/content/groups/staging/
> 
>  * scala-doc
>  https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/scaladoc/
> 
>  * java-doc
>  https://people.apache.org/~junrao/kafka-0.8.2.1-candidate2/javadoc/
> 
>  * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.1 tag
> 
> 
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=bd1bfb63ec73c10d08432ac893a23f28281ea021
>  (git commit ee1267b127f3081db491fa1bf9a287084c324e36)
> 
>  /***
> 
>  Thanks,
> 
>  Jun
> 
> 
> >>>  --
> >>> You received this message because you are subscribed to the Google
> >>> Groups "kafka-clients" group.
> >>> To unsubscribe from this group and stop receiving emails from it, send
> >>> an email to kafka-clients+unsubscr...@googlegroups.com.
> >>> To post to this group, send email to kafka-clie...@googlegroups.com.
> >>> Visit this group at http://groups.google.com/group/kafka-clients.
> >>> To view this discussion on the web visit
> >>>
> https://groups.google.com/d/msgid/kafka-clients/CAFc58G_5FuLKx3_kM4PCgqQL8d%2B4sqE0o-%2BXfu3FJicAgn5KPw%40mail.gmail.com
> >>> <
> https://groups.google.com/d/msgid/kafka-clients/CAFc58G_5FuLKx3_kM4PCgqQL8d%2B4sqE0o-%2BXfu3FJicAgn5KPw%40mail.gmail.com?utm_medium=email&utm_source=footer
> >
> >>> .
> >>>
> >>> For more options, visit https://groups.google.com/d/optout.
> >>>
> >>
> >>
> >
>



-- 
Thanks,
Neha


Re: Kafka Poll: Version You Use?

2015-03-04 Thread Neha Narkhede
Thanks for running the poll and sharing the results!

On Wed, Mar 4, 2015 at 8:34 PM, Otis Gospodnetic  wrote:

> Hi,
>
> You can see the number of voters in the poll itself (view poll results link
> in the poll widget).
> Audience details unknown, but the poll was posted on:
> * twitter - https://twitter.com/sematext/status/57050147435776
> * LinkedIn - a few groups - Kafka, DevOps, and I think another larger one
> * this mailing list
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Wed, Mar 4, 2015 at 11:24 PM, Christian Csar  wrote:
>
> > Do you have anything on the number of voters, or audience breakdown?
> >
> > Christian
> >
> > On Wed, Mar 4, 2015 at 8:08 PM, Otis Gospodnetic <
> > otis.gospodne...@gmail.com
> > > wrote:
> >
> > > Hello hello,
> > >
> > > Results of the poll are here!
> > > Any guesses before looking?
> > > What % of Kafka users are on 0.8.2.x already?
> > > What % of people are still on 0.7.x?
> > >
> > >
> > >
> >
> http://blog.sematext.com/2015/03/04/poll-results-kafka-version-distribution/
> > >
> > > Otis
> > > --
> > > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > > Solr & Elasticsearch Support * http://sematext.com/
> > >
> > >
> > > On Thu, Feb 26, 2015 at 3:32 PM, Otis Gospodnetic <
> > > otis.gospodne...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > With 0.8.2 out I thought it might be useful for everyone to see which
> > > > version(s) of Kafka people are using.
> > > >
> > > > Here's a quick poll:
> > > > http://blog.sematext.com/2015/02/23/kafka-poll-version-you-use/
> > > >
> > > > We'll publish the results next week.
> > > >
> > > > Thanks,
> > > > Otis
> > > > --
> > > > Monitoring * Alerting * Anomaly Detection * Centralized Log
> Management
> > > > Solr & Elasticsearch Support * http://sematext.com/
> > > >
> > > >
> > >
> >
>



-- 
Thanks,
Neha


Re: Best way to show lag?

2015-03-04 Thread Otis Gospodnetic
Hi,

On Sat, Feb 28, 2015 at 9:16 AM, Gene Robichaux 
wrote:

> What is the best way to detect consumer lag?
>
> We are running each consumer as a separate group and I am running the
> ConsumerOffsetChecker to assess the partitions and the lag for each
> group/consumer. I run this every 5 minutes. In some cases I run this
> command up to 75 times on each 5 min polling cycle (once for each
> group/consumer). An example of the command is (bin/kafka-run-class.sh
> kafka.tools.ConsumerOffsetChecker --group consumer-group1 --zkconnect
> zkhost:zkport)
>
> The problem I am running into is CPU usage on the broker when these
> commands run. We have a dedicated broker that has no leader partitions, but
> the high CPU still concerns me.
>
> Is there a better way to detect consumer lag? Preferably one that is less
> impactful?
>

Yeah, that hurts :(.  I just looked at our SPM for Kafka monitoring to see
specifically what we do for Consumer Lag.  I'd send you the screenshot, but
I think the ML blocks it.  Ah, ah, you can actually see it in a demo,
here's the link:  https://apps.sematext.com/demo -- look for SPM apps with
"Kafka" in the name and look for a tab on the left side labeled "Consumer
Lag".

But basically, you can slice and dice consumer lag by any combination of
the following:
* consumer hostname
* client ID
* topic
* partition

Minimal impact and you get your Consumer Lag in more or less RT.
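
For what it's worth, here is a hedged sketch of pulling a lag metric straight from a high-level consumer's JMX instead of hammering the brokers with ConsumerOffsetChecker; the JMX port and the exact MBean name are assumptions that depend on your Kafka version and setup.

import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ConsumerMaxLagSketch {
    public static void main(String[] args) throws Exception {
        // assumes the consumer JVM was started with remote JMX enabled on port 9999
        JMXServiceURL url =
            new JMXServiceURL("service:jmx:rmi:///jndi/rmi://consumer-host:9999/jmxrmi");
        JMXConnector jmxc = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = jmxc.getMBeanServerConnection();
            // MBean name as documented for 0.8.2; older versions use a different pattern
            ObjectName pattern = new ObjectName(
                "kafka.consumer:type=ConsumerFetcherManager,name=MaxLag,clientId=*");
            Set<ObjectName> names = mbs.queryNames(pattern, null);
            for (ObjectName name : names) {
                Object lag = mbs.getAttribute(name, "Value");
                System.out.println(name.getKeyProperty("clientId") + " max lag: " + lag);
            }
        } finally {
            jmxc.close();
        }
    }
}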

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


Re: If you run Kafka in AWS or Docker, how do you persist data?

2015-03-04 Thread Otis Gospodnetic
Hi,

On Fri, Feb 27, 2015 at 1:36 AM, James Cheng  wrote:

> Hi,
>
> I know that Netflix might be talking about "Kafka on AWS" at the March
> meetup, but I wanted to bring up the topic anyway.
>
> I'm sure that some people are running Kafka in AWS.


I'd say most, not some :)


> Is anyone running Kafka within docker in production? How does that work?
>

Not us.  When I was at DevOps Days in NYC last year, everyone was talking
about Docker, but only about 2.5 people in the room actually really used it.

For both of these, how do you persist data? If on AWS, do you use EBS? Do
> you use ephemeral storage and then rely on replication? And if using
> docker, do you persist data outside the docker container and on the host
> machine?
>

We've used both EBS and local disks in AWS.  We don't have Kafka
replication, as far as I know.

And related, how do you deal with broker failure? Do you simply replace it,
> and repopulate a new broker via replication? Or do you bring back up the
> broker with the persisted files?
>

We monitor all Kafka pieces - producers, consumer, and brokers with SPM.
We have alerts and anomaly detection enabled for various Kafka metrics
(yeah, consumer lag being one of them).
Broker failures have been very rare (we've used 0.7.2, 0.8.1.x, and are now
on 0.8.2).  When they happened a restart was typically enough. I can recall
one instance where segment recovery took a long time (minutes, maybe more
than an hour), but this was >6 months ago.


> Trying to learn about what people are doing, beyond "on premises and
> dedicated hardware".
>

In my world almost everyone I talk to is in AWS.

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


Re: Trying to get kafka data to Hadoop

2015-03-04 Thread max square
Thunder,

thanks for your reply. The hadoop job is now correctly configured (the
client was not getting the correct jars), however I am getting Avro
formatting exceptions due to the format the schema-repo server follows. I
think I will do something similar and create our own branch that uses the
schema repo. Any gotchas you can advise on?

Thanks!

Max

On Wed, Mar 4, 2015 at 9:24 PM, Thunder Stumpges 
wrote:

> What branch of camus are you using? We have our own fork that we updated
> the camus dependency from the avro snapshot of the REST Schema Repository
> to the new "official" one you mention in github.com/schema-repo. I was
> not aware of a branch on the main linked-in camus repo that has this.
>
> That being said, we are doing essentially this same thing however we are
> using a single shaded uber-jar. I believe the maven project builds this
> automatically, doesn't it?
>
> I'll take a look at the details of how we are invoking this on our site
> and get back to you.
>
> Cheers,
> Thunder
>
>
> -Original Message-
> From: max square [max2subscr...@gmail.com]
> Received: Wednesday, 04 Mar 2015, 5:38PM
> To: users@kafka.apache.org [users@kafka.apache.org]
> Subject: Trying to get kafka data to Hadoop
>
> Hi all,
>
> I have browsed through different conversations around Camus, and bring this
> up as kind of a Kafka question. I know it is not the most orthodox, but if
> someone has some thoughts I'd appreciate it.
>
> That said, I am trying to set up Camus, using a 3 node Kafka cluster
> 0.8.2.1, using a project that is trying to build Avro Schema-Repo
> . All of the Avro schemas for
> the topics are published correctly. I am building Camus and using:
>
> hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.
> kafka.CamusJob -libjars $CAMUS_LIBJARS  -D mapreduce.job.user.classpath.
> first=true -P config.properties
>
> As the command to start the job, where I have set up an environment
> variable that holds all the libjars that the mvn package command generates.
>
> I have also set the following properties to configure the job:
> camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.
> LatestSchemaKafkaAvroMessageDecoder
>
> kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.
> AvroRestSchemaRegistry
>
> etl.schema.registry.url=http://10.0.14.25:2876/schema-repo/
>
> When I execute the job I get an Exception indicating the
> AvroRestSchemaRegistry class can't be found (I've double checked it's part
> of the libjars). I wanted to ask if this is the correct way to set up this
> integration, and if anyone has pointers on why the job is not finding the
> class AvroRestSchemaRegistry
>
> Thanks in advance for the help!
>
> Max
>
> Follows the complete stack trace:
>
> [CamusJob] - failed to create decoder
>
> com.linkedin.camus.coders.MessageDecoderException:
> com.linkedin.camus.coders
> .MessageDecoderException:java.lang.ClassNotFoundException: com.linkedin.
> camus.schemaregistry.AvroRestSchemaRegistry
>at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.
> createMessageDecoder(MessageDecoderFactory.java:29)
>
>   at
> com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder
> (EtlInputFormat.java:391)
>
>at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getSplits(
> EtlInputFormat.java:256)
>
>at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:
> 1107)
>
>at
> org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1124
> )
>
>  at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:178)
>
>at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:1023)
>
>at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
>
>at java.security.AccessController.doPrivileged(Native Method)
>
>at javax.security.auth.Subject.doAs(Subject.java:415)
>
>at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1642)
>
>at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.
> java:976)
>
>at org.apache.hadoop.mapreduce.Job.submit(Job.java:582)
>
>at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:335)
>
>at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:563)
>
>at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
>
>at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
>
>at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:518)
>
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
> java:57)
>
>at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
>at java.lang.reflect.Method.invoke(Method.
>


Re: Kafka Poll: Version You Use?

2015-03-04 Thread Otis Gospodnetic
Hi,

You can see the number of voters in the poll itself (view poll results link
in the poll widget).
Audience details unknown, but the poll was posted on:
* twitter - https://twitter.com/sematext/status/57050147435776
* LinkedIn - a few groups - Kafka, DevOps, and I think another larger one
* this mailing list

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Wed, Mar 4, 2015 at 11:24 PM, Christian Csar  wrote:

> Do you have anything on the number of voters, or audience breakdown?
>
> Christian
>
> On Wed, Mar 4, 2015 at 8:08 PM, Otis Gospodnetic <
> otis.gospodne...@gmail.com
> > wrote:
>
> > Hello hello,
> >
> > Results of the poll are here!
> > Any guesses before looking?
> > What % of Kafka users are on 0.8.2.x already?
> > What % of people are still on 0.7.x?
> >
> >
> >
> http://blog.sematext.com/2015/03/04/poll-results-kafka-version-distribution/
> >
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
> > On Thu, Feb 26, 2015 at 3:32 PM, Otis Gospodnetic <
> > otis.gospodne...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > With 0.8.2 out I thought it might be useful for everyone to see which
> > > version(s) of Kafka people are using.
> > >
> > > Here's a quick poll:
> > > http://blog.sematext.com/2015/02/23/kafka-poll-version-you-use/
> > >
> > > We'll publish the results next week.
> > >
> > > Thanks,
> > > Otis
> > > --
> > > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > > Solr & Elasticsearch Support * http://sematext.com/
> > >
> > >
> >
>


Re: Kafka Poll: Version You Use?

2015-03-04 Thread Christian Csar
Do you have anything on the number of voters, or audience breakdown?

Christian

On Wed, Mar 4, 2015 at 8:08 PM, Otis Gospodnetic  wrote:

> Hello hello,
>
> Results of the poll are here!
> Any guesses before looking?
> What % of Kafka users are on 0.8.2.x already?
> What % of people are still on 0.7.x?
>
>
> http://blog.sematext.com/2015/03/04/poll-results-kafka-version-distribution/
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Thu, Feb 26, 2015 at 3:32 PM, Otis Gospodnetic <
> otis.gospodne...@gmail.com> wrote:
>
> > Hi,
> >
> > With 0.8.2 out I thought it might be useful for everyone to see which
> > version(s) of Kafka people are using.
> >
> > Here's a quick poll:
> > http://blog.sematext.com/2015/02/23/kafka-poll-version-you-use/
> >
> > We'll publish the results next week.
> >
> > Thanks,
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
>


Re: Kafka Poll: Version You Use?

2015-03-04 Thread Otis Gospodnetic
Hello hello,

Results of the poll are here!
Any guesses before looking?
What % of Kafka users are on 0.8.2.x already?
What % of people are still on 0.7.x?

http://blog.sematext.com/2015/03/04/poll-results-kafka-version-distribution/

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Thu, Feb 26, 2015 at 3:32 PM, Otis Gospodnetic <
otis.gospodne...@gmail.com> wrote:

> Hi,
>
> With 0.8.2 out I thought it might be useful for everyone to see which
> version(s) of Kafka people are using.
>
> Here's a quick poll:
> http://blog.sematext.com/2015/02/23/kafka-poll-version-you-use/
>
> We'll publish the results next week.
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>


RE: Trying to get kafka data to Hadoop

2015-03-04 Thread Thunder Stumpges
What branch of camus are you using? We have our own fork that we updated the 
camus dependency from the avro snapshot of the REST Schema Repository to the 
new "official" one you mention in github.com/schema-repo. I was not aware of a 
branch on the main linked-in camus repo that has this.

That being said, we are doing essentially this same thing; however, we are using
a single shaded uber-jar. I believe the maven project builds this automatically,
doesn't it?

I'll take a look at the details of how we are invoking this on our site and get 
back to you.

Cheers,
Thunder


-Original Message-
From: max square [max2subscr...@gmail.com]
Received: Wednesday, 04 Mar 2015, 5:38PM
To: users@kafka.apache.org [users@kafka.apache.org]
Subject: Trying to get kafka data to Hadoop

Hi all,

I have browsed through different conversations around Camus, and bring this
up as kind of a Kafka question. I know it is not the most orthodox, but if
someone has some thoughts I'd appreciate it.

That said, I am trying to set up Camus, using a 3 node Kafka cluster
0.8.2.1, using a project that is trying to build Avro Schema-Repo
. All of the Avro schemas for
the topics are published correctly. I am building Camus and using:

hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.
kafka.CamusJob -libjars $CAMUS_LIBJARS  -D mapreduce.job.user.classpath.
first=true -P config.properties

As the command to start the job, where I have set up an environment
variable that holds all the libjars that the mvn package command generates.

I have also set the following properties to configure the job:
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.
LatestSchemaKafkaAvroMessageDecoder
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.
AvroRestSchemaRegistry

etl.schema.registry.url=http://10.0.14.25:2876/schema-repo/

When I execute the job I get an Exception indicating the
AvroRestSchemaRegistry class can't be found (I've double checked it's part
of the libjars). I wanted to ask if this is the correct way to set up this
integration, and if anyone has pointers on why the job is not finding the
class AvroRestSchemaRegistry

Thanks in advance for the help!

Max

Follows the complete stack trace:

[CamusJob] - failed to create decoder

com.linkedin.camus.coders.MessageDecoderException: com.linkedin.camus.coders
.MessageDecoderException:java.lang.ClassNotFoundException: com.linkedin.
camus.schemaregistry.AvroRestSchemaRegistry
   at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.
createMessageDecoder(MessageDecoderFactory.java:29)

  at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder
(EtlInputFormat.java:391)

   at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getSplits(
EtlInputFormat.java:256)

   at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:
1107)

   at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1124
)

 at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:178)

   at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:1023)

   at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)

   at java.security.AccessController.doPrivileged(Native Method)

   at javax.security.auth.Subject.doAs(Subject.java:415)

   at org.apache.hadoop.security.UserGroupInformation.doAs(
UserGroupInformation.java:1642)

   at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.
java:976)

   at org.apache.hadoop.mapreduce.Job.submit(Job.java:582)

   at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:335)

   at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:563)

   at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)

   at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)

   at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:518)

   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
java:57)

   at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)

   at java.lang.reflect.Method.invoke(Method.


Re: Database Replication Question

2015-03-04 Thread Jonathan Hodges
Thanks James.  This is really helpful.  Another extreme edge case might be
that the single producer is sending the database log changes and the
network causes them to reach Kafka out of order.  How do you prevent
something like this? I guess by relying on the scn on the consumer side?


On Wed, Mar 4, 2015 at 5:59 PM, James Cheng  wrote:

> Another thing to think about is delivery guarantees. Exactly once, at
> least once, etc.
>
> If you have a publisher that consumes from the database log and pushes out
> to Kafka, and then the publisher crashes, what happens when it starts back
> up? Depending on how you keep track of the database's transaction
> id/scn/offset, you may end up re-publishing events that you already
> published out to the kafka topic.
>
> I am also working on database replication, namely from MySQL to Kafka. I'm
> using some of the ideas from http://ben.kirw.in/2014/11/28/kafka-patterns/
> in order to get exactly once processing, so that I don't have any
> duplicates in my kafka stream.
>
> Specifically, I have the publisher write messages to a single topic (I
> think/hope that Kafka's throughput is high enough). I include MySQL's
> binary log coordinates into my output messages. Upon startup, I read back
> the "end" of my topic to find out what messages I published. This gives me
> 2 pieces of information:
> 1) The MySQL binary log coordinates, so I know where to start again.
> 2) The messages that I last published, to make sure that I don't
> re-publish them.
>
> That does mean that all data from all tables is in a single topic. I will
> probably have a consumer that will read that "all tables" topic, and split
> the data out into separate topics, for consumers who just want a subset of
> the data.
>
> -James
>
> On Mar 4, 2015, at 9:28 AM, Jonathan Hodges  wrote:
>
> > Yes you are right on the oplog per partition as well as that mapping well
> > to the Kafka partitions.  I think we are making this harder than it is
> > based on previous attempts and trying to leverage something like Databus
> > for propagating log changes from MongoDB and Cassandra since it requires
> a
> > scn.  Sounds like direct Kafka makes more sense for these use cases.
> > Thanks again!
> >
> >
> > On Wed, Mar 4, 2015 at 8:56 AM, Jay Kreps  wrote:
> >
> >> Hey Josh,
> >>
> >> NoSQL DBs may actually be easier because they themselves generally don't
> >> have a global order. I.e. I believe Mongo has a per-partition oplog, is
> >> that right? Their partitions would match our partitions.
> >>
> >> -Jay
> >>
> >> On Wed, Mar 4, 2015 at 5:18 AM, Josh Rader  wrote:
> >>
> >>> Thanks everyone for your responses!  These are great.  It seems our
> cases
> >>> matches closest to Jay's recommendations.
> >>>
> >>> The one part that sounds a little tricky is point #5 'Include in each
> >>> message the database's transaction id, scn, or other identifier '.
> This
> >> is
> >>> pretty straightforward with the RDBMS case that I mentioned, but I
> could
> >>> see wanting to extend this to replicate NoSQL stores (Cassandra, Mongo)
> >>> which might not always have a readily available monotonic id,
> >> particularly
> >>> in failover scenarios.  I guess in that case we can think about
> creating
> >>> this id ourselves from the single producer.
> >>>
> >>> Xiao,
> >>>
> >>> I think in the Kafka failover cases you mention if we also store the
> >> offset
> >>> with replicated data we should be able to pick up where we left off
> since
> >>> we are using the low level consumer.  Maybe I am missing your point
> >>> though...
> >>>
> >>> Guozhang,
> >>>
> >>> Very good point that we didn't think of.  We will need to think this
> >>> through, as you say avoid resending other messages in a batch if one is
> >>> failed.  I wonder if we might also manage this on the consumer side too
> >>> with idempotency.  Thanks for raising this!
> >>>
> >>> Josh
> >>>
> >>>
> >>>
> >>> On Tue, Mar 3, 2015 at 6:08 PM, Xiao  wrote:
> >>>
>  Hey Josh,
> 
>  Sorry, after reading codes, Kafka did fsync the data using a separate
>  thread. The recovery point (oldest transaction timestamp) can be got
> >> from
>  the file recovery-point-offset-checkpoint.
> 
>  You can adjust the value config.logFlushOffsetCheckpointIntervalMs, if
> >>> you
>  think the speed is not quick enough. When the workloads is huge, the
>  bottleneck could be in your target side or source side. That means,
> >> your
>  apply could have enough jobs to do.
> 
>  Basically, you need to keep reading this file for determining the
> >> oldest
>  timestamps of all relevant partitions. Then, apply the transactions
> >> until
>  that timestamp.
> 
>  Note, this does not protect the transaction consistency. This is just
> >> for
>  ensuring the data at the target side is consistent at one timestamp
> >> when
>  you have multiple channel to send data changes. The implementation
> >> should
>  be simple if you can understand 

Re: Trying to get kafka data to Hadoop

2015-03-04 Thread Lakshmanan Muthuraman
I think the libjars option is not required. The Maven package command for the
Camus project builds an uber jar (fat jar) that contains all the dependencies.
I generally run Camus the following way:

hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar
com.linkedin.camus.etl.kafka.CamusJob -P camus.properties

On Wed, Mar 4, 2015 at 2:16 PM, Jagat Singh  wrote:

> Also see the related tool
>
> http://confluent.io/downloads/
>
> Confluent is bringing the glue together for Kafka, Avro, and Camus.
>
> Though there is no clarity around support (e.g. updates of Kafka) around it
> at this moment.
>
>
>
> On Thu, Mar 5, 2015 at 8:57 AM, Joel Koshy  wrote:
>
> > I think the camus mailing list would be more suitable for this
> > question.
> >
> > Thanks,
> >
> > Joel
> >
> > On Wed, Mar 04, 2015 at 11:00:51AM -0500, max square wrote:
> > > Hi all,
> > >
> > > I have browsed through different conversations around Camus, and bring this
> > > up as kind of a Kafka question. I know it is not the most orthodox, but if
> > > someone has some thoughts I'd appreciate it.
> > >
> > > That said, I am trying to set up Camus, using a 3 node Kafka cluster
> > > 0.8.2.1, using a project that is trying to build Avro Schema-Repo
> > > . All of the Avro schemas
> > for
> > > the topics are published correctly. I am building Camus and using:
> > >
> > > hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar
> > com.linkedin.camus.etl.
> > > kafka.CamusJob -libjars $CAMUS_LIBJARS  -D
> mapreduce.job.user.classpath.
> > > first=true -P config.properties
> > >
> > > As the command to start the job, where I have set up an environment
> > > variable that holds all the libjars that the mvn package command
> > generates.
> > >
> > > I have also set the following properties to configure the job:
> > > camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.
> > > LatestSchemaKafkaAvroMessageDecoder
> > >
> >
> kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.
> > > AvroRestSchemaRegistry
> > >
> > > etl.schema.registry.url=http://10.0.14.25:2876/schema-repo/
> > >
> > > When I execute the job I get an Exception indicating the
> > > AvroRestSchemaRegistry class can't be found (I've double checked it's
> > part
> > > of the libjars). I wanted to ask if this is the correct way to set up
> > this
> > > integration, and if anyone has pointers on why the job is not finding
> the
> > > class AvroRestSchemaRegistry
> > >
> > > Thanks in advance for the help!
> > >
> > > Max
> > >
> > > Follows the complete stack trace:
> > >
> > > [CamusJob] - failed to create decoder
> > >
> > > com.linkedin.camus.coders.MessageDecoderException:
> > com.linkedin.camus.coders
> > > .MessageDecoderException:java.lang.ClassNotFoundException:
> com.linkedin.
> > > camus.schemaregistry.AvroRestSchemaRegistry
> > >at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.
> > > createMessageDecoder(MessageDecoderFactory.java:29)
> > >
> > >   at
> > com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder
> > > (EtlInputFormat.java:391)
> > >
> > >at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getSplits(
> > > EtlInputFormat.java:256)
> > >
> > >at
> > org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:
> > > 1107)
> > >
> > >at
> > org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1124
> > > )
> > >
> > >  at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:178)
> > >
> > >at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:1023)
> > >
> > >at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
> > >
> > >at java.security.AccessController.doPrivileged(Native Method)
> > >
> > >at javax.security.auth.Subject.doAs(Subject.java:415)
> > >
> > >at org.apache.hadoop.security.UserGroupInformation.doAs(
> > > UserGroupInformation.java:1642)
> > >
> > >at
> org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.
> > > java:976)
> > >
> > >at org.apache.hadoop.mapreduce.Job.submit(Job.java:582)
> > >
> > >at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:335)
> > >
> > >at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:563)
> > >
> > >at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
> > >
> > >at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
> > >
> > >at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:518)
> > >
> > >at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >
> > >at
> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.
> > > java:57)
> > >
> > >at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > > DelegatingMethodAccessorImpl.java:43)
> > >
> > >at java.lang.reflect.Method.invoke(Method.
> >
> >
>


Re: Database Replication Question

2015-03-04 Thread James Cheng
Another thing to think about is delivery guarantees. Exactly once, at least 
once, etc.

If you have a publisher that consumes from the database log and pushes out to 
Kafka, and then the publisher crashes, what happens when it starts back up? 
Depending on how you keep track of the database's transaction id/scn/offset, 
you may end up re-publishing events that you already published out to the kafka 
topic.

I am also working on database replication, namely from MySQL to Kafka. I'm 
using some of the ideas from http://ben.kirw.in/2014/11/28/kafka-patterns/ in 
order to get exactly once processing, so that I don't have any duplicates in my 
kafka stream.

Specifically, I have the publisher write messages to a single topic (I 
think/hope that Kafka's throughput is high enough). I include MySQL's binary 
log coordinates into my output messages. Upon startup, I read back the "end" of 
my topic to find out what messages I published. This gives me 2 pieces of 
information:
1) The MySQL binary log coordinates, so I know where to start again.
2) The messages that I last published, to make sure that I don't re-publish 
them.

That does mean that all data from all tables is in a single topic. I will 
probably have a consumer that will read that "all tables" topic, and split the 
data out into separate topics, for consumers who just want a subset of the data.
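
For illustration, a hedged sketch of the "read back the end of my topic on startup" step described above, using the 0.8 SimpleConsumer; host, topic, and partition are placeholders, and parsing the binlog coordinates back out of the last record is left as a stub.

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class RecoverLastPublishedSketch {
    public static void main(String[] args) {
        String topic = "db-changes";
        int partition = 0;
        String clientId = "replicator-recovery";
        SimpleConsumer consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, clientId);

        // 1. ask the broker for the latest offset (the offset of the *next* message)
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> request = Collections.singletonMap(
            tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
        OffsetResponse offsets = consumer.getOffsetsBefore(new kafka.javaapi.OffsetRequest(
            request, kafka.api.OffsetRequest.CurrentVersion(), clientId));
        long nextOffset = offsets.offsets(topic, partition)[0];

        // 2. re-read the last published record and recover the binlog coordinates from it
        if (nextOffset > 0) {
            FetchRequest fetch = new FetchRequestBuilder().clientId(clientId)
                .addFetch(topic, partition, nextOffset - 1, 1024 * 1024).build();
            FetchResponse data = consumer.fetch(fetch);
            for (MessageAndOffset mo : data.messageSet(topic, partition)) {
                ByteBuffer payload = mo.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                // parse the MySQL binlog file/position (and the last published keys) here
            }
        }
        consumer.close();
    }
}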

-James

On Mar 4, 2015, at 9:28 AM, Jonathan Hodges  wrote:

> Yes you are right on the oplog per partition as well as that mapping well
> to the Kafka partitions.  I think we are making this harder than it is
> based on previous attempts and trying to leverage something like Databus
> for propagating log changes from MongoDB and Cassandra since it requires a
> scn.  Sounds like direct Kafka makes more sense for these use cases.
> Thanks again!
> 
> 
> On Wed, Mar 4, 2015 at 8:56 AM, Jay Kreps  wrote:
> 
>> Hey Josh,
>> 
>> NoSQL DBs may actually be easier because they themselves generally don't
>> have a global order. I.e. I believe Mongo has a per-partition oplog, is
>> that right? Their partitions would match our partitions.
>> 
>> -Jay
>> 
>> On Wed, Mar 4, 2015 at 5:18 AM, Josh Rader  wrote:
>> 
>>> Thanks everyone for your responses!  These are great.  It seems our cases
>>> matches closest to Jay's recommendations.
>>> 
>>> The one part that sounds a little tricky is point #5 'Include in each
>>> message the database's transaction id, scn, or other identifier '.  This
>> is
>>> pretty straightforward with the RDBMS case that I mentioned, but I could
>>> see wanting to extend this to replicate NoSQL stores (Cassandra, Mongo)
>>> which might not always have a readily available monotonic id,
>> particularly
>>> in failover scenarios.  I guess in that case we can think about creating
>>> this id ourselves from the single producer.
>>> 
>>> Xiao,
>>> 
>>> I think in the Kafka failover cases you mention if we also store the
>> offset
>>> with replicated data we should be able to pick up where we left off since
>>> we are using the low level consumer.  Maybe I am missing your point
>>> though...
>>> 
>>> Guozhang,
>>> 
>>> Very good point that we didn't think of.  We will need to think this
>>> through, as you say avoid resending other messages in a batch if one is
>>> failed.  I wonder if we might also manage this on the consumer side too
>>> with idempotency.  Thanks for raising this!
>>> 
>>> Josh
>>> 
>>> 
>>> 
>>> On Tue, Mar 3, 2015 at 6:08 PM, Xiao  wrote:
>>> 
 Hey Josh,
 
 Sorry, after reading codes, Kafka did fsync the data using a separate
 thread. The recovery point (oldest transaction timestamp) can be got
>> from
 the file recovery-point-offset-checkpoint.
 
 You can adjust the value config.logFlushOffsetCheckpointIntervalMs, if
>>> you
 think the speed is not quick enough. When the workloads is huge, the
 bottleneck could be in your target side or source side. That means,
>> your
 apply could have enough jobs to do.
 
 Basically, you need to keep reading this file for determining the
>> oldest
 timestamps of all relevant partitions. Then, apply the transactions
>> until
 that timestamp.
 
 Note, this does not protect the transaction consistency. This is just
>> for
 ensuring the data at the target side is consistent at one timestamp
>> when
 you have multiple channel to send data changes. The implementation
>> should
 be simple if you can understand the concepts. I am unable to find the
>>> filed
 patent application about it. This is one related paper. It covers the
>>> main
 concepts about the issues you are facing. "Inter-Data-Center
>> Large-Scale
 Database Replication Optimization - A Workload Driven Partitioning
>>> Approach"
 
 Hopefully, you understood what I explained above.
 
 Best wishes,
 
 Xiao Li
 
 Best wishes,
 
 Xiao Li
 
 On Mar 3, 2015, at 4:23 PM, Xiao  wrote:
 
> Hey Josh,
> 
>

Re: Topicmetadata response miss some partitions information sometimes

2015-03-04 Thread Mayuresh Gharat
Cool, so this is a non-issue then. To make things better we can expose
the availablePartitions() API through the Kafka producer. What do you think?
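
For illustration, a small helper along those lines, filtering the 0.8 SimpleConsumer metadata response (as in the code quoted further down) to the partitions that currently have a live leader; purely a sketch, not an existing API.

import java.util.ArrayList;
import java.util.List;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataResponse;

public class PartitionAvailability {
    // returns only the partitions whose leader is currently known
    public static List<PartitionMetadata> availablePartitions(TopicMetadataResponse resp) {
        List<PartitionMetadata> available = new ArrayList<PartitionMetadata>();
        for (TopicMetadata topic : resp.topicsMetadata()) {
            for (PartitionMetadata part : topic.partitionsMetadata()) {
                if (part.leader() != null
                        && part.errorCode() == kafka.common.ErrorMapping.NoError()) {
                    available.add(part);
                }
            }
        }
        return available;
    }
}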

Thanks,

Mayuresh

On Tue, Mar 3, 2015 at 4:56 PM, Guozhang Wang  wrote:

> Hey Jun,
>
> You are right. Previously I thought only in your recent patches you add the
> partitionWithAvailableLeaders that this gets exposed, however it is the
> opposite case.
>
> Guozhang
>
> On Tue, Mar 3, 2015 at 4:40 PM, Jun Rao  wrote:
>
> > Guozhang,
> >
> > Actually, we always return all partitions in the metadata response
> whether
> > the leaders are available or not.
> >
> > Thanks,
> >
> > Jun
> >
> > On Sat, Feb 28, 2015 at 10:46 PM, Guozhang Wang 
> > wrote:
> >
> > > Hi Honghai,
> > >
> > > 1. If a partition has no leader (i.e. all of its replicas are down) it
> > will
> > > become offline, and hence the metadata response will not have this
> > > partition's info.
> > >
> > > 2. Any of the brokers can handle the metadata request, since they all
> > > cache metadata. It's just that their caches are updated asynchronously,
> > > and hence when there is an update to the metadata, some brokers may get
> > > the new metadata value a bit earlier than others.
> > >
> > > On Thu, Feb 26, 2015 at 7:21 PM, ChenHongHai <
> > waldenchenka...@outlook.com>
> > > wrote:
> > >
> > > > We have one topic with 4 partitions, but sometimes we only get metadata
> > > > for 2 of the partitions. Did anyone meet this kind of situation before?
> > > > If some partition has no leader at that moment, will it cause this
> > > > problem? How can a partition end up with no leader? If 6 brokers hold
> > > > partitions of the topic, will they all return the same result? Do I need
> > > > to try all of them and merge the result?
> > > >
> > > > SimpleConsumer consumer = consumerPool.getConsumer(seed.host, seed.port,
> > > >     connectionTimeOut, consumerBufferSize, "refreshPartitionMeta");
> > > > List<String> topics = new ArrayList<String>() {{ add(topic); }};
> > > > TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(topics));
> > > > List<TopicMetadata> metaData = resp.topicsMetadata();
> > > > for (TopicMetadata item : metaData) {
> > > >     if (item.errorCode() != kafka.common.ErrorMapping.NoError())
> > > >         LOG.error(String.format("Something wrong with topic metadata for topic: %s error code: %d",
> > > >             item.topic(), item.errorCode()));
> > > >     for (PartitionMetadata part : item.partitionsMetadata()) {
> > > >         partitionMeta.put(part.partitionId(), part);
> > > >     }
> > > > }
> > > >
> > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125
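
As a reference point for this thread, here is a minimal sketch of inspecting a
metadata response for partitions whose leader is currently unavailable, along the
lines of the snippet quoted above. It uses the 0.8.x kafka.javaapi classes; the
broker address, topic name and client id are placeholders:

import java.util.Collections;
import java.util.List;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class PartitionLeaderCheck {
    public static void main(String[] args) {
        // Any live broker can answer a metadata request; its cache may lag slightly.
        SimpleConsumer consumer =
            new SimpleConsumer("broker1", 9092, 10000, 64 * 1024, "leaderCheck");
        try {
            TopicMetadataResponse resp = consumer.send(
                new TopicMetadataRequest(Collections.singletonList("my-topic")));
            for (TopicMetadata topic : resp.topicsMetadata()) {
                List<PartitionMetadata> parts = topic.partitionsMetadata();
                for (PartitionMetadata part : parts) {
                    // A partition without a live leader shows up with a null leader
                    // and/or a non-zero error code rather than being dropped.
                    if (part.leader() == null || part.errorCode() != 0) {
                        System.out.println("Partition " + part.partitionId() + " of "
                            + topic.topic() + " has no available leader");
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}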


Re: high level consumer rollback

2015-03-04 Thread Luiz Geovani Vier
Thanks, Mayuresh and Joel. Reconnecting works just fine, although it's
much more complex than just calling rollback(), so I'm looking forward
to the new version :)

-Geovani


On Wed, Mar 4, 2015 at 4:57 PM, Joel Koshy  wrote:
> This is not possible with the current high-level consumer without a
> restart, but the new consumer (under development) does have support
> for this.
>
> On Wed, Mar 04, 2015 at 03:04:57PM -0500, Luiz Geovani Vier wrote:
>> Hello,
>>
>> I'm using the high level consumer with auto-commit disabled and a
>> single thread per consumer, in order to consume messages in batches.
>> In case of failures on the database, I'd like to stop processing,
>> rollback and restart from the last commited offset.
>> Is there a way to receive the messages since the last commit again,
>> without reconnecting? (something like a reset on KafkaStream or
>> ConsumerIterator)
>>
>> Thanks,
>> -Geovani
>
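
For reference, here is a minimal sketch of the pattern discussed in this thread:
auto-commit disabled, offsets committed only after the database batch succeeds, and
a restart (shutdown plus reconnect) used to re-read from the last committed offset
on failure. The topic, group and ZooKeeper settings are placeholders, and
saveBatchToDatabase() is a stand-in for the application's own batching logic:

import java.util.Collections;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class BatchingConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // placeholder
        props.put("group.id", "db-loader");                  // placeholder
        props.put("auto.commit.enable", "false");            // commit manually per batch

        while (true) {
            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            KafkaStream<byte[], byte[]> stream = connector.createMessageStreams(
                Collections.singletonMap("my-topic", 1)).get("my-topic").get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            try {
                while (true) {
                    byte[] message = it.next().message();
                    saveBatchToDatabase(message);            // application-specific work
                    connector.commitOffsets();   // only after the batch is safely stored
                }
            } catch (Exception dbFailure) {
                // "Rollback" = drop uncommitted progress and reconnect; consumption
                // resumes from the last committed offset.
                connector.shutdown();
            }
        }
    }

    static void saveBatchToDatabase(byte[] message) { /* stand-in for real logic */ }
}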


Re: Camus reads from multiple offsets in parallel?

2015-03-04 Thread Yang
Thanks for that info Jun.

On Tue, Mar 3, 2015 at 3:56 PM, Jun Rao  wrote:

> Camus only fetches from different partitions in parallel.
>
> Thanks,
>
> Jun
>
> On Fri, Feb 27, 2015 at 4:24 PM, Yang  wrote:
>
> > we have a single partition, and the topic contains 300k events.
> >
> > we fired off a camus job, it finished within 1 minute. this is rather
> fast.
> >
> > I was guessing that the multiple mappers must be reading from multiple
> offsets
> > in parallel, right?
> >
> > otherwise if they are reading in serial (like in a consumer group, which
> a
> > previous thread has clarified to not be the case, i.e. camus does not use
> > group api), it would be very slow.
> >
> >
> > Thanks
> > Yang
> >
>


Re: Trying to get kafka data to Hadoop

2015-03-04 Thread Jagat Singh
Also see the related tool

http://confluent.io/downloads/

Confluent is bringing the glue together for Kafka, Avro and Camus.

Though there is no clarity around support (e.g. updates of Kafka) for it
at this moment.



On Thu, Mar 5, 2015 at 8:57 AM, Joel Koshy  wrote:

> I think the camus mailing list would be more suitable for this
> question.
>
> Thanks,
>
> Joel
>
> On Wed, Mar 04, 2015 at 11:00:51AM -0500, max square wrote:
> > Hi all,
> >
> > I have browsed through different conversations around Camus, and bring
> this
> > as a kinda Kafka question. I know is not the most orthodox, but if
> someone
> > has some thoughts I'd appreciate ir.
> >
> > That said, I am trying to set up Camus, using a 3 node Kafka cluster
> > 0.8.2.1, using a project that is trying to build Avro Schema-Repo
> > . All of the Avro schemas
> for
> > the topics are published correctly. I am building Camus and using:
> >
> > hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar
> com.linkedin.camus.etl.
> > kafka.CamusJob -libjars $CAMUS_LIBJARS  -D mapreduce.job.user.classpath.
> > first=true -P config.properties
> >
> > As the command to start the job, where I have set up an environment
> > variable that holds all the libjars that the mvn package command
> generates.
> >
> > I have also set the following properties to configure the job:
> > camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.
> > LatestSchemaKafkaAvroMessageDecoder
> >
> kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.
> > AvroRestSchemaRegistry
> >
> > etl.schema.registry.url=http://10.0.14.25:2876/schema-repo/
> >
> > When I execute the job I get an Exception indicating the
> > AvroRestSchemaRegistry class can't be found (I've double checked it's
> part
> > of the libjars). I wanted to ask if this is the correct way to set up
> this
> > integration, and if anyone has pointers on why the job is not finding the
> > class AvroRestSchemaRegistry
> >
> > Thanks in advance for the help!
> >
> > Max
> >
> > Follows the complete stack trace:
> >
> > [CamusJob] - failed to create decoder
> >
> > com.linkedin.camus.coders.MessageDecoderException:
> com.linkedin.camus.coders
> > .MessageDecoderException:java.lang.ClassNotFoundException: com.linkedin.
> > camus.schemaregistry.AvroRestSchemaRegistry
> >at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.
> > createMessageDecoder(MessageDecoderFactory.java:29)
> >
> >   at
> com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder
> > (EtlInputFormat.java:391)
> >
> >at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getSplits(
> > EtlInputFormat.java:256)
> >
> >at
> org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:
> > 1107)
> >
> >at
> org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1124
> > )
> >
> >  at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:178)
> >
> >at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:1023)
> >
> >at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
> >
> >at java.security.AccessController.doPrivileged(Native Method)
> >
> >at javax.security.auth.Subject.doAs(Subject.java:415)
> >
> >at org.apache.hadoop.security.UserGroupInformation.doAs(
> > UserGroupInformation.java:1642)
> >
> >at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.
> > java:976)
> >
> >at org.apache.hadoop.mapreduce.Job.submit(Job.java:582)
> >
> >at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:335)
> >
> >at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:563)
> >
> >at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
> >
> >at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
> >
> >at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:518)
> >
> >at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >
> >at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodessorImpl.
> > java:57)
> >
> >at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > DelegatingMethodAccessorImpl.java:43)
> >
> >at java.lang.reflect.Method.invoke(Method.
>
>


Re: moving replications

2015-03-04 Thread Joel Koshy
I think what you may be looking for is being discussed here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-6+-+New+reassignment+partition+logic+for+rebalancing

On Wed, Mar 04, 2015 at 12:34:30PM +0530, sunil kalva wrote:
> Is there any way to automate
> On Mar 3, 2015 11:57 AM, "sunil kalva"  wrote:
> 
> > Why can't kafka automatically rebalances partitions with new broker and
> > adjust with existing brokers ?
> > Why should we run manually ?
> >
> > On Tue, Mar 3, 2015 at 6:41 AM, Gwen Shapira 
> > wrote:
> >
> >> I think the ReassignPartitionsTool does what you need, at least partially.
> >>
> >> It will move partitions of given topics to a new set of brokers - this
> >> includes replicas and leaders from what I can tell.
> >>
> >> Here's the documentation of one of the options:
> >>
> >> topics-to-move-json-file: Generate a reassignment configuration to move
> >> the partitions of the specified topics to the list of brokers specified by
> >> the --broker-list option. The format to use is...
> >>
> >> So it sounds like what you looked for?
> >>
> >> Since the tool runs in "dry run" mode, you can try and see without
> >> actually moving anything.
> >>
> >> Gwen
> >>
> >>
> >>
> >> On Mar 2, 2015 10:05 AM, "sunil kalva"  wrote:
> >>
> >>> Shapira
> >>>
> >>> tx for quick reply, but this tool elects a new leader for a given
> >>> partition from the existing replicas of that partition.
> >>> But my problem is basically move one replica completely from old broker
> >>> to new broker and eventually move leader also to new broker (with out
> >>> incrementing replica count for that partition)
> >>> Please let me know if more information required.
> >>>
> >>> t
> >>> SunilKalva
> >>>
> >>> On Mon, Mar 2, 2015 at 10:43 PM, Gwen Shapira 
> >>> wrote:
> >>>
>  Take a look at the Reassign Partition Tool. It lets you specify which
>  replica exists on which broker:
> 
>  https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
> 
>  Its a bit tricky to use, so feel free to follow up with more questions
>  :)
> 
>  Gwen
> 
>  On Mon, Mar 2, 2015 at 7:59 AM, sunil kalva 
>  wrote:
>  > Hi
>  > How to move replications from one broker to another broker ?
>  >
>  > --
>  > SunilKalva
> 
> >>>
> >>>
> >>>
> >>> --
> >>> SunilKalva
> >>>
> >>
> >
> >
> > --
> > SunilKalva
> >



Re: high level consumer rollback

2015-03-04 Thread Joel Koshy
This is not possible with the current high-level consumer without a
restart, but the new consumer (under development) does have support
for this.

On Wed, Mar 04, 2015 at 03:04:57PM -0500, Luiz Geovani Vier wrote:
> Hello,
> 
> I'm using the high level consumer with auto-commit disabled and a
> single thread per consumer, in order to consume messages in batches.
> In case of failures on the database, I'd like to stop processing,
> rollback and restart from the last commited offset.
> Is there a way to receive the messages since the last commit again,
> without reconnecting? (something like a reset on KafkaStream or
> ConsumerIterator)
> 
> Thanks,
> -Geovani



Re: Trying to get kafka data to Hadoop

2015-03-04 Thread Joel Koshy
I think the camus mailing list would be more suitable for this
question.

Thanks,

Joel

On Wed, Mar 04, 2015 at 11:00:51AM -0500, max square wrote:
> Hi all,
> 
> I have browsed through different conversations around Camus, and bring this
> as a kinda Kafka question. I know is not the most orthodox, but if someone
> has some thoughts I'd appreciate ir.
> 
> That said, I am trying to set up Camus, using a 3 node Kafka cluster
> 0.8.2.1, using a project that is trying to build Avro Schema-Repo
> . All of the Avro schemas for
> the topics are published correctly. I am building Camus and using:
> 
> hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.
> kafka.CamusJob -libjars $CAMUS_LIBJARS  -D mapreduce.job.user.classpath.
> first=true -P config.properties
> 
> As the command to start the job, where I have set up an environment
> variable that holds all the libjars that the mvn package command generates.
> 
> I have also set the following properties to configure the job:
> camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.
> LatestSchemaKafkaAvroMessageDecoder
> kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.
> AvroRestSchemaRegistry
> 
> etl.schema.registry.url=http://10.0.14.25:2876/schema-repo/
> 
> When I execute the job I get an Exception indicating the
> AvroRestSchemaRegistry class can't be found (I've double checked it's part
> of the libjars). I wanted to ask if this is the correct way to set up this
> integration, and if anyone has pointers on why the job is not finding the
> class AvroRestSchemaRegistry
> 
> Thanks in advance for the help!
> 
> Max
> 
> Follows the complete stack trace:
> 
> [CamusJob] - failed to create decoder
> 
> com.linkedin.camus.coders.MessageDecoderException: com.linkedin.camus.coders
> .MessageDecoderException:java.lang.ClassNotFoundException: com.linkedin.
> camus.schemaregistry.AvroRestSchemaRegistry
>at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.
> createMessageDecoder(MessageDecoderFactory.java:29)
> 
>   at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder
> (EtlInputFormat.java:391)
> 
>at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getSplits(
> EtlInputFormat.java:256)
> 
>at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:
> 1107)
> 
>at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1124
> )
> 
>  at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:178)
> 
>at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:1023)
> 
>at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
> 
>at java.security.AccessController.doPrivileged(Native Method)
> 
>at javax.security.auth.Subject.doAs(Subject.java:415)
> 
>at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1642)
> 
>at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.
> java:976)
> 
>at org.apache.hadoop.mapreduce.Job.submit(Job.java:582)
> 
>at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:335)
> 
>at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:563)
> 
>at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
> 
>at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
> 
>at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:518)
> 
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 
>at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodessorImpl.
> java:57)
> 
>at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> 
>at java.lang.reflect.Method.invoke(Method.



Re: Camus Issue about Output File EOF Issue

2015-03-04 Thread Bhavesh Mistry
Hi Gwen,

The root cause of all IO-related problems seems to be the file rename that
Camus does and the underlying Hadoop MapR FS.

We are copying files from a user volume to a day volume (rename does a copy)
when the mapper commits the file to the FS.  Please refer to
http://answers.mapr.com/questions/162562/volume-issues-end-of-file-javaioioexception-file-i.html
for
more info.

We will have to patch Camus to copy to a tmp directory and then move to the
final destination as a workaround for now, to make the file rename more
reliable.


Thanks,

Bhavesh

On Monday, March 2, 2015, Bhavesh Mistry  wrote:

>
> I suspect Camus job has issue because other process  ( another separate
> Map/Reduce Job) also write to same "time" (folders) bucket and it does not
> have this issue at all (so far) when reading from other dependent Hive
> job.  This dependent Hive job only have issue with files created via camus
> job ( not always but intermittently and hive job fails with read error
> about EOF and work around for now is to remove these unclosed file from the
> folder and hive job succeeds  ).
>
>
>
> Thanks,
>
> Bhavesh
>
> On Mon, Mar 2, 2015 at 5:27 PM, Gwen Shapira  > wrote:
>
>> Actually, the error you sent shows that it's trying to read a TEXT file
>> as if it was a Seq file. That's why I suspected a misconfiguration of some
>> sort.
>>
>> Why do you suspect a race condition?
>>
>> On Mon, Mar 2, 2015 at 5:19 PM, Bhavesh Mistry
>> > > wrote:
>> > Hi Gwen,
>> >
>> > We are using MapR (Sorry no Cloudera) distribution.
>> >
>> >
>> > I am suspecting it is code issue.  I am in-processor review the code
>> about
>> > MultiOutputFormat class.
>> >
>> >
>> https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputFormat.java#L67
>> https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/mapred/EtlMultiOutputRecordWriter.java#L35
>> >
>> > I am suspecting that due to some concurrency, it is replacing older
>> writer
>> > with new one (old writer does not close).   The file it crates is
>> usually
>> > small,and has very small content for problematic files (EOF file).
>> >
>> >
>> https://github.com/linkedin/camus/blob/master/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/SequenceFileRecordWriterProvider.java#L91
>> >
>> >
>> > Based on above code, Do you think there is likelihood that output file
>> may
>> > be unclosed file ?  Also, my plan is to add isClose() api to each
>> writer,
>> > and if you have time, you can quickly review them (suggest or your
>> > feedback) about unclosed files.  By the way, we are on Hadoop 1.0.3 API
>> (
>> > so I was thinking about
>> >
>> http://tool.oschina.net/uploads/apidocs/hadoop/org/apache/hadoop/mapred/MapReduceBase.html#close()
>> > and make sure within the close we close all the File Writers.. let me
>> know
>> > if this is good or not do final clean-up).
>> >
>> >
>> > public interface RecordWriterWithCloseStatus<K, V> extends RecordWriter<K, V> {
>> >
>> >   /**
>> >    * Gives the ability to check whether close has been called on the writer,
>> >    * i.e. whether the file has been closed or not.
>> >    * @return true if the writer has been closed
>> >    */
>> >   public boolean isClose();
>> > }
>> >
>> > And each of the writer will have ability check for clean at all the
>> time:
>> >
>> > eg:
>> >
>> > {code}
>> >
>> >   return new RecordWriterWithCloseStatus<IEtlKey, CamusWrapper>() {
>> >
>> > private volatile boolean close;
>> >
>> >
>> > @Override
>> >
>> > public void write(IEtlKey key, CamusWrapper data) throws
>> > IOException, InterruptedException {
>> >
>> >
>> >
>> >
>> > /**
>> >
>> >  * What if file is closed ?  Should we create a new one
>> here..?
>> >
>> >  */
>> >
>> >
>> >
>> > // Use the timestamp from the EtlKey as the key for this
>> > record.
>> >
>> > // TODO: Is there a better key to use here?
>> >
>> > writer.append(new LongWritable(key.getTime()), new Text(
>> > record));
>> >
>> > }
>> >
>> >
>> > @Override
>> >
>> > public void close(TaskAttemptContext context) throws
>> > IOException, InterruptedException {
>> >
>> > writer.close();
>> >
>> > close = true;
>> >
>> > }
>> >
>> >
>> >
>> > protected void finalize() throws Throwable {
>> >
>> > if(!this.close){
>> >
>> >  log.error("This file was not closed so try to close during
>> the
>> > JVM finalize..");
>> >
>> >  try{
>> >
>> >  writer.close();
>> >
>> >  }catch(Throwable th){
>> >
>> >  log.error("File Close error during finalize()");
>> >
>> >  }
>> >
>> > }
>> >
>> > super.finalize();
>> >
>> > }
>> >
>> > @Override
>> >
>> > public boolean isClose() {
>> >
>> >   return close;
>> >
>> > }
>> >
>> > @
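
As a reference for this thread, here is a cleaned-up sketch of the close-tracking
wrapper idea in the snippet above, written against the org.apache.hadoop.mapreduce
API; the isClose() naming follows the interface proposed above, and everything else
(class name, behaviour on writes after close) is illustrative rather than Camus code:

import java.io.IOException;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Delegating writer that remembers whether close() has been called, so the
// committer (or a finalizer) can detect output files that were never closed.
public class CloseTrackingRecordWriter<K, V> extends RecordWriter<K, V> {
    private final RecordWriter<K, V> delegate;
    private volatile boolean closed = false;

    public CloseTrackingRecordWriter(RecordWriter<K, V> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
        if (closed) {
            throw new IOException("write() called on an already-closed writer");
        }
        delegate.write(key, value);
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        if (!closed) {
            delegate.close(context);
            closed = true;
        }
    }

    public boolean isClose() {
        return closed;
    }
}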

Re: high level consumer rollback

2015-03-04 Thread Mayuresh Gharat
As per my knowledge, I don't think you can do that with an online
stream. You will have to reset the offsets to a particular offset in the
past to start consuming from that point. Another way would be to start a
separate consumer with a different groupId.

In any case, I think you cannot consume from a past offset without bouncing
the consumer. Once you bounce the consumer it will start consuming from
the last committed offset.

Thanks,

Mayuresh

On Wed, Mar 4, 2015 at 12:04 PM, Luiz Geovani Vier  wrote:

> Hello,
>
> I'm using the high level consumer with auto-commit disabled and a
> single thread per consumer, in order to consume messages in batches.
> In case of failures on the database, I'd like to stop processing,
> rollback and restart from the last commited offset.
> Is there a way to receive the messages since the last commit again,
> without reconnecting? (something like a reset on KafkaStream or
> ConsumerIterator)
>
> Thanks,
> -Geovani
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


high level consumer rollback

2015-03-04 Thread Luiz Geovani Vier
Hello,

I'm using the high level consumer with auto-commit disabled and a
single thread per consumer, in order to consume messages in batches.
In case of failures on the database, I'd like to stop processing,
rollback and restart from the last commited offset.
Is there a way to receive the messages since the last commit again,
without reconnecting? (something like a reset on KafkaStream or
ConsumerIterator)

Thanks,
-Geovani


Re: New Errors in 0.8.2 Protocol

2015-03-04 Thread Evan Huus
Thanks Joe, keeping documentation in sync with KIPs does seem like a
reasonable process going forward. And I apologize for the confrontational
tone I used to end my original email, that was not called for.

In the mean time, where can I find the answers to my two actual questions?
I think I've figured out InvalidTopicException based on reading some of the
Java code, but I'm not particularly confident in my interpretation.

Thanks,
Evan

On Wed, Mar 4, 2015 at 1:45 PM, Joe Stein  wrote:

> Hey Evan, moving forward (so 0.8.3.0 and beyond) the release documentation
> is going to match up more with specific KIP changes
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> which elaborated on things like "breaking changes" and "major modifications
> you should adopt before breaking changes happen", "etc". This will be
> helpful not only to know what is changing but a place for discussions prior
> to those changes to happen in a better forum for everyone.
>
> One of the issues we have now (agreed) is that flattened wire protocol
> document doesn't provide the information everyone needs fluidly enough.
>
> ~ Joe Stein
> - - - - - - - - - - - - - - - - -
>
>   http://www.stealth.ly
> - - - - - - - - - - - - - - - - -
>
> On Wed, Mar 4, 2015 at 12:44 PM, Evan Huus  wrote:
>
> > Hey all, it seems that 0.8.2 has added a handful more errors to the
> > protocol which are not yet reflected on the wiki page [1]. Specifically,
> > [2] seems to indicate that codes 17-20 now have associated meanings.
> >
> > My questions are:
> > - Which of these are exposed "publicly"? (for example, the existing error
> > 13 is only ever internal to the brokers, so is irrelevant for third-party
> > clients to know about, are any of the new ones like that?)
> > - When (if ever) is InvalidTopicException returned, and what is it for
> that
> > UnknownTopicOrPartition couldn't be used?
> >
> > I would also note that this is the *second* issue I've come across where
> > the protocol specification is not up-to-date with the protocol actually
> > used by the 0.8.2 brokers. That specification is what I, as developer of
> > third-party language bindings, rely on in order to be compatible with
> Kafka
> > proper. If you want a healthy community of third-party bindings and
> > clients, you *have* to do a better job of keeping that documentation up
> to
> > date, this is getting really frustrating. Fortunately none of these
> issues
> > have caused data loss for us. Yet.
> >
> > Thanks,
> > Evan
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > (currently down for maintenance apparently)
> > [2]
> >
> >
> https://github.com/apache/kafka/blob/ee1267b127f3081db491fa1bf9a287084c324e36/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java#L46-L49
> >
>


NodeJS Consumer library for 0.8.2

2015-03-04 Thread Julio Castillo
Looking around the npm repo, it looks like there is no current support for
0.8.2.

Is the only alternative to use REST/Proxy?

Thanks

Julio Castillo




Re: Problem deleting topics in 0.8.2?

2015-03-04 Thread Jeff Schroeder
Timothy and Harsha,

Conveniently, a coworker figured this out almost immediately after I sent
this email.

I was passing the zookeeper bits as:

--zookeeper 'host1:2181,host2:2181,host3:2181/path/to/zk/chroot'

When the actual correct thing to do was:

--zookeeper 'host1:2181/path/to/zk/chroot'

It appears that the topics are removed in ~30 seconds or less, which is
more in line with what I'd expect.

Timothy, would you mind sharing a production log4j.properties that you use?
This is a bit more complicated because I'm running Kafka under some
hand-rolled Docker containers running on Mesos via Marathon, but it works
quite well.

Thanks for the quick response folks!

On Wed, Mar 4, 2015 at 12:35 PM, Timothy Chen  wrote:

> Hi Jeff,
>
> The controller should have a Topic deletion thread running
> coordinating the delete in the cluster, and the progress should be
> logged to the controller log.
>
> Can you look at the controller log to see what's going on?
>
> Tim
>
> On Wed, Mar 4, 2015 at 10:28 AM, Jeff Schroeder
>  wrote:
> > So I've got 3 kafka brokers that were started with delete.topic.enable
> set
> > to true. When they start, I can see in the logs that the property was
> > successfully set. The dataset in each broker is only approximately 2G
> (per
> > du). When running kafaka-delete.sh with the correct arguments to delete
> all
> > of the topics, it says that the topic is marked for deletion. When
> running
> > again, it says that the topic is already marked for deletion.
> >
> > From reading the documentation, my understanding is that one of the 10
> > (default) background threads would eventually process the deletes, and
> > clean up both the topics in zookeeper, and the actual data on disk. In
> > reality, it didnt seem to delete the data on disk or remove anything in
> > zookeeper.
> >
> > What is the correct way to remove a topic in kafka 0.8.2 and what is the
> > expected timeframe for that to complete expected to be? My "solution" was
> > stopping the brokers and rm -rf /var/lib/kafka/*, but that is clearly a
> > very poor one once we are done testing our kafka + storm setup.
> >
> > --
> > Jeff Schroeder
> >
> > Don't drink and derive, alcohol and analysis don't mix.
> > http://www.digitalprognosis.com
>



-- 
Jeff Schroeder

Don't drink and derive, alcohol and analysis don't mix.
http://www.digitalprognosis.com


Re: New Errors in 0.8.2 Protocol

2015-03-04 Thread Joe Stein
Hey Evan, moving forward (so 0.8.3.0 and beyond) the release documentation
is going to match up more with specific KIP changes
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
which elaborate on things like "breaking changes", "major modifications
you should adopt before breaking changes happen", etc. This will be
helpful not only for knowing what is changing but also as a place for
discussions prior to those changes happening, in a better forum for everyone.

One of the issues we have now (agreed) is that flattened wire protocol
document doesn't provide the information everyone needs fluidly enough.

~ Joe Stein
- - - - - - - - - - - - - - - - -

  http://www.stealth.ly
- - - - - - - - - - - - - - - - -

On Wed, Mar 4, 2015 at 12:44 PM, Evan Huus  wrote:

> Hey all, it seems that 0.8.2 has added a handful more errors to the
> protocol which are not yet reflected on the wiki page [1]. Specifically,
> [2] seems to indicate that codes 17-20 now have associated meanings.
>
> My questions are:
> - Which of these are exposed "publicly"? (for example, the existing error
> 13 is only ever internal to the brokers, so is irrelevant for third-party
> clients to know about, are any of the new ones like that?)
> - When (if ever) is InvalidTopicException returned, and what is it for that
> UnknownTopicOrPartition couldn't be used?
>
> I would also note that this is the *second* issue I've come across where
> the protocol specification is not up-to-date with the protocol actually
> used by the 0.8.2 brokers. That specification is what I, as developer of
> third-party language bindings, rely on in order to be compatible with Kafka
> proper. If you want a healthy community of third-party bindings and
> clients, you *have* to do a better job of keeping that documentation up to
> date, this is getting really frustrating. Fortunately none of these issues
> have caused data loss for us. Yet.
>
> Thanks,
> Evan
>
> [1]
>
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> (currently down for maintenance apparently)
> [2]
>
> https://github.com/apache/kafka/blob/ee1267b127f3081db491fa1bf9a287084c324e36/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java#L46-L49
>


Re: Problem deleting topics in 0.8.2?

2015-03-04 Thread Timothy Chen
Hi Jeff,

The controller should have a Topic deletion thread running
coordinating the delete in the cluster, and the progress should be
logged to the controller log.

Can you look at the controller log to see what's going on?

Tim

On Wed, Mar 4, 2015 at 10:28 AM, Jeff Schroeder
 wrote:
> So I've got 3 kafka brokers that were started with delete.topic.enable set
> to true. When they start, I can see in the logs that the property was
> successfully set. The dataset in each broker is only approximately 2G (per
> du). When running kafaka-delete.sh with the correct arguments to delete all
> of the topics, it says that the topic is marked for deletion. When running
> again, it says that the topic is already marked for deletion.
>
> From reading the documentation, my understanding is that one of the 10
> (default) background threads would eventually process the deletes, and
> clean up both the topics in zookeeper, and the actual data on disk. In
> reality, it didnt seem to delete the data on disk or remove anything in
> zookeeper.
>
> What is the correct way to remove a topic in kafka 0.8.2 and what is the
> expected timeframe for that to complete expected to be? My "solution" was
> stopping the brokers and rm -rf /var/lib/kafka/*, but that is clearly a
> very poor one once we are done testing our kafka + storm setup.
>
> --
> Jeff Schroeder
>
> Don't drink and derive, alcohol and analysis don't mix.
> http://www.digitalprognosis.com


Re: Problem deleting topics in 0.8.2?

2015-03-04 Thread Harsha
Hi Jeff,
 Are you seeing any errors in state-change.log or controller.log
 after issuing the kafka-topics.sh --delete command?
Another known issue is that if you have auto.create.topics.enable = true
(this is true by default), your consumer or producer can re-create the
topic. So try stopping any of your consumers or producers and run the
delete topic command again.
-Harsha

On Wed, Mar 4, 2015, at 10:28 AM, Jeff Schroeder wrote:
> So I've got 3 kafka brokers that were started with delete.topic.enable
> set
> to true. When they start, I can see in the logs that the property was
> successfully set. The dataset in each broker is only approximately 2G
> (per
> du). When running kafaka-delete.sh with the correct arguments to delete
> all
> of the topics, it says that the topic is marked for deletion. When
> running
> again, it says that the topic is already marked for deletion.
> 
> From reading the documentation, my understanding is that one of the 10
> (default) background threads would eventually process the deletes, and
> clean up both the topics in zookeeper, and the actual data on disk. In
> reality, it didnt seem to delete the data on disk or remove anything in
> zookeeper.
> 
> What is the correct way to remove a topic in kafka 0.8.2 and what is the
> expected timeframe for that to complete expected to be? My "solution" was
> stopping the brokers and rm -rf /var/lib/kafka/*, but that is clearly a
> very poor one once we are done testing our kafka + storm setup.
> 
> -- 
> Jeff Schroeder
> 
> Don't drink and derive, alcohol and analysis don't mix.
> http://www.digitalprognosis.com


Problem deleting topics in 0.8.2?

2015-03-04 Thread Jeff Schroeder
So I've got 3 kafka brokers that were started with delete.topic.enable set
to true. When they start, I can see in the logs that the property was
successfully set. The dataset in each broker is only approximately 2G (per
du). When running kafka-delete.sh with the correct arguments to delete all
of the topics, it says that the topic is marked for deletion. When running it
again, it says that the topic is already marked for deletion.

From reading the documentation, my understanding is that one of the 10
(default) background threads would eventually process the deletes, and
clean up both the topics in zookeeper, and the actual data on disk. In
reality, it didn't seem to delete the data on disk or remove anything in
zookeeper.

What is the correct way to remove a topic in kafka 0.8.2 and what is the
expected timeframe for that to complete expected to be? My "solution" was
stopping the brokers and rm -rf /var/lib/kafka/*, but that is clearly a
very poor one once we are done testing our kafka + storm setup.

-- 
Jeff Schroeder

Don't drink and derive, alcohol and analysis don't mix.
http://www.digitalprognosis.com


New Errors in 0.8.2 Protocol

2015-03-04 Thread Evan Huus
Hey all, it seems that 0.8.2 has added a handful more errors to the
protocol which are not yet reflected on the wiki page [1]. Specifically,
[2] seems to indicate that codes 17-20 now have associated meanings.

My questions are:
- Which of these are exposed "publicly"? (for example, the existing error
13 is only ever internal to the brokers, so is irrelevant for third-party
clients to know about, are any of the new ones like that?)
- When (if ever) is InvalidTopicException returned, and what is it for that
UnknownTopicOrPartition couldn't be used?

I would also note that this is the *second* issue I've come across where
the protocol specification is not up-to-date with the protocol actually
used by the 0.8.2 brokers. That specification is what I, as developer of
third-party language bindings, rely on in order to be compatible with Kafka
proper. If you want a healthy community of third-party bindings and
clients, you *have* to do a better job of keeping that documentation up to
date, this is getting really frustrating. Fortunately none of these issues
have caused data loss for us. Yet.

Thanks,
Evan

[1]
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
(currently down for maintenance apparently)
[2]
https://github.com/apache/kafka/blob/ee1267b127f3081db491fa1bf9a287084c324e36/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java#L46-L49


Re: Database Replication Question

2015-03-04 Thread Jonathan Hodges
Yes, you are right about the oplog per partition, as well as that mapping well
to the Kafka partitions.  I think we were making this harder than it is,
based on previous attempts to leverage something like Databus
for propagating log changes from MongoDB and Cassandra, since it requires an
scn.  Sounds like direct Kafka makes more sense for these use cases.
Thanks again!


On Wed, Mar 4, 2015 at 8:56 AM, Jay Kreps  wrote:

> Hey Josh,
>
> NoSQL DBs may actually be easier because they themselves generally don't
> have a global order. I.e. I believe Mongo has a per-partition oplog, is
> that right? Their partitions would match our partitions.
>
> -Jay
>
> On Wed, Mar 4, 2015 at 5:18 AM, Josh Rader  wrote:
>
> > Thanks everyone for your responses!  These are great.  It seems our cases
> > matches closest to Jay's recommendations.
> >
> > The one part that sounds a little tricky is point #5 'Include in each
> > message the database's transaction id, scn, or other identifier '.  This
> is
> > pretty straightforward with the RDBMS case that I mentioned, but I could
> > see wanting to extend this to replicate NoSQL stores (Cassandra, Mongo)
> > which might not always have a readily available monotonic id,
> particularly
> > in failover scenarios.  I guess in that case we can think about creating
> > this id ourselves from the single producer.
> >
> > Xiao,
> >
> > I think in the Kafka failover cases you mention if we also store the
> offset
> > with replicated data we should be able to pick up where we left off since
> > we are using the low level consumer.  Maybe I am missing your point
> > though...
> >
> > Guozhang,
> >
> > Very good point that we didn't think of.  We will need to think this
> > through, as you say avoid resending other messages in a batch if one is
> > failed.  I wonder if we might also manage this on the consumer side too
> > with idempotency.  Thanks for raising this!
> >
> > Josh
> >
> >
> >
> > On Tue, Mar 3, 2015 at 6:08 PM, Xiao  wrote:
> >
> > > Hey Josh,
> > >
> > > Sorry, after reading codes, Kafka did fsync the data using a separate
> > > thread. The recovery point (oldest transaction timestamp) can be got
> from
> > > the file recovery-point-offset-checkpoint.
> > >
> > > You can adjust the value config.logFlushOffsetCheckpointIntervalMs, if
> > you
> > > think the speed is not quick enough. When the workloads is huge, the
> > > bottleneck could be in your target side or source side. That means,
> your
> > > apply could have enough jobs to do.
> > >
> > > Basically, you need to keep reading this file for determining the
> oldest
> > > timestamps of all relevant partitions. Then, apply the transactions
> until
> > > that timestamp.
> > >
> > > Note, this does not protect the transaction consistency. This is just
> for
> > > ensuring the data at the target side is consistent at one timestamp
> when
> > > you have multiple channel to send data changes. The implementation
> should
> > > be simple if you can understand the concepts. I am unable to find the
> > filed
> > > patent application about it. This is one related paper. It covers the
> > main
> > > concepts about the issues you are facing. "Inter-Data-Center
> Large-Scale
> > > Database Replication Optimization - A Workload Driven Partitioning
> > Approach"
> > >
> > > Hopefully, you understood what I explained above.
> > >
> > > Best wishes,
> > >
> > > Xiao Li
> > >
> > > Best wishes,
> > >
> > > Xiao Li
> > >
> > > On Mar 3, 2015, at 4:23 PM, Xiao  wrote:
> > >
> > > > Hey Josh,
> > > >
> > > > If you put different tables into different partitions or topics, it
> > > might break transaction ACID at the target side. This is risky for some
> > use
> > > cases. Besides unit of work issues, you also need to think about the
> load
> > > balancing too.
> > > >
> > > > For failover, you have to find the timestamp for point-in-time
> > > consistency. This part is tricky. You have to ensure all the changes
> > before
> > > a specific timestamp have been flushed to the disk. Normally, you can
> > > maintain a bookmark for different partition at the target side to know
> > what
> > > is the oldest transactions have been flushed to the disk.
> Unfortunately,
> > > based on my understanding, Kafka is unable to do it because it does not
> > do
> > > fsync regularly for achieving better throughput.
> > > >
> > > > Best wishes,
> > > >
> > > > Xiao Li
> > > >
> > > >
> > > > On Mar 3, 2015, at 3:45 PM, Xiao  wrote:
> > > >
> > > >> Hey Josh,
> > > >>
> > > >> Transactions can be applied in parallel in the consumer side based
> on
> > > transaction dependency checking.
> > > >>
> > > >> http://www.google.com.ar/patents/US20080163222
> > > >>
> > > >> This patent documents how it work. It is easy to understand,
> however,
> > > you also need to consider the hash collision issues. This has been
> > > implemented in IBM Q Replication since 2001.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Xiao Li
> > > >>
> > > >>
> > > >> On Mar 3, 2015, at 3:

Re: Database Replication Question

2015-03-04 Thread Jay Kreps
Hey Xiao,

Yeah I agree that without fsync you will not get durability in the case of
a power outage or other correlated failure, and likewise without
replication you won't get durability in the case of disk failure.

If each batch is fsync'd it will definitely be slower, depending on the
capability of the disk subsystem. Either way that feature is there now.

-Jay

On Wed, Mar 4, 2015 at 8:50 AM, Xiao  wrote:

> Hey Jay,
>
> Yeah. I understood the advantage of Kafka is one to many. That is why I am
> reading the source codes of Kafka. Your guys did a good product! : )
>
> Our major concern is its message persistency. Zero data loss is a must in
> our applications. Below is what I copied from the Kafka document.
>
> "The log takes two configuration parameter M which gives the number of
> messages to write before forcing the OS to flush the file to disk, and S
> which gives a number of seconds after which a flush is forced. This gives a
> durability guarantee of losing at most M messages or S seconds of data in
> the event of a system crash."
>
> Basically, our producers needs to know if the data have been
> flushed/fsynced to the disk. Our model is disconnected. Producers and
> consumers do not talk with each other. The only media is a Kafka-like
> persistence message queue.
>
> Unplanned power outage is not rare in 24/7 usage. Any data loss could
> cause a very expensive full refresh. That is not acceptable for many
> financial companies.
>
> If we do fsync for each transaction or each batch, the throughput could be
> low? Or another way is to let our producers check recovery points very
> frequently, and then the performance bottleneck will be on reading/copying
> the recovery-point file. Any other ideas?
>
> I have not read the source codes for synchronous disk replication. That
> will be my next focus. I am not sure if that can resolve our above concern.
>
> BTW, do you have any plan to support mainframe?
>
> Thanks,
>
> Xiao Li
>
>
> On Mar 4, 2015, at 8:01 AM, Jay Kreps  wrote:
>
> > Hey Xiao,
> >
> > 1. Nothing prevents applying transactions transactionally on the
> > destination side, though that is obviously more work. But I think the key
> > point here is that much of the time the replication is not
> Oracle=>Oracle,
> > but Oracle=>{W, X, Y, Z} where W/X/Y/Z are totally heterogenous systems
> > that aren't necessarily RDBMSs.
> >
> > 2. I don't think fsync is really relevant. You can fsync on every message
> > if you like, but Kafka's durability guarantees don't depend on this as it
> > allows synchronous commit across replicas. This changes the guarantee
> from
> > "won't be lost unless the disk dies" to "won't be lost unless all
> replicas
> > die" but the later is generally a stronger guarantee in practice given
> the
> > empirical reliability of disks (#1 reason for server failure in my
> > experience was disk failure).
> >
> > -Jay
> >
> > On Tue, Mar 3, 2015 at 4:23 PM, Xiao  wrote:
> >
> >> Hey Josh,
> >>
> >> If you put different tables into different partitions or topics, it
> might
> >> break transaction ACID at the target side. This is risky for some use
> >> cases. Besides unit of work issues, you also need to think about the
> load
> >> balancing too.
> >>
> >> For failover, you have to find the timestamp for point-in-time
> >> consistency. This part is tricky. You have to ensure all the changes
> before
> >> a specific timestamp have been flushed to the disk. Normally, you can
> >> maintain a bookmark for different partition at the target side to know
> what
> >> is the oldest transactions have been flushed to the disk. Unfortunately,
> >> based on my understanding, Kafka is unable to do it because it does not
> do
> >> fsync regularly for achieving better throughput.
> >>
> >> Best wishes,
> >>
> >> Xiao Li
> >>
> >>
> >> On Mar 3, 2015, at 3:45 PM, Xiao  wrote:
> >>
> >>> Hey Josh,
> >>>
> >>> Transactions can be applied in parallel in the consumer side based on
> >> transaction dependency checking.
> >>>
> >>> http://www.google.com.ar/patents/US20080163222
> >>>
> >>> This patent documents how it work. It is easy to understand, however,
> >> you also need to consider the hash collision issues. This has been
> >> implemented in IBM Q Replication since 2001.
> >>>
> >>> Thanks,
> >>>
> >>> Xiao Li
> >>>
> >>>
> >>> On Mar 3, 2015, at 3:36 PM, Jay Kreps  wrote:
> >>>
>  Hey Josh,
> 
>  As you say, ordering is per partition. Technically it is generally
> >> possible
>  to publish all changes to a database to a single partition--generally
> >> the
>  kafka partition should be high throughput enough to keep up. However
> >> there
>  are a couple of downsides to this:
>  1. Consumer parallelism is limited to one. If you want a total order
> to
> >> the
>  consumption of messages you need to have just 1 process, but often you
>  would want to parallelize.
>  2. Often what people want is not a full stream of all changes in all
> >> tables
>  in
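
As a reference for the durability discussion above, here is a minimal sketch of
leaning on replica acknowledgement rather than per-message fsync, using the Java
producer that ships with 0.8.2. The broker address and topic are placeholders, and
min.insync.replicas is a broker/topic-side setting, mentioned here only in a comment:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class DurableProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder broker
        // acks=-1: wait until the full in-sync replica set has acknowledged the write.
        props.put("acks", "-1");
        props.put("retries", "3");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        // On the broker/topic side, min.insync.replicas controls how many replicas
        // must be in sync before an acks=-1 write is accepted.

        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
        try {
            // Blocking on the future gives the producer a per-message commit signal.
            RecordMetadata md = producer.send(
                new ProducerRecord<byte[], byte[]>("changes", "row-change".getBytes())).get();
            System.out.println("Acknowledged at offset " + md.offset()
                + " of partition " + md.partition());
        } finally {
            producer.close();
        }
    }
}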

Re: Database Replication Question

2015-03-04 Thread Xiao
Hey Jay, 

Yeah. I understand that the advantage of Kafka is one-to-many. That is why I am
reading the source code of Kafka. You guys built a good product! :)

Our major concern is its message persistency. Zero data loss is a must in our 
applications. Below is what I copied from the Kafka document. 

"The log takes two configuration parameter M which gives the number of messages 
to write before forcing the OS to flush the file to disk, and S which gives a 
number of seconds after which a flush is forced. This gives a durability 
guarantee of losing at most M messages or S seconds of data in the event of a 
system crash."

Basically, our producers need to know if the data has been flushed/fsynced to
the disk. Our model is disconnected. Producers and consumers do not talk with 
each other. The only media is a Kafka-like persistence message queue. 

Unplanned power outages are not rare in 24/7 usage. Any data loss could cause a
very expensive full refresh. That is not acceptable for many financial 
companies. 

If we do fsync for each transaction or each batch, the throughput could be low? 
Or another way is to let our producers check recovery points very frequently, 
and then the performance bottleneck will be on reading/copying the 
recovery-point file. Any other ideas? 

I have not read the source codes for synchronous disk replication. That will be 
my next focus. I am not sure if that can resolve our above concern. 

BTW, do you have any plan to support mainframe? 

Thanks, 

Xiao Li


On Mar 4, 2015, at 8:01 AM, Jay Kreps  wrote:

> Hey Xiao,
> 
> 1. Nothing prevents applying transactions transactionally on the
> destination side, though that is obviously more work. But I think the key
> point here is that much of the time the replication is not Oracle=>Oracle,
> but Oracle=>{W, X, Y, Z} where W/X/Y/Z are totally heterogenous systems
> that aren't necessarily RDBMSs.
> 
> 2. I don't think fsync is really relevant. You can fsync on every message
> if you like, but Kafka's durability guarantees don't depend on this as it
> allows synchronous commit across replicas. This changes the guarantee from
> "won't be lost unless the disk dies" to "won't be lost unless all replicas
> die" but the later is generally a stronger guarantee in practice given the
> empirical reliability of disks (#1 reason for server failure in my
> experience was disk failure).
> 
> -Jay
> 
> On Tue, Mar 3, 2015 at 4:23 PM, Xiao  wrote:
> 
>> Hey Josh,
>> 
>> If you put different tables into different partitions or topics, it might
>> break transaction ACID at the target side. This is risky for some use
>> cases. Besides unit of work issues, you also need to think about the load
>> balancing too.
>> 
>> For failover, you have to find the timestamp for point-in-time
>> consistency. This part is tricky. You have to ensure all the changes before
>> a specific timestamp have been flushed to the disk. Normally, you can
>> maintain a bookmark for different partition at the target side to know what
>> is the oldest transactions have been flushed to the disk. Unfortunately,
>> based on my understanding, Kafka is unable to do it because it does not do
>> fsync regularly for achieving better throughput.
>> 
>> Best wishes,
>> 
>> Xiao Li
>> 
>> 
>> On Mar 3, 2015, at 3:45 PM, Xiao  wrote:
>> 
>>> Hey Josh,
>>> 
>>> Transactions can be applied in parallel in the consumer side based on
>> transaction dependency checking.
>>> 
>>> http://www.google.com.ar/patents/US20080163222
>>> 
>>> This patent documents how it work. It is easy to understand, however,
>> you also need to consider the hash collision issues. This has been
>> implemented in IBM Q Replication since 2001.
>>> 
>>> Thanks,
>>> 
>>> Xiao Li
>>> 
>>> 
>>> On Mar 3, 2015, at 3:36 PM, Jay Kreps  wrote:
>>> 
 Hey Josh,
 
 As you say, ordering is per partition. Technically it is generally
>> possible
 to publish all changes to a database to a single partition--generally
>> the
 kafka partition should be high throughput enough to keep up. However
>> there
 are a couple of downsides to this:
 1. Consumer parallelism is limited to one. If you want a total order to
>> the
 consumption of messages you need to have just 1 process, but often you
 would want to parallelize.
 2. Often what people want is not a full stream of all changes in all
>> tables
 in a database but rather the changes to a particular table.
 
 To some extent the best way to do this depends on what you will do with
>> the
 data. However if you intend to have lots
 
 I have seen pretty much every variation on this in the wild, and here is
 what I would recommend:
 1. Have a single publisher process that publishes events into Kafka
 2. If possible use the database log to get these changes (e.g. mysql
 binlog, Oracle xstreams, golden gate, etc). This will be more complete
>> and
 more efficient than polling for changes, though that can work too.
>>

Re: Database Replication Question

2015-03-04 Thread Jay Kreps
Hey Xiao,

1. Nothing prevents applying transactions transactionally on the
destination side, though that is obviously more work. But I think the key
point here is that much of the time the replication is not Oracle=>Oracle,
but Oracle=>{W, X, Y, Z} where W/X/Y/Z are totally heterogenous systems
that aren't necessarily RDBMSs.

2. I don't think fsync is really relevant. You can fsync on every message
if you like, but Kafka's durability guarantees don't depend on this as it
allows synchronous commit across replicas. This changes the guarantee from
"won't be lost unless the disk dies" to "won't be lost unless all replicas
die", but the latter is generally a stronger guarantee in practice given the
empirical reliability of disks (#1 reason for server failure in my
experience was disk failure).

-Jay

On Tue, Mar 3, 2015 at 4:23 PM, Xiao  wrote:

> Hey Josh,
>
> If you put different tables into different partitions or topics, it might
> break transaction ACID at the target side. This is risky for some use
> cases. Besides unit of work issues, you also need to think about the load
> balancing too.
>
> For failover, you have to find the timestamp for point-in-time
> consistency. This part is tricky. You have to ensure all the changes before
> a specific timestamp have been flushed to the disk. Normally, you can
> maintain a bookmark for different partition at the target side to know what
> is the oldest transactions have been flushed to the disk. Unfortunately,
> based on my understanding, Kafka is unable to do it because it does not do
> fsync regularly for achieving better throughput.
>
> Best wishes,
>
> Xiao Li
>
>
> On Mar 3, 2015, at 3:45 PM, Xiao  wrote:
>
> > Hey Josh,
> >
> > Transactions can be applied in parallel in the consumer side based on
> transaction dependency checking.
> >
> > http://www.google.com.ar/patents/US20080163222
> >
> > This patent documents how it work. It is easy to understand, however,
> you also need to consider the hash collision issues. This has been
> implemented in IBM Q Replication since 2001.
> >
> > Thanks,
> >
> > Xiao Li
> >
> >
> > On Mar 3, 2015, at 3:36 PM, Jay Kreps  wrote:
> >
> >> Hey Josh,
> >>
> >> As you say, ordering is per partition. Technically it is generally
> possible
> >> to publish all changes to a database to a single partition--generally
> the
> >> kafka partition should be high throughput enough to keep up. However
> there
> >> are a couple of downsides to this:
> >> 1. Consumer parallelism is limited to one. If you want a total order to
> the
> >> consumption of messages you need to have just 1 process, but often you
> >> would want to parallelize.
> >> 2. Often what people want is not a full stream of all changes in all
> tables
> >> in a database but rather the changes to a particular table.
> >>
> >> To some extent the best way to do this depends on what you will do with
> the
> >> data. However if you intend to have lots
> >>
> >> I have seen pretty much every variation on this in the wild, and here is
> >> what I would recommend:
> >> 1. Have a single publisher process that publishes events into Kafka
> >> 2. If possible use the database log to get these changes (e.g. mysql
> >> binlog, Oracle xstreams, golden gate, etc). This will be more complete
> and
> >> more efficient than polling for changes, though that can work too.
> >> 3. Publish each table to its own topic.
> >> 4. Partition each topic by the primary key of the table.
> >> 5. Include in each message the database's transaction id, scn, or other
> >> identifier that gives the total order within the record stream. Since
> there
> >> is a single publisher this id will be monotonic within each partition.
> >>
> >> This seems to be the best set of tradeoffs for most use cases:
> >> - You can have parallel consumers up to the number of partitions you
> chose
> >> that still get messages in order per ID'd entity.
> >> - You can subscribe to just one table if you like, or to multiple
> tables.
> >> - Consumers who need a total order over all updates can do a "merge"
> across
> >> the partitions to reassemble the fully ordered set of changes across all
> >> tables/partitions.
> >>
> >> One thing to note is that the requirement of having a single consumer
> >> process/thread to get the total order isn't really so much a Kafka
> >> restriction as it just is a restriction about the world, since if you
> had
> >> multiple threads even if you delivered messages to them in order their
> >> processing might happen out of order (just do to the random timing of
> the
> >> processing).
> >>
> >> -Jay
> >>
> >>
> >>
> >> On Tue, Mar 3, 2015 at 3:15 PM, Josh Rader  wrote:
> >>
> >>> Hi Kafka Experts,
> >>>
> >>>
> >>>
> >>> We have a use case around RDBMS replication where we are investigating
> >>> Kafka.  In this case ordering is very important.  Our understanding is
> >>> ordering is only preserved within a single partition.  This makes
> sense as
> >>> a single thread will consume these messages, but o
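
As a reference for points 3-5 above, here is a minimal sketch of a publisher that
uses one topic per table, the primary key as the message key (so changes to the same
row land in one partition, in order), and carries the source system's scn/transaction
id in the payload. The topic naming, the JSON layout and the serializer choice are
illustrative, not a prescribed format:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ChangePublisher {
    private final KafkaProducer<String, String> producer;

    // props is expected to carry bootstrap.servers plus StringSerializer for both
    // key.serializer and value.serializer.
    public ChangePublisher(Properties props) {
        this.producer = new KafkaProducer<String, String>(props);
    }

    // Publish one captured row change: topic per table, keyed by primary key,
    // scn included in the value so consumers can order and de-duplicate.
    public void publish(String table, String primaryKey, long scn, String changeJson) {
        String topic = "db.changes." + table;   // illustrative naming convention
        String value = "{\"scn\":" + scn + ",\"change\":" + changeJson + "}";
        producer.send(new ProducerRecord<String, String>(topic, primaryKey, value));
    }

    public void close() {
        producer.close();
    }
}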

Trying to get kafka data to Hadoop

2015-03-04 Thread max square
Hi all,

I have browsed through different conversations around Camus, and bring this
up as kind of a Kafka question. I know it's not the most orthodox, but if someone
has some thoughts I'd appreciate it.

That said, I am trying to set up Camus, using a 3 node Kafka cluster
0.8.2.1, using a project that is trying to build Avro Schema-Repo
. All of the Avro schemas for
the topics are published correctly. I am building Camus and using:

hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob \
  -libjars $CAMUS_LIBJARS -D mapreduce.job.user.classpath.first=true -P config.properties

As the command to start the job, where I have set up an environment
variable that holds all the libjars that the mvn package command generates.

I have also set the following properties to configure the job:
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry

etl.schema.registry.url=http://10.0.14.25:2876/schema-repo/

When I execute the job I get an Exception indicating the
AvroRestSchemaRegistry class can't be found (I've double checked it's part
of the libjars). I wanted to ask if this is the correct way to set up this
integration, and if anyone has pointers on why the job is not finding the
class AvroRestSchemaRegistry

Thanks in advance for the help!

Max

Follows the complete stack trace:

[CamusJob] - failed to create decoder

com.linkedin.camus.coders.MessageDecoderException: com.linkedin.camus.coders.MessageDecoderException: java.lang.ClassNotFoundException: com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
    at com.linkedin.camus.etl.kafka.coders.MessageDecoderFactory.createMessageDecoder(MessageDecoderFactory.java:29)
    at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.createMessageDecoder(EtlInputFormat.java:391)
    at com.linkedin.camus.etl.kafka.mapred.EtlInputFormat.getSplits(EtlInputFormat.java:256)
    at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1107)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1124)
    at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:178)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:1023)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:976)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:582)
    at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:335)
    at com.linkedin.camus.etl.kafka.CamusJob.run(CamusJob.java:563)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
    at com.linkedin.camus.etl.kafka.CamusJob.main(CamusJob.java:518)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.


Re: Database Replication Question

2015-03-04 Thread Jay Kreps
Hey Josh,

NoSQL DBs may actually be easier because they themselves generally don't
have a global order. I.e. I believe Mongo has a per-partition oplog, is
that right? Their partitions would match our partitions.

-Jay

On Wed, Mar 4, 2015 at 5:18 AM, Josh Rader  wrote:

> Thanks everyone for your responses!  These are great.  It seems our cases
> matches closest to Jay's recommendations.
>
> The one part that sounds a little tricky is point #5 'Include in each
> message the database's transaction id, scn, or other identifier '.  This is
> pretty straightforward with the RDBMS case that I mentioned, but I could
> see wanting to extend this to replicate NoSQL stores (Cassandra, Mongo)
> which might not always have a readily available monotonic id, particularly
> in failover scenarios.  I guess in that case we can think about creating
> this id ourselves from the single producer.
>
> Xiao,
>
> I think in the Kafka failover cases you mention if we also store the offset
> with replicated data we should be able to pick up where we left off since
> we are using the low level consumer.  Maybe I am missing your point
> though...
>
> Guozhang,
>
> Very good point that we didn't think of.  We will need to think this
> through, as you say avoid resending other messages in a batch if one is
> failed.  I wonder if we might also manage this on the consumer side too
> with idempotency.  Thanks for raising this!
>
> Josh
>
>
>
> On Tue, Mar 3, 2015 at 6:08 PM, Xiao  wrote:
>
> > Hey Josh,
> >
> > Sorry, after reading codes, Kafka did fsync the data using a separate
> > thread. The recovery point (oldest transaction timestamp) can be got from
> > the file recovery-point-offset-checkpoint.
> >
> > You can adjust the value config.logFlushOffsetCheckpointIntervalMs, if
> you
> > think the speed is not quick enough. When the workloads is huge, the
> > bottleneck could be in your target side or source side. That means, your
> > apply could have enough jobs to do.
> >
> > Basically, you need to keep reading this file for determining the oldest
> > timestamps of all relevant partitions. Then, apply the transactions until
> > that timestamp.
> >
> > Note, this does not protect the transaction consistency. This is just for
> > ensuring the data at the target side is consistent at one timestamp when
> > you have multiple channel to send data changes. The implementation should
> > be simple if you can understand the concepts. I am unable to find the
> filed
> > patent application about it. This is one related paper. It covers the
> main
> > concepts about the issues you are facing. "Inter-Data-Center Large-Scale
> > Database Replication Optimization – A Workload Driven Partitioning
> Approach"
> >
> > Hopefully, you understood what I explained above.
> >
> > Best wishes,
> >
> > Xiao Li
> >
> > Best wishes,
> >
> > Xiao Li
> >
> > On Mar 3, 2015, at 4:23 PM, Xiao  wrote:
> >
> > > Hey Josh,
> > >
> > > If you put different tables into different partitions or topics, it
> > might break transaction ACID at the target side. This is risky for some
> use
> > cases. Besides unit of work issues, you also need to think about the load
> > balancing too.
> > >
> > > For failover, you have to find the timestamp for point-in-time
> > consistency. This part is tricky. You have to ensure all the changes
> before
> > a specific timestamp have been flushed to the disk. Normally, you can
> > maintain a bookmark for different partition at the target side to know
> what
> > is the oldest transactions have been flushed to the disk. Unfortunately,
> > based on my understanding, Kafka is unable to do it because it does not
> do
> > fsync regularly for achieving better throughput.
> > >
> > > Best wishes,
> > >
> > > Xiao Li
> > >
> > >
> > > On Mar 3, 2015, at 3:45 PM, Xiao  wrote:
> > >
> > >> Hey Josh,
> > >>
> > >> Transactions can be applied in parallel in the consumer side based on
> > transaction dependency checking.
> > >>
> > >> http://www.google.com.ar/patents/US20080163222
> > >>
> > >> This patent documents how it work. It is easy to understand, however,
> > you also need to consider the hash collision issues. This has been
> > implemented in IBM Q Replication since 2001.
> > >>
> > >> Thanks,
> > >>
> > >> Xiao Li
> > >>
> > >>
> > >> On Mar 3, 2015, at 3:36 PM, Jay Kreps  wrote:
> > >>
> > >>> Hey Josh,
> > >>>
> > >>> As you say, ordering is per partition. Technically it is generally
> > possible
> > >>> to publish all changes to a database to a single partition--generally
> > the
> > >>> kafka partition should be high throughput enough to keep up. However
> > there
> > >>> are a couple of downsides to this:
> > >>> 1. Consumer parallelism is limited to one. If you want a total order
> > to the
> > >>> consumption of messages you need to have just 1 process, but often
> you
> > >>> would want to parallelize.
> > >>> 2. Often what people want is not a full stream of all changes in all
> > tables
> > >>> in a database but rather the changes to

Re: Database Replication Question

2015-03-04 Thread Xiao
Hi, Josh, 

That depends on how you implemented it. 

Basically, Kafka can provide good throughput only when you have multiple
partitions.

- If you have multiple consumers and multiple partitions, each consumer with a
dedicated partition, then you need a coordinator to ensure the consumers do not
apply the transactions until all of them have been fully fsynced to the disk.

- If you have a single consumer and multiple partitions, you need to rebuild the
transactions at the target side when you put different tables into different
partitions. If you do not split transactions, you can do round-robin scheduling.
Complex parallel apply logic is normally needed: to apply in parallel while
respecting key dependencies, you need to consider all the constraints (key
dependencies, secondary unique indexes, foreign keys).

Kafka does not support units of work. A single transaction at the source side
might be split into multiple messages. The producer does not know whether all
the messages have been fsynced to the disk, so some unflushed messages might be
lost during a disaster. Thus, you might need to modify the Kafka code or handle
it inside the producers or consumers. Of course, you might see a big performance
reduction.

Writing a course project should not be hard, but the most difficult part of
building a product is error handling. In the design, you also need to consider
DDL changes at the source side, monster transactions, Kafka message pruning, and
many other things. At the very least, you need to fully understand the Kafka
source code before using it this way.

Best wishes, 

Xiao Li

On Mar 4, 2015, at 5:18 AM, Josh Rader  wrote:

> Thanks everyone for your responses!  These are great.  It seems our cases
> matches closest to Jay's recommendations.
> 
> The one part that sounds a little tricky is point #5 'Include in each
> message the database's transaction id, scn, or other identifier '.  This is
> pretty straightforward with the RDBMS case that I mentioned, but I could
> see wanting to extend this to replicate NoSQL stores (Cassandra, Mongo)
> which might not always have a readily available monotonic id, particularly
> in failover scenarios.  I guess in that case we can think about creating
> this id ourselves from the single producer.
> 
> Xiao,
> 
> I think in the Kafka failover cases you mention if we also store the offset
> with replicated data we should be able to pick up where we left off since
> we are using the low level consumer.  Maybe I am missing your point
> though...
> 
> Guozhang,
> 
> Very good point that we didn't think of.  We will need to think this
> through, as you say avoid resending other messages in a batch if one is
> failed.  I wonder if we might also manage this on the consumer side too
> with idempotency.  Thanks for raising this!
> 
> Josh
> 
> 
> 
> On Tue, Mar 3, 2015 at 6:08 PM, Xiao  wrote:
> 
>> Hey Josh,
>> 
>> Sorry, after reading codes, Kafka did fsync the data using a separate
>> thread. The recovery point (oldest transaction timestamp) can be got from
>> the file recovery-point-offset-checkpoint.
>> 
>> You can adjust the value config.logFlushOffsetCheckpointIntervalMs, if you
>> think the speed is not quick enough. When the workloads is huge, the
>> bottleneck could be in your target side or source side. That means, your
>> apply could have enough jobs to do.
>> 
>> Basically, you need to keep reading this file for determining the oldest
>> timestamps of all relevant partitions. Then, apply the transactions until
>> that timestamp.
>> 
>> Note, this does not protect the transaction consistency. This is just for
>> ensuring the data at the target side is consistent at one timestamp when
>> you have multiple channel to send data changes. The implementation should
>> be simple if you can understand the concepts. I am unable to find the filed
>> patent application about it. This is one related paper. It covers the main
>> concepts about the issues you are facing. "Inter-Data-Center Large-Scale
>> Database Replication Optimization – A Workload Driven Partitioning Approach"
>> 
>> Hopefully, you understood what I explained above.
>> 
>> Best wishes,
>> 
>> Xiao Li
>> 
>> Best wishes,
>> 
>> Xiao Li
>> 
>> On Mar 3, 2015, at 4:23 PM, Xiao  wrote:
>> 
>>> Hey Josh,
>>> 
>>> If you put different tables into different partitions or topics, it
>> might break transaction ACID at the target side. This is risky for some use
>> cases. Besides unit of work issues, you also need to think about the load
>> balancing too.
>>> 
>>> For failover, you have to find the timestamp for point-in-time
>> consistency. This part is tricky. You have to ensure all the changes before
>> a specific timestamp have been flushed to the disk. Normally, you can
>> maintain a bookmark for different partition at the target side to know what
>> is the oldest transactions have been flushed to the disk. Unfortunately,
>> based on my understanding, Kafka is unable to do it because it does not do
>> fsync regularly for achieving better throughp

Re: Explicit control over flushing the messages

2015-03-04 Thread Jeff Holoman
Take a look here:

https://issues.apache.org/jira/browse/KAFKA-1865
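
Until that lands, a common stop-gap with the 0.8.2 new producer is to block on
the Future returned by send(), which completes only once the record's batch has
actually been sent and acknowledged. A minimal sketch, with a placeholder broker
address and topic name (note that blocking per record largely gives up the
throughput benefit of batching):

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BlockingSend {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all"); // wait for the in-sync replicas to ack

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        Future<RecordMetadata> future =
                producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));

        // get() blocks until the batch containing this record has been
        // sent to the broker and acknowledged (or the send has failed).
        RecordMetadata meta = future.get();
        System.out.printf("sent to %s-%d at offset %d%n",
                meta.topic(), meta.partition(), meta.offset());

        producer.close();
    }
}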



On Wed, Mar 4, 2015 at 4:28 AM, Ponmani Rayar  wrote:

> Hi Group,
>
>   I have started using Kafka 0.8.2 with the new producer API.
> Just wanted to know if we can have an explicit control over flushing the
> messages batch to Kafka cluster.
>
> Configuring batch.size will flush the messages when the batch.size is
> reached for a partition.
> But is there any API where in i can have a explicit control of flushing the
> messages, something like
>
>  *kafkaProducer.flush(); *
>
> Please let me know if there is any API or specific configuration to do
> this.
>
> Thanks in advance,
> Ponmani
>



-- 
Jeff Holoman
Systems Engineer


Re: Got negative offset lag after restarting brokers

2015-03-04 Thread tao xiao
Thanks guys. With unclean.leader.election.enable set to false, the issue is
fixed.
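
For reference, a minimal sketch of the full set of settings Jiangjie (Becket)
describes below, assuming Kafka 0.8.2 (the broker list is a placeholder; the
broker/topic properties go in server.properties or per-topic config, not in
client code):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NoDataLossProducer {
    public static void main(String[] args) {
        // Broker/topic side (not client code):
        //   create the topic with --replication-factor 3
        //   min.insync.replicas=2
        //   unclean.leader.election.enable=false
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all"); // equivalent to acks=-1: wait for the full ISR

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<String, String>("mm-benchmark-test", "hello"));
        producer.close();
    }
}

With these three pieces in place a committed message is on at least two brokers,
and a broker that has fallen out of the ISR can never be elected leader, so the
log truncation described below cannot drop acknowledged data.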

On Tue, Mar 3, 2015 at 2:50 PM, Gwen Shapira  wrote:

> of course :)
> unclean.leader.election.enable
>
> On Mon, Mar 2, 2015 at 9:10 PM, tao xiao  wrote:
> > How do I achieve point 3? is there a config that I can set?
> >
> > On Tue, Mar 3, 2015 at 1:02 PM, Jiangjie Qin 
> > wrote:
> >
> >> The scenario you mentioned is equivalent to an unclean leader election.
> >> The following settings will make sure there is no data loss:
> >> 1. Set replica factor to 3 and minimum ISR size to 2.
> >> 2. When produce, use acks=-1 or acks=all
> >> 3. Disable unclean leader election.
> >>
> >> 1) and 2) Guarantees committed messages will be at least in to brokers.
> >> 3) Means if a broker is not in ISR, it cannot be elected as a leader, so
> >> the log truncate as mentioned earlier will not happen.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 3/2/15, 7:16 PM, "tao xiao"  wrote:
> >>
> >> >Since I reused the same consumer group to consume the messages after
> step
> >> >6
> >> >data there was no data loss occurred. But if I create a new consumer
> group
> >> >for sure the new consumer will suffer data loss.
> >> >
> >> >I am more concerning about if this is an acceptable behavior by Kafka
> that
> >> >an out of sync broker can be elected as the leader for a partition. Is
> >> >there any mechanism built around Kafka to ensure that only the in-sync
> >> >broker can be chosen to be a leader? If no, what is the best practice
> to
> >> >restart brokers if some of the replicas are out of sync?
> >> >
> >> >On Tue, Mar 3, 2015 at 2:35 AM, Jiangjie Qin  >
> >> >wrote:
> >> >
> >> >> In this case you have data loss. In step 6, when broker 1 comes up,
> it
> >> >> becomes the leader and has log end offset 1000. When broker 0 comes
> up,
> >> >>it
> >> >> becomes follower and will truncate its log to 1000, i.e. 1000
> messages
> >> >> were lost. Next time when the consumer starts, its offset will be
> reset
> >> >>to
> >> >> either the smallest or the largest depending on the setting.
> >> >>
> >> >> Jiangjie (Becket) Qin
> >> >>
> >> >> On 3/2/15, 9:32 AM, "Stuart Reynolds"  wrote:
> >> >>
> >> >> >Each topic has:  earliest and latest offsets (per partition)
> >> >> >Each consumer group has a current offset (per topic, partition pair)
> >> >> >
> >> >> >I see -1 for the current offsets new consumer groups that haven't
> yet
> >> >> >committed an offset. I think it means that the offsets for that
> >> >> >consumer group are undefined.
> >> >> >
> >> >> >Is it possible you generated new consumer groups when you restarted
> >> >>your
> >> >> >broker?
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >On Mon, Mar 2, 2015 at 3:15 AM, tao xiao 
> wrote:
> >> >> >> Hi team,
> >> >> >>
> >> >> >> I have 2 brokers (0 and 1) serving a topic mm-benchmark-test. I
> did
> >> >>some
> >> >> >> tests on the two brokers to verify how leader got elected. Here
> are
> >> >>the
> >> >> >> steps:
> >> >> >>
> >> >> >> 1. started 2 brokers
> >> >> >> 2. created a topic with partition=1 and replication-factor=2. Now
> >> >> >>brokers 1
> >> >> >> was elected as leader
> >> >> >> 3. sent 1000 messages to the topic and consumed from a high level
> >> >> >>consumer
> >> >> >> using zk as the offset storage.
> >> >> >> 4. shutdown broker 1 and now broker 0 was elected as leader
> >> >> >> 5. sent another 1000 messages to topic and consumed again
> >> >> >> 6. completely shutdown broker 0 and then started broker 1. now
> >> >>broker 1
> >> >> >> became the leader
> >> >> >> 7. started broker 0 and ran ConsumerOffsetChecker which showed
> >> >>negative
> >> >> >>lag
> >> >> >> (-1000 in my case)
> >> >> >>
> >> >> >> I think this is because the consumed offset in zk was 2000 and the
> >> >> >> logsize retrieved from the leader (broker 1), which missed the 1000
> >> >> >> messages from step 5, was 1000 in this case; therefore -1000 = 1000 -
> >> >> >> 2000 was given.
> >> >> >>
> >> >> >> Is this a bug or expected behavior?
> >> >> >>
> >> >> >> --
> >> >> >> Regards,
> >> >> >> Tao
> >> >>
> >> >>
> >> >
> >> >
> >> >--
> >> >Regards,
> >> >Tao
> >>
> >>
> >
> >
> > --
> > Regards,
> > Tao
>



-- 
Regards,
Tao


Re: Database Replication Question

2015-03-04 Thread Josh Rader
Thanks everyone for your responses!  These are great.  It seems our case
matches closest to Jay's recommendations.

The one part that sounds a little tricky is point #5, 'Include in each
message the database's transaction id, scn, or other identifier'.  This is
pretty straightforward in the RDBMS case that I mentioned, but I could see
wanting to extend this to replicate NoSQL stores (Cassandra, Mongo), which
might not always have a readily available monotonic id, particularly in
failover scenarios.  I guess in that case we can think about creating this id
ourselves from the single producer.

Xiao,

I think in the Kafka failover cases you mention, if we also store the offset
with the replicated data we should be able to pick up where we left off, since
we are using the low-level consumer.  Maybe I am missing your point
though...

Guozhang,

Very good point that we didn't think of.  We will need to think this
through and, as you say, avoid resending the other messages in a batch once one
has failed.  I wonder if we might also manage this on the consumer side
with idempotency.  Thanks for raising this!
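
A rough sketch of how point #5 and consumer-side idempotency could fit
together, purely as an illustration; the scn-prefixed value format, the
parseScn/applyToTarget helpers, and the lastAppliedScn bookmark are assumptions
for this example, not anything Kafka itself provides:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ScnTaggedPublisher {
    private final KafkaProducer<String, String> producer;

    // producerConfig must carry bootstrap.servers and the string serializers.
    public ScnTaggedPublisher(Properties producerConfig) {
        this.producer = new KafkaProducer<>(producerConfig);
    }

    // Key by table name so all changes for one table land in one partition
    // (and therefore stay ordered); prefix the payload with the SCN so the
    // consumer can apply idempotently.
    public void publish(String topic, String table, long scn, String changeJson) {
        producer.send(new ProducerRecord<String, String>(topic, table, scn + ":" + changeJson));
    }

    public void close() {
        producer.close();
    }
}

// Consumer side (shape only): keep a bookmark per partition and skip anything
// at or below it, so replays and producer resends become no-ops.
//
//   long scn = parseScn(message);            // hypothetical helper
//   if (scn <= lastAppliedScn) continue;     // duplicate, already applied
//   applyToTarget(message);                  // hypothetical helper
//   lastAppliedScn = scn;

The idea is that the bookmark makes re-sent batches harmless, rather than
trying to prevent resends on the producer side entirely.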

Josh



On Tue, Mar 3, 2015 at 6:08 PM, Xiao  wrote:

> Hey Josh,
>
> Sorry, after reading codes, Kafka did fsync the data using a separate
> thread. The recovery point (oldest transaction timestamp) can be got from
> the file recovery-point-offset-checkpoint.
>
> You can adjust the value config.logFlushOffsetCheckpointIntervalMs, if you
> think the speed is not quick enough. When the workloads is huge, the
> bottleneck could be in your target side or source side. That means, your
> apply could have enough jobs to do.
>
> Basically, you need to keep reading this file for determining the oldest
> timestamps of all relevant partitions. Then, apply the transactions until
> that timestamp.
>
> Note, this does not protect the transaction consistency. This is just for
> ensuring the data at the target side is consistent at one timestamp when
> you have multiple channel to send data changes. The implementation should
> be simple if you can understand the concepts. I am unable to find the filed
> patent application about it. This is one related paper. It covers the main
> concepts about the issues you are facing. "Inter-Data-Center Large-Scale
> Database Replication Optimization – A Workload Driven Partitioning Approach"
>
> Hopefully, you understood what I explained above.
>
> Best wishes,
>
> Xiao Li
>
> Best wishes,
>
> Xiao Li
>
> On Mar 3, 2015, at 4:23 PM, Xiao  wrote:
>
> > Hey Josh,
> >
> > If you put different tables into different partitions or topics, it
> might break transaction ACID at the target side. This is risky for some use
> cases. Besides unit of work issues, you also need to think about the load
> balancing too.
> >
> > For failover, you have to find the timestamp for point-in-time
> consistency. This part is tricky. You have to ensure all the changes before
> a specific timestamp have been flushed to the disk. Normally, you can
> maintain a bookmark for different partition at the target side to know what
> is the oldest transactions have been flushed to the disk. Unfortunately,
> based on my understanding, Kafka is unable to do it because it does not do
> fsync regularly for achieving better throughput.
> >
> > Best wishes,
> >
> > Xiao Li
> >
> >
> > On Mar 3, 2015, at 3:45 PM, Xiao  wrote:
> >
> >> Hey Josh,
> >>
> >> Transactions can be applied in parallel in the consumer side based on
> transaction dependency checking.
> >>
> >> http://www.google.com.ar/patents/US20080163222
> >>
> >> This patent documents how it work. It is easy to understand, however,
> you also need to consider the hash collision issues. This has been
> implemented in IBM Q Replication since 2001.
> >>
> >> Thanks,
> >>
> >> Xiao Li
> >>
> >>
> >> On Mar 3, 2015, at 3:36 PM, Jay Kreps  wrote:
> >>
> >>> Hey Josh,
> >>>
> >>> As you say, ordering is per partition. Technically it is generally
> possible
> >>> to publish all changes to a database to a single partition--generally
> the
> >>> kafka partition should be high throughput enough to keep up. However
> there
> >>> are a couple of downsides to this:
> >>> 1. Consumer parallelism is limited to one. If you want a total order
> to the
> >>> consumption of messages you need to have just 1 process, but often you
> >>> would want to parallelize.
> >>> 2. Often what people want is not a full stream of all changes in all
> tables
> >>> in a database but rather the changes to a particular table.
> >>>
> >>> To some extent the best way to do this depends on what you will do
> with the
> >>> data. However if you intend to have lots
> >>>
> >>> I have seen pretty much every variation on this in the wild, and here
> is
> >>> what I would recommend:
> >>> 1. Have a single publisher process that publishes events into Kafka
> >>> 2. If possible use the database log to get these changes (e.g. mysql
> >>> binlog, Oracle xstreams, golden gate, etc). This will be more complete
> and
> >>> more effici

Kafka web console error

2015-03-04 Thread Bhuvana Baskar
Hi,

Using Kafka Web Console:

When I run the command "play start", it works fine.
When I try to register the ZooKeeper, however, I get the error below.

java.nio.channels.ClosedChannelException
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:99)
    at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:36)
    at org.jboss.netty.channel.Channels.write(Channels.java:725)
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71)
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59)
    at com.typesafe.netty.http.pipelining.HttpPipeliningHandler.handleDownstream(HttpPipeliningHandler.java:106)
    at org.jboss.netty.channel.Channels.write(Channels.java:704)
    at org.jboss.netty.channel.Channels.write(Channels.java:671)
    at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:248)
    at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$play$core$server$netty$PlayDefaultUpstreamHandler$$iteratee$1$1.apply(PlayDefaultUpstreamHandler.scala:295)
    at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$play$core$server$netty$PlayDefaultUpstreamHandler$$iteratee$1$1.apply(PlayDefaultUpstreamHandler.scala:292)
    at play.api.libs.iteratee.Iteratee$$anonfun$flatMap$1$$anonfun$apply$16.apply(Iteratee.scala:501)
    at play.api.libs.iteratee.Iteratee$$anonfun$flatMap$1$$anonfun$apply$16.apply(Iteratee.scala:501)
    at play.api.libs.iteratee.Concurrent$$anonfun$1$$anonfun$apply$8.apply(Concurrent.scala:104)
    at play.api.libs.iteratee.Concurrent$$anonfun$1$$anonfun$apply$8.apply(Concurrent.scala:101)
    at play.api.libs.iteratee.ImmediateIteratee$$anonfun$fold$2.apply(Iteratee.scala:595)
    at play.api.libs.iteratee.ImmediateIteratee$$anonfun$fold$2.apply(Iteratee.scala:595)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at play.api.libs.iteratee.Execution$$anon$1.execute(Execution.scala:43)
    at scala.concurrent.impl.Future$.apply(Future.scala:31)
    at scala.concurrent.Future$.apply(Future.scala:482)
    at play.api.libs.iteratee.internal$.executeFuture(package.scala:35)
    at play.api.libs.iteratee.ImmediateIteratee$class.fold(Iteratee.scala:596)
    at play.api.libs.iteratee.ContIteratee.fold(Iteratee.scala:640)
    at play.api.libs.iteratee.FutureIteratee$$anonfun$fold$3.apply(Iteratee.scala:659)
    at play.api.libs.iteratee.FutureIteratee$$anonfun$fold$3.apply(Iteratee.scala:659)
    at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
    at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:249)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at play.api.libs.iteratee.Execution$$anon$2.execute(Execution.scala:70)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.scala:280)
    at scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:270)
    at scala.concurrent.Future$class.flatMap(Future.scala:249)
    at scala.concurrent.impl.Promise$DefaultPromise.flatMap(Promise.scala:153)
    at play.api.libs.iteratee.FutureIteratee.fold(Iteratee.scala:659)
    at play.api.libs.iteratee.FutureIteratee$$anonfun$fold$3.apply(Iteratee.scala:659)
    at play.api.libs.iteratee.FutureIteratee$$anonfun$fold$3.apply(Iteratee.scala:659)
    at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:251)
    at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:249)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at play.api.libs.iteratee.Execution$$anon$2.execute(Execution.scala:70)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.scala$concurrent$impl$Promise$DefaultPromise$$dispatchOrAddCallback(Promise.scala:280)
    at scala.concurrent.impl.Promise$DefaultPromise.onComplete(Promise.scala:270)
    at scala.concurrent.Future$class.flatMap(Future.scala:249)
    at scala.concurrent.impl.Promise$DefaultPromise.flatMap(Promise.scala:153)
    at play.api.libs.iteratee.FutureIteratee.fold(Iteratee.scala:659)
    at play.api.libs.iteratee.Iteratee$class.pureFold(Iteratee.scala:416)
    at pla

Re: reassign a topic partition which has no ISR and leader set to -1

2015-03-04 Thread todd
When we ran into this problem, we ended up going into ZooKeeper and changing
the leader to point to one of the replicas, then did a forced leader election.
This got the partition back online.


  Original Message  
From: Virendra Pratap Singh
Sent: Wednesday, March 4, 2015 2:00 AM
To: Gwen Shapira; users@kafka.apache.org
Reply To: users@kafka.apache.org
Subject: Re: reassign a topic partition which has no ISR and leader set to -1

Thanks Gwen for your info. I brought another Kafka server up with the broker id
of the failed leader assigned to it, and the reassignment went through.

@kafka developers who may be on this distribution list: is there a feature
planned in 0.8.2, or can we plan one, wherein if the leader and all the replica
brokers are lost, manual reassignment of such a partition can still be
performed? As of the current implementation, it looks like the reassignment
will not budge until it sees at least one of the leader/replica brokers back
again.

Regards,
Virendra

From: Gwen Shapira
To: "users@kafka.apache.org"; Virendra Pratap Singh
Sent: Tuesday, March 3, 2015 4:29 PM
Subject: Re: reassign a topic partition which has no ISR and leader set to -1

I hate bringing bad news, but...

You can't really reassign replicas if the leader is not available.
Since the leader is gone, the replicas have nowhere to replicate the data from.

Until you bring the leader back (or one of the replicas with unclean
leader election), you basically lost this partition.

Gwen



On Tue, Mar 3, 2015 at 3:57 PM, Virendra Pratap Singh
 wrote:
> Ran into a situation where both the leader and the replica nodes for a few
> partitions of a given topic went down. So now these partitions have no
> in-sync replicas and no leader (leader set to -1).
> I tried to reassign these partitions to a different set of brokers using the
> partition reassignment tool and then running the preferred replica election.
> However I don't see the partitions getting reassigned, and the leader value
> still remains "-1" for these.
> What do I need to do to get this working?
> The Kafka cluster is on 0.8.1.1.
> Regards,
> Virendra




JSON parsing causing rebalance to fail

2015-03-04 Thread Arunkumar Srambikkal (asrambik)
Hi,

When I start a new consumer, it throws a Rebalance exception.

However, I hit it only on some machines, where the runtime libraries are
different.

The stack given below is what I encounter - is this a known issue?

I saw this Jira, but it's not resolved, so I thought I'd confirm:
https://issues.apache.org/jira/browse/KAFKA-1405

Thanks
Arun

[2015-03-04 14:30:37,609] INFO [name], exception during rebalance  
(kafka.consumer.ZookeeperConsumerConnector)

kafka.common.KafkaException: Failed to parse the broker info from zookeeper: 
{"jmx_port":-1,"timestamp":"1425459616502","host":"1.1.1.1","version":1,"port":64356}
at kafka.cluster.Broker$.createBroker(Broker.scala:35)

Caused by: java.lang.ClassCastException: java.lang.Double cannot be cast to 
java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:105)
at kafka.cluster.Broker$.createBroker(Broker.scala:40)
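
For anyone hitting the same cast: the registration JSON itself is fine; the
failure suggests the JSON runtime on those machines hands numeric fields back
as java.lang.Double, and a direct cast to Integer then fails. A tiny
illustration of the difference (the map below is a stand-in for the parsed
broker info, not Kafka's actual code):

import java.util.HashMap;
import java.util.Map;

public class NumericCastDemo {
    public static void main(String[] args) {
        // Pretend a JSON parser returned the broker registration as a Map
        // and boxed the port as a Double rather than an Integer.
        Map<String, Object> brokerInfo = new HashMap<String, Object>();
        brokerInfo.put("port", 64356.0);

        Object port = brokerInfo.get("port");

        // int p = (Integer) port;          // ClassCastException: Double -> Integer

        // Going through Number works regardless of the concrete boxed type:
        int p = ((Number) port).intValue();
        System.out.println(p);
    }
}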



RE: Kafka producer failed to send but actually does

2015-03-04 Thread Arunkumar Srambikkal (asrambik)
Thanks for responding. 

I was creating an instance of kafka.server.KafkaServer in my code for running
some tests, and that is what I was referring to as an embedded broker.

The scenario you described is what was happening: in my case, when I kill my
broker, the producer fails to get an ack even though the message went through.
I added handling of duplicates and that resolved the issue.
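
For reference, a minimal sketch of that kind of duplicate handling on the
consuming side, assuming the producing application stamps each message with its
own unique id (the id source and the cache size below are made-up details for
illustration):

import java.util.LinkedHashMap;
import java.util.Map;

public class DuplicateFilter {
    private static final int MAX_REMEMBERED = 100000;

    // Insertion-ordered map that evicts the oldest entry once the cap is hit,
    // so memory stays bounded while recent ids are still remembered.
    private final Map<String, Boolean> seen =
            new LinkedHashMap<String, Boolean>() {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > MAX_REMEMBERED;
                }
            };

    /** Returns true only the first time a given message id is seen. */
    public synchronized boolean firstTime(String messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }
}

// Usage: if (filter.firstTime(id)) { process(message); } else { /* skip resend */ }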

Thanks
Arun

-Original Message-
From: Jiangjie Qin [mailto:j...@linkedin.com.INVALID] 
Sent: Tuesday, March 03, 2015 11:13 PM
To: users@kafka.apache.org
Subject: Re: Kafka producer failed to send but actually does

What do you mean by Kafka embedded broker?
Anyway, this could happen. For example, the producer sends a message to the broker.
After that some network issue occurs and the producer does not get the confirmation
from the broker, so the producer thinks the send failed. But the broker actually
got the message. The producer is expected to resend the message, so the broker will
have duplicate messages, and that's also why we say Kafka guarantees at least
once.

-Jiangjie (Becket) Qin

On 3/3/15, 4:01 AM, "Arunkumar Srambikkal (asrambik)" 
wrote:

>Hi,
>
>I'm running some tests with the Kafka embedded broker and I see cases 
>where the producer gets the FailedToSendMessageException but in reality 
>the message is transferred and consumer gets it
>
>Is this expected / known issue?
>
>Thanks
>Arun
>
>My producer config =
>
>props.put("producer.type", "sync")
>props.put("serializer.class",  "kafka.serializer.StringEncoder");
>props.put("partitioner.class", "com.test.PartMe");
>props.put("metadata.broker.list",  "127.0.0.1:"+port);
>props.put("request.required.acks", "-1");
>props.put("message.send.max.retries", "0")



Explicit control over flushing the messages

2015-03-04 Thread Ponmani Rayar
Hi Group,

  I have started using Kafka 0.8.2 with the new producer API.
Just wanted to know if we can have explicit control over flushing a batch of
messages to the Kafka cluster.

Configuring batch.size will flush the messages when the batch.size is reached
for a partition.
But is there any API wherein I can have explicit control over flushing the
messages, something like

 kafkaProducer.flush();

Please let me know if there is any API or specific configuration to do this.

Thanks in advance,
Ponmani