Changing producer or consumer configuration during runtime.

2016-08-04 Thread Rodrigo Ottero
Hello.

Let's say that I have a producer (or consumer) running.

Does Kafka allow me to add, remove or change its configuration?

As an example, could I change compression.type or buffer.memory without
first stopping the producer?

Cheers,

Rodrigo
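
A note on the question above: as far as I know, the Java clients read their configuration once, in the constructor, and offer no call to change settings such as compression.type or buffer.memory on a live instance. The usual workaround is to build a new client with the updated properties and close the old one. The sketch below shows that swap with only the JDK; the class name ReconfigurableClient and its methods are hypothetical, not part of Kafka.

```java
import java.util.Properties;
import java.util.function.Function;

/** Hypothetical helper (not a Kafka API): rebuilds a client when its configuration changes. */
public final class ReconfigurableClient<T extends AutoCloseable> {
    private final Function<Properties, T> factory;
    private Properties current;
    private T client;

    public ReconfigurableClient(Properties initial, Function<Properties, T> factory) {
        this.factory = factory;
        this.current = copyOf(initial);
        this.client = factory.apply(copyOf(initial));
    }

    /** Returns the client built from the most recent configuration. */
    public synchronized T get() {
        return client;
    }

    /** Builds a replacement with one setting changed, then closes the previous client. */
    public synchronized void reconfigure(String key, String value) {
        Properties updated = copyOf(current);
        updated.setProperty(key, value);
        // Build first: if the factory throws, the old client stays in place.
        T replacement = factory.apply(copyOf(updated));
        T old = client;
        client = replacement;
        current = updated;
        try {
            old.close();
        } catch (Exception e) {
            throw new IllegalStateException("Could not close the previous client", e);
        }
    }

    private static Properties copyOf(Properties source) {
        Properties copy = new Properties();
        copy.putAll(source);
        return copy;
    }
}
```

With the real client this would presumably be used as new ReconfigurableClient<>(props, p -> new KafkaProducer<String, String>(p)). Callers should fetch the client through get() for each send rather than caching it, and should expect close() on the old producer to wait for in-flight records.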


Re: Messages delayed in jUnit test (version 0.9.0.0)

2016-06-26 Thread Rodrigo Ottero
Hello.

I still have not made progress on this issue.

Following Jay Kreps' recent email in the thread 'delay of producer and consumer in
kafka 0.9 is too big to be accepted', I will run the TestEndToEndLatency test
to measure my latency.

But, besides that, am I doing something wrong in the code below?

Thanks & regards,

Rodrigo

---------- Forwarded message ----------
From: Rodrigo Ottero 
Date: Fri, Jun 17, 2016 at 5:33 PM
Subject: Messages delayed in jUnit test (version 0.9.0.0)
To: users@kafka.apache.org


Hi.

I am trying to use an embedded Kafka server to allow me to create tests in
jUnit using a real Kafka implementation, instead of a stub or a mock.

I am using Kafka version 0.9.0.0.

The test works, but the consumer poll has to wait for 3 seconds to get the
message.

Here is the code I am running:

--
// given a Kafka producer, a Kafka consumer, a topic and a message
final Producer<String, String> producer = createKafkaProducer();
final KafkaConsumer<String, String> consumer =
        createKafkaConsumer(topicName);

// when the producer publishes the message to the topic
producer.send(new ProducerRecord<>(topicName, message));

// then the consumer can read it from the topic
final ConsumerRecords<String, String> records = consumer.poll(3000);
assertTrue("The topic should have only one message. The number of "
        + "messages found is: " + records.count(),
        records.count() == 1);
consumer.close();
--

For this to work, the consumer.poll(...) timeout must be at least 3 seconds;
otherwise the consumer gets an empty ConsumerRecords (records.count() == 0).

I tried sleeping for 10 seconds before reaching the consumer, to see whether
the delay was caused by the server startup, but it did not reduce the time
the poll needs.

However, if I use the kafka-console-consumer.bat that ships with the Kafka
installation and point it to my embedded server, it reads the message almost
immediately.

Here is the configuration used in the test's consumer:

--
final Properties props = new Properties();
props.put("bootstrap.servers", kafkaServerAndPort);
props.put("group.id", "anyGroupId");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "3");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
--

Would anyone have some idea why it is taking 3 seconds for the poll to grab
the message?

Thanks in advance!

PS: here is part of the code used to start up the Kafka and Zookeeper
servers:

---
public EmbeddedKafkaAndZookeeperServers() throws Exception {
    removeExistingLogFiles();
    startZookeeperServer();
    startKafkaServer();
}

private void startKafkaServer() throws IOException {
    final KafkaConfig config =
            getKafkaConfig(zookeeperServer.getConnectString());
    kafkaServer = new KafkaServerStartable(config);
    kafkaServer.startup();
}

private void startZookeeperServer() throws Exception {
    final int shouldUseARandomPort = -1;
    final File temporaryDirectory = new File(ZOOKEEPER_LOGS_DIRECTORY);
    final boolean shouldStartImmediately = true;
    zookeeperServer = new TestingServer(shouldUseARandomPort,
            temporaryDirectory, shouldStartImmediately);
}

private static KafkaConfig getKafkaConfig(final String zookeeperConnectString)
        throws IOException {
    final Properties props = new Properties();
    props.put("broker.id", BROKER_ID);
    props.put("port", BROKER_PORT);
    props.put("log.dir", KAFKA_LOGS_DIRECTORY);
    props.put("zookeeper.connect", zookeeperConnectString);
    props.put("host.name", "127.0.0.1");
    props.put("auto.create.topics.enable", "true");
    return new KafkaConfig(props);
}

---


Messages delayed in jUnit test (version 0.9.0.0)

2016-06-17 Thread Rodrigo Ottero
Hi.

I am trying to use an embedded Kafka server to allow me to create tests in
jUnit using a real Kafka implementation, instead of a stub or a mock.

I am using Kafka version 0.9.0.0.

The test works, but the consumer poll has to wait for 3 seconds to get the
message.

Here is the code I am running:

--
// given a Kafka producer, a Kafka consumer, a topic and a message
final Producer<String, String> producer = createKafkaProducer();
final KafkaConsumer<String, String> consumer =
        createKafkaConsumer(topicName);

// when the producer publishes the message to the topic
producer.send(new ProducerRecord<>(topicName, message));

// then the consumer can read it from the topic
final ConsumerRecords<String, String> records = consumer.poll(3000);
assertTrue("The topic should have only one message. The number of "
        + "messages found is: " + records.count(),
        records.count() == 1);
consumer.close();
--

For this to work, the consumer.poll(...) timeout must be at least 3 seconds;
otherwise the consumer gets an empty ConsumerRecords (records.count() == 0).
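
Whatever the cause of the delay, a test like this can be made less sensitive to it by polling in short intervals until a deadline instead of relying on one long poll. The sketch below shows such a loop using only the JDK; the class and method names are hypothetical, and the IntSupplier stands in for a call such as () -> consumer.poll(100).count(). My assumption, not confirmed in this thread, is that the first poll also has to join the consumer group, so it can be slow for reasons unrelated to the message itself.

```java
import java.util.function.IntSupplier;

/** Hypothetical test helper: repeats short polls until enough records arrive or time runs out. */
public final class PollUntil {
    private PollUntil() {
    }

    /**
     * Calls pollOnce repeatedly and sums the record counts it returns.
     * Stops once the sum reaches expected or timeoutMs has elapsed, and returns the sum.
     */
    public static int pollUntil(IntSupplier pollOnce, int expected, long timeoutMs) {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        int total = 0;
        do {
            // Each call is expected to block briefly, like consumer.poll(100).
            total += pollOnce.getAsInt();
        } while (total < expected && System.nanoTime() - deadline < 0);
        return total;
    }
}
```

In the test above, the assertion would then compare PollUntil.pollUntil(() -> consumer.poll(100).count(), 1, 10000) against 1. The test returns as soon as the message shows up, and the generous deadline only costs time when something is actually broken.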

I tried sleeping for 10 seconds before reaching the consumer, to see whether
the delay was caused by the server startup, but it did not reduce the time
the poll needs.

However, if I use the kafka-console-consumer.bat that ships with the Kafka
installation and point it to my embedded server, it reads the message almost
immediately.

Here is the configuration used in the test's consumer:

--
final Properties props = new Properties();
props.put("bootstrap.servers", kafkaServerAndPort);
props.put("group.id", "anyGroupId");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "3");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topicName));
--

Would anyone have some idea why it is taking 3 seconds for the poll to grab
the message?

Thanks in advance!

PS: here is part of the code used to start up the Kafka and Zookeeper
servers:

---
public EmbeddedKafkaAndZookeeperServers() throws Exception {
    removeExistingLogFiles();
    startZookeeperServer();
    startKafkaServer();
}

private void startKafkaServer() throws IOException {
    final KafkaConfig config =
            getKafkaConfig(zookeeperServer.getConnectString());
    kafkaServer = new KafkaServerStartable(config);
    kafkaServer.startup();
}

private void startZookeeperServer() throws Exception {
    final int shouldUseARandomPort = -1;
    final File temporaryDirectory = new File(ZOOKEEPER_LOGS_DIRECTORY);
    final boolean shouldStartImmediately = true;
    zookeeperServer = new TestingServer(shouldUseARandomPort,
            temporaryDirectory, shouldStartImmediately);
}

private static KafkaConfig getKafkaConfig(final String zookeeperConnectString)
        throws IOException {
    final Properties props = new Properties();
    props.put("broker.id", BROKER_ID);
    props.put("port", BROKER_PORT);
    props.put("log.dir", KAFKA_LOGS_DIRECTORY);
    props.put("zookeeper.connect", zookeeperConnectString);
    props.put("host.name", "127.0.0.1");
    props.put("auto.create.topics.enable", "true");
    return new KafkaConfig(props);
}

---