Re: kafka performance question
A couple of thoughts spring to mind: are you sending the whole file as one message? And is your producer code using sync or async mode?

Cheers
Rob.

On 14 May 2014 15:49, Jun Rao <jun...@gmail.com> wrote:

  How many brokers and partitions do you have? You may try increasing
  batch.num.messages.

  Thanks,
  Jun

  On Tue, May 13, 2014 at 5:56 PM, Zhujie (zhujie, Smartcare)
  <first.zhu...@huawei.com> wrote:

    Dear all,

    We want to use Kafka to collect and dispatch data files, but the
    performance may be lower than we want. In our cluster there is one
    producer and one broker. We use a single thread to read a file from the
    producer's local disk and send it to the broker. The average throughput
    is only 3 MB/s to 4 MB/s. But if we just use the Java NIO API to send
    the file, the throughput can exceed 200 MB/s. Why is the Kafka
    performance so bad in our test? Are we missing something?

    Our server:
      CPU:  Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz * 4
      Mem:  300 GB
      Disk: 600 GB 15K RPM SAS * 8

    Configuration of the producer:

      props.put("serializer.class", "kafka.serializer.NullEncoder");
      props.put("metadata.broker.list", "169.10.35.57:9092");
      props.put("request.required.acks", "0");
      props.put("producer.type", "async"); // asynchronous
      props.put("queue.buffering.max.ms", "500");
      props.put("queue.buffering.max.messages", "10");
      props.put("batch.num.messages", "1200");
      props.put("queue.enqueue.timeout.ms", "-1");
      props.put("send.buffer.bytes", "10240");

    Configuration of the broker (server.properties):

      # see kafka.server.KafkaConfig for additional details and defaults

      ############################# Server Basics #############################

      # The id of the broker. This must be set to a unique integer for each broker.
      broker.id=0

      ############################# Socket Server Settings #############################

      # The port the socket server listens on
      port=9092

      # Hostname the broker will bind to. If not set, the server will bind to all interfaces
      #host.name=localhost

      # Hostname the broker will advertise to producers and consumers. If not set, it uses the
      # value for "host.name" if configured. Otherwise, it will use the value returned from
      # java.net.InetAddress.getCanonicalHostName().
      #advertised.host.name=<hostname routable by clients>

      # The port to publish to ZooKeeper for clients to use. If this is not set,
      # it will publish the same port that the broker binds to.
      #advertised.port=<port accessible by clients>

      # The number of threads handling network requests
      #num.network.threads=2

      # The number of threads doing disk I/O
      #num.io.threads=8

      # The send buffer (SO_SNDBUF) used by the socket server
      #socket.send.buffer.bytes=1048576

      # The receive buffer (SO_RCVBUF) used by the socket server
      #socket.receive.buffer.bytes=1048576

      # The maximum size of a request that the socket server will accept (protection against OOM)
      #socket.request.max.bytes=104857600

      ############################# Log Basics #############################

      # A comma separated list of directories under which to store log files
      log.dirs=/data/kafka-logs

      # The default number of log partitions per topic. More partitions allow greater
      # parallelism for consumption, but this will also result in more files across
      # the brokers.
      #num.partitions=2

      ############################# Log Flush Policy #############################

      # Messages are immediately written to the filesystem but by default we only fsync() to sync
      # the OS cache lazily. The following configurations control the flush of data to disk.
      # There are a few important trade-offs here:
      #    1. Durability: Unflushed data may be lost if you are not using replication.
      #    2. Latency: Very large flush intervals may lead to latency spikes when the flush
      #       does occur, as there will be a lot of data to flush.
      #    3. Throughput: The flush is generally the most expensive operation, and a small
      #       flush interval may lead to excessive seeks.
      # The settings below allow one to configure the flush policy to flush data after a
      # period of time or every N messages (or both). This can be done globally and
      # overridden on a per-topic basis.
      # The
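One likely culprit in the producer config quoted above: queue.buffering.max.messages is 10 while batch.num.messages is 1200, so the async sender can never accumulate a full batch, and send.buffer.bytes is only 10 KB. Below is a minimal sketch of a throughput-oriented variant of that config; the class name is made up for illustration, the property names follow the 0.8 producer, and the values are illustrative starting points rather than tuned benchmarks.

```java
import java.util.Properties;

// Sketch: a throughput-oriented variant of the 0.8 async producer config
// from the thread. TunedProducerProps is a hypothetical helper class.
public class TunedProducerProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "169.10.35.57:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("request.required.acks", "0");
        props.put("producer.type", "async");
        // Allow far more queued messages than the original 10, so the
        // async sender can actually fill a batch of 1200.
        props.put("queue.buffering.max.messages", "10000");
        props.put("batch.num.messages", "1200");
        // A 10 KB socket send buffer is small for bulk transfer; try 1 MB.
        props.put("send.buffer.bytes", "1048576");
        return props;
    }
}
```

The key idea is simply that the queue bound must be comfortably larger than the batch size, otherwise batching never kicks in and each send pays full per-request overhead.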
Re: [Using offset to fetch message][Need Help]
Rather than fetching the message again, you could cache it in the spout, emit it again if the *fail* method is called, and delete it when the *ack* method is called. This is possible because Storm guarantees to call the *fail* and *ack* methods, with the *messageId*, on the exact same spout that the message originated from. This means that if you have cached the message there, it will still be available.

On 27 January 2014 19:29, Abhishek Bhattacharjee
<abhishek.bhattacharje...@gmail.com> wrote:

  I am using Storm and Kafka for replaying messages. Now I want to save the
  offset of each message and then use it later for resending the message. So
  my question is: how can I fetch a single message using its offset? That is,
  I know the offset of a message and I want to use the offset to fetch that
  message (only that message).

  Thanks,
  *Abhishek Bhattacharjee*
  *Pune Institute of Computer Technology*

--
Cheers Rob.
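The cache-and-replay pattern described above can be sketched without any Storm dependency. PendingCache below is a hypothetical stand-in for state held inside the spout, keyed by messageId; the method names are illustrative and are not a Storm API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the cache-and-replay pattern: keep each emitted message until
// Storm acks it, so a fail() can re-emit it without re-fetching from Kafka.
public class PendingCache {
    private final Map<Long, byte[]> pending = new ConcurrentHashMap<>();

    // Called when the spout first emits a tuple: remember it by messageId.
    public void onEmit(long messageId, byte[] message) {
        pending.put(messageId, message);
    }

    // Storm calls ack(messageId) on the same spout: drop the cached copy.
    public void onAck(long messageId) {
        pending.remove(messageId);
    }

    // Storm calls fail(messageId): return the cached message for re-emit,
    // or null if it was already acked.
    public byte[] onFail(long messageId) {
        return pending.get(messageId);
    }
}
```

This works precisely because of the guarantee mentioned in the reply: ack/fail arrive at the same spout instance that emitted the tuple, so its in-memory cache is always consulted by the right process.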
Re: cluster expansion
Is the kafka-add-partitions.sh tool stable in 0.8.0?

On 13 December 2013 19:21, Neha Narkhede <neha.narkh...@gmail.com> wrote:

  Partition movement is not an automatic operation in Kafka yet. You need
  to use the partition reassignment tool -
  https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
  Also, that feature is stable in 0.8.1.

  Thanks,
  Neha

  On Fri, Dec 13, 2013 at 6:48 AM, Robert Turner <rob...@bigfoot.com> wrote:

    No, the 6 partitions for each topic will remain on the original brokers.
    You could either reassign some partitions from all topics to the new
    brokers, or you could add partitions to the new brokers for each topic.
    In 0.8.0 there is now an add-partitions tool.

    Cheers
    Rob Turner.

    On 13 December 2013 14:42, Yu, Libo <libo...@citi.com> wrote:

      Hi folks,

      There are three brokers running 0.8-beta1 in our cluster currently.
      Assume all the topics have six partitions. I am going to add another
      three brokers to the cluster and upgrade all of them to 0.8. My
      question is: after the cluster is up, will the partitions be evenly
      distributed to all brokers? Thanks.

      Regards,
      Libo

--
Cheers Rob.
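For reference, the reassignment workflow pointed to in the thread can be sketched as below. The flag names follow the 0.8.x reassignment tool; the ZooKeeper address, topic name, new broker ids, and file names are illustrative, and the tool of course needs a running cluster.

```shell
# Sketch: moving existing partitions onto newly added brokers 3, 4, 5.

# topics.json lists which topics the tool may move.
cat > topics.json <<'EOF'
{"version": 1, "topics": [{"topic": "mytopic"}]}
EOF

# 1. Ask the tool to propose an assignment spread over the new brokers.
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --topics-to-move-json-file topics.json --broker-list "3,4,5" --generate

# 2. Save the proposed assignment to expand.json, then apply it.
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file expand.json --execute

# 3. Poll until every partition move reports completed.
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file expand.json --verify
```

The alternative mentioned in the thread, adding partitions to each topic so the new partitions land on the new brokers, does not rebalance existing data; only reassignment moves the six existing partitions.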
Re: Kafka trouble with Avro encoding
Hi Brendan, I would try using <Integer, byte[]> rather than <String, Message> and setting:

  props.put("serializer.class", "kafka.serializer.DefaultEncoder");

Cheers
Rob.

On 2 December 2013 22:01, Brenden Cobb <brenden.c...@humedica.com> wrote:

  Hello - I am trying to understand my trouble passing an Avro message
  through Kafka (0.8). From what I see, the class tries to create an
  instance of the encoder but fails, as it cannot find the constructor,
  although it is there. Here are the code and the subsequent error.
  Appreciate any help!

  Thank you,
  Brenden

  public class AvroProducer {

      //public final String zkConnection = "tlvwhale1:2181,tlvwhale2:2181,tlvwhale3:2181";
      public final String zkConnection = "localhost:2181";
      public final String brokerList = "localhost:9092,localhost:9093,localhost:9094";
      public final String topic = "cdrTopic";

      public static void main(String[] args) {
          AvroProducer avroProducer = new AvroProducer();
          try {
              //avroProducer.testGenericRecord();
              avroProducer.sendCDRAvroMessage();
          } catch (Exception e) {
              e.printStackTrace();
          }
      }

      private void sendCDRAvroMessage() throws IOException {
          User user1 = new User();
          user1.setName("Brenden");
          user1.setFavoriteNumber(256);

          Properties props = new Properties();
          props.put("zk.connect", zkConnection);
          props.put("metadata.broker.list", brokerList);
          //props.put("serializer.class", "kafka.serializer.DefaultEncoder");
          props.put("serializer.class", "org.apache.avro.io.BufferedBinaryEncoder");
          //props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder");
          //props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder");

          Producer<String, Message> producer = new Producer<String, Message>(new ProducerConfig(props));

          ByteArrayOutputStream out = new ByteArrayOutputStream();
          DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
          //Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
          //Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
          EncoderFactory factory = new EncoderFactory().configureBufferSize(4096);
          Encoder encoder = factory.directBinaryEncoder(out, null);
          userDatumWriter.write(user1, encoder);
          encoder.flush();
          out.close();

          Message message = new Message(out.toByteArray());
          producer.send(new KeyedMessage<String, Message>(topic, message));
          //producer.send(new KeyedMessage<String, Message>(topic, null, message));
      }
  }

  ---
  The Error stack:
  ...
  Message(magic = 0, attributes = 0, crc = 2755187525, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=12 cap=12])
  Exception in thread "main" java.lang.NoSuchMethodException: org.apache.avro.io.DirectBinaryEncoder.<init>(kafka.utils.VerifiableProperties)
      at java.lang.Class.getConstructor0(Class.java:2810)
      at java.lang.Class.getConstructor(Class.java:1718)
      at kafka.utils.Utils$.createObject(Utils.scala:458)
      at kafka.producer.Producer.<init>(Producer.scala:60)
      at kafka.javaapi.producer.Producer.<init>(Producer.scala:25)
      at com.humca.swizzle.kafka.producer.KafkaProducer.sendAvroMessage(KafkaProducer.java:74)
      at com.humca.swizzle.kafka.producer.KafkaProducer.main(KafkaProducer.java:88)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

--
Cheers Rob.
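The NoSuchMethodException in the trace above is the key: serializer.class must name a Kafka encoder with a constructor taking kafka.utils.VerifiableProperties, and org.apache.avro.io's encoders are not Kafka encoders. Since the code already produces a byte[] with SpecificDatumWriter, the usual fix is to send that byte[] directly and let kafka.serializer.DefaultEncoder pass it through unchanged. A minimal sketch of the corrected properties (the broker list is the one from the thread; AvroProducerProps is a hypothetical helper class):

```java
import java.util.Properties;

// Sketch: producer properties for sending pre-serialized Avro bytes.
// The Avro encoding is done by the application; Kafka only sees byte[].
public class AvroProducerProps {
    public static Properties build() {
        Properties props = new Properties();
        props.put("metadata.broker.list",
                  "localhost:9092,localhost:9093,localhost:9094");
        // DefaultEncoder ships with Kafka, has the required
        // (VerifiableProperties) constructor, and passes byte[] through.
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        return props;
    }
}
```

With these properties the producer would be typed over byte[] (for example Producer<Integer, byte[]>, as suggested in the reply), and out.toByteArray() is sent as the message value instead of being wrapped in a Message.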