At-least-once guarantees with high-level consumer

2015-06-17 Thread Abhishek Nigam
Hi Carl,
** Disclaimer: I know there's a new consumer API on the way, this mail is
about the currently available API. I also apologise if the below has
already been discussed previously. I did try to check previous discussions
on ConsumerIterator **

It seems to me that the high-level consumer would be able to support
at-least-once messaging, even if one uses auto-commit, by changing
kafka.consumer.ConsumerIterator.next() to call
currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This way, a
consumer thread for a KafkaStream could just loop:
while (true) {
MyMessage message = iterator.next().message();
process(message);
}
Each call to iterator.next() then updates the offset to commit to the end
of the message that was just processed. When offsets are committed for the
ConsumerConnector (either automatically or manually), the commit will not
include offsets of messages that haven't been fully processed.
I've tested the following ConsumerIterator.next(), and it seems to work as
I expect

What you have proposed seems very reasonable. Essentially we need two sets
of offsets: one which is published to ZooKeeper, and one which is used
locally on the machine for tracking which event to return from the iterator
(like a prev/current setup).

In my opinion this is how it should have been implemented in the first
place, rather than forcing everyone to write additional code just for the
at-least-once guarantee.

If you push for this in open source you will have my vote at the very least :)

-Abhishek
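
For reference, the usual way to get at-least-once from the unmodified
high-level consumer is to disable auto-commit and commit manually after
processing. A minimal sketch, assuming the 0.8.x kafka.javaapi.consumer
API; the ZooKeeper address, group id, topic name and single-stream setup
are illustrative:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class AtLeastOnceLoop {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // illustrative
        props.put("group.id", "my-group");                // illustrative
        props.put("auto.commit.enable", "false");         // commit manually instead

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
        ConsumerIterator<byte[], byte[]> it =
            streams.get("my-topic").get(0).iterator();

        while (it.hasNext()) {
            byte[] message = it.next().message();
            process(message);          // process first...
            connector.commitOffsets(); // ...then commit, so a crash replays instead of skipping
        }
    }

    private static void process(byte[] message) {
        // application logic goes here
    }
}

Note that commitOffsets() commits the offsets of every stream owned by the
connector, which is why people end up running a separate ConsumerConnector
per stream — the scaling concern the modified ConsumerIterator is meant to
avoid.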


Re: Recovering from broker failure with KafkaConsumer

2015-06-17 Thread Jason Gustafson
We have a couple open tickets to address these issues (see KAFKA-1894 and
KAFKA-2168). It's definitely something we want to fix.
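
A minimal sketch of the blocking scenario described below, assuming the
trunk consumer API as of June 2015 (where subscribe(TopicPartition)
attached a specific partition; it was later renamed assign()); the broker
address, topic and timing are illustrative:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class BlockedCloseDemo {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "unreachable-host:9092"); // broker is down
        props.put("group.id", "demo");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        final TopicPartition tp = new TopicPartition("demo-topic", 0);

        Thread seeker = new Thread(new Runnable() {
            public void run() {
                consumer.subscribe(tp);
                // Blocks forever in Fetcher.awaitMetadataUpdate() while holding
                // the consumer's monitor, because seekToEnd() is synchronized.
                consumer.seekToEnd(tp);
            }
        });
        seeker.start();

        Thread.sleep(5000);
        // close() is synchronized on the same consumer, so this call queues up
        // behind the stuck seekToEnd() instead of shutting the consumer down.
        consumer.close();
    }
}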

On Wed, Jun 17, 2015 at 4:21 AM, Jan Stette jan.ste...@gmail.com wrote:

 Adding some more details to the previous question:

 The indefinite wait doesn't happen on calling subscribe() on the consumer,
 it happens when I (in this case) call seekToEnd().

 A related problem to this is that the seekToEnd() method is synchronized
 (as are the other access methods on KafkaConsumer), so the client holds a
 lock while sitting in this wait. This means that if another thread tries
 to call close(), which is also synchronized, this thread will also be
 blocked.

 Holding locks while performing network I/O seems like a bad idea - is this
 something that's planned to be fixed?

 Jan



 On 17 June 2015 at 10:31, Jan Stette jan.ste...@gmail.com wrote:

  I'm trying out the new KafkaConsumer client API in the trunk of the source
  tree, and while I realise that this is a work in progress, I have a
  question that perhaps someone can shed some light on.
 
  I'm looking at how to handle various error scenarios for a Kafka client,
  in particular what happens when trying to connect to the broker but it's
  not available. The behaviour I'm seeing is that the client will retry
  indefinitely (at the configurable interval), basically looping around in
  Fetcher.awaitMetadataUpdate() forever.
 
  I would like to have some way to fail the connection attempt to avoid the
  calling thread being blocked forever. Is this possible with the current
  version of the client? (Snapshot as of 16/6/15). If not, is that something
  that's planned for the future?
 
  Jan
 
 



Broker ISR confusion with soft-failed broker

2015-06-17 Thread Bob Cotton
3 Node Kafka - 0.8.2.1
3 node ZK  - 3.4.6

We experienced a soft-node failure on one of our brokers (#2). The
process was still running, but no logs were being generated and it was not
responding to JMX queries, etc.

Several consumers were unable to read from certain partitions while this
was occurring, partitions that have a replication factor of 3.

I have all the server, controller and state-change logs from the event, and
am trying to filter out the relevant information.

But this sequence of events for one partition's ISR struck me (showing just
the kafka.cluster.Partition entries); there are others just like it.

_time,sourcetype,host,_raw
2015-06-05T07:31:47.878-0600,kafka_server_log,qd-kafka8-01,[2015-06-05
07:31:47,878] INFO Partition [birdseed-user-stream,5] on broker 1:
Shrinking ISR for partition [birdseed-user-stream,5] from 3,2,1 to 3,1
(kafka.cluster.Partition)
2015-06-05T07:31:47.884-0600,kafka_server_log,qd-kafka8-01,[2015-06-05
07:31:47,884] INFO Partition [birdseed-user-stream,5] on broker 1: Cached
zkVersion [537] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
2015-06-05T07:31:57.934-0600,kafka_server_log,qd-kafka8-03,[2015-06-05
07:31:57,934] INFO Partition [birdseed-user-stream,5] on broker 3:
Shrinking ISR for partition [birdseed-user-stream,5] from 3,2 to 3
(kafka.cluster.Partition)
2015-06-05T07:31:59.454-0600,kafka_server_log,qd-kafka8-01,[2015-06-05
07:31:59,454] INFO Partition [birdseed-user-stream,5] on broker 1:
Shrinking ISR for partition [birdseed-user-stream,5] from 3,2,1 to 1
(kafka.cluster.Partition)
2015-06-05T07:31:59.462-0600,kafka_server_log,qd-kafka8-01,[2015-06-05
07:31:59,462] INFO Partition [birdseed-user-stream,5] on broker 1: Cached
zkVersion [537] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
2015-06-05T07:32:07.387-0600,kafka_server_log,qd-kafka8-01,[2015-06-05
07:32:07,387] INFO Partition [birdseed-user-stream,5] on broker 1:
Shrinking ISR for partition [birdseed-user-stream,5] from 3,2,1 to 2,1
(kafka.cluster.Partition)
2015-06-05T07:32:07.397-0600,kafka_server_log,qd-kafka8-01,[2015-06-05
07:32:07,397] INFO Partition [birdseed-user-stream,5] on broker 1: Cached
zkVersion [537] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
2015-06-05T07:32:33.619-0600,kafka_server_log,qd-kafka8-03,[2015-06-05
07:32:33,619] INFO Partition [birdseed-user-stream,5] on broker 3:
Expanding ISR for partition [birdseed-user-stream,5] from 3 to 3,2
(kafka.cluster.Partition)
2015-06-05T07:33:12.038-0600,kafka_server_log,qd-kafka8-03,[2015-06-05
07:33:12,038] INFO Partition [birdseed-user-stream,5] on broker 3:
Expanding ISR for partition [birdseed-user-stream,5] from 3,2 to 3,2,1
(kafka.cluster.Partition)
2015-06-05T07:33:38.959-0600,kafka_server_log,qd-kafka8-03,[2015-06-05
07:33:38,959] INFO Partition [birdseed-user-stream,5] on broker 3:
Shrinking ISR for partition [birdseed-user-stream,5] from 3,2,1 to 3,1
(kafka.cluster.Partition)
2015-06-05T07:34:47.266-0600,kafka_server_log,qd-kafka8-03,[2015-06-05
07:34:47,266] INFO Partition [birdseed-user-stream,5] on broker 3:
Expanding ISR for partition [birdseed-user-stream,5] from 3,1 to 3,1,2
(kafka.cluster.Partition)
2015-06-05T07:37:00.584-0600,kafka_server_log,qd-kafka8-03,[2015-06-05
07:37:00,584] INFO Partition [birdseed-user-stream,5] on broker 3:
Shrinking ISR for partition [birdseed-user-stream,5] from 3,1,2 to 3
(kafka.cluster.Partition)
2015-06-05T07:37:00.590-0600,kafka_server_log,qd-kafka8-03,[2015-06-05
07:37:00,590] INFO Partition [birdseed-user-stream,5] on broker 3: Cached
zkVersion [543] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)
2015-06-05T07:37:09.801-0600,kafka_server_log,qd-kafka8-03,[2015-06-05
07:37:09,801] INFO Partition [birdseed-user-stream,5] on broker 3:
Shrinking ISR for partition [birdseed-user-stream,5] from 3,1,2 to 3
(kafka.cluster.Partition)
2015-06-05T07:37:09.804-0600,kafka_server_log,qd-kafka8-03,[2015-06-05
07:37:09,804] INFO Partition [birdseed-user-stream,5] on broker 3: Cached
zkVersion [543] not equal to that in zookeeper, skip updating ISR
(kafka.cluster.Partition)

The last two lines repeated every 10 seconds until broker #2 was bounced.

We've seen this twice now in production, and unfortunately did not get a
jstack from the frozen VM.

What more information can I provide to help with this?

Thanks
Bob Cotton
Rally Software


Re: New Producer API - batched sync mode support

2015-06-17 Thread Greg Lloyd
@Shapira You are correct from my perspective. We are using kafka for a
system where panels can send multiple events in a single message. The
current contract is such that all events fail or succeed as a whole. If
there is a failure the panel resends all the events. The existing producer
api supports this fine, am I getting left behind here for the sake of
brevity?

I can get behind not adding every feature people ask for, but taking away
something is a different story altogether.

On Wed, Apr 29, 2015 at 9:08 PM, Gwen Shapira gshap...@cloudera.com wrote:

 I'm starting to think that the old adage "If two people say you are drunk,
 lie down" applies here :)

 The current API seems perfectly clear, useful and logical to everyone who
 wrote it... but we are getting multiple users asking for the old batch
 behavior back.
 One reason to get it back is to make upgrades easier - people won't need to
 rethink their existing logic if they get an API with the same behavior in
 the new producer. The other reason is what Ewen mentioned earlier - if
 everyone re-implements Joel's logic, we can provide something for that.

 How about getting the old batch send behavior back by adding a new API
 with:
 public void batchSend(List<ProducerRecord<K,V>> records)

 With this implementation (mixes the old behavior with Joel's snippet):
 * send records one by one
 * flush
 * iterate on futures and get them
 * log a detailed message on each error
 * throw an exception if any send failed.

 It reproduces the old behavior - which apparently everyone really liked,
 and I don't think it is overly weird. It is very limited, but anyone who
 needs more control over their sends already has plenty of options.

 Thoughts?

 Gwen
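
A minimal sketch of how such a batchSend() could be layered on top of the
new producer, following the outline above (send one by one, flush, check
each future, log failures, throw); the class and method names are
illustrative, not a committed API:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BatchSender<K, V> {
    private final KafkaProducer<K, V> producer;

    public BatchSender(KafkaProducer<K, V> producer) {
        this.producer = producer;
    }

    /** Sends all records, waits for completion, throws if any send failed. */
    public void batchSend(List<ProducerRecord<K, V>> records) {
        // 1. send records one by one
        List<Future<RecordMetadata>> futures = new ArrayList<>(records.size());
        for (ProducerRecord<K, V> record : records) {
            futures.add(producer.send(record));
        }
        // 2. flush so all outstanding sends complete
        producer.flush();
        // 3. iterate on the futures and get them, logging each error
        int failures = 0;
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).get();
            } catch (Exception e) {
                failures++;
                System.err.printf("send of record %d failed: %s%n", i, e);
            }
        }
        // 4. throw an exception if any send failed
        if (failures > 0) {
            throw new RuntimeException(failures + " of " + records.size() + " sends failed");
        }
    }
}

As Jay notes below, such a batch is not atomic: on failure, some records
may already have been written.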




 On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey guys,
 
  The locking argument is correct for very small records (< 50 bytes);
  batching will help here because for small records locking becomes the big
  bottleneck. I think these use cases are rare but not unreasonable.
 
  Overall I'd emphasize that the new producer is way faster at virtually
 all
  use cases. If there is a use case where that isn't true, let's look at it
  in a data driven way by comparing the old producer to the new producer
 and
  looking for any areas where things got worse.
 
  I suspect the "reducing allocations" argument to be not a big thing. We do
  a number of small per-message allocations and it didn't seem to have much
  impact. I do think there are a couple of big producer memory
 optimizations
  we could do by reusing the arrays in the accumulator in the serialization
  of the request but I don't think this is one of them.
 
  I'd be skeptical of any api that was too weird--i.e. introduces a new way
  of partitioning, gives back errors on a per-partition rather than per
  message basis (given that partitioning is transparent this is really hard
  to think about), etc. Bad apis end up causing a ton of churn and just
 don't
  end up being a good long term commitment as we change how the underlying
  code works over time (i.e. we hyper optimize for something then have to
  maintain some super weird api as it becomes hyper unoptimized for the
  client over time).
 
  Roshan--Flush works as you would hope, it blocks on the completion of all
  outstanding requests. Calling get on the future for the request gives you
  the associated error code back. Flush doesn't throw any exceptions
 because
  waiting for requests to complete doesn't error, the individual requests
  fail or succeed which is always reported with each request.
 
  Ivan--The batches you send in the scala producer today actually aren't
  truly atomic, they just get sent in a single request.
 
  One tricky problem to solve when users do batching is size limits on
  requests. This can be very hard to manage since predicting the serialized
  size of a bunch of java objects is not always obvious. This was repeatedly
  a problem before.
 
  -Jay
 
  On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov ibalas...@gmail.com
  wrote:
 
   I must agree with @Roshan – it's hard to imagine anything more intuitive
   and easy to use for atomic batching than the old sync batch api. Also,
   it's fast.
   Coupled with a separate instance of producer per
   broker:port:topic:partition it works very well. I would be glad if it
  finds
   its way into new producer api.
  
   On a side-side-side note, could anyone confirm/deny whether SimpleConsumer's
   fetchSize must be set to at least the batch size in bytes (before or after
   compression), otherwise the client risks not getting any messages?
  
 



Re: NoSuchMethodError with Consumer Instantiation

2015-06-17 Thread Jaikiran Pai
You probably have the wrong version of the Kafka jar(s) in your
classpath. Which version of Kafka are you using, and how have you set up
the classpath?


-Jaikiran
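
One quick way to find out which jar the conflicting class actually comes
from is to ask the JVM directly — a generic diagnostic sketch, not a Kafka
API:

// Prints the location of the jar that Utils is actually loaded from,
// which should reveal a stale or mismatched Kafka jar on the classpath.
public class WhichJar {
    public static void main(String[] args) throws ClassNotFoundException {
        Class<?> c = Class.forName("org.apache.kafka.common.utils.Utils");
        System.out.println(c.getProtectionDomain().getCodeSource().getLocation());
    }
}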
On Thursday 18 June 2015 08:11 AM, Srividhya Anantharamakrishnan wrote:

Hi,

I am trying to set up Kafka in our cluster and I am running into the
following error when Consumer is getting instantiated:

java.lang.NoSuchMethodError:
org.apache.kafka.common.utils.Utils.newThread(Ljava/lang/String;Ljava/lang/Runnable;Ljava/lang/Boolean;)Ljava/lang/Thread;
        at kafka.utils.KafkaScheduler$$anon$1.newThread(KafkaScheduler.scala:84)
        at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:610)
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:924)
        at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1590)
        at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:333)
        at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:570)
        at kafka.utils.KafkaScheduler.schedule(KafkaScheduler.scala:116)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:136)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:68)
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:120)
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)


I am guessing that it is missing certain classpath references. If that is
the reason, could someone tell me which jar it is?

If not, what is it that I am missing?


KafkaConsumer:

public KafkaConsumer(String topic)
{
    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
    // line where the error is thrown
    this.topic = topic;
}

private static ConsumerConfig createConsumerConfig()
{
    Properties props = new Properties();
    props.put("zookeeper.connect", "IP:PORT");
    props.put("group.id", "group1");
    props.put("zookeeper.session.timeout.ms", "6000");
    props.put("zookeeper.sync.time.ms", "2000");
    props.put("auto.commit.interval.ms", "6");
    return new ConsumerConfig(props);
}


TIA!





NoSuchMethodError with Consumer Instantiation

2015-06-17 Thread Srividhya Anantharamakrishnan
Hi,

I am trying to set up Kafka in our cluster and I am running into the
following error when Consumer is getting instantiated:

java.lang.NoSuchMethodError:
org.apache.kafka.common.utils.Utils.newThread(Ljava/lang/String;Ljava/lang/Runnable;Ljava/lang/Boolean;)Ljava/lang/Thread;
        at kafka.utils.KafkaScheduler$$anon$1.newThread(KafkaScheduler.scala:84)
        at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:610)
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:924)
        at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1590)
        at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:333)
        at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:570)
        at kafka.utils.KafkaScheduler.schedule(KafkaScheduler.scala:116)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:136)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:68)
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:120)
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)


I am guessing that it is missing certain classpath references. If that is
the reason, could someone tell me which jar it is?

If not, what is it that I am missing?


KafkaConsumer:

public KafkaConsumer(String topic)
{
    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
    // line where the error is thrown
    this.topic = topic;
}

private static ConsumerConfig createConsumerConfig()
{
    Properties props = new Properties();
    props.put("zookeeper.connect", "IP:PORT");
    props.put("group.id", "group1");
    props.put("zookeeper.session.timeout.ms", "6000");
    props.put("zookeeper.sync.time.ms", "2000");
    props.put("auto.commit.interval.ms", "6");
    return new ConsumerConfig(props);
}


TIA!


Re: Keeping Zookeeper and Kafka Server Up

2015-06-17 Thread Shayne S
kafka-server-start.sh has a -daemon option, but I don't think Zookeeper has
it.

On Tue, Jun 16, 2015 at 11:32 PM, Su She suhsheka...@gmail.com wrote:

 It seems like nohup has solved this issue; even when the putty window
 becomes inactive the processes are still running (I didn't need to
 interact with them). I might look into using screen or tmux as a long
 term solution.

 Thanks Terry and Mike!

 Best,

 Su


 On Tue, Jun 16, 2015 at 3:42 PM, Terry Bates terryjba...@gmail.com
 wrote:
  Greetings,
 
  nohup does the trick, as Mr. Bridge has shared. If you want to run
  these and still have some interactivity with the services, consider using
  screen or tmux, as these will enable you to run these programs in the
  foreground, have additional windows you can use to access a shell, tail
  logs, and so on, and let you disconnect from the session but still have
  these sessions available for re-attachment.
 
  In addition, using runit for service supervision may enable you to keep
  daemons running, but if your services are dying you may need to
  introspect more deeply on the root cause rather than working around it by
  restarting them.
 
 
  *Terry Bates*
 
  *Email*: terryjba...@gmail.com
  *Phone*: (412) 215-0881
  *Skype*: terryjbates
  *GitHub*: https://github.com/terryjbates
  *Linkedin*: http://www.linkedin.com/in/terryjbates/
 
 
  On Tue, Jun 16, 2015 at 3:30 PM, Mike Bridge m...@bridgecanada.com
 wrote:
 
  Have you tried using nohup?
 
  nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
  nohup bin/kafka-server-start.sh config/server.properties &
 
 
  On Tue, Jun 16, 2015 at 3:21 PM, Su She suhsheka...@gmail.com wrote:
 
   Hello Everyone,
  
   I'm wondering how to keep Zookeeper and Kafka Server up even when my
   SSH (using putty) becomes inactive. I've tried running it in the
   background (using &), but it seems like it stops sometimes after a
   couple hours or so and I'll have to restart zookeeper and/or the kafka
   server.
  
   The only remediation i've found is to export TMOUT=[big number], but
   there must be another solution.
  
   Thank you!
  
   Best,
  
   Su
  
 



Re: Recovering from broker failure with KafkaConsumer

2015-06-17 Thread Jan Stette
Adding some more details to the previous question:

The indefinite wait doesn't happen on calling subscribe() on the consumer,
it happens when I (in this case) call seekToEnd().

A related problem to this is that the seekToEnd() method is synchronized
(as are the other access methods on KafkaConsumer), so the client holds a
lock while sitting in this wait. This means that if another thread tries
to call close(), which is also synchronized, this thread will also be
blocked.

Holding locks while performing network I/O seems like a bad idea - is this
something that's planned to be fixed?

Jan



On 17 June 2015 at 10:31, Jan Stette jan.ste...@gmail.com wrote:

 I'm trying out the new KafkaConsumer client API in the trunk of the source
 tree, and while I realise that this is a work in progress, I have a
 question that perhaps someone can shed some light on.

 I'm looking at how to handle various error scenarios for a Kafka client,
 in particular what happens when trying to connect to the broker but it's
 not available. The behaviour I'm seeing is that the client will retry
 indefinitely (at the configurable interval), basically looping around in
 Fetcher.awaitMetadataUpdate() forever.

 I would like to have some way to fail the connection attempt to avoid the
 calling thread being blocked forever. Is this possible with the current
 version of the client? (Snapshot as of 16/6/15). If not, is that something
 that's planned for the future?

 Jan




Re: OutOfMemoryError in mirror maker

2015-06-17 Thread tao xiao
Thank you for the reply.

Patch submitted https://issues.apache.org/jira/browse/KAFKA-2281
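
For reference, the gist of the fix (in the spirit of KAFKA-2281, not the
committed patch itself): have the callback capture only the value's size,
so the value byte[] can be garbage-collected as soon as it is handed to the
producer. Class and field names below are illustrative:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Instead of keeping a reference to the (possibly large) value byte[]
// until the send completes, remember only what the log line needs: its size.
public class SizeOnlyLoggingCallback implements Callback {
    private final String topic;
    private final int valueSize;

    public SizeOnlyLoggingCallback(String topic, byte[] value) {
        this.topic = topic;
        this.valueSize = value == null ? 0 : value.length; // value itself is not retained
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            System.err.printf("Error sending %d bytes to topic %s: %s%n",
                    valueSize, topic, exception.getMessage());
        }
    }
}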

On Mon, 15 Jun 2015 at 02:16 Jiangjie Qin j...@linkedin.com.invalid wrote:

 Hi Tao,

 Yes, the issue that ErrorLoggingCallback keeps the value as a local variable
 is known for a while, and we probably should fix it, as the value is not
 used except for logging its size. Can you open a ticket and maybe also
 submit a patch?

 For unreachable objects I don't think it is a memory leak. As you said, GC
 should take care of this. In LinkedIn we are using G1GC with some tunings
 made by our SRE. You can try that if interested.

 Thanks,

 Jiangjie (Becket) Qin

 On 6/13/15, 11:39 AM, tao xiao xiaotao...@gmail.com wrote:

 Hi,
 
 I am using mirror maker in trunk to replicate data across two data centers.
 While the destination broker was under heavy load and unresponsive, the
 send rate of mirror maker was very low and the available producer buffer
 was quickly filled up. At the end mirror maker threw OOME. The detailed
 exception can be found here:
 
 https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-exception-L1
 
 I started up mirror maker with 1G memory and a 256M producer buffer. I used
 Eclipse MAT to analyze the heap dump and found that the retained heap size
 of all RecordBatch objects was more than 500MB. Half of it was used to
 retain data yet to be sent to the destination broker, which makes sense to
 me as it is close to the 256MB producer buffer, but the other half was used
 by kafka.tools.MirrorMaker$MirrorMakerProducerCallback, since every
 producer callback in mirror maker takes the message value and holds it
 until the message is successfully delivered. In my case, since the
 destination broker was very unresponsive, the message value held by each
 callback would stay forever, which I think is a waste and a major
 contributor to the OOME issue. Screenshot of MAT:
 
 https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-mat-screenshot-png
 
 The other interesting problem I observed is that when I turned on
 unreachable object parsing in MAT, more than 400MB of memory was occupied
 by unreachable objects. It surprised me that GC didn't clean them up before
 OOME was thrown. As suggested in the GC log
 
 https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-gc-log-L1
 
 Full GC was unable to reclaim any memory, and when facing OOME these
 unreachable objects should have been cleaned up. So either Eclipse MAT has
 an issue parsing the heap dump, or there is a hidden memory leak that is
 hard to find. I attached a sample screenshot of the unreachable objects
 here:
 
 https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-unreachable-objects-png
 
 The consumer properties
 
 zookeeper.connect=zk
 zookeeper.connection.timeout.ms=100
 group.id=mm
 auto.offset.reset=smallest
 partition.assignment.strategy=roundrobin
 
 The producer properties
 
 bootstrap.servers=brokers
 client.id=mirror-producer
 producer.type=async
 compression.codec=none
 serializer.class=kafka.serializer.DefaultEncoder
 key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
 buffer.memory=268435456
 batch.size=1048576
 max.request.size=5242880
 send.buffer.bytes=1048576
 
 The java command to start mirror maker
 java -Xmx1024M -Xms512M -XX:+HeapDumpOnOutOfMemoryError
 -XX:HeapDumpPath=/home/kafka/slc-phx-mm-cg.hprof
 -XX:+PrintTenuringDistribution -XX:MaxTenuringThreshold=3 -server
 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled
 -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC
 -Djava.awt.headless=true
 -Xloggc:/var/log/kafka/kafka-phx/cg/mirrormaker-gc.log -verbose:gc
 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
 -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
 -XX:GCLogFileSize=10M -Dcom.sun.management.jmxremote
 -Dcom.sun.management.jmxremote.authenticate=false
 -Dcom.sun.management.jmxremote.ssl=false
 -Dkafka.logs.dir=/var/log/kafka/kafka-phx/cg
 -Dlog4j.configuration=file:/usr/share/kafka/bin/../config/tools-log4j.properties
 -cp libs/* kafka.tools.MirrorMaker --consumer.config
 consumer.properties --num.streams 10 --producer.config
 producer.properties --whitelist test.*




Re: At-least-once guarantees with high-level consumer

2015-06-17 Thread Stevo Slavić
With auto-commit one can only have an at-most-once delivery guarantee: after
the commit, but before the message is delivered for processing (or even
after it is delivered but before it is processed), things can fail, causing
the event not to be processed, which is basically the same outcome as if it
was not delivered.

On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann ch.heym...@gmail.com wrote:

 Hi

 ** Disclaimer: I know there's a new consumer API on the way, this mail is
 about the currently available API. I also apologise if the below has
 already been discussed previously. I did try to check previous discussions
 on ConsumerIterator **

 It seems to me that the high-level consumer would be able to support
 at-least-once messaging, even if one uses auto-commit, by changing
 kafka.consumer.ConsumerIterator.next() to call
 currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This way, a
 consumer thread for a KafkaStream could just loop:

 while (true) {
 MyMessage message = iterator.next().message();
 process(message);
 }

 Each call to iterator.next() then updates the offset to commit to the end
 of the message that was just processed. When offsets are committed for the
 ConsumerConnector (either automatically or manually), the commit will not
 include offsets of messages that haven't been fully processed.

 I've tested the following ConsumerIterator.next(), and it seems to work as
 I expect:

   override def next(): MessageAndMetadata[K, V] = {
     // New code: reset consumer offset to the end of the previously
     // consumed message:
     if (consumedOffset > -1L && currentTopicInfo != null) {
       currentTopicInfo.resetConsumeOffset(consumedOffset)
       val topic = currentTopicInfo.topic
       trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
     }

     // Old code, excluding reset:
     val item = super.next()
     if (consumedOffset < 0)
       throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
     val topic = currentTopicInfo.topic
     consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
     consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
     item
   }

 I've seen several people asking about managing commit offsets manually with
 the high level consumer. I suspect that this approach (the modified
 ConsumerIterator) would scale better than having a separate
 ConsumerConnector per stream just so that you can commit offsets with
 at-least-once semantics. The downside of this approach is more duplicate
 deliveries after recovery from hard failure (but this is at least once,
 right, not exactly once).

 I don't propose that the code necessarily be changed like this in trunk, I
 just want to know if the approach seems reasonable.

 Regards
 Carl Heymann



How to specify kafka bootstrap jvm options?

2015-06-17 Thread luo.fucong
I want to tune the Kafka JVM options, but I cannot find a way to pass the
options to the Kafka startup script (bin/kafka-server-start.sh).
How do I pass in the JVM options?

Recovering from broker failure with KafkaConsumer

2015-06-17 Thread Jan Stette
I'm trying out the new KafkaConsumer client API in the trunk of the source
tree, and while I realise that this is a work in progress, I have a
question that perhaps someone can shed some light on.

I'm looking at how to handle various error scenarios for a Kafka client, in
particular what happens when trying to connect to the broker but it's not
available. The behaviour I'm seeing is that the client will retry
indefinitely (at the configurable interval), basically looping around in
Fetcher.awaitMetadataUpdate() forever.

I would like to have some way to fail the connection attempt to avoid the
calling thread being blocked forever. Is this possible with the current
version of the client? (Snapshot as of 16/6/15). If not, is that something
that's planned for the future?

Jan


Re: How to specify kafka bootstrap jvm options?

2015-06-17 Thread Manikumar Reddy
Most of the tuning options are available in kafka-run-class.sh. You can
override the relevant environment variables (KAFKA_HEAP_OPTS,
KAFKA_JVM_PERFORMANCE_OPTS) when invoking the kafka-server-start.sh script.

On Wed, Jun 17, 2015 at 2:11 PM, luo.fucong bayinam...@gmail.com wrote:

 I want to tune the Kafka JVM options, but I cannot find a way to pass the
 options to the Kafka startup script (bin/kafka-server-start.sh).
 How do I pass in the JVM options?


Re: Log compaction not working as expected

2015-06-17 Thread Jan Filipiak

Hi,

you might want to have a look here: 
http://kafka.apache.org/documentation.html#topic-config
segment.ms and segment.bytes should allow you to control the
time/size at which segments are rolled.


Best
Jan

On 16.06.2015 14:05, Shayne S wrote:

Some further information, and is this a bug?  I'm using 0.8.2.1.

Log compaction will only occur on the non-active segments. Intentional or
not, it seems that the last segment is always the active segment. In other
words, an expired segment will not be cleaned until a new segment has been
created.

As a result, a log won't be compacted until new data comes in (per
partition). Does this mean I need to send the equivalent of a pig (
https://en.wikipedia.org/wiki/Pigging) through each partition in order to
force compaction?  Or can I force the cleaning somehow?

Here are the steps to recreate:

1. Create a new topic with a 5 minute (300000 ms) segment.ms:

kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
--replication-factor 1 --partitions 1 --config cleanup.policy=compact
--config min.cleanable.dirty.ratio=0.01 --config segment.ms=300000

2. Repeatedly add messages with identical keys (3x):

echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

3. Wait 5+ minutes and confirm no log compaction.
4. Once satisfied, send a new message:

echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

5. Log compaction will occur quickly soon after.

Is my use case of infrequent logs not supported? Is this intentional
behavior? It's unnecessarily challenging to target each partition with a
dummy message to trigger compaction.

Also, I believe there is another issue with logs originally configured
without a segment timeout that lead to my original issue.  I still cannot
get those logs to compact.

Thanks!
Shayne
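
For what it's worth, a sketch of the "pig" workaround described above:
send one dummy keyed message to every partition with the new (Java)
producer so that each partition's active segment can roll and become
eligible for compaction. This is a workaround sketch, not a recommended
long-term fix; the topic name and dummy key are illustrative:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CompactionPig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        String topic = "TEST_TOPIC";
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            int partitions = producer.partitionsFor(topic).size();
            for (int p = 0; p < partitions; p++) {
                // One keyed message per partition; compaction will later keep
                // only the newest value for this key, so the pig leaves
                // almost no residue behind.
                producer.send(new ProducerRecord<>(topic, p, "__pig__", "{}"));
            }
        } finally {
            producer.close(); // close() waits for in-flight sends to complete
        }
    }
}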





Re: At-least-once guarantees with high-level consumer

2015-06-17 Thread yewton

So Carl Heymann's ConsumerIterator.next hack approach is not reasonable?

2015-06-17 08:12:50 +0000, quoting the message above from Stevo Slavić:


 With auto-commit one can only have an at-most-once delivery guarantee: after
 the commit, but before the message is delivered for processing (or even
 after it is delivered but before it is processed), things can fail, causing
 the event not to be processed, which is basically the same outcome as if it
 was not delivered.

 On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann ch.heym...@gmail.com wrote:

  Hi

  ** Disclaimer: I know there's a new consumer API on the way, this mail is
  about the currently available API. I also apologise if the below has
  already been discussed previously. I did try to check previous discussions
  on ConsumerIterator **

  It seems to me that the high-level consumer would be able to support
  at-least-once messaging, even if one uses auto-commit, by changing
  kafka.consumer.ConsumerIterator.next() to call
  currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This way, a
  consumer thread for a KafkaStream could just loop:

  while (true) {
  MyMessage message = iterator.next().message();
  process(message);
  }

  Each call to iterator.next() then updates the offset to commit to the end
  of the message that was just processed. When offsets are committed for the
  ConsumerConnector (either automatically or manually), the commit will not
  include offsets of messages that haven't been fully processed.

  I've tested the following ConsumerIterator.next(), and it seems to work as
  I expect:

    override def next(): MessageAndMetadata[K, V] = {
      // New code: reset consumer offset to the end of the previously
      // consumed message:
      if (consumedOffset > -1L && currentTopicInfo != null) {
        currentTopicInfo.resetConsumeOffset(consumedOffset)
        val topic = currentTopicInfo.topic
        trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
      }

      // Old code, excluding reset:
      val item = super.next()
      if (consumedOffset < 0)
        throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
      val topic = currentTopicInfo.topic
      consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
      consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
      item
    }

  I've seen several people asking about managing commit offsets manually with
  the high level consumer. I suspect that this approach (the modified
  ConsumerIterator) would scale better than having a separate
  ConsumerConnector per stream just so that you can commit offsets with
  at-least-once semantics. The downside of this approach is more duplicate
  deliveries after recovery from hard failure (but this is at least once,
  right, not exactly once).

  I don't propose that the code necessarily be changed like this in trunk, I
  just want to know if the approach seems reasonable.

  Regards
  Carl Heymann









Re: Log compaction not working as expected

2015-06-17 Thread Jan Filipiak

Ah misread that sorry!

On 17.06.2015 14:26, Shayne S wrote:

Right, you can see I've got segment.ms set. The trick is that segments don't
actually roll over until something new arrives. If your topic is idle (not
receiving messages), it won't ever roll over to a new segment, and thus the
last segment will never be compacted.

Thanks!
Shayne

On Wed, Jun 17, 2015 at 5:58 AM, Jan Filipiak jan.filip...@trivago.com
wrote:


Hi,

you might want to have a look here:
http://kafka.apache.org/documentation.html#topic-config
segment.ms and segment.bytes should allow you to control the
time/size at which segments are rolled.

Best
Jan


On 16.06.2015 14:05, Shayne S wrote:


Some further information, and is this a bug?  I'm using 0.8.2.1.

Log compaction will only occur on the non-active segments. Intentional or
not, it seems that the last segment is always the active segment. In other
words, an expired segment will not be cleaned until a new segment has been
created.

As a result, a log won't be compacted until new data comes in (per
partition). Does this mean I need to send the equivalent of a pig (
https://en.wikipedia.org/wiki/Pigging) through each partition in order to
force compaction?  Or can I force the cleaning somehow?

Here are the steps to recreate:

1. Create a new topic with a 5 minute (300000 ms) segment.ms:

kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
--replication-factor 1 --partitions 1 --config cleanup.policy=compact
--config min.cleanable.dirty.ratio=0.01 --config segment.ms=300000

2. Repeatedly add messages with identical keys (3x):

echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

3. Wait 5+ minutes and confirm no log compaction.
4. Once satisfied, send a new message:

echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

5. Log compaction will occur quickly soon after.

Is my use case of infrequent logs not supported? Is this intentional
behavior? It's unnecessarily challenging to target each partition with a
dummy message to trigger compaction.

Also, I believe there is another issue with logs originally configured
without a segment timeout that lead to my original issue.  I still cannot
get those logs to compact.

Thanks!
Shayne






Re: Keeping Zookeeper and Kafka Server Up

2015-06-17 Thread Kashyap Mhaisekar
We use supervisord for this. It ensures that the processes are always up
and running.

Thanks
Kashyap

On Wednesday, June 17, 2015, Shayne S shaynest...@gmail.com wrote:

 kafka-server-start.sh has a -daemon option, but I don't think Zookeeper has
 it.

 On Tue, Jun 16, 2015 at 11:32 PM, Su She suhsheka...@gmail.com wrote:

  It seems like nohup has solved this issue; even when the putty window
  becomes inactive the processes are still running (I didn't need to
  interact with them). I might look into using screen or tmux as a long
  term solution.
 
  Thanks Terry and Mike!
 
  Best,
 
  Su
 
 
  On Tue, Jun 16, 2015 at 3:42 PM, Terry Bates terryjba...@gmail.com wrote:
   Greetings,
  
   nohup does the trick, as Mr. Bridge has shared. If you want to run
   these and still have some interactivity with the services, consider
   using screen or tmux, as these will enable you to run these programs in
   the foreground, have additional windows you can use to access a shell,
   tail logs, and so on, and let you disconnect from the session but still
   have these sessions available for re-attachment.
  
   In addition, using runit for service supervision may enable you to keep
   daemons running, but if your services are dying you may need to
   introspect more deeply on the root cause rather than working around it
   by restarting them.
  
  
   *Terry Bates*
  
   *Email*: terryjba...@gmail.com
   *Phone*: (412) 215-0881
   *Skype*: terryjbates
   *GitHub*: https://github.com/terryjbates
   *Linkedin*: http://www.linkedin.com/in/terryjbates/
  
  
   On Tue, Jun 16, 2015 at 3:30 PM, Mike Bridge m...@bridgecanada.com wrote:
  
   Have you tried using nohup?
  
   nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
   nohup bin/kafka-server-start.sh config/server.properties &
  
  
   On Tue, Jun 16, 2015 at 3:21 PM, Su She suhsheka...@gmail.com wrote:
  
Hello Everyone,
   
I'm wondering how to keep Zookeeper and Kafka Server up even when my
SSH (using putty) becomes inactive. I've tried running it in the
background (using &), but it seems like it stops sometimes after a
couple hours or so and I'll have to restart zookeeper and/or the kafka
server.
   
The only remediation i've found is to export TMOUT=[big number], but
there must be another solution.
   
Thank you!
   
Best,
   
Su
   
  
 



Re: QuickStart OK locally, but getting "WARN Property topic is not valid" and LeaderNotAvailableException remotely

2015-06-17 Thread Mike Bridge
I set this up on EC2 in exactly the same way and had the same errors when
accessing it with a producer that was outside EC2.  Is there something else
I have to configure other than to set advertised.host.name to my external
IP address?
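
One way to see what address the broker actually advertises is to fetch
metadata from outside EC2 and print the partition leaders — a diagnostic
sketch using the new (0.8.2) Java producer; the bootstrap address and topic
are illustrative:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class AdvertisedHostCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "MY.EXTERNAL.IP:9092"); // illustrative
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // The leader printed here is whatever the broker advertises.
            // If it is an internal EC2 hostname, external producers will fail
            // right after the initial bootstrap connection succeeds.
            for (PartitionInfo p : producer.partitionsFor("test123")) {
                System.out.printf("partition %d leader: %s%n", p.partition(), p.leader());
            }
        } finally {
            producer.close();
        }
    }
}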


On Tue, Jun 16, 2015 at 4:27 PM, Mike Bridge m...@bridgecanada.com wrote:

 Running this seems to indicate that there is a leader at 0:

 $ ./bin/kafka-topics.sh --zookeeper MY.EXTERNAL.IP:2181 --describe
 --topic test123
 -- Topic:test123 PartitionCount:1 ReplicationFactor:1 Configs:
 Topic: test123 Partition: 0 Leader: 0 Replicas: 0 Isr: 0

 I reran this test and my server.log indicates that there is a leader at 0:
 ...
 [2015-06-16 21:58:04,498] INFO 0 successfully elected as leader
 (kafka.server.ZookeeperLeaderElector)
 [2015-06-16 21:58:04,642] INFO Registered broker 0 at path
 /brokers/ids/0 with address MY.EXTERNAL.IP:9092. (kafka.utils.ZkUtils$)
 [2015-06-16 21:58:04,670] INFO [Kafka Server 0], started
 (kafka.server.KafkaServer)
 [2015-06-16 21:58:04,736] INFO New leader is 0
 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

 I see this error in the logs when I send a message from the producer:

[2015-06-16 22:18:24,584] ERROR [KafkaApi-0] error when handling
 request Name: TopicMetadataRequest; Version: 0; CorrelationId: 7; ClientId:
 console-producer; Topics: test123 (kafka.server.KafkaApis)
 kafka.admin.AdminOperationException: replication factor: 1 larger than
 available brokers: 0
 at
 kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)

 If it sees my broker when I start it up, why can't it find it later when
 it receives a message?

 Thanks,

 -Mike


 On Tue, Jun 16, 2015 at 3:11 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

 The topic warning is a bug (i.e. the fact that you get a warning on a
 perfectly valid parameter). We fixed it for the next release.

 It is also unrelated to the real issue with the LeaderNotAvailable




Re: Keeping Zookeeper and Kafka Server Up

2015-06-17 Thread Dillian Murphey
supervisord is pretty easy to use.  Netflix Exhibitor will manage this all
for zookeeper, if you want to try that tool.

On Wed, Jun 17, 2015 at 7:03 AM, Kashyap Mhaisekar kashya...@gmail.com
wrote:

 We use supervisord for this. It ensures that the processes are always up
 and running.

 Thanks
 Kashyap

 On Wednesday, June 17, 2015, Shayne S shaynest...@gmail.com wrote:

  kafka-server-start.sh has a -daemon option, but I don't think Zookeeper
 has
  it.
 
  On Tue, Jun 16, 2015 at 11:32 PM, Su She suhsheka...@gmail.com wrote:
 
    It seems like nohup has solved this issue; even when the putty window
    becomes inactive the processes are still running (I didn't need to
    interact with them). I might look into using screen or tmux as a long
    term solution.
  
   Thanks Terry and Mike!
  
   Best,
  
   Su
  
  
    On Tue, Jun 16, 2015 at 3:42 PM, Terry Bates terryjba...@gmail.com wrote:
Greetings,
   
 nohup does the trick, as Mr. Bridge has shared. If you want to run
 these and still have some interactivity with the services, consider
 using screen or tmux, as these will enable you to run these programs
 in the foreground, have additional windows you can use to access a
 shell, tail logs, and so on, and let you disconnect from the session
 but still have these sessions available for re-attachment.

 In addition, using runit for service supervision may enable you to
 keep daemons running, but if your services are dying you may need to
 introspect more deeply on the root cause rather than working around
 it by restarting them.
   
   
 *Terry Bates*

 *Email*: terryjba...@gmail.com
 *Phone*: (412) 215-0881
 *Skype*: terryjbates
 *GitHub*: https://github.com/terryjbates
 *Linkedin*: http://www.linkedin.com/in/terryjbates/
   
   
 On Tue, Jun 16, 2015 at 3:30 PM, Mike Bridge m...@bridgecanada.com wrote:
   
 Have you tried using nohup?

 nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
 nohup bin/kafka-server-start.sh config/server.properties &
   
   
 On Tue, Jun 16, 2015 at 3:21 PM, Su She suhsheka...@gmail.com wrote:
   
 Hello Everyone,

 I'm wondering how to keep Zookeeper and Kafka Server up even when my
 SSH (using putty) becomes inactive. I've tried running it in the
 background (using &), but it seems like it stops sometimes after a
 couple hours or so and I'll have to restart zookeeper and/or the
 kafka server.

 The only remediation i've found is to export TMOUT=[big number],
 but
 there must be another solution.

 Thank you!

 Best,

 Su

   
  
 



Re: Keeping Zookeeper and Kafka Server Up

2015-06-17 Thread Joe Stein
+1 to using exhibitor. Besides managing the ensemble, it also helps with
backups and zk log cleanup (which, if you don't do it, will make your
machine run out of space).

~ Joestein
On Jun 17, 2015 9:44 AM, Dillian Murphey crackshotm...@gmail.com wrote:

 supervisord is pretty easy to use.  Netflix Exhibitor will manage this all
 for zookeeper, if you want to try that tool.

 On Wed, Jun 17, 2015 at 7:03 AM, Kashyap Mhaisekar kashya...@gmail.com
 wrote:

  We use supervisord for this. It ensures that the processes are always up
  and running.
 
  Thanks
  Kashyap
 
  On Wednesday, June 17, 2015, Shayne S shaynest...@gmail.com wrote:
 
   kafka-server-start.sh has a -daemon option, but I don't think Zookeeper
  has
   it.
  
   On Tue, Jun 16, 2015 at 11:32 PM, Su She suhsheka...@gmail.com wrote:
  
It seems like nohup has solved this issue; even when the putty window
becomes inactive the processes are still running (I didn't need to
interact with them). I might look into using screen or tmux as a long
term solution.
   
Thanks Terry and Mike!
   
Best,
   
Su
   
   
 On Tue, Jun 16, 2015 at 3:42 PM, Terry Bates terryjba...@gmail.com wrote:
 Greetings,

  nohup does the trick, as Mr. Bridge has shared. If you want to run
  these and still have some interactivity with the services, consider
  using screen or tmux, as these will enable you to run these programs
  in the foreground, have additional windows you can use to access a
  shell, tail logs, and so on, and let you disconnect from the session
  but still have these sessions available for re-attachment.

  In addition, using runit for service supervision may enable you to
  keep daemons running, but if your services are dying you may need to
  introspect more deeply on the root cause rather than working around
  it by restarting them.


  *Terry Bates*

  *Email*: terryjba...@gmail.com
  *Phone*: (412) 215-0881
  *Skype*: terryjbates
  *GitHub*: https://github.com/terryjbates
  *Linkedin*: http://www.linkedin.com/in/terryjbates/


  On Tue, Jun 16, 2015 at 3:30 PM, Mike Bridge m...@bridgecanada.com wrote:

  Have you tried using nohup?

  nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
  nohup bin/kafka-server-start.sh config/server.properties &


  On Tue, Jun 16, 2015 at 3:21 PM, Su She suhsheka...@gmail.com wrote:

  Hello Everyone,
 
   I'm wondering how to keep Zookeeper and Kafka Server up even when my
   SSH (using putty) becomes inactive. I've tried running it in the
   background (using &), but it seems like it stops sometimes after a
   couple hours or so and I'll have to restart zookeeper and/or the
   kafka server.
 
  The only remediation i've found is to export TMOUT=[big number],
  but
  there must be another solution.
 
  Thank you!
 
  Best,
 
  Su
 

   
  
 



Re: duplicate messages at consumer

2015-06-17 Thread Adam Shannon
This is actually an expected consequence of using distributed systems. The
kafka FAQ has a good answer:

https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIgetexactly-oncemessagingfromKafka?
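
Per the FAQ, the usual way to cope is to make consumption idempotent, e.g.
de-duplicating on a unique message id (or topic/partition/offset) in the
consumer. A minimal sketch; the in-memory set is illustrative, and a real
deployment would use a durable store:

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// De-duplicates deliveries by unique message id so that a message delivered
// twice (e.g. around a rebalance) has no visible effect the second time.
public class IdempotentProcessor {
    private final Set<String> seen = Collections.synchronizedSet(new HashSet<String>());

    public void handle(String messageId, byte[] payload) {
        // add() returns false if the id was already present
        if (!seen.add(messageId)) {
            return; // duplicate delivery, skip
        }
        process(payload);
    }

    private void process(byte[] payload) {
        // application logic goes here
    }
}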

On Tue, Jun 16, 2015 at 11:06 PM, Kris K squareksc...@gmail.com wrote:

 Hi,

 While testing message delivery using kafka, I realized that a few duplicate
 messages got delivered by the consumers in the same consumer group (two
 consumers got the same message with a few milliseconds difference). However,
 I do not see any redundancy at the producer or broker. One more observation:
 this is not happening when I use only one consumer thread.

 I am running 3 brokers (0.8.2.1) with 3 Zookeeper nodes. There are 3
 partitions in the topic and replication-factor is 3. For producing, am
 using New Producer with compression.type=none.

 On the consumer end, I have 3 High level consumers in the same consumer
 group running with one consumer thread each, on three different hosts. Auto
 commit is set to true for consumer.

 Size of each message would range anywhere between 0.7 KB and 2 MB. The max
 volume for this test is 100 messages/hr.

 I looked at the controller log for any possibility of a consumer rebalance
 during this time, but did not find any. In the server log of all the brokers,
 the error "java.io.IOException: Connection reset by peer" is being written
 almost continuously.

 So, is it possible to achieve exactly-once delivery with the current high
 level consumer without needing an extra layer to remove redundancy?

 Could you please point me to any settings or logs that would help me tune
 the configuration ?

 PS: I tried searching for similar discussions, but could not find any. If
 it's already been answered, please provide the link.

 Thanks,
 Kris




-- 
Adam Shannon | Software Engineer | Banno | Jack Henry
206 6th Ave Suite 1020 | Des Moines, IA 50309 | Cell: 515.867.8337