Hi Shuo,

What are your producer and consumer config values?
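In particular, the high-level consumer's `auto.offset.reset` setting may explain what you saw: its default is "largest", so a consumer with a brand-new group id (one with no committed offsets in ZooKeeper) starts at the end of the log and only sees messages produced afterwards, which matches the "offset jumps to the end" behavior you describe. A minimal sketch of the relevant 0.8 high-level consumer properties (the group id and ZooKeeper address below are placeholders, not values from your setup):

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Build the property set for the 0.8 high-level consumer. The group id
    // and ZooKeeper address are placeholders, not values from this thread.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // placeholder
        props.put("group.id", "replay-group");              // placeholder
        // Default is "largest": a group with no committed offsets starts at
        // the end of the log and only sees messages produced after it
        // connects. "smallest" replays from the start of the retained log.
        props.put("auto.offset.reset", "smallest");
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        // With the Kafka 0.8 jars on the classpath, the connector would be
        // created from these properties:
        //   kafka.javaapi.consumer.ConsumerConnector cc =
        //       kafka.consumer.Consumer.createJavaConsumerConnector(
        //           new kafka.consumer.ConsumerConfig(props));
        System.out.println("auto.offset.reset=" + props.getProperty("auto.offset.reset"));
    }
}
```

Note that `auto.offset.reset` only applies when a group has no committed offset; once a group has committed, it resumes from that offset regardless of this setting.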
Guozhang

On Fri, Oct 10, 2014 at 3:47 AM, Shuo Chen <[email protected]> wrote:
> Dear folks,
>
> I have run some basic tests using Kafka. I started 7 brokers and
> created a topic with 21 partitions and 3 replicas.
>
> 2 producers were started with 5 threads each and pushed 2419200 messages
> of 50 bytes in total.
>
> 3 consumers, 2 in one group and 1 in another, were started to fetch the
> data at the same time. The high-level API is used to fetch the messages.
>
> The whole process ran for about 3 hours. One group got 23991800 messages
> and the other group got 23911600 messages. When I start the consumers
> again, no messages are fetched.
>
> I tried to catch exceptions around the send() and next() calls, but no
> exceptions were thrown. I checked the offsets in ZooKeeper, and they look
> correct.
>
> Here is the strangest thing: I tried to consume the same topic with a
> different group id, but no messages could be fetched. The offset of this
> group jumps to the end of the log.
>
> So I think this may be a configuration problem. Thanks for the help in
> advance!
>
> My configuration is listed as follows:
>
> # The receive buffer (SO_RCVBUF) used by the socket server
> socket.receive.buffer.bytes=5242880
>
> # The maximum size of a request that the socket server will accept
> # (protection against OOM)
> socket.request.max.bytes=104857600
>
>
> ############################# Log Basics #############################
>
> # A comma separated list of directories under which to store log files
> log.dirs=/data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs,/data4/kafka-logs,/data5/kafka-logs
>
> # The default number of log partitions per topic. More partitions allow
> # greater parallelism for consumption, but this will also result in more
> # files across the brokers.
> num.partitions=2
>
> ############################# Log Flush Policy #############################
>
> # Messages are immediately written to the filesystem but by default we only
> # fsync() to sync the OS cache lazily. The following configurations control
> # the flush of data to disk.
> # There are a few important trade-offs here:
> #    1. Durability: Unflushed data may be lost if you are not using
> #       replication.
> #    2. Latency: Very large flush intervals may lead to latency spikes when
> #       the flush does occur as there will be a lot of data to flush.
> #    3. Throughput: The flush is generally the most expensive operation,
> #       and a small flush interval may lead to excessive seeks.
> # The settings below allow one to configure the flush policy to flush data
> # after a period of time or every N messages (or both). This can be done
> # globally and overridden on a per-topic basis.
>
> # The number of messages to accept before forcing a flush of data to disk
> #log.flush.interval.messages=10000
>
> # The maximum amount of time a message can sit in a log before we force a
> # flush
> #log.flush.interval.ms=1000
>
> ############################# Log Retention Policy #############################
>
> # The following configurations control the disposal of log segments. The
> # policy can be set to delete segments after a period of time, or after a
> # given size has accumulated.
> # A segment will be deleted whenever *either* of these criteria is met.
> # Deletion always happens from the end of the log.
>
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=48
>
> # A size-based retention policy for logs. Segments are pruned from the log
> # as long as the remaining segments don't drop below log.retention.bytes.
> #log.retention.bytes=1073741824
>
> # The maximum size of a log segment file. When this size is reached a new
> # log segment will be created.
> log.segment.bytes=536870912
>
> # The interval at which log segments are checked to see if they can be
> # deleted according to the retention policies
> log.retention.check.interval.ms=60000
>
> # By default the log cleaner is disabled and the log retention policy will
> # default to just deleting segments after their retention expires.
> # If log.cleaner.enable=true is set, the cleaner will be enabled and
> # individual logs can then be marked for log compaction.
> log.cleaner.enable=false
>
> ############################# Zookeeper #############################
>
> # Zookeeper connection string (see zookeeper docs for details).
> # This is a comma separated list of host:port pairs, each corresponding to
> # a zk server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
> # You can also append an optional chroot string to the urls to specify the
> # root directory for all kafka znodes.
> zookeeper.connect=10.16.42.225:2199,10.16.42.226:2199,10.16.42.227:2199
>
> # Timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=1000000
>
> ----
> Shuo Chen

--
Guozhang
