Re: kafka performance question

2014-05-16 Thread Robert Turner
A couple of thoughts spring to mind: are you sending the whole file as one
message? And is your producer code using sync or async mode?
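
If the whole file is going out as a single message, you might try chunking
it. Below is a rough, untested sketch against the 0.8 async producer API;
the topic name, chunk size, file path, and the switch to the DefaultEncoder
(for raw byte[] payloads) are my own choices, not from your setup:

// Hedged sketch: stream a file to Kafka as fixed-size messages.
// Topic, path, and chunk size are illustrative, not prescriptive.
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class FileChunkProducer {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "169.10.35.57:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("producer.type", "async");
        props.put("request.required.acks", "0");
        props.put("batch.num.messages", "1200");

        Producer<byte[], byte[]> producer =
                new Producer<byte[], byte[]>(new ProducerConfig(props));

        FileInputStream in = new FileInputStream("/path/to/datafile");
        try {
            byte[] buf = new byte[64 * 1024];   // 64 KB chunks; tune to taste
            int n;
            while ((n = in.read(buf)) > 0) {
                byte[] chunk = new byte[n];     // copy only the bytes actually read
                System.arraycopy(buf, 0, chunk, 0, n);
                producer.send(new KeyedMessage<byte[], byte[]>("fileTopic", chunk));
            }
        } finally {
            in.close();
            producer.close();
        }
    }
}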

Cheers
   Rob.


On 14 May 2014 15:49, Jun Rao jun...@gmail.com wrote:

 How many brokers and partitions do you have? You may try increasing
 batch.num.messages.

 Thanks,

 Jun


 On Tue, May 13, 2014 at 5:56 PM, Zhujie (zhujie, Smartcare) 
 first.zhu...@huawei.com wrote:

  Dear all,
 
  We want to use Kafka to collect and dispatch data files, but the
  performance is lower than we would like.
 
  In our cluster there is one producer and one broker. We use a single
  thread to read a file from the producer's local disk and send it to the
  broker. The average throughput is only 3 MB/s ~ 4 MB/s.
  But if we just use the Java NIO API to send the file, the throughput can
  exceed 200 MB/s.
  Why is the Kafka performance so poor in our test? Are we missing
  something?
 
 
 
  Our server:
  CPU: Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz * 4
  Mem: 300 GB
  Disk: 600 GB 15K RPM SAS * 8
 
  Configuration of producer:
  props.put("serializer.class", "kafka.serializer.NullEncoder");
  props.put("metadata.broker.list", "169.10.35.57:9092");
  props.put("request.required.acks", "0");
  props.put("producer.type", "async"); // async mode
  props.put("queue.buffering.max.ms", "500");
  props.put("queue.buffering.max.messages", "10");
  props.put("batch.num.messages", "1200");
  props.put("queue.enqueue.timeout.ms", "-1");
  props.put("send.buffer.bytes", "10240");
 
  Configuration of broker:
 
  # Licensed to the Apache Software Foundation (ASF) under one or more
  # contributor license agreements.  See the NOTICE file distributed with
  # this work for additional information regarding copyright ownership.
  # The ASF licenses this file to You under the Apache License, Version 2.0
  # (the "License"); you may not use this file except in compliance with
  # the License.  You may obtain a copy of the License at
  #
  #http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.
  # see kafka.server.KafkaConfig for additional details and defaults
 
  # Server Basics #
 
  # The id of the broker. This must be set to a unique integer for each
  # broker.
  broker.id=0
 
  # Socket Server Settings #
 
  # The port the socket server listens on
  port=9092
 
  # Hostname the broker will bind to. If not set, the server will bind to
  # all interfaces
  #host.name=localhost
 
  # Hostname the broker will advertise to producers and consumers. If not
  # set, it uses the value for host.name if configured. Otherwise, it will
  # use the value returned from java.net.InetAddress.getCanonicalHostName().
  #advertised.host.name=<hostname routable by clients>
 
  # The port to publish to ZooKeeper for clients to use. If this is not set,
  # it will publish the same port that the broker binds to.
  #advertised.port=<port accessible by clients>
 
  # The number of threads handling network requests
  #num.network.threads=2
  # The number of threads doing disk I/O
  #num.io.threads=8
 
  # The send buffer (SO_SNDBUF) used by the socket server
  #socket.send.buffer.bytes=1048576
 
  # The receive buffer (SO_RCVBUF) used by the socket server
  #socket.receive.buffer.bytes=1048576
 
  # The maximum size of a request that the socket server will accept
  # (protection against OOM)
  #socket.request.max.bytes=104857600
 
 
  # Log Basics #
 
  # A comma separated list of directories under which to store log files
  log.dirs=/data/kafka-logs
 
  # The default number of log partitions per topic. More partitions allow
  # greater parallelism for consumption, but this will also result in more
  # files across the brokers.
  #num.partitions=2
 
  # Log Flush Policy #
 
  # Messages are immediately written to the filesystem but by default we
  # only fsync() to sync the OS cache lazily. The following configurations
  # control the flush of data to disk.
  # There are a few important trade-offs here:
  #1. Durability: Unflushed data may be lost if you are not using
  #   replication.
  #2. Latency: Very large flush intervals may lead to latency spikes
  #   when the flush does occur as there will be a lot of data to flush.
  #3. Throughput: The flush is generally the most expensive operation,
  #   and a small flush interval may lead to excessive seeks.
  # The settings below allow one to configure the flush policy to flush
  # data after a period of time or every N messages (or both). This can be
  # done globally and overridden on a per-topic basis.
 
  # The 

Re: [Using offset to fetch message][Need Help]

2014-01-27 Thread Robert Turner
Rather than fetching the message again, you could cache it in the spout,
emit it again if the *fail* method is called, and delete it when the *ack*
method is called. This works because Storm guarantees to call the *fail*
and *ack* methods with the *messageId* on the exact same spout that the
message originated from, so if you have cached the message there it will
still be available.
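
A minimal sketch of that idea (not a complete spout; the Kafka fetch is
stubbed out and all the names here are illustrative):

// Hedged sketch of a spout that caches in-flight messages so fail() can
// re-emit without re-fetching. The Kafka consumer code is a stub.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class CachingKafkaSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private long nextOffset = 0;
    // in-flight messages, keyed by the offset we use as the messageId
    private final Map<Object, byte[]> pending = new ConcurrentHashMap<Object, byte[]>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        byte[] message = fetchNextFromKafka();  // hypothetical stand-in
        if (message == null) {
            return;
        }
        Object messageId = Long.valueOf(nextOffset++);
        pending.put(messageId, message);             // cache before emitting
        collector.emit(new Values(message), messageId);
    }

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId);                       // fully processed, drop the copy
    }

    @Override
    public void fail(Object msgId) {
        byte[] message = pending.get(msgId);         // still cached, no re-fetch needed
        if (message != null) {
            collector.emit(new Values(message), msgId);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("bytes"));
    }

    private byte[] fetchNextFromKafka() {
        return null;  // plug your Kafka consumer in here
    }
}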




On 27 January 2014 19:29, Abhishek Bhattacharjee 
abhishek.bhattacharje...@gmail.com wrote:

 I am using storm and kafka for replaying messages.
 Now I want to save the offset of each message and then use it later for
 resending the message.
 So my question is: how can I fetch a single message using its offset?
 That is, I know the offset of a message and I want to use the offset to
 fetch that message (only that message).


 Thanks,
 *Abhishek Bhattacharjee*
 *Pune Institute of Computer Technology*
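
To answer the direct question as well: with the 0.8 SimpleConsumer you can
issue a fetch that starts at a saved offset and keep just the matching
message. A rough, untested sketch; the host, port, topic, partition, and
offset values are made up:

// Hedged sketch: fetch one message at a known offset with the 0.8
// SimpleConsumer. Error handling (resp.hasError()) is omitted for brevity.
import java.nio.ByteBuffer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class FetchByOffset {
    public static void main(String[] args) {
        String topic = "myTopic";
        int partition = 0;
        long offset = 42L;  // the offset you saved earlier

        SimpleConsumer consumer =
                new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "offsetFetcher");
        FetchRequest req = new FetchRequestBuilder()
                .clientId("offsetFetcher")
                .addFetch(topic, partition, offset, 100000)  // start fetching at offset
                .build();
        FetchResponse resp = consumer.fetch(req);

        // The fetch may return a batch; keep only the message at the exact offset.
        for (MessageAndOffset mo : resp.messageSet(topic, partition)) {
            if (mo.offset() == offset) {
                ByteBuffer payload = mo.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println("message at " + offset + ": " + new String(bytes));
                break;
            }
        }
        consumer.close();
    }
}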




-- 
Cheers
   Rob.


Re: cluster expansion

2013-12-14 Thread Robert Turner
Is the kafka-add-partitions.sh tool stable in 0.8.0?


On 13 December 2013 19:21, Neha Narkhede neha.narkh...@gmail.com wrote:

 Partition movement is not an automatic operation in Kafka yet. You need to
 use the partition reassignment tool -

 https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
 .


 Also, that feature is stable in 0.8.1.

 Thanks,
 Neha


 On Fri, Dec 13, 2013 at 6:48 AM, Robert Turner rob...@bigfoot.com wrote:

  No, the 6 partitions for each topic will remain on the original brokers.
  You could either reassign some partitions from all topics to the new
  brokers or you could add partitions to the new brokers for each topic. In
  0.8.0 there is now an add-partitions tool.
 
  Cheers
 Rob Turner.
 
 
  On 13 December 2013 14:42, Yu, Libo libo...@citi.com wrote:
 
   Hi folks,
  
   There are three brokers running 0.8-beta1 in our cluster currently.
  Assume
   all the topics have six partitions.
   I am going to add another three brokers to the cluster and upgrade all
 of
   them to 0.8. My question is after
   the cluster is up, will the partition be evenly distributed to all
   brokers? Thanks.
  
   Regards,
  
   Libo
  
  
 
 
  --
  Cheers
 Rob.
 




-- 
Cheers
   Rob.


Re: cluster expansion

2013-12-13 Thread Robert Turner
No, the 6 partitions for each topic will remain on the original brokers. You
could either reassign some partitions from all topics to the new brokers or
you could add partitions to the new brokers for each topic. In 0.8.0 there
is now an add-partitions tool.
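
For illustration, the reassignment route looks roughly like this with the
0.8.1-era tool. The topic name, broker ids, and file name are invented, and
the flags changed between releases, so check the tool's help output on your
version:

# expand.json -- move two partitions of "mytopic" onto new brokers 3 and 4
{"version":1,
 "partitions":[{"topic":"mytopic","partition":0,"replicas":[3]},
               {"topic":"mytopic","partition":1,"replicas":[4]}]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
    --reassignment-json-file expand.json --execute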

Cheers
   Rob Turner.


On 13 December 2013 14:42, Yu, Libo libo...@citi.com wrote:

 Hi folks,

 There are three brokers running 0.8-beta1 in our cluster currently. Assume
 all the topics have six partitions.
 I am going to add another three brokers to the cluster and upgrade all of
 them to 0.8. My question is after
 the cluster is up, will the partition be evenly distributed to all
 brokers? Thanks.

 Regards,

 Libo




-- 
Cheers
   Rob.


Re: Kafka trouble with Avro encoding

2013-12-02 Thread Robert Turner
Hi Brenden,

I would try using Producer<Integer, byte[]> rather than Producer<String,
Message> and setting:

props.put("serializer.class", "kafka.serializer.DefaultEncoder");
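
For example (a rough, untested sketch of that approach: serialize the Avro
record to bytes yourself and hand Kafka a plain byte[] that the
DefaultEncoder passes through unchanged; the broker, topic, and User class
are from your snippet, everything else is illustrative):

// Hedged sketch: produce an Avro-encoded record as a raw byte[] payload.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class AvroBytesProducer {
    public static void main(String[] args) throws IOException {
        // User is the generated Avro class from your example
        User user = new User();
        user.setName("Brenden");
        user.setFavoriteNumber(256);

        // Avro serialization happens in application code, not inside Kafka
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<User> writer = new SpecificDatumWriter<User>(User.class);
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        writer.write(user, encoder);
        encoder.flush();

        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");

        Producer<Integer, byte[]> producer =
                new Producer<Integer, byte[]>(new ProducerConfig(props));
        producer.send(new KeyedMessage<Integer, byte[]>("cdrTopic", out.toByteArray()));
        producer.close();
    }
}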

Cheers
   Rob.


On 2 December 2013 22:01, Brenden Cobb brenden.c...@humedica.com wrote:

 Hello - I am trying to understand my trouble passing an Avro message
 through Kafka (0.8).
 From what I see, the class tries to create an instance of the encoder but
 fails as it cannot find the constructor, although it is there.

 Here's the code and subsequent error. Appreciate any help!

 Thank you,
 Brenden
 

 public class AvroProducer {

 //public final String zkConnection = "tlvwhale1:2181,tlvwhale2:2181,tlvwhale3:2181";
 public final String zkConnection = "localhost:2181";
 public final String brokerList = "localhost:9092, localhost:9093, localhost:9094";
 public final String topic = "cdrTopic";

 public static void main(String args[]) {
     AvroProducer avroProducer = new AvroProducer();
     try {
         //avroProducer.testGenericRecord();
         avroProducer.sendCDRAvroMessage();
     } catch (Exception e) {
         e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
     }
 }

 private void sendCDRAvroMessage() throws IOException {
     User user1 = new User();
     user1.setName("Brenden");
     user1.setFavoriteNumber(256);
     Properties props = new Properties();
     props.put("zk.connect", zkConnection);
     props.put("metadata.broker.list", brokerList);
     //props.put("serializer.class", "kafka.serializer.DefaultEncoder");
     props.put("serializer.class", "org.apache.avro.io.BufferedBinaryEncoder");
     //props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder");
     //props.put("serializer.class", "org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder");

     //Producer<String, Message> producer = new Producer<String, Message>(new ProducerConfig(props));
     Producer<String, Message> producer = new Producer<String, Message>(new ProducerConfig(props));

     ByteArrayOutputStream out = new ByteArrayOutputStream();
     DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
     //Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
     //Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
     EncoderFactory factory = new EncoderFactory().configureBufferSize(4096);
     Encoder encoder = factory.directBinaryEncoder(out, null);

     userDatumWriter.write(user1, encoder);
     encoder.flush();
     out.close();
     Message message = new Message(out.toByteArray());

     producer.send(new KeyedMessage<String, Message>(topic, message));
     //producer.send(new KeyedMessage<String, Message>(topic, null, message));
 }

 }

 ---
 The Error stack:
 ...
 Message(magic = 0, attributes = 0, crc = 2755187525, key = null, payload =
 java.nio.HeapByteBuffer[pos=0 lim=12 cap=12])
 Exception in thread "main" java.lang.NoSuchMethodException:
 org.apache.avro.io.DirectBinaryEncoder.<init>(kafka.utils.VerifiableProperties)
 at java.lang.Class.getConstructor0(Class.java:2810)
 at java.lang.Class.getConstructor(Class.java:1718)
 at kafka.utils.Utils$.createObject(Utils.scala:458)
 at kafka.producer.Producer.<init>(Producer.scala:60)
 at kafka.javaapi.producer.Producer.<init>(Producer.scala:25)
 at
 com.humca.swizzle.kafka.producer.KafkaProducer.sendAvroMessage(KafkaProducer.java:74)
 at
 com.humca.swizzle.kafka.producer.KafkaProducer.main(KafkaProducer.java:88)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)




-- 
Cheers
   Rob.