Hello,
I wrote a simple junit test to test a Kafka producer.
public class KafkaProducerTest {
private int brokerId = 0;
private String topic = "test";
@Test
public void producerTest() throws InterruptedException {
// setup Zookeeper
String zkConnect =
The topic may not have been created on the broker yet; take a look at
ProducerTest.scala for an example.
You could try TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0,
500) after creating the topic, to assert that it has actually reached the
broker before you send.
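For illustration, the general "poll until a condition holds or a timeout expires" pattern can be sketched in plain Java as below. This is only a sketch: WaitUntil and waitUntilTrue are hypothetical names, not part of Kafka's TestUtils, and in a real test the condition would be whatever check tells you the broker already knows about the topic.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper; not part of Kafka. */
public final class WaitUntil {
    private WaitUntil() {}

    /**
     * Polls the condition every pollMs milliseconds until it returns true
     * or timeoutMs milliseconds have passed. Returns true if the condition
     * became true in time, false on timeout or interruption.
     */
    public static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A test would call something like WaitUntil.waitUntilTrue(() -> topicIsKnown(), 5000, 100) and fail if it returns false, where topicIsKnown() is a placeholder for your own check.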
I tried that, but I still got:
junit.framework.AssertionFailedError: Partition [test,0] metadata not propagated after timeout
at kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:506)
at kafka.utils.TestUtils.waitUntilMetadataIsPropagated(TestUtils.scala)
...
The idea here is that many serializers actually need some configuration
properties, and without this constructor there is no way to pass them in.
We could try to be a little more user-friendly by looking for a constructor
that takes properties and, if that doesn't exist, looking for a no-arg
constructor. If
public class MyKafkaEncoder implements Encoder<MyType> {
    // This constructor is expected by the kafka producer, used by reflection
    public MyKafkaEncoder(VerifiableProperties props) {
        // what can I do with this?
    }
You don't need to do anything with the argument - the
I'll have to try it out, but I think we can just have
VerifiableProperties extend java.util.Properties (so your encoder
can just take Properties in its constructor). The producer can
internally instantiate VerifiableProperties, pass that in to
instantiate the encoder, and do the verification.
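To make the constructor lookup discussed in this thread concrete, here is a minimal sketch in plain Java: try a constructor that takes java.util.Properties first, and fall back to a no-arg constructor. It is not the producer's actual code. EncoderFactory, SimpleEncoder, PrefixEncoder and PlainEncoder are hypothetical names made up for the example, and the "prefix" property is an assumption as well.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/** Hypothetical sketch; not the Kafka producer's implementation. */
public final class EncoderFactory {
    private EncoderFactory() {}

    /** Hypothetical stand-in for an encoder interface. */
    public interface SimpleEncoder {
        byte[] toBytes(String value);
    }

    /** Encoder that needs configuration, so it takes Properties. */
    public static final class PrefixEncoder implements SimpleEncoder {
        private final String prefix;

        public PrefixEncoder(Properties props) {
            this.prefix = props.getProperty("prefix", "");
        }

        @Override
        public byte[] toBytes(String value) {
            return (prefix + value).getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Encoder that needs no configuration, so it only has a no-arg constructor. */
    public static final class PlainEncoder implements SimpleEncoder {
        public PlainEncoder() {}

        @Override
        public byte[] toBytes(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Prefers a public (Properties) constructor, else a public no-arg one. */
    public static <T> T instantiate(Class<T> type, Properties props) {
        try {
            try {
                return type.getConstructor(Properties.class).newInstance(props);
            } catch (NoSuchMethodException e) {
                // No (Properties) constructor: fall back to the no-arg constructor.
                return type.getConstructor().newInstance();
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + type.getName(), e);
        }
    }
}
```

With this lookup order, an encoder that ignores configuration does not need to declare a constructor it never uses, while one that needs configuration still receives it.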
So, it seems that if I want to set a custom serializer class on the
producer (in 0.8), I have to use a class that includes a special
constructor like:
public class MyKafkaEncoder implements Encoder<MyType> {
    // This constructor is expected by the kafka producer, used by reflection
    public
Hi Monika,
For Kafka-0.8.0 you will have to register beans with JmxTransServer like
this:
Server kafkaJMXServer = new Server("localhost",);
Query byteInQuery = new Query();
byteInQuery.setObj("\"kafka.server\":type=\"BrokerTopicMetrics\",name=\"AllTopicsBytesInPerSec\"");
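The escaped quotes in that object name are easy to get wrong by hand. As a sketch using only the standard javax.management API, a quoted MBean name of the same shape can be built with ObjectName.quote and checked by parsing it. KafkaMBeanNames and its methods are hypothetical helpers for illustration, not part of Kafka or jmxtrans.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper for building quoted MBean names; not part of Kafka or jmxtrans. */
public final class KafkaMBeanNames {
    private KafkaMBeanNames() {}

    /** Builds "kafka.server":type="BrokerTopicMetrics",name="<metricName>" with each part quoted. */
    public static String brokerTopicMetric(String metricName) {
        return ObjectName.quote("kafka.server")
                + ":type=" + ObjectName.quote("BrokerTopicMetrics")
                + ",name=" + ObjectName.quote(metricName);
    }

    /** Parses the name, turning the checked exception into an unchecked one. */
    public static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid MBean name: " + name, e);
        }
    }
}
```

The string returned by KafkaMBeanNames.brokerTopicMetric("AllTopicsBytesInPerSec") could then be passed to setObj instead of a hand-escaped literal.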